Skip to content

Commit

Permalink
Remove explicit flushes from HTTP2 encoders, decoders & flow-controllers
Browse files Browse the repository at this point in the history
Motivation:

Allow users of HTTP2 to control when flushes occur so they can optimize network writes.

Modifications:

Removed explicit calls to flush in encoder, decoder & flow-controller
Connection handler now calls flush on read-complete to enable batching writes in response to reads

Result:

Much less flushing occurs for normal HTTP2 request and response patterns.
  • Loading branch information
Louis Ryan authored and Scottmitch committed May 1, 2015
1 parent ea6ba70 commit b122fae
Show file tree
Hide file tree
Showing 12 changed files with 79 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,6 @@ public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exce
// Send an ack back to the remote client.
// Need to retain the buffer here since it will be released after the write completes.
encoder.writePing(ctx, true, data.retain(), ctx.newPromise());
ctx.flush();

listener.onPingRead(ctx, data);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, int
}

ChannelFuture future = frameWriter.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise);
ctx.flush();
return future;
}

Expand All @@ -224,21 +223,18 @@ public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings sett
}

ChannelFuture future = frameWriter.writeSettings(ctx, settings, promise);
ctx.flush();
return future;
}

@Override
public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) {
ChannelFuture future = frameWriter.writeSettingsAck(ctx, promise);
ctx.flush();
return future;
}

@Override
public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, ByteBuf data, ChannelPromise promise) {
ChannelFuture future = frameWriter.writePing(ctx, ack, data, promise);
ctx.flush();
return future;
}

Expand All @@ -258,7 +254,6 @@ public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, i
}

ChannelFuture future = frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding, promise);
ctx.flush();
return future;
}

Expand Down Expand Up @@ -345,16 +340,13 @@ public void error(Throwable cause) {
}

@Override
public boolean write(int allowedBytes) {
public void write(int allowedBytes) {
int bytesWritten = 0;
if (data == null || (allowedBytes == 0 && size != 0)) {
// No point writing an empty DATA frame, wait for a bigger allowance.
return;
}
try {
if (data == null) {
return false;
}
if (allowedBytes == 0 && size != 0) {
// No point writing an empty DATA frame, wait for a bigger allowance.
return false;
}
int maxFrameSize = frameWriter().configuration().frameSizePolicy().maxFrameSize();
do {
int allowedFrameSize = Math.min(maxFrameSize, allowedBytes - bytesWritten);
Expand Down Expand Up @@ -386,7 +378,6 @@ public boolean write(int allowedBytes) {
frameWriter().writeData(ctx, stream.id(), toWrite, writeablePadding,
size == bytesWritten && endOfStream, writePromise);
} while (size != bytesWritten && allowedBytes > bytesWritten);
return true;
} finally {
size -= bytesWritten;
}
Expand Down Expand Up @@ -427,10 +418,9 @@ public void error(Throwable cause) {
}

@Override
public boolean write(int allowedBytes) {
public void write(int allowedBytes) {
frameWriter().writeHeaders(ctx, stream.id(), headers, streamDependency, weight, exclusive,
padding, endOfStream, promise);
return true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,6 @@ private void writeWindowUpdate(ChannelHandlerContext ctx) throws Http2Exception

// Send a window update for the stream/connection.
frameWriter.writeWindowUpdate(ctx, stream.id(), deltaWindowSize, ctx.newPromise());
ctx.flush();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ public boolean visit(Http2Stream stream) {
private final Http2Connection.PropertyKey stateKey;
private int initialWindowSize = DEFAULT_WINDOW_SIZE;
private ChannelHandlerContext ctx;
private boolean needFlush;

public DefaultHttp2RemoteFlowController(Http2Connection connection) {
this.connection = checkNotNull(connection, "connection");
Expand Down Expand Up @@ -185,7 +184,6 @@ public void incrementWindowSize(ChannelHandlerContext ctx, Http2Stream stream, i
AbstractState state = state(stream);
state.incrementStreamWindow(delta);
state.writeBytes(state.writableWindow());
flush();
}
}

Expand All @@ -207,11 +205,6 @@ public void sendFlowControlled(ChannelHandlerContext ctx, Http2Stream stream, Fl
return;
}
state.writeBytes(state.writableWindow());
try {
flush();
} catch (Throwable t) {
frame.error(t);
}
}

/**
Expand All @@ -237,16 +230,6 @@ private int connectionWindowSize() {
return connectionState().windowSize();
}

/**
* Flushes the {@link ChannelHandlerContext} if we've received any data frames.
*/
private void flush() {
if (needFlush) {
ctx.flush();
needFlush = false;
}
}

/**
* Writes as many pending bytes as possible, according to stream priority.
*/
Expand All @@ -260,7 +243,6 @@ private void writePendingBytes() throws Http2Exception {

// Now write all of the allocated bytes.
connection.forEachActiveStream(WRITE_ALLOCATED_BYTES);
flush();
}
}

Expand Down Expand Up @@ -604,21 +586,18 @@ private boolean isNextFrameEmpty() {
/**
* Writes the frame and decrements the stream and connection window sizes. If the frame is in the pending
* queue, the written bytes are removed from this branch of the priority tree.
* <p>
* Note: this does not flush the {@link ChannelHandlerContext}.
* </p>
*/
private int write(FlowControlled frame, int allowedBytes) {
int before = frame.size();
int writtenBytes = 0;
int writtenBytes;
// In case an exception is thrown we want to remember it and pass it to cancel(Throwable).
Throwable cause = null;
try {
assert !writing;

// Write the portion of the frame.
writing = true;
needFlush |= frame.write(max(0, allowedBytes));
frame.write(max(0, allowedBytes));
if (!cancelled && frame.size() == 0) {
// This frame has been fully written, remove this frame and notify it. Since we remove this frame
// first, we're guaranteed that its error method will not be called when we call cancel.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce
}

ChannelFuture future = goAway(ctx, null);
ctx.flush();

// If there are no active streams, close immediately after the send is complete.
// Otherwise wait until all streams are inactive.
Expand All @@ -350,6 +351,13 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce
}
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// Trigger flush after read on the assumption that flush is cheap if there is nothing to write and that
// for flow-control the read may release window that causes data to be written that can now be flushed.
ctx.flush();
}

/**
* Handles {@link Http2Exception} objects that were thrown from other handlers. Ignores all other exceptions.
*/
Expand Down Expand Up @@ -439,6 +447,7 @@ public void onException(ChannelHandlerContext ctx, Throwable cause) {
} else {
onConnectionError(ctx, cause, embedded);
}
ctx.flush();
}

/**
Expand Down Expand Up @@ -483,7 +492,6 @@ public ChannelFuture resetStream(final ChannelHandlerContext ctx, int streamId,
}

ChannelFuture future = frameWriter().writeRstStream(ctx, streamId, errorCode, promise);
ctx.flush();

// Synchronously set the resetSent flag to prevent any subsequent calls
// from resulting in multiple reset frames being sent.
Expand Down Expand Up @@ -518,7 +526,6 @@ public ChannelFuture goAway(final ChannelHandlerContext ctx, final int lastStrea
connection.goAwaySent(lastStreamId, errorCode, debugData);

ChannelFuture future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
ctx.flush();

future.addListener(new GenericFutureListener<ChannelFuture>() {
@Override
Expand Down Expand Up @@ -546,7 +553,8 @@ public void operationComplete(ChannelFuture future) throws Exception {
}

/**
* Close the remote endpoint with with a {@code GO_AWAY} frame.
* Close the remote endpoint with with a {@code GO_AWAY} frame. Does <strong>not</strong> flush
* immediately, this is the responsibility of the caller.
*/
private ChannelFuture goAway(ChannelHandlerContext ctx, Http2Exception cause) {
long errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ public interface Http2RemoteFlowController extends Http2FlowController {
* guarantee when the data will be written or whether it will be split into multiple frames
* before sending.
* <p>
* Manually flushing the {@link ChannelHandlerContext} is not required, since the flow
* controller will flush as appropriate.
* Manually flushing the {@link ChannelHandlerContext} is required for writes as the flow controller will
* <strong>not</strong> flush by itself.
*
* @param ctx the context from the handler.
* @param stream the subject stream. Must not be the connection stream object.
Expand Down Expand Up @@ -75,15 +75,14 @@ interface FlowControlled {
* Writes up to {@code allowedBytes} of the encapsulated payload to the stream. Note that
* a value of 0 may be passed which will allow payloads with flow-control size == 0 to be
* written. The flow-controller may call this method multiple times with different values until
* the payload is fully written.
* the payload is fully written, i.e it's size after the write is 0.
* <p>
* When an exception is thrown the {@link Http2RemoteFlowController} will make a call to
* {@link #error(Throwable)}.
* </p>
*
* @param allowedBytes an upper bound on the number of bytes the payload can write at this time.
* @return {@code true} if a flush is required, {@code false} otherwise.
*/
boolean write(int allowedBytes);
void write(int allowedBytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
Expand All @@ -48,6 +49,7 @@
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException;
import junit.framework.AssertionFailedError;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -161,6 +163,9 @@ public Http2Stream answer(InvocationOnMock in) throws Throwable {

// Simulate receiving the SETTINGS ACK for the initial settings.
decode().onSettingsAckRead(ctx);

// Disallow any further flushes now that settings ACK has been sent
when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden"));
}

@Test
Expand Down Expand Up @@ -605,7 +610,6 @@ public void settingsReadWithAckShouldNotifylistener() throws Exception {

@Test
public void settingsReadShouldSetValues() throws Exception {
when(connection.isServer()).thenReturn(true);
Http2Settings settings = new Http2Settings();
settings.pushEnabled(true);
settings.initialWindowSize(123);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.ArrayList;
import java.util.List;

import junit.framework.AssertionFailedError;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -207,6 +208,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
when(ctx.newSucceededFuture()).thenReturn(future);
when(ctx.newPromise()).thenReturn(promise);
when(ctx.write(any())).thenReturn(future);
when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden"));

encoder = new DefaultHttp2ConnectionEncoder(connection, writer);
encoder.lifecycleManager(lifecycleManager);
Expand All @@ -217,7 +219,7 @@ public void dataWriteShouldSucceed() throws Exception {
final ByteBuf data = dummyData();
encoder.writeData(ctx, STREAM_ID, data, 0, true, promise);
assertEquals(payloadCaptor.getValue().size(), 8);
assertTrue(payloadCaptor.getValue().write(8));
payloadCaptor.getValue().write(8);
assertEquals(0, payloadCaptor.getValue().size());
assertEquals("abcdefgh", writtenData.get(0));
assertEquals(0, data.refCnt());
Expand All @@ -229,7 +231,7 @@ public void dataLargerThanMaxFrameSizeShouldBeSplit() throws Exception {
final ByteBuf data = dummyData();
encoder.writeData(ctx, STREAM_ID, data, 0, true, promise);
assertEquals(payloadCaptor.getValue().size(), 8);
assertTrue(payloadCaptor.getValue().write(8));
payloadCaptor.getValue().write(8);
// writer was called 3 times
assertEquals(3, writtenData.size());
assertEquals("abc", writtenData.get(0));
Expand All @@ -244,7 +246,7 @@ public void paddingSplitOverFrame() throws Exception {
final ByteBuf data = dummyData();
encoder.writeData(ctx, STREAM_ID, data, 5, true, promise);
assertEquals(payloadCaptor.getValue().size(), 13);
assertTrue(payloadCaptor.getValue().write(13));
payloadCaptor.getValue().write(13);
// writer was called 3 times
assertEquals(3, writtenData.size());
assertEquals("abcde", writtenData.get(0));
Expand All @@ -262,7 +264,7 @@ public void frameShouldSplitPadding() throws Exception {
ByteBuf data = dummyData();
encoder.writeData(ctx, STREAM_ID, data, 10, true, promise);
assertEquals(payloadCaptor.getValue().size(), 18);
assertTrue(payloadCaptor.getValue().write(18));
payloadCaptor.getValue().write(18);
// writer was called 4 times
assertEquals(4, writtenData.size());
assertEquals("abcde", writtenData.get(0));
Expand Down Expand Up @@ -292,7 +294,7 @@ private void assertSplitPaddingOnEmptyBuffer(ByteBuf data) throws Exception {
when(frameSizePolicy.maxFrameSize()).thenReturn(5);
encoder.writeData(ctx, STREAM_ID, data, 10, true, promise);
assertEquals(payloadCaptor.getValue().size(), 10);
assertTrue(payloadCaptor.getValue().write(10));
payloadCaptor.getValue().write(10);
// writer was called 2 times
assertEquals(2, writtenData.size());
assertEquals("", writtenData.get(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;

import junit.framework.AssertionFailedError;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
Expand Down Expand Up @@ -64,6 +65,7 @@ public void setup() throws Http2Exception {
MockitoAnnotations.initMocks(this);

when(ctx.newPromise()).thenReturn(promise);
when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden"));

connection = new DefaultHttp2Connection(false);
controller = new DefaultHttp2LocalFlowController(connection, frameWriter, updateRatio);
Expand Down
Loading

0 comments on commit b122fae

Please sign in to comment.