diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java index 855ee64bacdc3..2d571852919bc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java @@ -200,6 +200,9 @@ public byte[] toByteArray() { @Override public int compareTo(MessageId o) { + if (o == null) { + throw new UnsupportedOperationException("MessageId is null"); + } if (o instanceof MessageIdImpl) { MessageIdImpl other = (MessageIdImpl) o; int batchIndex = (o instanceof BatchMessageIdImpl) ? ((BatchMessageIdImpl) o).getBatchIndex() : NO_BATCH; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index 9eee2bf7b41da..09d6bd51b9f10 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import io.netty.util.Recycler; +import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Triple; import org.apache.pulsar.client.api.MessageId; @@ -113,7 +114,10 @@ public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl consumer, Consum * resent after a disconnection and for which the user has already sent an acknowledgement. */ @Override - public boolean isDuplicate(MessageId messageId) { + public boolean isDuplicate(@NonNull MessageId messageId) { + if (lastCumulativeAck.messageId == null) { + return false; + } if (messageId.compareTo(lastCumulativeAck.messageId) <= 0) { // Already included in a cumulative ack return true;