Skip to content

Commit

Permalink
apacheGH-40823: [Java] Adding Spotless to Compression module (apache#…
Browse files Browse the repository at this point in the history
…42060)

### Rationale for this change

Adding code style and formatting options for Compression module. 

### What changes are included in this PR?

Code formatting spotless plugin has been added. 

### Are these changes tested?

Yes, but doesn't involve test cases, the plugin itself corrects. 
* GitHub Issue: apache#40823

Authored-by: Vibhatha Abeykoon <[email protected]>
Signed-off-by: David Li <[email protected]>
  • Loading branch information
vibhatha authored Jun 11, 2024
1 parent 4df00fa commit 1a48366
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 153 deletions.
5 changes: 5 additions & 0 deletions java/compression/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ under the License.
<name>Arrow Compression</name>
<description>(Experimental/Contrib) A library for working with the compression/decompression of Arrow data.</description>

<properties>
<checkstyle.config.location>dev/checkstyle/checkstyle-spotless.xml</checkstyle.config.location>
<spotless.java.excludes>none</spotless.java.excludes>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.arrow</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.compression;

import org.apache.arrow.vector.compression.CompressionCodec;
Expand All @@ -23,7 +22,7 @@
/**
* Default implementation of factory supported LZ4 and ZSTD compression.
*
* // TODO(ARROW-12115): Rename this class.
* <p>// TODO(ARROW-12115): Rename this class.
*/
public class CommonsCompressionFactory implements CompressionCodec.Factory {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.compression;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.Preconditions;
Expand All @@ -32,42 +30,47 @@
import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
import org.apache.commons.compress.utils.IOUtils;

/**
* Compression codec for the LZ4 algorithm.
*/
/** Compression codec for the LZ4 algorithm. */
public class Lz4CompressionCodec extends AbstractCompressionCodec {

@Override
protected ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
"The uncompressed buffer size exceeds the integer limit %s.", Integer.MAX_VALUE);
Preconditions.checkArgument(
uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
"The uncompressed buffer size exceeds the integer limit %s.",
Integer.MAX_VALUE);

byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];
uncompressedBuffer.getBytes(/*index=*/0, inBytes);
uncompressedBuffer.getBytes(/*index=*/ 0, inBytes);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (InputStream in = new ByteArrayInputStream(inBytes);
OutputStream out = new FramedLZ4CompressorOutputStream(baos)) {
OutputStream out = new FramedLZ4CompressorOutputStream(baos)) {
IOUtils.copy(in, out);
} catch (IOException e) {
throw new RuntimeException(e);
}

byte[] outBytes = baos.toByteArray();

ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
ArrowBuf compressedBuffer =
allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
compressedBuffer.setBytes(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes);
compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
return compressedBuffer;
}

@Override
protected ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
"The compressed buffer size exceeds the integer limit %s", Integer.MAX_VALUE);
Preconditions.checkArgument(
compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
"The compressed buffer size exceeds the integer limit %s",
Integer.MAX_VALUE);

long decompressedLength = readUncompressedLength(compressedBuffer);

byte[] inBytes = new byte[(int) (compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH)];
byte[] inBytes =
new byte
[(int) (compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH)];
compressedBuffer.getBytes(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, inBytes);
ByteArrayOutputStream out = new ByteArrayOutputStream((int) decompressedLength);
try (InputStream in = new FramedLZ4CompressorInputStream(new ByteArrayInputStream(inBytes))) {
Expand All @@ -78,7 +81,7 @@ protected ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBu

byte[] outBytes = out.toByteArray();
ArrowBuf decompressedBuffer = allocator.buffer(outBytes.length);
decompressedBuffer.setBytes(/*index=*/0, outBytes);
decompressedBuffer.setBytes(/*index=*/ 0, outBytes);
decompressedBuffer.writerIndex(decompressedLength);
return decompressedBuffer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.compression;


import com.github.luben.zstd.Zstd;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.compression.AbstractCompressionCodec;
import org.apache.arrow.vector.compression.CompressionUtil;

import com.github.luben.zstd.Zstd;

/**
* Compression codec for the ZSTD algorithm.
*/
/** Compression codec for the ZSTD algorithm. */
public class ZstdCompressionCodec extends AbstractCompressionCodec {

private int compressionLevel;
Expand All @@ -46,10 +41,13 @@ protected ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBu
long maxSize = Zstd.compressBound(uncompressedBuffer.writerIndex());
long dstSize = CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + maxSize;
ArrowBuf compressedBuffer = allocator.buffer(dstSize);
long bytesWritten = Zstd.compressUnsafe(
compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, dstSize,
/*src*/uncompressedBuffer.memoryAddress(), /*srcSize=*/uncompressedBuffer.writerIndex(),
/*level=*/this.compressionLevel);
long bytesWritten =
Zstd.compressUnsafe(
compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH,
dstSize,
/*src*/ uncompressedBuffer.memoryAddress(),
/*srcSize=*/ uncompressedBuffer.writerIndex(),
/*level=*/ this.compressionLevel);
if (Zstd.isError(bytesWritten)) {
compressedBuffer.close();
throw new RuntimeException("Error compressing: " + Zstd.getErrorName(bytesWritten));
Expand All @@ -62,17 +60,23 @@ protected ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBu
protected ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
long decompressedLength = readUncompressedLength(compressedBuffer);
ArrowBuf uncompressedBuffer = allocator.buffer(decompressedLength);
long decompressedSize = Zstd.decompressUnsafe(uncompressedBuffer.memoryAddress(), decompressedLength,
/*src=*/compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH,
compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
long decompressedSize =
Zstd.decompressUnsafe(
uncompressedBuffer.memoryAddress(),
decompressedLength,
/*src=*/ compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH,
compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
if (Zstd.isError(decompressedSize)) {
uncompressedBuffer.close();
throw new RuntimeException("Error decompressing: " + Zstd.getErrorName(decompressedLength));
}
if (decompressedLength != decompressedSize) {
uncompressedBuffer.close();
throw new RuntimeException("Expected != actual decompressed length: " +
decompressedLength + " != " + decompressedSize);
throw new RuntimeException(
"Expected != actual decompressed length: "
+ decompressedLength
+ " != "
+ decompressedSize);
}
uncompressedBuffer.writerIndex(decompressedLength);
return uncompressedBuffer;
Expand Down
Loading

0 comments on commit 1a48366

Please sign in to comment.