diff --git a/NOTICE.txt b/NOTICE.txt index f30b1f531d72..9d1757c46b08 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -97,6 +97,14 @@ pure Java, which can be obtained at: * HOMEPAGE: * http://www.jcraft.com/jzlib/ +This product optionally depends on 'Compress-LZF', a Java library for encoding and +decoding data in LZF format, written by Tatu Saloranta. It can be obtained at: + + * LICENSE: + * license/LICENSE.compress-lzf.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/ning/compress + This product optionally depends on 'Protocol Buffers', Google's data interchange format, which can be obtained at: diff --git a/codec/pom.xml b/codec/pom.xml index 95109f48f49c..baa04fe80347 100644 --- a/codec/pom.xml +++ b/codec/pom.xml @@ -49,6 +49,11 @@ jzlib true + + com.ning + compress-lzf + true + diff --git a/codec/src/main/java/io/netty/handler/codec/compression/LzfDecoder.java b/codec/src/main/java/io/netty/handler/codec/compression/LzfDecoder.java new file mode 100644 index 000000000000..2d65ef7b1a66 --- /dev/null +++ b/codec/src/main/java/io/netty/handler/codec/compression/LzfDecoder.java @@ -0,0 +1,179 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.compression; + +import com.ning.compress.BufferRecycler; +import com.ning.compress.lzf.ChunkDecoder; +import com.ning.compress.lzf.util.ChunkDecoderFactory; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.List; + +import static com.ning.compress.lzf.LZFChunk.BYTE_Z; +import static com.ning.compress.lzf.LZFChunk.BYTE_V; +import static com.ning.compress.lzf.LZFChunk.MAX_HEADER_LEN; +import static com.ning.compress.lzf.LZFChunk.HEADER_LEN_COMPRESSED; +import static com.ning.compress.lzf.LZFChunk.HEADER_LEN_NOT_COMPRESSED; +import static com.ning.compress.lzf.LZFChunk.BLOCK_TYPE_NON_COMPRESSED; +import static com.ning.compress.lzf.LZFChunk.BLOCK_TYPE_COMPRESSED; + +/** + * Uncompresses a {@link ByteBuf} encoded with the LZF format. + * + * See original LZF package + * and LZF format for full description. + */ +public class LzfDecoder extends ByteToMessageDecoder { + /** + * A brief signature for content auto-detection. + */ + private static final short SIGNATURE_OF_CHUNK = BYTE_Z << 8 | BYTE_V; + + /** + * Offset to the "Type" in chunk header. + */ + private static final int TYPE_OFFSET = 2; + + /** + * Offset to the "ChunkLength" in chunk header. + */ + private static final int CHUNK_LENGTH_OFFSET = 3; + + /** + * Offset to the "OriginalLength" in chunk header. + */ + private static final int ORIGINAL_LENGTH_OFFSET = 5; + + /** + * Underlying decoder in use. + */ + private final ChunkDecoder decoder; + + /** + * Object that handles details of buffer recycling. + */ + private final BufferRecycler recycler; + + /** + * Determines the state of flow. + */ + private boolean corrupted; + + /** + * Creates a new LZF decoder with the most optimal available methods for underlying data access. + * It will "unsafe" instance if one can be used on current JVM. + * It should be safe to call this constructor as implementations are dynamically loaded; however, on some + * non-standard platforms it may be necessary to use {@link #LzfDecoder(boolean)} with {@code true} param. + */ + public LzfDecoder() { + this(false); + } + + /** + * Creates a new LZF decoder with specified decoding instance. + * + * @param safeInstance + * If {@code true} decoder will use {@link ChunkDecoder} that only uses standard JDK access methods, + * and should work on all Java platforms and JVMs. + * Otherwise decoder will try to use highly optimized {@link ChunkDecoder} implementation that uses + * Sun JDK's {@link sun.misc.Unsafe} class (which may be included by other JDK's as well). + */ + public LzfDecoder(boolean safeInstance) { + decoder = safeInstance ? + ChunkDecoderFactory.safeInstance() + : ChunkDecoderFactory.optimalInstance(); + + recycler = BufferRecycler.instance(); + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + for (;;) { + if (corrupted) { + in.skipBytes(in.readableBytes()); + return; + } + + if (in.readableBytes() < HEADER_LEN_NOT_COMPRESSED) { + return; + } + final int idx = in.readerIndex(); + final int type = in.getByte(idx + TYPE_OFFSET); + final int chunkLength = in.getUnsignedShort(idx + CHUNK_LENGTH_OFFSET); + final int totalLength = (type == BLOCK_TYPE_NON_COMPRESSED ? + HEADER_LEN_NOT_COMPRESSED : MAX_HEADER_LEN) + chunkLength; + if (in.readableBytes() < totalLength) { + return; + } + + try { + if (in.getUnsignedShort(idx) != SIGNATURE_OF_CHUNK) { + throw new DecompressionException("Unexpected signature of chunk"); + } + switch (type) { + case BLOCK_TYPE_NON_COMPRESSED: { + in.skipBytes(HEADER_LEN_NOT_COMPRESSED); + out.add(in.readBytes(chunkLength)); + break; + } + case BLOCK_TYPE_COMPRESSED: { + final int originalLength = in.getUnsignedShort(idx + ORIGINAL_LENGTH_OFFSET); + + final byte[] inputArray; + final int inPos; + if (in.hasArray()) { + inputArray = in.array(); + inPos = in.arrayOffset() + idx + HEADER_LEN_COMPRESSED; + } else { + inputArray = recycler.allocInputBuffer(chunkLength); + in.getBytes(idx + HEADER_LEN_COMPRESSED, inputArray, 0, chunkLength); + inPos = 0; + } + + ByteBuf uncompressed = ctx.alloc().heapBuffer(originalLength, originalLength); + final byte[] outputArray = uncompressed.array(); + final int outPos = uncompressed.arrayOffset(); + + boolean success = false; + try { + decoder.decodeChunk(inputArray, inPos, outputArray, outPos, outPos + originalLength); + uncompressed.writerIndex(uncompressed.writerIndex() + originalLength); + out.add(uncompressed); + in.skipBytes(totalLength); + success = true; + } finally { + if (!success) { + uncompressed.release(); + } + } + + if (!in.hasArray()) { + recycler.releaseInputBuffer(inputArray); + } + break; + } + default: + throw new DecompressionException("Unknown type of chunk: " + type + " (expected: 0 or 1)"); + } + } catch (Exception e) { + corrupted = true; + throw e; + } + } + } +} diff --git a/codec/src/main/java/io/netty/handler/codec/compression/LzfEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/LzfEncoder.java new file mode 100644 index 000000000000..827ac99a3e23 --- /dev/null +++ b/codec/src/main/java/io/netty/handler/codec/compression/LzfEncoder.java @@ -0,0 +1,140 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.compression; + +import com.ning.compress.BufferRecycler; +import com.ning.compress.lzf.ChunkEncoder; +import com.ning.compress.lzf.LZFEncoder; +import com.ning.compress.lzf.util.ChunkEncoderFactory; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; + +import static com.ning.compress.lzf.LZFChunk.*; + +/** + * Compresses a {@link ByteBuf} using the LZF format. + * + * See original LZF package + * and LZF format for full description. + */ +public class LzfEncoder extends MessageToByteEncoder { + /** + * Minimum block size ready for compression. Blocks with length + * less than {@link #MIN_BLOCK_TO_COMPRESS} will write as uncompressed. + */ + private static final int MIN_BLOCK_TO_COMPRESS = 16; + + /** + * Underlying decoder in use. + */ + private final ChunkEncoder encoder; + + /** + * Object that handles details of buffer recycling. + */ + private final BufferRecycler recycler; + + /** + * Creates a new LZF encoder with the most optimal available methods for underlying data access. + * It will "unsafe" instance if one can be used on current JVM. + * It should be safe to call this constructor as implementations are dynamically loaded; however, on some + * non-standard platforms it may be necessary to use {@link #LzfEncoder(boolean)} with {@code true} param. + */ + public LzfEncoder() { + this(false, MAX_CHUNK_LEN); + } + + /** + * Creates a new LZF encoder with specified encoding instance. + * + * @param safeInstance + * If {@code true} encoder will use {@link ChunkEncoder} that only uses standard JDK access methods, + * and should work on all Java platforms and JVMs. + * Otherwise encoder will try to use highly optimized {@link ChunkEncoder} implementation that uses + * Sun JDK's {@link sun.misc.Unsafe} class (which may be included by other JDK's as well). + */ + public LzfEncoder(boolean safeInstance) { + this(safeInstance, MAX_CHUNK_LEN); + } + + /** + * Creates a new LZF encoder with specified total length of encoded chunk. You can configure it to encode + * your data flow more efficient if you know the avarage size of messages that you send. + * + * @param totalLength + * Expected total length of content to compress; only matters for outgoing messages that is smaller + * than maximum chunk size (64k), to optimize encoding hash tables. + */ + public LzfEncoder(int totalLength) { + this(false, totalLength); + } + + /** + * Creates a new LZF encoder with specified settings. + * + * @param safeInstance + * If {@code true} encoder will use {@link ChunkEncoder} that only uses standard JDK access methods, + * and should work on all Java platforms and JVMs. + * Otherwise encoder will try to use highly optimized {@link ChunkEncoder} implementation that uses + * Sun JDK's {@link sun.misc.Unsafe} class (which may be included by other JDK's as well). + * @param totalLength + * Expected total length of content to compress; only matters for outgoing messages that is smaller + * than maximum chunk size (64k), to optimize encoding hash tables. + */ + public LzfEncoder(boolean safeInstance, int totalLength) { + super(false); + if (totalLength < MIN_BLOCK_TO_COMPRESS || totalLength > MAX_CHUNK_LEN) { + throw new IllegalArgumentException("totalLength: " + totalLength + + " (expected: " + MIN_BLOCK_TO_COMPRESS + '-' + MAX_CHUNK_LEN + ')'); + } + + encoder = safeInstance ? + ChunkEncoderFactory.safeNonAllocatingInstance(totalLength) + : ChunkEncoderFactory.optimalNonAllocatingInstance(totalLength); + + recycler = BufferRecycler.instance(); + } + + @Override + protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception { + final int length = in.readableBytes(); + final int idx = in.readerIndex(); + final byte[] input; + final int inputPtr; + if (in.hasArray()) { + input = in.array(); + inputPtr = in.arrayOffset() + idx; + } else { + input = recycler.allocInputBuffer(length); + in.getBytes(idx, input, 0, length); + inputPtr = 0; + } + + final int maxOutputLength = LZFEncoder.estimateMaxWorkspaceSize(length); + out.ensureWritable(maxOutputLength); + final byte[] output = out.array(); + final int outputPtr = out.arrayOffset() + out.writerIndex(); + final int outputLength = LZFEncoder.appendEncoded(encoder, + input, inputPtr, length, output, outputPtr) - outputPtr; + out.writerIndex(out.writerIndex() + outputLength); + in.skipBytes(length); + + if (!in.hasArray()) { + recycler.releaseInputBuffer(input); + } + } +} diff --git a/codec/src/test/java/io/netty/handler/codec/compression/LzfDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/compression/LzfDecoderTest.java new file mode 100644 index 000000000000..e959cf02da12 --- /dev/null +++ b/codec/src/test/java/io/netty/handler/codec/compression/LzfDecoderTest.java @@ -0,0 +1,148 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.compression; + +import com.ning.compress.lzf.LZFEncoder; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.util.internal.ThreadLocalRandom; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import static com.ning.compress.lzf.LZFChunk.BYTE_Z; +import static com.ning.compress.lzf.LZFChunk.BYTE_V; +import static com.ning.compress.lzf.LZFChunk.BLOCK_TYPE_NON_COMPRESSED; +import static org.junit.Assert.*; + +public class LzfDecoderTest { + + private static final ThreadLocalRandom rand; + + private static final byte[] BYTES_SMALL = new byte[256]; + private static final byte[] BYTES_LARGE = new byte[256000]; + + static { + rand = ThreadLocalRandom.current(); + //fill arrays with compressible data + for (int i = 0; i < BYTES_SMALL.length; i++) { + BYTES_SMALL[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt(); + } + for (int i = 0; i < BYTES_LARGE.length; i++) { + BYTES_LARGE[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt(); + } + } + + @Rule + public ExpectedException expected = ExpectedException.none(); + + private EmbeddedChannel channel; + + @Before + public void initChannel() { + channel = new EmbeddedChannel(new LzfDecoder()); + } + + @Test + public void testUnexpectedSignatureOfChunk() throws Exception { + expected.expect(DecompressionException.class); + expected.expectMessage("Unexpected signature of chunk"); + + ByteBuf in = Unpooled.buffer(); + in.writeShort(0x1234); //random value + in.writeByte(BLOCK_TYPE_NON_COMPRESSED); + in.writeShort(0); + + channel.writeInbound(in); + } + + @Test + public void testUnknownTypeOfChunk() throws Exception { + expected.expect(DecompressionException.class); + expected.expectMessage("Unknown type of chunk"); + + ByteBuf in = Unpooled.buffer(); + in.writeByte(BYTE_Z); + in.writeByte(BYTE_V); + in.writeByte(0xFF); //random value + in.writeInt(0); + + channel.writeInbound(in); + } + + private static void testDecompression(final EmbeddedChannel channel, final byte[] data) throws Exception { + byte[] compressedArray = LZFEncoder.encode(data); + ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray); + + channel.writeInbound(compressed); + + ByteBuf uncompressed = readUncompressed(channel); + ByteBuf dataBuf = Unpooled.wrappedBuffer(data); + + assertEquals(dataBuf, uncompressed); + + uncompressed.release(); + dataBuf.release(); + } + + @Test + public void testDecompressionOfSmallChunkOfData() throws Exception { + testDecompression(channel, BYTES_SMALL); + } + + @Test + public void testDecompressionOfLargeChunkOfData() throws Exception { + testDecompression(channel, BYTES_LARGE); + } + + @Test + public void testDecompressionOfBatchedFlowOfData() throws Exception { + final byte[] data = BYTES_LARGE; + + byte[] compressedArray = LZFEncoder.encode(data); + int written = 0, length = rand.nextInt(100); + while (written + length < compressedArray.length) { + ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray, written, length); + channel.writeInbound(compressed); + written += length; + length = rand.nextInt(100); + } + ByteBuf compressed = Unpooled.wrappedBuffer(compressedArray, written, compressedArray.length - written); + channel.writeInbound(compressed); + + ByteBuf uncompressed = readUncompressed(channel); + ByteBuf dataBuf = Unpooled.wrappedBuffer(data); + + assertEquals(dataBuf, uncompressed); + + uncompressed.release(); + dataBuf.release(); + } + + private static ByteBuf readUncompressed(EmbeddedChannel channel) throws Exception { + CompositeByteBuf uncompressed = Unpooled.compositeBuffer(); + ByteBuf msg; + while ((msg = channel.readInbound()) != null) { + uncompressed.addComponent(msg); + uncompressed.writerIndex(uncompressed.writerIndex() + msg.readableBytes()); + } + + return uncompressed; + } +} diff --git a/codec/src/test/java/io/netty/handler/codec/compression/LzfEncoderTest.java b/codec/src/test/java/io/netty/handler/codec/compression/LzfEncoderTest.java new file mode 100644 index 000000000000..5fc0299941ad --- /dev/null +++ b/codec/src/test/java/io/netty/handler/codec/compression/LzfEncoderTest.java @@ -0,0 +1,108 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.compression; + +import com.ning.compress.lzf.LZFDecoder; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.util.internal.ThreadLocalRandom; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class LzfEncoderTest { + + private static final ThreadLocalRandom rand; + + private static final byte[] BYTES_SMALL = new byte[256]; + private static final byte[] BYTES_LARGE = new byte[256000]; + + static { + rand = ThreadLocalRandom.current(); + //fill arrays with compressible data + for (int i = 0; i < BYTES_SMALL.length; i++) { + BYTES_SMALL[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt(); + } + for (int i = 0; i < BYTES_LARGE.length; i++) { + BYTES_LARGE[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt(); + } + } + + private EmbeddedChannel channel; + + @Before + public void initChannel() { + channel = new EmbeddedChannel(new LzfEncoder()); + } + + private static void testCompression(final EmbeddedChannel channel, final byte[] data) throws Exception { + ByteBuf in = Unpooled.wrappedBuffer(data); + channel.writeOutbound(in); + channel.finish(); + + final byte[] uncompressed = uncompress(channel); + + assertArrayEquals(data, uncompressed); + } + + @Test + public void testCompressionOfSmallChunkOfData() throws Exception { + testCompression(channel, BYTES_SMALL); + } + + @Test + public void testCompressionOfLargeChunkOfData() throws Exception { + testCompression(channel, BYTES_LARGE); + } + + @Test + public void testCompressionOfBatchedFlowOfData() throws Exception { + final byte[] data = BYTES_LARGE; + + int written = 0, length = rand.nextInt(100); + while (written + length < data.length) { + ByteBuf in = Unpooled.wrappedBuffer(data, written, length); + channel.writeOutbound(in); + written += length; + length = rand.nextInt(100); + } + ByteBuf in = Unpooled.wrappedBuffer(data, written, data.length - written); + channel.writeOutbound(in); + channel.finish(); + + final byte[] uncompressed = uncompress(channel); + + assertArrayEquals(data, uncompressed); + } + + private static byte[] uncompress(EmbeddedChannel channel) throws Exception { + CompositeByteBuf out = Unpooled.compositeBuffer(); + ByteBuf msg; + while ((msg = channel.readOutbound()) != null) { + out.addComponent(msg); + out.writerIndex(out.writerIndex() + msg.readableBytes()); + } + + byte[] compressed = new byte[out.readableBytes()]; + out.readBytes(compressed); + out.release(); + + return LZFDecoder.decode(compressed); + } +} diff --git a/codec/src/test/java/io/netty/handler/codec/compression/LzfIntegrationTest.java b/codec/src/test/java/io/netty/handler/codec/compression/LzfIntegrationTest.java new file mode 100644 index 000000000000..a4f8fd1e717b --- /dev/null +++ b/codec/src/test/java/io/netty/handler/codec/compression/LzfIntegrationTest.java @@ -0,0 +1,156 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.compression; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.internal.ThreadLocalRandom; +import org.junit.Test; + +import java.util.Arrays; + +import static org.hamcrest.Matchers.*; +import static org.junit.Assert.*; + +public class LzfIntegrationTest { + + private static final ThreadLocalRandom rand = ThreadLocalRandom.current(); + + public static final byte[] EMPTY = new byte[0]; + + @Test + public void testEmpty() throws Exception { + testIdentity(EMPTY); + } + + @Test + public void testOneByte() throws Exception { + testIdentity(new byte[] { 'A' }); + } + + @Test + public void testTwoBytes() throws Exception { + testIdentity(new byte[] { 'B', 'A' }); + } + + @Test + public void testRegular() throws Exception { + byte[] data = ("Netty is a NIO client server framework which enables quick and easy development " + + "of network applications such as protocol servers and clients.").getBytes(); + testIdentity(data); + } + + @Test + public void testLargeRandom() throws Exception { + byte[] data = new byte[1048576]; + rand.nextBytes(data); + testIdentity(data); + } + + @Test + public void testPartRandom() throws Exception { + byte[] data = new byte[12345]; + rand.nextBytes(data); + for (int i = 0; i < 1024; i++) { + data[i] = 123; + } + testIdentity(data); + } + + @Test + public void testCompressible() throws Exception { + byte[] data = new byte[10000]; + for (int i = 0; i < data.length; i++) { + data[i] = i % 4 != 0 ? 0 : (byte) rand.nextInt(); + } + testIdentity(data); + } + + @Test + public void testLongBlank() throws Exception { + byte[] data = new byte[100000]; + testIdentity(data); + } + + @Test + public void testLongSame() throws Exception { + byte[] data = new byte[100000]; + Arrays.fill(data, (byte) 123); + testIdentity(data); + } + + @Test + public void testSequential() throws Exception { + byte[] data = new byte[49]; + for (int i = 0; i < data.length; i++) { + data[i] = (byte) i; + } + testIdentity(data); + } + + private static void testIdentity(byte[] data) { + ByteBuf in = Unpooled.wrappedBuffer(data); + EmbeddedChannel encoder = new EmbeddedChannel(new LzfEncoder()); + EmbeddedChannel decoder = new EmbeddedChannel(new LzfDecoder()); + try { + ByteBuf msg; + + encoder.writeOutbound(in.copy()); + encoder.finish(); + CompositeByteBuf compressed = Unpooled.compositeBuffer(); + while ((msg = encoder.readOutbound()) != null) { + compressed.addComponent(msg); + compressed.writerIndex(compressed.writerIndex() + msg.readableBytes()); + } + assertThat(compressed, is(notNullValue())); + + decoder.writeInbound(compressed.retain()); + assertFalse(compressed.isReadable()); + CompositeByteBuf decompressed = Unpooled.compositeBuffer(); + while ((msg = decoder.readInbound()) != null) { + decompressed.addComponent(msg); + decompressed.writerIndex(decompressed.writerIndex() + msg.readableBytes()); + } + assertEquals(in, decompressed); + + compressed.release(); + decompressed.release(); + in.release(); + } finally { + encoder.close(); + decoder.close(); + + for (;;) { + Object msg = encoder.readOutbound(); + if (msg == null) { + break; + } + ReferenceCountUtil.release(msg); + } + + for (;;) { + Object msg = decoder.readInbound(); + if (msg == null) { + break; + } + ReferenceCountUtil.release(msg); + } + } + } +} diff --git a/license/LICENSE.compress-lzf.txt b/license/LICENSE.compress-lzf.txt new file mode 100644 index 000000000000..c5da4e134815 --- /dev/null +++ b/license/LICENSE.compress-lzf.txt @@ -0,0 +1,11 @@ +Copyright 2009-2010 Ning, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); you may not +use this file except in compliance with the License. You may obtain a copy of +the License at http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS,WITHOUT +WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +License for the specific language governing permissions and limitations under +the License. diff --git a/pom.xml b/pom.xml index e6a710171894..52bc98ff23b0 100644 --- a/pom.xml +++ b/pom.xml @@ -421,6 +421,11 @@ jzlib 1.1.2 + + com.ning + compress-lzf + 1.0.1 + org.rxtx