diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 97f7f4a06ac63..5d5b1fa79467a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -100,8 +100,8 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle @SuppressWarnings("unused") private volatile int availablePermits = 0; - private MessageId lastDequeuedMessage = MessageId.earliest; - private MessageId lastMessageIdInBroker = MessageId.earliest; + private volatile MessageId lastDequeuedMessage = MessageId.earliest; + private volatile MessageId lastMessageIdInBroker = MessageId.earliest; private long subscribeTimeout; private final int partitionIndex; @@ -120,11 +120,11 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle protected final ConsumerStatsRecorder stats; private final int priorityLevel; private final SubscriptionMode subscriptionMode; - private BatchMessageIdImpl startMessageId; + private volatile BatchMessageIdImpl startMessageId; private volatile boolean hasReachedEndOfTopic; - private MessageCrypto msgCrypto = null; + private final MessageCrypto msgCrypto; private final Map metadata; @@ -136,9 +136,9 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final TopicName topicName; private final String topicNameWithoutPartition; - private ConcurrentHashMap>> possibleSendToDeadLetterTopicMessages; + private final Map>> possibleSendToDeadLetterTopicMessages; - private DeadLetterPolicy deadLetterPolicy; + private final DeadLetterPolicy deadLetterPolicy; private Producer deadLetterProducer; @@ -199,8 +199,9 @@ enum SubscriptionMode { // Create msgCrypto if not created already if (conf.getCryptoKeyReader() != null) { - String logCtx = "[" + topic + "] [" + subscription + "]"; - this.msgCrypto = new MessageCrypto(logCtx, false); + this.msgCrypto = new MessageCrypto(String.format("[%s] [%s]", topic, subscription), false); + } else { + this.msgCrypto = null; } if (conf.getProperties().isEmpty()) { @@ -235,6 +236,9 @@ enum SubscriptionMode { .deadLetterTopic(String.format("%s-%s-DLQ", topic, subscription)) .build(); } + } else { + deadLetterPolicy = null; + possibleSendToDeadLetterTopicMessages = null; } topicNameWithoutPartition = topicName.getPartitionedTopicName();