Skip to content

Commit

Permalink
Add throttling to LittleProxy.
Browse files Browse the repository at this point in the history
- Bumped netty version to 4.0.22 for required changes.
- Add GlobalTrafficShapingHandler to DefaultHttpProxyServer for throttling.
  • Loading branch information
mreedell committed Aug 15, 2014
1 parent 7e09961 commit d22b7b1
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 11 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.14.Final</version>
<version>4.0.22.Final</version>
<scope>compile</scope>
</dependency>

Expand Down
11 changes: 11 additions & 0 deletions src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.littleshoot.proxy;

import io.netty.handler.traffic.GlobalTrafficShapingHandler;

import java.net.InetSocketAddress;

/**
Expand Down Expand Up @@ -283,6 +285,15 @@ HttpProxyServerBootstrap withConnectTimeout(
*/
HttpProxyServerBootstrap plusActivityTracker(ActivityTracker activityTracker);

/**
* <p>
* Specify a global traffic shaping handler for this proxy server.
* </p>
* @param globalTrafficShapingHandler
* @return
*/
HttpProxyServerBootstrap withGlobalTrafficShapingHandler(GlobalTrafficShapingHandler globalTrafficShapingHandler);

/**
* <p>
* Build and starts the server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

Expand Down Expand Up @@ -122,11 +123,14 @@ public class ClientToProxyConnection extends ProxyConnection<HttpRequest> {

private AtomicBoolean authenticated = new AtomicBoolean();

private final GlobalTrafficShapingHandler globalTrafficShapingHandler;

ClientToProxyConnection(
final DefaultHttpProxyServer proxyServer,
SslEngineSource sslEngineSource,
boolean authenticateClients,
ChannelPipeline pipeline) {
ChannelPipeline pipeline,
GlobalTrafficShapingHandler globalTrafficShapingHandler) {
super(AWAITING_INITIAL, proxyServer, false);

initChannelPipeline(pipeline);
Expand All @@ -149,6 +153,7 @@ public void operationComplete(
}
});
}
this.globalTrafficShapingHandler = globalTrafficShapingHandler;

LOG.debug("Created ClientToProxyConnection");
}
Expand Down Expand Up @@ -236,7 +241,8 @@ private ConnectionState doReadHTTPInitial(HttpRequest httpRequest) {
proxyServer,
this,
serverHostAndPort,
httpRequest);
httpRequest,
globalTrafficShapingHandler);
if (currentServerConnection == null) {
LOG.debug("Unable to create server connection, probably no chained proxies available");
writeBadGateway(httpRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.udt.nio.NioUdtProvider;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.io.File;
Expand Down Expand Up @@ -101,6 +102,7 @@ public class DefaultHttpProxyServer implements HttpProxyServer {
private final int connectTimeout;
private volatile int idleConnectionTimeout;
private final HostResolver serverResolver;
private final GlobalTrafficShapingHandler globalTrafficShapingHandler;

/**
* Track all ActivityTrackers for tracking proxying activity.
Expand Down Expand Up @@ -195,7 +197,8 @@ private DefaultHttpProxyServer(ServerGroup serverGroup,
int idleConnectionTimeout,
Collection<ActivityTracker> activityTrackers,
int connectTimeout,
HostResolver serverResolver) {
HostResolver serverResolver,
GlobalTrafficShapingHandler globalTrafficShapingHandler) {
this.serverGroup = serverGroup;
this.transportProtocol = transportProtocol;
this.address = address;
Expand All @@ -212,6 +215,7 @@ private DefaultHttpProxyServer(ServerGroup serverGroup,
}
this.connectTimeout = connectTimeout;
this.serverResolver = serverResolver;
this.globalTrafficShapingHandler = globalTrafficShapingHandler;
}

boolean isTransparent() {
Expand All @@ -238,6 +242,10 @@ public HostResolver getServerResolver() {
public InetSocketAddress getListenAddress() {
return address;
}

public GlobalTrafficShapingHandler getGlobalTrafficShapingHandler() {
return globalTrafficShapingHandler;
}

@Override
public HttpProxyServerBootstrap clone() {
Expand All @@ -248,7 +256,7 @@ public HttpProxyServerBootstrap clone() {
chainProxyManager,
mitmManager, filtersSource, transparent,
idleConnectionTimeout, activityTrackers, connectTimeout,
serverResolver);
serverResolver, globalTrafficShapingHandler);
}

@Override
Expand Down Expand Up @@ -282,7 +290,8 @@ protected void initChannel(Channel ch) throws Exception {
DefaultHttpProxyServer.this,
sslEngineSource,
authenticateSslClients,
ch.pipeline());
ch.pipeline(),
globalTrafficShapingHandler);
};
};
switch (transportProtocol) {
Expand Down Expand Up @@ -551,6 +560,7 @@ private static class DefaultHttpProxyServerBootstrap implements
private Collection<ActivityTracker> activityTrackers = new ConcurrentLinkedQueue<ActivityTracker>();
private int connectTimeout = 40000;
private HostResolver serverResolver = new DefaultHostResolver();
private GlobalTrafficShapingHandler globalTrafficShapingHandler;

private DefaultHttpProxyServerBootstrap() {
}
Expand All @@ -567,7 +577,8 @@ private DefaultHttpProxyServerBootstrap(
HttpFiltersSource filtersSource,
boolean transparent, int idleConnectionTimeout,
Collection<ActivityTracker> activityTrackers,
int connectTimeout, HostResolver serverResolver) {
int connectTimeout, HostResolver serverResolver,
GlobalTrafficShapingHandler globalTrafficShapingHandler) {
this.original = original;
this.transportProtocol = transportProtocol;
this.address = address;
Expand All @@ -584,6 +595,7 @@ private DefaultHttpProxyServerBootstrap(
}
this.connectTimeout = connectTimeout;
this.serverResolver = serverResolver;
this.globalTrafficShapingHandler = globalTrafficShapingHandler;
}

private DefaultHttpProxyServerBootstrap(Properties props) {
Expand Down Expand Up @@ -732,6 +744,12 @@ public HttpProxyServerBootstrap plusActivityTracker(
return this;
}

@Override
public HttpProxyServerBootstrap withGlobalTrafficShapingHandler(GlobalTrafficShapingHandler globalTrafficShapingHandler) {
this.globalTrafficShapingHandler = globalTrafficShapingHandler;
return this;
}

@Override
public HttpProxyServer start() {
return build().start();
Expand All @@ -753,7 +771,7 @@ transportProtocol, determineListenAddress(),
proxyAuthenticator, chainProxyManager, mitmManager,
filtersSource, transparent,
idleConnectionTimeout, activityTrackers, connectTimeout,
serverResolver);
serverResolver, globalTrafficShapingHandler);
}

private InetSocketAddress determineListenAddress() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
Expand All @@ -20,6 +21,7 @@
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
Expand Down Expand Up @@ -114,6 +116,8 @@ public class ProxyToServerConnection extends ProxyConnection<HttpResponse> {
*/
private volatile HttpResponse currentHttpResponse;

private volatile GlobalTrafficShapingHandler trafficHandler;

/**
* Create a new ProxyToServerConnection.
*
Expand All @@ -128,7 +132,8 @@ public class ProxyToServerConnection extends ProxyConnection<HttpResponse> {
static ProxyToServerConnection create(DefaultHttpProxyServer proxyServer,
ClientToProxyConnection clientConnection,
String serverHostAndPort,
HttpRequest initialHttpRequest)
HttpRequest initialHttpRequest,
GlobalTrafficShapingHandler globalTrafficShapingHandler)
throws UnknownHostException {
Queue<ChainedProxy> chainedProxies = new ConcurrentLinkedQueue<ChainedProxy>();
ChainedProxyManager chainedProxyManager = proxyServer
Expand All @@ -142,21 +147,23 @@ static ProxyToServerConnection create(DefaultHttpProxyServer proxyServer,
}
}
return new ProxyToServerConnection(proxyServer, clientConnection,
serverHostAndPort, chainedProxies.poll(), chainedProxies);
serverHostAndPort, chainedProxies.poll(), chainedProxies, globalTrafficShapingHandler);
}

private ProxyToServerConnection(
DefaultHttpProxyServer proxyServer,
ClientToProxyConnection clientConnection,
String serverHostAndPort,
ChainedProxy chainedProxy,
Queue<ChainedProxy> availableChainedProxies)
Queue<ChainedProxy> availableChainedProxies,
GlobalTrafficShapingHandler globalTrafficShapingHandler)
throws UnknownHostException {
super(DISCONNECTED, proxyServer, true);
this.clientConnection = clientConnection;
this.serverHostAndPort = serverHostAndPort;
this.chainedProxy = chainedProxy;
this.availableChainedProxies = availableChainedProxies;
this.trafficHandler = globalTrafficShapingHandler;
setupConnectionParameters();
}

Expand Down Expand Up @@ -686,6 +693,15 @@ private void setupConnectionParameters() throws UnknownHostException {
*/
private void initChannelPipeline(ChannelPipeline pipeline,
HttpRequest httpRequest) {

if(trafficHandler != null) {
long initial = trafficHandler.getReadLimit() / 4;
long max = trafficHandler.getReadLimit() / 2;
AdaptiveRecvByteBufAllocator adaptiveRecvByteBufAllocator = new AdaptiveRecvByteBufAllocator(64, (int)initial, (int)max);
pipeline.channel().config().setRecvByteBufAllocator(adaptiveRecvByteBufAllocator);
pipeline.addLast("global-traffic-shaping", trafficHandler);
}

pipeline.addLast("bytesReadMonitor", bytesReadMonitor);
pipeline.addLast("decoder", new HeadAwareHttpResponseDecoder(
8192,
Expand Down

0 comments on commit d22b7b1

Please sign in to comment.