Skip to content

Commit

Permalink
重要:非connect代理请求,阻塞在远端连接上,防止upload场景下OOM
Browse files Browse the repository at this point in the history
  • Loading branch information
arloor committed Dec 2, 2022
1 parent bb8301b commit d68c767
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 47 deletions.
93 changes: 47 additions & 46 deletions src/main/java/com/arloor/forwardproxy/session/Status.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,14 @@ public void handle(Session session, ChannelHandlerContext channelContext, HttpOb
String portStr = hostPortArray.length == 2 ? hostPortArray[1] : !HttpMethod.CONNECT.equals(request.method()) ? "80" : "443";
session.setPort(Integer.parseInt(portStr));
session.setAttribute(TraceConstant.host.name(), host);
session.setStatus(LAST_HTTP_CONTENT);
// 1. 如果url以 / 开头,则认为是直接请求,而不是代理请求
if (session.getRequest().uri().startsWith("/")) {
session.setStatus(LAST_HTTP_CONTENT);
} else {
session.setStatus(CheckAuth);
session.handle(channelContext, msg);
}

}
}
}, LAST_HTTP_CONTENT {
Expand All @@ -48,14 +55,8 @@ public void handle(Session session, ChannelHandlerContext channelContext, HttpOb
((HttpContent) msg).content().retain();
session.addContent((HttpContent) msg); //暂存transfer-encode: chuncked中的chunk或者普通模式下的body。这里会占用内存,大文件上传下可能会OOM,尚未遇到
if (msg instanceof LastHttpContent) {
// 1. 如果url以 / 开头,则认为是直接请求,而不是代理请求
if (session.getRequest().uri().startsWith("/")) {
session.setStatus(WEB);
session.handle(channelContext, msg);
} else {
session.setStatus(CheckAuth);
session.handle(channelContext, msg);
}
session.setStatus(WEB);
session.handle(channelContext, msg);
}
}
}, WEB {
Expand Down Expand Up @@ -127,9 +128,8 @@ public void handle(Session session, ChannelHandlerContext channelContext, HttpOb
@Override
public void handle(Session session, ChannelHandlerContext channelContext, HttpObject msg) {
HttpRequest request = session.getRequest();
final Channel inboundChannel = channelContext.channel();
Bootstrap b = session.getBootStrap();
b.group(inboundChannel.eventLoop())
b.group(workerGroup)
.channel(OsUtils.socketChannelClazz())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.option(ChannelOption.SO_KEEPALIVE, true)
Expand Down Expand Up @@ -178,7 +178,7 @@ public void handle(Session session, ChannelHandlerContext channelContext, HttpOb
HttpRequest request = session.getRequest();
final Channel inboundChannel = channelContext.channel();
Bootstrap b = session.getBootStrap();
b.group(inboundChannel.eventLoop())
b.group(workerGroup)
.channel(OsUtils.socketChannelClazz())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.option(ChannelOption.SO_KEEPALIVE, true)
Expand All @@ -189,51 +189,52 @@ protected void initChannel(SocketChannel outboundChannel) throws Exception {
outboundChannel.pipeline().addLast(new RelayHandler(channelContext.channel(), session.getHost()));
}
});
b.connect(session.getHost(), session.getPort()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
final Channel outboundChannel = future.channel();
String targetAddr = ((InetSocketAddress) outboundChannel.remoteAddress()).getAddress().getHostAddress();
session.setAttribute(TraceConstant.target.name(), targetAddr);
// Connection established use handler provided results
// 这里有几率抛出NoSuchElementException,原因是连接target host完成时,客户端已经关闭连接。
// 考虑到是比较小的几率,不catch。注:该异常没有啥影响。
channelContext.pipeline().remove(IdleStateHandler.class);
channelContext.pipeline().remove(SessionHandShakeHandler.class);
channelContext.pipeline().remove(HttpResponseEncoder.class);
RelayHandler clientEndtoRemoteHandler = new RelayHandler(outboundChannel, session.getHost());
channelContext.pipeline().addLast(clientEndtoRemoteHandler);
ChannelFuture future = null;
try {
// getpost需要读取后续的httpContent,所以这里进行sync
future = b.connect(session.getHost(), session.getPort()).sync();
} catch (InterruptedException e) {
log.error("", e);
}
if (future != null && future.isSuccess()) {
final Channel outboundChannel = future.channel();
String targetAddr = ((InetSocketAddress) outboundChannel.remoteAddress()).getAddress().getHostAddress();
session.setAttribute(TraceConstant.target.name(), targetAddr);
// Connection established use handler provided results
// 这里有几率抛出NoSuchElementException,原因是连接target host完成时,客户端已经关闭连接。
// 考虑到是比较小的几率,不catch。注:该异常没有啥影响。
channelContext.pipeline().remove(IdleStateHandler.class);
channelContext.pipeline().remove(SessionHandShakeHandler.class);
channelContext.pipeline().remove(HttpResponseEncoder.class);
RelayHandler clientEndtoRemoteHandler = new RelayHandler(outboundChannel, session.getHost());
channelContext.pipeline().addLast(clientEndtoRemoteHandler);
// ctx.channel().config().setAutoRead(true);

//出于未知的原因,不知道为什么fireChannelread不行
clientEndtoRemoteHandler.channelRead(channelContext, request);
session.getContents().forEach(content -> {
try {
clientEndtoRemoteHandler.channelRead(channelContext, content);
} catch (Exception e) {
log.error("处理非CONNECT方法的代理请求失败!", e);
}
});
//出于未知的原因,不知道为什么fireChannelread不行
clientEndtoRemoteHandler.channelRead(channelContext, request);

} else {
// Close the connection if the connection attempt has failed.
channelContext.channel().writeAndFlush(
new DefaultHttpResponse(request.protocolVersion(), INTERNAL_SERVER_ERROR)
);
SocksServerUtils.closeOnFlush(channelContext.channel());
}
}
});
} else {
// Close the connection if the connection attempt has failed.
channelContext.channel().writeAndFlush(
new DefaultHttpResponse(request.protocolVersion(), INTERNAL_SERVER_ERROR)
);
SocksServerUtils.closeOnFlush(channelContext.channel());
}
session.setStatus(WAIT_ESTABLISH);
}
}, WAIT_ESTABLISH { // 等待到target的连接建立前不应该有新请求进入

@Override
public void handle(Session session, ChannelHandlerContext channelContext, HttpObject msg) {
log.error("receive new message before tunnel is established, msg: {}", msg);
if (msg != LastHttpContent.EMPTY_LAST_CONTENT) {
log.error("receive new message before tunnel is established, msg: {}", msg);
if (HttpMethod.CONNECT.equals(session.getRequest().method())) {
log.error("connect请求下有body", msg);
}
}
}
};
private static final EventLoopGroup workerGroup = OsUtils.buildEventLoopGroup(0);

private static final Logger log = LoggerFactory.getLogger(Status.class);

Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/log4j2-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
<logger name="com.arloor.forwardproxy.trace" level="info"
additivity="false">
<appender-ref ref="span"/>
<AppenderRef ref="CONSOLE"/>
<!-- <AppenderRef ref="CONSOLE"/>-->
</logger>
<!--level是日志记录的优先级 -->
<root level="info">
Expand Down

0 comments on commit d68c767

Please sign in to comment.