Skip to content

Commit

Permalink
Optimize TopicPolicies#delayedDelivery Enabled and TickTimeMillis wit…
Browse files Browse the repository at this point in the history
…h HierarchyTopicPolicies (apache#13649)
  • Loading branch information
Jason918 authored Jan 12, 2022
1 parent dcf01e9 commit 1d9e7b5
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.Policies;
Expand Down Expand Up @@ -170,6 +171,8 @@ protected void updateTopicPolicy(TopicPolicies data) {
data.getBackLogQuotaMap() == null ? null : data.getBackLogQuotaMap().get(type.toString())));
topicPolicies.getTopicMaxMessageSize().updateTopicValue(data.getMaxMessageSize());
topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds());
topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
}

protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
Expand All @@ -189,6 +192,12 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
.updateNamespaceValue(namespacePolicies.max_consumers_per_subscription);
topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies);
topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled);
topicPolicies.getDelayedDeliveryEnabled().updateNamespaceValue(
Optional.ofNullable(namespacePolicies.delayed_delivery_policies)
.map(DelayedDeliveryPolicies::isActive).orElse(null));
topicPolicies.getDelayedDeliveryTickTimeMillis().updateNamespaceValue(
Optional.ofNullable(namespacePolicies.delayed_delivery_policies)
.map(DelayedDeliveryPolicies::getTickTime).orElse(null));
topicPolicies.getSubscriptionTypesEnabled().updateNamespaceValue(
subTypeStringsToEnumSet(namespacePolicies.subscription_types_enabled));
Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(
Expand Down Expand Up @@ -219,6 +228,8 @@ private void updateTopicPolicyByBrokerConfig() {

topicPolicies.getTopicMaxMessageSize().updateBrokerValue(config.getMaxMessageSize());
topicPolicies.getMessageTTLInSeconds().updateBrokerValue(config.getTtlDurationDefaultInSeconds());
topicPolicies.getDelayedDeliveryEnabled().updateBrokerValue(config.isDelayedDeliveryEnabled());
topicPolicies.getDelayedDeliveryTickTimeMillis().updateBrokerValue(config.getDelayedDeliveryTickTimeMillis());
}

private EnumSet<SubType> subTypeStringsToEnumSet(Set<String> getSubscriptionTypesEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal

private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
private Optional<SubscribeRateLimiter> subscribeRateLimiter = Optional.empty();
public volatile long delayedDeliveryTickTimeMillis = 1000;
private final long backloggedCursorThresholdEntries;
public volatile boolean delayedDeliveryEnabled = false;
public static final int MESSAGE_RATE_BACKOFF_MS = 1000;

protected final MessageDeduplication messageDeduplication;
Expand Down Expand Up @@ -256,9 +254,6 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
this.ledger = ledger;
this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
this.replicators = new ConcurrentOpenHashMap<>(16, 1);
this.delayedDeliveryEnabled = brokerService.pulsar().getConfiguration().isDelayedDeliveryEnabled();
this.delayedDeliveryTickTimeMillis =
brokerService.pulsar().getConfiguration().getDelayedDeliveryTickTimeMillis();
this.backloggedCursorThresholdEntries =
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
initializeRateLimiterIfNeeded(Optional.empty());
Expand Down Expand Up @@ -2446,10 +2441,6 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
updateUnackedMessagesAppliedOnSubscription(data);
updateUnackedMessagesExceededOnConsumer(data);

if (data.delayed_delivery_policies != null) {
delayedDeliveryTickTimeMillis = data.delayed_delivery_policies.getTickTime();
delayedDeliveryEnabled = data.delayed_delivery_policies.isActive();
}
//If the topic-level policy already exists, the namespace-level policy cannot override the topic-level policy.
Optional<TopicPolicies> topicPolicies = getTopicPolicies();

Expand Down Expand Up @@ -3042,21 +3033,15 @@ public CompletableFuture<Void> truncate() {
}

public long getDelayedDeliveryTickTimeMillis() {
//Topic level setting has higher priority than namespace level
return getTopicPolicies()
.map(TopicPolicies::getDelayedDeliveryTickTimeMillis)
.orElse(delayedDeliveryTickTimeMillis);
return topicPolicies.getDelayedDeliveryTickTimeMillis().get();
}

public int getMaxUnackedMessagesOnConsumer() {
return maxUnackedMessagesOnConsumerAppilied;
}

public boolean isDelayedDeliveryEnabled() {
//Topic level setting has higher priority than namespace level
return getTopicPolicies()
.map(TopicPolicies::getDelayedDeliveryEnabled)
.orElse(delayedDeliveryEnabled);
return topicPolicies.getDelayedDeliveryEnabled().get();
}

public int getMaxUnackedMessagesOnSubscription() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,11 @@ public class HierarchyTopicPolicies {
final PolicyHierarchyValue<Integer> topicMaxMessageSize;
final PolicyHierarchyValue<Integer> messageTTLInSeconds;
final PolicyHierarchyValue<Integer> maxConsumerPerTopic;
final PolicyHierarchyValue<Boolean> delayedDeliveryEnabled;
final PolicyHierarchyValue<Long> delayedDeliveryTickTimeMillis;
final PolicyHierarchyValue<Integer> maxConsumersPerSubscription;


public HierarchyTopicPolicies() {
replicationClusters = new PolicyHierarchyValue<>();
deduplicationEnabled = new PolicyHierarchyValue<>();
Expand All @@ -59,5 +62,7 @@ public HierarchyTopicPolicies() {
.build();
topicMaxMessageSize = new PolicyHierarchyValue<>();
messageTTLInSeconds = new PolicyHierarchyValue<>();
delayedDeliveryEnabled = new PolicyHierarchyValue<>();
delayedDeliveryTickTimeMillis = new PolicyHierarchyValue<>();
}
}

0 comments on commit 1d9e7b5

Please sign in to comment.