Skip to content

Commit

Permalink
Fix the master CI broken with update dispatch rate block issue (apach…
Browse files Browse the repository at this point in the history
…e#12360)

Avoid calling metadata store sync method in the `updateDispatchRate()` method, this can fixe the current broken CI of the master branch https://github.com/apache/pulsar/pull/12072/checks?check_run_id=3889859984
  • Loading branch information
codelipenghui authored Oct 14, 2021
1 parent 867d71c commit 64c4b14
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service.persistent;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand Down Expand Up @@ -161,15 +162,23 @@ private DispatchRate createDispatchRate() {
public void updateDispatchRate() {
Optional<DispatchRate> dispatchRate = getTopicPolicyDispatchRate(brokerService, topicName, type);
if (!dispatchRate.isPresent()) {
dispatchRate = Optional.ofNullable(getPoliciesDispatchRate(brokerService));

if (!dispatchRate.isPresent()) {
dispatchRate = Optional.of(createDispatchRate());
}
getPoliciesDispatchRateAsync(brokerService).thenAccept(dispatchRateOp -> {
if (!dispatchRateOp.isPresent()) {
dispatchRateOp = Optional.of(createDispatchRate());
}
updateDispatchRate(dispatchRateOp.get());
log.info("[{}] configured {} message-dispatch rate at broker {}", this.topicName, type,
dispatchRateOp.get());

}).exceptionally(ex -> {
log.error("[{}] failed to get the dispatch rate policy from the namespace resource for type {}",
topicName, type, ex);
return null;
});
} else {
updateDispatchRate(dispatchRate.get());
log.info("[{}] configured {} message-dispatch rate at broker {}", this.topicName, type, dispatchRate.get());
}

updateDispatchRate(dispatchRate.get());
log.info("[{}] configured {} message-dispatch rate at broker {}", this.topicName, type, dispatchRate.get());
}

public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional<Policies> policies,
Expand Down Expand Up @@ -312,10 +321,16 @@ public static DispatchRateImpl getPoliciesDispatchRate(final String cluster,
*
* @return
*/
public DispatchRate getPoliciesDispatchRate(BrokerService brokerService) {
public CompletableFuture<Optional<DispatchRate>> getPoliciesDispatchRateAsync(BrokerService brokerService) {
final String cluster = brokerService.pulsar().getConfiguration().getClusterName();
final Optional<Policies> policies = getPolicies(brokerService, topicName);
return getPoliciesDispatchRate(cluster, policies, type);
return getPoliciesAsync(brokerService, topicName).thenApply(policiesOp ->
Optional.ofNullable(getPoliciesDispatchRate(cluster, policiesOp, type)));
}

public static CompletableFuture<Optional<Policies>> getPoliciesAsync(BrokerService brokerService,
String topicName) {
final NamespaceName namespace = TopicName.get(topicName).getNamespaceObject();
return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace);
}

public static Optional<Policies> getPolicies(BrokerService brokerService, String topicName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1227,10 +1227,12 @@ public void testRemoveSubscriptionDispatchRate() throws Exception {
Awaitility.await()
.untilAsserted(() -> Assert.assertNull(admin.topics().getSubscriptionDispatchRate(topic)));

dispatchRateLimiter = pulsar.getBrokerService().getTopicIfExists(topic)
.get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
Assert.assertNotEquals(dispatchRateLimiter.getDispatchRateOnMsg(), dispatchRate.getDispatchThrottlingRateInByte());
Assert.assertNotEquals(dispatchRateLimiter.getDispatchRateOnByte(), dispatchRate.getDispatchThrottlingRateInByte());
Awaitility.await().untilAsserted(() -> {
DispatchRateLimiter drl = pulsar.getBrokerService().getTopicIfExists(topic)
.get().get().getSubscription(subscriptionName).getDispatcher().getRateLimiter().get();
Assert.assertNotEquals(drl.getDispatchRateOnMsg(), dispatchRate.getDispatchThrottlingRateInMsg());
Assert.assertNotEquals(drl.getDispatchRateOnByte(), dispatchRate.getDispatchThrottlingRateInByte());
});

consumer.close();
admin.topics().delete(topic, true);
Expand Down

0 comments on commit 64c4b14

Please sign in to comment.