Skip to content

Commit

Permalink
revert PR 11594 to avoid copy data to direct buffer afer aircompresso…
Browse files Browse the repository at this point in the history
…r upgrade to 0.20 (apache#11792)

### Motivation
Due to aircompressor 0.19 can't work with heap buffer on JDK1.8, so apache#11594 use copy data to direct buffer to avoid NoSuchMethodError exception. Now aircompressor released 0.20 and apache#11790 has upgrade the aircompressor version to 0.20 to fix this issue, we can revert apache#11594 to avoid copy data to direct buffer to improve performance.

### Modification
1. revert Fix java.lang.NoSuchMethodError: java.nio.ByteBuffer.position(I)Ljava/nio/ByteBuffer when enabling topic metadata compression apache#11594, but keep the tests.
  • Loading branch information
hangc0276 authored Aug 26, 2021
1 parent 051f52d commit cc1b983
Showing 1 changed file with 3 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,6 @@ public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) {
return managedLedgerInfo.toByteArray();
}
ByteBuf metadataByteBuf = null;
ByteBuf uncompressedByteBuf = null;
ByteBuf encodeByteBuf = null;
try {
MLDataFormats.ManagedLedgerInfoMetadata mlInfoMetadata = MLDataFormats.ManagedLedgerInfoMetadata
Expand All @@ -330,13 +329,9 @@ public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) {
metadataByteBuf.writeShort(MAGIC_MANAGED_LEDGER_INFO_METADATA);
metadataByteBuf.writeInt(mlInfoMetadata.getSerializedSize());
metadataByteBuf.writeBytes(mlInfoMetadata.toByteArray());
byte[] byteArray = managedLedgerInfo.toByteArray();
// The reason for copy the data to a direct buffer here is to ensure the metadata compression feature can
// work on JDK1.8, for more details to see: https://github.com/apache/pulsar/issues/11593
uncompressedByteBuf = Unpooled.directBuffer(byteArray.length);
uncompressedByteBuf.writeBytes(byteArray);

encodeByteBuf = getCompressionCodec(compressionType)
.encode(uncompressedByteBuf);
.encode(Unpooled.wrappedBuffer(managedLedgerInfo.toByteArray()));
CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
compositeByteBuf.addComponent(true, metadataByteBuf);
compositeByteBuf.addComponent(true, encodeByteBuf);
Expand All @@ -347,9 +342,6 @@ public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) {
if (metadataByteBuf != null) {
metadataByteBuf.release();
}
if (uncompressedByteBuf != null) {
uncompressedByteBuf.release();
}
if (encodeByteBuf != null) {
encodeByteBuf.release();
}
Expand All @@ -360,7 +352,6 @@ public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProto
ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
if (byteBuf.readableBytes() > 0 && byteBuf.readShort() == MAGIC_MANAGED_LEDGER_INFO_METADATA) {
ByteBuf decodeByteBuf = null;
ByteBuf compressedByteBuf = null;
try {
int metadataSize = byteBuf.readInt();
byte[] metadataBytes = new byte[metadataSize];
Expand All @@ -369,12 +360,8 @@ public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProto
MLDataFormats.ManagedLedgerInfoMetadata.parseFrom(metadataBytes);

long unpressedSize = metadata.getUncompressedSize();
// The reason for copy the data to a direct buffer here is to ensure the metadata compression feature
// can work on JDK1.8, for more details to see: https://github.com/apache/pulsar/issues/11593
compressedByteBuf = Unpooled.directBuffer(byteBuf.readableBytes());
compressedByteBuf.writeBytes(byteBuf);
decodeByteBuf = getCompressionCodec(metadata.getCompressionType())
.decode(compressedByteBuf, (int) unpressedSize);
.decode(byteBuf, (int) unpressedSize);
byte[] decodeBytes;
// couldn't decode data by ZLIB compression byteBuf array() directly
if (decodeByteBuf.hasArray() && !CompressionType.ZLIB.equals(metadata.getCompressionType())) {
Expand All @@ -392,9 +379,6 @@ public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProto
if (decodeByteBuf != null) {
decodeByteBuf.release();
}
if (compressedByteBuf != null) {
compressedByteBuf.release();
}
byteBuf.release();
}
} else {
Expand Down

0 comments on commit cc1b983

Please sign in to comment.