Skip to content

Commit

Permalink
Move duplicate code to abstract parent class (apache#10061)
Browse files Browse the repository at this point in the history
* Move duplicate code to abstract parent class

* code style
  • Loading branch information
315157973 authored Mar 29, 2021
1 parent 510ecfa commit a070c33
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,26 @@

package org.apache.pulsar.broker.service;

import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import io.netty.buffer.ByteBuf;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;

Expand Down Expand Up @@ -148,6 +154,39 @@ && trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
}

/**
* Determine whether the number of consumers on the subscription reaches the threshold.
* @return
*/
protected abstract boolean isConsumersExceededOnSubscription();

protected boolean isConsumersExceededOnSubscription(BrokerService brokerService,
String topic, int consumerSize) {
Policies policies = null;
Integer maxConsumersPerSubscription = null;
try {
maxConsumersPerSubscription = Optional.ofNullable(brokerService
.getTopicPolicies(TopicName.get(topic)))
.map(TopicPolicies::getMaxConsumersPerSubscription)
.orElse(null);
if (maxConsumersPerSubscription == null) {
// Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()));
}
} catch (Exception e) {
log.debug("Get topic or namespace policies fail", e);
}

if (maxConsumersPerSubscription == null) {
maxConsumersPerSubscription = policies != null && policies.max_consumers_per_subscription > 0
? policies.max_consumers_per_subscription :
brokerService.pulsar().getConfiguration().getMaxConsumersPerSubscription();
}

return maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerSize;
}

private void processReplicatedSubscriptionSnapshot(PositionImpl pos, ByteBuf headersAndPayload) {
// Remove the protobuf headers
Commands.skipMessageMetadata(headersAndPayload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int part

protected abstract void cancelPendingRead();

protected abstract boolean isConsumersExceededOnSubscription();

protected void notifyActiveConsumerChanged(Consumer activeConsumer) {
if (null != activeConsumer && subscriptionType == SubType.Failover) {
consumers.forEach(consumer ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
consumerSet.add(consumer);
}

private boolean isConsumersExceededOnSubscription() {
@Override
protected boolean isConsumersExceededOnSubscription() {
final int maxConsumersPerSubscription = serviceConfig.getMaxConsumersPerSubscription();
if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerList.size()) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@
*/
package org.apache.pulsar.broker.service.nonpersistent;

import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import java.util.List;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchSizes;
Expand All @@ -33,9 +30,6 @@
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.stats.Rate;

Expand Down Expand Up @@ -79,37 +73,9 @@ public void sendMessages(List<Entry> entries) {
}
}

@Override
protected boolean isConsumersExceededOnSubscription() {
Policies policies = null;
Integer maxConsumersPerSubscription = null;
try {
maxConsumersPerSubscription = Optional.ofNullable(topic.getBrokerService()
.getTopicPolicies(TopicName.get(topicName)))
.map(TopicPolicies::getMaxConsumersPerSubscription)
.orElse(null);
if (maxConsumersPerSubscription == null) {
// Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
.getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()));

if (policies == null) {
policies = new Policies();
}
}
} catch (Exception e) {
policies = new Policies();
}

if (maxConsumersPerSubscription == null) {
maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0
? policies.max_consumers_per_subscription :
serviceConfig.getMaxConsumersPerSubscription();
}

if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumers.size()) {
return true;
}
return false;
return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumers.size());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.service.persistent;

import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Lists;
Expand All @@ -40,7 +39,6 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
Expand All @@ -59,10 +57,8 @@
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
import org.apache.pulsar.common.util.collections.LongPairSet;
Expand Down Expand Up @@ -158,37 +154,9 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
consumerSet.add(consumer);
}

private boolean isConsumersExceededOnSubscription() {
Policies policies = null;
Integer maxConsumersPerSubscription = null;
try {
maxConsumersPerSubscription = Optional.ofNullable(topic.getBrokerService()
.getTopicPolicies(TopicName.get(topic.getName())))
.map(TopicPolicies::getMaxConsumersPerSubscription)
.orElse(null);

if (maxConsumersPerSubscription == null) {
// Use getDataIfPresent from zk cache to make the call non-blocking and
// prevent deadlocks in addConsumer
policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
.getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()));
if (policies == null) {
policies = new Policies();
}
}
} catch (Exception e) {
policies = new Policies();
}

if (maxConsumersPerSubscription == null) {
maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0
? policies.max_consumers_per_subscription : serviceConfig.getMaxConsumersPerSubscription();
}

if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerList.size()) {
return true;
}
return false;
@Override
protected boolean isConsumersExceededOnSubscription() {
return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumerList.size());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.broker.service.persistent;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
import java.util.Iterator;
import java.util.List;
Expand All @@ -36,7 +35,6 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
Expand All @@ -50,10 +48,8 @@
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -132,36 +128,9 @@ protected void scheduleReadOnActiveConsumer() {
}, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS);
}

@Override
protected boolean isConsumersExceededOnSubscription() {
Policies policies = null;
Integer maxConsumersPerSubscription = null;
try {
maxConsumersPerSubscription = Optional.ofNullable(topic.getBrokerService()
.getTopicPolicies(TopicName.get(topicName)))
.map(TopicPolicies::getMaxConsumersPerSubscription)
.orElse(null);
if (maxConsumersPerSubscription == null) {
// Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
.getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()));

if (policies == null) {
policies = new Policies();
}
}
} catch (Exception e) {
policies = new Policies();
}

if (maxConsumersPerSubscription == null) {
maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0
? policies.max_consumers_per_subscription : serviceConfig.getMaxConsumersPerSubscription();
}

if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumers.size()) {
return true;
}
return false;
return isConsumersExceededOnSubscription(topic.getBrokerService(), topic.getName(), consumers.size());
}

@Override
Expand Down

0 comments on commit a070c33

Please sign in to comment.