Skip to content

Commit

Permalink
[Issue 6433][Pulsar-client]Fix message id compare between MessageId a…
Browse files Browse the repository at this point in the history
…nd BatchMessageId (apache#6621)

link PR: apache#1285
This PR resolve the compare from BatchMessageIdImpl to MessageIdImpl, but didn't consider the 
symmetry on the other side.

Master Issue: apache#6433

Motivation
Fix the bug of compare between MessageId and BatchMessageId, keep the symmetry of compareTo method.

Modifications
In the mothod of compareTo in BatchMessageId class, when compare to non-batched messageId and other properties is equal,if batchIndex >-1, then return 1, so in MessageId class, add the same logic, and if batchIndex >-1, then return -1.

Verifying this change
Unit tests added.
  • Loading branch information
zhanghaou authored Mar 28, 2020
1 parent 7475b75 commit c9a8d58
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,19 @@ public byte[] toByteArray() {

@Override
public int compareTo(MessageId o) {
if (o instanceof MessageIdImpl) {
if (o instanceof BatchMessageIdImpl) {
BatchMessageIdImpl other = (BatchMessageIdImpl) o;
int res = ComparisonChain.start()
.compare(this.ledgerId, other.ledgerId)
.compare(this.entryId, other.entryId)
.compare(this.getPartitionIndex(), other.getPartitionIndex())
.result();
if (res == 0 && other.getBatchIndex() > -1) {
return -1;
} else {
return res;
}
} else if (o instanceof MessageIdImpl) {
MessageIdImpl other = (MessageIdImpl) o;
return ComparisonChain.start()
.compare(this.ledgerId, other.ledgerId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,12 @@ public void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String,
doCumulativeAck(msgId);
} else {
// Individual ack
pendingIndividualAcks.add(msgId);
if (msgId instanceof BatchMessageIdImpl) {
pendingIndividualAcks.add(new MessageIdImpl(msgId.getLedgerId(),
msgId.getEntryId(), msgId.getPartitionIndex()));
} else {
pendingIndividualAcks.add(msgId);
}
if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
flush();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,32 @@ public void testCompareDifferentType() {
BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(123L, 345L, 567, 789);
BatchMessageIdImpl batchMessageId3 = new BatchMessageIdImpl(messageIdImpl);
assertTrue(messageIdImpl.compareTo(batchMessageId1) > 0, "Expected to be greater than");
assertEquals(messageIdImpl.compareTo(batchMessageId2), 0, "Expected to be equal");
assertTrue(messageIdImpl.compareTo(batchMessageId2) < 0, "Expected to be less than");
assertEquals(messageIdImpl.compareTo(batchMessageId3), 0, "Expected to be equal");
assertTrue(batchMessageId1.compareTo(messageIdImpl) < 0, "Expected to be less than");
assertTrue(batchMessageId2.compareTo(messageIdImpl) > 0, "Expected to be greater than");
assertEquals(batchMessageId3.compareTo(messageIdImpl), 0, "Expected to be equal");
}

@Test
public void compareToSymmetricTest() {
MessageIdImpl simpleMessageId = new MessageIdImpl(123L, 345L, 567);
// batchIndex is -1 if message is non-batched message and has the batchIndex for a batch message
BatchMessageIdImpl batchMessageId1 = new BatchMessageIdImpl(123L, 345L, 567, -1);
BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(123L, 345L, 567, 1);
BatchMessageIdImpl batchMessageId3 = new BatchMessageIdImpl(123L, 345L, 566, 1);
BatchMessageIdImpl batchMessageId4 = new BatchMessageIdImpl(123L, 345L, 566, -1);

assertEquals(simpleMessageId.compareTo(batchMessageId1), 0, "Expected to be equal");
assertEquals(batchMessageId1.compareTo(simpleMessageId), 0, "Expected to be equal");
assertTrue(batchMessageId2.compareTo(simpleMessageId) > 0, "Expected to be greater than");
assertTrue(simpleMessageId.compareTo(batchMessageId2) < 0, "Expected to be less than");
assertTrue(simpleMessageId.compareTo(batchMessageId3) > 0, "Expected to be greater than");
assertTrue(batchMessageId3.compareTo(simpleMessageId) < 0, "Expected to be less than");
assertTrue(simpleMessageId.compareTo(batchMessageId4) > 0, "Expected to be greater than");
assertTrue(batchMessageId4.compareTo(simpleMessageId) < 0, "Expected to be less than");
}

@Test
public void testMessageIdImplCompareToTopicMessageId() {
MessageIdImpl messageIdImpl = new MessageIdImpl(123L, 345L, 567);
Expand All @@ -140,7 +159,7 @@ public void testMessageIdImplCompareToTopicMessageId() {
"test-topic",
new BatchMessageIdImpl(messageIdImpl));
assertTrue(messageIdImpl.compareTo(topicMessageId1) > 0, "Expected to be greater than");
assertEquals(messageIdImpl.compareTo(topicMessageId2), 0, "Expected to be equal");
assertTrue(messageIdImpl.compareTo(topicMessageId2) < 0, "Expected to be less than");
assertEquals(messageIdImpl.compareTo(topicMessageId3), 0, "Expected to be equal");
assertTrue(topicMessageId1.compareTo(messageIdImpl) < 0, "Expected to be less than");
assertTrue(topicMessageId2.compareTo(messageIdImpl) > 0, "Expected to be greater than");
Expand All @@ -165,9 +184,9 @@ public void testBatchMessageIdImplCompareToTopicMessageId() {
assertTrue(messageIdImpl2.compareTo(topicMessageId2) > 0, "Expected to be greater than");
assertEquals(messageIdImpl3.compareTo(topicMessageId2), 0, "Expected to be equal");
assertTrue(topicMessageId1.compareTo(messageIdImpl1) < 0, "Expected to be less than");
assertEquals(topicMessageId2.compareTo(messageIdImpl1), 0, "Expected to be equal");
assertEquals(topicMessageId2.compareTo(messageIdImpl2), 0, "Expected to be equal");
assertEquals(topicMessageId2.compareTo(messageIdImpl2), 0, "Expected to be equal");
assertTrue(topicMessageId2.compareTo(messageIdImpl1) < 0, "Expected to be less than");
assertTrue(topicMessageId2.compareTo(messageIdImpl2) < 0, "Expected to be less than");
assertTrue(topicMessageId2.compareTo(messageIdImpl2) < 0, "Expected to be less than");
}

@Test
Expand Down

0 comments on commit c9a8d58

Please sign in to comment.