Skip to content

Commit

Permalink
Revert "Remove broker mode to handle persistent/non-persistent topics…
Browse files Browse the repository at this point in the history
… separately (apache#3348)" (apache#3713)

This reverts commit 77d4c28.
  • Loading branch information
rdhabalia authored and merlimat committed Mar 1, 2019
1 parent ccfb949 commit 0142247
Show file tree
Hide file tree
Showing 20 changed files with 384 additions and 25 deletions.
6 changes: 6 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ maxConcurrentNonPersistentMessagePerConnection=1000
# Number of worker threads to serve non-persistent topic
numWorkerThreadsForNonPersistentTopic=8

# Enable broker to load persistent topics
enablePersistentTopics=true

# Enable broker to load non-persistent topics
enableNonPersistentTopics=true

# Enable to run bookie along with broker
enableRunBookieTogether=false

Expand Down
6 changes: 6 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,12 @@ maxConcurrentNonPersistentMessagePerConnection=1000
# Number of worker threads to serve non-persistent topic
numWorkerThreadsForNonPersistentTopic=8

# Enable broker to load persistent topics
enablePersistentTopics=true

# Enable broker to load non-persistent topics
enableNonPersistentTopics=true

# Max number of producers allowed to connect to topic. Once this limit reaches, Broker will reject new producers
# until the number of connected producers decrease.
# Using a value of 0, is disabling maxProducersPerTopic-limit check.
Expand Down
6 changes: 6 additions & 0 deletions deployment/terraform-ansible/templates/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,12 @@ maxConcurrentNonPersistentMessagePerConnection=1000
# Number of worker threads to serve non-persistent topic
numWorkerThreadsForNonPersistentTopic=8

# Enable broker to load persistent topics
enablePersistentTopics=true

# Enable broker to load non-persistent topics
enableNonPersistentTopics=true

# Enable to run bookie along with broker
enableRunBookieTogether=false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,18 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Number of worker threads to serve non-persistent topic")
private int numWorkerThreadsForNonPersistentTopic = Runtime.getRuntime().availableProcessors();;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Enable broker to load persistent topics"
)
private boolean enablePersistentTopics = true;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Enable broker to load non-persistent topics"
)
private boolean enableNonPersistentTopics = true;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Enable to run bookie along with broker"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ private LoadManagerShared() {
// The brokers are put into brokerCandidateCache.
public static void applyNamespacePolicies(final ServiceUnitId serviceUnit,
final SimpleResourceAllocationPolicies policies, final Set<String> brokerCandidateCache,
final Set<String> availableBrokers) {
final Set<String> availableBrokers, final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate) {
Set<String> primariesCache = localPrimariesCache.get();
primariesCache.clear();

Expand Down Expand Up @@ -145,11 +145,27 @@ public static void applyNamespacePolicies(final ServiceUnitId serviceUnit,
}

}
} else if (policies.isSharedBroker(brokerUrl.getHost())) {
secondaryCache.add(broker);
if (log.isDebugEnabled()) {
log.debug("Added Shared Broker - [{}] as possible Candidates for namespace - [{}]",
brokerUrl.getHost(), namespace.toString());
} else {
// non-persistent topic can be assigned to only those brokers that enabled for non-persistent topic
if (isNonPersistentTopic
&& !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(brokerUrlString)) {
if (log.isDebugEnabled()) {
log.debug("Filter broker- [{}] because it doesn't support non-persistent namespace - [{}]",
brokerUrl.getHost(), namespace.toString());
}
} else if (!isNonPersistentTopic
&& !brokerTopicLoadingPredicate.isEnablePersistentTopics(brokerUrlString)) {
// persistent topic can be assigned to only brokers that enabled for persistent-topic
if (log.isDebugEnabled()) {
log.debug("Filter broker- [{}] because broker only supports non-persistent namespace - [{}]",
brokerUrl.getHost(), namespace.toString());
}
} else if (policies.isSharedBroker(brokerUrl.getHost())) {
secondaryCache.add(broker);
if (log.isDebugEnabled()) {
log.debug("Added Shared Broker - [{}] as possible Candidates for namespace - [{}]",
brokerUrl.getHost(), namespace.toString());
}
}
}
}
Expand Down Expand Up @@ -500,6 +516,12 @@ public static boolean shouldAntiAffinityNamespaceUnload(String namespace, String
return true;
}

public interface BrokerTopicLoadingPredicate {
boolean isEnablePersistentTopics(String brokerUrl);

boolean isEnableNonPersistentTopics(String brokerUrl);
}

/**
* It filters out brokers which owns topic higher than configured threshold at
* {@link ServiceConfiguration.loadBalancerBrokerMaxTopics}. <br/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
import org.apache.pulsar.broker.loadbalance.ModularLoadManager;
import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -175,6 +176,9 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
// ZooKeeper belonging to the pulsar service.
private ZooKeeper zkClient;

// check if given broker can load persistent/non-persistent topic
private final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate;

private Map<String, String> brokerToFailureDomainMap;

private static final Deserializer<LocalBrokerData> loadReportDeserializer = (key, content) -> jsonMapper()
Expand All @@ -194,6 +198,22 @@ public ModularLoadManagerImpl() {
preallocatedBundleToBroker = new ConcurrentHashMap<>();
scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-modular-load-manager"));
this.brokerToFailureDomainMap = Maps.newHashMap();

this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
@Override
public boolean isEnablePersistentTopics(String brokerUrl) {
final BrokerData brokerData = loadData.getBrokerData().get(brokerUrl.replace("http://", ""));
return brokerData != null && brokerData.getLocalData() != null
&& brokerData.getLocalData().isPersistentTopicsEnabled();
}

@Override
public boolean isEnableNonPersistentTopics(String brokerUrl) {
final BrokerData brokerData = loadData.getBrokerData().get(brokerUrl.replace("http://", ""));
return brokerData != null && brokerData.getLocalData() != null
&& brokerData.getLocalData().isNonPersistentTopicsEnabled();
}
};
}

/**
Expand Down Expand Up @@ -248,6 +268,12 @@ public LocalBrokerData deserialize(String key, byte[] content) throws Exception
localData = new LocalBrokerData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
localData.setBrokerVersionString(pulsar.getBrokerVersion());
// configure broker-topic mode
lastData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
lastData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
localData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
localData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());


placementStrategy = ModularLoadManagerStrategy.create(conf);
policies = new SimpleResourceAllocationPolicies(pulsar);
Expand Down Expand Up @@ -599,7 +625,7 @@ public boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle
ServiceUnitId serviceUnit = pulsar.getNamespaceService().getNamespaceBundleFactory()
.getBundle(namespace, bundle);
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers());
getAvailableBrokers(), brokerTopicLoadingPredicate);
return LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, bundle, currentBroker, pulsar,
brokerToNamespaceToBundleRange, brokerCandidateCache);
}
Expand Down Expand Up @@ -680,7 +706,8 @@ public Optional<String> selectBrokerForAssignment(final ServiceUnitId serviceUni
final BundleData data = loadData.getBundleData().computeIfAbsent(bundle,
key -> getBundleDataOrDefault(bundle));
brokerCandidateCache.clear();
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers());
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
brokerTopicLoadingPredicate);

// filter brokers which owns topic higher than threshold
LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache, loadData,
Expand All @@ -702,14 +729,14 @@ public Optional<String> selectBrokerForAssignment(final ServiceUnitId serviceUni
}
} catch ( BrokerFilterException x ) {
// restore the list of brokers to the full set
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers());
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
brokerTopicLoadingPredicate);
}

if ( brokerCandidateCache.isEmpty() ) {
// restore the list of brokers to the full set
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers());
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
brokerTopicLoadingPredicate);
}

// Choose a broker among the potentially smaller filtered list, when possible
Expand All @@ -727,8 +754,8 @@ public Optional<String> selectBrokerForAssignment(final ServiceUnitId serviceUni
final double maxUsage = loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
if (maxUsage > overloadThreshold) {
// All brokers that were in the filtered list were overloaded, so check if there is a better broker
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers());
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
brokerTopicLoadingPredicate);
broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.PlacementStrategy;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.ResourceQuota;
Expand Down Expand Up @@ -181,6 +182,8 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
private boolean forceLoadReportUpdate = false;
private static final Deserializer<LoadReport> loadReportDeserializer = (key, content) -> jsonMapper()
.readValue(content, LoadReport.class);
// check if given broker can load persistent/non-persistent topic
private final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate;

// Perform initializations which may be done without a PulsarService.
public SimpleLoadManagerImpl() {
Expand All @@ -197,6 +200,21 @@ public SimpleLoadManagerImpl() {
brokerCandidateCache = new HashSet<>();
availableBrokersCache = new HashSet<>();
brokerToNamespaceToBundleRange = new HashMap<>();
this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
@Override
public boolean isEnablePersistentTopics(String brokerUrl) {
ResourceUnit ru = new SimpleResourceUnit(brokerUrl, new PulsarResourceDescription());
LoadReport loadReport = currentLoadReports.get(ru);
return loadReport != null && loadReport.isPersistentTopicsEnabled();
}

@Override
public boolean isEnableNonPersistentTopics(String brokerUrl) {
ResourceUnit ru = new SimpleResourceUnit(brokerUrl, new PulsarResourceDescription());
LoadReport loadReport = currentLoadReports.get(ru);
return loadReport != null && loadReport.isNonPersistentTopicsEnabled();
}
};
}

@Override
Expand All @@ -209,6 +227,9 @@ public void initialize(final PulsarService pulsar) {
this.policies = new SimpleResourceAllocationPolicies(pulsar);
lastLoadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
lastLoadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
lastLoadReport.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());

loadReportCacheZk = new ZooKeeperDataCache<LoadReport>(pulsar.getLocalZkCache()) {
@Override
public LoadReport deserialize(String key, byte[] content) throws Exception {
Expand Down Expand Up @@ -898,8 +919,8 @@ private Multimap<Long, ResourceUnit> getFinalCandidates(ServiceUnitId serviceUni
}
brokerCandidateCache.clear();
try {
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
availableBrokersCache);
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, availableBrokersCache,
brokerTopicLoadingPredicate);
} catch (Exception e) {
log.warn("Error when trying to apply policies: {}", e);
for (final Map.Entry<Long, Set<ResourceUnit>> entry : availableBrokers.entrySet()) {
Expand Down Expand Up @@ -1091,6 +1112,8 @@ private LoadReport generateLoadReportForcefully() throws Exception {
try {
LoadReport loadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
loadReport.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
loadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
loadReport.setName(String.format("%s:%s", pulsar.getAdvertisedAddress(),
pulsar.getConfiguration().getWebServicePort().get()));
loadReport.setBrokerVersionString(pulsar.getBrokerVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,14 @@ public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean c
private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) {
CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();

if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load non-persistent topic {}", topic);
}
topicFuture.completeExceptionally(
new NotAllowedException("Broker is not unable to load non-persistent topic"));
return topicFuture;
}
final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, this);
CompletableFuture<Void> replicationFuture = nonPersistentTopic.checkReplication();
Expand Down Expand Up @@ -583,6 +591,14 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
checkTopicNsOwnership(topic);

final CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
if (!pulsar.getConfiguration().isEnablePersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load persistent topic {}", topic);
}
topicFuture.completeExceptionally(new NotAllowedException("Broker is not unable to load persistent topic"));
return topicFuture;
}

final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get();

if (topicLoadSemaphore.tryAcquire()) {
Expand Down
Loading

0 comments on commit 0142247

Please sign in to comment.