Skip to content

Commit

Permalink
Revert "Do not suppress channelReadComplete() when a handler was just…
Browse files Browse the repository at this point in the history
… added"

This reverts commit 720faa4.
  • Loading branch information
normanmaurer committed Apr 20, 2015
1 parent a123c49 commit bb69281
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,11 @@ public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
ByteBuf bytes = buf.readBytes(readable);
buf.release();
ctx.fireChannelRead(bytes);
ctx.fireChannelReadComplete();
} else {
buf.release();
}
cumulation = null;
ctx.fireChannelReadComplete();
handlerRemoved0(ctx);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,30 +36,21 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
private final DefaultChannelPipeline pipeline;
private final String name;

/**
* Set when the {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)} of
* this context's handler is invoked.
* Cleared when a user calls {@link #fireChannelReadComplete()} on this context.
*
* See {@link #fireChannelReadComplete()} to understand how this flag is used.
*/
boolean invokedThisChannelRead;

/**
* Set when a user calls {@link #fireChannelRead(Object)} on this context.
* Cleared when a user calls {@link #fireChannelReadComplete()} on this context.
*
* See {@link #fireChannelReadComplete()} to understand how this flag is used.
*/
private volatile boolean invokedNextChannelRead;
private volatile boolean firedChannelRead;

/**
* Set when a user calls {@link #read()} on this context.
* Cleared when a user calls {@link #fireChannelReadComplete()} on this context.
*
* See {@link #fireChannelReadComplete()} to understand how this flag is used.
*/
private volatile boolean invokedPrevRead;
private volatile boolean invokedRead;

/**
* {@code true} if and only if this context has been removed from the pipeline.
Expand Down Expand Up @@ -183,7 +174,7 @@ public ChannelHandlerContext fireUserEventTriggered(Object event) {
public ChannelHandlerContext fireChannelRead(Object msg) {
AbstractChannelHandlerContext next = findContextInbound();
ReferenceCountUtil.touch(msg, next);
invokedNextChannelRead = true;
firedChannelRead = true;
next.invoker().invokeChannelRead(next, msg);
return this;
}
Expand All @@ -196,16 +187,11 @@ public ChannelHandlerContext fireChannelReadComplete() {
*
* This is pretty common for the handlers that transform multiple messages into one message,
* such as byte-to-message decoder and message aggregators.
*
* Only one exception is when nobody invoked the channelRead() method of this context's handler.
* It means the handler has been added later dynamically.
*/
if (invokedNextChannelRead || // The handler of this context produced a message, or
!invokedThisChannelRead) { // it is not required to produce a message to trigger the event.

invokedNextChannelRead = false;
invokedPrevRead = false;

if (firedChannelRead) {
// The handler of this context produced a message, so we are OK to trigger this event.
firedChannelRead = false;
invokedRead = false;
AbstractChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelReadComplete(next);
return this;
Expand All @@ -222,15 +208,15 @@ public ChannelHandlerContext fireChannelReadComplete() {
* Why? Because otherwise the next handler will not receive {@code channelRead()} nor
* {@code channelReadComplete()} event at all for the {@link #read()} operation it issued.
*/
if (invokedPrevRead && !channel().config().isAutoRead()) {
if (invokedRead && !channel().config().isAutoRead()) {
/**
* The next (or upstream) handler invoked {@link #read()}, but it didn't get any
* {@code channelRead()} event. We should read once more, so that the handler of the current
* context have a chance to produce something.
*/
read();
} else {
invokedPrevRead = false;
invokedRead = false;
}

return this;
Expand Down Expand Up @@ -320,7 +306,7 @@ public ChannelFuture deregister(ChannelPromise promise) {
@Override
public ChannelHandlerContext read() {
AbstractChannelHandlerContext next = findContextOutbound();
invokedPrevRead = true;
invokedRead = true;
next.invoker().invokeRead(next);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ public static void invokeUserEventTriggeredNow(final ChannelHandlerContext ctx,

public static void invokeChannelReadNow(final ChannelHandlerContext ctx, final Object msg) {
try {
((AbstractChannelHandlerContext) ctx).invokedThisChannelRead = true;
((ChannelInboundHandler) ctx.handler()).channelRead(ctx, msg);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,6 @@ public void run() {

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
ctx.invokedThisChannelRead = false;
ctx.handler().handlerAdded(ctx);
} catch (Throwable t) {
boolean removed = false;
Expand Down

0 comments on commit bb69281

Please sign in to comment.