Skip to content

Commit

Permalink
Avoid potentially blocking calls to metadata on critical threads (apa…
Browse files Browse the repository at this point in the history
…che#12339)

* Avoid potentially blocking calls to metadata on critical threads

* Fixed log arguments order

* Addressed comments

* Fixed mock in PersistentSubscriptionTest

* Fixed issue in mocked tests

* Fixed test that was force policies modification under the hood
  • Loading branch information
merlimat committed Oct 15, 2021
1 parent 8e2b003 commit c0e87c0
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import com.google.common.base.Function;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
Expand Down Expand Up @@ -47,6 +48,7 @@
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -331,16 +333,16 @@ public CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName n
return updateSubscriptionPermissionAsync(namespace, subscriptionName, Collections.singleton(role), true);
}

private CompletableFuture<Void> updateSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName, Set<String> roles,
boolean remove) {
CompletableFuture<Void> result = new CompletableFuture<>();

private CompletableFuture<Void> updateSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName,
Set<String> roles,
boolean remove) {
try {
validatePoliciesReadOnlyAccess();
} catch (Exception e) {
result.completeExceptionally(e);
return FutureUtil.failedFuture(e);
}

CompletableFuture<Void> result = new CompletableFuture<>();
final String policiesPath = String.format("/%s/%s/%s", "admin", POLICIES, namespace.toString());

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,9 @@ public AbstractTopic(String topic, BrokerService brokerService) {
this.inactiveTopicPolicies.setInactiveTopicDeleteMode(brokerService.pulsar().getConfiguration()
.getBrokerDeleteInactiveTopicsMode());
this.lastActive = System.nanoTime();
Policies policies = null;
try {
policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
.orElseGet(() -> new Policies());
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage());
}
this.preciseTopicPublishRateLimitingEnable =
brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable();
updatePublishDispatcher(policies);
updatePublishDispatcher(Optional.empty());
}

protected boolean isProducersExceeded() {
Expand All @@ -154,11 +146,11 @@ protected boolean isProducersExceeded() {
Policies policies;
try {
policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
.orElseGet(() -> new Policies());
.getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()));
} catch (Exception e) {
policies = new Policies();
}

maxProducers = policies.max_producers_per_topic;
}
maxProducers = maxProducers != null ? maxProducers : brokerService.pulsar()
Expand Down Expand Up @@ -201,15 +193,10 @@ protected boolean isConsumersExceededOnTopic() {
// Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks
policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()));

if (policies == null) {
policies = new Policies();
}
} catch (Exception e) {
log.warn("[{}] Failed to get namespace policies that include max number of consumers: {}", topic,
e.getMessage());
policies = new Policies();
}

maxConsumers = policies.max_consumers_per_topic;
}
final int maxConsumersPerTopic = maxConsumers != null ? maxConsumers
Expand Down Expand Up @@ -767,10 +754,10 @@ public PublishRateLimiter getBrokerPublishRateLimiter() {
}

public void updateMaxPublishRate(Policies policies) {
updatePublishDispatcher(policies);
updatePublishDispatcher(Optional.of(policies));
}

private void updatePublishDispatcher(Policies policies) {
private void updatePublishDispatcher(Optional<Policies> optPolicies) {
//if topic-level policy exists, try to use topic-level publish rate policy
Optional<PublishRate> topicPublishRate = getTopicPolicies().map(TopicPolicies::getPublishRate);
if (topicPublishRate.isPresent()) {
Expand All @@ -780,9 +767,25 @@ private void updatePublishDispatcher(Policies policies) {
return;
}

Policies policies;
try {
if (optPolicies.isPresent()) {
policies = optPolicies.get();
} else {
policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()));
if (policies == null) {
policies = new Policies();
}
}
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage());
policies = new Policies();
}

//topic-level policy is not set, try to use namespace-level rate policy
final String clusterName = brokerService.pulsar().getConfiguration().getClusterName();
final PublishRate publishRate = policies != null && policies.publishMaxMessageRate != null
final PublishRate publishRate = policies.publishMaxMessageRate != null
? policies.publishMaxMessageRate.get(clusterName)
: null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2568,11 +2568,11 @@ public int getDefaultNumPartitions(final TopicName topicName) {

private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName) {
try {
Optional<Policies> policies = pulsar.getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, topicName.getNamespace()));
Policies policies = pulsar.getConfigurationCache().policiesCache()
.getDataIfPresent(AdminResource.path(POLICIES, topicName.getNamespace()));
// If namespace policies have the field set, it will override the broker-level setting
if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) {
return policies.get().autoTopicCreationOverride;
if (policies != null && policies.autoTopicCreationOverride != null) {
return policies.autoTopicCreationOverride;
}
} catch (Throwable t) {
// Ignoring since if we don't have policies, we fallback on the default
Expand Down Expand Up @@ -2601,11 +2601,11 @@ public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) {

private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(final TopicName topicName) {
try {
Optional<Policies> policies = pulsar.getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, topicName.getNamespace()));
Policies policies = pulsar.getConfigurationCache().policiesCache()
.getDataIfPresent(AdminResource.path(POLICIES, topicName.getNamespace()));
// If namespace policies have the field set, it will override the broker-level setting
if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) {
return policies.get().autoSubscriptionCreationOverride;
if (policies != null && policies.autoSubscriptionCreationOverride != null) {
return policies.autoSubscriptionCreationOverride;
}
} catch (Throwable t) {
// Ignoring since if we don't have policies, we fallback on the default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.service.persistent;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.web.PulsarWebResource.path;
import java.util.Optional;
Expand Down Expand Up @@ -325,17 +324,16 @@ public DispatchRate getPoliciesDispatchRate(BrokerService brokerService) {
public static Optional<Policies> getPolicies(BrokerService brokerService, String topicName) {
final NamespaceName namespace = TopicName.get(topicName).getNamespaceObject();
final String path = path(POLICIES, namespace.toString());
Optional<Policies> policies = Optional.empty();
Policies policies = null;
try {
ConfigurationCacheService configurationCacheService = brokerService.pulsar().getConfigurationCache();
if (configurationCacheService != null) {
policies = configurationCacheService.policiesCache().getAsync(path)
.get(brokerService.pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
policies = configurationCacheService.policiesCache().getDataIfPresent(path);
}
} catch (Exception e) {
log.warn("Failed to get message-rate for {} ", topicName, e);
}
return policies;
return Optional.ofNullable(policies);
}

/**
Expand Down

0 comments on commit c0e87c0

Please sign in to comment.