Skip to content

Commit

Permalink
Fix thread safety violation on ConsumerImpl (apache#3467)
Browse files Browse the repository at this point in the history
Using analysing tools on pulsar-client reported the following fixed thread
safety issues:

  - Fix `startMessageId`, `lastDequeuedMessage` and `lastMessageIdInBroker`
    fields reads are performed without synchronization and potentially races
    with write method, turned them into volatile.
  - Fields `possibleSendToDeadLetterTopicMessages`, `deadLetterPolicy` and
    `msgCrypto` can be turned into final to prevent being modified after
    construction.
  • Loading branch information
lovelle authored and merlimat committed Jan 29, 2019
1 parent 14d478d commit 0fe9e89
Showing 1 changed file with 12 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
@SuppressWarnings("unused")
private volatile int availablePermits = 0;

private MessageId lastDequeuedMessage = MessageId.earliest;
private MessageId lastMessageIdInBroker = MessageId.earliest;
private volatile MessageId lastDequeuedMessage = MessageId.earliest;
private volatile MessageId lastMessageIdInBroker = MessageId.earliest;

private long subscribeTimeout;
private final int partitionIndex;
Expand All @@ -120,11 +120,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
protected final ConsumerStatsRecorder stats;
private final int priorityLevel;
private final SubscriptionMode subscriptionMode;
private BatchMessageIdImpl startMessageId;
private volatile BatchMessageIdImpl startMessageId;

private volatile boolean hasReachedEndOfTopic;

private MessageCrypto msgCrypto = null;
private final MessageCrypto msgCrypto;

private final Map<String, String> metadata;

Expand All @@ -136,9 +136,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private final TopicName topicName;
private final String topicNameWithoutPartition;

private ConcurrentHashMap<MessageIdImpl, List<MessageImpl<T>>> possibleSendToDeadLetterTopicMessages;
private final Map<MessageIdImpl, List<MessageImpl<T>>> possibleSendToDeadLetterTopicMessages;

private DeadLetterPolicy deadLetterPolicy;
private final DeadLetterPolicy deadLetterPolicy;

private Producer<T> deadLetterProducer;

Expand Down Expand Up @@ -199,8 +199,9 @@ enum SubscriptionMode {

// Create msgCrypto if not created already
if (conf.getCryptoKeyReader() != null) {
String logCtx = "[" + topic + "] [" + subscription + "]";
this.msgCrypto = new MessageCrypto(logCtx, false);
this.msgCrypto = new MessageCrypto(String.format("[%s] [%s]", topic, subscription), false);
} else {
this.msgCrypto = null;
}

if (conf.getProperties().isEmpty()) {
Expand Down Expand Up @@ -235,6 +236,9 @@ enum SubscriptionMode {
.deadLetterTopic(String.format("%s-%s-DLQ", topic, subscription))
.build();
}
} else {
deadLetterPolicy = null;
possibleSendToDeadLetterTopicMessages = null;
}

topicNameWithoutPartition = topicName.getPartitionedTopicName();
Expand Down

0 comments on commit 0fe9e89

Please sign in to comment.