Skip to content

Commit

Permalink
[Java Client] Improve consumer listener logic (apache#13273)
Browse files Browse the repository at this point in the history
* [Java Client] Improve consumer listener logic

* Move isListenerHandlingMessage update to before submitting to executor
  • Loading branch information
michaeljmarshall authored Dec 21, 2021
1 parent 2690a2b commit 9f46c4a
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -85,7 +84,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
protected volatile long incomingMessagesSize = 0;
protected volatile Timeout batchReceiveTimeout = null;
protected final Lock reentrantLock = new ReentrantLock();
private final AtomicInteger executorQueueSize = new AtomicInteger(0);
private volatile boolean isListenerHandlingMessage = false;

protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
int receiverQueueSize, ExecutorProvider executorProvider,
Expand Down Expand Up @@ -915,15 +914,17 @@ private void doPendingBatchReceiveTask(Timeout timeout) {
}

protected void triggerListener() {
// Trigger the notification on the message listener in a separate thread to avoid blocking the networking
// thread while the message processing happens
// Use internalPinnedExecutor to maintain message ordering
internalPinnedExecutor.execute(() -> {
try {
// Control executor to call MessageListener one by one.
if (executorQueueSize.get() < 1) {
// Listener should only have one pending/running executable to process a message
// See https://github.com/apache/pulsar/issues/11008 for context.
if (!isListenerHandlingMessage) {
final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS);
if (msg != null) {
executorQueueSize.incrementAndGet();
isListenerHandlingMessage = true;
// Trigger the notification on the message listener in a separate thread to avoid blocking the
// internal pinned executor thread while the message processing happens
if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) {
executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
callMessageListener(msg));
Expand Down Expand Up @@ -956,7 +957,7 @@ protected void callMessageListener(Message<T> msg) {
log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription,
msg.getMessageId(), t);
} finally {
executorQueueSize.decrementAndGet();
isListenerHandlingMessage = false;
triggerListener();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1137,8 +1137,7 @@ private void processPayloadByProcessor(final BrokerEntryMetadata brokerEntryMeta
increaseAvailablePermits(cnx(), skippedMessages.get());
}

internalPinnedExecutor.execute(()
-> tryTriggerListener());
tryTriggerListener();
}

void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ackSet, ByteBuf headersAndPayload, ClientCnx cnx) {
Expand Down

0 comments on commit 9f46c4a

Please sign in to comment.