From bbce00a2245cf05b829182a9a75a86d4e1139492 Mon Sep 17 00:00:00 2001 From: lipenghui Date: Fri, 5 Feb 2021 14:46:47 +0800 Subject: [PATCH] Fix the partition number not equals expected error (#9446) Fixes #8000 ### Motivation Fix the partition number not equals expected error ### Verifying this change New tests added, without this fix, you can see errors like `topics consumer java.lang.IllegalStateException: allTopicPartitionsNumber 2 not equals expected: 5` --- .../client/impl/TopicsConsumerImplTest.java | 45 +++++++++++++++++++ .../client/impl/MultiTopicsConsumerImpl.java | 8 +++- 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index 07af862980286..e108535d1c831 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -48,6 +48,7 @@ import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TopicStats; +import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -1219,4 +1220,48 @@ public void testSubscriptionMustCompleteWhenOperationTimeoutOnMultipleTopics() t } } + @Test(timeOut = testTimeout) + public void testPartitionsUpdatesForMultipleTopics() throws Exception { + final String topicName0 = "persistent://public/default/testPartitionsUpdatesForMultipleTopics-0"; + final String subName = "my-sub"; + admin.topics().createPartitionedTopic(topicName0, 2); + assertEquals(admin.topics().getPartitionedTopicMetadata(topicName0).partitions, 2); + + PatternMultiTopicsConsumerImpl consumer = (PatternMultiTopicsConsumerImpl) pulsarClient.newConsumer(Schema.STRING) + .topicsPattern("persistent://public/default/test.*") + .subscriptionType(SubscriptionType.Failover) + .subscriptionName(subName) + .subscribe(); + + Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 2); + Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 2); + + admin.topics().updatePartitionedTopic(topicName0, 5); + consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout()); + + Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> { + Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 5); + Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 5); + }); + + final String topicName1 = "persistent://public/default/testPartitionsUpdatesForMultipleTopics-1"; + admin.topics().createPartitionedTopic(topicName1, 3); + assertEquals(admin.topics().getPartitionedTopicMetadata(topicName1).partitions, 3); + + consumer.getRecheckPatternTimeout().task().run(consumer.getRecheckPatternTimeout()); + + Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> { + Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 8); + Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 8); + }); + + admin.topics().updatePartitionedTopic(topicName1, 5); + consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout()); + + Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> { + Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 10); + Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 10); + }); + } + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index cca8f84f942e2..6cf6d3d6b8ff8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -1172,6 +1172,11 @@ public List> getConsumers() { return consumers.values().stream().collect(Collectors.toList()); } + // get all partitions that in the topics map + int getPartitionsOfTheTopicMap() { + return topics.values().stream().mapToInt(Integer::intValue).sum(); + } + @Override public void pause() { synchronized (pauseMutex) { @@ -1246,7 +1251,8 @@ private CompletableFuture subscribeIncreasedTopicPartitions(String topicNa future.complete(null); return future; } else if (oldPartitionNumber < currentPartitionNumber) { - allTopicPartitionsNumber.compareAndSet(oldPartitionNumber, currentPartitionNumber); + allTopicPartitionsNumber.addAndGet(currentPartitionNumber - oldPartitionNumber); + topics.put(topicName, currentPartitionNumber); List newPartitions = list.subList(oldPartitionNumber, currentPartitionNumber); // subscribe new added partitions List>> futureList = newPartitions