Skip to content

Commit

Permalink
[netty#982] [netty#977] [netty#858] Allow to transfer the content a C…
Browse files Browse the repository at this point in the history
…hannelHandlers inbound/outbound buffer on removal/replacement

This changes the behavior of the ChannelPipeline.remove(..) and ChannelPipeline.replace(..) methods in that way
that after invocation it is not possible anymore to access any data in the inbound or outbound buffer. This is
because it empty it now to prevent side-effects. If a user want to preserve the content and forward it to the
next handler in the pipeline it is adviced to use one of the new methods which where introduced.

 - ChannelPipeline.removeAndForward(..)
 - ChannelPipeline.replaceAndForward(..)
  • Loading branch information
Norman Maurer committed Jan 28, 2013
1 parent a27d1cc commit d7bfd44
Show file tree
Hide file tree
Showing 19 changed files with 327 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ public void finishHandshake(Channel channel, FullHttpResponse response) {

ChannelPipeline p = channel.pipeline();
p.remove(HttpRequestEncoder.class);
p.get(HttpResponseDecoder.class).replace(
p.replaceAndForward(HttpResponseDecoder.class,
"ws-decoder", new WebSocket00FrameDecoder(maxFramePayloadLength()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public void finishHandshake(Channel channel, FullHttpResponse response) {
setHandshakeComplete();

ChannelPipeline p = channel.pipeline();
p.get(HttpResponseDecoder.class).replace(
p.replaceAndForward(HttpResponseDecoder.class,
"ws-decoder",
new WebSocket07FrameDecoder(false, allowExtensions, maxFramePayloadLength()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public void finishHandshake(Channel channel, FullHttpResponse response) {

ChannelPipeline p = channel.pipeline();
p.remove(HttpRequestEncoder.class);
p.get(HttpResponseDecoder.class).replace(
p.replaceAndForward(HttpResponseDecoder.class,
"ws-decoder",
new WebSocket08FrameDecoder(false, allowExtensions, maxFramePayloadLength()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public void finishHandshake(Channel channel, FullHttpResponse response) {

ChannelPipeline p = channel.pipeline();
p.remove(HttpRequestEncoder.class);
p.get(HttpResponseDecoder.class).replace(
p.replaceAndForward(HttpResponseDecoder.class,
"ws-decoder",
new WebSocket13FrameDecoder(false, allowExtensions, maxFramePayloadLength()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public void operationComplete(ChannelFuture future) {
if (p.get(HttpObjectAggregator.class) != null) {
p.remove(HttpObjectAggregator.class);
}
p.get(HttpRequestDecoder.class).replace("wsdecoder",
p.replaceAndForward(HttpRequestDecoder.class, "wsdecoder",
new WebSocket00FrameDecoder(maxFramePayloadLength()));

p.replace(HttpResponseEncoder.class, "wsencoder", new WebSocket00FrameEncoder());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void operationComplete(ChannelFuture future) {
p.remove(HttpObjectAggregator.class);
}

p.get(HttpRequestDecoder.class).replace("wsdecoder",
p.replaceAndForward(HttpRequestDecoder.class, "wsdecoder",
new WebSocket07FrameDecoder(true, allowExtensions, maxFramePayloadLength()));
p.replace(HttpResponseEncoder.class, "wsencoder", new WebSocket07FrameEncoder(false));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void operationComplete(ChannelFuture future) {
p.remove(HttpObjectAggregator.class);
}

p.get(HttpRequestDecoder.class).replace("wsdecoder",
p.replaceAndForward(HttpRequestDecoder.class, "wsdecoder",
new WebSocket08FrameDecoder(true, allowExtensions, maxFramePayloadLength()));
p.replace(HttpResponseEncoder.class, "wsencoder", new WebSocket08FrameEncoder(false));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void operationComplete(ChannelFuture future) {
p.remove(HttpObjectAggregator.class);
}

p.get(HttpRequestDecoder.class).replace("wsdecoder",
p.replaceAndForward(HttpRequestDecoder.class, "wsdecoder",
new WebSocket13FrameDecoder(true, allowExtensions, maxFramePayloadLength()));
p.replace(HttpResponseEncoder.class, "wsencoder", new WebSocket13FrameEncoder(false));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
public abstract class ByteToByteDecoder extends ChannelInboundByteHandlerAdapter {

private volatile boolean singleDecode;
private boolean removed;

/**
* If set then only one message is decoded on each {@link #inboundBufferUpdated(ChannelHandlerContext)} call.
Expand Down Expand Up @@ -103,7 +102,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
*/
private void callDecode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) {
int oldOutSize = out.readableBytes();
while (!removed && in.readable()) {
while (in.readable()) {
int oldInSize = in.readableBytes();
try {
decode(ctx, in, out);
Expand Down Expand Up @@ -145,10 +144,4 @@ private void callDecode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) {
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
decode(ctx, in, out);
}

@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
super.afterRemove(ctx);
removed = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,14 @@
* </pre>
*/
public abstract class ByteToByteEncoder extends ChannelOutboundByteHandlerAdapter {
private boolean removed;

@Override
public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ByteBuf in = ctx.outboundByteBuffer();
ByteBuf out = ctx.nextOutboundByteBuffer();
boolean encoded = false;

while (!removed && in.readable()) {
while (in.readable()) {
int oldInSize = in.readableBytes();
try {
encode(ctx, in, out);
Expand Down Expand Up @@ -90,10 +89,4 @@ public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce
* @throws Exception is thrown if an error accour
*/
protected abstract void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception;

@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
super.afterRemove(ctx);
removed = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.netty.channel.ChannelHandlerUtil;
import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;

/**
* {@link ChannelInboundByteHandler} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to an other
Expand All @@ -42,10 +41,7 @@
public abstract class ByteToMessageDecoder
extends ChannelInboundHandlerAdapter implements ChannelInboundByteHandler {

private ChannelHandlerContext ctx;

private volatile boolean singleDecode;
private boolean removed;

/**
* If set then only one message is decoded on each {@link #inboundBufferUpdated(ChannelHandlerContext)} call.
Expand All @@ -67,12 +63,6 @@ public boolean isSingleDecode() {
return singleDecode;
}

@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
super.beforeAdd(ctx);
}

@Override
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ctx.alloc().buffer();
Expand Down Expand Up @@ -114,7 +104,7 @@ protected void callDecode(ChannelHandlerContext ctx) {
ByteBuf in = ctx.inboundByteBuffer();

boolean decoded = false;
while (!removed && in.readable()) {
while (in.readable()) {
try {
int oldInputLength = in.readableBytes();
Object o = decode(ctx, in);
Expand Down Expand Up @@ -157,31 +147,6 @@ protected void callDecode(ChannelHandlerContext ctx) {
}
}

/**
* Replace this decoder in the {@link ChannelPipeline} with the given handler.
* All remaining bytes in the inbound buffer will be forwarded to the new handler's
* inbound buffer.
*/
public void replace(String newHandlerName, ChannelInboundByteHandler newHandler) {
if (!ctx.executor().inEventLoop()) {
throw new IllegalStateException("not in event loop");
}

// We do not use ChannelPipeline.replace() here so that the current context points
// the new handler.
ctx.pipeline().addAfter(ctx.name(), newHandlerName, newHandler);

ByteBuf in = ctx.inboundByteBuffer();
try {
if (in.readable()) {
ctx.nextInboundByteBuffer().writeBytes(in);
ctx.fireInboundBufferUpdated();
}
} finally {
ctx.pipeline().remove(this);
}
}

/**
* Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
* {@link ByteBuf} has nothing to read anymore, till nothing was read from the input {@link ByteBuf} or till
Expand All @@ -205,10 +170,4 @@ public void replace(String newHandlerName, ChannelInboundByteHandler newHandler)
protected Object decodeLast(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
return decode(ctx, in);
}

@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
super.afterRemove(ctx);
removed = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
public abstract class MessageToByteEncoder<I> extends ChannelOutboundMessageHandlerAdapter<I> {

private final Class<?>[] acceptedMsgTypes;
private boolean removed;

/**
* The types which will be accepted by the encoder. If a received message is an other type it will be just forwared
Expand All @@ -60,7 +59,7 @@ public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce
MessageBuf<I> in = ctx.outboundMessageBuffer();
ByteBuf out = ctx.nextOutboundByteBuffer();

while (!removed) {
for (;;) {
Object msg = in.poll();
if (msg == null) {
break;
Expand Down Expand Up @@ -106,10 +105,4 @@ public boolean isEncodable(Object msg) throws Exception {
* @throws Exception is thrown if an error accour
*/
protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;

@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
super.afterRemove(ctx);
removed = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ public abstract class MessageToMessageDecoder<I>
extends ChannelInboundHandlerAdapter implements ChannelInboundMessageHandler<I> {

private final Class<?>[] acceptedMsgTypes;
private boolean removed;

/**
* The types which will be accepted by the decoder. If a received message is an other type it will be just forwarded
Expand All @@ -69,7 +68,7 @@ public void inboundBufferUpdated(ChannelHandlerContext ctx)
throws Exception {
MessageBuf<I> in = ctx.inboundMessageBuffer();
boolean notify = false;
while (!removed) {
for (;;) {
try {
Object msg = in.poll();
if (msg == null) {
Expand Down Expand Up @@ -142,10 +141,4 @@ public boolean isDecodable(Object msg) throws Exception {
protected void freeInboundMessage(I msg) throws Exception {
ChannelHandlerUtil.freeMessage(msg);
}

@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
super.afterRemove(ctx);
removed = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundMessageHandlerAdapter<I> {

private final Class<?>[] acceptedMsgTypes;
private boolean removed;

/**
* The types which will be accepted by the decoder. If a received message is an other type it will be just forwared
Expand All @@ -63,7 +62,7 @@ public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce
MessageBuf<I> in = ctx.outboundMessageBuffer();
boolean encoded = false;

while (!removed) {
for (;;) {
try {
Object msg = in.poll();
if (msg == null) {
Expand Down Expand Up @@ -141,10 +140,4 @@ public boolean isEncodable(Object msg) throws Exception {
protected void freeOutboundMessage(I msg) throws Exception {
ChannelHandlerUtil.freeMessage(msg);
}

@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
super.afterRemove(ctx);
removed = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,8 @@ public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws E
// Unknown protocol; discard everything and close the connection.
in.clear();
ctx.close();
return;
}
}

// Forward the current read buffer as is to the new handlers.
ctx.nextInboundByteBuffer().writeBytes(in);
ctx.fireInboundBufferUpdated();
}

private boolean isSsl(ByteBuf buf) {
Expand Down Expand Up @@ -122,15 +117,15 @@ private void enableSsl(ChannelHandlerContext ctx) {

p.addLast("ssl", new SslHandler(engine));
p.addLast("unificationA", new PortUnificationServerHandler(false, detectGzip));
p.remove(this);
p.removeAndForward(this);
}

private void enableGzip(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.pipeline();
p.addLast("gzipdeflater", ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
p.addLast("gzipinflater", ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
p.addLast("unificationB", new PortUnificationServerHandler(detectSsl, false));
p.remove(this);
p.removeAndForward(this);
}

private void switchToHttp(ChannelHandlerContext ctx) {
Expand All @@ -139,14 +134,14 @@ private void switchToHttp(ChannelHandlerContext ctx) {
p.addLast("encoder", new HttpResponseEncoder());
p.addLast("deflater", new HttpContentCompressor());
p.addLast("handler", new HttpSnoopServerHandler());
p.remove(this);
p.removeAndForward(this);
}

private void switchToFactorial(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.pipeline();
p.addLast("decoder", new BigIntegerDecoder());
p.addLast("encoder", new NumberEncoder());
p.addLast("handler", new FactorialServerHandler());
p.remove(this);
p.removeAndForward(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
extends ChannelInboundHandlerAdapter implements ChannelInboundMessageHandler<I> {

private final Class<?>[] acceptedMsgTypes;
private boolean removed;

/**
* The types which will be accepted by the message handler. If a received message is an other type it will be just
Expand Down Expand Up @@ -74,7 +73,7 @@ public final void inboundBufferUpdated(ChannelHandlerContext ctx) throws Excepti

try {
MessageBuf<I> in = ctx.inboundMessageBuffer();
while (!removed) {
for (;;) {
Object msg = in.poll();
if (msg == null) {
break;
Expand Down Expand Up @@ -165,10 +164,4 @@ protected void endMessageReceived(ChannelHandlerContext ctx) throws Exception {
protected void freeInboundMessage(I msg) throws Exception {
ChannelHandlerUtil.freeMessage(msg);
}

@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
super.afterRemove(ctx);
removed = true;
}
}
Loading

0 comments on commit d7bfd44

Please sign in to comment.