Skip to content

Commit

Permalink
[improve][broker] Reduce calls on metadata store / ZK event thread & …
Browse files Browse the repository at this point in the history
…Netty threads in PersistentTopic (apache#19388)
  • Loading branch information
lhotari authored Feb 2, 2023
1 parent 81af293 commit cb1a031
Showing 1 changed file with 17 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -237,6 +238,8 @@ protected TopicStatsHelper initialValue() {

// Record the last time a data message (ie: not an internal Pulsar marker) is published on the topic
private volatile long lastDataMessagePublishedTimestamp = 0;
@Getter
private final ExecutorService orderedExecutor;

private static class TopicStatsHelper {
public double averageMsgSize;
Expand Down Expand Up @@ -265,6 +268,10 @@ public void reset() {

public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) {
super(topic, brokerService);
// null check for backwards compatibility with tests which mock the broker service
this.orderedExecutor = brokerService.getTopicOrderedExecutor() != null
? brokerService.getTopicOrderedExecutor().chooseThread(topic)
: null;
this.ledger = ledger;
this.subscriptions = ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder()
.expectedItems(16)
Expand Down Expand Up @@ -334,7 +341,7 @@ public CompletableFuture<Void> initialize() {
return FutureUtil.waitForAll(futures).thenCompose(__ ->
brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
.thenAccept(optPolicies -> {
.thenAcceptAsync(optPolicies -> {
if (!optPolicies.isPresent()) {
isEncryptionRequired = false;
updatePublishDispatcher();
Expand All @@ -359,7 +366,7 @@ public CompletableFuture<Void> initialize() {
this.isEncryptionRequired = policies.encryption_required;

isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
})
}, getOrderedExecutor())
.thenCompose(ignore -> initTopicPolicy())
.exceptionally(ex -> {
log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false",
Expand All @@ -374,6 +381,10 @@ public CompletableFuture<Void> initialize() {
PersistentTopic(String topic, BrokerService brokerService, ManagedLedger ledger,
MessageDeduplication messageDeduplication) {
super(topic, brokerService);
// null check for backwards compatibility with tests which mock the broker service
this.orderedExecutor = brokerService.getTopicOrderedExecutor() != null
? brokerService.getTopicOrderedExecutor().chooseThread(topic)
: null;
this.ledger = ledger;
this.messageDeduplication = messageDeduplication;
this.subscriptions = ConcurrentOpenHashMap.<String, PersistentSubscription>newBuilder()
Expand Down Expand Up @@ -667,7 +678,7 @@ public CompletableFuture<Void> startReplProducers() {
// read repl-cluster from policies to avoid restart of replicator which are in process of disconnect and close
return brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
.thenAccept(optPolicies -> {
.thenAcceptAsync(optPolicies -> {
if (optPolicies.isPresent()) {
if (optPolicies.get().replication_clusters != null) {
Set<String> configuredClusters = Sets.newTreeSet(optPolicies.get().replication_clusters);
Expand All @@ -680,7 +691,7 @@ public CompletableFuture<Void> startReplProducers() {
} else {
replicators.forEach((region, replicator) -> replicator.startProducer());
}
}).exceptionally(ex -> {
}, getOrderedExecutor()).exceptionally(ex -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Error getting policies while starting repl-producers {}", topic, ex.getMessage());
}
Expand Down Expand Up @@ -1217,9 +1228,9 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
}
FutureUtil.waitForAll(futures).thenRun(() -> {
FutureUtil.waitForAll(futures).thenRunAsync(() -> {
closeClientFuture.complete(null);
}).exceptionally(ex -> {
}, getOrderedExecutor()).exceptionally(ex -> {
log.error("[{}] Error closing clients", topic, ex);
unfenceTopicToResume();
closeClientFuture.completeExceptionally(ex);
Expand Down

0 comments on commit cb1a031

Please sign in to comment.