diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java index 2a994e2a892f8..d93269369e31b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java @@ -25,6 +25,7 @@ import io.netty.util.TimerTask; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -82,41 +83,36 @@ public void run(Timeout timeout) throws Exception { if (timeout.isCancelled()) { return; } - - CompletableFuture recheckFuture = new CompletableFuture<>(); - List> futures = Lists.newArrayListWithExpectedSize(2); - - client.getLookup().getTopicsUnderNamespace(namespaceName, subscriptionMode).thenAccept(topics -> { - if (log.isDebugEnabled()) { - log.debug("Get topics under namespace {}, topics.size: {}", namespaceName.toString(), topics.size()); - topics.forEach(topicName -> - log.debug("Get topics under namespace {}, topic: {}", namespaceName.toString(), topicName)); - } - - List newTopics = PulsarClientImpl.topicsPatternFilter(topics, topicsPattern); - List oldTopics = new ArrayList<>(); - oldTopics.addAll(getPartitionedTopics()); - getPartitions().forEach(p -> { - TopicName t = TopicName.get(p); - if (!t.isPartitioned() || !oldTopics.contains(t.getPartitionedTopicName())) { - oldTopics.add(p); + client.getLookup().getTopicsUnderNamespace(namespaceName, subscriptionMode) + .thenCompose(topics -> { + if (log.isDebugEnabled()) { + log.debug("Get topics under namespace {}, topics.size: {}", + namespaceName.toString(), topics.size()); + topics.forEach(topicName -> + log.debug("Get topics under namespace {}, topic: {}", + namespaceName.toString(), topicName)); + } + final List newTopics = PulsarClientImpl.topicsPatternFilter(topics, topicsPattern); + final List oldTopics = new ArrayList<>(getPartitionedTopics()); + for (String partition : getPartitions()) { + TopicName topicName = TopicName.get(partition); + if (!topicName.isPartitioned() || !oldTopics.contains(topicName.getPartitionedTopicName())) { + oldTopics.add(partition); + } } + final List> listenersCallback = new ArrayList<>(2); + listenersCallback.add(topicsChangeListener.onTopicsAdded(topicsListsMinus(newTopics, oldTopics))); + listenersCallback.add(topicsChangeListener.onTopicsRemoved(topicsListsMinus(oldTopics, newTopics))); + return FutureUtil.waitForAll(Collections.unmodifiableList(listenersCallback)); + }).exceptionally(ex -> { + log.warn("[{}] Failed to recheck topics change: {}", topic, ex.getMessage()); + return null; + }).thenAccept(__ -> { + // schedule the next re-check task + this.recheckPatternTimeout = client.timer() + .newTimeout(PatternMultiTopicsConsumerImpl.this, + Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS); }); - - futures.add(topicsChangeListener.onTopicsAdded(topicsListsMinus(newTopics, oldTopics))); - futures.add(topicsChangeListener.onTopicsRemoved(topicsListsMinus(oldTopics, newTopics))); - FutureUtil.waitForAll(futures) - .thenAccept(finalFuture -> recheckFuture.complete(null)) - .exceptionally(ex -> { - log.warn("[{}] Failed to recheck topics change: {}", topic, ex.getMessage()); - recheckFuture.completeExceptionally(ex); - return null; - }); - }); - - // schedule the next re-check task - this.recheckPatternTimeout = client.timer().newTimeout(PatternMultiTopicsConsumerImpl.this, - Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS); } public Pattern getPattern() {