From c9a8d589c57968773fc85ad3d3dbe419adeea477 Mon Sep 17 00:00:00 2001 From: hao zhang <1006618650@qq.com> Date: Sat, 28 Mar 2020 18:32:53 +0800 Subject: [PATCH] [Issue 6433][Pulsar-client]Fix message id compare between MessageId and BatchMessageId (#6621) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit link PR: #1285 This PR resolve the compare from BatchMessageIdImpl to MessageIdImpl, but didn't consider the symmetry on the other side. Master Issue: #6433 Motivation Fix the bug of compare between MessageId and BatchMessageId, keep the symmetry of compareTo method. Modifications In the mothod of compareTo in BatchMessageId class, when compare to non-batched messageId and other properties is equal,if batchIndex >-1, then return 1, so in MessageId class, add the same logic, and if batchIndex >-1, then return -1. Verifying this change Unit tests added. --- .../pulsar/client/impl/MessageIdImpl.java | 14 ++++++++- ...sistentAcknowledgmentsGroupingTracker.java | 7 ++++- .../client/impl/MessageIdCompareToTest.java | 29 +++++++++++++++---- 3 files changed, 43 insertions(+), 7 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java index 7f2e5a1d9fbb8..7c12eda5d39c5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java @@ -186,7 +186,19 @@ public byte[] toByteArray() { @Override public int compareTo(MessageId o) { - if (o instanceof MessageIdImpl) { + if (o instanceof BatchMessageIdImpl) { + BatchMessageIdImpl other = (BatchMessageIdImpl) o; + int res = ComparisonChain.start() + .compare(this.ledgerId, other.ledgerId) + .compare(this.entryId, other.entryId) + .compare(this.getPartitionIndex(), other.getPartitionIndex()) + .result(); + if (res == 0 && other.getBatchIndex() > -1) { + return -1; + } else { + return res; + } + } else if (o instanceof MessageIdImpl) { MessageIdImpl other = (MessageIdImpl) o; return ComparisonChain.start() .compare(this.ledgerId, other.ledgerId) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index c9720ee84aa89..95ca4cf91e0d3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -106,7 +106,12 @@ public void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map= MAX_ACK_GROUP_SIZE) { flush(); } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java index 7a612b1fbaba1..515d353baa9df 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java @@ -117,13 +117,32 @@ public void testCompareDifferentType() { BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(123L, 345L, 567, 789); BatchMessageIdImpl batchMessageId3 = new BatchMessageIdImpl(messageIdImpl); assertTrue(messageIdImpl.compareTo(batchMessageId1) > 0, "Expected to be greater than"); - assertEquals(messageIdImpl.compareTo(batchMessageId2), 0, "Expected to be equal"); + assertTrue(messageIdImpl.compareTo(batchMessageId2) < 0, "Expected to be less than"); assertEquals(messageIdImpl.compareTo(batchMessageId3), 0, "Expected to be equal"); assertTrue(batchMessageId1.compareTo(messageIdImpl) < 0, "Expected to be less than"); assertTrue(batchMessageId2.compareTo(messageIdImpl) > 0, "Expected to be greater than"); assertEquals(batchMessageId3.compareTo(messageIdImpl), 0, "Expected to be equal"); } + @Test + public void compareToSymmetricTest() { + MessageIdImpl simpleMessageId = new MessageIdImpl(123L, 345L, 567); + // batchIndex is -1 if message is non-batched message and has the batchIndex for a batch message + BatchMessageIdImpl batchMessageId1 = new BatchMessageIdImpl(123L, 345L, 567, -1); + BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(123L, 345L, 567, 1); + BatchMessageIdImpl batchMessageId3 = new BatchMessageIdImpl(123L, 345L, 566, 1); + BatchMessageIdImpl batchMessageId4 = new BatchMessageIdImpl(123L, 345L, 566, -1); + + assertEquals(simpleMessageId.compareTo(batchMessageId1), 0, "Expected to be equal"); + assertEquals(batchMessageId1.compareTo(simpleMessageId), 0, "Expected to be equal"); + assertTrue(batchMessageId2.compareTo(simpleMessageId) > 0, "Expected to be greater than"); + assertTrue(simpleMessageId.compareTo(batchMessageId2) < 0, "Expected to be less than"); + assertTrue(simpleMessageId.compareTo(batchMessageId3) > 0, "Expected to be greater than"); + assertTrue(batchMessageId3.compareTo(simpleMessageId) < 0, "Expected to be less than"); + assertTrue(simpleMessageId.compareTo(batchMessageId4) > 0, "Expected to be greater than"); + assertTrue(batchMessageId4.compareTo(simpleMessageId) < 0, "Expected to be less than"); + } + @Test public void testMessageIdImplCompareToTopicMessageId() { MessageIdImpl messageIdImpl = new MessageIdImpl(123L, 345L, 567); @@ -140,7 +159,7 @@ public void testMessageIdImplCompareToTopicMessageId() { "test-topic", new BatchMessageIdImpl(messageIdImpl)); assertTrue(messageIdImpl.compareTo(topicMessageId1) > 0, "Expected to be greater than"); - assertEquals(messageIdImpl.compareTo(topicMessageId2), 0, "Expected to be equal"); + assertTrue(messageIdImpl.compareTo(topicMessageId2) < 0, "Expected to be less than"); assertEquals(messageIdImpl.compareTo(topicMessageId3), 0, "Expected to be equal"); assertTrue(topicMessageId1.compareTo(messageIdImpl) < 0, "Expected to be less than"); assertTrue(topicMessageId2.compareTo(messageIdImpl) > 0, "Expected to be greater than"); @@ -165,9 +184,9 @@ public void testBatchMessageIdImplCompareToTopicMessageId() { assertTrue(messageIdImpl2.compareTo(topicMessageId2) > 0, "Expected to be greater than"); assertEquals(messageIdImpl3.compareTo(topicMessageId2), 0, "Expected to be equal"); assertTrue(topicMessageId1.compareTo(messageIdImpl1) < 0, "Expected to be less than"); - assertEquals(topicMessageId2.compareTo(messageIdImpl1), 0, "Expected to be equal"); - assertEquals(topicMessageId2.compareTo(messageIdImpl2), 0, "Expected to be equal"); - assertEquals(topicMessageId2.compareTo(messageIdImpl2), 0, "Expected to be equal"); + assertTrue(topicMessageId2.compareTo(messageIdImpl1) < 0, "Expected to be less than"); + assertTrue(topicMessageId2.compareTo(messageIdImpl2) < 0, "Expected to be less than"); + assertTrue(topicMessageId2.compareTo(messageIdImpl2) < 0, "Expected to be less than"); } @Test