From 3539992442ac9f355f8011d2524b0142fc80f6ad Mon Sep 17 00:00:00 2001 From: feynmanlin Date: Wed, 14 Oct 2020 09:02:08 +0800 Subject: [PATCH] Modify the occasion of triggering listeners (#8234) ### Motivation When we use `SystemTopicBasedTopicPoliciesService#updateTopicPoliciesAsync` to update Topic-level policies, `Listener.onUpdate()` will be triggered; Now this approach will cause 2 problems: 1) Because `updateTopicPoliciesAsync` only sends the message to the system topic, it has not been consumed yet. Therefore, the policy data in the cache is still old. When listeners are triggered, they cannot get the policies directly through the cache. They have to modify the existing interface and pass the policies through parameters. 2) When the broker restarts, some topic-level policies will no longer take effect, because they rely on the `onUpdate` method to take effect ### Modifications 1)The trigger occasion of `listener.onUpdate` is changed to when the reader consumes the event 2)fix some npe --- .../SystemTopicBasedTopicPoliciesService.java | 22 ++++++++---- .../persistent/DispatchRateLimiter.java | 8 +++-- .../service/persistent/PersistentTopic.java | 10 ++---- .../broker/admin/TopicPoliciesTest.java | 36 ++++++++++++++++++- .../service/InactiveTopicDeleteTest.java | 5 +-- 5 files changed, 61 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 84add91912135..185402fee9b75 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -113,18 +113,25 @@ public CompletableFuture updateTopicPoliciesAsync(TopicName topicName, Top }); }) ); - if (listeners.get(topicName) != null) { - for (TopicPolicyListener listener : listeners.get(topicName)) { - listener.onUpdate(policies); - } - } } }); - - return result; } + private void notifyListener(Message msg) { + if (!EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) { + return; + } + TopicPoliciesEvent event = msg.getValue().getTopicPoliciesEvent(); + TopicName topicName = TopicName.get(event.getDomain(), event.getTenant(), event.getNamespace(), event.getTopic()); + if (listeners.get(topicName) != null) { + TopicPolicies policies = event.getPolicies(); + for (TopicPolicyListener listener : listeners.get(topicName)) { + listener.onUpdate(policies); + } + } + } + @Override public TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesCacheNotInitException { if (policyCacheInitMap.containsKey(topicName.getNamespaceObject()) @@ -243,6 +250,7 @@ private void readMorePolicies(SystemTopicClient.Reader reader) { reader.readNextAsync().whenComplete((msg, ex) -> { if (ex == null) { refreshTopicPoliciesCache(msg); + notifyListener(msg); readMorePolicies(reader); } else { if (ex instanceof PulsarClientException.AlreadyClosedException) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java index 29d11072e125b..266172797c635 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java @@ -28,6 +28,7 @@ import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.common.naming.TopicName; @@ -304,8 +305,11 @@ public static Optional getPolicies(BrokerService brokerService, String final String path = path(POLICIES, namespace.toString()); Optional policies = Optional.empty(); try { - policies = brokerService.pulsar().getConfigurationCache().policiesCache().getAsync(path) - .get(brokerService.pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS); + ConfigurationCacheService configurationCacheService = brokerService.pulsar().getConfigurationCache(); + if (configurationCacheService != null) { + policies = configurationCacheService.policiesCache().getAsync(path) + .get(brokerService.pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS); + } } catch (Exception e) { log.warn("Failed to get message-rate for {} ", topicName, e); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 0e16ead5dae0d..fd9f076b33ec4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2456,14 +2456,8 @@ public void onUpdate(TopicPolicies policies) { } } - private Optional getNamespacePolicies(){ - try { - return Optional.ofNullable(brokerService.pulsar().getAdminClient().namespaces() - .getPolicies(TopicName.get(topic).getNamespace())); - } catch (Exception e) { - log.error("get namespace policies fail", e); - } - return Optional.empty(); + private Optional getNamespacePolicies() { + return DispatchRateLimiter.getPolicies(brokerService, topic); } private void initializeTopicDispatchRateLimiterIfNeeded(TopicPolicies policies) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 936765fe2e87e..0f5c6872f133b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -19,6 +19,8 @@ package org.apache.pulsar.broker.admin; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; +import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.SubscribeRate; import static org.testng.Assert.assertEquals; @@ -549,10 +551,10 @@ public void testPolicyOverwrittenByNamespaceLevel() throws Exception { Assert.assertEquals(limiter.getDispatchRateOnMsg(), 100); admin.topics().removeDispatchRate(topic); for (int i = 0; i < 10; i++) { + Thread.sleep(500); if (admin.topics().getDispatchRate(topic) == null) { break; } - Thread.sleep(500); } //2 Remove level policy ,DispatchRateLimiter should us ns level policy limiter = pulsar.getBrokerService().getTopicIfExists(topic).get().get().getDispatchRateLimiter().get(); @@ -560,6 +562,38 @@ public void testPolicyOverwrittenByNamespaceLevel() throws Exception { Assert.assertEquals(limiter.getDispatchRateOnMsg(), 300); } + @Test(timeOut = 20000) + public void testRestart() throws Exception { + final String topic = testTopic + UUID.randomUUID(); + admin.topics().createNonPartitionedTopic(topic); + //wait for cache init + Thread.sleep(1000); + InactiveTopicPolicies inactiveTopicPolicies = + new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up,100,true); + admin.namespaces().setInactiveTopicPolicies(myNamespace, inactiveTopicPolicies); + //wait for zk + Thread.sleep(500); + inactiveTopicPolicies = + new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions,200,false); + admin.topics().setInactiveTopicPolicies(topic, inactiveTopicPolicies); + for (int i = 0; i < 10; i++) { + Thread.sleep(500); + if (admin.topics().getInactiveTopicPolicies(topic) != null) { + break; + } + } + // restart broker, policy should still take effect + stopBroker(); + Thread.sleep(500); + startBroker(); + + //wait for cache + pulsarClient.newProducer().topic(topic).create().close(); + Thread.sleep(2000); + PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get(); + Assert.assertEquals(persistentTopic.getInactiveTopicPolicies(), inactiveTopicPolicies); + } + @Test public void testGetSetSubscriptionDispatchRate() throws Exception { final String topic = testTopic + UUID.randomUUID(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java index 9969efad3f8f6..8ddff1e176832 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java @@ -303,6 +303,8 @@ public void testTopicLevelInActiveTopicApi() throws Exception { conf.setSystemTopicEnabled(true); conf.setTopicLevelPoliciesEnabled(true); super.baseSetup(); + //wait for init + Thread.sleep(2000); final String topicName = "persistent://prop/ns-abc/testMaxInactiveDuration-" + UUID.randomUUID().toString(); admin.topics().createPartitionedTopic(topicName, 3); @@ -314,8 +316,7 @@ public void testTopicLevelInActiveTopicApi() throws Exception { policies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_no_subscriptions); policies.setMaxInactiveDurationSeconds(10); admin.topics().setInactiveTopicPolicies(topicName, policies); - //wait for init - Thread.sleep(3000); + for (int i = 0; i < 50; i++) { if (admin.topics().getInactiveTopicPolicies(topicName) != null) { break;