Skip to content

Commit

Permalink
PIP 83 : Pulsar client: Message consumption with pooled buffer (apach…
Browse files Browse the repository at this point in the history
…e#10184)

fix api, buffer-access, duplicate code
  • Loading branch information
rdhabalia authored Apr 20, 2021
1 parent 27dd63f commit ef06691
Show file tree
Hide file tree
Showing 23 changed files with 445 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4029,4 +4029,4 @@ public void testPartitionTopicsOnSeparateListner() throws Exception {
blockedMessageLatch.countDown();
log.info("-- Exiting {} test --", methodName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.UUID.randomUUID;
import static org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;
import static org.mockito.Mockito.any;
Expand All @@ -30,12 +31,14 @@
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import io.netty.buffer.ByteBuf;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
Expand All @@ -55,7 +58,6 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Cleanup;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
Expand Down Expand Up @@ -132,6 +134,11 @@ public Object[][] subType() {
return new Object[][] { { SubscriptionType.Shared }, { SubscriptionType.Failover } };
}

@DataProvider(name = "booleanFlagProvider")
public Object[][] booleanFlagProvider() {
return new Object[][] { { true }, { false } };
}

/**
* Verifies unload namespace-bundle doesn't close shared connection used by other namespace-bundle.
*
Expand Down Expand Up @@ -918,4 +925,98 @@ public void testJsonSchemaProducerConsumerWithSpecifiedReaderAndWriter() throws
private static final class TestMessageObject{
private String value;
}
}

/**
* It validates pooled message consumption for batch and non-batch messages.
*
* @throws Exception
*/
@Test(dataProvider = "booleanFlagProvider")
public void testConsumerWithPooledMessages(boolean isBatchingEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);

@Cleanup
PulsarClient newPulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();

final String topic = "persistent://my-property/my-ns/testConsumerWithPooledMessages" + isBatchingEnabled;

@Cleanup
Consumer<ByteBuffer> consumer = newPulsarClient.newConsumer(Schema.BYTEBUFFER).topic(topic)
.subscriptionName("my-sub").poolMessages(true).subscribe();

@Cleanup
Producer<byte[]> producer = newPulsarClient.newProducer().topic(topic).enableBatching(isBatchingEnabled).create();

final int numMessages = 100;
for (int i = 0; i < numMessages; i++) {
producer.newMessage().value(("value-" + i).getBytes(UTF_8))
.eventTime((i + 1) * 100L).sendAsync();
}
producer.flush();

// Reuse pre-allocated pooled buffer to process every message
byte[] val = null;
int size = 0;
for (int i = 0; i < numMessages; i++) {
Message<ByteBuffer> msg = consumer.receive();
ByteBuffer value;
try {
value = msg.getValue();
int capacity = value.remaining();
// expand the size of buffer if needed
if (capacity > size) {
val = new byte[capacity];
size = capacity;
}
// read message into pooled buffer
value.get(val, 0, capacity);
// process the message
assertEquals(("value-" + i), new String(val, 0, capacity));
} finally {
msg.release();
}
}
consumer.close();
producer.close();
}

/**
* It verifies that expiry/redelivery of messages relesaes the messages without leak.
*
* @param isBatchingEnabled
* @throws Exception
*/
@Test(dataProvider = "booleanFlagProvider")
public void testPooledMessageWithAckTimeout(boolean isBatchingEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);

@Cleanup
PulsarClient newPulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();

final String topic = "persistent://my-property/my-ns/testPooledMessageWithAckTimeout" + isBatchingEnabled;

@Cleanup
ConsumerImpl<ByteBuffer> consumer = (ConsumerImpl<ByteBuffer>) newPulsarClient.newConsumer(Schema.BYTEBUFFER)
.topic(topic).subscriptionName("my-sub").poolMessages(true).subscribe();

@Cleanup
Producer<byte[]> producer = newPulsarClient.newProducer().topic(topic).enableBatching(isBatchingEnabled)
.create();

final int numMessages = 100;
for (int i = 0; i < numMessages; i++) {
producer.newMessage().value(("value-" + i).getBytes(UTF_8)).eventTime((i + 1) * 100L).sendAsync();
}
producer.flush();

retryStrategically((test) -> consumer.incomingMessages.peek() != null, 5, 500);
MessageImpl<ByteBuffer> msg = (MessageImpl) consumer.incomingMessages.peek();
assertNotNull(msg);
ByteBuf payload = ((MessageImpl) msg).getPayload();
assertNotEquals(payload.refCnt(), 0);
consumer.redeliverUnacknowledgedMessages();
assertEquals(payload.refCnt(), 0);
consumer.close();
producer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -731,4 +731,14 @@ public interface ConsumerBuilder<T> extends Cloneable {
* @return
*/
ConsumerBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit);

/**
* Enable pooling of messages and the underlying data buffers.
* <p/>
* When pooling is enabled, the application is responsible for calling Message.release() after the handling of every
* received message. If “release()” is not called on a received message, there will be a memory leak. If an
* application attempts to use and already “released” message, it might experience undefined behavior (eg: memory
* corruption, deserialization error, etc.).
*/
ConsumerBuilder<T> poolMessages(boolean poolMessages);
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ public interface Message<T> {
*/
byte[] getData();

/**
* Get the uncompressed message payload size in bytes.
*
* @return size in bytes.
*/
int size();

/**
* Get the de-serialized value of the message, according the configured {@link Schema}.
*
Expand Down Expand Up @@ -217,4 +224,12 @@ public interface Message<T> {
* @return the name of cluster, from which the message is replicated.
*/
String getReplicatedFrom();

/**
* Release a message back to the pool. This is required only if the consumer was created with the option to pool
* messages, otherwise it will have no effect.
*
* @since 2.8.0
*/
void release();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;

import static org.apache.pulsar.client.internal.DefaultImplementation.getBytes;
import java.nio.ByteBuffer;
import java.sql.Time;
import java.sql.Timestamp;
Expand Down Expand Up @@ -120,6 +121,22 @@ default T decode(byte[] bytes, byte[] schemaVersion) {
return decode(bytes);
}

/**
* Decode a ByteBuffer into an object using a given version. <br/>
*
* @param data
* the ByteBuffer to decode
* @param schemaVersion
* the schema version to decode the object. null indicates using latest version.
* @return the deserialized object
*/
default T decode(ByteBuffer data, byte[] schemaVersion) {
if (data == null) {
return null;
}
return decode(getBytes(data), schemaVersion);
}

/**
* @return an object that represents the Schema associated metadata
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,4 +494,24 @@ public static BatcherBuilder newKeyBasedBatcherBuilder() {
() -> (BatcherBuilder) getConstructor("org.apache.pulsar.client.impl.KeyBasedBatcherBuilder")
.newInstance());
}

/**
* Retrieves ByteBuffer data into byte[].
*
* @param byteBuffer
* @return
*/
public static byte[] getBytes(ByteBuffer byteBuffer) {
if (byteBuffer == null) {
return null;
}
if (byteBuffer.hasArray() && byteBuffer.arrayOffset() == 0
&& byteBuffer.array().length == byteBuffer.remaining()) {
return byteBuffer.array();
}
// Direct buffer is not backed by array and it needs to be read from direct memory
byte[] array = new byte[byteBuffer.remaining()];
byteBuffer.get(array);
return array;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.pulsar.client.cli;

import static org.apache.commons.lang3.StringUtils.isNotBlank;

import static org.apache.pulsar.client.internal.DefaultImplementation.getBytes;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
Expand All @@ -31,6 +31,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
Expand Down Expand Up @@ -122,7 +123,9 @@ public class CmdConsume {
@Parameter(names = { "-st", "--schema-type"}, description = "Set a schema type on the consumer, it can be 'bytes' or 'auto_consume'")
private String schematype = "bytes";


@Parameter(names = { "-pm", "--pool-messages" }, description = "Use the pooled message")
private boolean poolMessages = true;

private ClientBuilder clientBuilder;
private Authentication authentication;
private String serviceURL;
Expand Down Expand Up @@ -171,6 +174,8 @@ private String interpretMessage(Message<?> message, boolean displayHex) throws I
} else if (value instanceof GenericRecord) {
Map<String, Object> asMap = genericRecordToMap((GenericRecord) value);
data = asMap.toString();
} else if (value instanceof ByteBuffer) {
data = new String(getBytes((ByteBuffer) value));
} else {
data = value.toString();
}
Expand Down Expand Up @@ -233,7 +238,7 @@ private int consume(String topic) {
try {
ConsumerBuilder<?> builder;
PulsarClient client = clientBuilder.build();
Schema<?> schema = Schema.BYTES;
Schema<?> schema = poolMessages ? Schema.BYTEBUFFER : Schema.BYTES;
if ("auto_consume".equals(schematype)) {
schema = Schema.AUTO_CONSUME();
} else if (!"bytes".equals(schematype)) {
Expand All @@ -243,7 +248,8 @@ private int consume(String topic) {
.subscriptionName(this.subscriptionName)
.subscriptionType(subscriptionType)
.subscriptionMode(subscriptionMode)
.subscriptionInitialPosition(subscriptionInitialPosition);
.subscriptionInitialPosition(subscriptionInitialPosition)
.poolMessages(poolMessages);

if (isRegex) {
builder.topicsPattern(Pattern.compile(topic));
Expand Down Expand Up @@ -275,15 +281,19 @@ private int consume(String topic) {
if (msg == null) {
LOG.debug("No message to consume after waiting for 5 seconds.");
} else {
numMessagesConsumed += 1;
if (!hideContent) {
System.out.println(MESSAGE_BOUNDARY);
String output = this.interpretMessage(msg, displayHex);
System.out.println(output);
} else if (numMessagesConsumed % 1000 == 0) {
System.out.println("Received " + numMessagesConsumed + " messages");
try {
numMessagesConsumed += 1;
if (!hideContent) {
System.out.println(MESSAGE_BOUNDARY);
String output = this.interpretMessage(msg, displayHex);
System.out.println(output);
} else if (numMessagesConsumed % 1000 == 0) {
System.out.println("Received " + numMessagesConsumed + " messages");
}
consumer.acknowledge(msg);
} finally {
msg.release();
}
consumer.acknowledge(msg);
}
}
client.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -908,23 +908,28 @@ protected boolean hasPendingBatchReceive() {
}

protected void increaseIncomingMessageSize(final Message<?> message) {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(
this, message.getData() == null ? 0 : message.getData().length);
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, message.size());
}

protected void resetIncomingMessageSize() {
INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0);
}

protected void decreaseIncomingMessageSize(final Message<?> message) {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this,
(message.getData() != null) ? -message.getData().length : 0);
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.size());
}

public long getIncomingMessageSize() {
return INCOMING_MESSAGES_SIZE_UPDATER.get(this);
}

protected void clearIncomingMessages() {
// release messages if they are pooled messages
incomingMessages.forEach(Message::release);
incomingMessages.clear();
resetIncomingMessageSize();
}

protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);

private ExecutorService getExecutor(Message<T> msg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,4 +458,9 @@ public ConsumerBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration, Ti
return this;
}

@Override
public ConsumerBuilder<T> poolMessages(boolean poolMessages) {
conf.setPoolMessages(poolMessages);
return this;
}
}
Loading

0 comments on commit ef06691

Please sign in to comment.