Skip to content

Commit

Permalink
Implemented LZF compression codec
Browse files Browse the repository at this point in the history
Motivation:

LZF compression codec provides sending and receiving data encoded by very fast LZF algorithm.

Modifications:

- Added Compress-LZF library which implements LZF algorithm
- Implemented LzfEncoder which extends MessageToByteEncoder and provides compression of outgoing messages
- Added tests to verify the LzfEncoder and how it can compress data for the next uncompression using the original library
- Implemented LzfDecoder which extends ByteToMessageDecoder and provides uncompression of incoming messages
- Added tests to verify the LzfDecoder and how it can uncompress data after compression using the original library
- Added integration tests for LzfEncoder/Decoder

Result:

Full LZF compression codec which can compress/uncompress data using LZF algorithm.
  • Loading branch information
idelpivnitskiy authored and Norman Maurer committed Jul 17, 2014
1 parent d8a7842 commit 2b37b69
Show file tree
Hide file tree
Showing 9 changed files with 760 additions and 0 deletions.
8 changes: 8 additions & 0 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ pure Java, which can be obtained at:
* HOMEPAGE:
* http://www.jcraft.com/jzlib/

This product optionally depends on 'Compress-LZF', a Java library for encoding and
decoding data in LZF format, written by Tatu Saloranta. It can be obtained at:

* LICENSE:
* license/LICENSE.compress-lzf.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/ning/compress

This product optionally depends on 'Protocol Buffers', Google's data
interchange format, which can be obtained at:

Expand Down
5 changes: 5 additions & 0 deletions codec/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@
<artifactId>jzlib</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.ning</groupId>
<artifactId>compress-lzf</artifactId>
<optional>true</optional>
</dependency>

<!-- Test dependencies for jboss marshalling encoder/decoder -->
<dependency>
Expand Down
179 changes: 179 additions & 0 deletions codec/src/main/java/io/netty/handler/codec/compression/LzfDecoder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;

import com.ning.compress.BufferRecycler;
import com.ning.compress.lzf.ChunkDecoder;
import com.ning.compress.lzf.util.ChunkDecoderFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

import static com.ning.compress.lzf.LZFChunk.BYTE_Z;
import static com.ning.compress.lzf.LZFChunk.BYTE_V;
import static com.ning.compress.lzf.LZFChunk.MAX_HEADER_LEN;
import static com.ning.compress.lzf.LZFChunk.HEADER_LEN_COMPRESSED;
import static com.ning.compress.lzf.LZFChunk.HEADER_LEN_NOT_COMPRESSED;
import static com.ning.compress.lzf.LZFChunk.BLOCK_TYPE_NON_COMPRESSED;
import static com.ning.compress.lzf.LZFChunk.BLOCK_TYPE_COMPRESSED;

/**
* Uncompresses a {@link ByteBuf} encoded with the LZF format.
*
* See original <a href="http://oldhome.schmorp.de/marc/liblzf.html">LZF package</a>
* and <a href="https://github.com/ning/compress/wiki/LZFFormat">LZF format</a> for full description.
*/
public class LzfDecoder extends ByteToMessageDecoder {
/**
* A brief signature for content auto-detection.
*/
private static final short SIGNATURE_OF_CHUNK = BYTE_Z << 8 | BYTE_V;

/**
* Offset to the "Type" in chunk header.
*/
private static final int TYPE_OFFSET = 2;

/**
* Offset to the "ChunkLength" in chunk header.
*/
private static final int CHUNK_LENGTH_OFFSET = 3;

/**
* Offset to the "OriginalLength" in chunk header.
*/
private static final int ORIGINAL_LENGTH_OFFSET = 5;

/**
* Underlying decoder in use.
*/
private final ChunkDecoder decoder;

/**
* Object that handles details of buffer recycling.
*/
private final BufferRecycler recycler;

/**
* Determines the state of flow.
*/
private boolean corrupted;

/**
* Creates a new LZF decoder with the most optimal available methods for underlying data access.
* It will "unsafe" instance if one can be used on current JVM.
* It should be safe to call this constructor as implementations are dynamically loaded; however, on some
* non-standard platforms it may be necessary to use {@link #LzfDecoder(boolean)} with {@code true} param.
*/
public LzfDecoder() {
this(false);
}

/**
* Creates a new LZF decoder with specified decoding instance.
*
* @param safeInstance
* If {@code true} decoder will use {@link ChunkDecoder} that only uses standard JDK access methods,
* and should work on all Java platforms and JVMs.
* Otherwise decoder will try to use highly optimized {@link ChunkDecoder} implementation that uses
* Sun JDK's {@link sun.misc.Unsafe} class (which may be included by other JDK's as well).
*/
public LzfDecoder(boolean safeInstance) {
decoder = safeInstance ?
ChunkDecoderFactory.safeInstance()
: ChunkDecoderFactory.optimalInstance();

recycler = BufferRecycler.instance();
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
for (;;) {
if (corrupted) {
in.skipBytes(in.readableBytes());
return;
}

if (in.readableBytes() < HEADER_LEN_NOT_COMPRESSED) {
return;
}
final int idx = in.readerIndex();
final int type = in.getByte(idx + TYPE_OFFSET);
final int chunkLength = in.getUnsignedShort(idx + CHUNK_LENGTH_OFFSET);
final int totalLength = (type == BLOCK_TYPE_NON_COMPRESSED ?
HEADER_LEN_NOT_COMPRESSED : MAX_HEADER_LEN) + chunkLength;
if (in.readableBytes() < totalLength) {
return;
}

try {
if (in.getUnsignedShort(idx) != SIGNATURE_OF_CHUNK) {
throw new DecompressionException("Unexpected signature of chunk");
}
switch (type) {
case BLOCK_TYPE_NON_COMPRESSED: {
in.skipBytes(HEADER_LEN_NOT_COMPRESSED);
out.add(in.readBytes(chunkLength));
break;
}
case BLOCK_TYPE_COMPRESSED: {
final int originalLength = in.getUnsignedShort(idx + ORIGINAL_LENGTH_OFFSET);

final byte[] inputArray;
final int inPos;
if (in.hasArray()) {
inputArray = in.array();
inPos = in.arrayOffset() + idx + HEADER_LEN_COMPRESSED;
} else {
inputArray = recycler.allocInputBuffer(chunkLength);
in.getBytes(idx + HEADER_LEN_COMPRESSED, inputArray, 0, chunkLength);
inPos = 0;
}

ByteBuf uncompressed = ctx.alloc().heapBuffer(originalLength, originalLength);
final byte[] outputArray = uncompressed.array();
final int outPos = uncompressed.arrayOffset();

boolean success = false;
try {
decoder.decodeChunk(inputArray, inPos, outputArray, outPos, outPos + originalLength);
uncompressed.writerIndex(uncompressed.writerIndex() + originalLength);
out.add(uncompressed);
in.skipBytes(totalLength);
success = true;
} finally {
if (!success) {
uncompressed.release();
}
}

if (!in.hasArray()) {
recycler.releaseInputBuffer(inputArray);
}
break;
}
default:
throw new DecompressionException("Unknown type of chunk: " + type + " (expected: 0 or 1)");
}
} catch (Exception e) {
corrupted = true;
throw e;
}
}
}
}
140 changes: 140 additions & 0 deletions codec/src/main/java/io/netty/handler/codec/compression/LzfEncoder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;

import com.ning.compress.BufferRecycler;
import com.ning.compress.lzf.ChunkEncoder;
import com.ning.compress.lzf.LZFEncoder;
import com.ning.compress.lzf.util.ChunkEncoderFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

import static com.ning.compress.lzf.LZFChunk.*;

/**
* Compresses a {@link ByteBuf} using the LZF format.
*
* See original <a href="http://oldhome.schmorp.de/marc/liblzf.html">LZF package</a>
* and <a href="https://github.com/ning/compress/wiki/LZFFormat">LZF format</a> for full description.
*/
public class LzfEncoder extends MessageToByteEncoder<ByteBuf> {
/**
* Minimum block size ready for compression. Blocks with length
* less than {@link #MIN_BLOCK_TO_COMPRESS} will write as uncompressed.
*/
private static final int MIN_BLOCK_TO_COMPRESS = 16;

/**
* Underlying decoder in use.
*/
private final ChunkEncoder encoder;

/**
* Object that handles details of buffer recycling.
*/
private final BufferRecycler recycler;

/**
* Creates a new LZF encoder with the most optimal available methods for underlying data access.
* It will "unsafe" instance if one can be used on current JVM.
* It should be safe to call this constructor as implementations are dynamically loaded; however, on some
* non-standard platforms it may be necessary to use {@link #LzfEncoder(boolean)} with {@code true} param.
*/
public LzfEncoder() {
this(false, MAX_CHUNK_LEN);
}

/**
* Creates a new LZF encoder with specified encoding instance.
*
* @param safeInstance
* If {@code true} encoder will use {@link ChunkEncoder} that only uses standard JDK access methods,
* and should work on all Java platforms and JVMs.
* Otherwise encoder will try to use highly optimized {@link ChunkEncoder} implementation that uses
* Sun JDK's {@link sun.misc.Unsafe} class (which may be included by other JDK's as well).
*/
public LzfEncoder(boolean safeInstance) {
this(safeInstance, MAX_CHUNK_LEN);
}

/**
* Creates a new LZF encoder with specified total length of encoded chunk. You can configure it to encode
* your data flow more efficient if you know the avarage size of messages that you send.
*
* @param totalLength
* Expected total length of content to compress; only matters for outgoing messages that is smaller
* than maximum chunk size (64k), to optimize encoding hash tables.
*/
public LzfEncoder(int totalLength) {
this(false, totalLength);
}

/**
* Creates a new LZF encoder with specified settings.
*
* @param safeInstance
* If {@code true} encoder will use {@link ChunkEncoder} that only uses standard JDK access methods,
* and should work on all Java platforms and JVMs.
* Otherwise encoder will try to use highly optimized {@link ChunkEncoder} implementation that uses
* Sun JDK's {@link sun.misc.Unsafe} class (which may be included by other JDK's as well).
* @param totalLength
* Expected total length of content to compress; only matters for outgoing messages that is smaller
* than maximum chunk size (64k), to optimize encoding hash tables.
*/
public LzfEncoder(boolean safeInstance, int totalLength) {
super(false);
if (totalLength < MIN_BLOCK_TO_COMPRESS || totalLength > MAX_CHUNK_LEN) {
throw new IllegalArgumentException("totalLength: " + totalLength +
" (expected: " + MIN_BLOCK_TO_COMPRESS + '-' + MAX_CHUNK_LEN + ')');
}

encoder = safeInstance ?
ChunkEncoderFactory.safeNonAllocatingInstance(totalLength)
: ChunkEncoderFactory.optimalNonAllocatingInstance(totalLength);

recycler = BufferRecycler.instance();
}

@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
final int length = in.readableBytes();
final int idx = in.readerIndex();
final byte[] input;
final int inputPtr;
if (in.hasArray()) {
input = in.array();
inputPtr = in.arrayOffset() + idx;
} else {
input = recycler.allocInputBuffer(length);
in.getBytes(idx, input, 0, length);
inputPtr = 0;
}

final int maxOutputLength = LZFEncoder.estimateMaxWorkspaceSize(length);
out.ensureWritable(maxOutputLength);
final byte[] output = out.array();
final int outputPtr = out.arrayOffset() + out.writerIndex();
final int outputLength = LZFEncoder.appendEncoded(encoder,
input, inputPtr, length, output, outputPtr) - outputPtr;
out.writerIndex(out.writerIndex() + outputLength);
in.skipBytes(length);

if (!in.hasArray()) {
recycler.releaseInputBuffer(input);
}
}
}
Loading

0 comments on commit 2b37b69

Please sign in to comment.