Skip to content

Commit

Permalink
Use PulsarByteBufAllocator to allocate buffer for chunks at consumer …
Browse files Browse the repository at this point in the history
…side (apache#13536)

### Motivation

Currently Pulsar consumer allocates memory from direct memory via `Unpooled.directBuffer` directly, which doesn't make use of the widely used allocator in Pulsar.

### Modifications

Use `PulsarByteBufAllocator` as the memory allocator for chunks buffer.
  • Loading branch information
BewareMyPower authored Dec 28, 2021
1 parent 1235162 commit a5d3473
Showing 1 changed file with 1 addition and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.scurrilous.circe.checksum.Crc32cIntChecksum;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
Expand Down Expand Up @@ -1270,7 +1269,7 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
}

if (msgMetadata.getChunkId() == 0) {
ByteBuf chunkedMsgBuffer = Unpooled.directBuffer(msgMetadata.getTotalChunkMsgSize(),
ByteBuf chunkedMsgBuffer = PulsarByteBufAllocator.DEFAULT.buffer(msgMetadata.getTotalChunkMsgSize(),
msgMetadata.getTotalChunkMsgSize());
int totalChunks = msgMetadata.getNumChunksFromMsg();
chunkedMessagesMap.computeIfAbsent(msgMetadata.getUuid(),
Expand Down

0 comments on commit a5d3473

Please sign in to comment.