Skip to content

Commit

Permalink
[pulsar-client] Add message chunking configuration for reader (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored Apr 15, 2022
1 parent 0ed1afb commit d8923b8
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.MessageImpl.SchemaState;
import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
Expand Down Expand Up @@ -217,6 +218,8 @@ public void testLargeMessageAckTimeOut(boolean ackReceiptEnabled) throws Excepti
.isAckReceiptEnabled(ackReceiptEnabled)
.ackTimeout(5, TimeUnit.SECONDS).subscribe();

Reader<byte[]> reader = pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create();

ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName);

Producer<byte[]> producer = producerBuilder.enableChunking(true).enableBatching(false).create();
Expand All @@ -232,6 +235,15 @@ public void testLargeMessageAckTimeOut(boolean ackReceiptEnabled) throws Excepti

Message<byte[]> msg = null;
Set<String> messageSet = Sets.newHashSet();
for (int i = 0; i < totalMessages; i++) {
msg = reader.readNext(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.info("Received message: [{}]", receivedMessage);
String expectedMessage = publishedMessages.get(i);
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
}

messageSet.clear();
for (int i = 0; i < totalMessages; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
Expand Down Expand Up @@ -268,6 +280,7 @@ public void testLargeMessageAckTimeOut(boolean ackReceiptEnabled) throws Excepti

consumer.close();
producer.close();
reader.close();
log.info("-- Exiting {} test --", methodName);

}
Expand Down Expand Up @@ -384,6 +397,8 @@ public void testExpireIncompleteChunkMessage() throws Exception{
producer.cnx().registerProducer(producerId, producer); // registered spy ProducerImpl
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-sub").subscribe();
ReaderImpl<byte[]> reader = (ReaderImpl<byte[]>) pulsarClient.newReader().topic(topicName)
.startMessageId(MessageId.earliest).create();

TypedMessageBuilderImpl<byte[]> msg = (TypedMessageBuilderImpl<byte[]>) producer.newMessage().value("message-1".getBytes());
ByteBuf payload = Unpooled.wrappedBuffer(msg.getContent());
Expand All @@ -397,17 +412,22 @@ public void testExpireIncompleteChunkMessage() throws Exception{
producer.processOpSendMsg(op);

retryStrategically((test) -> {
return consumer.chunkedMessagesMap.size() > 0;
return reader.getConsumer().chunkedMessagesMap.size() > 0 && consumer.chunkedMessagesMap.size() > 0;
}, 5, 500);
assertEquals(consumer.chunkedMessagesMap.size(), 1);
assertEquals(reader.getConsumer().chunkedMessagesMap.size(), 1);

consumer.expireTimeOfIncompleteChunkedMessageMillis = 1;
reader.getConsumer().expireTimeOfIncompleteChunkedMessageMillis = 1;
Thread.sleep(10);
consumer.removeExpireIncompleteChunkedMessages();
reader.getConsumer().removeExpireIncompleteChunkedMessages();
assertEquals(consumer.chunkedMessagesMap.size(), 0);
assertEquals(reader.getConsumer().chunkedMessagesMap.size(), 0);

producer.close();
consumer.close();
reader.close();
producer = null; // clean reference of mocked producer
}

Expand Down Expand Up @@ -507,6 +527,20 @@ public void testSeekChunkMessages() throws PulsarClientException {
log.info("-- Exiting {} test --", methodName);
}

/**
 * Verifies that the chunking options set through the reader builder are copied into the
 * configuration of the consumer wrapped by the reader.
 */
@Test
public void testReaderChunkingConfiguration() throws Exception {
    log.info("-- Starting {} test --", methodName);
    final String topicName = "persistent://my-property/my-ns/my-topic1";
    ReaderImpl<byte[]> reader = (ReaderImpl<byte[]>) pulsarClient.newReader().topic(topicName)
            .startMessageId(MessageId.earliest).maxPendingChunkedMessage(12)
            .autoAckOldestChunkedMessageOnQueueFull(true)
            .expireTimeOfIncompleteChunkedMessage(12, TimeUnit.MILLISECONDS).create();
    try {
        ConsumerImpl<byte[]> consumer = reader.getConsumer();
        assertEquals(consumer.conf.getMaxPendingChunkedMessage(), 12);
        assertTrue(consumer.conf.isAutoAckOldestChunkedMessageOnQueueFull());
        assertEquals(consumer.conf.getExpireTimeOfIncompleteChunkedMessageMillis(), 12);
    } finally {
        // Close the reader even when an assertion fails so it does not leak into later tests.
        reader.close();
    }
    log.info("-- Exiting {} test --", methodName);
}

private String createMessagePayload(int size) {
StringBuilder str = new StringBuilder();
Random rand = new Random();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,4 +320,51 @@ public interface ReaderBuilder<T> extends Cloneable {
* @return the reader builder instance
*/
ReaderBuilder<T> intercept(ReaderInterceptor<T>... interceptors);

/**
 * Sets the maximum number of incomplete chunked messages the consumer buffers at the same time.
 *
 * <p>Consumer buffers chunk messages into memory until it receives all the chunks of the original message.
 * While consuming chunk-messages, chunks from the same message might not be contiguous in the stream and
 * they might be mixed with other messages' chunks. So, the consumer has to maintain multiple buffers to manage
 * chunks coming from different messages. This mainly happens when multiple publishers are publishing messages
 * on the topic concurrently or a publisher failed to publish all chunks of a message.
 *
 * <pre>
 * eg: M1-C1, M2-C1, M1-C2, M2-C2
 * Here, Messages M1-C1 and M1-C2 belong to original message M1, M2-C1 and M2-C2 messages belong to M2 message.
 * </pre>
 *
 * <p>Buffering a large number of outstanding incomplete chunked messages can create memory pressure, and it
 * can be guarded by providing this {@code maxPendingChunkedMessage} threshold. Once the consumer reaches this
 * threshold, it drops the outstanding incomplete chunked messages by silently acking them or by asking the
 * broker to redeliver them later by marking them unacked. This behavior can be controlled by the
 * configuration {@code autoAckOldestChunkedMessageOnQueueFull}.
 *
 * <p>The default value is 10.
 *
 * @param maxPendingChunkedMessage the threshold of incomplete chunked messages buffered at once
 * @return the reader builder instance
 */
ReaderBuilder<T> maxPendingChunkedMessage(int maxPendingChunkedMessage);

/**
 * Chooses what happens to incomplete chunked messages when the pending-chunk threshold is reached.
 *
 * <p>Buffering a large number of outstanding incomplete chunked messages can create memory pressure, and it
 * can be guarded by providing the {@code maxPendingChunkedMessage} threshold. Once the consumer reaches this
 * threshold, it drops the outstanding incomplete chunked messages by silently acking them if
 * {@code autoAckOldestChunkedMessageOnQueueFull} is true; otherwise it marks them for redelivery.
 *
 * <p>The default value is {@code false}.
 *
 * @param autoAckOldestChunkedMessageOnQueueFull {@code true} to silently ack the dropped messages,
 *            {@code false} to mark them for redelivery
 * @return the reader builder instance
 */
ReaderBuilder<T> autoAckOldestChunkedMessageOnQueueFull(boolean autoAckOldestChunkedMessageOnQueueFull);

/**
 * Sets how long the consumer waits for the remaining chunks of a message before expiring it.
 *
 * <p>If a producer fails to publish all the chunks of a message, the consumer can expire the incomplete
 * chunks when it has not received all of them within the expire time (default 1 minute).
 *
 * @param duration the expire time, expressed in {@code unit}
 * @param unit the time unit of {@code duration}
 * @return the reader builder instance
 */
ReaderBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit);

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ public MultiTopicsReaderImpl(PulsarClientImpl client, ReaderConfigurationData<T>
consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted());
consumerConfiguration.setPoolMessages(readerConfiguration.isPoolMessages());

// chunking configuration
consumerConfiguration.setMaxPendingChunkedMessage(readerConfiguration.getMaxPendingChunkedMessage());
consumerConfiguration.setAutoAckOldestChunkedMessageOnQueueFull(
readerConfiguration.isAutoAckOldestChunkedMessageOnQueueFull());
consumerConfiguration.setExpireTimeOfIncompleteChunkedMessageMillis(
readerConfiguration.getExpireTimeOfIncompleteChunkedMessageMillis());

if (readerConfiguration.getReaderListener() != null) {
ReaderListener<T> readerListener = readerConfiguration.getReaderListener();
consumerConfiguration.setMessageListener(new MessageListener<T>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,4 +253,22 @@ public ReaderBuilder<T> intercept(ReaderInterceptor<T>... interceptors) {
return this;
}

/** {@inheritDoc} */
@Override
public ReaderBuilder<T> maxPendingChunkedMessage(int maxPendingChunkedMessage) {
    // Store the threshold on the reader configuration, then hand back this builder for chaining.
    this.conf.setMaxPendingChunkedMessage(maxPendingChunkedMessage);
    return this;
}

/** {@inheritDoc} */
@Override
public ReaderBuilder<T> autoAckOldestChunkedMessageOnQueueFull(boolean autoAckOldestChunkedMessageOnQueueFull) {
    // Store the flag on the reader configuration, then hand back this builder for chaining.
    this.conf.setAutoAckOldestChunkedMessageOnQueueFull(autoAckOldestChunkedMessageOnQueueFull);
    return this;
}

/** {@inheritDoc} */
@Override
public ReaderBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit) {
    // The configuration keeps the expiry in milliseconds, so convert before storing it.
    final long expireTimeMillis = unit.toMillis(duration);
    conf.setExpireTimeOfIncompleteChunkedMessageMillis(expireTimeMillis);
    return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ public ReaderImpl(PulsarClientImpl client, ReaderConfigurationData<T> readerConf
consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted());
consumerConfiguration.setPoolMessages(readerConfiguration.isPoolMessages());

// chunking configuration
consumerConfiguration.setMaxPendingChunkedMessage(readerConfiguration.getMaxPendingChunkedMessage());
consumerConfiguration.setAutoAckOldestChunkedMessageOnQueueFull(
readerConfiguration.isAutoAckOldestChunkedMessageOnQueueFull());
consumerConfiguration.setExpireTimeOfIncompleteChunkedMessageMillis(
readerConfiguration.getExpireTimeOfIncompleteChunkedMessageMillis());

// Reader doesn't need any batch receiving behaviours
// disable the batch receive timer for the ConsumerImpl instance wrapped by the ReaderImpl
consumerConfiguration.setBatchReceivePolicy(DISABLED_BATCH_RECEIVE_POLICY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.Data;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
Expand Down Expand Up @@ -67,6 +68,13 @@ public class ReaderConfigurationData<T> implements Serializable, Cloneable {

private transient List<ReaderInterceptor<T>> readerInterceptorList;

// Maximum number of incomplete chunked messages buffered at once; bounds the memory held by
// partially received messages. Defaults to 10.
private int maxPendingChunkedMessage = 10;

// When the pending-chunk threshold is reached: true silently acks the dropped incomplete chunked
// messages, false marks them for redelivery. Defaults to false.
private boolean autoAckOldestChunkedMessageOnQueueFull = false;

// Time, in milliseconds, after which an incomplete chunked message can be expired. Defaults to 1 minute.
private long expireTimeOfIncompleteChunkedMessageMillis = TimeUnit.MINUTES.toMillis(1);

@JsonIgnore
public String getTopicName() {
if (topicNames.size() > 1) {
Expand Down

0 comments on commit d8923b8

Please sign in to comment.