Skip to content

Commit

Permalink
Modify the occasion of triggering listeners (apache#8234)
Browse files Browse the repository at this point in the history
### Motivation
When we use `SystemTopicBasedTopicPoliciesService#updateTopicPoliciesAsync` to update Topic-level policies, `Listener.onUpdate()` will be triggered;
Now this approach will cause 2 problems:
1) Because `updateTopicPoliciesAsync` only sends the message to the system topic, it has not been consumed yet. Therefore, the policy data in the cache is still old. When listeners are triggered, they cannot get the policies directly through the cache. They have to modify the existing interface and pass the policies through parameters.
2) When the broker restarts, some topic-level policies will no longer take effect, because they rely on the `onUpdate` method to take effect

### Modifications
1)The trigger occasion of `listener.onUpdate` is changed to when the reader consumes the event
2)fix some npe
  • Loading branch information
315157973 authored Oct 14, 2020
1 parent edf0944 commit 3539992
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,25 @@ public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, Top
});
})
);
if (listeners.get(topicName) != null) {
for (TopicPolicyListener<TopicPolicies> listener : listeners.get(topicName)) {
listener.onUpdate(policies);
}
}
}
});


return result;
}

private void notifyListener(Message<PulsarEvent> msg) {
if (!EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) {
return;
}
TopicPoliciesEvent event = msg.getValue().getTopicPoliciesEvent();
TopicName topicName = TopicName.get(event.getDomain(), event.getTenant(), event.getNamespace(), event.getTopic());
if (listeners.get(topicName) != null) {
TopicPolicies policies = event.getPolicies();
for (TopicPolicyListener<TopicPolicies> listener : listeners.get(topicName)) {
listener.onUpdate(policies);
}
}
}

@Override
public TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesCacheNotInitException {
if (policyCacheInitMap.containsKey(topicName.getNamespaceObject())
Expand Down Expand Up @@ -243,6 +250,7 @@ private void readMorePolicies(SystemTopicClient.Reader reader) {
reader.readNextAsync().whenComplete((msg, ex) -> {
if (ex == null) {
refreshTopicPoliciesCache(msg);
notifyListener(msg);
readMorePolicies(reader);
} else {
if (ex instanceof PulsarClientException.AlreadyClosedException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;

import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -304,8 +305,11 @@ public static Optional<Policies> getPolicies(BrokerService brokerService, String
final String path = path(POLICIES, namespace.toString());
Optional<Policies> policies = Optional.empty();
try {
policies = brokerService.pulsar().getConfigurationCache().policiesCache().getAsync(path)
.get(brokerService.pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
ConfigurationCacheService configurationCacheService = brokerService.pulsar().getConfigurationCache();
if (configurationCacheService != null) {
policies = configurationCacheService.policiesCache().getAsync(path)
.get(brokerService.pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
}
} catch (Exception e) {
log.warn("Failed to get message-rate for {} ", topicName, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2456,14 +2456,8 @@ public void onUpdate(TopicPolicies policies) {
}
}

private Optional<Policies> getNamespacePolicies(){
try {
return Optional.ofNullable(brokerService.pulsar().getAdminClient().namespaces()
.getPolicies(TopicName.get(topic).getNamespace()));
} catch (Exception e) {
log.error("get namespace policies fail", e);
}
return Optional.empty();
private Optional<Policies> getNamespacePolicies() {
return DispatchRateLimiter.getPolicies(brokerService, topic);
}

private void initializeTopicDispatchRateLimiterIfNeeded(TopicPolicies policies) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.broker.admin;

import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import static org.testng.Assert.assertEquals;

Expand Down Expand Up @@ -549,17 +551,49 @@ public void testPolicyOverwrittenByNamespaceLevel() throws Exception {
Assert.assertEquals(limiter.getDispatchRateOnMsg(), 100);
admin.topics().removeDispatchRate(topic);
for (int i = 0; i < 10; i++) {
Thread.sleep(500);
if (admin.topics().getDispatchRate(topic) == null) {
break;
}
Thread.sleep(500);
}
//2 Remove level policy ,DispatchRateLimiter should us ns level policy
limiter = pulsar.getBrokerService().getTopicIfExists(topic).get().get().getDispatchRateLimiter().get();
Assert.assertEquals(limiter.getDispatchRateOnByte(), 30000);
Assert.assertEquals(limiter.getDispatchRateOnMsg(), 300);
}

@Test(timeOut = 20000)
public void testRestart() throws Exception {
final String topic = testTopic + UUID.randomUUID();
admin.topics().createNonPartitionedTopic(topic);
//wait for cache init
Thread.sleep(1000);
InactiveTopicPolicies inactiveTopicPolicies =
new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up,100,true);
admin.namespaces().setInactiveTopicPolicies(myNamespace, inactiveTopicPolicies);
//wait for zk
Thread.sleep(500);
inactiveTopicPolicies =
new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions,200,false);
admin.topics().setInactiveTopicPolicies(topic, inactiveTopicPolicies);
for (int i = 0; i < 10; i++) {
Thread.sleep(500);
if (admin.topics().getInactiveTopicPolicies(topic) != null) {
break;
}
}
// restart broker, policy should still take effect
stopBroker();
Thread.sleep(500);
startBroker();

//wait for cache
pulsarClient.newProducer().topic(topic).create().close();
Thread.sleep(2000);
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
Assert.assertEquals(persistentTopic.getInactiveTopicPolicies(), inactiveTopicPolicies);
}

@Test
public void testGetSetSubscriptionDispatchRate() throws Exception {
final String topic = testTopic + UUID.randomUUID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,8 @@ public void testTopicLevelInActiveTopicApi() throws Exception {
conf.setSystemTopicEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
super.baseSetup();
//wait for init
Thread.sleep(2000);
final String topicName = "persistent://prop/ns-abc/testMaxInactiveDuration-" + UUID.randomUUID().toString();
admin.topics().createPartitionedTopic(topicName, 3);

Expand All @@ -314,8 +316,7 @@ public void testTopicLevelInActiveTopicApi() throws Exception {
policies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
policies.setMaxInactiveDurationSeconds(10);
admin.topics().setInactiveTopicPolicies(topicName, policies);
//wait for init
Thread.sleep(3000);

for (int i = 0; i < 50; i++) {
if (admin.topics().getInactiveTopicPolicies(topicName) != null) {
break;
Expand Down

0 comments on commit 3539992

Please sign in to comment.