Skip to content

Commit

Permalink
[fix][java-client] Fix performance regression with message listener (a…
Browse files Browse the repository at this point in the history
…pache#15162)

### Motivation

apache#13023 has introduced a performance regression.
For each message, we are switching from external thread pool -> internal thread poll -> external thread pool.

Previously we want to control the outstanding messages of a consumer using a listener, so after apache#11455,
the message will not move from the receiver queue to the external executor. And apache#13023 changed the listener trigger
in the internal thread pool to fix the ordering issue, so this is the root cause of the performance regression.

Here is the frame graph to show the thread frame of the internal thread and external thread.
[framegraph.html.txt](https://github.com/apache/pulsar/files/8483765/framegraph.html.txt)

And also fix the performance issue for multiple topic consumers and key-shared subscriptions which enabled message listeners. Before this change, the messages are processed serially. After this change, We can improve parallelism on the premise of ensuring order.

### Modification

- Remove the isListenerHandlingMessage control
- Move the messages from the receiver queue to the queue of external executor but not increase permits
- Increase permits before call message listener
  • Loading branch information
codelipenghui authored Apr 19, 2022
1 parent 789fa50 commit 83cd791
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ static class RawConsumerImpl extends ConsumerImpl<byte[]> {
client.externalExecutorProvider(),
TopicName.getPartitionIndex(conf.getSingleTopic()),
false,
false,
consumerFuture,
MessageId.earliest,
0 /* startMessageRollbackDurationInSec */,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
protected volatile long incomingMessagesSize = 0;
protected volatile Timeout batchReceiveTimeout = null;
protected final Lock reentrantLock = new ReentrantLock();
private volatile boolean isListenerHandlingMessage = false;

protected static final AtomicLongFieldUpdater<ConsumerBase> CONSUMER_EPOCH =
AtomicLongFieldUpdater.newUpdater(ConsumerBase.class, "consumerEpoch");
Expand Down Expand Up @@ -984,33 +983,34 @@ protected void tryTriggerListener() {
}

private void triggerListener() {
// The messages are added into the receiver queue by the internal pinned executor,
// so need to use internal pinned executor to avoid race condition which message
// might be added into the receiver queue but not able to read here.
internalPinnedExecutor.execute(() -> {
try {
// Listener should only have one pending/running executable to process a message
// See https://github.com/apache/pulsar/issues/11008 for context.
if (!isListenerHandlingMessage) {
final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS);
Message<T> msg;
do {
msg = internalReceive(0, TimeUnit.MILLISECONDS);
if (msg != null) {
isListenerHandlingMessage = true;
// Trigger the notification on the message listener in a separate thread to avoid blocking the
// internal pinned executor thread while the message processing happens
final Message<T> finalMsg = msg;
if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) {
executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
callMessageListener(msg));
callMessageListener(finalMsg));
} else {
getExternalExecutor(msg).execute(() -> {
callMessageListener(msg);
callMessageListener(finalMsg);
});
}
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription);
}
}
}
} while (msg != null);
} catch (PulsarClientException e) {
log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e);
return;
}

if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription);
}
});
}
Expand All @@ -1021,13 +1021,16 @@ protected void callMessageListener(Message<T> msg) {
log.debug("[{}][{}] Calling message listener for message {}", topic, subscription,
msg.getMessageId());
}
ConsumerImpl receivedConsumer = (msg instanceof TopicMessageImpl)
? ((TopicMessageImpl<T>) msg).receivedByconsumer : (ConsumerImpl) this;
// Increase the permits here since we will not increase permits while receive messages from consumer
// after enabled message listener.
receivedConsumer.increaseAvailablePermits((MessageImpl<?>) (msg instanceof TopicMessageImpl
? ((TopicMessageImpl<T>) msg).getMessage() : msg));
listener.received(ConsumerBase.this, msg);
} catch (Throwable t) {
log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription,
msg.getMessageId(), t);
} finally {
isListenerHandlingMessage = false;
triggerListener();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle

private final int partitionIndex;
private final boolean hasParentConsumer;
private final boolean parentConsumerHasListener;

private final UnAckedMessageTracker unAckedMessageTracker;
private final AcknowledgmentsGroupingTracker acknowledgmentsGroupingTracker;
Expand Down Expand Up @@ -208,7 +209,7 @@ static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
Schema<T> schema,
ConsumerInterceptors<T> interceptors,
boolean createTopicIfDoesNotExist) {
return newConsumerImpl(client, topic, conf, executorProvider, partitionIndex, hasParentConsumer,
return newConsumerImpl(client, topic, conf, executorProvider, partitionIndex, hasParentConsumer, false,
subscribeFuture, startMessageId, schema, interceptors, createTopicIfDoesNotExist, 0);
}

Expand All @@ -218,6 +219,7 @@ static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
ExecutorProvider executorProvider,
int partitionIndex,
boolean hasParentConsumer,
boolean parentConsumerHasListener,
CompletableFuture<Consumer<T>> subscribeFuture,
MessageId startMessageId,
Schema<T> schema,
Expand All @@ -231,6 +233,7 @@ static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
createTopicIfDoesNotExist);
} else {
return new ConsumerImpl<>(client, topic, conf, executorProvider, partitionIndex, hasParentConsumer,
parentConsumerHasListener,
subscribeFuture, startMessageId,
startMessageRollbackDurationInSec /* rollback time in sec to start msgId */,
schema, interceptors, createTopicIfDoesNotExist);
Expand All @@ -239,7 +242,7 @@ static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,

protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorProvider executorProvider, int partitionIndex, boolean hasParentConsumer,
CompletableFuture<Consumer<T>> subscribeFuture, MessageId startMessageId,
boolean parentConsumerHasListener, CompletableFuture<Consumer<T>> subscribeFuture, MessageId startMessageId,
long startMessageRollbackDurationInSec, Schema<T> schema, ConsumerInterceptors<T> interceptors,
boolean createTopicIfDoesNotExist) {
super(client, topic, conf, conf.getReceiverQueueSize(), executorProvider, subscribeFuture, schema,
Expand All @@ -253,6 +256,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
this.lookupDeadline = System.currentTimeMillis() + client.getConfiguration().getLookupTimeoutMs();
this.partitionIndex = partitionIndex;
this.hasParentConsumer = hasParentConsumer;
this.parentConsumerHasListener = parentConsumerHasListener;
this.priorityLevel = conf.getPriorityLevel();
this.readCompacted = conf.isReadCompacted();
this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition();
Expand Down Expand Up @@ -1570,7 +1574,9 @@ protected synchronized void messageProcessed(Message<?> msg) {
if (msgCnx != currentCnx) {
// The processed message did belong to the old queue that was cleared after reconnection.
} else {
increaseAvailablePermits(currentCnx);
if (listener == null && !parentConsumerHasListener) {
increaseAvailablePermits(currentCnx);
}
stats.updateNumMsgsReceived(msg);

trackMessage(msg);
Expand Down Expand Up @@ -1605,13 +1611,20 @@ protected void trackMessage(MessageId messageId, int redeliveryCount) {
}
}

void increaseAvailablePermits(MessageImpl<?> msg) {
ClientCnx currentCnx = cnx();
ClientCnx msgCnx = msg.getCnx();
if (msgCnx == currentCnx) {
increaseAvailablePermits(currentCnx);
}
}

void increaseAvailablePermits(ClientCnx currentCnx) {
increaseAvailablePermits(currentCnx, 1);
}

protected void increaseAvailablePermits(ClientCnx currentCnx, int delta) {
int available = AVAILABLE_PERMITS_UPDATER.addAndGet(this, delta);

while (available >= getCurrentReceiverQueueSize() / 2 && !paused) {
if (AVAILABLE_PERMITS_UPDATER.compareAndSet(this, available, 0)) {
sendFlowPermitsToBroker(currentCnx, available);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,7 @@ private void doSubscribeTopicPartitions(Schema<T> schema,
configurationData.setStartPaused(paused);
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, partitionName,
configurationData, client.externalExecutorProvider(),
partitionIndex, true, subFuture,
partitionIndex, true, listener != null, subFuture,
startMessageId, schema, interceptors,
createIfDoesNotExist, startMessageRollbackDurationInSec);
synchronized (pauseMutex) {
Expand Down Expand Up @@ -1086,7 +1086,7 @@ private void doSubscribeTopicPartitions(Schema<T> schema,
internalConfig.setStartPaused(paused);
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
client.externalExecutorProvider(), -1,
true, subFuture, startMessageId, schema, interceptors,
true, listener != null, subFuture, startMessageId, schema, interceptors,
createIfDoesNotExist, startMessageRollbackDurationInSec);

synchronized (pauseMutex) {
Expand Down Expand Up @@ -1390,7 +1390,7 @@ private CompletableFuture<Void> subscribeIncreasedTopicPartitions(String topicNa
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(
client, partitionName, configurationData,
client.externalExecutorProvider(),
partitionIndex, true, subFuture, startMessageId, schema, interceptors,
partitionIndex, true, listener != null, subFuture, startMessageId, schema, interceptors,
true /* createTopicIfDoesNotExist */, startMessageRollbackDurationInSec);
synchronized (pauseMutex) {
if (paused) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void reachedEndOfTopic(Consumer<T> consumer) {
this, readerConfiguration.getReaderInterceptorList());
final int partitionIdx = TopicName.getPartitionIndex(readerConfiguration.getTopicName());
consumer = new ConsumerImpl<>(client, readerConfiguration.getTopicName(), consumerConfiguration,
executorProvider, partitionIdx, false, consumerFuture,
executorProvider, partitionIdx, false, false, consumerFuture,
readerConfiguration.getStartMessageId(), readerConfiguration.getStartMessageFromRollbackDurationInSec(),
schema, consumerInterceptors, true /* createTopicIfDoesNotExist */);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConf
CompletableFuture<Consumer<T>> subscribeFuture, MessageId startMessageId, Schema<T> schema,
ConsumerInterceptors<T> interceptors,
boolean createTopicIfDoesNotExist) {
super(client, topic, conf, executorProvider, partitionIndex, hasParentConsumer, subscribeFuture,
super(client, topic, conf, executorProvider, partitionIndex, hasParentConsumer, false, subscribeFuture,
startMessageId, 0 /* startMessageRollbackDurationInSec */, schema, interceptors,
createTopicIfDoesNotExist);
}
Expand Down

0 comments on commit 83cd791

Please sign in to comment.