Skip to content

Commit

Permalink
[pulsar-client] Process partitioned-topic messages on different liste…
Browse files Browse the repository at this point in the history
…ner-threads (apache#10017)
  • Loading branch information
rdhabalia authored Mar 24, 2021
1 parent 27d6625 commit 8568054
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3954,4 +3954,68 @@ public void testAccessAvroSchemaMetadata(Schema<MyBean> schema) throws Exception
}
assertEquals(1, res.getFields().size());
}

/**
* Test validates that consumer of partitioned-topic utilizes threads of all partitioned-consumers and slow-listener
* of one of the partition doesn't impact listener-processing of other partition.
* <p>
* Test starts consumer with 10 partitions where one of the partition listener gets blocked but that will not impact
* processing of other 9 partitions and they will be processed successfully.
*
* @throws Exception
*/
@Test(timeOut = 20000)
public void testPartitionTopicsOnSeparateListner() throws Exception {
log.info("-- Starting {} test --", methodName);

final String topicName = "persistent://my-property/my-ns/one-partitioned-topic";
final String subscriptionName = "my-sub-";

PulsarClient pulsarClient = PulsarClient.builder().listenerThreads(10).serviceUrl(lookupUrl.toString()).build();

// create partitioned topic
int partitions = 10;
admin.topics().createPartitionedTopic(topicName, partitions);
assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions, partitions);

// each partition
int totalMessages = partitions * 2;
CountDownLatch latch = new CountDownLatch(totalMessages - 2);
CountDownLatch blockedMessageLatch = new CountDownLatch(1);
AtomicInteger count = new AtomicInteger();

Set<String> listenerThreads = Sets.newConcurrentHashSet();
MessageListener<byte[]> messageListener = (c, m) -> {
if (count.incrementAndGet() == 1) {
try {
// blocking one of the partition's listener thread will not impact other topics
blockedMessageLatch.await();
} catch (InterruptedException e) {
// Ok
}
} else {
latch.countDown();
}
listenerThreads.add(Thread.currentThread().getName());
};
@Cleanup
Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName).messageListener(messageListener)
.subscriptionName(subscriptionName + 1).consumerName("aaa").subscribe();
log.info("Consumer1 created. topic: {}", consumer1.getTopic());

@Cleanup
Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).enableBatching(false).create();
log.info("Producer1 created. topic: {}", producer1.getTopic());

for (int i = 0; i < totalMessages; i++) {
producer1.newMessage().value(("one-partitioned-topic-value-producer1-" + i).getBytes(UTF_8)).send();
}
latch.await();
assertEquals(listenerThreads.size(), partitions - 1);
// unblock the listener thread
blockedMessageLatch.countDown();
pulsarClient.close();
log.info("-- Exiting {} test --", methodName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
Expand All @@ -56,7 +57,6 @@
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
Expand Down Expand Up @@ -233,7 +233,7 @@ protected CompletableFuture<Message<T>> pollPendingReceive() {
}

protected void completePendingReceive(CompletableFuture<Message<T>> receivedFuture, Message<T> message) {
pinnedExecutor.execute(() -> {
getExecutor(message).execute(() -> {
if (!receivedFuture.complete(message)) {
log.warn("Race condition detected. receive future was already completed (cancelled={}) and message was dropped. message={}",
receivedFuture.isCancelled(), message);
Expand Down Expand Up @@ -857,7 +857,7 @@ protected void triggerListener() {
executorProvider.getExecutor(peekMessageKey(finalMsg)).execute(() ->
callMessageListener(finalMsg));
} else {
pinnedExecutor.execute(() -> callMessageListener(finalMsg));
getExecutor(msg).execute(() -> callMessageListener(finalMsg));
}
}
} catch (PulsarClientException e) {
Expand Down Expand Up @@ -925,5 +925,14 @@ public long getIncomingMessageSize() {

protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);

private ExecutorService getExecutor(Message<T> msg) {
ConsumerImpl receivedConsumer = (msg instanceof TopicMessageImpl) ? ((TopicMessageImpl) msg).receivedByconsumer
: null;
ExecutorService executor = receivedConsumer != null && receivedConsumer.pinnedExecutor != null
? receivedConsumer.pinnedExecutor
: pinnedExecutor;
return executor;
}

private static final Logger log = LoggerFactory.getLogger(ConsumerBase.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
this.internalConfig = getInternalConsumerConfig();
this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStatsRecorderImpl(this) : null;

// start track and auto subscribe partition increasement
// start track and auto subscribe partition increment
if (conf.isAutoUpdatePartitions()) {
topicsPartitionChangedListener = new TopicsPartitionChangedListener();
partitionsAutoUpdateTimeout = client.timer()
Expand Down Expand Up @@ -265,8 +265,8 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {

private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {
checkArgument(message instanceof MessageImpl);
TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(
consumer.getTopic(), consumer.getTopicNameWithoutPartition(), message);
TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(consumer.getTopic(),
consumer.getTopicNameWithoutPartition(), message, consumer);

if (log.isDebugEnabled()) {
log.debug("[{}][{}] Received message from topics-consumer {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,8 @@ private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerC
externalExecutorProvider, consumerSubscribedFuture, metadata.partitions, schema, interceptors);
} else {
int partitionIndex = TopicName.getPartitionIndex(topic);
consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, externalExecutorProvider, partitionIndex, false,
consumerSubscribedFuture,null, schema, interceptors,
consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, externalExecutorProvider,
partitionIndex, false, consumerSubscribedFuture, null, schema, interceptors,
true /* createTopicIfDoesNotExist */);
}
consumers.add(consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,15 @@ public class TopicMessageImpl<T> implements Message<T> {

private final Message<T> msg;
private final TopicMessageIdImpl messageId;
// consumer if this message is received by that consumer
final ConsumerImpl receivedByconsumer;

TopicMessageImpl(String topicPartitionName,
String topicName,
Message<T> msg) {
Message<T> msg,
ConsumerImpl receivedByConsumer) {
this.topicPartitionName = topicPartitionName;
this.receivedByconsumer = receivedByConsumer;

this.msg = msg;
this.messageId = new TopicMessageIdImpl(topicPartitionName, topicName, msg.getMessageId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void testTopicMessageImplReplicatedInfo() {
ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
MessageImpl<byte[]> msg = MessageImpl.create(builder, payload, Schema.BYTES);
msg.setMessageId(new MessageIdImpl(-1, -1, -1));
TopicMessageImpl<byte[]> topicMessage = new TopicMessageImpl<>(topicName, topicName, msg);
TopicMessageImpl<byte[]> topicMessage = new TopicMessageImpl<>(topicName, topicName, msg, null);

assertTrue(topicMessage.isReplicated());
assertEquals(msg.getReplicatedFrom(), from);
Expand All @@ -76,7 +76,7 @@ public void testTopicMessageImplNoReplicatedInfo() {
ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
MessageImpl<byte[]> msg = MessageImpl.create(builder, payload, Schema.BYTES);
msg.setMessageId(new MessageIdImpl(-1, -1, -1));
TopicMessageImpl<byte[]> topicMessage = new TopicMessageImpl<>(topicName, topicName, msg);
TopicMessageImpl<byte[]> topicMessage = new TopicMessageImpl<>(topicName, topicName, msg, null);

assertFalse(topicMessage.isReplicated());
assertNull(topicMessage.getReplicatedFrom());
Expand Down

0 comments on commit 8568054

Please sign in to comment.