Skip to content

Commit

Permalink
Make use of ByteBufProcessor for extract initial line and headers
Browse files Browse the repository at this point in the history
This gives some nice performance boost as readByte() is quite expensive because of the index / replay checks.
  • Loading branch information
Norman Maurer committed Jan 16, 2014
1 parent 4dc78c1 commit 1f04936
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.netty.handler.codec.http;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufProcessor;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
Expand Down Expand Up @@ -106,12 +107,14 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
private final int maxChunkSize;
private final boolean chunkedSupported;
protected final boolean validateHeaders;
private final AppendableCharSequence seq = new AppendableCharSequence(128);
private final HeaderParser headerParser = new HeaderParser(seq);
private final LineParser lineParser = new LineParser(seq);

private HttpMessage message;
private long chunkSize;
private int headerSize;
private long contentLength = Long.MIN_VALUE;
private final AppendableCharSequence sb = new AppendableCharSequence(128);

/**
* The internal state of {@link HttpObjectDecoder}.
Expand Down Expand Up @@ -191,7 +194,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> ou
}
}
case READ_INITIAL: try {
String[] initialLine = splitInitialLine(readLine(buffer, maxInitialLineLength));
String[] initialLine = splitInitialLine(lineParser.parse(buffer));
if (initialLine.length < 3) {
// Invalid initial line - ignore.
checkpoint(State.SKIP_CONTROL_CHARS);
Expand Down Expand Up @@ -299,7 +302,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> ou
* read chunk, read and ignore the CRLF and repeat until 0
*/
case READ_CHUNK_SIZE: try {
AppendableCharSequence line = readLine(buffer, maxInitialLineLength);
AppendableCharSequence line = lineParser.parse(buffer);
int chunkSize = getChunkSize(line.toString());
this.chunkSize = chunkSize;
if (chunkSize == 0) {
Expand Down Expand Up @@ -467,7 +470,7 @@ private State readHeaders(ByteBuf buffer) {
final HttpMessage message = this.message;
final HttpHeaders headers = message.headers();

AppendableCharSequence line = readHeader(buffer);
AppendableCharSequence line = headerParser.parse(buffer);
String name = null;
String value = null;
if (line.length() > 0) {
Expand All @@ -485,7 +488,7 @@ private State readHeaders(ByteBuf buffer) {
value = header[1];
}

line = readHeader(buffer);
line = headerParser.parse(buffer);
} while (line.length() > 0);

// Add the last header.
Expand Down Expand Up @@ -518,7 +521,7 @@ private long contentLength() {

private LastHttpContent readTrailingHeaders(ByteBuf buffer) {
headerSize = 0;
AppendableCharSequence line = readHeader(buffer);
AppendableCharSequence line = headerParser.parse(buffer);
String lastHeader = null;
if (line.length() > 0) {
LastHttpContent trailer = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER, validateHeaders);
Expand All @@ -544,7 +547,7 @@ private LastHttpContent readTrailingHeaders(ByteBuf buffer) {
lastHeader = name;
}

line = readHeader(buffer);
line = headerParser.parse(buffer);
} while (line.length() > 0);

return trailer;
Expand All @@ -553,46 +556,6 @@ private LastHttpContent readTrailingHeaders(ByteBuf buffer) {
return LastHttpContent.EMPTY_LAST_CONTENT;
}

private AppendableCharSequence readHeader(ByteBuf buffer) {
AppendableCharSequence sb = this.sb;
sb.reset();
int headerSize = this.headerSize;

loop:
for (;;) {
char nextByte = (char) buffer.readByte();
headerSize ++;

switch (nextByte) {
case HttpConstants.CR:
nextByte = (char) buffer.readByte();
headerSize ++;
if (nextByte == HttpConstants.LF) {
break loop;
}
break;
case HttpConstants.LF:
break loop;
}

// Abort decoding if the header part is too large.
if (headerSize >= maxHeaderSize) {
// TODO: Respond with Bad Request and discard the traffic
// or close the connection.
// No need to notify the upstream handlers - just log.
// If decoding a response, just throw an exception.
throw new TooLongFrameException(
"HTTP header is larger than " +
maxHeaderSize + " bytes.");
}

sb.append(nextByte);
}

this.headerSize = headerSize;
return sb;
}

protected abstract boolean isDecodingRequest();
protected abstract HttpMessage createMessage(String[] initialLine) throws Exception;
protected abstract HttpMessage createInvalidMessage();
Expand All @@ -610,35 +573,6 @@ private static int getChunkSize(String hex) {
return Integer.parseInt(hex, 16);
}

private AppendableCharSequence readLine(ByteBuf buffer, int maxLineLength) {
AppendableCharSequence sb = this.sb;
sb.reset();
int lineLength = 0;
while (true) {
byte nextByte = buffer.readByte();
if (nextByte == HttpConstants.CR) {
nextByte = buffer.readByte();
if (nextByte == HttpConstants.LF) {
return sb;
}
} else if (nextByte == HttpConstants.LF) {
return sb;
} else {
if (lineLength >= maxLineLength) {
// TODO: Respond with Bad Request and discard the traffic
// or close the connection.
// No need to notify the upstream handlers - just log.
// If decoding a response, just throw an exception.
throw new TooLongFrameException(
"An HTTP line is larger than " + maxLineLength +
" bytes.");
}
lineLength ++;
sb.append((char) nextByte);
}
}
}

private static String[] splitInitialLine(AppendableCharSequence sb) {
int aStart;
int aEnd;
Expand Down Expand Up @@ -729,4 +663,86 @@ private static int findEndOfString(CharSequence sb) {
}
return result;
}

private final class HeaderParser implements ByteBufProcessor {
private final AppendableCharSequence seq;

HeaderParser(AppendableCharSequence seq) {
this.seq = seq;
}

public AppendableCharSequence parse(ByteBuf buffer) {
seq.reset();
headerSize = 0;
int i = buffer.forEachByte(this);
buffer.readerIndex(i + 1);
return seq;
}

@Override
public boolean process(byte value) throws Exception {
char nextByte = (char) value;
headerSize++;
if (nextByte == HttpConstants.CR) {
return true;
}
if (nextByte == HttpConstants.LF) {
return false;
}

// Abort decoding if the header part is too large.
if (headerSize >= maxHeaderSize) {
// TODO: Respond with Bad Request and discard the traffic
// or close the connection.
// No need to notify the upstream handlers - just log.
// If decoding a response, just throw an exception.
throw new TooLongFrameException(
"HTTP header is larger than " +
maxHeaderSize + " bytes.");
}

seq.append(nextByte);
return true;
}
}

private final class LineParser implements ByteBufProcessor {
private final AppendableCharSequence seq;
private int size;

LineParser(AppendableCharSequence seq) {
this.seq = seq;
}

public AppendableCharSequence parse(ByteBuf buffer) {
seq.reset();
size = 0;
int i = buffer.forEachByte(this);
buffer.readerIndex(i + 1);
return seq;
}

@Override
public boolean process(byte value) throws Exception {
char nextByte = (char) value;
if (nextByte == HttpConstants.CR) {
return true;
} else if (nextByte == HttpConstants.LF) {
return false;
} else {
if (size >= maxInitialLineLength) {
// TODO: Respond with Bad Request and discard the traffic
// or close the connection.
// No need to notify the upstream handlers - just log.
// If decoding a response, just throw an exception.
throw new TooLongFrameException(
"An HTTP line is larger than " + maxInitialLineLength +
" bytes.");
}
size ++;
seq.append(nextByte);
return true;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ public void testPrematureClosureWithChunkedEncoding1() throws Exception {

// Close the connection without sending anything.
ch.finish();

// The decoder should not generate the last chunk because it's closed prematurely.
assertThat(ch.readInbound(), is(nullValue()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ public int bytesBefore(int index, int length, byte value) {
@Override
public int forEachByte(ByteBufProcessor processor) {
int ret = buffer.forEachByte(processor);
if (!terminated && ret < 0) {
if (ret < 0) {
throw REPLAY;
} else {
return ret;
Expand Down

0 comments on commit 1f04936

Please sign in to comment.