From ef06691531002c5d7cdbbdafc5494914ee8e0765 Mon Sep 17 00:00:00 2001 From: Rajan Dhabalia Date: Tue, 20 Apr 2021 16:26:28 -0700 Subject: [PATCH] PIP 83 : Pulsar client: Message consumption with pooled buffer (#10184) fix api, buffer-access, duplicate code --- .../api/SimpleProducerConsumerTest.java | 2 +- .../impl/BrokerClientIntegrationTest.java | 107 +++++++- .../pulsar/client/api/ConsumerBuilder.java | 10 + .../org/apache/pulsar/client/api/Message.java | 15 + .../org/apache/pulsar/client/api/Schema.java | 17 ++ .../internal/DefaultImplementation.java | 20 ++ .../apache/pulsar/client/cli/CmdConsume.java | 34 ++- .../pulsar/client/impl/ConsumerBase.java | 13 +- .../client/impl/ConsumerBuilderImpl.java | 5 + .../pulsar/client/impl/ConsumerImpl.java | 25 +- .../impl/ConsumerStatsRecorderImpl.java | 2 +- .../pulsar/client/impl/MessageImpl.java | 256 ++++++++++++------ .../pulsar/client/impl/MessagesImpl.java | 4 +- .../client/impl/MultiTopicsConsumerImpl.java | 7 +- .../pulsar/client/impl/TopicMessageImpl.java | 10 + .../client/impl/ZeroQueueConsumerImpl.java | 1 + .../impl/conf/ConsumerConfigurationData.java | 2 + .../client/impl/schema/AbstractSchema.java | 3 +- .../client/impl/schema/ByteBufferSchema.java | 13 +- .../client/impl/schema/BytesSchema.java | 2 +- .../impl/schema/LocalDateTimeSchema.java | 4 +- .../client/impl/schema/StringSchema.java | 2 +- .../testclient/PerformanceConsumer.java | 25 +- 23 files changed, 445 insertions(+), 134 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index abcb7ede90756..1123562314e61 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -4029,4 +4029,4 @@ public void testPartitionTopicsOnSeparateListner() throws Exception { blockedMessageLatch.countDown(); log.info("-- Exiting {} test --", methodName); } -} +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java index c4d03adcd1cb1..f7ce13b9203b8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.UUID.randomUUID; import static org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH; import static org.mockito.Mockito.any; @@ -30,12 +31,14 @@ import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; - +import io.netty.buffer.ByteBuf; import java.lang.reflect.Field; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.security.GeneralSecurityException; import java.util.ArrayList; @@ -55,7 +58,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import lombok.Cleanup; - import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; @@ -132,6 +134,11 @@ public Object[][] subType() { return new Object[][] { { SubscriptionType.Shared }, { SubscriptionType.Failover } }; } + @DataProvider(name = "booleanFlagProvider") + public Object[][] booleanFlagProvider() { + return new Object[][] { { true }, { false } }; + } + /** * Verifies unload namespace-bundle doesn't close shared connection used by other namespace-bundle. * @@ -918,4 +925,98 @@ public void testJsonSchemaProducerConsumerWithSpecifiedReaderAndWriter() throws private static final class TestMessageObject{ private String value; } -} + + /** + * It validates pooled message consumption for batch and non-batch messages. + * + * @throws Exception + */ + @Test(dataProvider = "booleanFlagProvider") + public void testConsumerWithPooledMessages(boolean isBatchingEnabled) throws Exception { + log.info("-- Starting {} test --", methodName); + + @Cleanup + PulsarClient newPulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build(); + + final String topic = "persistent://my-property/my-ns/testConsumerWithPooledMessages" + isBatchingEnabled; + + @Cleanup + Consumer consumer = newPulsarClient.newConsumer(Schema.BYTEBUFFER).topic(topic) + .subscriptionName("my-sub").poolMessages(true).subscribe(); + + @Cleanup + Producer producer = newPulsarClient.newProducer().topic(topic).enableBatching(isBatchingEnabled).create(); + + final int numMessages = 100; + for (int i = 0; i < numMessages; i++) { + producer.newMessage().value(("value-" + i).getBytes(UTF_8)) + .eventTime((i + 1) * 100L).sendAsync(); + } + producer.flush(); + + // Reuse pre-allocated pooled buffer to process every message + byte[] val = null; + int size = 0; + for (int i = 0; i < numMessages; i++) { + Message msg = consumer.receive(); + ByteBuffer value; + try { + value = msg.getValue(); + int capacity = value.remaining(); + // expand the size of buffer if needed + if (capacity > size) { + val = new byte[capacity]; + size = capacity; + } + // read message into pooled buffer + value.get(val, 0, capacity); + // process the message + assertEquals(("value-" + i), new String(val, 0, capacity)); + } finally { + msg.release(); + } + } + consumer.close(); + producer.close(); + } + + /** + * It verifies that expiry/redelivery of messages relesaes the messages without leak. + * + * @param isBatchingEnabled + * @throws Exception + */ + @Test(dataProvider = "booleanFlagProvider") + public void testPooledMessageWithAckTimeout(boolean isBatchingEnabled) throws Exception { + log.info("-- Starting {} test --", methodName); + + @Cleanup + PulsarClient newPulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build(); + + final String topic = "persistent://my-property/my-ns/testPooledMessageWithAckTimeout" + isBatchingEnabled; + + @Cleanup + ConsumerImpl consumer = (ConsumerImpl) newPulsarClient.newConsumer(Schema.BYTEBUFFER) + .topic(topic).subscriptionName("my-sub").poolMessages(true).subscribe(); + + @Cleanup + Producer producer = newPulsarClient.newProducer().topic(topic).enableBatching(isBatchingEnabled) + .create(); + + final int numMessages = 100; + for (int i = 0; i < numMessages; i++) { + producer.newMessage().value(("value-" + i).getBytes(UTF_8)).eventTime((i + 1) * 100L).sendAsync(); + } + producer.flush(); + + retryStrategically((test) -> consumer.incomingMessages.peek() != null, 5, 500); + MessageImpl msg = (MessageImpl) consumer.incomingMessages.peek(); + assertNotNull(msg); + ByteBuf payload = ((MessageImpl) msg).getPayload(); + assertNotEquals(payload.refCnt(), 0); + consumer.redeliverUnacknowledgedMessages(); + assertEquals(payload.refCnt(), 0); + consumer.close(); + producer.close(); + } +} \ No newline at end of file diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index 0b589e104ffb6..1038ba9ea8af6 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -731,4 +731,14 @@ public interface ConsumerBuilder extends Cloneable { * @return */ ConsumerBuilder expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit); + + /** + * Enable pooling of messages and the underlying data buffers. + *

+ * When pooling is enabled, the application is responsible for calling Message.release() after the handling of every + * received message. If “release()” is not called on a received message, there will be a memory leak. If an + * application attempts to use and already “released” message, it might experience undefined behavior (eg: memory + * corruption, deserialization error, etc.). + */ + ConsumerBuilder poolMessages(boolean poolMessages); } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java index 324c2ba4157e6..df3a4b9347036 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java @@ -66,6 +66,13 @@ public interface Message { */ byte[] getData(); + /** + * Get the uncompressed message payload size in bytes. + * + * @return size in bytes. + */ + int size(); + /** * Get the de-serialized value of the message, according the configured {@link Schema}. * @@ -217,4 +224,12 @@ public interface Message { * @return the name of cluster, from which the message is replicated. */ String getReplicatedFrom(); + + /** + * Release a message back to the pool. This is required only if the consumer was created with the option to pool + * messages, otherwise it will have no effect. + * + * @since 2.8.0 + */ + void release(); } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java index a9df5cc18b9f6..69fcb00ee2a93 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.api; +import static org.apache.pulsar.client.internal.DefaultImplementation.getBytes; import java.nio.ByteBuffer; import java.sql.Time; import java.sql.Timestamp; @@ -120,6 +121,22 @@ default T decode(byte[] bytes, byte[] schemaVersion) { return decode(bytes); } + /** + * Decode a ByteBuffer into an object using a given version.
+ * + * @param data + * the ByteBuffer to decode + * @param schemaVersion + * the schema version to decode the object. null indicates using latest version. + * @return the deserialized object + */ + default T decode(ByteBuffer data, byte[] schemaVersion) { + if (data == null) { + return null; + } + return decode(getBytes(data), schemaVersion); + } + /** * @return an object that represents the Schema associated metadata */ diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java index 47b841056437e..dcee6a3fc0606 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java @@ -494,4 +494,24 @@ public static BatcherBuilder newKeyBasedBatcherBuilder() { () -> (BatcherBuilder) getConstructor("org.apache.pulsar.client.impl.KeyBasedBatcherBuilder") .newInstance()); } + + /** + * Retrieves ByteBuffer data into byte[]. + * + * @param byteBuffer + * @return + */ + public static byte[] getBytes(ByteBuffer byteBuffer) { + if (byteBuffer == null) { + return null; + } + if (byteBuffer.hasArray() && byteBuffer.arrayOffset() == 0 + && byteBuffer.array().length == byteBuffer.remaining()) { + return byteBuffer.array(); + } + // Direct buffer is not backed by array and it needs to be read from direct memory + byte[] array = new byte[byteBuffer.remaining()]; + byteBuffer.get(array); + return array; + } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java index f5c9d03b89cae..0e4f41888aa22 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java @@ -19,7 +19,7 @@ package org.apache.pulsar.client.cli; import static org.apache.commons.lang3.StringUtils.isNotBlank; - +import static org.apache.pulsar.client.internal.DefaultImplementation.getBytes; import com.beust.jcommander.Parameter; import com.beust.jcommander.ParameterException; import com.beust.jcommander.Parameters; @@ -31,6 +31,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.URI; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; @@ -122,7 +123,9 @@ public class CmdConsume { @Parameter(names = { "-st", "--schema-type"}, description = "Set a schema type on the consumer, it can be 'bytes' or 'auto_consume'") private String schematype = "bytes"; - + @Parameter(names = { "-pm", "--pool-messages" }, description = "Use the pooled message") + private boolean poolMessages = true; + private ClientBuilder clientBuilder; private Authentication authentication; private String serviceURL; @@ -171,6 +174,8 @@ private String interpretMessage(Message message, boolean displayHex) throws I } else if (value instanceof GenericRecord) { Map asMap = genericRecordToMap((GenericRecord) value); data = asMap.toString(); + } else if (value instanceof ByteBuffer) { + data = new String(getBytes((ByteBuffer) value)); } else { data = value.toString(); } @@ -233,7 +238,7 @@ private int consume(String topic) { try { ConsumerBuilder builder; PulsarClient client = clientBuilder.build(); - Schema schema = Schema.BYTES; + Schema schema = poolMessages ? Schema.BYTEBUFFER : Schema.BYTES; if ("auto_consume".equals(schematype)) { schema = Schema.AUTO_CONSUME(); } else if (!"bytes".equals(schematype)) { @@ -243,7 +248,8 @@ private int consume(String topic) { .subscriptionName(this.subscriptionName) .subscriptionType(subscriptionType) .subscriptionMode(subscriptionMode) - .subscriptionInitialPosition(subscriptionInitialPosition); + .subscriptionInitialPosition(subscriptionInitialPosition) + .poolMessages(poolMessages); if (isRegex) { builder.topicsPattern(Pattern.compile(topic)); @@ -275,15 +281,19 @@ private int consume(String topic) { if (msg == null) { LOG.debug("No message to consume after waiting for 5 seconds."); } else { - numMessagesConsumed += 1; - if (!hideContent) { - System.out.println(MESSAGE_BOUNDARY); - String output = this.interpretMessage(msg, displayHex); - System.out.println(output); - } else if (numMessagesConsumed % 1000 == 0) { - System.out.println("Received " + numMessagesConsumed + " messages"); + try { + numMessagesConsumed += 1; + if (!hideContent) { + System.out.println(MESSAGE_BOUNDARY); + String output = this.interpretMessage(msg, displayHex); + System.out.println(output); + } else if (numMessagesConsumed % 1000 == 0) { + System.out.println("Received " + numMessagesConsumed + " messages"); + } + consumer.acknowledge(msg); + } finally { + msg.release(); } - consumer.acknowledge(msg); } } client.close(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 28c248fe674ac..dc1c858455038 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -908,8 +908,7 @@ protected boolean hasPendingBatchReceive() { } protected void increaseIncomingMessageSize(final Message message) { - INCOMING_MESSAGES_SIZE_UPDATER.addAndGet( - this, message.getData() == null ? 0 : message.getData().length); + INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, message.size()); } protected void resetIncomingMessageSize() { @@ -917,14 +916,20 @@ protected void resetIncomingMessageSize() { } protected void decreaseIncomingMessageSize(final Message message) { - INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, - (message.getData() != null) ? -message.getData().length : 0); + INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.size()); } public long getIncomingMessageSize() { return INCOMING_MESSAGES_SIZE_UPDATER.get(this); } + protected void clearIncomingMessages() { + // release messages if they are pooled messages + incomingMessages.forEach(Message::release); + incomingMessages.clear(); + resetIncomingMessageSize(); + } + protected abstract void completeOpBatchReceive(OpBatchReceive op); private ExecutorService getExecutor(Message msg) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 338fe47968fc7..76b53ec791aff 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -458,4 +458,9 @@ public ConsumerBuilder expireTimeOfIncompleteChunkedMessage(long duration, Ti return this; } + @Override + public ConsumerBuilder poolMessages(boolean poolMessages) { + conf.setPoolMessages(poolMessages); + return this; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index ab1d44e4bd847..b5e587f664d78 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -184,6 +184,7 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final BlockingQueue pendingChunkedMessageUuidQueue; private final boolean createTopicIfDoesNotExist; + private final boolean poolMessages; private final AtomicReference clientCnxUsedForConsumerRegistration = new AtomicReference<>(); @@ -252,6 +253,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat this.pendingChunkedMessageUuidQueue = new GrowableArrayBlockingQueue<>(); this.expireTimeOfIncompleteChunkedMessageMillis = conf.getExpireTimeOfIncompleteChunkedMessageMillis(); this.autoAckOldestChunkedMessageOnQueueFull = conf.isAutoAckOldestChunkedMessageOnQueueFull(); + this.poolMessages = conf.isPoolMessages(); if (client.getConfiguration().getStatsIntervalSeconds() > 0) { stats = new ConsumerStatsRecorderImpl(client, conf, this); @@ -835,7 +837,8 @@ private BatchMessageIdImpl clearReceiverQueue() { previousMessage = new BatchMessageIdImpl(nextMessageInQueue.getLedgerId(), nextMessageInQueue.getEntryId() - 1, nextMessageInQueue.getPartitionIndex(), -1); } - + // release messages if they are pooled messages + currentMessageQueue.forEach(Message::release); return previousMessage; } else if (!lastDequeuedMessageId.equals(MessageId.earliest)) { // If the queue was empty we need to restart from the message just after the last one that has been dequeued @@ -1044,8 +1047,9 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List ac return; } - final MessageImpl message = new MessageImpl<>(topicName.toString(), msgId, msgMetadata, - uncompressedPayload, createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount); + final MessageImpl message = MessageImpl.create(topicName.toString(), msgId, msgMetadata, + uncompressedPayload, createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount, + poolMessages); uncompressedPayload.release(); // Enqueue the message so that it can be retrieved when application calls receive() @@ -1264,9 +1268,9 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex(), i, batchSize, acker); - final MessageImpl message = new MessageImpl<>(topicName.toString(), batchMessageIdImpl, + final MessageImpl message = MessageImpl.create(topicName.toString(), batchMessageIdImpl, msgMetadata, singleMessageMetadata, singleMessagePayload, - createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount); + createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount, poolMessages); if (possibleToDeadLetter != null) { possibleToDeadLetter.add(message); } @@ -1542,8 +1546,7 @@ public void redeliverUnacknowledgedMessages() { int currentSize = 0; synchronized (this) { currentSize = incomingMessages.size(); - incomingMessages.clear(); - resetIncomingMessageSize(); + clearIncomingMessages(); unAckedMessageTracker.clear(); } cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(consumerId), cnx.ctx().voidPromise()); @@ -1566,8 +1569,8 @@ public void redeliverUnacknowledgedMessages() { public int clearIncomingMessagesAndGetMessageNumber() { int messagesNumber = incomingMessages.size(); - incomingMessages.clear(); - resetIncomingMessageSize(); + incomingMessages.forEach(Message::release); + clearIncomingMessages(); unAckedMessageTracker.clear(); return messagesNumber; } @@ -1789,8 +1792,7 @@ private CompletableFuture seekAsyncInternal(long requestId, ByteBuf seek, duringSeek.set(true); lastDequeuedMessageId = MessageId.earliest; - incomingMessages.clear(); - resetIncomingMessageSize(); + clearIncomingMessages(); seekFuture.complete(null); }).exceptionally(e -> { log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage()); @@ -2113,6 +2115,7 @@ private int removeExpiredMessagesFromQueue(Set messageIds) { messageIds.add(id); break; } + message.release(); message = incomingMessages.poll(); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java index f87c463f0f506..aee60b98741a8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java @@ -175,7 +175,7 @@ private void init(ConsumerConfigurationData conf) { public void updateNumMsgsReceived(Message message) { if (message != null) { numMsgsReceived.increment(); - numBytesReceived.add(message.getData() == null ? 0 : message.getData().length); + numBytesReceived.add(message.size()); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index 350210c099514..8ed18fe2c91fc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -21,11 +21,13 @@ import static com.google.common.base.Preconditions.checkNotNull; import static java.nio.charset.StandardCharsets.UTF_8; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.util.Recycler; +import io.netty.util.ReferenceCountUtil; import io.netty.util.Recycler.Handle; import java.io.IOException; @@ -59,17 +61,20 @@ public class MessageImpl implements Message { private final MessageMetadata msgMetadata; private ClientCnx cnx; private ByteBuf payload; + private Schema schema; private SchemaState schemaState = SchemaState.None; private Optional encryptionCtx = Optional.empty(); private String topic; // only set for incoming messages transient private Map properties; - private final int redeliveryCount; + private int redeliveryCount; private int uncompressedSize; private BrokerEntryMetadata brokerEntryMetadata; + private boolean poolMessage; + // Constructor for out-going message public static MessageImpl create(MessageMetadata msgMetadata, ByteBuffer payload, Schema schema) { @SuppressWarnings("unchecked") @@ -94,93 +99,138 @@ public static MessageImpl create(MessageMetadata msgMetadata, ByteBuffer MessageImpl(String topic, MessageIdImpl messageId, MessageMetadata msgMetadata, ByteBuf payload, Optional encryptionCtx, ClientCnx cnx, Schema schema) { - this(topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema, 0); + this(topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema, 0, false); } MessageImpl(String topic, MessageIdImpl messageId, MessageMetadata msgMetadata, ByteBuf payload, - Optional encryptionCtx, ClientCnx cnx, Schema schema, int redeliveryCount) { - this.msgMetadata = new MessageMetadata().copyFrom(msgMetadata); - this.messageId = messageId; - this.topic = topic; - this.cnx = cnx; - this.redeliveryCount = redeliveryCount; - - // Need to make a copy since the passed payload is using a ref-count buffer that we don't know when could - // release, since the Message is passed to the user. Also, the passed ByteBuf is coming from network and is - // backed by a direct buffer which we could not expose as a byte[] - this.payload = Unpooled.copiedBuffer(payload); - this.encryptionCtx = encryptionCtx; - - if (msgMetadata.getPropertiesCount() > 0) { - this.properties = Collections.unmodifiableMap(msgMetadata.getPropertiesList().stream() - .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue, - (oldValue,newValue) -> newValue))); + Optional encryptionCtx, ClientCnx cnx, Schema schema, int redeliveryCount, + boolean pooledMessage) { + this.msgMetadata = new MessageMetadata(); + init(this, topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema, redeliveryCount, pooledMessage); + } + + public static MessageImpl create(String topic, MessageIdImpl messageId, MessageMetadata msgMetadata, + ByteBuf payload, Optional encryptionCtx, ClientCnx cnx, Schema schema, + int redeliveryCount, boolean pooledMessage) { + if (pooledMessage) { + @SuppressWarnings("unchecked") + MessageImpl msg = (MessageImpl) RECYCLER.get(); + init(msg, topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema, redeliveryCount, + pooledMessage); + return msg; } else { - properties = Collections.emptyMap(); + return new MessageImpl<>(topic, messageId, msgMetadata, payload, encryptionCtx, cnx, schema, + redeliveryCount, pooledMessage); } - this.schema = schema; } MessageImpl(String topic, BatchMessageIdImpl batchMessageIdImpl, MessageMetadata msgMetadata, - SingleMessageMetadata singleMessageMetadata, ByteBuf payload, - Optional encryptionCtx, ClientCnx cnx, Schema schema) { - this(topic, batchMessageIdImpl, msgMetadata, singleMessageMetadata, payload, encryptionCtx, cnx, schema, 0); + SingleMessageMetadata singleMessageMetadata, ByteBuf payload, Optional encryptionCtx, + ClientCnx cnx, Schema schema) { + this(topic, batchMessageIdImpl, msgMetadata, singleMessageMetadata, payload, encryptionCtx, cnx, schema, 0, + false); } MessageImpl(String topic, BatchMessageIdImpl batchMessageIdImpl, MessageMetadata batchMetadata, - SingleMessageMetadata singleMessageMetadata, ByteBuf payload, - Optional encryptionCtx, ClientCnx cnx, Schema schema, int redeliveryCount) { - this.msgMetadata = new MessageMetadata().copyFrom(batchMetadata); - this.messageId = batchMessageIdImpl; - this.topic = topic; - this.cnx = cnx; - this.redeliveryCount = redeliveryCount; + SingleMessageMetadata singleMessageMetadata, ByteBuf payload, Optional encryptionCtx, + ClientCnx cnx, Schema schema, int redeliveryCount, boolean keepMessageInDirectMemory) { + this.msgMetadata = new MessageMetadata(); + init(this, topic, batchMessageIdImpl, batchMetadata, singleMessageMetadata, payload, encryptionCtx, cnx, schema, + redeliveryCount, keepMessageInDirectMemory); - this.payload = Unpooled.copiedBuffer(payload); - this.encryptionCtx = encryptionCtx; + } - if (singleMessageMetadata.getPropertiesCount() > 0) { - Map properties = Maps.newTreeMap(); - for (KeyValue entry : singleMessageMetadata.getPropertiesList()) { - properties.put(entry.getKey(), entry.getValue()); - } - this.properties = Collections.unmodifiableMap(properties); + public static MessageImpl create(String topic, BatchMessageIdImpl batchMessageIdImpl, + MessageMetadata batchMetadata, SingleMessageMetadata singleMessageMetadata, ByteBuf payload, + Optional encryptionCtx, ClientCnx cnx, Schema schema, int redeliveryCount, + boolean pooledMessage) { + if (pooledMessage) { + @SuppressWarnings("unchecked") + MessageImpl msg = (MessageImpl) RECYCLER.get(); + init(msg, topic, batchMessageIdImpl, batchMetadata, singleMessageMetadata, payload, encryptionCtx, cnx, + schema, redeliveryCount, pooledMessage); + return msg; } else { - properties = Collections.emptyMap(); - } - if (singleMessageMetadata.hasPartitionKey()) { - msgMetadata.setPartitionKeyB64Encoded(singleMessageMetadata.isPartitionKeyB64Encoded()) - .setPartitionKey(singleMessageMetadata.getPartitionKey()); - } else if (msgMetadata.hasPartitionKey()) { - msgMetadata.clearPartitionKey(); - msgMetadata.clearPartitionKeyB64Encoded(); + return new MessageImpl<>(topic, batchMessageIdImpl, batchMetadata, singleMessageMetadata, payload, + encryptionCtx, cnx, schema, redeliveryCount, pooledMessage); } + } - if (singleMessageMetadata.hasOrderingKey()) { - msgMetadata.setOrderingKey(singleMessageMetadata.getOrderingKey()); - } else if (msgMetadata.hasOrderingKey()) { - msgMetadata.clearOrderingKey(); - } + static void init(MessageImpl msg, String topic, MessageIdImpl messageId, MessageMetadata msgMetadata, + ByteBuf payload, Optional encryptionCtx, ClientCnx cnx, Schema schema, + int redeliveryCount, boolean poolMessage) { + init(msg, topic, null /* batchMessageIdImpl */, msgMetadata, null /* singleMessageMetadata */, payload, + encryptionCtx, cnx, schema, redeliveryCount, poolMessage); + msg.messageId = messageId; + } + + private static void init(MessageImpl msg, String topic, BatchMessageIdImpl batchMessageIdImpl, + MessageMetadata msgMetadata, SingleMessageMetadata singleMessageMetadata, ByteBuf payload, + Optional encryptionCtx, ClientCnx cnx, Schema schema, int redeliveryCount, + boolean poolMessage) { + msg.msgMetadata.clear(); + msg.msgMetadata.copyFrom(msgMetadata); + msg.messageId = batchMessageIdImpl; + msg.topic = topic; + msg.cnx = cnx; + msg.redeliveryCount = redeliveryCount; + msg.encryptionCtx = encryptionCtx; + msg.schema = schema; - if (singleMessageMetadata.hasEventTime()) { - msgMetadata.setEventTime(singleMessageMetadata.getEventTime()); - } + msg.poolMessage = poolMessage; + // If it's not pool message then need to make a copy since the passed payload is + // using a ref-count buffer that we don't know when could release, since the + // Message is passed to the user. Also, the passed ByteBuf is coming from network + // and is backed by a direct buffer which we could not expose as a byte[] + msg.payload = poolMessage ? payload.retain() : Unpooled.copiedBuffer(payload); + + if (singleMessageMetadata != null) { + if (singleMessageMetadata.getPropertiesCount() > 0) { + Map properties = Maps.newTreeMap(); + for (KeyValue entry : singleMessageMetadata.getPropertiesList()) { + properties.put(entry.getKey(), entry.getValue()); + } + msg.properties = Collections.unmodifiableMap(properties); + } else { + msg.properties = Collections.emptyMap(); + } + if (singleMessageMetadata.hasPartitionKey()) { + msg.msgMetadata.setPartitionKeyB64Encoded(singleMessageMetadata.isPartitionKeyB64Encoded()) + .setPartitionKey(singleMessageMetadata.getPartitionKey()); + } else if (msg.msgMetadata.hasPartitionKey()) { + msg.msgMetadata.clearPartitionKey(); + msg.msgMetadata.clearPartitionKeyB64Encoded(); + } - if (singleMessageMetadata.hasSequenceId()) { - msgMetadata.setSequenceId(singleMessageMetadata.getSequenceId()); - } + if (singleMessageMetadata.hasOrderingKey()) { + msg.msgMetadata.setOrderingKey(singleMessageMetadata.getOrderingKey()); + } else if (msg.msgMetadata.hasOrderingKey()) { + msg.msgMetadata.clearOrderingKey(); + } - if (singleMessageMetadata.hasNullValue()) { - msgMetadata.setNullValue(singleMessageMetadata.isNullValue()); - } + if (singleMessageMetadata.hasEventTime()) { + msg.msgMetadata.setEventTime(singleMessageMetadata.getEventTime()); + } - if (singleMessageMetadata.hasNullPartitionKey()) { - msgMetadata.setNullPartitionKey(singleMessageMetadata.isNullPartitionKey()); - } + if (singleMessageMetadata.hasSequenceId()) { + msg.msgMetadata.setSequenceId(singleMessageMetadata.getSequenceId()); + } - this.schema = schema; - } + if (singleMessageMetadata.hasNullValue()) { + msg.msgMetadata.setNullValue(singleMessageMetadata.isNullValue()); + } + if (singleMessageMetadata.hasNullPartitionKey()) { + msg.msgMetadata.setNullPartitionKey(singleMessageMetadata.isNullPartitionKey()); + } + } else if (msgMetadata.getPropertiesCount() > 0) { + msg.properties = Collections.unmodifiableMap(msgMetadata.getPropertiesList().stream().collect( + Collectors.toMap(KeyValue::getKey, KeyValue::getValue, (oldValue, newValue) -> newValue))); + } else { + msg.properties = Collections.emptyMap(); + } + } + public MessageImpl(String topic, String msgId, Map properties, byte[] payload, Schema schema, MessageMetadata msgMetadata) { this(topic, msgId, properties, Unpooled.wrappedBuffer(payload), schema, msgMetadata); @@ -311,6 +361,11 @@ public byte[] getData() { if (msgMetadata.isNullValue()) { return null; } + if (payload.isDirect()) { + byte[] data = new byte[payload.readableBytes()]; + payload.getBytes(payload.readerIndex(), data); + return data; + } if (payload.arrayOffset() == 0 && payload.capacity() == payload.array().length) { return payload.array(); } else { @@ -321,6 +376,14 @@ public byte[] getData() { } } + @Override + public int size() { + if (msgMetadata.isNullValue()) { + return 0; + } + return payload.readableBytes(); + } + public Schema getSchema() { return this.schema; } @@ -356,19 +419,11 @@ public T getValue() { } // check if the schema passed in from client supports schema versioning or not // this is an optimization to only get schema version when necessary - if (schema.supportSchemaVersioning()) { - byte[] schemaVersion = getSchemaVersion(); - if (null == schemaVersion) { - return schema.decode(getData()); - } else { - return schema.decode(getData(), schemaVersion); - } - } else { - return schema.decode(getData()); - } + return decode(schema.supportSchemaVersioning() ? getSchemaVersion() : null); } } + private KeyValueSchema getKeyValueSchema() { if (schema instanceof AutoConsumeSchema) { return (KeyValueSchema) ((AutoConsumeSchema) schema).getInternalSchema(); @@ -377,6 +432,19 @@ private KeyValueSchema getKeyValueSchema() { } } + + private T decode(byte[] schemaVersion) { + T value = poolMessage ? schema.decode(payload.nioBuffer(), schemaVersion) : null; + if (value != null) { + return value; + } + if (null == schemaVersion) { + return schema.decode(getData()); + } else { + return schema.decode(getData(), schemaVersion); + } + } + private T getKeyValueBySchemaVersion() { KeyValueSchema kvSchema = getKeyValueSchema(); byte[] schemaVersion = getSchemaVersion(); @@ -390,7 +458,7 @@ private T getKeyValueBySchemaVersion() { return (T) keyValue; } } else { - return schema.decode(getData(), schemaVersion); + return decode(schemaVersion); } } @@ -406,7 +474,7 @@ private T getKeyValue() { return (T) keyValue; } } else { - return schema.decode(getData()); + return decode(null); } } @@ -527,24 +595,42 @@ public ClientCnx getCnx() { } public void recycle() { - msgMetadata.clear(); + if (msgMetadata != null) { + msgMetadata.clear(); + } + if (brokerEntryMetadata != null) { + brokerEntryMetadata.clear(); + } + cnx = null; messageId = null; topic = null; payload = null; + encryptionCtx = null; + redeliveryCount = 0; + uncompressedSize = 0; properties = null; schema = null; schemaState = SchemaState.None; - brokerEntryMetadata = null; + poolMessage = false; if (recyclerHandle != null) { recyclerHandle.recycle(this); } } + @Override + public void release() { + if (poolMessage) { + ReferenceCountUtil.safeRelease(payload); + recycle(); + } + } + private MessageImpl(Handle> recyclerHandle) { this.recyclerHandle = recyclerHandle; this.redeliveryCount = 0; this.msgMetadata = new MessageMetadata(); + this.brokerEntryMetadata = new BrokerEntryMetadata(); } private Handle> recyclerHandle; @@ -590,7 +676,15 @@ void setSchemaState(SchemaState schemaState) { this.schemaState = schemaState; } - + /** + * used only for unit-test to validate payload's state and ref-cnt. + * + * @return + */ + @VisibleForTesting + ByteBuf getPayload() { + return payload; + } enum SchemaState { None, Ready, Broken diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java index c56694e2759ea..4ff23eb46f5b6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java @@ -49,7 +49,7 @@ protected boolean canAdd(Message message) { return false; } - if (maxSizeOfMessages > 0 && currentSizeOfMessages + message.getData().length > maxSizeOfMessages) { + if (maxSizeOfMessages > 0 && currentSizeOfMessages + message.size() > maxSizeOfMessages) { return false; } @@ -62,7 +62,7 @@ protected void add(Message message) { } Preconditions.checkArgument(canAdd(message), "No more space to add messages."); currentNumberOfMessages ++; - currentSizeOfMessages += message.getData().length; + currentSizeOfMessages += message.size(); messageList.add(message); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 8033c183df446..94cb7d70d9c63 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -602,8 +602,7 @@ public void redeliverUnacknowledgedMessages() { consumer.redeliverUnacknowledgedMessages(); consumer.unAckedChunkedMessageIdSequenceMap.clear(); }); - incomingMessages.clear(); - resetIncomingMessageSize(); + clearIncomingMessages(); unAckedMessageTracker.clear(); resumeReceivingFromPausedConsumersIfNeeded(); @@ -687,8 +686,7 @@ public CompletableFuture seekAsync(MessageId messageId) { consumers.values().forEach(consumerImpl -> futures.add(consumerImpl.seekAsync(targetMessageId))); unAckedMessageTracker.clear(); - incomingMessages.clear(); - resetIncomingMessageSize(); + clearIncomingMessages(); return FutureUtil.waitForAll(futures); } @@ -777,6 +775,7 @@ private void removeExpiredMessagesFromQueue(Set messageIds) { messageIds.add(messageId); break; } + message.release(); message = incomingMessages.poll(); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java index e0c0ef14dda10..8c94eeaa15280 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java @@ -93,6 +93,11 @@ public byte[] getData() { return msg.getData(); } + @Override + public int size() { + return msg.size(); + } + @Override public long getPublishTime() { return msg.getPublishTime(); @@ -184,4 +189,9 @@ public Schema getSchema() { } return null; } + + @Override + public void release() { + msg.release(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java index d353d38b55d46..98d5b481d9b03 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java @@ -85,6 +85,7 @@ private Message fetchSingleMessageFromBroker() throws PulsarClientException { // Just being cautious if (incomingMessages.size() > 0) { log.error("The incoming message queue should never be greater than 0 when Queue size is 0"); + incomingMessages.forEach(Message::release); incomingMessages.clear(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 32fa5e8a3df11..a39ac21164bf4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -146,6 +146,8 @@ public int getMaxPendingChuckedMessage() { private boolean batchIndexAckEnabled = false; private boolean ackReceiptEnabled = false; + + private boolean poolMessages = false; public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit timeUnit) { checkArgument(interval > 0, "interval needs to be > 0"); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java index 255a49152a746..3205a642624e2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java @@ -42,6 +42,7 @@ void validate(ByteBuf byteBuf) { /** * Decode a byteBuf into an object using the schema definition and deserializer implementation + *

Do not modify reader/writer index of ByteBuf so, it can be reused to access correct data. * * @param byteBuf * the byte buffer to decode @@ -61,7 +62,7 @@ public T decode(ByteBuf byteBuf, byte[] schemaVersion) { // ignore version by default (most of the primitive schema implementations ignore schema version) return decode(byteBuf); } - + @Override public Schema clone() { return this; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java index 4a7993136557a..c560f0e76aa0b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java @@ -76,17 +76,22 @@ public ByteBuffer decode(byte[] data) { } @Override - public ByteBuffer decode(ByteBuf byteBuf) { - if (null == byteBuf) { + public ByteBuffer decode(ByteBuf byteBuffer) { + if (null == byteBuffer) { return null; } else { - int size = byteBuf.readableBytes(); + int size = byteBuffer.readableBytes(); byte[] bytes = new byte[size]; - byteBuf.readBytes(bytes); + byteBuffer.readBytes(bytes); return ByteBuffer.wrap(bytes); } } + @Override + public ByteBuffer decode(ByteBuffer byteBuffer, byte[] schemaVersion) { + return byteBuffer; + } + @Override public SchemaInfo getSchemaInfo() { return SCHEMA_INFO; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java index 9447b00dcfa21..9c7ec373a2820 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java @@ -60,7 +60,7 @@ public byte[] decode(ByteBuf byteBuf) { int size = byteBuf.readableBytes(); byte[] bytes = new byte[size]; - byteBuf.readBytes(bytes, 0, size); + byteBuf.getBytes(byteBuf.readerIndex(), bytes); return bytes; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java index 8a6c4fbc8a40e..aa86a1920c538 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java @@ -76,8 +76,8 @@ public LocalDateTime decode(ByteBuf byteBuf) { if (null == byteBuf) { return null; } - long epochDay = byteBuf.readLong(); - long nanoOfDay = byteBuf.readLong(); + long epochDay = byteBuf.getLong(0); + long nanoOfDay = byteBuf.getLong(8); return LocalDateTime.of(LocalDate.ofEpochDay(epochDay), LocalTime.ofNanoOfDay(nanoOfDay)); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java index 7e813acf11d82..7e57f6ca6ed86 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java @@ -120,7 +120,7 @@ public String decode(ByteBuf byteBuf) { bytes = new byte[size * 2]; tmpBuffer.set(bytes); } - byteBuf.readBytes(bytes, 0, size); + byteBuf.getBytes(byteBuf.readerIndex(), bytes, 0, size); return new String(bytes, 0, size, charset); } diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java index ca6b07e14c5a0..034b6134f4e17 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java @@ -26,6 +26,9 @@ import com.beust.jcommander.ParameterException; import com.beust.jcommander.Parameters; import java.io.FileInputStream; +import java.lang.management.BufferPoolMXBean; +import java.lang.management.ManagementFactory; +import java.nio.ByteBuffer; import java.text.DecimalFormat; import java.util.Collections; import java.util.List; @@ -41,9 +44,11 @@ import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.stats.JvmMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -168,6 +173,9 @@ static class Arguments { @Parameter(names = {"--batch-index-ack" }, description = "Enable or disable the batch index acknowledgment") public boolean batchIndexAck = false; + + @Parameter(names = { "-pm", "--pool-messages" }, description = "Use the pooled message") + private boolean poolMessages = true; } public static void main(String[] args) throws Exception { @@ -271,7 +279,7 @@ public static void main(String[] args) throws Exception { final RateLimiter limiter = arguments.rate > 0 ? RateLimiter.create(arguments.rate) : null; long startTime = System.nanoTime(); long testEndTime = startTime + (long) (arguments.testTime * 1e9); - MessageListener listener = (consumer, msg) -> { + MessageListener listener = (consumer, msg) -> { if (arguments.testTime > 0) { if (System.nanoTime() > testEndTime) { log.info("------------------- DONE -----------------------"); @@ -280,10 +288,10 @@ public static void main(String[] args) throws Exception { } } messagesReceived.increment(); - bytesReceived.add(msg.getData().length); + bytesReceived.add(msg.size()); totalMessagesReceived.increment(); - totalBytesReceived.add(msg.getData().length); + totalBytesReceived.add(msg.size()); if (limiter != null) { limiter.acquire(); @@ -296,6 +304,10 @@ public static void main(String[] args) throws Exception { } consumer.acknowledgeAsync(msg); + + if(arguments.poolMessages) { + msg.release(); + } }; ClientBuilder clientBuilder = PulsarClient.builder() // @@ -318,8 +330,8 @@ public static void main(String[] args) throws Exception { PulsarClient pulsarClient = clientBuilder.build(); - List>> futures = Lists.newArrayList(); - ConsumerBuilder consumerBuilder = pulsarClient.newConsumer() // + List>> futures = Lists.newArrayList(); + ConsumerBuilder consumerBuilder = pulsarClient.newConsumer(Schema.BYTEBUFFER) // .messageListener(listener) // .receiverQueueSize(arguments.receiverQueueSize) // .maxTotalReceiverQueueSizeAcrossPartitions(arguments.maxTotalReceiverQueueSizeAcrossPartitions) @@ -328,6 +340,7 @@ public static void main(String[] args) throws Exception { .subscriptionInitialPosition(arguments.subscriptionInitialPosition) .autoAckOldestChunkedMessageOnQueueFull(arguments.autoAckOldestChunkedMessageOnQueueFull) .enableBatchIndexAcknowledgment(arguments.batchIndexAck) + .poolMessages(arguments.poolMessages) .replicateSubscriptionState(arguments.replicatedSubscription); if (arguments.maxPendingChunkedMessage > 0) { consumerBuilder.maxPendingChunkedMessage(arguments.maxPendingChunkedMessage); @@ -355,7 +368,7 @@ public static void main(String[] args) throws Exception { } } - for (Future> future : futures) { + for (Future> future : futures) { future.get(); }