diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index dfce6ed63335b..28c248fe674ac 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -35,10 +35,10 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import io.netty.util.Timeout; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.pulsar.client.api.BatchReceivePolicy; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerEventListener; @@ -733,6 +733,7 @@ protected void notifyPendingBatchReceivedCallBack() { if (opBatchReceive == null) { return; } + try { reentrantLock.lock(); notifyPendingBatchReceivedCallBack(opBatchReceive); @@ -790,6 +791,7 @@ protected final void notifyPendingBatchReceivedCallBack(OpBatchReceive opBatc } msgPeeked = incomingMessages.peek(); } + completePendingBatchReceive(opBatchReceive.future, messages); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index c7f67f48aef48..d57fc39a7d97b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -409,7 +409,6 @@ protected CompletableFuture> internalReceiveAsync() { CompletableFuture> result = cancellationHandler.createFuture(); Message message = null; try { - lock.writeLock().lock(); message = incomingMessages.poll(0, TimeUnit.MILLISECONDS); if (message == null) { pendingReceives.add(result); @@ -418,8 +417,6 @@ protected CompletableFuture> internalReceiveAsync() { } catch (InterruptedException e) { Thread.currentThread().interrupt(); result.completeExceptionally(e); - } finally { - lock.writeLock().unlock(); } if (message != null) { @@ -496,6 +493,7 @@ protected CompletableFuture> internalBatchReceiveAsync() { } finally { lock.writeLock().unlock(); } + return result; } @@ -948,14 +946,9 @@ private void closeConsumerTasks() { } private void failPendingReceive() { - lock.readLock().lock(); - try { - if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) { - failPendingReceives(this.pendingReceives); - failPendingBatchReceives(this.pendingBatchReceives); - } - } finally { - lock.readLock().unlock(); + if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) { + failPendingReceives(this.pendingReceives); + failPendingBatchReceives(this.pendingBatchReceives); } } @@ -1053,23 +1046,18 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List ac uncompressedPayload, createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount); uncompressedPayload.release(); - lock.readLock().lock(); - try { - // Enqueue the message so that it can be retrieved when application calls receive() - // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it. - // if asyncReceive is waiting then notify callback without adding to incomingMessages queue - if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) { - possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(), Collections.singletonList(message)); - } - if (peekPendingReceive() != null) { - notifyPendingReceivedCallback(message, null); - } else if (enqueueMessageAndCheckBatchReceive(message)) { - if (hasPendingBatchReceive()) { - notifyPendingBatchReceivedCallBack(); - } - } - } finally { - lock.readLock().unlock(); + // Enqueue the message so that it can be retrieved when application calls receive() + // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it. + // if asyncReceive is waiting then notify callback without adding to incomingMessages queue + if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null && + redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) { + possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(), + Collections.singletonList(message)); + } + if (peekPendingReceive() != null) { + notifyPendingReceivedCallback(message, null); + } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) { + notifyPendingBatchReceivedCallBack(); } } else { // handle batch message enqueuing; uncompressed payload has all messages in batch @@ -1280,17 +1268,11 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv if (possibleToDeadLetter != null) { possibleToDeadLetter.add(message); } - lock.readLock().lock(); - try { - if (peekPendingReceive() != null) { - notifyPendingReceivedCallback(message, null); - } else if (enqueueMessageAndCheckBatchReceive(message)) { - if (hasPendingBatchReceive()) { - notifyPendingBatchReceivedCallBack(); - } - } - } finally { - lock.readLock().unlock(); + + if (peekPendingReceive() != null) { + notifyPendingReceivedCallback(message, null); + } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) { + notifyPendingBatchReceivedCallBack(); } singleMessagePayload.release(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 39f77157634d6..52634019d4e50 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -97,8 +97,8 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { private volatile Timeout partitionsAutoUpdateTimeout = null; TopicsPartitionChangedListener topicsPartitionChangedListener; CompletableFuture partitionsAutoUpdateFuture = null; - private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final ConsumerStatsRecorder stats; private final UnAckedMessageTracker unAckedMessageTracker; private final ConsumerConfigurationData internalConfig; @@ -386,6 +386,7 @@ protected CompletableFuture> internalBatchReceiveAsync() { } finally { lock.writeLock().unlock(); } + return result; } @@ -595,18 +596,14 @@ private ConsumerConfigurationData getInternalConsumerConfig() { @Override public void redeliverUnacknowledgedMessages() { - lock.writeLock().lock(); - try { - consumers.values().stream().forEach(consumer -> { - consumer.redeliverUnacknowledgedMessages(); - consumer.unAckedChunkedMessageIdSequenceMap.clear(); - }); - incomingMessages.clear(); - resetIncomingMessageSize(); - unAckedMessageTracker.clear(); - } finally { - lock.writeLock().unlock(); - } + consumers.values().stream().forEach(consumer -> { + consumer.redeliverUnacknowledgedMessages(); + consumer.unAckedChunkedMessageIdSequenceMap.clear(); + }); + incomingMessages.clear(); + resetIncomingMessageSize(); + unAckedMessageTracker.clear(); + resumeReceivingFromPausedConsumersIfNeeded(); }