Skip to content

Commit

Permalink
[Issue 12726][broker] Fix deadlock in metadata-store callback thread (a…
Browse files Browse the repository at this point in the history
…pache#12753)

Fixes apache#12726

### Motivation

See apache#12726 


### Modifications

Use PolicyHierarchyValue to cache backlogQuota policies to avoid calling blocking metadata query in metadata-callback thread.
  • Loading branch information
Jason918 authored Nov 22, 2021
1 parent 051b60d commit ebfcd71
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -37,6 +38,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import lombok.Getter;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup;
Expand All @@ -51,6 +53,7 @@
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.Policies;
Expand Down Expand Up @@ -140,13 +143,8 @@ public AbstractTopic(String topic, BrokerService brokerService) {
ServiceConfiguration config = brokerService.pulsar().getConfiguration();
this.replicatorPrefix = config.getReplicatorPrefix();


topicPolicies = new HierarchyTopicPolicies();
topicPolicies.getInactiveTopicPolicies().updateBrokerValue(new InactiveTopicPolicies(
config.getBrokerDeleteInactiveTopicsMode(),
config.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
config.isBrokerDeleteInactiveTopicsEnabled()));
topicPolicies.getMaxSubscriptionsPerTopic().updateBrokerValue(config.getMaxSubscriptionsPerTopic());
updateTopicPolicyByBrokerConfig(topicPolicies, brokerService);

this.topicMaxMessageSizeCheckIntervalMs = TimeUnit.SECONDS.toMillis(
config.getMaxMessageSizeCheckIntervalInSeconds());
Expand All @@ -156,6 +154,38 @@ public AbstractTopic(String topic, BrokerService brokerService) {
updatePublishDispatcher(Optional.empty());
}

protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
if (log.isDebugEnabled()) {
log.debug("[{}]updateTopicPolicyByNamespacePolicy,data={}", topic, namespacePolicies);
}
if (namespacePolicies.deleted) {
return;
}
topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(namespacePolicies.max_subscriptions_per_topic);
topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies);
Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(
type -> this.topicPolicies.getBackLogQuotaMap().get(type)
.updateNamespaceValue(MapUtils.getObject(namespacePolicies.backlog_quota_map, type)));
}

private void updateTopicPolicyByBrokerConfig(HierarchyTopicPolicies topicPolicies, BrokerService brokerService) {
ServiceConfiguration config = brokerService.pulsar().getConfiguration();
topicPolicies.getInactiveTopicPolicies().updateBrokerValue(new InactiveTopicPolicies(
config.getBrokerDeleteInactiveTopicsMode(),
config.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
config.isBrokerDeleteInactiveTopicsEnabled()));

topicPolicies.getMaxSubscriptionsPerTopic().updateBrokerValue(config.getMaxSubscriptionsPerTopic());

//init backlogQuota
topicPolicies.getBackLogQuotaMap()
.get(BacklogQuota.BacklogQuotaType.destination_storage)
.updateBrokerValue(brokerService.getBacklogQuotaManager().getDefaultQuota());
topicPolicies.getBackLogQuotaMap()
.get(BacklogQuota.BacklogQuotaType.message_age)
.updateBrokerValue(brokerService.getBacklogQuotaManager().getDefaultQuota());
}

protected boolean isProducersExceeded() {
Integer maxProducers = getTopicPolicies().map(TopicPolicies::getMaxProducerPerTopic).orElse(null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
Expand All @@ -33,23 +32,17 @@
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.apache.pulsar.common.util.FutureUtil;

@Slf4j
public class BacklogQuotaManager {
private final BacklogQuotaImpl defaultQuota;
private final PulsarService pulsar;
private final boolean isTopicLevelPoliciesEnable;
private final NamespaceResources namespaceResources;


public BacklogQuotaManager(PulsarService pulsar) {
this.isTopicLevelPoliciesEnable = pulsar.getConfiguration().isTopicLevelPoliciesEnabled();
double backlogQuotaGB = pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB();
this.defaultQuota = BacklogQuotaImpl.builder()
.limitSize(backlogQuotaGB > 0 ? (long) (backlogQuotaGB * BacklogQuotaImpl.BYTES_IN_GIGABYTE)
Expand All @@ -58,7 +51,6 @@ public BacklogQuotaManager(PulsarService pulsar) {
.retentionPolicy(pulsar.getConfiguration().getBacklogQuotaDefaultRetentionPolicy())
.build();
this.namespaceResources = pulsar.getPulsarResources().getNamespaceResources();
this.pulsar = pulsar;
}

public BacklogQuotaImpl getDefaultQuota() {
Expand All @@ -77,43 +69,14 @@ public BacklogQuotaImpl getBacklogQuota(NamespaceName namespace, BacklogQuotaTyp
}
}

public BacklogQuotaImpl getBacklogQuota(TopicName topicName, BacklogQuotaType backlogQuotaType) {
if (!isTopicLevelPoliciesEnable) {
return getBacklogQuota(topicName.getNamespaceObject(), backlogQuotaType);
}

try {
return Optional.ofNullable(pulsar.getTopicPoliciesService().getTopicPolicies(topicName))
.map(TopicPolicies::getBackLogQuotaMap)
.map(map -> map.get(backlogQuotaType.name()))
.orElseGet(() -> getBacklogQuota(topicName.getNamespaceObject(), backlogQuotaType));
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.debug("Topic policies cache have not init, will apply the namespace backlog quota: topicName={}",
topicName);
} catch (Exception e) {
log.error("Failed to read topic policies data, "
+ "will apply the namespace backlog quota: topicName={}", topicName, e);
}
return getBacklogQuota(topicName.getNamespaceObject(), backlogQuotaType);
}

public long getBacklogQuotaLimitInSize(TopicName topicName) {
return getBacklogQuota(topicName, BacklogQuotaType.destination_storage).getLimitSize();
}

public int getBacklogQuotaLimitInTime(TopicName topicName) {
return getBacklogQuota(topicName, BacklogQuotaType.message_age).getLimitTime();
}

/**
* Handle exceeded size backlog by using policies set in the zookeeper for given topic.
*
* @param persistentTopic Topic on which backlog has been exceeded
*/
public void handleExceededBacklogQuota(PersistentTopic persistentTopic, BacklogQuotaType backlogQuotaType,
boolean preciseTimeBasedBacklogQuotaCheck) {
TopicName topicName = TopicName.get(persistentTopic.getName());
BacklogQuota quota = getBacklogQuota(topicName, backlogQuotaType);
BacklogQuota quota = persistentTopic.getBacklogQuota(backlogQuotaType);
log.info("Backlog quota type {} exceeded for topic [{}]. Applying [{}] policy", backlogQuotaType,
persistentTopic.getName(), quota.getPolicy());
switch (quota.getPolicy()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,9 @@ public CompletableFuture<Void> initialize() {
isEncryptionRequired = false;
} else {
Policies policies = optPolicies.get();
updateTopicPolicyByNamespacePolicy(policies);
isEncryptionRequired = policies.encryption_required;
isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(policies.inactive_topic_policies);
setSchemaCompatibilityStrategy(policies);
schemaValidationEnforced = policies.schema_validation_enforced;
}
Expand Down Expand Up @@ -964,6 +964,9 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired,
data.encryption_required);
}

updateTopicPolicyByNamespacePolicy(data);

isEncryptionRequired = data.encryption_required;
setSchemaCompatibilityStrategy(data);
isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
Expand All @@ -975,8 +978,6 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
});
subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(Consumer::checkPermissions));

this.topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(data.inactive_topic_policies);

return checkReplicationAndRetryOnFailure();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.netty.util.concurrent.FastThreadLocal;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -126,6 +127,7 @@
import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.LedgerInfo;
Expand Down Expand Up @@ -317,15 +319,16 @@ public CompletableFuture<Void> initialize() {
}

Policies policies = optPolicies.get();

this.updateTopicPolicyByNamespacePolicy(policies);

this.isEncryptionRequired = policies.encryption_required;

setSchemaCompatibilityStrategy(policies);
isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;

schemaValidationEnforced = policies.schema_validation_enforced;

topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(policies.inactive_topic_policies);

updateUnackedMessagesAppliedOnSubscription(policies);
updateUnackedMessagesExceededOnConsumer(policies);
}).exceptionally(ex -> {
Expand Down Expand Up @@ -2399,6 +2402,9 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
log.debug("Ignore the update because it has been deleted : {}", data);
return CompletableFuture.completedFuture(null);
}

updateTopicPolicyByNamespacePolicy(data);

isEncryptionRequired = data.encryption_required;

setSchemaCompatibilityStrategy(data);
Expand All @@ -2407,7 +2413,6 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
schemaValidationEnforced = data.schema_validation_enforced;
updateUnackedMessagesAppliedOnSubscription(data);
updateUnackedMessagesExceededOnConsumer(data);
this.topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(data.max_subscriptions_per_topic);

if (data.delayed_delivery_policies != null) {
delayedDeliveryTickTimeMillis = data.delayed_delivery_policies.getTickTime();
Expand All @@ -2416,8 +2421,6 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
//If the topic-level policy already exists, the namespace-level policy cannot override the topic-level policy.
Optional<TopicPolicies> topicPolicies = getTopicPolicies();

this.topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(data.inactive_topic_policies);

initializeRateLimiterIfNeeded(Optional.ofNullable(data));

this.updateMaxPublishRate(data);
Expand Down Expand Up @@ -2453,7 +2456,6 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
subscribeRateLimiter.get().onPoliciesUpdate(data);
}


return CompletableFuture.allOf(replicationFuture, dedupFuture, persistentPoliciesFuture,
preCreateSubscriptionForCompactionIfNeeded());
}
Expand All @@ -2463,9 +2465,8 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
* @return Backlog quota for topic
*/
@Override
public BacklogQuota getBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType) {
TopicName topicName = TopicName.get(this.getName());
return brokerService.getBacklogQuotaManager().getBacklogQuota(topicName, backlogQuotaType);
public BacklogQuota getBacklogQuota(BacklogQuotaType backlogQuotaType) {
return this.topicPolicies.getBackLogQuotaMap().get(backlogQuotaType).get();
}

/**
Expand Down Expand Up @@ -2497,8 +2498,7 @@ public boolean isBacklogQuotaExceeded(String producerName, BacklogQuota.BacklogQ
* @return determine if backlog quota enforcement needs to be done for topic based on size limit
*/
public boolean isSizeBacklogExceeded() {
TopicName topicName = TopicName.get(getName());
long backlogQuotaLimitInBytes = brokerService.getBacklogQuotaManager().getBacklogQuotaLimitInSize(topicName);
long backlogQuotaLimitInBytes = getBacklogQuota(BacklogQuotaType.destination_storage).getLimitSize();
if (backlogQuotaLimitInBytes < 0) {
return false;
}
Expand All @@ -2521,7 +2521,7 @@ public boolean isSizeBacklogExceeded() {
public boolean isTimeBacklogExceeded() {
TopicName topicName = TopicName.get(getName());
CompletableFuture<Boolean> future = new CompletableFuture<>();
int backlogQuotaLimitInSecond = brokerService.getBacklogQuotaManager().getBacklogQuotaLimitInTime(topicName);
int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();

// If backlog quota by time is not set and we have no durable cursor.
if (backlogQuotaLimitInSecond <= 0
Expand Down Expand Up @@ -3088,8 +3088,13 @@ public void onUpdate(TopicPolicies policies) {
}

topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(policies.getMaxSubscriptionsPerTopic());

topicPolicies.getInactiveTopicPolicies().updateTopicValue(policies.getInactiveTopicPolicies());
Arrays.stream(BacklogQuotaType.values()).forEach(type ->
this.topicPolicies.getBackLogQuotaMap().get(type).updateTopicValue(
policies.getBackLogQuotaMap() == null ? null :
policies.getBackLogQuotaMap().get(type.toString()))
);


updateUnackedMessagesAppliedOnSubscription(namespacePolicies.orElse(null));
initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies));
Expand Down
Loading

0 comments on commit ebfcd71

Please sign in to comment.