Skip to content

Commit

Permalink
Fix NPE when ACK grouping tracker checks duplicated message id (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
BewareMyPower authored May 14, 2021
1 parent 959a8c8 commit 993ea62
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ public byte[] toByteArray() {

@Override
public int compareTo(MessageId o) {
if (o == null) {
throw new UnsupportedOperationException("MessageId is null");
}
if (o instanceof MessageIdImpl) {
MessageIdImpl other = (MessageIdImpl) o;
int batchIndex = (o instanceof BatchMessageIdImpl) ? ((BatchMessageIdImpl) o).getBatchIndex() : NO_BATCH;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;

import io.netty.util.Recycler;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.pulsar.client.api.MessageId;
Expand Down Expand Up @@ -113,7 +114,10 @@ public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl<?> consumer, Consum
* resent after a disconnection and for which the user has already sent an acknowledgement.
*/
@Override
public boolean isDuplicate(MessageId messageId) {
public boolean isDuplicate(@NonNull MessageId messageId) {
if (lastCumulativeAck.messageId == null) {
return false;
}
if (messageId.compareTo(lastCumulativeAck.messageId) <= 0) {
// Already included in a cumulative ack
return true;
Expand Down

0 comments on commit 993ea62

Please sign in to comment.