Skip to content

Commit

Permalink
Reduce coupeling between Http2FrameCodec and Http2Multiplex* (netty#9273
Browse files Browse the repository at this point in the history
)

Motivation:

Http2MultiplexCodec and Http2MultiplexHandler had a very strong coupling with Http2FrameCodec which we can reduce easily. The end-goal should be to have no coupling at all.

Modifications:

- Reduce coupling by move some common logic to Http2CodecUtil
- Move logic to check if a stream may have existed before to Http2FrameCodec
- Use ArrayDeque as replacement for custom double-linked-list which makes the code a lot more readable
- Use WindowUpdateFrame to signal consume bytes (just as users do when they use Http2FrameCodec directly)

Result:

Less coupling and cleaner code.
  • Loading branch information
normanmaurer authored Jun 27, 2019
1 parent 856f118 commit df46a34
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 219 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ private enum ReadStatus {
private Runnable fireChannelWritabilityChangedTask;

private boolean outboundClosed;
private int flowControlledBytes;

/**
* This variable represents if a read is in progress for the current channel or was requested.
Expand All @@ -154,13 +155,7 @@ private enum ReadStatus {

/** {@code true} after the first HEADERS frame has been written **/
private boolean firstFrameWritten;

// Currently the child channel and parent channel are always on the same EventLoop thread. This allows us to
// extend the read loop of a child channel if the child channel drains its queued data during read, and the
// parent channel is still in its read loop. The next/previous links build a doubly linked list that the parent
// channel will iterate in its channelReadComplete to end the read cycle for each child channel in the list.
AbstractHttp2StreamChannel next;
AbstractHttp2StreamChannel previous;
private boolean readCompletePending;

AbstractHttp2StreamChannel(DefaultHttp2FrameStream stream, int id, ChannelHandler inboundHandler) {
this.stream = stream;
Expand Down Expand Up @@ -535,16 +530,18 @@ void fireChildRead(Http2Frame frame) {
// otherwise we would have drained it from the queue and processed it during the read cycle.
assert inboundBuffer == null || inboundBuffer.isEmpty();
final RecvByteBufAllocator.Handle allocHandle = unsafe.recvBufAllocHandle();
unsafe.doRead0(frame, allocHandle);
flowControlledBytes += unsafe.doRead0(frame, allocHandle);
// We currently don't need to check for readEOS because the parent channel and child channel are limited
// to the same EventLoop thread. There are a limited number of frame types that may come after EOS is
// read (unknown, reset) and the trade off is less conditionals for the hot path (headers/data) at the
// cost of additional readComplete notifications on the rare path.
if (allocHandle.continueReading()) {
tryAddChildChannelToReadPendingQueue();
if (!readCompletePending) {
readCompletePending = true;
addChannelToReadCompletePendingQueue();
}
} else {
tryRemoveChildChannelFromReadPendingQueue();
unsafe.notifyReadComplete(allocHandle);
unsafe.notifyReadComplete(allocHandle, true);
}
} else {
if (inboundBuffer == null) {
Expand All @@ -556,8 +553,8 @@ void fireChildRead(Http2Frame frame) {

void fireChildReadComplete() {
assert eventLoop().inEventLoop();
assert readStatus != ReadStatus.IDLE;
unsafe.notifyReadComplete(unsafe.recvBufAllocHandle());
assert readStatus != ReadStatus.IDLE || !readCompletePending;
unsafe.notifyReadComplete(unsafe.recvBufAllocHandle(), false);
}

private final class Http2ChannelUnsafe implements Unsafe {
Expand Down Expand Up @@ -651,14 +648,16 @@ public void operationComplete(ChannelFuture future) {
return;
}
closeInitiated = true;

tryRemoveChildChannelFromReadPendingQueue();
// Just set to false as removing from an underlying queue would even be more expensive.
readCompletePending = false;

final boolean wasActive = isActive();

// Only ever send a reset frame if the connection is still alive and if the stream may have existed
updateLocalWindowIfNeeded();

// Only ever send a reset frame if the connection is still alive and if the stream was created before
// as otherwise we may send a RST on a stream in an invalid state and cause a connection error.
if (parent().isActive() && !readEOS && streamMayHaveExisted(stream())) {
if (parent().isActive() && !readEOS && Http2CodecUtil.isStreamIdValid(stream.id())) {
Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).stream(stream());
write(resetFrame, unsafe().voidPromise());
flush();
Expand Down Expand Up @@ -782,7 +781,7 @@ void doBeginRead() {
allocHandle.reset(config());
boolean continueReading = false;
do {
doRead0((Http2Frame) message, allocHandle);
flowControlledBytes += doRead0((Http2Frame) message, allocHandle);
} while ((readEOS || (continueReading = allocHandle.continueReading())) &&
(message = inboundBuffer.poll()) != null);

Expand All @@ -791,10 +790,12 @@ void doBeginRead() {
// currently reading it is possile that more frames will be delivered to this child channel. In
// the case that this child channel still wants to read we delay the channelReadComplete on this
// child channel until the parent is done reading.
boolean added = tryAddChildChannelToReadPendingQueue();
assert added;
if (!readCompletePending) {
readCompletePending = true;
addChannelToReadCompletePendingQueue();
}
} else {
notifyReadComplete(allocHandle);
notifyReadComplete(allocHandle, true);
}
}
}
Expand All @@ -803,13 +804,30 @@ void readEOS() {
readEOS = true;
}

void notifyReadComplete(RecvByteBufAllocator.Handle allocHandle) {
assert next == null && previous == null;
private void updateLocalWindowIfNeeded() {
if (flowControlledBytes != 0) {
int bytes = flowControlledBytes;
flowControlledBytes = 0;
write0(parentContext(), new DefaultHttp2WindowUpdateFrame(bytes).stream(stream));
writeDoneAndNoFlush = true;
}
}

void notifyReadComplete(RecvByteBufAllocator.Handle allocHandle, boolean forceReadComplete) {
if (!readCompletePending && !forceReadComplete) {
return;
}
// Set to false just in case we added the channel multiple times before.
readCompletePending = false;

if (readStatus == ReadStatus.REQUESTED) {
readStatus = ReadStatus.IN_PROGRESS;
} else {
readStatus = ReadStatus.IDLE;
}

updateLocalWindowIfNeeded();

allocHandle.readComplete();
pipeline().fireChannelReadComplete();
// Reading data may result in frames being written (e.g. WINDOW_UPDATE, RST, etc..). If the parent
Expand All @@ -822,29 +840,20 @@ void notifyReadComplete(RecvByteBufAllocator.Handle allocHandle) {
}

@SuppressWarnings("deprecation")
void doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) {
int doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) {
pipeline().fireChannelRead(frame);
allocHandle.incMessagesRead(1);

if (frame instanceof Http2DataFrame) {
final int numBytesToBeConsumed = ((Http2DataFrame) frame).initialFlowControlledBytes();
allocHandle.attemptedBytesRead(numBytesToBeConsumed);
allocHandle.lastBytesRead(numBytesToBeConsumed);
if (numBytesToBeConsumed != 0) {
try {
if (consumeBytes(stream, numBytesToBeConsumed)) {
// We wrote some WINDOW_UPDATE frame, so we may need to do a flush.
writeDoneAndNoFlush = true;
flush();
}
} catch (Http2Exception e) {
pipeline().fireExceptionCaught(e);
}
}
return numBytesToBeConsumed;
} else {
allocHandle.attemptedBytesRead(MIN_HTTP2_FRAME_SIZE);
allocHandle.lastBytesRead(MIN_HTTP2_FRAME_SIZE);
}
return 0;
}

@Override
Expand Down Expand Up @@ -1041,10 +1050,7 @@ protected ChannelFuture write0(ChannelHandlerContext ctx, Object msg) {
return promise;
}

protected abstract boolean consumeBytes(Http2FrameStream stream, int bytes) throws Http2Exception;
protected abstract boolean isParentReadInProgress();
protected abstract boolean streamMayHaveExisted(Http2FrameStream stream);
protected abstract void tryRemoveChildChannelFromReadPendingQueue();
protected abstract boolean tryAddChildChannelToReadPendingQueue();
protected abstract void addChannelToReadCompletePendingQueue();
protected abstract ChannelHandlerContext parentContext();
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,4 @@ private static Http2FrameCodec requireHttp2FrameCodec(ChannelHandlerContext ctx)
}
return (Http2FrameCodec) frameCodecCtx.handler();
}

boolean isValidLocalStreamId(Http2FrameStream stream) {
return frameCodec.connection().local().isValidStreamId(stream.id());
}

boolean streamMayHaveExisted(Http2FrameStream stream) {
return frameCodec.connection().streamMayHaveExisted(stream.id());
}

boolean consumeBytes(Http2FrameStream stream, int bytes) throws Http2Exception {
return frameCodec.consumeBytes(stream.id(), bytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ public static boolean isStreamIdValid(int streamId) {
return streamId >= 0;
}

static boolean isStreamIdValid(int streamId, boolean server) {
return isStreamIdValid(streamId) && server == ((streamId & 1) == 0);
}

/**
* Indicates whether or not the given value for max frame size falls within the valid range.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,16 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}
} else if (msg instanceof Http2ResetFrame) {
Http2ResetFrame rstFrame = (Http2ResetFrame) msg;
encoder().writeRstStream(ctx, rstFrame.stream().id(), rstFrame.errorCode(), promise);
int id = rstFrame.stream().id();
// Only ever send a reset frame if stream may have existed before as otherwise we may send a RST on a
// stream in an invalid state and cause a connection error.
if (connection().streamMayHaveExisted(id)) {
encoder().writeRstStream(ctx, rstFrame.stream().id(), rstFrame.errorCode(), promise);
} else {
ReferenceCountUtil.release(rstFrame);
promise.setFailure(Http2Exception.streamError(
rstFrame.stream().id(), Http2Error.PROTOCOL_ERROR, "Stream never existed"));
}
} else if (msg instanceof Http2PingFrame) {
Http2PingFrame frame = (Http2PingFrame) msg;
encoder().writePing(ctx, frame.ack(), frame.content(), promise);
Expand Down
Loading

0 comments on commit df46a34

Please sign in to comment.