diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index 02305e4d922f2..d57fc7bbac7de 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -19,20 +19,26 @@ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import io.netty.buffer.ByteBuf; import java.util.Collections; import java.util.List; +import java.util.Optional; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.intercept.BrokerInterceptor; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Markers; @@ -148,6 +154,39 @@ && trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) { sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages); } + /** + * Determine whether the number of consumers on the subscription reaches the threshold. + * @return + */ + protected abstract boolean isConsumersExceededOnSubscription(); + + protected boolean isConsumersExceededOnSubscription(BrokerService brokerService, + String topic, int consumerSize) { + Policies policies = null; + Integer maxConsumersPerSubscription = null; + try { + maxConsumersPerSubscription = Optional.ofNullable(brokerService + .getTopicPolicies(TopicName.get(topic))) + .map(TopicPolicies::getMaxConsumersPerSubscription) + .orElse(null); + if (maxConsumersPerSubscription == null) { + // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer + policies = brokerService.pulsar().getConfigurationCache().policiesCache() + .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())); + } + } catch (Exception e) { + log.debug("Get topic or namespace policies fail", e); + } + + if (maxConsumersPerSubscription == null) { + maxConsumersPerSubscription = policies != null && policies.max_consumers_per_subscription > 0 + ? policies.max_consumers_per_subscription : + brokerService.pulsar().getConfiguration().getMaxConsumersPerSubscription(); + } + + return maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerSize; + } + private void processReplicatedSubscriptionSnapshot(PositionImpl pos, ByteBuf headersAndPayload) { // Remove the protobuf headers Commands.skipMessageMetadata(headersAndPayload); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java index 9eb5a2e5351b6..b142c51adeb43 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java @@ -70,8 +70,6 @@ public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int part protected abstract void cancelPendingRead(); - protected abstract boolean isConsumersExceededOnSubscription(); - protected void notifyActiveConsumerChanged(Consumer activeConsumer) { if (null != activeConsumer && subscriptionType == SubType.Failover) { consumers.forEach(consumer -> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java index 00584cf181550..b6f4ca7ed0605 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java @@ -86,7 +86,8 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce consumerSet.add(consumer); } - private boolean isConsumersExceededOnSubscription() { + @Override + protected boolean isConsumersExceededOnSubscription() { final int maxConsumersPerSubscription = serviceConfig.getMaxConsumersPerSubscription(); if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerList.size()) { return true; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java index cecdbaf5de3a0..a720767fee808 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java @@ -18,13 +18,10 @@ */ package org.apache.pulsar.broker.service.nonpersistent; -import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import java.util.List; -import java.util.Optional; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.EntryBatchSizes; @@ -33,9 +30,6 @@ import org.apache.pulsar.broker.service.SendMessageInfo; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; -import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.Policies; -import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.stats.Rate; @@ -79,37 +73,9 @@ public void sendMessages(List entries) { } } + @Override protected boolean isConsumersExceededOnSubscription() { - Policies policies = null; - Integer maxConsumersPerSubscription = null; - try { - maxConsumersPerSubscription = Optional.ofNullable(topic.getBrokerService() - .getTopicPolicies(TopicName.get(topicName))) - .map(TopicPolicies::getMaxConsumersPerSubscription) - .orElse(null); - if (maxConsumersPerSubscription == null) { - // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer - policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache() - .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace())); - - if (policies == null) { - policies = new Policies(); - } - } - } catch (Exception e) { - policies = new Policies(); - } - - if (maxConsumersPerSubscription == null) { - maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0 - ? policies.max_consumers_per_subscription : - serviceConfig.getMaxConsumersPerSubscription(); - } - - if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumers.size()) { - return true; - } - return false; + return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumers.size()); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index eec0f939dec7e..2c056447919b1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.broker.service.persistent; -import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS; import com.google.common.collect.ComparisonChain; import com.google.common.collect.Lists; @@ -40,7 +39,6 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker; import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.BrokerServiceException; @@ -59,10 +57,8 @@ import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.MessageMetadata; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.Policies; -import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet; import org.apache.pulsar.common.util.collections.LongPairSet; @@ -158,37 +154,9 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce consumerSet.add(consumer); } - private boolean isConsumersExceededOnSubscription() { - Policies policies = null; - Integer maxConsumersPerSubscription = null; - try { - maxConsumersPerSubscription = Optional.ofNullable(topic.getBrokerService() - .getTopicPolicies(TopicName.get(topic.getName()))) - .map(TopicPolicies::getMaxConsumersPerSubscription) - .orElse(null); - - if (maxConsumersPerSubscription == null) { - // Use getDataIfPresent from zk cache to make the call non-blocking and - // prevent deadlocks in addConsumer - policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache() - .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace())); - if (policies == null) { - policies = new Policies(); - } - } - } catch (Exception e) { - policies = new Policies(); - } - - if (maxConsumersPerSubscription == null) { - maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0 - ? policies.max_consumers_per_subscription : serviceConfig.getMaxConsumersPerSubscription(); - } - - if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerList.size()) { - return true; - } - return false; + @Override + protected boolean isConsumersExceededOnSubscription() { + return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumerList.size()); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 6e28a3cf47efb..93c2d2547ab16 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -19,7 +19,6 @@ package org.apache.pulsar.broker.service.persistent; import static com.google.common.base.Preconditions.checkNotNull; -import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS; import java.util.Iterator; import java.util.List; @@ -36,7 +35,6 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.util.SafeRun; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Dispatcher; @@ -50,10 +48,8 @@ import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.Policies; -import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.util.Codec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -132,36 +128,9 @@ protected void scheduleReadOnActiveConsumer() { }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS); } + @Override protected boolean isConsumersExceededOnSubscription() { - Policies policies = null; - Integer maxConsumersPerSubscription = null; - try { - maxConsumersPerSubscription = Optional.ofNullable(topic.getBrokerService() - .getTopicPolicies(TopicName.get(topicName))) - .map(TopicPolicies::getMaxConsumersPerSubscription) - .orElse(null); - if (maxConsumersPerSubscription == null) { - // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer - policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache() - .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace())); - - if (policies == null) { - policies = new Policies(); - } - } - } catch (Exception e) { - policies = new Policies(); - } - - if (maxConsumersPerSubscription == null) { - maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0 - ? policies.max_consumers_per_subscription : serviceConfig.getMaxConsumersPerSubscription(); - } - - if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumers.size()) { - return true; - } - return false; + return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumers.size()); } @Override