diff --git a/conf/pulsar_env.sh b/conf/pulsar_env.sh old mode 100644 new mode 100755 index 967b97ce9d762..36034b56ff5a5 --- a/conf/pulsar_env.sh +++ b/conf/pulsar_env.sh @@ -48,7 +48,7 @@ PULSAR_MEM=${PULSAR_MEM:-"-Xms2g -Xmx2g -XX:MaxDirectMemorySize=4g"} PULSAR_GC=${PULSAR_GC:-"-XX:+UseG1GC -XX:MaxGCPauseMillis=10 -XX:+ParallelRefProcEnabled -XX:+UnlockExperimentalVMOptions -XX:+AggressiveOpts -XX:+DoEscapeAnalysis -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1NewSizePercent=50 -XX:+DisableExplicitGC -XX:-ResizePLAB"} # Extra options to be passed to the jvm -PULSAR_EXTRA_OPTS=${PULSAR_EXTRA_OPTS:-"-Dio.netty.leakDetectionLevel=disabled -Dio.netty.recycler.maxCapacity.default=1000 -Dio.netty.recycler.linkCapacity=1024"} +PULSAR_EXTRA_OPTS=${PULSAR_EXTRA_OPTS:-" -Dpulsar.allocator.exit_on_oom=true -Dio.netty.recycler.maxCapacity.default=1000 -Dio.netty.recycler.linkCapacity=1024"} # Add extra paths to the bookkeeper classpath # PULSAR_EXTRA_CLASSPATH= diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 35725e671bfcb..0add15154bf9e 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -34,12 +34,10 @@ import com.google.common.collect.Sets; import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; import java.io.IOException; import java.lang.reflect.Field; -import java.lang.reflect.Modifier; import java.nio.charset.Charset; import java.security.GeneralSecurityException; import java.util.ArrayList; @@ -96,6 +94,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.ByteBufPair; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; @@ -2173,7 +2172,7 @@ public ByteBuf getMessageWithMetadata(byte[] data) throws IOException { int msgMetadataSize = messageData.getSerializedSize(); int headersSize = 4 + msgMetadataSize; - ByteBuf headers = PooledByteBufAllocator.DEFAULT.buffer(headersSize, headersSize); + ByteBuf headers = PulsarByteBufAllocator.DEFAULT.buffer(headersSize, headersSize); ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(headers); headers.writeInt(msgMetadataSize); messageData.writeTo(outStream); diff --git a/pom.xml b/pom.xml index 7ae6b750e8328..7c97e432b6e1a 100644 --- a/pom.xml +++ b/pom.xml @@ -315,6 +315,12 @@ flexible messaging model and an intuitive client API. + + org.apache.bookkeeper + bookkeeper-common-allocator + ${bookkeeper.version} + + org.reflections @@ -1120,8 +1126,9 @@ flexible messaging model and an intuitive client API. org.apache.maven.plugins maven-surefire-plugin - -Xmx2G -XX:MaxDirectMemorySize=8G - -Dio.netty.leakDetectionLevel=advanced + -Xmx2G + -Dpulsar.allocator.pooled=false + -Dpulsar.allocator.leak_detection=Advanced -Dlog4j.configurationFile=log4j2.xml false diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java index b582bb5798291..a274795d7212c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java @@ -39,7 +39,6 @@ import java.text.DateFormat; import java.text.SimpleDateFormat; - import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.proto.BookieServer; import org.apache.bookkeeper.replication.AutoRecoveryMain; @@ -50,8 +49,8 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.ServiceConfigurationUtils; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.Commands; -import org.apache.pulsar.common.conf.InternalConfigurationData; import org.apache.pulsar.functions.worker.WorkerConfig; import org.apache.pulsar.functions.worker.WorkerService; import org.slf4j.Logger; @@ -315,6 +314,11 @@ public static void main(String[] args) throws Exception { }) ); + PulsarByteBufAllocator.registerOOMListener(oomException -> { + log.error("-- Shutting down - Received OOM exception: {}", oomException.getMessage(), oomException); + starter.shutdown(); + }); + try { starter.start(); } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java index 2cf5fda03145b..cc1a8e5e9ef5a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java @@ -22,14 +22,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import org.apache.bookkeeper.common.allocator.PoolingPolicy; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy; import org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy; import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.Commands; -import org.apache.pulsar.common.conf.InternalConfigurationData; import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping; import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy; import org.apache.pulsar.zookeeper.ZooKeeperCache; @@ -40,6 +39,7 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory { private final AtomicReference rackawarePolicyZkCache = new AtomicReference<>(); private final AtomicReference clientIsolationZkCache = new AtomicReference<>(); + @SuppressWarnings("deprecation") @Override public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws IOException { ClientConfiguration bkConf = new ClientConfiguration(); @@ -61,7 +61,6 @@ public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws I bkConf.setStickyReadsEnabled(conf.isBookkeeperEnableStickyReads()); bkConf.setNettyMaxFrameSizeBytes(conf.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING); - bkConf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap); if (conf.isBookkeeperClientHealthCheckEnabled()) { bkConf.enableBookieHealthCheck(); bkConf.setBookieHealthCheckInterval(conf.getBookkeeperHealthCheckIntervalSec(), TimeUnit.SECONDS); @@ -109,7 +108,10 @@ public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws I } try { - return new BookKeeper(bkConf, zkClient); + return BookKeeper.forConfig(bkConf) + .allocator(PulsarByteBufAllocator.DEFAULT) + .zk(zkClient) + .build(); } catch (InterruptedException | BKException e) { throw new IOException(e); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index cc3d138dfff79..767db39a4ec73 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -28,7 +28,6 @@ import com.google.common.collect.Sets; import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import java.io.IOException; import java.io.OutputStream; @@ -86,6 +85,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.Commands; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue; @@ -101,8 +101,8 @@ import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; -import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; import org.apache.zookeeper.KeeperException; @@ -1087,7 +1087,7 @@ protected Response internalPeekNthMessage(String subName, int messagePosition, b ByteBuf uncompressedPayload = codec.decode(metadataAndPayload, metadata.getUncompressedSize()); // Copy into a heap buffer for output stream compatibility - ByteBuf data = PooledByteBufAllocator.DEFAULT.heapBuffer(uncompressedPayload.readableBytes(), + ByteBuf data = PulsarByteBufAllocator.DEFAULT.heapBuffer(uncompressedPayload.readableBytes(), uncompressedPayload.readableBytes()); data.writeBytes(uncompressedPayload); uncompressedPayload.release(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 1890efadff51d..775ef707a3a59 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -31,7 +31,6 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.AdaptiveRecvByteBufAllocator; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; @@ -42,7 +41,6 @@ import java.io.IOException; import java.lang.reflect.Field; import java.net.InetSocketAddress; -import java.net.URI; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -88,20 +86,19 @@ import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; -import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect; import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventListner; import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventType; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.configuration.FieldContext; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; @@ -278,7 +275,7 @@ public void start() throws Exception { pulsar.getConfiguration().getClusterName()); ServerBootstrap bootstrap = new ServerBootstrap(); - bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + bootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT); bootstrap.group(acceptorGroup, workerGroup); bootstrap.childOption(ChannelOption.TCP_NODELAY, true); bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java index 9009ff9c58fbf..5c11059c5cd6e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java @@ -21,7 +21,6 @@ import static com.google.common.base.Preconditions.checkArgument; import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; import java.io.IOException; @@ -33,7 +32,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.RawMessage; -import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.Commands; import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; @@ -102,7 +101,7 @@ public static Optional rebatchMessage(RawMessage msg, ByteBuf payload = msg.getHeadersAndPayload(); MessageMetadata metadata = Commands.parseMessageMetadata(payload); - ByteBuf batchBuffer = PooledByteBufAllocator.DEFAULT.buffer(payload.capacity()); + ByteBuf batchBuffer = PulsarByteBufAllocator.DEFAULT.buffer(payload.capacity()); CompressionType compressionType = metadata.getCompression(); CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawMessageImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawMessageImpl.java index 72278c90557bf..628e2bee33dfd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawMessageImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawMessageImpl.java @@ -18,20 +18,20 @@ */ package org.apache.pulsar.client.impl; -import java.io.IOException; +import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import java.io.IOException; + import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.RawMessage; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; - public class RawMessageImpl implements RawMessage { private static final Logger log = LoggerFactory.getLogger(RawMessageImpl.class); @@ -74,7 +74,7 @@ public ByteBuf serialize() { int headerSize = 4 /* IdSize */ + idSize + 4 /* PayloadAndMetadataSize */; int totalSize = headerSize + headersAndPayload.readableBytes(); - ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(totalSize); + ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(totalSize); buf.writeInt(idSize); try { ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(buf); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index a5a561ddd4a6b..08230c8c502bb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -22,6 +22,9 @@ import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.UnpooledByteBufAllocator; + import java.lang.reflect.Field; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -39,15 +42,12 @@ import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor; import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.ByteBufPair; import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream; import org.testng.annotations.Test; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.UnpooledByteBufAllocator; - /** */ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { @@ -64,7 +64,7 @@ public static byte[] createMessageWrittenToLedger(String msg) throws Exception { int payloadSize = data.readableBytes(); int totalSize = 4 + msgMetadataSize + payloadSize; - ByteBuf headers = PooledByteBufAllocator.DEFAULT.heapBuffer(totalSize, totalSize); + ByteBuf headers = PulsarByteBufAllocator.DEFAULT.heapBuffer(totalSize, totalSize); ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(headers); headers.writeInt(msgMetadataSize); messageMetadata.writeTo(outStream); diff --git a/pulsar-client-admin-shaded/pom.xml b/pulsar-client-admin-shaded/pom.xml index 3e1f95ca1988c..d72feac1f63eb 100644 --- a/pulsar-client-admin-shaded/pom.xml +++ b/pulsar-client-admin-shaded/pom.xml @@ -95,6 +95,7 @@ io.opencensus:* org.objenesis:* org.yaml:snakeyaml + org.apache.bookkeeper:bookkeeper-common-allocator @@ -219,6 +220,10 @@ org.yaml org.apache.pulsar.shade.org.yaml + + org.apache.bookkeeper + org.apache.pulsar.shade.org.apache.bookkeeper + diff --git a/pulsar-client-all/pom.xml b/pulsar-client-all/pom.xml index 408b61ddf131d..44de80fdb953d 100644 --- a/pulsar-client-all/pom.xml +++ b/pulsar-client-all/pom.xml @@ -151,6 +151,7 @@ org.xerial.snappy:snappy-java org.apache.commons:commons-compress org.tukaani:xz + org.apache.bookkeeper:bookkeeper-common-allocator @@ -172,6 +173,10 @@ org.asynchttpclient org.apache.pulsar.shade.org.asynchttpclient + + org.apache.bookkeeper + org.apache.pulsar.shade.org.apache.bookkeeper + org.apache.commons org.apache.pulsar.shade.org.apache.commons diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-shaded/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka-shaded/pom.xml index 512b334f37659..0429209a86b57 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka-shaded/pom.xml +++ b/pulsar-client-kafka-compat/pulsar-client-kafka-shaded/pom.xml @@ -70,6 +70,7 @@ org.apache.pulsar:pulsar-client-original org.apache.commons:commons-lang3 commons-codec:commons-codec + org.apache.bookkeeper:bookkeeper-common-allocator commons-collections:commons-collections org.asynchttpclient:* io.netty:netty-codec-http @@ -228,6 +229,10 @@ org.tukaani org.apache.pulsar.shade.org.tukaani + + org.apache.bookkeeper + org.apache.pulsar.shade.org.apache.bookkeeper + diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml index 88e59c075917e..543c601548ae3 100644 --- a/pulsar-client-shaded/pom.xml +++ b/pulsar-client-shaded/pom.xml @@ -92,6 +92,7 @@ org.apache.pulsar:pulsar-client-original + org.apache.bookkeeper:bookkeeper-common-allocator org.apache.commons:commons-lang3 commons-codec:commons-codec commons-collections:commons-collections @@ -249,6 +250,10 @@ org.tukaani org.apache.pulsar.shade.org.tukaani + + org.apache.bookkeeper + org.apache.pulsar.shade.org.apache.bookkeeper + diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java index 0be5aae92ebef..4e49dcf20059a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java @@ -18,21 +18,20 @@ */ package org.apache.pulsar.client.impl; +import com.google.common.collect.Lists; + +import io.netty.buffer.ByteBuf; + import java.util.List; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.Commands; -import org.apache.pulsar.common.api.PulsarDecoder; import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.compression.CompressionCodec; import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; - /** * container for individual messages being published until they are batched and sent to broker */ @@ -91,7 +90,7 @@ void add(MessageImpl msg, SendCallback callback) { // the first message sequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder()); this.firstCallback = callback; - batchedMessageMetadataAndPayload = PooledByteBufAllocator.DEFAULT + batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT .buffer(Math.min(maxBatchSize, MAX_MESSAGE_BATCH_SIZE_BYTES)); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index 1aa1e43185d87..2b2f822243253 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -18,6 +18,18 @@ */ package org.apache.pulsar.client.impl; +import com.google.common.annotations.VisibleForTesting; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.resolver.dns.DnsNameResolver; +import io.netty.resolver.dns.DnsNameResolverBuilder; +import io.netty.util.concurrent.Future; + import java.io.Closeable; import java.io.IOException; import java.net.InetAddress; @@ -32,23 +44,11 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; - -import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.channel.Channel; -import io.netty.channel.ChannelException; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.resolver.dns.DnsNameResolver; -import io.netty.resolver.dns.DnsNameResolverBuilder; -import io.netty.util.concurrent.Future; - public class ConnectionPool implements Closeable { protected final ConcurrentHashMap>> pool; @@ -74,7 +74,7 @@ public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGrou bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.getConnectionTimeoutMs()); bootstrap.option(ChannelOption.TCP_NODELAY, conf.isUseTcpNoDelay()); - bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT); try { bootstrap.handler(new PulsarChannelInitializer(conf, clientCnxSupplier)); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java index 116410873bc49..480750830d64e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java @@ -23,7 +23,6 @@ import com.google.common.cache.LoadingCache; import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import java.io.IOException; import java.io.Reader; @@ -62,6 +61,7 @@ import org.apache.pulsar.client.api.EncryptionKeyInfo; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.CryptoException; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys; import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; @@ -425,7 +425,7 @@ public synchronized ByteBuf encrypt(Set encKeys, CryptoKeyReader keyRead ByteBuffer sourceNioBuf = payload.nioBuffer(payload.readerIndex(), payload.readableBytes()); int maxLength = cipher.getOutputSize(payload.readableBytes()); - targetBuf = PooledByteBufAllocator.DEFAULT.buffer(maxLength, maxLength); + targetBuf = PulsarByteBufAllocator.DEFAULT.buffer(maxLength, maxLength); ByteBuffer targetNioBuf = targetBuf.nioBuffer(0, maxLength); int bytesStored = cipher.doFinal(sourceNioBuf, targetNioBuf); @@ -513,7 +513,7 @@ private ByteBuf decryptData(SecretKey dataKeySecret, MessageMetadata msgMetadata ByteBuffer sourceNioBuf = payload.nioBuffer(payload.readerIndex(), payload.readableBytes()); int maxLength = cipher.getOutputSize(payload.readableBytes()); - targetBuf = PooledByteBufAllocator.DEFAULT.buffer(maxLength, maxLength); + targetBuf = PulsarByteBufAllocator.DEFAULT.buffer(maxLength, maxLength); ByteBuffer targetNioBuf = targetBuf.nioBuffer(0, maxLength); int decryptedSize = cipher.doFinal(sourceNioBuf, targetNioBuf); diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml index dbf7d4051b737..5a31b68166867 100644 --- a/pulsar-common/pom.xml +++ b/pulsar-common/pom.xml @@ -74,6 +74,11 @@ netty-all + + org.apache.bookkeeper + bookkeeper-common-allocator + + org.lz4 lz4-java diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java b/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java new file mode 100644 index 0000000000000..a324e7f4d80e4 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.allocator; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Consumer; + +import lombok.experimental.UtilityClass; +import lombok.extern.slf4j.Slf4j; + +import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder; +import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy; +import org.apache.bookkeeper.common.allocator.PoolingPolicy; + +@UtilityClass +@Slf4j +public class PulsarByteBufAllocator { + + public static final String PULSAR_ALLOCATOR_POOLED = "pulsar.allocator.pooled"; + public static final String PULSAR_ALLOCATOR_EXIT_ON_OOM = "pulsar.allocator.exit_on_oom"; + public static final String PULSAR_ALLOCATOR_LEAK_DETECTION = "pulsar.allocator.leak_detection"; + + public static final ByteBufAllocator DEFAULT; + + private static final List> LISTENERS = new CopyOnWriteArrayList<>(); + + public static void registerOOMListener(Consumer listener) { + LISTENERS.add(listener); + } + + private static final boolean EXIT_ON_OOM; + + static { + boolean isPooled = "true".equalsIgnoreCase(System.getProperty(PULSAR_ALLOCATOR_POOLED, "true")); + EXIT_ON_OOM = "true".equalsIgnoreCase(System.getProperty(PULSAR_ALLOCATOR_EXIT_ON_OOM, "false")); + + LeakDetectionPolicy leakDetectionPolicy = LeakDetectionPolicy + .valueOf(System.getProperty(PULSAR_ALLOCATOR_LEAK_DETECTION, "Disabled")); + + if (log.isDebugEnabled()) { + log.debug("Is Pooled: {} -- Exit on OOM: {}", isPooled, EXIT_ON_OOM); + } + + ByteBufAllocatorBuilder builder = ByteBufAllocatorBuilder.create() + .leakDetectionPolicy(leakDetectionPolicy) + .pooledAllocator(PooledByteBufAllocator.DEFAULT) + .outOfMemoryListener(oomException -> { + // First notify all listeners + LISTENERS.forEach(c -> { + try { + c.accept(oomException); + } catch (Throwable t) { + log.warn("Exception during OOM listener: {}", t.getMessage(), t); + } + }); + + if (EXIT_ON_OOM) { + log.info("Exiting JVM process for OOM error: {}", oomException.getMessage(), oomException); + Runtime.getRuntime().halt(1); + } + }); + + if (isPooled) { + builder.poolingPolicy(PoolingPolicy.PooledDirect); + } else { + builder.poolingPolicy(PoolingPolicy.UnpooledHeap); + } + + DEFAULT = builder.build(); + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java index c1c97523386f3..f5f0998c9a3cc 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java @@ -27,7 +27,6 @@ import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; import java.io.IOException; @@ -41,6 +40,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.AuthMethod; import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand; @@ -980,7 +980,7 @@ public static ByteBuf serializeWithSize(BaseCommand.Builder cmdBuilder) { int totalSize = cmdSize + 4; int frameSize = totalSize + 4; - ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(frameSize, frameSize); + ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(frameSize, frameSize); // Prepend 2 lengths to the buffer buf.writeInt(totalSize); @@ -1020,7 +1020,7 @@ private static ByteBufPair serializeCommandSendWithSize(BaseCommand.Builder cmdB int headersSize = 4 + headerContentSize; // totalSize + headerLength int checksumReaderIndex = -1; - ByteBuf headers = PooledByteBufAllocator.DEFAULT.buffer(headersSize, headersSize); + ByteBuf headers = PulsarByteBufAllocator.DEFAULT.buffer(headersSize, headersSize); headers.writeInt(totalSize); // External frame try { @@ -1077,7 +1077,7 @@ public static ByteBuf serializeMetadataAndPayload(ChecksumType checksumType, int checksumReaderIndex = -1; int totalSize = headerContentSize + payloadSize; - ByteBuf metadataAndPayload = PooledByteBufAllocator.DEFAULT.buffer(totalSize, totalSize); + ByteBuf metadataAndPayload = PulsarByteBufAllocator.DEFAULT.buffer(totalSize, totalSize); try { ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(metadataAndPayload); @@ -1209,7 +1209,7 @@ private static ByteBufPair serializeCommandMessageWithSize(BaseCommand cmd, Byte int totalSize = 4 + cmdSize + metadataAndPayload.readableBytes(); int headersSize = 4 + 4 + cmdSize; - ByteBuf headers = PooledByteBufAllocator.DEFAULT.buffer(headersSize); + ByteBuf headers = PulsarByteBufAllocator.DEFAULT.buffer(headersSize); headers.writeInt(totalSize); // External frame try { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java index a57992cfabd78..417a1b3597672 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java @@ -18,12 +18,15 @@ */ package org.apache.pulsar.common.compression; +import io.netty.buffer.ByteBuf; + import java.io.IOException; import java.nio.ByteBuffer; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import lombok.extern.slf4j.Slf4j; + +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; + import net.jpountz.lz4.LZ4Compressor; import net.jpountz.lz4.LZ4Factory; import net.jpountz.lz4.LZ4FastDecompressor; @@ -54,7 +57,7 @@ public ByteBuf encode(ByteBuf source) { ByteBuffer sourceNio = source.nioBuffer(source.readerIndex(), source.readableBytes()); - ByteBuf target = PooledByteBufAllocator.DEFAULT.buffer(maxLength, maxLength); + ByteBuf target = PulsarByteBufAllocator.DEFAULT.buffer(maxLength, maxLength); ByteBuffer targetNio = target.nioBuffer(0, maxLength); int compressedLength = compressor.compress(sourceNio, 0, uncompressedLength, targetNio, 0, maxLength); @@ -64,7 +67,7 @@ public ByteBuf encode(ByteBuf source) { @Override public ByteBuf decode(ByteBuf encoded, int uncompressedLength) throws IOException { - ByteBuf uncompressed = PooledByteBufAllocator.DEFAULT.buffer(uncompressedLength, uncompressedLength); + ByteBuf uncompressed = PulsarByteBufAllocator.DEFAULT.buffer(uncompressedLength, uncompressedLength); ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, uncompressedLength); ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes()); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZLib.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZLib.java index 5137dc1e0effb..da413f3dc46dc 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZLib.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZLib.java @@ -20,14 +20,15 @@ import static com.google.common.base.Preconditions.checkArgument; +import io.netty.buffer.ByteBuf; +import io.netty.util.concurrent.FastThreadLocal; + import java.io.IOException; import java.util.zip.DataFormatException; import java.util.zip.Deflater; import java.util.zip.Inflater; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.util.concurrent.FastThreadLocal; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; /** * ZLib Compression @@ -64,7 +65,7 @@ public ByteBuf encode(ByteBuf source) { int length = source.readableBytes(); int sizeEstimate = (int) Math.ceil(source.readableBytes() * 1.001) + 14; - ByteBuf compressed = PooledByteBufAllocator.DEFAULT.heapBuffer(sizeEstimate); + ByteBuf compressed = PulsarByteBufAllocator.DEFAULT.heapBuffer(sizeEstimate); int offset = 0; if (source.hasArray()) { @@ -98,7 +99,7 @@ private static void deflate(Deflater deflater, ByteBuf out) { @Override public ByteBuf decode(ByteBuf encoded, int uncompressedLength) throws IOException { - ByteBuf uncompressed = PooledByteBufAllocator.DEFAULT.heapBuffer(uncompressedLength, uncompressedLength); + ByteBuf uncompressed = PulsarByteBufAllocator.DEFAULT.heapBuffer(uncompressedLength, uncompressedLength); int len = encoded.readableBytes(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java index 5566ec1f10883..04c5aa852d83d 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java @@ -21,11 +21,12 @@ import com.github.luben.zstd.Zstd; import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; + /** * Zstandard Compression */ @@ -38,7 +39,7 @@ public ByteBuf encode(ByteBuf source) { int uncompressedLength = source.readableBytes(); int maxLength = (int) Zstd.compressBound(uncompressedLength); - ByteBuf target = PooledByteBufAllocator.DEFAULT.directBuffer(maxLength, maxLength); + ByteBuf target = PulsarByteBufAllocator.DEFAULT.directBuffer(maxLength, maxLength); int compressedLength; if (source.hasMemoryAddress()) { @@ -58,7 +59,7 @@ public ByteBuf encode(ByteBuf source) { @Override public ByteBuf decode(ByteBuf encoded, int uncompressedLength) throws IOException { - ByteBuf uncompressed = PooledByteBufAllocator.DEFAULT.directBuffer(uncompressedLength, uncompressedLength); + ByteBuf uncompressed = PulsarByteBufAllocator.DEFAULT.directBuffer(uncompressedLength, uncompressedLength); if (encoded.hasMemoryAddress()) { Zstd.decompressUnsafe(uncompressed.memoryAddress(), uncompressedLength, diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java index 5d43fa3a72b4e..177190a1d841e 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java @@ -18,6 +18,15 @@ */ package org.apache.pulsar.common.stats; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import io.netty.buffer.PoolArenaMetric; +import io.netty.buffer.PoolChunkListMetric; +import io.netty.buffer.PoolChunkMetric; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.util.internal.PlatformDependent; + import java.lang.management.BufferPoolMXBean; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; @@ -36,15 +45,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -import io.netty.buffer.PoolArenaMetric; -import io.netty.buffer.PoolChunkListMetric; -import io.netty.buffer.PoolChunkMetric; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.util.internal.PlatformDependent; - public class JvmMetrics { private static final Logger log = LoggerFactory.getLogger(JvmMetrics.class); @@ -113,13 +113,14 @@ public List generate() { m.put("jvm_direct_memory_used", getJvmDirectMemoryUsed()); m.put("jvm_max_direct_memory", PlatformDependent.maxDirectMemory()); m.put("jvm_thread_cnt", getThreadCount()); - + this.gcLogger.logMetrics(m); long totalAllocated = 0; long totalUsed = 0; for (PoolArenaMetric arena : PooledByteBufAllocator.DEFAULT.directArenas()) { + this.gcLogger.logMetrics(m); for (PoolChunkListMetric list : arena.chunkLists()) { for (PoolChunkMetric chunk : list) { int size = chunk.chunkSize(); @@ -134,6 +135,8 @@ public List generate() { m.put(this.componentName + "_default_pool_allocated", totalAllocated); m.put(this.componentName + "_default_pool_used", totalUsed); + this.gcLogger.logMetrics(m); + return Lists.newArrayList(m); } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/api/ByteBufPairTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/api/ByteBufPairTest.java index 92efb4f58f7c3..bc426a0fe30dc 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/api/ByteBufPairTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/api/ByteBufPairTest.java @@ -23,20 +23,20 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; -import org.testng.annotations.Test; - import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; +import org.testng.annotations.Test; + public class ByteBufPairTest { @Test public void testDoubleByteBuf() throws Exception { - ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128); + ByteBuf b1 = PulsarByteBufAllocator.DEFAULT.heapBuffer(128, 128); b1.writerIndex(b1.capacity()); - ByteBuf b2 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128); + ByteBuf b2 = PulsarByteBufAllocator.DEFAULT.heapBuffer(128, 128); b2.writerIndex(b2.capacity()); ByteBufPair buf = ByteBufPair.get(b1, b2); diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java index 186e7b222f1d5..35d213a80cee0 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java @@ -21,8 +21,14 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import com.scurrilous.circe.checksum.Crc32cIntChecksum; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + import java.io.IOException; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.ByteBufPair; import org.apache.pulsar.common.api.Commands; import org.apache.pulsar.common.api.Commands.ChecksumType; @@ -30,12 +36,6 @@ import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream; import org.testng.annotations.Test; -import com.scurrilous.circe.checksum.Crc32cIntChecksum; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.Unpooled; - public class CommandsTest { @Test @@ -80,7 +80,7 @@ public void testChecksumSendCommand() throws Exception { private int computeChecksum(MessageMetadata msgMetadata, ByteBuf compressedPayload) throws IOException { int metadataSize = msgMetadata.getSerializedSize(); int metadataFrameSize = 4 + metadataSize; - ByteBuf metaPayloadFrame = PooledByteBufAllocator.DEFAULT.buffer(metadataFrameSize, metadataFrameSize); + ByteBuf metaPayloadFrame = PulsarByteBufAllocator.DEFAULT.buffer(metadataFrameSize, metadataFrameSize); ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(metaPayloadFrame); metaPayloadFrame.writeInt(metadataSize); msgMetadata.writeTo(outStream); diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java index b7e5c85aa27cc..515bb3c033942 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java @@ -21,16 +21,16 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + import java.io.IOException; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.Unpooled; - public class CompressorCodecTest { private static String text = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras id massa odio. Duis commodo ligula sed efficitur cursus. Aliquam sollicitudin, tellus quis suscipit tincidunt, erat sem efficitur nulla, in feugiat diam ex a dolor. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia Curae; Vestibulum ac volutpat nisl, vel aliquam elit. Maecenas auctor aliquet turpis, id ullamcorper metus. Ut tincidunt et magna non ultrices. Quisque lacinia metus sed egestas tincidunt. Sed congue lacinia maximus."; @@ -44,7 +44,7 @@ public Object[][] codecProvider() { void testCompressDecompress(CompressionType type) throws IOException { CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(type); byte[] data = text.getBytes(); - ByteBuf raw = PooledByteBufAllocator.DEFAULT.buffer(); + ByteBuf raw = PulsarByteBufAllocator.DEFAULT.directBuffer(); raw.writeBytes(data); ByteBuf compressed = codec.encode(raw); @@ -85,7 +85,7 @@ void testMultpileUsages(CompressionType type) throws IOException { byte[] data = text.getBytes(); for (int i = 0; i < 5; i++) { - ByteBuf raw = PooledByteBufAllocator.DEFAULT.buffer(); + ByteBuf raw = PulsarByteBufAllocator.DEFAULT.directBuffer(); raw.writeBytes(data); ByteBuf compressed = codec.encode(raw); assertEquals(raw.readableBytes(), data.length); diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java index 828c3710b0c53..54e0de80d631c 100644 --- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java +++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java @@ -20,14 +20,25 @@ import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.base.Preconditions; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.AdaptiveRecvByteBufAllocator; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.DefaultThreadFactory; + import java.io.Closeable; import java.io.IOException; import java.net.InetAddress; +import lombok.Getter; + import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.cache.ConfigurationCacheService; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.apache.pulsar.discovery.service.server.ServiceConfig; @@ -36,16 +47,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; - -import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.channel.AdaptiveRecvByteBufAllocator; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.util.concurrent.DefaultThreadFactory; -import lombok.Getter; - /** * Main discovery-service which starts component to serve incoming discovery-request over binary-proto channel and * redirects to one of the active broker @@ -79,7 +80,7 @@ public DiscoveryService(ServiceConfig serviceConfig) { /** * Starts discovery service by initializing zookkeeper and server - * + * * @throws Exception */ public void start() throws Exception { @@ -99,7 +100,7 @@ public void start() throws Exception { public void startServer() throws Exception { ServerBootstrap bootstrap = new ServerBootstrap(); - bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + bootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT); bootstrap.group(acceptorGroup, workerGroup); bootstrap.childOption(ChannelOption.TCP_NODELAY, true); bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java index a8deb4425e175..62768194d9b67 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java @@ -22,43 +22,42 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelId; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslHandler; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; + import java.net.URI; import java.net.URISyntaxException; - import java.util.Map; import java.util.concurrent.ConcurrentHashMap; + import javax.net.ssl.SSLSession; import org.apache.http.conn.ssl.DefaultHostnameVerifier; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationDataProvider; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.AuthData; import org.apache.pulsar.common.api.Commands; import org.apache.pulsar.common.api.PulsarDecoder; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAuthChallenge; import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected; -import org.apache.pulsar.common.conf.InternalConfigurationData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.channel.Channel; -import io.netty.channel.ChannelId; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import io.netty.handler.ssl.SslContext; -import io.netty.handler.ssl.SslHandler; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; - public class DirectProxyHandler { private Channel inboundChannel; @@ -90,7 +89,7 @@ public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, // Tie the backend connection on the same thread to avoid context // switches when passing data between the 2 // connections - b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + b.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT); b.group(inboundChannel.eventLoop()).channel(inboundChannel.getClass()).option(ChannelOption.AUTO_READ, false); b.handler(new ChannelInitializer() { @Override diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index a591af1588de5..a6344856fe10f 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -22,7 +22,6 @@ import static org.apache.commons.lang3.StringUtils.isBlank; import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.AdaptiveRecvByteBufAllocator; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; @@ -40,6 +39,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.cache.ConfigurationCacheService; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.apache.pulsar.zookeeper.ZooKeeperClientFactory; @@ -111,7 +111,7 @@ public ProxyService(ProxyConfiguration proxyConfig, } else { this.serviceUrl = null; } - + if (proxyConfig.getServicePortTls().isPresent()) { this.serviceUrlTls = String.format("pulsar://%s:%d/", hostname, proxyConfig.getServicePortTls().get()); } else { @@ -137,7 +137,7 @@ public void start() throws Exception { } ServerBootstrap bootstrap = new ServerBootstrap(); - bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + bootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT); bootstrap.group(acceptorGroup, workerGroup); bootstrap.childOption(ChannelOption.TCP_NODELAY, true); bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java index d75ba0a270a97..00bdf0320fadb 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java @@ -29,8 +29,6 @@ import com.google.common.util.concurrent.RateLimiter; import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.Unpooled; import io.netty.util.concurrent.DefaultThreadFactory; import java.text.DecimalFormat; @@ -61,6 +59,7 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.commons.codec.digest.DigestUtils; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.testclient.utils.PaddingDecimalFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -152,7 +151,7 @@ public static void main(String[] args) throws Exception { log.info("Starting Pulsar managed-ledger perf writer with config: {}", w.writeValueAsString(arguments)); byte[] payloadData = new byte[arguments.msgSize]; - ByteBuf payloadBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(arguments.msgSize); + ByteBuf payloadBuffer = PulsarByteBufAllocator.DEFAULT.directBuffer(arguments.msgSize); payloadBuffer.writerIndex(arguments.msgSize); // Now processing command line arguments diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/JvmMetrics.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/JvmMetrics.java index ae7e99277f7e0..fa0c45fb4462a 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/JvmMetrics.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/JvmMetrics.java @@ -82,24 +82,6 @@ public Metrics generate() { m.put("jvm_gc_old_pause", currentOldGcTime); m.put("jvm_gc_old_count", currentOldGcCount); - long totalAllocated = 0; - long totalUsed = 0; - - for (PoolArenaMetric arena : PooledByteBufAllocator.DEFAULT.metric().directArenas()) { - for (PoolChunkListMetric list : arena.chunkLists()) { - for (PoolChunkMetric chunk : list) { - int size = chunk.chunkSize(); - int used = size - chunk.freeBytes(); - - totalAllocated += size; - totalUsed += used; - } - } - } - - m.put("proxy_default_pool_allocated", totalAllocated); - m.put("proxy_default_pool_used", totalUsed); - return m; } diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java index 8d0f5bb2ed3f3..d07bab6cb3bf3 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java @@ -19,11 +19,11 @@ package org.apache.bookkeeper.mledger.offload.jcloud.impl; import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import java.io.IOException; import java.io.InputStream; import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream; import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader.VersionCheck; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.domain.Blob; import org.jclouds.blobstore.options.GetOptions; @@ -52,7 +52,7 @@ public BlobStoreBackedInputStreamImpl(BlobStore blobStore, String bucket, String this.bucket = bucket; this.key = key; this.versionCheck = versionCheck; - this.buffer = PooledByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize); + this.buffer = PulsarByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize); this.objectLen = objectLen; this.bufferSize = bufferSize; this.cursor = 0; diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java index 345a750c399cb..20ccb3dba5f70 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java @@ -19,7 +19,7 @@ package org.apache.bookkeeper.mledger.offload.jcloud.impl; import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; + import java.io.DataInputStream; import java.io.IOException; import java.util.ArrayList; @@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; + import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; import org.apache.bookkeeper.client.api.LedgerEntries; @@ -40,6 +41,7 @@ import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder; import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry; import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader.VersionCheck; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.domain.Blob; import org.slf4j.Logger; @@ -116,7 +118,7 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr long entryId = dataStream.readLong(); if (entryId == nextExpectedId) { - ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(length, length); + ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length); entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf)); int toWrite = length; while (toWrite > 0) { diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java index 253b2e30dcb0d..ccfb20faa9316 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java @@ -21,18 +21,21 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.collect.Lists; + import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; -import io.netty.buffer.PooledByteBufAllocator; + import java.io.IOException; import java.io.InputStream; import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutionException; + import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.offload.jcloud.BlockAwareSegmentInputStream; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -126,8 +129,8 @@ private List readNextEntriesFromLedger(long start, long maxNumberEntrie int entryLength = buf.readableBytes(); long entryId = entry.getEntryId(); - CompositeByteBuf entryBuf = PooledByteBufAllocator.DEFAULT.compositeBuffer(2); - ByteBuf entryHeaderBuf = PooledByteBufAllocator.DEFAULT.buffer(ENTRY_HEADER_SIZE, ENTRY_HEADER_SIZE); + CompositeByteBuf entryBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(2); + ByteBuf entryHeaderBuf = PulsarByteBufAllocator.DEFAULT.buffer(ENTRY_HEADER_SIZE, ENTRY_HEADER_SIZE); entryHeaderBuf.writeInt(entryLength).writeLong(entryId); entryBuf.addComponents(true, entryHeaderBuf, buf); diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java index dd6d99b5825bd..bb406c48b7589 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/DataBlockHeaderImpl.java @@ -22,12 +22,14 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; -import io.netty.buffer.PooledByteBufAllocator; + import java.io.DataInputStream; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; + import org.apache.bookkeeper.mledger.offload.jcloud.DataBlockHeader; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; /** * @@ -111,7 +113,7 @@ public DataBlockHeaderImpl(long headerLength, long blockLength, long firstEntryI */ @Override public InputStream toStream() { - ByteBuf out = PooledByteBufAllocator.DEFAULT.buffer(HEADER_MAX_SIZE, HEADER_MAX_SIZE); + ByteBuf out = PulsarByteBufAllocator.DEFAULT.buffer(HEADER_MAX_SIZE, HEADER_MAX_SIZE); out.writeInt(MAGIC_WORD) .writeLong(headerLength) .writeLong(blockLength) diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java index 7c837fd0b5779..ced2bd1b09b1d 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java @@ -25,7 +25,6 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; @@ -42,11 +41,12 @@ import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.api.DigestType; import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock; +import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.DataFormats; import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat; -import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock; -import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -183,7 +183,7 @@ public OffloadIndexBlock.IndexInputStream toStream() throws IOException { + segmentMetadataLength + indexEntryCount * (8 + 4 + 8); /* messageEntryId + blockPartId + blockOffset */ - ByteBuf out = PooledByteBufAllocator.DEFAULT.buffer(indexBlockLength, indexBlockLength); + ByteBuf out = PulsarByteBufAllocator.DEFAULT.buffer(indexBlockLength, indexBlockLength); out.writeInt(INDEX_MAGIC_WORD) .writeInt(indexBlockLength)