Skip to content

Commit

Permalink
Do not use a static map of listeners in TopicPoliciesService (apache#…
Browse files Browse the repository at this point in the history
…9486)

### Motivation

Maybe CI jobs are failing with OOM in the brokers unit tests. The Surefire worker is configured with 4 processes, each with xmx of 1G. 

The problem was introduced in apache#7863 where a static map of listeners was added to an interface. That makes that map to contain all the `PulsarService` instances created during the tests execution and keeping references to everything else.

The map should instead be scoped to the specific instance.
  • Loading branch information
merlimat authored Feb 5, 2021
1 parent 393cebb commit 31ee454
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -62,6 +63,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic

private final Map<NamespaceName, Boolean> policyCacheInitMap = new ConcurrentHashMap<>();

private final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = new ConcurrentHashMap<>();

public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) {
this.pulsarService = pulsarService;
}
Expand Down Expand Up @@ -125,9 +128,9 @@ private void notifyListener(Message<PulsarEvent> msg) {
TopicPoliciesEvent event = msg.getValue().getTopicPoliciesEvent();
TopicName topicName = TopicName.get(event.getDomain(), event.getTenant(),
event.getNamespace(), event.getTopic());
if (LISTENERS.get(topicName) != null) {
if (listeners.get(topicName) != null) {
TopicPolicies policies = event.getPolicies();
for (TopicPolicyListener<TopicPolicies> listener : LISTENERS.get(topicName)) {
for (TopicPolicyListener<TopicPolicies> listener : listeners.get(topicName)) {
listener.onUpdate(policies);
}
}
Expand Down Expand Up @@ -361,12 +364,12 @@ Boolean getPoliciesCacheInit(NamespaceName namespaceName) {

@Override
public void registerListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener) {
LISTENERS.computeIfAbsent(topicName, k -> Lists.newCopyOnWriteArrayList()).add(listener);
listeners.computeIfAbsent(topicName, k -> Lists.newCopyOnWriteArrayList()).add(listener);
}

@Override
public void unregisterListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener) {
LISTENERS.computeIfAbsent(topicName, k -> Lists.newCopyOnWriteArrayList()).remove(listener);
listeners.computeIfAbsent(topicName, k -> Lists.newCopyOnWriteArrayList()).remove(listener);
}

private static final Logger log = LoggerFactory.getLogger(SystemTopicBasedTopicPoliciesService.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@
*/
package org.apache.pulsar.broker.service;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
Expand All @@ -34,7 +31,6 @@
public interface TopicPoliciesService {

TopicPoliciesService DISABLED = new TopicPoliciesServiceDisabled();
Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> LISTENERS = new ConcurrentHashMap<>();

/**
* Update policies for a topic async.
Expand Down

0 comments on commit 31ee454

Please sign in to comment.