Skip to content

Commit

Permalink
Fix brotli compression during close (netty#13165)
Browse files Browse the repository at this point in the history
Motivation:

Brotli compressed http responses are not properly encoded.

Modification:

- change BrotliEncoder behavior so that the "finish" operation is performed before the channel is closed
- add new Brotli unit tests

Result:

All brotli tests pass.



Co-authored-by: Norman Maurer <[email protected]>
  • Loading branch information
sullis and normanmaurer authored Feb 2, 2023
1 parent 0710751 commit 48adb26
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
*/
package io.netty.handler.codec.http;

import com.aayushatharva.brotli4j.decoder.DecoderJNI;
import com.aayushatharva.brotli4j.decoder.DirectDecompress;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -34,15 +37,19 @@
import io.netty.channel.local.LocalServerChannel;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.compression.Brotli;
import io.netty.handler.codec.compression.CompressionOptions;
import io.netty.handler.codec.compression.ZlibWrapper;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;

import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIf;

import java.nio.charset.StandardCharsets;

Expand Down Expand Up @@ -774,6 +781,91 @@ public void testCustomEncoding() throws Exception {
assertTrue(ch.finishAndReleaseAll());
}

static boolean isBrotliAvailable() {
return Brotli.isAvailable();
}

@Test
@EnabledIf("isBrotliAvailable")
public void testBrotliFullHttpResponse() throws Exception {
HttpContentCompressor compressor = new HttpContentCompressor((CompressionOptions[]) null);
EmbeddedChannel ch = new EmbeddedChannel(compressor);
assertTrue(ch.writeInbound(newBrotliRequest()));

FullHttpResponse res = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
Unpooled.copiedBuffer("Hello Hello Hello Hello Hello", CharsetUtil.US_ASCII));
int len = res.content().readableBytes();
res.headers().set(HttpHeaderNames.CONTENT_LENGTH, len);
res.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
assertTrue(ch.writeOutbound(res));

DefaultHttpResponse response = ch.readOutbound();
assertEquals(String.valueOf(19), response.headers().get(HttpHeaderNames.CONTENT_LENGTH));
assertEquals("text/plain", response.headers().get(HttpHeaderNames.CONTENT_TYPE));
assertEquals("br", response.headers().get(HttpHeaderNames.CONTENT_ENCODING));

CompositeByteBuf contentBuf = Unpooled.compositeBuffer();
HttpContent content;
while ((content = ch.readOutbound()) != null) {
if (content.content().isReadable()) {
contentBuf.addComponent(true, content.content());
} else {
content.content().release();
}
}

DirectDecompress decompressResult = DirectDecompress.decompress(ByteBufUtil.getBytes(contentBuf));
assertEquals(DecoderJNI.Status.DONE, decompressResult.getResultStatus());
assertEquals("Hello Hello Hello Hello Hello",
new String(decompressResult.getDecompressedData(), CharsetUtil.US_ASCII));

assertTrue(ch.finishAndReleaseAll());
contentBuf.release();
}

@Test
@EnabledIf("isBrotliAvailable")
public void testBrotliChunkedContent() throws Exception {
HttpContentCompressor compressor = new HttpContentCompressor((CompressionOptions[]) null);
EmbeddedChannel ch = new EmbeddedChannel(compressor);
assertTrue(ch.writeInbound(newBrotliRequest()));

HttpResponse res = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
res.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
res.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
ch.writeOutbound(res);

HttpResponse outboundRes = ch.readOutbound();
assertThat(outboundRes, is(not(instanceOf(HttpContent.class))));
assertThat(outboundRes.headers().get(HttpHeaderNames.TRANSFER_ENCODING), is("chunked"));
assertThat(outboundRes.headers().get(HttpHeaderNames.CONTENT_LENGTH), is(nullValue()));
assertThat(outboundRes.headers().get(HttpHeaderNames.CONTENT_ENCODING), is("br"));
assertThat(outboundRes.headers().get(HttpHeaderNames.CONTENT_TYPE), is("text/plain"));

ch.writeOutbound(new DefaultHttpContent(Unpooled.copiedBuffer("Hell", CharsetUtil.US_ASCII)));
ch.writeOutbound(new DefaultHttpContent(Unpooled.copiedBuffer("o world. Hello w", CharsetUtil.US_ASCII)));
ch.writeOutbound(new DefaultLastHttpContent(Unpooled.copiedBuffer("orld.", CharsetUtil.US_ASCII)));

CompositeByteBuf contentBuf = Unpooled.compositeBuffer();
HttpContent content;
while ((content = ch.readOutbound()) != null) {
if (content.content().isReadable()) {
contentBuf.addComponent(true, content.content());
} else {
content.content().release();
}
}

DirectDecompress decompressResult = DirectDecompress.decompress(ByteBufUtil.getBytes(contentBuf));
assertEquals(DecoderJNI.Status.DONE, decompressResult.getResultStatus());
assertEquals("Hello world. Hello world.",
new String(decompressResult.getDecompressedData(), CharsetUtil.US_ASCII));

assertTrue(ch.finishAndReleaseAll());
contentBuf.release();
}

@Test
public void testCompressThresholdAllCompress() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new HttpContentCompressor());
Expand Down Expand Up @@ -859,6 +951,12 @@ private static FullHttpRequest newRequest() {
return req;
}

private static FullHttpRequest newBrotliRequest() {
FullHttpRequest req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
req.headers().set(HttpHeaderNames.ACCEPT_ENCODING, "br");
return req;
}

private static void assertEncodedResponse(EmbeddedChannel ch) {
Object o = ch.readOutbound();
assertThat(o, is(instanceOf(HttpResponse.class)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
Expand Down Expand Up @@ -172,6 +171,10 @@ public boolean isSharable() {
* @throws IOException If an error occurred during closure
*/
public void finish(ChannelHandlerContext ctx) throws IOException {
finishEncode(ctx, ctx.newPromise());
}

private ChannelFuture finishEncode(ChannelHandlerContext ctx, ChannelPromise promise) throws IOException {
Writer writer;

if (isSharable) {
Expand All @@ -184,6 +187,13 @@ public void finish(ChannelHandlerContext ctx) throws IOException {
writer.close();
this.writer = null;
}
return promise;
}

@Override
public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
ChannelFuture f = finishEncode(ctx, ctx.newPromise());
EncoderUtil.closeAfterFinishEncode(ctx, f, promise);
}

/**
Expand Down

0 comments on commit 48adb26

Please sign in to comment.