Skip to content

Commit

Permalink
Reduce http2 buffer slicing (netty#8598)
Browse files Browse the repository at this point in the history
Motivation

DefaultHttp2FrameReader currently does a fair amount of "intermediate"
slicing which can be avoided.

Modifications

Avoid slicing the input buffer in DefaultHttp2FrameReader until
necessary. In one instance this also means retainedSlice can be used
instead (which may also avoid allocating).

Results

Less allocations when using http2.
  • Loading branch information
njhill authored and normanmaurer committed Nov 29, 2018
1 parent 8eb3130 commit a0c3081
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -239,48 +239,49 @@ private void processPayloadState(ChannelHandlerContext ctx, ByteBuf in, Http2Fra
return;
}

// Get a view of the buffer for the size of the payload.
ByteBuf payload = in.readSlice(payloadLength);
// Only process up to payloadLength bytes.
int payloadEndIndex = in.readerIndex() + payloadLength;

// We have consumed the data, next time we read we will be expecting to read a frame header.
readingHeaders = true;

// Read the payload and fire the frame event to the listener.
switch (frameType) {
case DATA:
readDataFrame(ctx, payload, listener);
readDataFrame(ctx, in, payloadEndIndex, listener);
break;
case HEADERS:
readHeadersFrame(ctx, payload, listener);
readHeadersFrame(ctx, in, payloadEndIndex, listener);
break;
case PRIORITY:
readPriorityFrame(ctx, payload, listener);
readPriorityFrame(ctx, in, listener);
break;
case RST_STREAM:
readRstStreamFrame(ctx, payload, listener);
readRstStreamFrame(ctx, in, listener);
break;
case SETTINGS:
readSettingsFrame(ctx, payload, listener);
readSettingsFrame(ctx, in, listener);
break;
case PUSH_PROMISE:
readPushPromiseFrame(ctx, payload, listener);
readPushPromiseFrame(ctx, in, payloadEndIndex, listener);
break;
case PING:
readPingFrame(ctx, payload.readLong(), listener);
readPingFrame(ctx, in.readLong(), listener);
break;
case GO_AWAY:
readGoAwayFrame(ctx, payload, listener);
readGoAwayFrame(ctx, in, payloadEndIndex, listener);
break;
case WINDOW_UPDATE:
readWindowUpdateFrame(ctx, payload, listener);
readWindowUpdateFrame(ctx, in, listener);
break;
case CONTINUATION:
readContinuationFrame(payload, listener);
readContinuationFrame(in, payloadEndIndex, listener);
break;
default:
readUnknownFrame(ctx, payload, listener);
readUnknownFrame(ctx, in, payloadEndIndex, listener);
break;
}
in.readerIndex(payloadEndIndex);
}

private void verifyDataFrame() throws Http2Exception {
Expand Down Expand Up @@ -408,21 +409,20 @@ private void verifyUnknownFrame() throws Http2Exception {
verifyNotProcessingHeaders();
}

private void readDataFrame(ChannelHandlerContext ctx, ByteBuf payload,
private void readDataFrame(ChannelHandlerContext ctx, ByteBuf payload, int payloadEndIndex,
Http2FrameListener listener) throws Http2Exception {
int padding = readPadding(payload);
verifyPadding(padding);

// Determine how much data there is to read by removing the trailing
// padding.
int dataLength = lengthWithoutTrailingPadding(payload.readableBytes(), padding);
int dataLength = lengthWithoutTrailingPadding(payloadEndIndex - payload.readerIndex(), padding);

ByteBuf data = payload.readSlice(dataLength);
listener.onDataRead(ctx, streamId, data, padding, flags.endOfStream());
payload.skipBytes(payload.readableBytes());
}

private void readHeadersFrame(final ChannelHandlerContext ctx, ByteBuf payload,
private void readHeadersFrame(final ChannelHandlerContext ctx, ByteBuf payload, int payloadEndIndex,
Http2FrameListener listener) throws Http2Exception {
final int headersStreamId = streamId;
final Http2Flags headersFlags = flags;
Expand All @@ -439,7 +439,7 @@ private void readHeadersFrame(final ChannelHandlerContext ctx, ByteBuf payload,
throw streamError(streamId, PROTOCOL_ERROR, "A stream cannot depend on itself.");
}
final short weight = (short) (payload.readUnsignedByte() + 1);
final ByteBuf fragment = payload.readSlice(lengthWithoutTrailingPadding(payload.readableBytes(), padding));
final int lenToRead = lengthWithoutTrailingPadding(payloadEndIndex - payload.readerIndex(), padding);

// Create a handler that invokes the listener when the header block is complete.
headersContinuation = new HeadersContinuation() {
Expand All @@ -449,10 +449,10 @@ public int getStreamId() {
}

@Override
public void processFragment(boolean endOfHeaders, ByteBuf fragment,
public void processFragment(boolean endOfHeaders, ByteBuf fragment, int len,
Http2FrameListener listener) throws Http2Exception {
final HeadersBlockBuilder hdrBlockBuilder = headersBlockBuilder();
hdrBlockBuilder.addFragment(fragment, ctx.alloc(), endOfHeaders);
hdrBlockBuilder.addFragment(fragment, len, ctx.alloc(), endOfHeaders);
if (endOfHeaders) {
listener.onHeadersRead(ctx, headersStreamId, hdrBlockBuilder.headers(), streamDependency,
weight, exclusive, padding, headersFlags.endOfStream());
Expand All @@ -461,7 +461,7 @@ public void processFragment(boolean endOfHeaders, ByteBuf fragment,
};

// Process the initial fragment, invoking the listener's callback if end of headers.
headersContinuation.processFragment(flags.endOfHeaders(), fragment, listener);
headersContinuation.processFragment(flags.endOfHeaders(), payload, lenToRead, listener);
resetHeadersContinuationIfEnd(flags.endOfHeaders());
return;
}
Expand All @@ -475,10 +475,10 @@ public int getStreamId() {
}

@Override
public void processFragment(boolean endOfHeaders, ByteBuf fragment,
public void processFragment(boolean endOfHeaders, ByteBuf fragment, int len,
Http2FrameListener listener) throws Http2Exception {
final HeadersBlockBuilder hdrBlockBuilder = headersBlockBuilder();
hdrBlockBuilder.addFragment(fragment, ctx.alloc(), endOfHeaders);
hdrBlockBuilder.addFragment(fragment, len, ctx.alloc(), endOfHeaders);
if (endOfHeaders) {
listener.onHeadersRead(ctx, headersStreamId, hdrBlockBuilder.headers(), padding,
headersFlags.endOfStream());
Expand All @@ -487,8 +487,8 @@ public void processFragment(boolean endOfHeaders, ByteBuf fragment,
};

// Process the initial fragment, invoking the listener's callback if end of headers.
final ByteBuf fragment = payload.readSlice(lengthWithoutTrailingPadding(payload.readableBytes(), padding));
headersContinuation.processFragment(flags.endOfHeaders(), fragment, listener);
int len = lengthWithoutTrailingPadding(payloadEndIndex - payload.readerIndex(), padding);
headersContinuation.processFragment(flags.endOfHeaders(), payload, len, listener);
resetHeadersContinuationIfEnd(flags.endOfHeaders());
}

Expand Down Expand Up @@ -543,7 +543,7 @@ private void readSettingsFrame(ChannelHandlerContext ctx, ByteBuf payload,
}
}

private void readPushPromiseFrame(final ChannelHandlerContext ctx, ByteBuf payload,
private void readPushPromiseFrame(final ChannelHandlerContext ctx, ByteBuf payload, int payloadEndIndex,
Http2FrameListener listener) throws Http2Exception {
final int pushPromiseStreamId = streamId;
final int padding = readPadding(payload);
Expand All @@ -558,9 +558,9 @@ public int getStreamId() {
}

@Override
public void processFragment(boolean endOfHeaders, ByteBuf fragment,
public void processFragment(boolean endOfHeaders, ByteBuf fragment, int len,
Http2FrameListener listener) throws Http2Exception {
headersBlockBuilder().addFragment(fragment, ctx.alloc(), endOfHeaders);
headersBlockBuilder().addFragment(fragment, len, ctx.alloc(), endOfHeaders);
if (endOfHeaders) {
listener.onPushPromiseRead(ctx, pushPromiseStreamId, promisedStreamId,
headersBlockBuilder().headers(), padding);
Expand All @@ -569,8 +569,8 @@ public void processFragment(boolean endOfHeaders, ByteBuf fragment,
};

// Process the initial fragment, invoking the listener's callback if end of headers.
final ByteBuf fragment = payload.readSlice(lengthWithoutTrailingPadding(payload.readableBytes(), padding));
headersContinuation.processFragment(flags.endOfHeaders(), fragment, listener);
int len = lengthWithoutTrailingPadding(payloadEndIndex - payload.readerIndex(), padding);
headersContinuation.processFragment(flags.endOfHeaders(), payload, len, listener);
resetHeadersContinuationIfEnd(flags.endOfHeaders());
}

Expand All @@ -583,11 +583,11 @@ private void readPingFrame(ChannelHandlerContext ctx, long data,
}
}

private static void readGoAwayFrame(ChannelHandlerContext ctx, ByteBuf payload,
private static void readGoAwayFrame(ChannelHandlerContext ctx, ByteBuf payload, int payloadEndIndex,
Http2FrameListener listener) throws Http2Exception {
int lastStreamId = readUnsignedInt(payload);
long errorCode = payload.readUnsignedInt();
ByteBuf debugData = payload.readSlice(payload.readableBytes());
ByteBuf debugData = payload.readSlice(payloadEndIndex - payload.readerIndex());
listener.onGoAwayRead(ctx, lastStreamId, errorCode, debugData);
}

Expand All @@ -601,18 +601,17 @@ private void readWindowUpdateFrame(ChannelHandlerContext ctx, ByteBuf payload,
listener.onWindowUpdateRead(ctx, streamId, windowSizeIncrement);
}

private void readContinuationFrame(ByteBuf payload, Http2FrameListener listener)
private void readContinuationFrame(ByteBuf payload, int payloadEndIndex, Http2FrameListener listener)
throws Http2Exception {
// Process the initial fragment, invoking the listener's callback if end of headers.
final ByteBuf continuationFragment = payload.readSlice(payload.readableBytes());
headersContinuation.processFragment(flags.endOfHeaders(), continuationFragment,
listener);
headersContinuation.processFragment(flags.endOfHeaders(), payload,
payloadEndIndex - payload.readerIndex(), listener);
resetHeadersContinuationIfEnd(flags.endOfHeaders());
}

private void readUnknownFrame(ChannelHandlerContext ctx, ByteBuf payload, Http2FrameListener listener)
throws Http2Exception {
payload = payload.readSlice(payload.readableBytes());
private void readUnknownFrame(ChannelHandlerContext ctx, ByteBuf payload,
int payloadEndIndex, Http2FrameListener listener) throws Http2Exception {
payload = payload.readSlice(payloadEndIndex - payload.readerIndex());
listener.onUnknownFrame(ctx, frameType, streamId, flags, payload);
}

Expand Down Expand Up @@ -664,7 +663,7 @@ private abstract class HeadersContinuation {
* @param fragment the fragment of the header block to be added.
* @param listener the listener to be notified if the header block is completed.
*/
abstract void processFragment(boolean endOfHeaders, ByteBuf fragment,
abstract void processFragment(boolean endOfHeaders, ByteBuf fragment, int len,
Http2FrameListener listener) throws Http2Exception;

final HeadersBlockBuilder headersBlockBuilder() {
Expand Down Expand Up @@ -704,33 +703,32 @@ private void headerSizeExceeded() throws Http2Exception {
* This is used for an optimization for when the first fragment is the full
* block. In that case, the buffer is used directly without copying.
*/
final void addFragment(ByteBuf fragment, ByteBufAllocator alloc, boolean endOfHeaders) throws Http2Exception {
final void addFragment(ByteBuf fragment, int len, ByteBufAllocator alloc,
boolean endOfHeaders) throws Http2Exception {
if (headerBlock == null) {
if (fragment.readableBytes() > headersDecoder.configuration().maxHeaderListSizeGoAway()) {
if (len > headersDecoder.configuration().maxHeaderListSizeGoAway()) {
headerSizeExceeded();
}
if (endOfHeaders) {
// Optimization - don't bother copying, just use the buffer as-is. Need
// to retain since we release when the header block is built.
headerBlock = fragment.retain();
headerBlock = fragment.readRetainedSlice(len);
} else {
headerBlock = alloc.buffer(fragment.readableBytes());
headerBlock.writeBytes(fragment);
headerBlock = alloc.buffer(len).writeBytes(fragment, len);
}
return;
}
if (headersDecoder.configuration().maxHeaderListSizeGoAway() - fragment.readableBytes() <
if (headersDecoder.configuration().maxHeaderListSizeGoAway() - len <
headerBlock.readableBytes()) {
headerSizeExceeded();
}
if (headerBlock.isWritable(fragment.readableBytes())) {
if (headerBlock.isWritable(len)) {
// The buffer can hold the requested bytes, just write it directly.
headerBlock.writeBytes(fragment);
headerBlock.writeBytes(fragment, len);
} else {
// Allocate a new buffer that is big enough to hold the entire header block so far.
ByteBuf buf = alloc.buffer(headerBlock.readableBytes() + fragment.readableBytes());
buf.writeBytes(headerBlock);
buf.writeBytes(fragment);
ByteBuf buf = alloc.buffer(headerBlock.readableBytes() + len);
buf.writeBytes(headerBlock).writeBytes(fragment, len);
headerBlock.release();
headerBlock = buf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf
ctx.write(lastFrame, promiseAggregator.newPromise());

// Write the payload.
lastFrame = data.readSlice(maxFrameSize);
lastFrame = data.readableBytes() != maxFrameSize ? data.readSlice(maxFrameSize) : data;
data = null;
ctx.write(lastFrame, promiseAggregator.newPromise());
}
Expand Down

0 comments on commit a0c3081

Please sign in to comment.