Skip to content

Commit

Permalink
[improve][client] Avoid timertask run before previous subscribe compl…
Browse files Browse the repository at this point in the history
…ete. (apache#14818)

### Motivation

If configuration  ``patternAutoDiscoveryPeriod`` is small, there may be some unnecessary subscribe requests.

### Modifications

*Describe the modifications you've done.*
  • Loading branch information
mattisonchao authored Mar 24, 2022
1 parent 508a7e3 commit 0fe921f
Showing 1 changed file with 29 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.netty.util.TimerTask;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -82,41 +83,36 @@ public void run(Timeout timeout) throws Exception {
if (timeout.isCancelled()) {
return;
}

CompletableFuture<Void> recheckFuture = new CompletableFuture<>();
List<CompletableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(2);

client.getLookup().getTopicsUnderNamespace(namespaceName, subscriptionMode).thenAccept(topics -> {
if (log.isDebugEnabled()) {
log.debug("Get topics under namespace {}, topics.size: {}", namespaceName.toString(), topics.size());
topics.forEach(topicName ->
log.debug("Get topics under namespace {}, topic: {}", namespaceName.toString(), topicName));
}

List<String> newTopics = PulsarClientImpl.topicsPatternFilter(topics, topicsPattern);
List<String> oldTopics = new ArrayList<>();
oldTopics.addAll(getPartitionedTopics());
getPartitions().forEach(p -> {
TopicName t = TopicName.get(p);
if (!t.isPartitioned() || !oldTopics.contains(t.getPartitionedTopicName())) {
oldTopics.add(p);
client.getLookup().getTopicsUnderNamespace(namespaceName, subscriptionMode)
.thenCompose(topics -> {
if (log.isDebugEnabled()) {
log.debug("Get topics under namespace {}, topics.size: {}",
namespaceName.toString(), topics.size());
topics.forEach(topicName ->
log.debug("Get topics under namespace {}, topic: {}",
namespaceName.toString(), topicName));
}
final List<String> newTopics = PulsarClientImpl.topicsPatternFilter(topics, topicsPattern);
final List<String> oldTopics = new ArrayList<>(getPartitionedTopics());
for (String partition : getPartitions()) {
TopicName topicName = TopicName.get(partition);
if (!topicName.isPartitioned() || !oldTopics.contains(topicName.getPartitionedTopicName())) {
oldTopics.add(partition);
}
}
final List<CompletableFuture<?>> listenersCallback = new ArrayList<>(2);
listenersCallback.add(topicsChangeListener.onTopicsAdded(topicsListsMinus(newTopics, oldTopics)));
listenersCallback.add(topicsChangeListener.onTopicsRemoved(topicsListsMinus(oldTopics, newTopics)));
return FutureUtil.waitForAll(Collections.unmodifiableList(listenersCallback));
}).exceptionally(ex -> {
log.warn("[{}] Failed to recheck topics change: {}", topic, ex.getMessage());
return null;
}).thenAccept(__ -> {
// schedule the next re-check task
this.recheckPatternTimeout = client.timer()
.newTimeout(PatternMultiTopicsConsumerImpl.this,
Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
});

futures.add(topicsChangeListener.onTopicsAdded(topicsListsMinus(newTopics, oldTopics)));
futures.add(topicsChangeListener.onTopicsRemoved(topicsListsMinus(oldTopics, newTopics)));
FutureUtil.waitForAll(futures)
.thenAccept(finalFuture -> recheckFuture.complete(null))
.exceptionally(ex -> {
log.warn("[{}] Failed to recheck topics change: {}", topic, ex.getMessage());
recheckFuture.completeExceptionally(ex);
return null;
});
});

// schedule the next re-check task
this.recheckPatternTimeout = client.timer().newTimeout(PatternMultiTopicsConsumerImpl.this,
Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS);
}

public Pattern getPattern() {
Expand Down

0 comments on commit 0fe921f

Please sign in to comment.