Skip to content

Commit

Permalink
Remove broker mode to handle persistent/non-persistent topics separat…
Browse files Browse the repository at this point in the history
…ely (apache#3348)

* Remove broker mode to handle persistent/non-persistent topics separately

* fix unused imports
  • Loading branch information
rdhabalia authored Feb 26, 2019
1 parent 3ac67de commit 77d4c28
Show file tree
Hide file tree
Showing 20 changed files with 25 additions and 384 deletions.
6 changes: 0 additions & 6 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,6 @@ maxConcurrentNonPersistentMessagePerConnection=1000
# Number of worker threads to serve non-persistent topic
numWorkerThreadsForNonPersistentTopic=8

# Enable broker to load persistent topics
enablePersistentTopics=true

# Enable broker to load non-persistent topics
enableNonPersistentTopics=true

# Enable to run bookie along with broker
enableRunBookieTogether=false

Expand Down
6 changes: 0 additions & 6 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,6 @@ maxConcurrentNonPersistentMessagePerConnection=1000
# Number of worker threads to serve non-persistent topic
numWorkerThreadsForNonPersistentTopic=8

# Enable broker to load persistent topics
enablePersistentTopics=true

# Enable broker to load non-persistent topics
enableNonPersistentTopics=true

# Max number of producers allowed to connect to topic. Once this limit reaches, Broker will reject new producers
# until the number of connected producers decrease.
# Using a value of 0, is disabling maxProducersPerTopic-limit check.
Expand Down
6 changes: 0 additions & 6 deletions deployment/terraform-ansible/templates/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,6 @@ maxConcurrentNonPersistentMessagePerConnection=1000
# Number of worker threads to serve non-persistent topic
numWorkerThreadsForNonPersistentTopic=8

# Enable broker to load persistent topics
enablePersistentTopics=true

# Enable broker to load non-persistent topics
enableNonPersistentTopics=true

# Enable to run bookie along with broker
enableRunBookieTogether=false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,18 +395,6 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Number of worker threads to serve non-persistent topic")
private int numWorkerThreadsForNonPersistentTopic = Runtime.getRuntime().availableProcessors();;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Enable broker to load persistent topics"
)
private boolean enablePersistentTopics = true;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Enable broker to load non-persistent topics"
)
private boolean enableNonPersistentTopics = true;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Enable to run bookie along with broker"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ private LoadManagerShared() {
// The brokers are put into brokerCandidateCache.
public static void applyNamespacePolicies(final ServiceUnitId serviceUnit,
final SimpleResourceAllocationPolicies policies, final Set<String> brokerCandidateCache,
final Set<String> availableBrokers, final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate) {
final Set<String> availableBrokers) {
Set<String> primariesCache = localPrimariesCache.get();
primariesCache.clear();

Expand Down Expand Up @@ -145,27 +145,11 @@ public static void applyNamespacePolicies(final ServiceUnitId serviceUnit,
}

}
} else {
// non-persistent topic can be assigned to only those brokers that enabled for non-persistent topic
if (isNonPersistentTopic
&& !brokerTopicLoadingPredicate.isEnableNonPersistentTopics(brokerUrlString)) {
if (log.isDebugEnabled()) {
log.debug("Filter broker- [{}] because it doesn't support non-persistent namespace - [{}]",
brokerUrl.getHost(), namespace.toString());
}
} else if (!isNonPersistentTopic
&& !brokerTopicLoadingPredicate.isEnablePersistentTopics(brokerUrlString)) {
// persistent topic can be assigned to only brokers that enabled for persistent-topic
if (log.isDebugEnabled()) {
log.debug("Filter broker- [{}] because broker only supports non-persistent namespace - [{}]",
brokerUrl.getHost(), namespace.toString());
}
} else if (policies.isSharedBroker(brokerUrl.getHost())) {
secondaryCache.add(broker);
if (log.isDebugEnabled()) {
log.debug("Added Shared Broker - [{}] as possible Candidates for namespace - [{}]",
brokerUrl.getHost(), namespace.toString());
}
} else if (policies.isSharedBroker(brokerUrl.getHost())) {
secondaryCache.add(broker);
if (log.isDebugEnabled()) {
log.debug("Added Shared Broker - [{}] as possible Candidates for namespace - [{}]",
brokerUrl.getHost(), namespace.toString());
}
}
}
Expand Down Expand Up @@ -516,12 +500,6 @@ public static boolean shouldAntiAffinityNamespaceUnload(String namespace, String
return true;
}

public interface BrokerTopicLoadingPredicate {
boolean isEnablePersistentTopics(String brokerUrl);

boolean isEnableNonPersistentTopics(String brokerUrl);
}

/**
* It filters out brokers which owns topic higher than configured threshold at
* {@link ServiceConfiguration.loadBalancerBrokerMaxTopics}. <br/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
import org.apache.pulsar.broker.loadbalance.ModularLoadManager;
import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -176,9 +175,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
// ZooKeeper belonging to the pulsar service.
private ZooKeeper zkClient;

// check if given broker can load persistent/non-persistent topic
private final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate;

private Map<String, String> brokerToFailureDomainMap;

private static final Deserializer<LocalBrokerData> loadReportDeserializer = (key, content) -> jsonMapper()
Expand All @@ -198,22 +194,6 @@ public ModularLoadManagerImpl() {
preallocatedBundleToBroker = new ConcurrentHashMap<>();
scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-modular-load-manager"));
this.brokerToFailureDomainMap = Maps.newHashMap();

this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
@Override
public boolean isEnablePersistentTopics(String brokerUrl) {
final BrokerData brokerData = loadData.getBrokerData().get(brokerUrl.replace("http://", ""));
return brokerData != null && brokerData.getLocalData() != null
&& brokerData.getLocalData().isPersistentTopicsEnabled();
}

@Override
public boolean isEnableNonPersistentTopics(String brokerUrl) {
final BrokerData brokerData = loadData.getBrokerData().get(brokerUrl.replace("http://", ""));
return brokerData != null && brokerData.getLocalData() != null
&& brokerData.getLocalData().isNonPersistentTopicsEnabled();
}
};
}

/**
Expand Down Expand Up @@ -268,12 +248,6 @@ public LocalBrokerData deserialize(String key, byte[] content) throws Exception
localData = new LocalBrokerData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
localData.setBrokerVersionString(pulsar.getBrokerVersion());
// configure broker-topic mode
lastData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
lastData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
localData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
localData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());


placementStrategy = ModularLoadManagerStrategy.create(conf);
policies = new SimpleResourceAllocationPolicies(pulsar);
Expand Down Expand Up @@ -625,7 +599,7 @@ public boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle
ServiceUnitId serviceUnit = pulsar.getNamespaceService().getNamespaceBundleFactory()
.getBundle(namespace, bundle);
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers(), brokerTopicLoadingPredicate);
getAvailableBrokers());
return LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, bundle, currentBroker, pulsar,
brokerToNamespaceToBundleRange, brokerCandidateCache);
}
Expand Down Expand Up @@ -706,8 +680,7 @@ public Optional<String> selectBrokerForAssignment(final ServiceUnitId serviceUni
final BundleData data = loadData.getBundleData().computeIfAbsent(bundle,
key -> getBundleDataOrDefault(bundle));
brokerCandidateCache.clear();
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
brokerTopicLoadingPredicate);
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers());

// filter brokers which owns topic higher than threshold
LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache, loadData,
Expand All @@ -729,14 +702,14 @@ public Optional<String> selectBrokerForAssignment(final ServiceUnitId serviceUni
}
} catch ( BrokerFilterException x ) {
// restore the list of brokers to the full set
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
brokerTopicLoadingPredicate);
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers());
}

if ( brokerCandidateCache.isEmpty() ) {
// restore the list of brokers to the full set
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
brokerTopicLoadingPredicate);
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers());
}

// Choose a broker among the potentially smaller filtered list, when possible
Expand All @@ -754,8 +727,8 @@ public Optional<String> selectBrokerForAssignment(final ServiceUnitId serviceUni
final double maxUsage = loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
if (maxUsage > overloadThreshold) {
// All brokers that were in the filtered list were overloaded, so check if there is a better broker
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
brokerTopicLoadingPredicate);
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
getAvailableBrokers());
broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.PlacementStrategy;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.ResourceQuota;
Expand Down Expand Up @@ -182,8 +181,6 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
private boolean forceLoadReportUpdate = false;
private static final Deserializer<LoadReport> loadReportDeserializer = (key, content) -> jsonMapper()
.readValue(content, LoadReport.class);
// check if given broker can load persistent/non-persistent topic
private final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate;

// Perform initializations which may be done without a PulsarService.
public SimpleLoadManagerImpl() {
Expand All @@ -200,21 +197,6 @@ public SimpleLoadManagerImpl() {
brokerCandidateCache = new HashSet<>();
availableBrokersCache = new HashSet<>();
brokerToNamespaceToBundleRange = new HashMap<>();
this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
@Override
public boolean isEnablePersistentTopics(String brokerUrl) {
ResourceUnit ru = new SimpleResourceUnit(brokerUrl, new PulsarResourceDescription());
LoadReport loadReport = currentLoadReports.get(ru);
return loadReport != null && loadReport.isPersistentTopicsEnabled();
}

@Override
public boolean isEnableNonPersistentTopics(String brokerUrl) {
ResourceUnit ru = new SimpleResourceUnit(brokerUrl, new PulsarResourceDescription());
LoadReport loadReport = currentLoadReports.get(ru);
return loadReport != null && loadReport.isNonPersistentTopicsEnabled();
}
};
}

@Override
Expand All @@ -227,9 +209,6 @@ public void initialize(final PulsarService pulsar) {
this.policies = new SimpleResourceAllocationPolicies(pulsar);
lastLoadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
lastLoadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
lastLoadReport.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());

loadReportCacheZk = new ZooKeeperDataCache<LoadReport>(pulsar.getLocalZkCache()) {
@Override
public LoadReport deserialize(String key, byte[] content) throws Exception {
Expand Down Expand Up @@ -919,8 +898,8 @@ private Multimap<Long, ResourceUnit> getFinalCandidates(ServiceUnitId serviceUni
}
brokerCandidateCache.clear();
try {
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, availableBrokersCache,
brokerTopicLoadingPredicate);
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
availableBrokersCache);
} catch (Exception e) {
log.warn("Error when trying to apply policies: {}", e);
for (final Map.Entry<Long, Set<ResourceUnit>> entry : availableBrokers.entrySet()) {
Expand Down Expand Up @@ -1112,8 +1091,6 @@ private LoadReport generateLoadReportForcefully() throws Exception {
try {
LoadReport loadReport = new LoadReport(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
loadReport.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());
loadReport.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
loadReport.setName(String.format("%s:%s", pulsar.getAdvertisedAddress(),
pulsar.getConfiguration().getWebServicePort().get()));
loadReport.setBrokerVersionString(pulsar.getBrokerVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,14 +501,6 @@ public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean c
private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) {
CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();

if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load non-persistent topic {}", topic);
}
topicFuture.completeExceptionally(
new NotAllowedException("Broker is not unable to load non-persistent topic"));
return topicFuture;
}
final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, this);
CompletableFuture<Void> replicationFuture = nonPersistentTopic.checkReplication();
Expand Down Expand Up @@ -591,14 +583,6 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
checkTopicNsOwnership(topic);

final CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
if (!pulsar.getConfiguration().isEnablePersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load persistent topic {}", topic);
}
topicFuture.completeExceptionally(new NotAllowedException("Broker is not unable to load persistent topic"));
return topicFuture;
}

final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get();

if (topicLoadSemaphore.tryAcquire()) {
Expand Down
Loading

0 comments on commit 77d4c28

Please sign in to comment.