Skip to content

Commit

Permalink
Ensure channelReadComplete() is called only when necessary
Browse files Browse the repository at this point in the history
Motivation:

Even if a handler called ctx.fireChannelReadComplete(), the next handler
should not get its channelReadComplete() invoked if fireChannelRead()
was not invoked before.

Modifications:

- Ensure channelReadComplete() is invoked only when the handler of the
  current context actually produced a message, because otherwise there's
  no point of triggering channelReadComplete().
  i.e. channelReadComplete() must follow channelRead().
- Fix a bug where ctx.read() was not called if the handler of the
  current context did not produce any message, making the connection
  stall. Read the new comment for more information.

Result:

- channelReadComplete() is invoked only when it makes sense.
- No stale connection
  • Loading branch information
trustin committed Feb 7, 2015
1 parent cb5703c commit 14d64d0
Show file tree
Hide file tree
Showing 4 changed files with 366 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in)
ByteBuf cumulation;
private Cumulator cumulator = MERGE_CUMULATOR;
private boolean singleDecode;
private boolean decodeWasNull;
private boolean first;

protected ByteToMessageDecoder() {
Expand Down Expand Up @@ -239,7 +238,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
cumulation = null;
}
int size = out.size();
decodeWasNull = size == 0;

for (int i = 0; i < size; i ++) {
ctx.fireChannelRead(out.get(i));
Expand All @@ -263,12 +261,6 @@ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// - https://github.com/netty/netty/issues/1764
cumulation.discardSomeReadBytes();
}
if (decodeWasNull) {
decodeWasNull = false;
if (!ctx.channel().config().isAutoRead()) {
ctx.read();
}
}
ctx.fireChannelReadComplete();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import org.junit.Assert;
import org.junit.Test;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;

public class ByteToMessageDecoderTest {

Expand Down Expand Up @@ -160,4 +162,61 @@ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
Assert.assertEquals(3, (int) queue.take());
Assert.assertTrue(queue.isEmpty());
}

// See https://github.com/netty/netty/pull/3263
@Test
public void testFireChannelReadCompleteOnlyWhenDecoded() {
final AtomicInteger readComplete = new AtomicInteger();
EmbeddedChannel ch = new EmbeddedChannel(new ByteToMessageDecoder() {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// Do nothing
}
}, new ChannelInboundHandlerAdapter() {
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
readComplete.incrementAndGet();
}
});
Assert.assertFalse(ch.writeInbound(Unpooled.copiedBuffer("test", CharsetUtil.US_ASCII)));
Assert.assertFalse(ch.finish());
Assert.assertEquals(0, readComplete.get());
}

// See https://github.com/netty/netty/pull/3263
@Test
public void testFireChannelReadCompleteWhenDecodeOnce() {
final AtomicInteger readComplete = new AtomicInteger();
EmbeddedChannel ch = new EmbeddedChannel(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
ctx.fireChannelRead(Unpooled.EMPTY_BUFFER);
}
}, new ByteToMessageDecoder() {
private boolean first = true;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (first) {
first = false;
out.add(in.readSlice(in.readableBytes()).retain());
}
}
}, new ChannelInboundHandlerAdapter() {
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
readComplete.incrementAndGet();
}
});
Assert.assertTrue(ch.writeInbound(Unpooled.copiedBuffer("test", CharsetUtil.US_ASCII)));
Assert.assertTrue(ch.finish());
Assert.assertEquals(1, readComplete.get());
for (;;) {
ByteBuf buf = ch.readInbound();
if (buf == null) {
break;
}
buf.release();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,26 @@ private static boolean isSkippable(
private final AbstractChannel channel;
private final DefaultChannelPipeline pipeline;
private final String name;

/**
* Set when a user calls {@link #fireChannelRead(Object)} on this context.
* Cleared when a user calls {@link #fireChannelReadComplete()} on this context.
*
* See {@link #fireChannelReadComplete()} to understand how this flag is used.
*/
private volatile boolean firedChannelRead;

/**
* Set when a user calls {@link #read()} on this context.
* Cleared when a user calls {@link #fireChannelReadComplete()} on this context.
*
* See {@link #fireChannelReadComplete()} to understand how this flag is used.
*/
private volatile boolean invokedRead;

/**
* {@code true} if and only if this context has been removed from the pipeline.
*/
private boolean removed;

final int skipFlags;
Expand Down Expand Up @@ -356,14 +376,51 @@ public ChannelHandlerContext fireUserEventTriggered(Object event) {
public ChannelHandlerContext fireChannelRead(Object msg) {
AbstractChannelHandlerContext next = findContextInbound();
ReferenceCountUtil.touch(msg, next);
firedChannelRead = true;
next.invoker().invokeChannelRead(next, msg);
return this;
}

@Override
public ChannelHandlerContext fireChannelReadComplete() {
AbstractChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelReadComplete(next);
/**
* If the handler of this context did not produce any messages via {@link #fireChannelRead(Object)},
* there's no reason to trigger {@code channelReadComplete()} even if the handler called this method.
*
* This is pretty common for the handlers that transform multiple messages into one message,
* such as byte-to-message decoder and message aggregators.
*/
if (firedChannelRead) {
// The handler of this context produced a message, so we are OK to trigger this event.
firedChannelRead = false;
invokedRead = false;
AbstractChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelReadComplete(next);
return this;
}

/**
* At this point, we are sure the handler of this context did not produce anything and
* we suppressed the {@code channelReadComplete()} event.
*
* If the next handler invoked {@link #read()} to read something but nothing was produced
* by the handler of this context, someone has to issue another {@link #read()} operation
* until the handler of this context produces something.
*
* Why? Because otherwise the next handler will not receive {@code channelRead()} nor
* {@code channelReadComplete()} event at all for the {@link #read()} operation it issued.
*/
if (invokedRead && !channel().config().isAutoRead()) {
/**
* The next (or upstream) handler invoked {@link #read()}, but it didn't get any
* {@code channelRead()} event. We should read once more, so that the handler of the current
* context have a chance to produce something.
*/
read();
} else {
invokedRead = false;
}

return this;
}

Expand Down Expand Up @@ -451,6 +508,7 @@ public ChannelFuture deregister(ChannelPromise promise) {
@Override
public ChannelHandlerContext read() {
AbstractChannelHandlerContext next = findContextOutbound();
invokedRead = true;
next.invoker().invokeRead(next);
return this;
}
Expand Down
Loading

0 comments on commit 14d64d0

Please sign in to comment.