Skip to content

Commit

Permalink
Revert netty#10326 due regression in FlowControlHandler
Browse files Browse the repository at this point in the history
Motivation:

This reverts commit b559711 due regression introduced by it.

Modification:

Revert commit

Result:

Fixes netty#10464
  • Loading branch information
normanmaurer committed Aug 11, 2020
1 parent 4b7dba1 commit 3ac6a82
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,12 @@ public class FlowControlHandler extends ChannelDuplexHandler {

private ChannelConfig config;

private int readRequestCount;
private boolean shouldConsume;

public FlowControlHandler() {
this(true);
}

/**
* @param releaseMessages If {@code false}, the handler won't release the buffered messages
* when the handler is removed.
*
*/
public FlowControlHandler(boolean releaseMessages) {
this.releaseMessages = releaseMessages;
}
Expand Down Expand Up @@ -145,7 +140,7 @@ public void read(ChannelHandlerContext ctx) throws Exception {
// It seems no messages were consumed. We need to read() some
// messages from upstream and once one arrives it need to be
// relayed to downstream to keep the flow going.
++readRequestCount;
shouldConsume = true;
ctx.read();
}
}
Expand All @@ -161,8 +156,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
// We just received one message. Do we need to relay it regardless
// of the auto reading configuration? The answer is yes if this
// method was called as a result of a prior read() call.
int minConsume = Math.min(readRequestCount, queue.size());
readRequestCount -= minConsume;
int minConsume = shouldConsume ? 1 : 0;
shouldConsume = false;

dequeue(ctx, minConsume);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Interrupte
// channelRead(2)
peer.config().setAutoRead(true);
setAutoReadLatch1.countDown();
assertTrue(msgRcvLatch2.await(1L, SECONDS));
assertTrue(msgRcvLatch1.await(1L, SECONDS));

// channelRead(3)
peer.config().setAutoRead(true);
Expand Down Expand Up @@ -353,7 +353,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {

// Write the message
client.writeAndFlush(newOneMessage())
.syncUninterruptibly();
.syncUninterruptibly();

// channelRead(1)
peer.read();
Expand All @@ -373,63 +373,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
}
}

/**
* The {@link FlowControlHandler} will keep track of read calls when
* when read is called multiple times when the FlowControlHandler queue is empty.
*/
@Test
public void testTrackReadCallCount() throws Exception {
final Exchanger<Channel> peerRef = new Exchanger<Channel>();
final CountDownLatch msgRcvLatch1 = new CountDownLatch(1);
final CountDownLatch msgRcvLatch2 = new CountDownLatch(2);
final CountDownLatch msgRcvLatch3 = new CountDownLatch(3);

ChannelInboundHandlerAdapter handler = new ChannelDuplexHandler() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
peerRef.exchange(ctx.channel(), 1L, SECONDS);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
msgRcvLatch1.countDown();
msgRcvLatch2.countDown();
msgRcvLatch3.countDown();
}
};

FlowControlHandler flow = new FlowControlHandler();
Channel server = newServer(false, flow, handler);
Channel client = newClient(server.localAddress());
try {
// The client connection on the server side
Channel peer = peerRef.exchange(null, 1L, SECONDS);

// Confirm that the queue is empty
assertTrue(flow.isQueueEmpty());
// Request read 3 times
peer.read();
peer.read();
peer.read();

// Write the message
client.writeAndFlush(newOneMessage())
.syncUninterruptibly();

// channelRead(1)
assertTrue(msgRcvLatch1.await(1L, SECONDS));
// channelRead(2)
assertTrue(msgRcvLatch2.await(1L, SECONDS));
// channelRead(3)
assertTrue(msgRcvLatch3.await(1L, SECONDS));
assertTrue(flow.isQueueEmpty());
} finally {
client.close();
server.close();
}
}

@Test
public void testReentranceNotCausesNPE() throws Throwable {
final Exchanger<Channel> peerRef = new Exchanger<Channel>();
Expand Down

0 comments on commit 3ac6a82

Please sign in to comment.