Skip to content

Commit

Permalink
HTTP/2 LifecycleManager and Http2ConnectionHandler interface clarific…
Browse files Browse the repository at this point in the history
…ations

Motiviation:
The interface provided by Http2LifecycleManager is not clear as to how the writeXXX methods should behave.  The implementation of this interface from the Http2ConnectionHandler's perspecitve is unclear what writeXXX means in this context.

Modifications:
- Method names in Http2LifecycleManager and Http2ConnectionHandler should be renamed and comments should clarify the interfaces.

Results:
Http2LifecycleManager is more clear and Http2ConnectionHandler's implementation makes sense w.r.t to return values.
Scottmitch committed Apr 6, 2015
1 parent 8daaf1c commit cee3cc2
Showing 7 changed files with 67 additions and 63 deletions.
Original file line number Diff line number Diff line change
@@ -262,7 +262,7 @@ public int onDataRead(final ChannelHandlerContext ctx, int streamId, ByteBuf dat
}

if (endOfStream) {
lifecycleManager.closeRemoteSide(stream, ctx.newSucceededFuture());
lifecycleManager.closeStreamRemote(stream, ctx.newSucceededFuture());
}
}
}
@@ -321,7 +321,7 @@ public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers

// If the headers completes this stream, close it.
if (endOfStream) {
lifecycleManager.closeRemoteSide(stream, ctx.newSucceededFuture());
lifecycleManager.closeStreamRemote(stream, ctx.newSucceededFuture());
}
}

Original file line number Diff line number Diff line change
@@ -219,7 +219,7 @@ public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, int
public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
ChannelPromise promise) {
// Delegate to the lifecycle manager for proper updating of connection state.
return lifecycleManager.writeRstStream(ctx, streamId, errorCode, promise);
return lifecycleManager.resetStream(ctx, streamId, errorCode, promise);
}

@Override
@@ -287,7 +287,7 @@ public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, i
@Override
public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData,
ChannelPromise promise) {
return lifecycleManager.writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
return lifecycleManager.goAway(ctx, lastStreamId, errorCode, debugData, promise);
}

@Override
@@ -470,7 +470,7 @@ public FlowControlledBase(final ChannelHandlerContext ctx, final Http2Stream str
@Override
public void writeComplete() {
if (endOfStream) {
lifecycleManager.closeLocalSide(stream, promise);
lifecycleManager.closeStreamLocal(stream, promise);
}
}

Original file line number Diff line number Diff line change
@@ -325,7 +325,7 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce
return;
}

ChannelFuture future = writeGoAway(ctx, null);
ChannelFuture future = goAway(ctx, null);

// If there are no active streams, close immediately after the send is complete.
// Otherwise wait until all streams are inactive.
@@ -376,7 +376,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
* @param future If closing, the future after which to close the channel.
*/
@Override
public void closeLocalSide(Http2Stream stream, ChannelFuture future) {
public void closeStreamLocal(Http2Stream stream, ChannelFuture future) {
switch (stream.state()) {
case HALF_CLOSED_LOCAL:
case OPEN:
@@ -396,7 +396,7 @@ public void closeLocalSide(Http2Stream stream, ChannelFuture future) {
* @param future If closing, the future after which to close the channel.
*/
@Override
public void closeRemoteSide(Http2Stream stream, ChannelFuture future) {
public void closeStreamRemote(Http2Stream stream, ChannelFuture future) {
switch (stream.state()) {
case HALF_CLOSED_REMOTE:
case OPEN:
@@ -408,13 +408,6 @@ public void closeRemoteSide(Http2Stream stream, ChannelFuture future) {
}
}

/**
* Closes the given stream and adds a hook to close the channel after the given future
* completes.
*
* @param stream the stream to be closed.
* @param future the future after which to close the channel.
*/
@Override
public void closeStream(final Http2Stream stream, ChannelFuture future) {
stream.close();
@@ -466,7 +459,7 @@ protected void onConnectionError(ChannelHandlerContext ctx, Throwable cause, Htt
if (http2Ex == null) {
http2Ex = new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause);
}
writeGoAway(ctx, http2Ex).addListener(new ClosingChannelFutureListener(ctx, ctx.newPromise()));
goAway(ctx, http2Ex).addListener(new ClosingChannelFutureListener(ctx, ctx.newPromise()));
}

/**
@@ -478,18 +471,15 @@ protected void onConnectionError(ChannelHandlerContext ctx, Throwable cause, Htt
* @param http2Ex the {@link StreamException} that is embedded in the causality chain.
*/
protected void onStreamError(ChannelHandlerContext ctx, Throwable cause, StreamException http2Ex) {
writeRstStream(ctx, http2Ex.streamId(), http2Ex.error().code(), ctx.newPromise());
resetStream(ctx, http2Ex.streamId(), http2Ex.error().code(), ctx.newPromise());
}

protected Http2FrameWriter frameWriter() {
return encoder().frameWriter();
}

/**
* Writes a {@code RST_STREAM} frame to the remote endpoint and updates the connection state appropriately.
*/
@Override
public ChannelFuture writeRstStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
public ChannelFuture resetStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
final ChannelPromise promise) {
final Http2Stream stream = connection().stream(streamId);
if (stream == null || stream.isResetSent()) {
@@ -519,12 +509,9 @@ public void operationComplete(ChannelFuture future) throws Exception {
return future;
}

/**
* Sends a {@code GO_AWAY} frame to the remote endpoint and updates the connection state appropriately.
*/
@Override
public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData,
ChannelPromise promise) {
public ChannelFuture goAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode,
ByteBuf debugData, ChannelPromise promise) {
Http2Connection connection = connection();
if (connection.goAwayReceived() || connection.goAwaySent()) {
debugData.release();
@@ -540,9 +527,9 @@ public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, lo
}

/**
* Sends a {@code GO_AWAY} frame appropriate for the given exception.
* Close the remote endpoint with with a {@code GO_AWAY} frame.
*/
private ChannelFuture writeGoAway(ChannelHandlerContext ctx, Http2Exception cause) {
private ChannelFuture goAway(ChannelHandlerContext ctx, Http2Exception cause) {
Http2Connection connection = connection();
if (connection.goAwayReceived() || connection.goAwaySent()) {
return ctx.newSucceededFuture();
@@ -553,7 +540,7 @@ private ChannelFuture writeGoAway(ChannelHandlerContext ctx, Http2Exception caus
long errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
ByteBuf debugData = Http2CodecUtil.toByteBuf(ctx, cause);
int lastKnownStream = connection.remote().lastStreamCreated();
return writeGoAway(ctx, lastKnownStream, errorCode, debugData, ctx.newPromise());
return goAway(ctx, lastKnownStream, errorCode, debugData, ctx.newPromise());
}

/**
Original file line number Diff line number Diff line change
@@ -26,44 +26,61 @@
public interface Http2LifecycleManager {

/**
* Closes the local side of the given stream. If this causes the stream to be closed, adds a
* hook to deactivate the stream and close the channel after the given future completes.
*
* Closes the local side of the {@code stream}. Depending on the {@code stream} state this may result in
* {@code stream} being closed. See {@link closeStream(Http2Stream, ChannelFuture)}.
* @param stream the stream to be half closed.
* @param future If closing, the future after which to close the channel.
* @param future See {@link closeStream(Http2Stream, ChannelFuture)}.
*/
void closeLocalSide(Http2Stream stream, ChannelFuture future);
void closeStreamLocal(Http2Stream stream, ChannelFuture future);

/**
* Closes the remote side of the given stream. If this causes the stream to be closed, adds a
* hook to deactivate the stream and close the channel after the given future completes.
*
* Closes the remote side of the {@code stream}. Depending on the {@code stream} state this may result in
* {@code stream} being closed. See {@link closeStream(Http2Stream, ChannelFuture)}.
* @param stream the stream to be half closed.
* @param future If closing, the future after which to close the channel.
* @param future See {@link closeStream(Http2Stream, ChannelFuture)}.
*/
void closeRemoteSide(Http2Stream stream, ChannelFuture future);
void closeStreamRemote(Http2Stream stream, ChannelFuture future);

/**
* Closes the given stream and adds a hook to deactivate the stream and close the channel after
* the given future completes.
*
* @param stream the stream to be closed.
* @param future the future after which to close the channel.
* Closes and deactivates the given {@code stream}. A listener is also attached to {@code future} and upon
* completion the underlying channel will be closed if {@link Http2Connection#numActiveStreams()} is 0.
* @param stream the stream to be closed and deactivated.
* @param future when completed if {@link Http2Connection#numActiveStreams()} is 0 then the underlying channel
* will be closed.
*/
void closeStream(Http2Stream stream, ChannelFuture future);

/**
* Writes a {@code RST_STREAM} frame to the remote endpoint and updates the connection state
* appropriately.
* Ensure the stream identified by {@code streamId} is reset. If our local state does not indicate the stream has
* been reset yet then a {@code RST_STREAM} will be sent to the peer. If our local state indicates the stream
* has already been reset then the return status will indicate success without sending anything to the peer.
* @param ctx The context used for communication and buffer allocation if necessary.
* @param streamId The identifier of the stream to reset.
* @param errorCode Justification as to why this stream is being reset. See {@link Http2Error}.
* @param promise Used to indicate the return status of this operation.
* @return Will be considered successful when the connection and stream state has been updated, and a
* {@code RST_STREAM} frame has been sent to the peer. If the stream state has already been updated and a
* {@code RST_STREAM} frame has been sent then the return status may indicate success immediately.
*/
ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
ChannelFuture resetStream(ChannelHandlerContext ctx, int streamId, long errorCode,
ChannelPromise promise);

/**
* Sends a {@code GO_AWAY} frame to the remote endpoint and updates the connection state
* appropriately.
* Close the connection and prevent the peer from creating streams. After this call the peer
* is not allowed to create any new streams and the local endpoint will be limited to creating streams with
* {@code stream identifier <= lastStreamId}. This may result in sending a {@code GO_AWAY} frame (assuming we
* have not already sent one with {@code Last-Stream-ID <= lastStreamId}), or may just return success if a
* {@code GO_AWAY} has previously been sent.
* @param ctx The context used for communication and buffer allocation if necessary.
* @param lastStreamId The last stream that the local endpoint is claiming it will accept.
* @param errorCode The rational as to why the connection is being closed. See {@link Http2Error}.
* @param debugData For diagnostic purposes (carries no semantic value).
* @param promise Used to indicate the return status of this operation.
* @return Will be considered successful when the connection and stream state has been updated, and a
* {@code GO_AWAY} frame has been sent to the peer. If the stream state has already been updated and a
* {@code GO_AWAY} frame has been sent then the return status may indicate success immediately.
*/
ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode,
ChannelFuture goAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode,
ByteBuf debugData, ChannelPromise promise);

/**
Original file line number Diff line number Diff line change
@@ -236,12 +236,12 @@ public void dataReadAfterRstStreamForStreamInInvalidStateShouldIgnore() throws E
}

@Test
public void dataReadWithEndOfStreamShouldCloseRemoteSide() throws Exception {
public void dataReadWithEndOfStreamShouldcloseStreamRemote() throws Exception {
final ByteBuf data = dummyData();
try {
decode().onDataRead(ctx, STREAM_ID, data, 10, true);
verify(localFlow).receiveFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(10), eq(true));
verify(lifecycleManager).closeRemoteSide(eq(stream), eq(future));
verify(lifecycleManager).closeStreamRemote(eq(stream), eq(future));
verify(listener).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(10), eq(true));
} finally {
data.release();
@@ -284,7 +284,7 @@ public Integer answer(InvocationOnMock in) throws Throwable {
} catch (RuntimeException cause) {
verify(localFlow)
.receiveFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(padding), eq(true));
verify(lifecycleManager).closeRemoteSide(eq(stream), eq(future));
verify(lifecycleManager).closeStreamRemote(eq(stream), eq(future));
verify(listener).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(padding), eq(true));
assertEquals(0, localFlow.unconsumedBytes(stream));
} finally {
@@ -341,7 +341,7 @@ public void headersReadForPromisedStreamShouldCloseStream() throws Exception {
when(stream.state()).thenReturn(RESERVED_REMOTE);
decode().onHeadersRead(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, true);
verify(stream).open(true);
verify(lifecycleManager).closeRemoteSide(eq(stream), eq(future));
verify(lifecycleManager).closeStreamRemote(eq(stream), eq(future));
verify(listener).onHeadersRead(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true));
}
@@ -354,7 +354,7 @@ public void headersDependencyNotCreatedShouldCreateAndSucceed() throws Exception
verify(listener).onHeadersRead(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(STREAM_DEPENDENCY_ID),
eq(weight), eq(true), eq(0), eq(true));
verify(stream).setPriority(eq(STREAM_DEPENDENCY_ID), eq(weight), eq(true));
verify(lifecycleManager).closeRemoteSide(eq(stream), any(ChannelFuture.class));
verify(lifecycleManager).closeStreamRemote(eq(stream), any(ChannelFuture.class));
}

@Test
@@ -371,7 +371,7 @@ public Http2Stream answer(InvocationOnMock in) throws Throwable {
verify(listener).onHeadersRead(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(STREAM_DEPENDENCY_ID),
eq(weight), eq(true), eq(0), eq(true));
verify(stream).setPriority(eq(STREAM_DEPENDENCY_ID), eq(weight), eq(true));
verify(lifecycleManager).closeRemoteSide(eq(stream), any(ChannelFuture.class));
verify(lifecycleManager).closeStreamRemote(eq(stream), any(ChannelFuture.class));
}

@Test(expected = RuntimeException.class)
@@ -388,7 +388,7 @@ public Http2Stream answer(InvocationOnMock in) throws Throwable {
verify(listener, never()).onHeadersRead(any(ChannelHandlerContext.class), anyInt(), any(Http2Headers.class),
anyInt(), anyShort(), anyBoolean(), anyInt(), anyBoolean());
verify(stream).setPriority(eq(STREAM_DEPENDENCY_ID), eq(weight), eq(true));
verify(lifecycleManager, never()).closeRemoteSide(eq(stream), any(ChannelFuture.class));
verify(lifecycleManager, never()).closeStreamRemote(eq(stream), any(ChannelFuture.class));
}

@Test
Original file line number Diff line number Diff line change
@@ -421,7 +421,7 @@ public void rstStreamWriteForUnknownStreamShouldIgnore() throws Exception {
@Test
public void rstStreamWriteShouldCloseStream() throws Exception {
encoder.writeRstStream(ctx, STREAM_ID, PROTOCOL_ERROR.code(), promise);
verify(lifecycleManager).writeRstStream(eq(ctx), eq(STREAM_ID), eq(PROTOCOL_ERROR.code()), eq(promise));
verify(lifecycleManager).resetStream(eq(ctx), eq(STREAM_ID), eq(PROTOCOL_ERROR.code()), eq(promise));
}

@Test
@@ -461,7 +461,7 @@ public void dataWriteShouldCreateHalfClosedStream() {
ByteBuf data = dummyData();
encoder.writeData(ctx, STREAM_ID, data.retain(), 0, true, promise);
verify(remoteFlow).sendFlowControlled(eq(ctx), eq(stream), any(FlowControlled.class));
verify(lifecycleManager).closeLocalSide(stream, promise);
verify(lifecycleManager).closeStreamLocal(stream, promise);
assertEquals(data.toString(UTF_8), writtenData.get(0));
data.release();
}
@@ -483,7 +483,7 @@ public void headersWriteShouldHalfCloseStream() throws Exception {
// Trigger the write and mark the promise successful to trigger listeners
payloadCaptor.getValue().write(0);
promise.trySuccess();
verify(lifecycleManager).closeLocalSide(eq(stream), eq(promise));
verify(lifecycleManager).closeStreamLocal(eq(stream), eq(promise));
}

@Test
@@ -498,7 +498,7 @@ public void headersWriteShouldHalfClosePushStream() throws Exception {
verify(stream).open(true);

promise.trySuccess();
verify(lifecycleManager).closeLocalSide(eq(stream), eq(promise));
verify(lifecycleManager).closeStreamLocal(eq(stream), eq(promise));
}

private void mockSendFlowControlledWriteEverything() {
Original file line number Diff line number Diff line change
@@ -240,7 +240,7 @@ public void encoderAndDecoderAreClosedOnChannelInactive() throws Exception {
@Test
public void writeRstOnNonExistantStreamShouldSucceed() throws Exception {
handler = newHandler();
handler.writeRstStream(ctx, NON_EXISTANT_STREAM_ID, STREAM_CLOSED.code(), promise);
handler.resetStream(ctx, NON_EXISTANT_STREAM_ID, STREAM_CLOSED.code(), promise);
verify(frameWriter, never())
.writeRstStream(any(ChannelHandlerContext.class), anyInt(), anyLong(), any(ChannelPromise.class));
assertTrue(promise.isDone());
@@ -256,7 +256,7 @@ public void writeRstOnClosedStreamShouldSucceed() throws Exception {
when(stream.state()).thenReturn(CLOSED);
// The stream is "closed" but is still known about by the connection (connection().stream(..)
// will return the stream). We should still write a RST_STREAM frame in this scenario.
handler.writeRstStream(ctx, STREAM_ID, STREAM_CLOSED.code(), promise);
handler.resetStream(ctx, STREAM_ID, STREAM_CLOSED.code(), promise);
verify(frameWriter).writeRstStream(eq(ctx), eq(STREAM_ID), anyLong(), any(ChannelPromise.class));
}

0 comments on commit cee3cc2

Please sign in to comment.