Skip to content

Commit

Permalink
Remove consumer unnecessary locks (apache#9261)
Browse files Browse the repository at this point in the history
### Motivation
1. The `ConsumerImpl` has many unnecessary locks for thread-safe Queue, such as `Queues.newConcurrentLinkedQueue`, `GrowableArrayBlockingQueue`, `ConcurrentLinkedQueue` 

### Changes
1. Remove unnecessary locks in `ConsumerImpl`

Related to PR#8207
  • Loading branch information
hangc0276 authored Apr 1, 2021
1 parent 77cf09e commit 9d08f64
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import io.netty.util.Timeout;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerEventListener;
Expand Down Expand Up @@ -733,6 +733,7 @@ protected void notifyPendingBatchReceivedCallBack() {
if (opBatchReceive == null) {
return;
}

try {
reentrantLock.lock();
notifyPendingBatchReceivedCallBack(opBatchReceive);
Expand Down Expand Up @@ -790,6 +791,7 @@ protected final void notifyPendingBatchReceivedCallBack(OpBatchReceive<T> opBatc
}
msgPeeked = incomingMessages.peek();
}

completePendingBatchReceive(opBatchReceive.future, messages);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,6 @@ protected CompletableFuture<Message<T>> internalReceiveAsync() {
CompletableFuture<Message<T>> result = cancellationHandler.createFuture();
Message<T> message = null;
try {
lock.writeLock().lock();
message = incomingMessages.poll(0, TimeUnit.MILLISECONDS);
if (message == null) {
pendingReceives.add(result);
Expand All @@ -418,8 +417,6 @@ protected CompletableFuture<Message<T>> internalReceiveAsync() {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
result.completeExceptionally(e);
} finally {
lock.writeLock().unlock();
}

if (message != null) {
Expand Down Expand Up @@ -496,6 +493,7 @@ protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
} finally {
lock.writeLock().unlock();
}

return result;
}

Expand Down Expand Up @@ -948,14 +946,9 @@ private void closeConsumerTasks() {
}

private void failPendingReceive() {
lock.readLock().lock();
try {
if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
failPendingReceives(this.pendingReceives);
failPendingBatchReceives(this.pendingBatchReceives);
}
} finally {
lock.readLock().unlock();
if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
failPendingReceives(this.pendingReceives);
failPendingBatchReceives(this.pendingBatchReceives);
}
}

Expand Down Expand Up @@ -1053,23 +1046,18 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
uncompressedPayload, createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount);
uncompressedPayload.release();

lock.readLock().lock();
try {
// Enqueue the message so that it can be retrieved when application calls receive()
// if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
// if asyncReceive is waiting then notify callback without adding to incomingMessages queue
if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(), Collections.singletonList(message));
}
if (peekPendingReceive() != null) {
notifyPendingReceivedCallback(message, null);
} else if (enqueueMessageAndCheckBatchReceive(message)) {
if (hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
}
}
} finally {
lock.readLock().unlock();
// Enqueue the message so that it can be retrieved when application calls receive()
// if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
// if asyncReceive is waiting then notify callback without adding to incomingMessages queue
if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
Collections.singletonList(message));
}
if (peekPendingReceive() != null) {
notifyPendingReceivedCallback(message, null);
} else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
}
} else {
// handle batch message enqueuing; uncompressed payload has all messages in batch
Expand Down Expand Up @@ -1280,17 +1268,11 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv
if (possibleToDeadLetter != null) {
possibleToDeadLetter.add(message);
}
lock.readLock().lock();
try {
if (peekPendingReceive() != null) {
notifyPendingReceivedCallback(message, null);
} else if (enqueueMessageAndCheckBatchReceive(message)) {
if (hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
}
}
} finally {
lock.readLock().unlock();

if (peekPendingReceive() != null) {
notifyPendingReceivedCallback(message, null);
} else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
}
singleMessagePayload.release();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
private volatile Timeout partitionsAutoUpdateTimeout = null;
TopicsPartitionChangedListener topicsPartitionChangedListener;
CompletableFuture<Void> partitionsAutoUpdateFuture = null;

private final ReadWriteLock lock = new ReentrantReadWriteLock();

private final ConsumerStatsRecorder stats;
private final UnAckedMessageTracker unAckedMessageTracker;
private final ConsumerConfigurationData<T> internalConfig;
Expand Down Expand Up @@ -386,6 +386,7 @@ protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
} finally {
lock.writeLock().unlock();
}

return result;
}

Expand Down Expand Up @@ -595,18 +596,14 @@ private ConsumerConfigurationData<T> getInternalConsumerConfig() {

@Override
public void redeliverUnacknowledgedMessages() {
lock.writeLock().lock();
try {
consumers.values().stream().forEach(consumer -> {
consumer.redeliverUnacknowledgedMessages();
consumer.unAckedChunkedMessageIdSequenceMap.clear();
});
incomingMessages.clear();
resetIncomingMessageSize();
unAckedMessageTracker.clear();
} finally {
lock.writeLock().unlock();
}
consumers.values().stream().forEach(consumer -> {
consumer.redeliverUnacknowledgedMessages();
consumer.unAckedChunkedMessageIdSequenceMap.clear();
});
incomingMessages.clear();
resetIncomingMessageSize();
unAckedMessageTracker.clear();

resumeReceivingFromPausedConsumersIfNeeded();
}

Expand Down

0 comments on commit 9d08f64

Please sign in to comment.