diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index b5461b6f7f747..9037aafa194cb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -62,6 +63,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic private final Map policyCacheInitMap = new ConcurrentHashMap<>(); + private final Map>> listeners = new ConcurrentHashMap<>(); + public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) { this.pulsarService = pulsarService; } @@ -125,9 +128,9 @@ private void notifyListener(Message msg) { TopicPoliciesEvent event = msg.getValue().getTopicPoliciesEvent(); TopicName topicName = TopicName.get(event.getDomain(), event.getTenant(), event.getNamespace(), event.getTopic()); - if (LISTENERS.get(topicName) != null) { + if (listeners.get(topicName) != null) { TopicPolicies policies = event.getPolicies(); - for (TopicPolicyListener listener : LISTENERS.get(topicName)) { + for (TopicPolicyListener listener : listeners.get(topicName)) { listener.onUpdate(policies); } } @@ -361,12 +364,12 @@ Boolean getPoliciesCacheInit(NamespaceName namespaceName) { @Override public void registerListener(TopicName topicName, TopicPolicyListener listener) { - LISTENERS.computeIfAbsent(topicName, k -> Lists.newCopyOnWriteArrayList()).add(listener); + listeners.computeIfAbsent(topicName, k -> Lists.newCopyOnWriteArrayList()).add(listener); } @Override public void unregisterListener(TopicName topicName, TopicPolicyListener listener) { - LISTENERS.computeIfAbsent(topicName, k -> Lists.newCopyOnWriteArrayList()).remove(listener); + listeners.computeIfAbsent(topicName, k -> Lists.newCopyOnWriteArrayList()).remove(listener); } private static final Logger log = LoggerFactory.getLogger(SystemTopicBasedTopicPoliciesService.class); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java index 9b6d2820a9cc8..c896dc49fd024 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java @@ -18,10 +18,7 @@ */ package org.apache.pulsar.broker.service; -import java.util.List; -import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; @@ -34,7 +31,6 @@ public interface TopicPoliciesService { TopicPoliciesService DISABLED = new TopicPoliciesServiceDisabled(); - Map>> LISTENERS = new ConcurrentHashMap<>(); /** * Update policies for a topic async.