Skip to content

Commit

Permalink
HTTP/2 child channel may discard flush when done from an arbitrary th…
Browse files Browse the repository at this point in the history
…read (netty#10019)

Motivation:

We need to carefully manage flushes to ensure we not discard these by mistake due wrongly implemented consolidation of flushes.

Modifications:

- Ensure we reset flag before we actually call flush0(...)
- Add unit test

Result:

Fixes netty#10015
  • Loading branch information
normanmaurer authored Feb 11, 2020
1 parent ef50cf5 commit 2d4b5ab
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1035,11 +1035,10 @@ public void flush() {
// There is nothing to flush so this is a NOOP.
return;
}
try {
flush0(parentContext());
} finally {
writeDoneAndNoFlush = false;
}
// We need to set this to false before we call flush0(...) as ChannelFutureListener may produce more data
// that are explicit flushed.
writeDoneAndNoFlush = false;
flush0(parentContext());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
Expand All @@ -26,6 +29,7 @@
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
import io.netty.util.NetUtil;
import io.netty.util.ReferenceCountUtil;
import org.junit.After;
Expand All @@ -34,12 +38,33 @@

import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.Assert.assertFalse;

public class Http2MultiplexTransportTest {
private static final ChannelHandler DISCARD_HANDLER = new ChannelInboundHandlerAdapter() {

@Override
public boolean isSharable() {
return true;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg);
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
ReferenceCountUtil.release(evt);
}
};

private EventLoopGroup eventLoopGroup;
private Channel clientChannel;
private Channel serverChannel;
Expand All @@ -66,13 +91,13 @@ public void teardown() {

@Test(timeout = 10000)
public void asyncSettingsAckWithMultiplexCodec() throws InterruptedException {
asyncSettingsAck0(new Http2MultiplexCodecBuilder(true, new HttpInboundHandler()).build(), null);
asyncSettingsAck0(new Http2MultiplexCodecBuilder(true, DISCARD_HANDLER).build(), null);
}

@Test(timeout = 10000)
public void asyncSettingsAckWithMultiplexHandler() throws InterruptedException {
asyncSettingsAck0(new Http2FrameCodecBuilder(true).build(),
new Http2MultiplexHandler(new HttpInboundHandler()));
new Http2MultiplexHandler(DISCARD_HANDLER));
}

private void asyncSettingsAck0(final Http2FrameCodec codec, final ChannelHandler multiplexer)
Expand Down Expand Up @@ -120,7 +145,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(Http2MultiplexCodecBuilder
.forClient(new HttpInboundHandler()).autoAckSettingsFrame(false).build());
.forClient(DISCARD_HANDLER).autoAckSettingsFrame(false).build());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Expand Down Expand Up @@ -152,6 +177,81 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
serverAckAllLatch.await();
}

@ChannelHandler.Sharable
private static final class HttpInboundHandler extends ChannelInboundHandlerAdapter { }
@Test(timeout = 5000L)
public void testFlushNotDiscarded()
throws InterruptedException {
final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);

try {
ServerBootstrap sb = new ServerBootstrap();
sb.group(eventLoopGroup);
sb.channel(NioServerSocketChannel.class);
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new Http2FrameCodecBuilder(true).build());
ch.pipeline().addLast(new Http2MultiplexHandler(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
if (msg instanceof Http2HeadersFrame && ((Http2HeadersFrame) msg).isEndStream()) {
executorService.schedule(new Runnable() {
@Override
public void run() {
ctx.writeAndFlush(new DefaultHttp2HeadersFrame(
new DefaultHttp2Headers(), false)).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
ctx.write(new DefaultHttp2DataFrame(
Unpooled.copiedBuffer("Hello World", CharsetUtil.US_ASCII),
true));
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
ctx.flush();
}
});
}
});
}
}, 500, TimeUnit.MILLISECONDS);
}
ReferenceCountUtil.release(msg);
}
}));
}
});
serverChannel = sb.bind(new InetSocketAddress(NetUtil.LOCALHOST, 0)).syncUninterruptibly().channel();

final CountDownLatch latch = new CountDownLatch(1);
Bootstrap bs = new Bootstrap();
bs.group(eventLoopGroup);
bs.channel(NioSocketChannel.class);
bs.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new Http2FrameCodecBuilder(false).build());
ch.pipeline().addLast(new Http2MultiplexHandler(DISCARD_HANDLER));
}
});
clientChannel = bs.connect(serverChannel.localAddress()).syncUninterruptibly().channel();
Http2StreamChannelBootstrap h2Bootstrap = new Http2StreamChannelBootstrap(clientChannel);
h2Bootstrap.handler(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof Http2DataFrame && ((Http2DataFrame) msg).isEndStream()) {
latch.countDown();
}
ReferenceCountUtil.release(msg);
}
});
Http2StreamChannel streamChannel = h2Bootstrap.open().syncUninterruptibly().getNow();
streamChannel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers(), true))
.syncUninterruptibly();

latch.await();
} finally {
executorService.shutdown();
}
}
}

0 comments on commit 2d4b5ab

Please sign in to comment.