Skip to content

Commit

Permalink
Fix locking for ConsumerImpl when creating deadLetterProducer. (apach…
Browse files Browse the repository at this point in the history
…e#9166)

Fixes apache#9162

### Motivation
When we check if we need to create producer for DLQ, we used double-checked locking, https://github.com/apache/pulsar/blame/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L701-L712
however we should do the second check after we acquired the lock, and the field we're checking should be a volatile field.

### Modifications
Fix the double-checked locking and make the field volatile.
  • Loading branch information
MarvinCai authored Jan 18, 2021
1 parent 39dc24b commit b019c4b
Showing 1 changed file with 13 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle

private final DeadLetterPolicy deadLetterPolicy;

private Producer<T> deadLetterProducer;
private volatile Producer<T> deadLetterProducer;

private volatile Producer<T> retryLetterProducer;
private final ReadWriteLock createProducerLock = new ReentrantReadWriteLock();
Expand Down Expand Up @@ -702,8 +702,8 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
processPossibleToDLQ((MessageIdImpl)messageId);
if (deadLetterProducer == null) {
try {
createProducerLock.writeLock().lock();
if (deadLetterProducer == null) {
createProducerLock.writeLock().lock();
deadLetterProducer = client.newProducer(schema)
.topic(this.deadLetterPolicy
.getDeadLetterTopic())
Expand All @@ -712,9 +712,9 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
}
} catch (Exception e) {
log.error("Create dead letter producer exception with topic: {}", deadLetterPolicy.getDeadLetterTopic(), e);
} finally {
} finally {
createProducerLock.writeLock().unlock();
}
}
}
if (deadLetterProducer != null) {
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, originTopicNameStr);
Expand Down Expand Up @@ -1816,12 +1816,17 @@ private boolean processPossibleToDLQ(MessageIdImpl messageId) {
if (deadLetterMessages != null) {
if (deadLetterProducer == null) {
try {
deadLetterProducer = client.newProducer(schema)
.topic(this.deadLetterPolicy.getDeadLetterTopic())
.blockIfQueueFull(false)
.create();
createProducerLock.writeLock().lock();
if (deadLetterProducer == null) {
deadLetterProducer = client.newProducer(schema)
.topic(this.deadLetterPolicy.getDeadLetterTopic())
.blockIfQueueFull(false)
.create();
}
} catch (Exception e) {
log.error("Create dead letter producer exception with topic: {}", deadLetterPolicy.getDeadLetterTopic(), e);
} finally {
createProducerLock.writeLock().unlock();
}
}
if (deadLetterProducer != null) {
Expand Down

0 comments on commit b019c4b

Please sign in to comment.