diff --git a/all/pom.xml b/all/pom.xml index cce1021b0e37..5590527ae0db 100644 --- a/all/pom.xml +++ b/all/pom.xml @@ -189,6 +189,13 @@ compile true + + ${project.groupId} + netty-handler-proxy + ${project.version} + compile + true + ${project.groupId} netty-transport diff --git a/codec-dns/pom.xml b/codec-dns/pom.xml index 5dd0108a2ad7..734c791a75e7 100644 --- a/codec-dns/pom.xml +++ b/codec-dns/pom.xml @@ -34,11 +34,6 @@ netty-codec ${project.version} - - ${project.groupId} - netty-handler - ${project.version} - org.apache.directory.server diff --git a/codec-http/pom.xml b/codec-http/pom.xml index 482ff6cc7247..56b62f0fb428 100644 --- a/codec-http/pom.xml +++ b/codec-http/pom.xml @@ -38,6 +38,7 @@ ${project.groupId} netty-handler ${project.version} + true com.jcraft diff --git a/codec-memcache/pom.xml b/codec-memcache/pom.xml index 89bcf1f14009..170e905a32db 100644 --- a/codec-memcache/pom.xml +++ b/codec-memcache/pom.xml @@ -34,11 +34,6 @@ netty-codec ${project.version} - - ${project.groupId} - netty-handler - ${project.version} - diff --git a/codec-mqtt/pom.xml b/codec-mqtt/pom.xml index 35d87c3550fd..ef5e2469bb36 100644 --- a/codec-mqtt/pom.xml +++ b/codec-mqtt/pom.xml @@ -34,11 +34,7 @@ netty-codec ${project.version} - - ${project.groupId} - netty-handler - ${project.version} - + org.mockito mockito-all diff --git a/codec-socks/pom.xml b/codec-socks/pom.xml index 65631b568f6f..36136b9baa88 100644 --- a/codec-socks/pom.xml +++ b/codec-socks/pom.xml @@ -34,11 +34,6 @@ netty-codec ${project.version} - - ${project.groupId} - netty-handler - ${project.version} - diff --git a/codec-socks/src/main/java/io/netty/handler/codec/socksx/v4/Socks4CmdResponseDecoder.java b/codec-socks/src/main/java/io/netty/handler/codec/socksx/v4/Socks4CmdResponseDecoder.java index 27bf318e3a82..c8f11e795d11 100755 --- a/codec-socks/src/main/java/io/netty/handler/codec/socksx/v4/Socks4CmdResponseDecoder.java +++ b/codec-socks/src/main/java/io/netty/handler/codec/socksx/v4/Socks4CmdResponseDecoder.java @@ -57,7 +57,6 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List o msg = new Socks4CmdResponse(cmdStatus, host, port); } } - ctx.pipeline().remove(this); out.add(msg); } diff --git a/codec-socks/src/main/java/io/netty/handler/codec/socksx/v5/Socks5CmdResponseDecoder.java b/codec-socks/src/main/java/io/netty/handler/codec/socksx/v5/Socks5CmdResponseDecoder.java index 5354bf57b4bf..e612197afb5f 100755 --- a/codec-socks/src/main/java/io/netty/handler/codec/socksx/v5/Socks5CmdResponseDecoder.java +++ b/codec-socks/src/main/java/io/netty/handler/codec/socksx/v5/Socks5CmdResponseDecoder.java @@ -83,7 +83,6 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List o } } } - ctx.pipeline().remove(this); out.add(msg); } diff --git a/codec-stomp/pom.xml b/codec-stomp/pom.xml index d00fab513263..99d550eed8dd 100644 --- a/codec-stomp/pom.xml +++ b/codec-stomp/pom.xml @@ -34,11 +34,6 @@ netty-codec ${project.version} - - ${project.groupId} - netty-handler - ${project.version} - diff --git a/example/pom.xml b/example/pom.xml index e75c7afa8424..9ba137078055 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -44,6 +44,11 @@ netty-handler ${project.version} + + ${project.groupId} + netty-handler-proxy + ${project.version} + ${project.groupId} netty-codec-http diff --git a/handler-proxy/pom.xml b/handler-proxy/pom.xml new file mode 100644 index 000000000000..e87b26f598e7 --- /dev/null +++ b/handler-proxy/pom.xml @@ -0,0 +1,55 @@ + + + + + 4.0.0 + + io.netty + netty-parent + 5.0.0.Alpha2-SNAPSHOT + + + netty-handler-proxy + jar + + Netty/Handler/Proxy + + + + ${project.groupId} + netty-transport + ${project.version} + + + ${project.groupId} + netty-codec-socks + ${project.version} + + + ${project.groupId} + netty-codec-http + ${project.version} + + + ${project.groupId} + netty-handler + ${project.version} + test + + + + diff --git a/handler-proxy/src/main/java/io/netty/handler/proxy/HttpProxyHandler.java b/handler-proxy/src/main/java/io/netty/handler/proxy/HttpProxyHandler.java new file mode 100644 index 000000000000..4cd9de41c4f2 --- /dev/null +++ b/handler-proxy/src/main/java/io/netty/handler/proxy/HttpProxyHandler.java @@ -0,0 +1,161 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.proxy; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.AsciiString; +import io.netty.handler.codec.base64.Base64; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpHeaders.Names; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.util.CharsetUtil; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +public final class HttpProxyHandler extends ProxyHandler { + + private static final String PROTOCOL = "http"; + private static final String AUTH_BASIC = "basic"; + + private final HttpClientCodec codec = new HttpClientCodec(); + private final String username; + private final String password; + private final CharSequence authorization; + private HttpResponseStatus status; + + public HttpProxyHandler(SocketAddress proxyAddress) { + super(proxyAddress); + username = null; + password = null; + authorization = null; + } + + public HttpProxyHandler(SocketAddress proxyAddress, String username, String password) { + super(proxyAddress); + if (username == null) { + throw new NullPointerException("username"); + } + if (password == null) { + throw new NullPointerException("password"); + } + this.username = username; + this.password = password; + + ByteBuf authz = Unpooled.copiedBuffer(username + ':' + password, CharsetUtil.UTF_8); + ByteBuf authzBase64 = Base64.encode(authz, false); + + authorization = new AsciiString(authzBase64.toString(CharsetUtil.US_ASCII)); + + authz.release(); + authzBase64.release(); + } + + @Override + public String protocol() { + return PROTOCOL; + } + + @Override + public String authScheme() { + return authorization != null? AUTH_BASIC : AUTH_NONE; + } + + public String username() { + return username; + } + + public String password() { + return password; + } + + @Override + protected void addCodec(ChannelHandlerContext ctx) throws Exception { + ChannelPipeline p = ctx.pipeline(); + String name = ctx.name(); + p.addBefore(name, null, codec); + } + + @Override + protected void removeEncoder(ChannelHandlerContext ctx) throws Exception { + ctx.pipeline().remove(codec.encoder()); + } + + @Override + protected void removeDecoder(ChannelHandlerContext ctx) throws Exception { + ctx.pipeline().remove(codec.decoder()); + } + + @Override + protected Object newInitialMessage(ChannelHandlerContext ctx) throws Exception { + InetSocketAddress raddr = destinationAddress(); + String rhost; + if (raddr.isUnresolved()) { + rhost = raddr.getHostString(); + } else { + rhost = raddr.getAddress().getHostAddress(); + } + + FullHttpRequest req = new DefaultFullHttpRequest( + HttpVersion.HTTP_1_0, HttpMethod.CONNECT, + rhost + ':' + raddr.getPort(), + Unpooled.EMPTY_BUFFER, false); + + SocketAddress proxyAddress = proxyAddress(); + if (proxyAddress instanceof InetSocketAddress) { + InetSocketAddress hostAddr = (InetSocketAddress) proxyAddress; + req.headers().set(Names.HOST, hostAddr.getHostString() + ':' + hostAddr.getPort()); + } + + if (authorization != null) { + req.headers().set(Names.AUTHORIZATION, authorization); + } + + return req; + } + + @Override + protected boolean handleResponse(ChannelHandlerContext ctx, Object response) throws Exception { + if (response instanceof HttpResponse) { + if (status != null) { + throw new ProxyConnectException(exceptionMessage("too many responses")); + } + status = ((HttpResponse) response).status(); + } + + boolean finished = response instanceof LastHttpContent; + if (finished) { + if (status == null) { + throw new ProxyConnectException(exceptionMessage("missing response")); + } + if (status.code() != 200) { + throw new ProxyConnectException(exceptionMessage("status: " + status)); + } + } + + return finished; + } +} diff --git a/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyConnectException.java b/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyConnectException.java new file mode 100644 index 000000000000..d6bfaf738f06 --- /dev/null +++ b/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyConnectException.java @@ -0,0 +1,38 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.proxy; + +import java.net.ConnectException; + +public class ProxyConnectException extends ConnectException { + private static final long serialVersionUID = 5211364632246265538L; + + public ProxyConnectException() { } + + public ProxyConnectException(String msg) { + super(msg); + } + + public ProxyConnectException(Throwable cause) { + initCause(cause); + } + + public ProxyConnectException(String msg, Throwable cause) { + super(msg); + initCause(cause); + } +} diff --git a/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyConnectionEvent.java b/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyConnectionEvent.java new file mode 100644 index 000000000000..73443d954e2d --- /dev/null +++ b/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyConnectionEvent.java @@ -0,0 +1,105 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.proxy; + +import io.netty.util.internal.StringUtil; + +import java.net.SocketAddress; + +public final class ProxyConnectionEvent { + + private final String protocol; + private final String authScheme; + private final SocketAddress proxyAddress; + private final SocketAddress destinationAddress; + private String strVal; + + /** + * Creates a new event that indicates a successful connection attempt to the destination address. + */ + public ProxyConnectionEvent( + String protocol, String authScheme, SocketAddress proxyAddress, SocketAddress destinationAddress) { + if (protocol == null) { + throw new NullPointerException("protocol"); + } + if (authScheme == null) { + throw new NullPointerException("authScheme"); + } + if (proxyAddress == null) { + throw new NullPointerException("proxyAddress"); + } + if (destinationAddress == null) { + throw new NullPointerException("destinationAddress"); + } + + this.protocol = protocol; + this.authScheme = authScheme; + this.proxyAddress = proxyAddress; + this.destinationAddress = destinationAddress; + } + + /** + * Returns the name of the proxy protocol in use. + */ + public String protocol() { + return protocol; + } + + /** + * Returns the name of the authentication scheme in use. + */ + public String authScheme() { + return authScheme; + } + + /** + * Returns the address of the proxy server. + */ + @SuppressWarnings("unchecked") + public T proxyAddress() { + return (T) proxyAddress; + } + + /** + * Returns the address of the destination. + */ + @SuppressWarnings("unchecked") + public T destinationAddress() { + return (T) destinationAddress; + } + + @Override + public String toString() { + if (strVal != null) { + return strVal; + } + + StringBuilder buf = new StringBuilder(128); + buf.append(StringUtil.simpleClassName(this)); + buf.append('('); + buf.append(protocol); + buf.append(", "); + buf.append(authScheme); + buf.append(", "); + buf.append(proxyAddress); + buf.append(" => "); + buf.append(destinationAddress); + buf.append(')'); + + return strVal = buf.toString(); + } +} diff --git a/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyHandler.java b/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyHandler.java new file mode 100644 index 000000000000..3035ef3d496f --- /dev/null +++ b/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyHandler.java @@ -0,0 +1,449 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.proxy; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.channel.PendingWriteQueue; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.DefaultPromise; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.ScheduledFuture; +import io.netty.util.internal.OneTimeTask; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +import java.net.SocketAddress; +import java.nio.channels.ConnectionPendingException; +import java.util.concurrent.TimeUnit; + +public abstract class ProxyHandler extends ChannelDuplexHandler { + + private static final InternalLogger logger = InternalLoggerFactory.getInstance(ProxyHandler.class); + + /** + * The default connect timeout: 10 seconds. + */ + private static final long DEFAULT_CONNECT_TIMEOUT_MILLIS = 10000; + + /** + * A string that signifies 'no authentication' or 'anonymous'. + */ + static final String AUTH_NONE = "none"; + + private final SocketAddress proxyAddress; + private volatile SocketAddress destinationAddress; + private volatile long connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT_MILLIS; + + private volatile ChannelHandlerContext ctx; + private PendingWriteQueue pendingWrites; + private boolean finished; + private boolean suppressChannelReadComplete; + private boolean flushedPrematurely; + private final LazyChannelPromise connectPromise = new LazyChannelPromise(); + private ScheduledFuture connectTimeoutFuture; + private final ChannelFutureListener writeListener = new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + setConnectFailure(future.cause()); + } + } + }; + + protected ProxyHandler(SocketAddress proxyAddress) { + if (proxyAddress == null) { + throw new NullPointerException("proxyAddress"); + } + this.proxyAddress = proxyAddress; + } + + /** + * Returns the name of the proxy protocol in use. + */ + public abstract String protocol(); + + /** + * Returns the name of the authentication scheme in use. + */ + public abstract String authScheme(); + + /** + * Returns the address of the proxy server. + */ + @SuppressWarnings("unchecked") + public final T proxyAddress() { + return (T) proxyAddress; + } + + /** + * Returns the address of the destination to connect to via the proxy server. + */ + @SuppressWarnings("unchecked") + public final T destinationAddress() { + return (T) destinationAddress; + } + + /** + * Rerutns {@code true} if and only if the connection to the destination has been established successfully. + */ + public final boolean isConnected() { + return connectPromise.isSuccess(); + } + + /** + * Returns a {@link Future} that is notified when the connection to the destination has been established + * or the connection attempt has failed. + */ + public final Future connectFuture() { + return connectPromise; + } + + /** + * Returns the connect timeout in millis. If the connection attempt to the destination does not finish within + * the timeout, the connection attempt will be failed. + */ + public final long connectTimeoutMillis() { + return connectTimeoutMillis; + } + + /** + * Sets the connect timeout in millis. If the connection attempt to the destination does not finish within + * the timeout, the connection attempt will be failed. + */ + public final void setConnectTimeoutMillis(long connectTimeoutMillis) { + if (connectTimeoutMillis <= 0) { + connectTimeoutMillis = 0; + } + + this.connectTimeoutMillis = connectTimeoutMillis; + } + + @Override + public final void handlerAdded(ChannelHandlerContext ctx) throws Exception { + this.ctx = ctx; + addCodec(ctx); + + if (ctx.channel().isActive()) { + // channelActive() event has been fired already, which means this.channelActive() will + // not be invoked. We have to initialize here instead. + sendInitialMessage(ctx); + } else { + // channelActive() event has not been fired yet. this.channelOpen() will be invoked + // and initialization will occur there. + } + } + + /** + * Adds the codec handlers required to communicate with the proxy server. + */ + protected abstract void addCodec(ChannelHandlerContext ctx) throws Exception; + + /** + * Removes the encoders added in {@link #addCodec(ChannelHandlerContext)}. + */ + protected abstract void removeEncoder(ChannelHandlerContext ctx) throws Exception; + + /** + * Removes the decoders added in {@link #addCodec(ChannelHandlerContext)}. + */ + protected abstract void removeDecoder(ChannelHandlerContext ctx) throws Exception; + + @Override + public final void connect( + ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, + ChannelPromise promise) throws Exception { + + if (destinationAddress != null) { + promise.setFailure(new ConnectionPendingException()); + return; + } + + destinationAddress = remoteAddress; + ctx.connect(proxyAddress, localAddress, promise); + } + + @Override + public final void channelActive(ChannelHandlerContext ctx) throws Exception { + sendInitialMessage(ctx); + ctx.fireChannelActive(); + } + + /** + * Sends the initial message to be sent to the proxy server. This method also starts a timeout task which marks + * the {@link #connectPromise} as failure if the connection attempt does not success within the timeout. + */ + private void sendInitialMessage(final ChannelHandlerContext ctx) throws Exception { + final long connectTimeoutMillis = this.connectTimeoutMillis; + if (connectTimeoutMillis > 0) { + connectTimeoutFuture = ctx.executor().schedule(new OneTimeTask() { + @Override + public void run() { + if (!connectPromise.isDone()) { + setConnectFailure(new ProxyConnectException(exceptionMessage("timeout"))); + } + } + }, connectTimeoutMillis, TimeUnit.MILLISECONDS); + } + + final Object initialMessage = newInitialMessage(ctx); + if (initialMessage != null) { + sendToProxyServer(initialMessage); + } + } + + /** + * Returns a new message that is sent at first time when the connection to the proxy server has been established. + * + * @return the initial message, or {@code null} if the proxy server is expected to send the first message instead + */ + protected abstract Object newInitialMessage(ChannelHandlerContext ctx) throws Exception; + + /** + * Sends the specified message to the proxy server. Use this method to send a response to the proxy server in + * {@link #handleResponse(ChannelHandlerContext, Object)}. + */ + protected final void sendToProxyServer(Object msg) { + ctx.writeAndFlush(msg).addListener(writeListener); + } + + @Override + public final void channelInactive(ChannelHandlerContext ctx) throws Exception { + if (finished) { + ctx.fireChannelInactive(); + } else { + // Disconnected before connected to the destination. + setConnectFailure(new ProxyConnectException(exceptionMessage("disconnected"))); + } + } + + @Override + public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + if (finished) { + ctx.fireExceptionCaught(cause); + } else { + // Exception was raised before the connection attempt is finished. + setConnectFailure(cause); + } + } + + @Override + public final void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (finished) { + // Received a message after the connection has been established; pass through. + suppressChannelReadComplete = false; + ctx.fireChannelRead(msg); + } else { + suppressChannelReadComplete = true; + Throwable cause = null; + try { + boolean done = handleResponse(ctx, msg); + if (done) { + setConnectSuccess(); + } + } catch (Throwable t) { + cause = t; + } finally { + ReferenceCountUtil.release(msg); + if (cause != null) { + setConnectFailure(cause); + } + } + } + } + + /** + * Handles the message received from the proxy server. + * + * @return {@code true} if the connection to the destination has been established, + * {@code false} if the connection to the destination has not been established and more messages are + * expected from the proxy server + */ + protected abstract boolean handleResponse(ChannelHandlerContext ctx, Object response) throws Exception; + + private void setConnectSuccess() { + finished = true; + if (connectTimeoutFuture != null) { + connectTimeoutFuture.cancel(false); + } + + if (connectPromise.trySuccess(ctx.channel())) { + boolean removedCodec = true; + + removedCodec &= safeRemoveEncoder(); + + ctx.fireUserEventTriggered( + new ProxyConnectionEvent(protocol(), authScheme(), proxyAddress, destinationAddress)); + + removedCodec &= safeRemoveDecoder(); + + if (removedCodec) { + writePendingWrites(); + + if (flushedPrematurely) { + ctx.flush(); + } + } else { + // We are at inconsistent state because we failed to remove all codec handlers. + Exception cause = new ProxyConnectException( + "failed to remove all codec handlers added by the proxy handler; bug?"); + failPendingWrites(cause); + ctx.fireExceptionCaught(cause); + ctx.close(); + } + } + } + + private boolean safeRemoveDecoder() { + try { + removeDecoder(ctx); + return true; + } catch (Exception e) { + logger.warn("Failed to remove proxy decoders:", e); + } + + return false; + } + + private boolean safeRemoveEncoder() { + try { + removeEncoder(ctx); + return true; + } catch (Exception e) { + logger.warn("Failed to remove proxy encoders:", e); + } + + return false; + } + + private void setConnectFailure(Throwable cause) { + finished = true; + if (connectTimeoutFuture != null) { + connectTimeoutFuture.cancel(false); + } + + if (!(cause instanceof ProxyConnectException)) { + cause = new ProxyConnectException( + exceptionMessage(cause.toString()), cause); + } + + if (connectPromise.tryFailure(cause)) { + safeRemoveDecoder(); + safeRemoveEncoder(); + + failPendingWrites(cause); + ctx.fireExceptionCaught(cause); + ctx.close(); + } + } + + /** + * Decorates the specified exception message with the common information such as the current protocol, + * authentication scheme, proxy address, and destination address. + */ + protected final String exceptionMessage(String msg) { + if (msg == null) { + msg = ""; + } + + StringBuilder buf = new StringBuilder(128 + msg.length()); + + buf.append(protocol()); + buf.append(", "); + buf.append(authScheme()); + buf.append(", "); + buf.append(proxyAddress); + buf.append(" => "); + buf.append(destinationAddress); + if (msg.length() > 0) { + buf.append(", "); + buf.append(msg); + } + + return buf.toString(); + } + + @Override + public final void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + if (suppressChannelReadComplete) { + suppressChannelReadComplete = false; + + if (!ctx.channel().config().isAutoRead()) { + ctx.read(); + } + } else { + ctx.fireChannelReadComplete(); + } + } + + @Override + public final void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (finished) { + writePendingWrites(); + ctx.write(msg, promise); + } else { + addPendingWrite(ctx, msg, promise); + } + } + + @Override + public final void flush(ChannelHandlerContext ctx) throws Exception { + if (finished) { + writePendingWrites(); + ctx.flush(); + } else { + flushedPrematurely = true; + } + } + + private void writePendingWrites() { + if (pendingWrites != null) { + pendingWrites.removeAndWriteAll(); + pendingWrites = null; + } + } + + private void failPendingWrites(Throwable cause) { + if (pendingWrites != null) { + pendingWrites.removeAndFailAll(cause); + pendingWrites = null; + } + } + + private void addPendingWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + PendingWriteQueue pendingWrites = this.pendingWrites; + if (pendingWrites == null) { + this.pendingWrites = pendingWrites = new PendingWriteQueue(ctx); + } + pendingWrites.add(msg, promise); + } + + private final class LazyChannelPromise extends DefaultPromise { + @Override + protected EventExecutor executor() { + if (ctx == null) { + throw new IllegalStateException(); + } + return ctx.executor(); + } + } +} diff --git a/handler-proxy/src/main/java/io/netty/handler/proxy/Socks4ProxyHandler.java b/handler-proxy/src/main/java/io/netty/handler/proxy/Socks4ProxyHandler.java new file mode 100644 index 000000000000..ce7dbb843190 --- /dev/null +++ b/handler-proxy/src/main/java/io/netty/handler/proxy/Socks4ProxyHandler.java @@ -0,0 +1,116 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.proxy; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.socksx.v4.Socks4CmdRequest; +import io.netty.handler.codec.socksx.v4.Socks4CmdResponse; +import io.netty.handler.codec.socksx.v4.Socks4CmdResponseDecoder; +import io.netty.handler.codec.socksx.v4.Socks4CmdStatus; +import io.netty.handler.codec.socksx.v4.Socks4CmdType; +import io.netty.handler.codec.socksx.v4.Socks4MessageEncoder; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +public final class Socks4ProxyHandler extends ProxyHandler { + + private static final String PROTOCOL = "socks4"; + private static final String AUTH_USERNAME = "username"; + + private final String username; + + private String decoderName; + private String encoderName; + + public Socks4ProxyHandler(SocketAddress proxyAddress) { + this(proxyAddress, null); + } + + public Socks4ProxyHandler(SocketAddress proxyAddress, String username) { + super(proxyAddress); + if (username != null && username.length() == 0) { + username = null; + } + this.username = username; + } + + @Override + public String protocol() { + return PROTOCOL; + } + + @Override + public String authScheme() { + return username != null? AUTH_USERNAME : AUTH_NONE; + } + + public String username() { + return username; + } + + @Override + protected void addCodec(ChannelHandlerContext ctx) throws Exception { + ChannelPipeline p = ctx.pipeline(); + String name = ctx.name(); + + Socks4CmdResponseDecoder decoder = new Socks4CmdResponseDecoder(); + p.addBefore(name, null, decoder); + + decoderName = p.context(decoder).name(); + encoderName = decoderName + ".encoder"; + + p.addBefore(name, encoderName, Socks4MessageEncoder.INSTANCE); + } + + @Override + protected void removeEncoder(ChannelHandlerContext ctx) throws Exception { + ChannelPipeline p = ctx.pipeline(); + p.remove(encoderName); + } + + @Override + protected void removeDecoder(ChannelHandlerContext ctx) throws Exception { + ChannelPipeline p = ctx.pipeline(); + p.remove(decoderName); + } + + @Override + protected Object newInitialMessage(ChannelHandlerContext ctx) throws Exception { + InetSocketAddress raddr = destinationAddress(); + String rhost; + if (raddr.isUnresolved()) { + rhost = raddr.getHostString(); + } else { + rhost = raddr.getAddress().getHostAddress(); + } + return new Socks4CmdRequest( + username != null? username : "", Socks4CmdType.CONNECT, rhost, raddr.getPort()); + } + + @Override + protected boolean handleResponse(ChannelHandlerContext ctx, Object response) throws Exception { + final Socks4CmdResponse res = (Socks4CmdResponse) response; + final Socks4CmdStatus status = res.cmdStatus(); + if (status == Socks4CmdStatus.SUCCESS) { + return true; + } + + throw new ProxyConnectException(exceptionMessage("cmdStatus: " + status)); + } +} diff --git a/handler-proxy/src/main/java/io/netty/handler/proxy/Socks5ProxyHandler.java b/handler-proxy/src/main/java/io/netty/handler/proxy/Socks5ProxyHandler.java new file mode 100644 index 000000000000..a96c2838df96 --- /dev/null +++ b/handler-proxy/src/main/java/io/netty/handler/proxy/Socks5ProxyHandler.java @@ -0,0 +1,208 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.proxy; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.socksx.v5.Socks5AddressType; +import io.netty.handler.codec.socksx.v5.Socks5AuthRequest; +import io.netty.handler.codec.socksx.v5.Socks5AuthResponse; +import io.netty.handler.codec.socksx.v5.Socks5AuthResponseDecoder; +import io.netty.handler.codec.socksx.v5.Socks5AuthScheme; +import io.netty.handler.codec.socksx.v5.Socks5AuthStatus; +import io.netty.handler.codec.socksx.v5.Socks5CmdRequest; +import io.netty.handler.codec.socksx.v5.Socks5CmdResponse; +import io.netty.handler.codec.socksx.v5.Socks5CmdResponseDecoder; +import io.netty.handler.codec.socksx.v5.Socks5CmdStatus; +import io.netty.handler.codec.socksx.v5.Socks5CmdType; +import io.netty.handler.codec.socksx.v5.Socks5InitRequest; +import io.netty.handler.codec.socksx.v5.Socks5InitResponse; +import io.netty.handler.codec.socksx.v5.Socks5InitResponseDecoder; +import io.netty.handler.codec.socksx.v5.Socks5MessageEncoder; +import io.netty.util.NetUtil; +import io.netty.util.internal.StringUtil; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Arrays; +import java.util.Collections; + +public final class Socks5ProxyHandler extends ProxyHandler { + + private static final String PROTOCOL = "socks5"; + private static final String AUTH_PASSWORD = "password"; + + private static final Socks5InitRequest INIT_REQUEST_NO_AUTH = + new Socks5InitRequest(Collections.singletonList(Socks5AuthScheme.NO_AUTH)); + + private static final Socks5InitRequest INIT_REQUEST_PASSWORD = + new Socks5InitRequest(Arrays.asList(Socks5AuthScheme.NO_AUTH, Socks5AuthScheme.AUTH_PASSWORD)); + + private final String username; + private final String password; + + private String decoderName; + private String encoderName; + + public Socks5ProxyHandler(SocketAddress proxyAddress) { + this(proxyAddress, null, null); + } + + public Socks5ProxyHandler(SocketAddress proxyAddress, String username, String password) { + super(proxyAddress); + if (username != null && username.length() == 0) { + username = null; + } + if (password != null && password.length() == 0) { + password = null; + } + this.username = username; + this.password = password; + } + + @Override + public String protocol() { + return PROTOCOL; + } + + @Override + public String authScheme() { + return socksAuthScheme() == Socks5AuthScheme.AUTH_PASSWORD? AUTH_PASSWORD : AUTH_NONE; + } + + public String username() { + return username; + } + + public String password() { + return password; + } + + @Override + protected void addCodec(ChannelHandlerContext ctx) throws Exception { + ChannelPipeline p = ctx.pipeline(); + String name = ctx.name(); + + Socks5InitResponseDecoder decoder = new Socks5InitResponseDecoder(); + p.addBefore(name, null, decoder); + + decoderName = p.context(decoder).name(); + encoderName = decoderName + ".encoder"; + + p.addBefore(name, encoderName, Socks5MessageEncoder.INSTANCE); + } + + @Override + protected void removeEncoder(ChannelHandlerContext ctx) throws Exception { + ctx.pipeline().remove(encoderName); + } + + @Override + protected void removeDecoder(ChannelHandlerContext ctx) throws Exception { + ChannelPipeline p = ctx.pipeline(); + if (p.context(decoderName) != null) { + p.remove(decoderName); + } + } + + @Override + protected Object newInitialMessage(ChannelHandlerContext ctx) throws Exception { + return socksAuthScheme() == Socks5AuthScheme.AUTH_PASSWORD? INIT_REQUEST_PASSWORD : INIT_REQUEST_NO_AUTH; + } + + @Override + protected boolean handleResponse(ChannelHandlerContext ctx, Object response) throws Exception { + if (response instanceof Socks5InitResponse) { + Socks5InitResponse res = (Socks5InitResponse) response; + Socks5AuthScheme authScheme = socksAuthScheme(); + + if (res.authScheme() != Socks5AuthScheme.NO_AUTH && authScheme != res.authScheme()) { + // Server did not allow unauthenticated access nor accept the requested authentication scheme. + throw new ProxyConnectException(exceptionMessage("unexpected authScheme: " + res.authScheme())); + } + + switch (authScheme) { + case NO_AUTH: + sendConnectCommand(ctx); + break; + case AUTH_PASSWORD: + // In case of password authentication, send an authentication request. + ctx.pipeline().addBefore(encoderName, decoderName, new Socks5AuthResponseDecoder()); + sendToProxyServer( + new Socks5AuthRequest(username != null? username : "", password != null? password : "")); + break; + default: + // Should never reach here. + throw new Error(); + } + + return false; + } + + if (response instanceof Socks5AuthResponse) { + // Received an authentication response from the server. + Socks5AuthResponse res = (Socks5AuthResponse) response; + if (res.authStatus() != Socks5AuthStatus.SUCCESS) { + throw new ProxyConnectException(exceptionMessage("authStatus: " + res.authStatus())); + } + + sendConnectCommand(ctx); + return false; + } + + // This should be the last message from the server. + Socks5CmdResponse res = (Socks5CmdResponse) response; + if (res.cmdStatus() != Socks5CmdStatus.SUCCESS) { + throw new ProxyConnectException(exceptionMessage("cmdStatus: " + res.cmdStatus())); + } + + return true; + } + + private Socks5AuthScheme socksAuthScheme() { + Socks5AuthScheme authScheme; + if (username == null && password == null) { + authScheme = Socks5AuthScheme.NO_AUTH; + } else { + authScheme = Socks5AuthScheme.AUTH_PASSWORD; + } + return authScheme; + } + + private void sendConnectCommand(ChannelHandlerContext ctx) throws Exception { + InetSocketAddress raddr = destinationAddress(); + Socks5AddressType addrType; + String rhost; + if (raddr.isUnresolved()) { + addrType = Socks5AddressType.DOMAIN; + rhost = raddr.getHostString(); + } else { + rhost = raddr.getAddress().getHostAddress(); + if (NetUtil.isValidIpV4Address(rhost)) { + addrType = Socks5AddressType.IPv4; + } else if (NetUtil.isValidIpV6Address(rhost)) { + addrType = Socks5AddressType.IPv6; + } else { + throw new ProxyConnectException( + exceptionMessage("unknown address type: " + StringUtil.simpleClassName(rhost))); + } + } + + ctx.pipeline().addBefore(encoderName, decoderName, new Socks5CmdResponseDecoder()); + sendToProxyServer(new Socks5CmdRequest(Socks5CmdType.CONNECT, addrType, rhost, raddr.getPort())); + } +} diff --git a/handler-proxy/src/main/java/io/netty/handler/proxy/package-info.java b/handler-proxy/src/main/java/io/netty/handler/proxy/package-info.java new file mode 100644 index 000000000000..8401a4da042c --- /dev/null +++ b/handler-proxy/src/main/java/io/netty/handler/proxy/package-info.java @@ -0,0 +1,21 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +/** + * Adds support for client connections via proxy protocols such as + * SOCKS and + * HTTP CONNECT tunneling + */ +package io.netty.handler.proxy; diff --git a/handler-proxy/src/test/java/io/netty/handler/proxy/HttpProxyServer.java b/handler-proxy/src/test/java/io/netty/handler/proxy/HttpProxyServer.java new file mode 100644 index 000000000000..963a779e838f --- /dev/null +++ b/handler-proxy/src/test/java/io/netty/handler/proxy/HttpProxyServer.java @@ -0,0 +1,166 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.proxy; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.LineBasedFrameDecoder; +import io.netty.handler.codec.base64.Base64; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders.Names; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpRequestDecoder; +import io.netty.handler.codec.http.HttpResponseEncoder; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.util.CharsetUtil; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.*; +import static org.junit.Assert.*; + +final class HttpProxyServer extends ProxyServer { + + HttpProxyServer(boolean useSsl, TestMode testMode, InetSocketAddress destination) { + super(useSsl, testMode, destination); + } + + HttpProxyServer( + boolean useSsl, TestMode testMode, InetSocketAddress destination, String username, String password) { + super(useSsl, testMode, destination, username, password); + } + + @Override + protected void configure(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + switch (testMode) { + case INTERMEDIARY: + p.addLast(new HttpServerCodec()); + p.addLast(new HttpObjectAggregator(1)); + p.addLast(new HttpIntermediaryHandler()); + break; + case TERMINAL: + p.addLast(new HttpServerCodec()); + p.addLast(new HttpObjectAggregator(1)); + p.addLast(new HttpTerminalHandler()); + break; + case UNRESPONSIVE: + p.addLast(UnresponsiveHandler.INSTANCE); + break; + } + } + + private boolean authenticate(ChannelHandlerContext ctx, FullHttpRequest req) { + assertThat(req.method(), is(HttpMethod.CONNECT)); + + if (testMode != TestMode.INTERMEDIARY) { + ctx.pipeline().addBefore(ctx.name(), "lineDecoder", new LineBasedFrameDecoder(64, false, true)); + } + + ctx.pipeline().remove(HttpObjectAggregator.class); + ctx.pipeline().remove(HttpRequestDecoder.class); + + boolean authzSuccess = false; + if (username != null) { + String authz = req.headers().get(Names.AUTHORIZATION); + if (authz != null) { + ByteBuf authzBuf64 = Unpooled.copiedBuffer(authz, CharsetUtil.US_ASCII); + ByteBuf authzBuf = Base64.decode(authzBuf64); + authz = authzBuf.toString(CharsetUtil.US_ASCII); + authzBuf64.release(); + authzBuf.release(); + String expectedAuthz = username + ':' + password; + authzSuccess = expectedAuthz.equals(authz); + } + } else { + authzSuccess = true; + } + + return authzSuccess; + } + + private final class HttpIntermediaryHandler extends IntermediaryHandler { + + private SocketAddress intermediaryDestination; + + @Override + protected boolean handleProxyProtocol(ChannelHandlerContext ctx, Object msg) throws Exception { + FullHttpRequest req = (FullHttpRequest) msg; + FullHttpResponse res; + if (!authenticate(ctx, req)) { + res = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.UNAUTHORIZED); + res.headers().set(Names.CONTENT_LENGTH, 0); + } else { + res = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + String uri = req.uri(); + int lastColonPos = uri.lastIndexOf(':'); + assertThat(lastColonPos, is(greaterThan(0))); + intermediaryDestination = new InetSocketAddress( + uri.substring(0, lastColonPos), Integer.parseInt(uri.substring(lastColonPos + 1))); + } + + ctx.write(res); + ctx.pipeline().remove(HttpResponseEncoder.class); + return true; + } + + @Override + protected SocketAddress intermediaryDestination() { + return intermediaryDestination; + } + } + + private final class HttpTerminalHandler extends TerminalHandler { + + @Override + protected boolean handleProxyProtocol(ChannelHandlerContext ctx, Object msg) throws Exception { + FullHttpRequest req = (FullHttpRequest) msg; + FullHttpResponse res; + boolean sendGreeting = false; + + if (!authenticate(ctx, req)) { + res = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.UNAUTHORIZED); + res.headers().set(Names.CONTENT_LENGTH, 0); + } else if (!req.uri().equals(destination.getHostString() + ':' + destination.getPort())) { + res = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN); + res.headers().set(Names.CONTENT_LENGTH, 0); + } else { + res = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + sendGreeting = true; + } + + ctx.write(res); + ctx.pipeline().remove(HttpResponseEncoder.class); + + if (sendGreeting) { + ctx.write(Unpooled.copiedBuffer("0\n", CharsetUtil.US_ASCII)); + } + + return true; + } + } +} diff --git a/handler-proxy/src/test/java/io/netty/handler/proxy/ProxyHandlerTest.java b/handler-proxy/src/test/java/io/netty/handler/proxy/ProxyHandlerTest.java new file mode 100644 index 000000000000..ceefa69f16a8 --- /dev/null +++ b/handler-proxy/src/test/java/io/netty/handler/proxy/ProxyHandlerTest.java @@ -0,0 +1,641 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.proxy; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.LineBasedFrameDecoder; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.handler.ssl.util.SelfSignedCertificate; +import io.netty.util.CharsetUtil; +import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.concurrent.Future; +import io.netty.util.internal.EmptyArrays; +import io.netty.util.internal.StringUtil; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.*; + +@RunWith(Parameterized.class) +public class ProxyHandlerTest { + + private static final InternalLogger logger = InternalLoggerFactory.getInstance(ProxyHandlerTest.class); + + private static final InetSocketAddress DESTINATION = InetSocketAddress.createUnresolved("destination.com", 42); + private static final InetSocketAddress BAD_DESTINATION = new InetSocketAddress("1.2.3.4", 5); + private static final String USERNAME = "testUser"; + private static final String PASSWORD = "testPassword"; + private static final String BAD_USERNAME = "badUser"; + private static final String BAD_PASSWORD = "badPassword"; + + static final EventLoopGroup group = new NioEventLoopGroup(3, new DefaultThreadFactory("proxy", true)); + + static final SslContext serverSslCtx; + static final SslContext clientSslCtx; + + static { + SslContext sctx; + SslContext cctx; + try { + SelfSignedCertificate ssc = new SelfSignedCertificate(); + sctx = SslContext.newServerContext(ssc.certificate(), ssc.privateKey()); + cctx = SslContext.newClientContext(InsecureTrustManagerFactory.INSTANCE); + } catch (Exception e) { + throw new Error(e); + } + serverSslCtx = sctx; + clientSslCtx = cctx; + } + + static final ProxyServer deadHttpProxy = new HttpProxyServer(false, TestMode.UNRESPONSIVE, null); + static final ProxyServer interHttpProxy = new HttpProxyServer(false, TestMode.INTERMEDIARY, null); + static final ProxyServer anonHttpProxy = new HttpProxyServer(false, TestMode.TERMINAL, DESTINATION); + static final ProxyServer httpProxy = + new HttpProxyServer(false, TestMode.TERMINAL, DESTINATION, USERNAME, PASSWORD); + + static final ProxyServer deadHttpsProxy = new HttpProxyServer(true, TestMode.UNRESPONSIVE, null); + static final ProxyServer interHttpsProxy = new HttpProxyServer(true, TestMode.INTERMEDIARY, null); + static final ProxyServer anonHttpsProxy = new HttpProxyServer(true, TestMode.TERMINAL, DESTINATION); + static final ProxyServer httpsProxy = + new HttpProxyServer(true, TestMode.TERMINAL, DESTINATION, USERNAME, PASSWORD); + + static final ProxyServer deadSocks4Proxy = new Socks4ProxyServer(false, TestMode.UNRESPONSIVE, null); + static final ProxyServer interSocks4Proxy = new Socks4ProxyServer(false, TestMode.INTERMEDIARY, null); + static final ProxyServer anonSocks4Proxy = new Socks4ProxyServer(false, TestMode.TERMINAL, DESTINATION); + static final ProxyServer socks4Proxy = new Socks4ProxyServer(false, TestMode.TERMINAL, DESTINATION, USERNAME); + + static final ProxyServer deadSocks5Proxy = new Socks5ProxyServer(false, TestMode.UNRESPONSIVE, null); + static final ProxyServer interSocks5Proxy = new Socks5ProxyServer(false, TestMode.INTERMEDIARY, null); + static final ProxyServer anonSocks5Proxy = new Socks5ProxyServer(false, TestMode.TERMINAL, DESTINATION); + static final ProxyServer socks5Proxy = + new Socks5ProxyServer(false, TestMode.TERMINAL, DESTINATION, USERNAME, PASSWORD); + + private static final Collection allProxies = Arrays.asList( + deadHttpProxy, interHttpProxy, anonHttpProxy, httpProxy, + deadHttpsProxy, interHttpsProxy, anonHttpsProxy, httpsProxy, + deadSocks4Proxy, interSocks4Proxy, anonSocks4Proxy, socks4Proxy, + deadSocks5Proxy, interSocks5Proxy, anonSocks5Proxy, socks5Proxy + ); + + @Parameters(name = "{index}: {0}") + public static List testItems() { + List items = Arrays.asList( + + // HTTP ------------------------------------------------------- + + new SuccessTestItem( + "Anonymous HTTP proxy: successful connection", + DESTINATION, + new HttpProxyHandler(anonHttpProxy.address())), + + new FailureTestItem( + "Anonymous HTTP proxy: rejected connection", + BAD_DESTINATION, "status: 403", + new HttpProxyHandler(anonHttpProxy.address())), + + new FailureTestItem( + "HTTP proxy: rejected anonymous connection", + DESTINATION, "status: 401", + new HttpProxyHandler(httpProxy.address())), + + new SuccessTestItem( + "HTTP proxy: successful connection", + DESTINATION, + new HttpProxyHandler(httpProxy.address(), USERNAME, PASSWORD)), + + new FailureTestItem( + "HTTP proxy: rejected connection", + BAD_DESTINATION, "status: 403", + new HttpProxyHandler(httpProxy.address(), USERNAME, PASSWORD)), + + new FailureTestItem( + "HTTP proxy: authentication failure", + DESTINATION, "status: 401", + new HttpProxyHandler(httpProxy.address(), BAD_USERNAME, BAD_PASSWORD)), + + new TimeoutTestItem( + "HTTP proxy: timeout", + new HttpProxyHandler(deadHttpProxy.address())), + + // HTTPS ------------------------------------------------------ + + new SuccessTestItem( + "Anonymous HTTPS proxy: successful connection", + DESTINATION, + clientSslCtx.newHandler(PooledByteBufAllocator.DEFAULT), + new HttpProxyHandler(anonHttpsProxy.address())), + + new FailureTestItem( + "Anonymous HTTPS proxy: rejected connection", + BAD_DESTINATION, "status: 403", + clientSslCtx.newHandler(PooledByteBufAllocator.DEFAULT), + new HttpProxyHandler(anonHttpsProxy.address())), + + new FailureTestItem( + "HTTPS proxy: rejected anonymous connection", + DESTINATION, "status: 401", + clientSslCtx.newHandler(PooledByteBufAllocator.DEFAULT), + new HttpProxyHandler(httpsProxy.address())), + + new SuccessTestItem( + "HTTPS proxy: successful connection", + DESTINATION, + clientSslCtx.newHandler(PooledByteBufAllocator.DEFAULT), + new HttpProxyHandler(httpsProxy.address(), USERNAME, PASSWORD)), + + new FailureTestItem( + "HTTPS proxy: rejected connection", + BAD_DESTINATION, "status: 403", + clientSslCtx.newHandler(PooledByteBufAllocator.DEFAULT), + new HttpProxyHandler(httpsProxy.address(), USERNAME, PASSWORD)), + + new FailureTestItem( + "HTTPS proxy: authentication failure", + DESTINATION, "status: 401", + clientSslCtx.newHandler(PooledByteBufAllocator.DEFAULT), + new HttpProxyHandler(httpsProxy.address(), BAD_USERNAME, BAD_PASSWORD)), + + new TimeoutTestItem( + "HTTPS proxy: timeout", + clientSslCtx.newHandler(PooledByteBufAllocator.DEFAULT), + new HttpProxyHandler(deadHttpsProxy.address())), + + // SOCKS4 ----------------------------------------------------- + + new SuccessTestItem( + "Anonymous SOCKS4: successful connection", + DESTINATION, + new Socks4ProxyHandler(anonSocks4Proxy.address())), + + new FailureTestItem( + "Anonymous SOCKS4: rejected connection", + BAD_DESTINATION, "cmdStatus: REJECTED_OR_FAILED", + new Socks4ProxyHandler(anonSocks4Proxy.address())), + + new FailureTestItem( + "SOCKS4: rejected anonymous connection", + DESTINATION, "cmdStatus: IDENTD_AUTH_FAILURE", + new Socks4ProxyHandler(socks4Proxy.address())), + + new SuccessTestItem( + "SOCKS4: successful connection", + DESTINATION, + new Socks4ProxyHandler(socks4Proxy.address(), USERNAME)), + + new FailureTestItem( + "SOCKS4: rejected connection", + BAD_DESTINATION, "cmdStatus: REJECTED_OR_FAILED", + new Socks4ProxyHandler(socks4Proxy.address(), USERNAME)), + + new FailureTestItem( + "SOCKS4: authentication failure", + DESTINATION, "cmdStatus: IDENTD_AUTH_FAILURE", + new Socks4ProxyHandler(socks4Proxy.address(), BAD_USERNAME)), + + new TimeoutTestItem( + "SOCKS4: timeout", + new Socks4ProxyHandler(deadSocks4Proxy.address())), + + // SOCKS5 ----------------------------------------------------- + + new SuccessTestItem( + "Anonymous SOCKS5: successful connection", + DESTINATION, + new Socks5ProxyHandler(anonSocks5Proxy.address())), + + new FailureTestItem( + "Anonymous SOCKS5: rejected connection", + BAD_DESTINATION, "cmdStatus: FORBIDDEN", + new Socks5ProxyHandler(anonSocks5Proxy.address())), + + new FailureTestItem( + "SOCKS5: rejected anonymous connection", + DESTINATION, "unexpected authScheme: AUTH_PASSWORD", + new Socks5ProxyHandler(socks5Proxy.address())), + + new SuccessTestItem( + "SOCKS5: successful connection", + DESTINATION, + new Socks5ProxyHandler(socks5Proxy.address(), USERNAME, PASSWORD)), + + new FailureTestItem( + "SOCKS5: rejected connection", + BAD_DESTINATION, "cmdStatus: FORBIDDEN", + new Socks5ProxyHandler(socks5Proxy.address(), USERNAME, PASSWORD)), + + new FailureTestItem( + "SOCKS5: authentication failure", + DESTINATION, "authStatus: FAILURE", + new Socks5ProxyHandler(socks5Proxy.address(), BAD_USERNAME, BAD_PASSWORD)), + + new TimeoutTestItem( + "SOCKS5: timeout", + new Socks5ProxyHandler(deadSocks5Proxy.address())), + + // HTTP + HTTPS + SOCKS4 + SOCKS5 + + new SuccessTestItem( + "Single-chain: successful connection", + DESTINATION, + new Socks5ProxyHandler(interSocks5Proxy.address()), // SOCKS5 + new Socks4ProxyHandler(interSocks4Proxy.address()), // SOCKS4 + clientSslCtx.newHandler(PooledByteBufAllocator.DEFAULT), + new HttpProxyHandler(interHttpsProxy.address()), // HTTPS + new HttpProxyHandler(interHttpProxy.address()), // HTTP + new HttpProxyHandler(anonHttpProxy.address())), + + // (HTTP + HTTPS + SOCKS4 + SOCKS5) * 2 + + new SuccessTestItem( + "Double-chain: successful connection", + DESTINATION, + new Socks5ProxyHandler(interSocks5Proxy.address()), // SOCKS5 + new Socks4ProxyHandler(interSocks4Proxy.address()), // SOCKS4 + clientSslCtx.newHandler(PooledByteBufAllocator.DEFAULT), + new HttpProxyHandler(interHttpsProxy.address()), // HTTPS + new HttpProxyHandler(interHttpProxy.address()), // HTTP + new Socks5ProxyHandler(interSocks5Proxy.address()), // SOCKS5 + new Socks4ProxyHandler(interSocks4Proxy.address()), // SOCKS4 + clientSslCtx.newHandler(PooledByteBufAllocator.DEFAULT), + new HttpProxyHandler(interHttpsProxy.address()), // HTTPS + new HttpProxyHandler(interHttpProxy.address()), // HTTP + new HttpProxyHandler(anonHttpProxy.address())) + + ); + + // Convert the test items to the list of constructor parameters. + List params = new ArrayList(items.size()); + for (Object i: items) { + params.add(new Object[] { i }); + } + + // Randomize the execution order to increase the possibility of exposing failure dependencies. + Collections.shuffle(params); + + return params; + } + + @AfterClass + public static void stopServers() { + for (ProxyServer p: allProxies) { + p.stop(); + } + } + + private final TestItem testItem; + + public ProxyHandlerTest(TestItem testItem) { + this.testItem = testItem; + } + + @Before + public void clearServerExceptions() throws Exception { + for (ProxyServer p: allProxies) { + p.clearExceptions(); + } + } + + @Test + public void test() throws Exception { + testItem.test(); + } + + @After + public void checkServerExceptions() throws Exception { + for (ProxyServer p: allProxies) { + p.checkExceptions(); + } + } + + private static final class SuccessTestHandler extends SimpleChannelInboundHandler { + + final Queue received = new LinkedBlockingQueue(); + final Queue exceptions = new LinkedBlockingQueue(); + volatile int eventCount; + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + ctx.writeAndFlush(Unpooled.copiedBuffer("A\n", CharsetUtil.US_ASCII)); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof ProxyConnectionEvent) { + eventCount ++; + + if (eventCount == 1) { + // Note that ProxyConnectionEvent can be triggered multiple times when there are multiple + // ProxyHandlers in the pipeline. Therefore, we send the 'B' message only on the first event. + ctx.writeAndFlush(Unpooled.copiedBuffer("B\n", CharsetUtil.US_ASCII)); + } + } + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { + String str = ((ByteBuf) msg).toString(CharsetUtil.US_ASCII); + received.add(str); + if ("2".equals(str)) { + ctx.writeAndFlush(Unpooled.copiedBuffer("C\n", CharsetUtil.US_ASCII)); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + exceptions.add(cause); + ctx.close(); + } + } + + private static final class FailureTestHandler extends SimpleChannelInboundHandler { + + final Queue exceptions = new LinkedBlockingQueue(); + + /** + * A latch that counts down when: + * - a pending write attempt in {@link #channelActive(ChannelHandlerContext)} finishes, or + * - the channel is closed. + * By waiting until the latch goes down to 0, we can make sure all assertion failures related with all write + * attempts have been recorded. + */ + final CountDownLatch latch = new CountDownLatch(2); + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + ctx.writeAndFlush(Unpooled.copiedBuffer("A\n", CharsetUtil.US_ASCII)).addListener( + new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + latch.countDown(); + if (!(future.cause() instanceof ProxyConnectException)) { + exceptions.add(new AssertionError( + "Unexpected failure cause for initial write: " + future.cause())); + } + } + }); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + latch.countDown(); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof ProxyConnectionEvent) { + fail("Unexpected event: " + evt); + } + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { + fail("Unexpected message: " + msg); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + exceptions.add(cause); + ctx.close(); + } + } + + private abstract static class TestItem { + final String name; + final InetSocketAddress destination; + final ChannelHandler[] clientHandlers; + + protected TestItem(String name, InetSocketAddress destination, ChannelHandler... clientHandlers) { + this.name = name; + this.destination = destination; + this.clientHandlers = clientHandlers; + } + + abstract void test() throws Exception; + + protected void assertProxyHandlers(boolean success) { + for (ChannelHandler h: clientHandlers) { + if (h instanceof ProxyHandler) { + ProxyHandler ph = (ProxyHandler) h; + String type = StringUtil.simpleClassName(ph); + Future f = ph.connectFuture(); + if (!f.isDone()) { + logger.warn("{}: not done", type); + } else if (f.isSuccess()) { + if (success) { + logger.debug("{}: success", type); + } else { + logger.warn("{}: success", type); + } + } else { + if (success) { + logger.warn("{}: failure", type, f.cause()); + } else { + logger.debug("{}: failure", type, f.cause()); + } + } + } + } + + for (ChannelHandler h: clientHandlers) { + if (h instanceof ProxyHandler) { + ProxyHandler ph = (ProxyHandler) h; + assertThat(ph.connectFuture().isDone(), is(true)); + assertThat(ph.connectFuture().isSuccess(), is(success)); + } + } + } + + @Override + public String toString() { + return name; + } + } + + private static final class SuccessTestItem extends TestItem { + + private final int expectedEventCount; + + SuccessTestItem(String name, InetSocketAddress destination, ChannelHandler... clientHandlers) { + super(name, destination, clientHandlers); + int expectedEventCount = 0; + for (ChannelHandler h: clientHandlers) { + if (h instanceof ProxyHandler) { + expectedEventCount++; + } + } + + this.expectedEventCount = expectedEventCount; + } + + @Override + protected void test() throws Exception { + final SuccessTestHandler testHandler = new SuccessTestHandler(); + Bootstrap b = new Bootstrap(); + b.group(group); + b.channel(NioSocketChannel.class); + b.handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast(clientHandlers); + p.addLast(new LineBasedFrameDecoder(64)); + p.addLast(testHandler); + } + }); + + boolean finished = b.connect(destination).channel().closeFuture().await(10, TimeUnit.SECONDS); + + logger.debug("Received messages: {}", testHandler.received); + + if (testHandler.exceptions.isEmpty()) { + logger.debug("No recorded exceptions on the client side."); + } else { + for (Throwable t : testHandler.exceptions) { + logger.debug("Recorded exception on the client side: {}", t); + } + } + + assertProxyHandlers(true); + + assertThat(testHandler.received.toArray(), is(new Object[] { "0", "1", "2", "3" })); + assertThat(testHandler.exceptions.toArray(), is(EmptyArrays.EMPTY_OBJECTS)); + assertThat(testHandler.eventCount, is(expectedEventCount)); + assertThat(finished, is(true)); + } + } + + private static final class FailureTestItem extends TestItem { + + private final String expectedMessage; + + FailureTestItem( + String name, InetSocketAddress destination, String expectedMessage, ChannelHandler... clientHandlers) { + super(name, destination, clientHandlers); + this.expectedMessage = expectedMessage; + } + + @Override + protected void test() throws Exception { + final FailureTestHandler testHandler = new FailureTestHandler(); + Bootstrap b = new Bootstrap(); + b.group(group); + b.channel(NioSocketChannel.class); + b.handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast(clientHandlers); + p.addLast(new LineBasedFrameDecoder(64)); + p.addLast(testHandler); + } + }); + + boolean finished = b.connect(destination).channel().closeFuture().await(10, TimeUnit.SECONDS); + finished &= testHandler.latch.await(10, TimeUnit.SECONDS); + + logger.debug("Recorded exceptions: {}", testHandler.exceptions); + + assertProxyHandlers(false); + + assertThat(testHandler.exceptions.size(), is(1)); + Throwable e = testHandler.exceptions.poll(); + assertThat(e, is(instanceOf(ProxyConnectException.class))); + assertThat(String.valueOf(e), containsString(expectedMessage)); + assertThat(finished, is(true)); + } + } + + private static final class TimeoutTestItem extends TestItem { + + TimeoutTestItem(String name, ChannelHandler... clientHandlers) { + super(name, null, clientHandlers); + } + + @Override + protected void test() throws Exception { + final long TIMEOUT = 2000; + for (ChannelHandler h: clientHandlers) { + if (h instanceof ProxyHandler) { + ((ProxyHandler) h).setConnectTimeoutMillis(TIMEOUT); + } + } + + final FailureTestHandler testHandler = new FailureTestHandler(); + Bootstrap b = new Bootstrap(); + b.group(group); + b.channel(NioSocketChannel.class); + b.handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast(clientHandlers); + p.addLast(new LineBasedFrameDecoder(64)); + p.addLast(testHandler); + } + }); + + ChannelFuture cf = b.connect(DESTINATION).channel().closeFuture(); + boolean finished = cf.await(TIMEOUT * 2, TimeUnit.MILLISECONDS); + finished &= testHandler.latch.await(TIMEOUT * 2, TimeUnit.MILLISECONDS); + + logger.debug("Recorded exceptions: {}", testHandler.exceptions); + + assertProxyHandlers(false); + + assertThat(testHandler.exceptions.size(), is(1)); + Throwable e = testHandler.exceptions.poll(); + assertThat(e, is(instanceOf(ProxyConnectException.class))); + assertThat(String.valueOf(e), containsString("timeout")); + assertThat(finished, is(true)); + } + } +} diff --git a/handler-proxy/src/test/java/io/netty/handler/proxy/ProxyServer.java b/handler-proxy/src/test/java/io/netty/handler/proxy/ProxyServer.java new file mode 100644 index 000000000000..b2a726d39498 --- /dev/null +++ b/handler-proxy/src/test/java/io/netty/handler/proxy/ProxyServer.java @@ -0,0 +1,306 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.proxy; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoop; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.socket.ServerSocketChannel; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.CharsetUtil; +import io.netty.util.NetUtil; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; + +abstract class ProxyServer { + + protected final InternalLogger logger = InternalLoggerFactory.getInstance(getClass()); + + private final ServerSocketChannel ch; + private final Queue recordedExceptions = new LinkedBlockingQueue(); + protected final TestMode testMode; + protected final String username; + protected final String password; + protected final InetSocketAddress destination; + + /** + * Starts a new proxy server with disabled authentication for testing purpose. + * + * @param useSsl {@code true} if and only if implicit SSL is enabled + * @param testMode the test mode + * @param destination the expected destination. If the client requests proxying to a different destination, this + * server will reject the connection request. + */ + protected ProxyServer(boolean useSsl, TestMode testMode, InetSocketAddress destination) { + this(useSsl, testMode, destination, null, null); + } + + /** + * Starts a new proxy server with disabled authentication for testing purpose. + * + * @param useSsl {@code true} if and only if implicit SSL is enabled + * @param testMode the test mode + * @param username the expected username. If the client tries to authenticate with a different username, this server + * will fail the authentication request. + * @param password the expected password. If the client tries to authenticate with a different password, this server + * will fail the authentication request. + * @param destination the expected destination. If the client requests proxying to a different destination, this + * server will reject the connection request. + */ + protected ProxyServer( + final boolean useSsl, TestMode testMode, + InetSocketAddress destination, String username, String password) { + + this.testMode = testMode; + this.destination = destination; + this.username = username; + this.password = password; + + ServerBootstrap b = new ServerBootstrap(); + b.channel(NioServerSocketChannel.class); + b.group(ProxyHandlerTest.group); + b.childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + if (useSsl) { + p.addLast(ProxyHandlerTest.serverSslCtx.newHandler(ch.alloc())); + } + + configure(ch); + } + }); + + ch = (ServerSocketChannel) b.bind(NetUtil.LOCALHOST, 0).syncUninterruptibly().channel(); + } + + public final InetSocketAddress address() { + return new InetSocketAddress(NetUtil.LOCALHOST, ch.localAddress().getPort()); + } + + protected abstract void configure(SocketChannel ch) throws Exception; + + final void recordException(Throwable t) { + logger.warn("Unexpected exception from proxy server:", t); + recordedExceptions.add(t); + } + + /** + * Clears all recorded exceptions. + */ + public final void clearExceptions() { + recordedExceptions.clear(); + } + + /** + * Logs all recorded exceptions and raises the last one so that the caller can fail. + */ + public final void checkExceptions() { + Throwable t; + for (;;) { + t = recordedExceptions.poll(); + if (t == null) { + break; + } + + logger.warn("Unexpected exception:", t); + } + + if (t != null) { + PlatformDependent.throwException(t); + } + } + + public final void stop() { + ch.close(); + } + + protected abstract class IntermediaryHandler extends SimpleChannelInboundHandler { + + private final Queue received = new ArrayDeque(); + + private boolean finished; + private Channel backend; + + @Override + protected final void channelRead0(final ChannelHandlerContext ctx, Object msg) throws Exception { + if (finished) { + received.add(ReferenceCountUtil.retain(msg)); + flush(); + return; + } + + boolean finished = handleProxyProtocol(ctx, msg); + if (finished) { + this.finished = true; + ChannelFuture f = connectToDestination(ctx.channel().eventLoop(), new BackendHandler(ctx)); + f.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + recordException(future.cause()); + ctx.close(); + } else { + backend = future.channel(); + flush(); + } + } + }); + } + } + + private void flush() { + if (backend != null) { + boolean wrote = false; + for (;;) { + Object msg = received.poll(); + if (msg == null) { + break; + } + backend.write(msg); + wrote = true; + } + + if (wrote) { + backend.flush(); + } + } + } + + protected abstract boolean handleProxyProtocol(ChannelHandlerContext ctx, Object msg) throws Exception; + + protected abstract SocketAddress intermediaryDestination(); + + private ChannelFuture connectToDestination(EventLoop loop, ChannelHandler handler) { + Bootstrap b = new Bootstrap(); + b.channel(NioSocketChannel.class); + b.group(loop); + b.handler(handler); + return b.connect(intermediaryDestination()); + } + + @Override + public final void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + if (backend != null) { + backend.close(); + } + } + + @Override + public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + recordException(cause); + ctx.close(); + } + + private final class BackendHandler extends ChannelInboundHandlerAdapter { + + private final ChannelHandlerContext frontend; + + BackendHandler(ChannelHandlerContext frontend) { + this.frontend = frontend; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + frontend.write(msg); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + frontend.flush(); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + frontend.close(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + recordException(cause); + ctx.close(); + } + } + } + + protected abstract class TerminalHandler extends SimpleChannelInboundHandler { + + private boolean finished; + + @Override + protected final void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { + if (finished) { + String str = ((ByteBuf) msg).toString(CharsetUtil.US_ASCII); + if ("A\n".equals(str)) { + ctx.write(Unpooled.copiedBuffer("1\n", CharsetUtil.US_ASCII)); + } else if ("B\n".equals(str)) { + ctx.write(Unpooled.copiedBuffer("2\n", CharsetUtil.US_ASCII)); + } else if ("C\n".equals(str)) { + ctx.write(Unpooled.copiedBuffer("3\n", CharsetUtil.US_ASCII)) + .addListener(ChannelFutureListener.CLOSE); + } else { + throw new IllegalStateException("unexpected message: " + str); + } + return; + } + + boolean finished = handleProxyProtocol(ctx, msg); + if (finished) { + this.finished = finished; + } + } + + protected abstract boolean handleProxyProtocol(ChannelHandlerContext ctx, Object msg) throws Exception; + + @Override + public final void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + @Override + public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + recordException(cause); + ctx.close(); + } + } +} diff --git a/handler-proxy/src/test/java/io/netty/handler/proxy/Socks4ProxyServer.java b/handler-proxy/src/test/java/io/netty/handler/proxy/Socks4ProxyServer.java new file mode 100644 index 000000000000..4a01cebe1d76 --- /dev/null +++ b/handler-proxy/src/test/java/io/netty/handler/proxy/Socks4ProxyServer.java @@ -0,0 +1,140 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.proxy; + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.LineBasedFrameDecoder; +import io.netty.handler.codec.socksx.v4.Socks4CmdRequest; +import io.netty.handler.codec.socksx.v4.Socks4CmdRequestDecoder; +import io.netty.handler.codec.socksx.v4.Socks4CmdResponse; +import io.netty.handler.codec.socksx.v4.Socks4CmdStatus; +import io.netty.handler.codec.socksx.v4.Socks4CmdType; +import io.netty.handler.codec.socksx.v4.Socks4MessageEncoder; +import io.netty.util.CharsetUtil; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.*; + +final class Socks4ProxyServer extends ProxyServer { + + Socks4ProxyServer(boolean useSsl, TestMode testMode, InetSocketAddress destination) { + super(useSsl, testMode, destination); + } + + Socks4ProxyServer(boolean useSsl, TestMode testMode, InetSocketAddress destination, String username) { + super(useSsl, testMode, destination, username, null); + } + + @Override + protected void configure(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + switch (testMode) { + case INTERMEDIARY: + p.addLast(new Socks4CmdRequestDecoder()); + p.addLast(Socks4MessageEncoder.INSTANCE); + p.addLast(new Socks4IntermediaryHandler()); + break; + case TERMINAL: + p.addLast(new Socks4CmdRequestDecoder()); + p.addLast(Socks4MessageEncoder.INSTANCE); + p.addLast(new Socks4TerminalHandler()); + break; + case UNRESPONSIVE: + p.addLast(UnresponsiveHandler.INSTANCE); + break; + } + } + + private boolean authenticate(ChannelHandlerContext ctx, Socks4CmdRequest req) { + assertThat(req.cmdType(), is(Socks4CmdType.CONNECT)); + + if (testMode != TestMode.INTERMEDIARY) { + ctx.pipeline().addBefore(ctx.name(), "lineDecoder", new LineBasedFrameDecoder(64, false, true)); + } + + boolean authzSuccess; + if (username != null) { + authzSuccess = username.equals(req.userId()); + } else { + authzSuccess = true; + } + return authzSuccess; + } + + private final class Socks4IntermediaryHandler extends IntermediaryHandler { + + private SocketAddress intermediaryDestination; + + @Override + protected boolean handleProxyProtocol(ChannelHandlerContext ctx, Object msg) throws Exception { + Socks4CmdRequest req = (Socks4CmdRequest) msg; + Socks4CmdResponse res; + + if (!authenticate(ctx, req)) { + res = new Socks4CmdResponse(Socks4CmdStatus.IDENTD_AUTH_FAILURE); + } else { + res = new Socks4CmdResponse(Socks4CmdStatus.SUCCESS); + intermediaryDestination = new InetSocketAddress(req.host(), req.port()); + } + + ctx.write(res); + ctx.pipeline().remove(Socks4MessageEncoder.class); + + return true; + } + + @Override + protected SocketAddress intermediaryDestination() { + return intermediaryDestination; + } + } + + private final class Socks4TerminalHandler extends TerminalHandler { + @Override + protected boolean handleProxyProtocol(ChannelHandlerContext ctx, Object msg) throws Exception { + Socks4CmdRequest req = (Socks4CmdRequest) msg; + boolean authzSuccess = authenticate(ctx, req); + + Socks4CmdResponse res; + boolean sendGreeting = false; + if (!authzSuccess) { + res = new Socks4CmdResponse(Socks4CmdStatus.IDENTD_AUTH_FAILURE); + } else if (!req.host().equals(destination.getHostString()) || + req.port() != destination.getPort()) { + res = new Socks4CmdResponse(Socks4CmdStatus.REJECTED_OR_FAILED); + } else { + res = new Socks4CmdResponse(Socks4CmdStatus.SUCCESS); + sendGreeting = true; + } + + ctx.write(res); + ctx.pipeline().remove(Socks4MessageEncoder.class); + + if (sendGreeting) { + ctx.write(Unpooled.copiedBuffer("0\n", CharsetUtil.US_ASCII)); + } + + return true; + } + } +} diff --git a/handler-proxy/src/test/java/io/netty/handler/proxy/Socks5ProxyServer.java b/handler-proxy/src/test/java/io/netty/handler/proxy/Socks5ProxyServer.java new file mode 100644 index 000000000000..4b5caa5ce908 --- /dev/null +++ b/handler-proxy/src/test/java/io/netty/handler/proxy/Socks5ProxyServer.java @@ -0,0 +1,170 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.proxy; + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.LineBasedFrameDecoder; +import io.netty.handler.codec.socksx.v5.Socks5AddressType; +import io.netty.handler.codec.socksx.v5.Socks5AuthRequest; +import io.netty.handler.codec.socksx.v5.Socks5AuthRequestDecoder; +import io.netty.handler.codec.socksx.v5.Socks5AuthResponse; +import io.netty.handler.codec.socksx.v5.Socks5AuthScheme; +import io.netty.handler.codec.socksx.v5.Socks5AuthStatus; +import io.netty.handler.codec.socksx.v5.Socks5CmdRequest; +import io.netty.handler.codec.socksx.v5.Socks5CmdRequestDecoder; +import io.netty.handler.codec.socksx.v5.Socks5CmdResponse; +import io.netty.handler.codec.socksx.v5.Socks5CmdStatus; +import io.netty.handler.codec.socksx.v5.Socks5CmdType; +import io.netty.handler.codec.socksx.v5.Socks5InitRequest; +import io.netty.handler.codec.socksx.v5.Socks5InitRequestDecoder; +import io.netty.handler.codec.socksx.v5.Socks5InitResponse; +import io.netty.handler.codec.socksx.v5.Socks5MessageEncoder; +import io.netty.util.CharsetUtil; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.*; + +final class Socks5ProxyServer extends ProxyServer { + + Socks5ProxyServer(boolean useSsl, TestMode testMode, InetSocketAddress destination) { + super(useSsl, testMode, destination); + } + + Socks5ProxyServer( + boolean useSsl, TestMode testMode, InetSocketAddress destination, String username, String password) { + super(useSsl, testMode, destination, username, password); + } + + @Override + protected void configure(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + switch (testMode) { + case INTERMEDIARY: + p.addLast("decoder", new Socks5InitRequestDecoder()); + p.addLast("encoder", Socks5MessageEncoder.INSTANCE); + p.addLast(new Socks5IntermediaryHandler()); + break; + case TERMINAL: + p.addLast("decoder", new Socks5InitRequestDecoder()); + p.addLast("encoder", Socks5MessageEncoder.INSTANCE); + p.addLast(new Socks5TerminalHandler()); + break; + case UNRESPONSIVE: + p.addLast(UnresponsiveHandler.INSTANCE); + break; + } + } + + private boolean authenticate(ChannelHandlerContext ctx, Object msg) { + if (username == null) { + ctx.pipeline().addBefore("encoder", "decoder", new Socks5CmdRequestDecoder()); + ctx.write(new Socks5InitResponse(Socks5AuthScheme.NO_AUTH)); + return true; + } + + if (msg instanceof Socks5InitRequest) { + ctx.pipeline().addBefore("encoder", "decoder", new Socks5AuthRequestDecoder()); + ctx.write(new Socks5InitResponse(Socks5AuthScheme.AUTH_PASSWORD)); + return false; + } + + Socks5AuthRequest req = (Socks5AuthRequest) msg; + if (req.username().equals(username) && req.password().equals(password)) { + ctx.pipeline().addBefore("encoder", "decoder", new Socks5CmdRequestDecoder()); + ctx.write(new Socks5AuthResponse(Socks5AuthStatus.SUCCESS)); + return true; + } + + ctx.pipeline().addBefore("encoder", "decoder", new Socks5AuthRequestDecoder()); + ctx.write(new Socks5AuthResponse(Socks5AuthStatus.FAILURE)); + return false; + } + + private final class Socks5IntermediaryHandler extends IntermediaryHandler { + + private boolean authenticated; + private SocketAddress intermediaryDestination; + + @Override + protected boolean handleProxyProtocol(ChannelHandlerContext ctx, Object msg) throws Exception { + if (!authenticated) { + authenticated = authenticate(ctx, msg); + return false; + } + + Socks5CmdRequest req = (Socks5CmdRequest) msg; + assertThat(req.cmdType(), is(Socks5CmdType.CONNECT)); + + Socks5CmdResponse res; + res = new Socks5CmdResponse(Socks5CmdStatus.SUCCESS, Socks5AddressType.IPv4); + intermediaryDestination = new InetSocketAddress(req.host(), req.port()); + + ctx.write(res); + ctx.pipeline().remove(Socks5MessageEncoder.class); + + return true; + } + + @Override + protected SocketAddress intermediaryDestination() { + return intermediaryDestination; + } + } + + private final class Socks5TerminalHandler extends TerminalHandler { + + private boolean authenticated; + + @Override + protected boolean handleProxyProtocol(ChannelHandlerContext ctx, Object msg) throws Exception { + if (!authenticated) { + authenticated = authenticate(ctx, msg); + return false; + } + + Socks5CmdRequest req = (Socks5CmdRequest) msg; + assertThat(req.cmdType(), is(Socks5CmdType.CONNECT)); + + ctx.pipeline().addBefore(ctx.name(), "lineDecoder", new LineBasedFrameDecoder(64, false, true)); + + Socks5CmdResponse res; + boolean sendGreeting = false; + if (!req.host().equals(destination.getHostString()) || + req.port() != destination.getPort()) { + res = new Socks5CmdResponse(Socks5CmdStatus.FORBIDDEN, Socks5AddressType.IPv4); + } else { + res = new Socks5CmdResponse(Socks5CmdStatus.SUCCESS, Socks5AddressType.IPv4); + sendGreeting = true; + } + + ctx.write(res); + ctx.pipeline().remove(Socks5MessageEncoder.class); + + if (sendGreeting) { + ctx.write(Unpooled.copiedBuffer("0\n", CharsetUtil.US_ASCII)); + } + + return true; + } + } +} diff --git a/handler-proxy/src/test/java/io/netty/handler/proxy/TestMode.java b/handler-proxy/src/test/java/io/netty/handler/proxy/TestMode.java new file mode 100644 index 000000000000..c6b902fe092d --- /dev/null +++ b/handler-proxy/src/test/java/io/netty/handler/proxy/TestMode.java @@ -0,0 +1,23 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.proxy; + +enum TestMode { + INTERMEDIARY, + TERMINAL, + UNRESPONSIVE, +} diff --git a/handler-proxy/src/test/java/io/netty/handler/proxy/UnresponsiveHandler.java b/handler-proxy/src/test/java/io/netty/handler/proxy/UnresponsiveHandler.java new file mode 100644 index 000000000000..32c753baef96 --- /dev/null +++ b/handler-proxy/src/test/java/io/netty/handler/proxy/UnresponsiveHandler.java @@ -0,0 +1,34 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.proxy; + +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; + +@Sharable +final class UnresponsiveHandler extends SimpleChannelInboundHandler { + + static final UnresponsiveHandler INSTANCE = new UnresponsiveHandler(); + + private UnresponsiveHandler() { } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { + // Ignore + } +} diff --git a/pom.xml b/pom.xml index 57ed7a7fd351..2189d125c3a1 100644 --- a/pom.xml +++ b/pom.xml @@ -431,6 +431,7 @@ transport-sctp transport-udt handler + handler-proxy example testsuite microbench