Skip to content

Commit

Permalink
[Broker] Optimize getTopicPolicies: skip unnecessarily thrown excepti…
Browse files Browse the repository at this point in the history
…on (apache#10683)

* [Broker] Optimize getTopicPolicies: skip unnecessarily thrown exception

* Make getTopicPolicies return Optional type
  • Loading branch information
michaeljmarshall authored May 25, 2021
1 parent 85effc4 commit a8363a3
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ protected boolean isConsumersExceededOnSubscription(BrokerService brokerService,
Policies policies = null;
Integer maxConsumersPerSubscription = null;
try {
maxConsumersPerSubscription = Optional.ofNullable(brokerService
.getTopicPolicies(TopicName.get(topic)))
maxConsumersPerSubscription = brokerService
.getTopicPolicies(TopicName.get(topic))
.map(TopicPolicies::getMaxConsumersPerSubscription)
.orElse(null);
if (maxConsumersPerSubscription == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,7 @@ public AbstractTopic(String topic, BrokerService brokerService) {
}

protected boolean isProducersExceeded() {
Integer maxProducers = null;
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
if (topicPolicies != null) {
maxProducers = topicPolicies.getMaxProducerPerTopic();
}
Integer maxProducers = getTopicPolicies().map(TopicPolicies::getMaxProducerPerTopic).orElse(null);

if (maxProducers == null) {
Policies policies;
Expand Down Expand Up @@ -197,11 +193,7 @@ public int getNumberOfSameAddressProducers(final String clientAddress) {
}

protected boolean isConsumersExceededOnTopic() {
Integer maxConsumers = null;
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
if (topicPolicies != null) {
maxConsumers = topicPolicies.getMaxConsumerPerTopic();
}
Integer maxConsumers = getTopicPolicies().map(TopicPolicies::getMaxConsumerPerTopic).orElse(null);
if (maxConsumers == null) {
Policies policies;
try {
Expand Down Expand Up @@ -779,11 +771,11 @@ public void updateMaxPublishRate(Policies policies) {

private void updatePublishDispatcher(Policies policies) {
//if topic-level policy exists, try to use topic-level publish rate policy
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
if (topicPolicies != null && topicPolicies.isPublishRateSet()) {
Optional<PublishRate> topicPublishRate = getTopicPolicies().map(TopicPolicies::getPublishRate);
if (topicPublishRate.isPresent()) {
log.info("Using topic policy publish rate instead of namespace level topic publish rate on topic {}",
this.topic);
updatePublishDispatcher(topicPolicies.getPublishRate());
updatePublishDispatcher(topicPublishRate.get());
return;
}

Expand Down Expand Up @@ -850,43 +842,25 @@ public void resetInactiveTopicPolicies(InactiveTopicDeleteMode inactiveTopicDele

/**
* Get {@link TopicPolicies} for this topic.
* @param topicName
* @return TopicPolicies is exist else return null.
* @return TopicPolicies, if they exist. Otherwise, the value will not be present.
*/
public TopicPolicies getTopicPolicies(TopicName topicName) {
TopicName cloneTopicName = topicName;
if (topicName.isPartitioned()) {
cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
}
try {
return brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(cloneTopicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.debug("Topic {} policies have not been initialized yet.", topicName.getPartitionedTopicName());
return null;
} catch (NullPointerException e) {
log.debug("Topic level policies are not enabled. "
+ "Please refer to systemTopicEnabled and topicLevelPoliciesEnabled on broker.conf");
return null;
}
public Optional<TopicPolicies> getTopicPolicies() {
return brokerService.getTopicPolicies(TopicName.get(topic));
}

protected int getWaitingProducersCount() {
return waitingExclusiveProducers.size();
}

protected boolean isExceedMaximumMessageSize(int size) {
Integer maxMessageSize = null;
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
if (topicPolicies != null && topicPolicies.isMaxMessageSizeSet()) {
maxMessageSize = topicPolicies.getMaxMessageSize();
}
if (maxMessageSize != null) {
if (maxMessageSize == 0) {
return false;
}
return size > maxMessageSize;
}
return false;
return getTopicPolicies()
.map(TopicPolicies::getMaxMessageSize)
.map(maxMessageSize -> {
if (maxMessageSize == 0) {
return false;
}
return size > maxMessageSize;
}).orElse(false);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2553,36 +2553,26 @@ private boolean isSystemTopic(String topic) {
}

/**
* Get {@link TopicPolicies} for this topic.
* Get {@link TopicPolicies} for the parameterized topic.
* @param topicName
* @return TopicPolicies is exist else return null.
* @return TopicPolicies, if they exist. Otherwise, the value will not be present.
*/
public TopicPolicies getTopicPolicies(TopicName topicName) {
public Optional<TopicPolicies> getTopicPolicies(TopicName topicName) {
if (!pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
return Optional.empty();
}
TopicName cloneTopicName = topicName;
if (topicName.isPartitioned()) {
cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
}
try {
checkTopicLevelPolicyEnable();
return pulsar.getTopicPoliciesService().getTopicPolicies(cloneTopicName);
return Optional.ofNullable(pulsar.getTopicPoliciesService().getTopicPolicies(cloneTopicName));
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.debug("Topic {} policies have not been initialized yet.", topicName.getPartitionedTopicName());
return null;
} catch (RestException | NullPointerException e) {
log.debug("Topic level policies are not enabled. "
+ "Please refer to systemTopicEnabled and topicLevelPoliciesEnabled on broker.conf");
return null;
}
}

private void checkTopicLevelPolicyEnable() {
if (!pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
throw new RestException(Response.Status.METHOD_NOT_ALLOWED,
"Topic level policies is disabled, to enable the topic level policy and retry.");
return Optional.empty();
}
}


private <T> boolean checkMaxTopicsPerNamespace(TopicName topicName, int numPartitions,
CompletableFuture<T> topicFuture) {
Integer maxTopicsPerNamespace;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,12 +426,12 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
}

private CompletableFuture<Boolean> isDeduplicationEnabled() {
TopicName name = TopicName.get(topic.getName());
//Topic level setting has higher priority than namespace level
TopicPolicies topicPolicies = topic.getTopicPolicies(name);
if (topicPolicies != null && topicPolicies.isDeduplicationSet()) {
return CompletableFuture.completedFuture(topicPolicies.getDeduplicationEnabled());
Optional<Boolean> isDeduplicationEnabled = topic.getTopicPolicies().map(TopicPolicies::getDeduplicationEnabled);
if (isDeduplicationEnabled.isPresent()) {
return CompletableFuture.completedFuture(isDeduplicationEnabled.get());
}
TopicName name = TopicName.get(topic.getName());
return pulsar.getConfigurationCache().policiesCache()
.getAsync(AdminResource.path(POLICIES, name.getNamespace())).thenApply(policies -> {
// If namespace policies have the field set, it will override the broker-level setting
Expand Down Expand Up @@ -488,12 +488,10 @@ public long getLastPublishedSequenceId(String producerName) {
}

public void takeSnapshot() {
Integer interval = null;
// try to get topic-level policies
TopicPolicies topicPolicies = topic.getTopicPolicies(TopicName.get(topic.getName()));
if (topicPolicies != null) {
interval = topicPolicies.getDeduplicationSnapshotIntervalSeconds();
}
Integer interval = topic.getTopicPolicies()
.map(TopicPolicies::getDeduplicationSnapshotIntervalSeconds)
.orElse(null);
try {
//if topic-level policies not exists, try to get namespace-level policies
if (interval == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -792,27 +792,21 @@ public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subs
}

public void updateUnackedMessagesAppliedOnSubscription(Policies policies) {
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
if (topicPolicies != null && topicPolicies.isMaxUnackedMessagesOnSubscriptionSet()) {
maxUnackedMessagesOnSubscriptionApplied = topicPolicies.getMaxUnackedMessagesOnSubscription();
} else {
maxUnackedMessagesOnSubscriptionApplied =
policies != null && policies.max_unacked_messages_per_subscription != null
? policies.max_unacked_messages_per_subscription
: brokerService.pulsar().getConfiguration().getMaxUnackedMessagesPerSubscription();
}
maxUnackedMessagesOnSubscriptionApplied = getTopicPolicies()
.map(TopicPolicies::getMaxUnackedMessagesOnSubscription)
.orElseGet(() ->
policies != null && policies.max_unacked_messages_per_subscription != null
? policies.max_unacked_messages_per_subscription
: brokerService.pulsar().getConfiguration().getMaxUnackedMessagesPerSubscription()
);
}

private void updateUnackedMessagesExceededOnConsumer(Policies data) {
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
if (topicPolicies != null && topicPolicies.isMaxUnackedMessagesOnConsumerSet()) {
maxUnackedMessagesOnConsumerAppilied = topicPolicies.getMaxUnackedMessagesOnConsumer();
} else {
maxUnackedMessagesOnConsumerAppilied =
data != null && data.max_unacked_messages_per_consumer != null
? data.max_unacked_messages_per_consumer
: brokerService.pulsar().getConfiguration().getMaxUnackedMessagesPerConsumer();
}
maxUnackedMessagesOnConsumerAppilied = getTopicPolicies()
.map(TopicPolicies::getMaxUnackedMessagesOnConsumer)
.orElseGet(() -> data != null && data.max_unacked_messages_per_consumer != null
? data.max_unacked_messages_per_consumer
: brokerService.pulsar().getConfiguration().getMaxUnackedMessagesPerConsumer());
getSubscriptions().forEach((name, sub) -> {
if (sub != null) {
sub.getConsumers().forEach(consumer -> {
Expand Down Expand Up @@ -1352,7 +1346,7 @@ public void checkMessageDeduplicationInfo() {
public void checkCompaction() {
TopicName name = TopicName.get(topic);
try {
Long compactionThreshold = Optional.ofNullable(getTopicPolicies(name))
Long compactionThreshold = getTopicPolicies()
.map(TopicPolicies::getCompactionThreshold)
.orElse(null);
if (compactionThreshold == null) {
Expand Down Expand Up @@ -2195,13 +2189,13 @@ public void checkDeduplicationSnapshot() {
* marked as inactive.
*/
private boolean shouldTopicBeRetained() {
TopicName name = TopicName.get(topic);
RetentionPolicies retentionPolicies = null;
try {
retentionPolicies = Optional.ofNullable(getTopicPolicies(name))
retentionPolicies = getTopicPolicies()
.map(TopicPolicies::getRetentionPolicies)
.orElse(null);
if (retentionPolicies == null){
TopicName name = TopicName.get(topic);
retentionPolicies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, name.getNamespace()))
.map(p -> p.retention_policies)
Expand Down Expand Up @@ -2252,9 +2246,9 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
delayedDeliveryEnabled = data.delayed_delivery_policies.isActive();
}
//If the topic-level policy already exists, the namespace-level policy cannot override the topic-level policy.
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
Optional<TopicPolicies> topicPolicies = getTopicPolicies();
if (data.inactive_topic_policies != null) {
if (topicPolicies == null || !topicPolicies.isInactiveTopicPoliciesSet()) {
if (!topicPolicies.isPresent() || !topicPolicies.get().isInactiveTopicPoliciesSet()) {
this.inactiveTopicPolicies = data.inactive_topic_policies;
}
} else {
Expand All @@ -2277,7 +2271,8 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
Dispatcher dispatcher = sub.getDispatcher();
// If the topic-level policy already exists, the namespace-level policy cannot override
// the topic-level policy.
if (dispatcher != null && (topicPolicies == null || !topicPolicies.isSubscriptionDispatchRateSet())) {
if (dispatcher != null
&& (!topicPolicies.isPresent() || !topicPolicies.get().isSubscriptionDispatchRateSet())) {
dispatcher.getRateLimiter().ifPresent(rateLimiter -> rateLimiter.onPoliciesUpdate(data));
}
});
Expand All @@ -2290,7 +2285,7 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
CompletableFuture<Void> persistentPoliciesFuture = checkPersistencePolicies();
// update rate-limiter if policies updated
if (this.dispatchRateLimiter.isPresent()) {
if (topicPolicies == null || !topicPolicies.isDispatchRateSet()) {
if (!topicPolicies.isPresent() || !topicPolicies.get().isDispatchRateSet()) {
dispatchRateLimiter.get().onPoliciesUpdate(data);
}
}
Expand Down Expand Up @@ -2669,14 +2664,14 @@ public synchronized OffloadProcessStatus offloadStatus() {
private int getMessageTTL() throws Exception {
//Return Topic level message TTL if exist. If topic level policy or message ttl is not set,
//fall back to namespace level message ttl then message ttl set for current broker.
Optional<Integer> messageTtl = getTopicPolicies().map(TopicPolicies::getMessageTTLInSeconds);
if (messageTtl.isPresent()) {
return messageTtl.get();
}
TopicName name = TopicName.get(topic);
TopicPolicies topicPolicies = getTopicPolicies(name);
Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, name.getNamespace()))
.orElseThrow(KeeperException.NoNodeException::new);
if (topicPolicies != null && topicPolicies.isMessageTTLSet()) {
return topicPolicies.getMessageTTLInSeconds();
}
if (policies.message_ttl_in_seconds != null) {
return policies.message_ttl_in_seconds;
}
Expand Down Expand Up @@ -2880,25 +2875,21 @@ public CompletableFuture<Void> truncate() {
}

public long getDelayedDeliveryTickTimeMillis() {
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
//Topic level setting has higher priority than namespace level
if (topicPolicies != null && topicPolicies.isDelayedDeliveryTickTimeMillisSet()) {
return topicPolicies.getDelayedDeliveryTickTimeMillis();
}
return delayedDeliveryTickTimeMillis;
return getTopicPolicies()
.map(TopicPolicies::getDelayedDeliveryTickTimeMillis)
.orElse(delayedDeliveryTickTimeMillis);
}

public int getMaxUnackedMessagesOnConsumer() {
return maxUnackedMessagesOnConsumerAppilied;
}

public boolean isDelayedDeliveryEnabled() {
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
//Topic level setting has higher priority than namespace level
if (topicPolicies != null && topicPolicies.isDelayedDeliveryEnabledSet()) {
return topicPolicies.getDelayedDeliveryEnabled();
}
return delayedDeliveryEnabled;
return getTopicPolicies()
.map(TopicPolicies::getDelayedDeliveryEnabled)
.orElse(delayedDeliveryEnabled);
}

public int getMaxUnackedMessagesOnSubscription() {
Expand Down Expand Up @@ -3009,17 +3000,15 @@ private boolean checkMaxSubscriptionsPerTopicExceed(String subscriptionName) {
if (StringUtils.isNotEmpty(subscriptionName) && getSubscription(subscriptionName) != null) {
return false;
}
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
Integer maxSubsPerTopic = null;
if (topicPolicies != null && topicPolicies.isMaxSubscriptionsPerTopicSet()) {
maxSubsPerTopic = topicPolicies.getMaxSubscriptionsPerTopic();
}
if (maxSubsPerTopic == null) {
maxSubsPerTopic = maxSubscriptionsPerTopic;
}
if (maxSubsPerTopic == null) {
maxSubsPerTopic = brokerService.pulsar().getConfig().getMaxSubscriptionsPerTopic();
}
Integer maxSubsPerTopic = getTopicPolicies()
.map(TopicPolicies::getMaxSubscriptionsPerTopic)
.orElseGet(() -> {
if (maxSubscriptionsPerTopic != null) {
return maxSubscriptionsPerTopic;
} else {
return brokerService.pulsar().getConfig().getMaxSubscriptionsPerTopic();
}
});

if (maxSubsPerTopic > 0) {
if (subscriptions != null && subscriptions.size() >= maxSubsPerTopic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public SubscribeRateLimiter(PersistentTopic topic) {
subscribeRateLimiter = new ConcurrentHashMap<>();
this.executorService = brokerService.pulsar().getExecutor();
// get subscribeRate from topic level policies
this.subscribeRate = Optional.ofNullable(brokerService.getTopicPolicies(TopicName.get(this.topicName)))
this.subscribeRate = topic.getTopicPolicies()
.map(TopicPolicies::getSubscribeRate)
.orElse(null);

Expand Down Expand Up @@ -154,7 +154,7 @@ private synchronized void updateSubscribeRate(ConsumerIdentifier consumerIdentif

public void onPoliciesUpdate(Policies data) {
// if subscribe rate is set on topic policy, skip subscribe rate update
SubscribeRate subscribeRate = Optional.ofNullable(brokerService.getTopicPolicies(TopicName.get(topicName)))
SubscribeRate subscribeRate = brokerService.getTopicPolicies(TopicName.get(topicName))
.map(TopicPolicies::getSubscribeRate)
.orElse(null);
if (subscribeRate != null) {
Expand Down

0 comments on commit a8363a3

Please sign in to comment.