Skip to content

Commit

Permalink
[fix][broker] The configuration loadBalancerNamespaceMaximumBundles i…
Browse files Browse the repository at this point in the history
…s invalid (apache#16552)
  • Loading branch information
lordcheng10 authored Jul 18, 2022
1 parent 888898c commit 5698b08
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
Expand All @@ -39,12 +40,16 @@ public class BundleSplitterTask implements BundleSplitStrategy {
private static final Logger log = LoggerFactory.getLogger(BundleSplitStrategy.class);
private final Set<String> bundleCache;

private final Map<String, Integer> namespaceBundleCount;


/**
* Construct a BundleSplitterTask.
*
*/
public BundleSplitterTask() {
bundleCache = new HashSet<>();
namespaceBundleCount = new HashMap<>();
}

/**
Expand All @@ -61,12 +66,14 @@ public BundleSplitterTask() {
@Override
public Set<String> findBundlesToSplit(final LoadData loadData, final PulsarService pulsar) {
bundleCache.clear();
namespaceBundleCount.clear();
final ServiceConfiguration conf = pulsar.getConfiguration();
int maxBundleCount = conf.getLoadBalancerNamespaceMaximumBundles();
long maxBundleTopics = conf.getLoadBalancerNamespaceBundleMaxTopics();
long maxBundleSessions = conf.getLoadBalancerNamespaceBundleMaxSessions();
long maxBundleMsgRate = conf.getLoadBalancerNamespaceBundleMaxMsgRate();
long maxBundleBandwidth = conf.getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * LoadManagerShared.MIBI;

loadData.getBrokerData().forEach((broker, brokerData) -> {
LocalBrokerData localData = brokerData.getLocalData();
for (final Map.Entry<String, NamespaceBundleStats> entry : localData.getLastStats().entrySet()) {
Expand All @@ -93,8 +100,11 @@ public Set<String> findBundlesToSplit(final LoadData loadData, final PulsarServi
try {
final int bundleCount = pulsar.getNamespaceService()
.getBundleCount(NamespaceName.get(namespace));
if (bundleCount < maxBundleCount) {
if ((bundleCount + namespaceBundleCount.getOrDefault(namespace, 0))
< maxBundleCount) {
bundleCache.add(bundle);
int bundleNum = namespaceBundleCount.getOrDefault(namespace, 0);
namespaceBundleCount.put(namespace, bundleNum + 1);
} else {
if (log.isDebugEnabled()) {
log.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.policies.data.loadbalancer.BrokerData;
import org.apache.pulsar.policies.data.loadbalancer.BundleData;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
Expand Down Expand Up @@ -95,6 +96,57 @@ public void testSplitTaskWhenTopicJustOne() {
Assert.assertEquals(bundlesToSplit.size(), 0);
}

@Test
public void testLoadBalancerNamespaceMaximumBundles() throws Exception {
pulsar.getConfiguration().setLoadBalancerNamespaceMaximumBundles(3);

final BundleSplitterTask bundleSplitterTask = new BundleSplitterTask();
LoadData loadData = new LoadData();

LocalBrokerData brokerData = new LocalBrokerData();
Map<String, NamespaceBundleStats> lastStats = new HashMap<>();
final NamespaceBundleStats namespaceBundleStats = new NamespaceBundleStats();
namespaceBundleStats.topics = 5;
lastStats.put("ten/ns/0x00000000_0x20000000", namespaceBundleStats);

final NamespaceBundleStats namespaceBundleStats2 = new NamespaceBundleStats();
namespaceBundleStats2.topics = 5;
lastStats.put("ten/ns/0x20000000_0x40000000", namespaceBundleStats2);

final NamespaceBundleStats namespaceBundleStats3 = new NamespaceBundleStats();
namespaceBundleStats3.topics = 5;
lastStats.put("ten/ns/0x40000000_0x60000000", namespaceBundleStats3);

brokerData.setLastStats(lastStats);
loadData.getBrokerData().put("broker", new BrokerData(brokerData));

BundleData bundleData1 = new BundleData();
TimeAverageMessageData averageMessageData1 = new TimeAverageMessageData();
averageMessageData1.setMsgRateIn(pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxMsgRate() * 2);
averageMessageData1.setMsgRateOut(1);
bundleData1.setLongTermData(averageMessageData1);
loadData.getBundleData().put("ten/ns/0x00000000_0x20000000", bundleData1);

BundleData bundleData2 = new BundleData();
TimeAverageMessageData averageMessageData2 = new TimeAverageMessageData();
averageMessageData2.setMsgRateIn(pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxMsgRate() * 2);
averageMessageData2.setMsgRateOut(1);
bundleData2.setLongTermData(averageMessageData2);
loadData.getBundleData().put("ten/ns/0x20000000_0x40000000", bundleData2);

BundleData bundleData3 = new BundleData();
TimeAverageMessageData averageMessageData3 = new TimeAverageMessageData();
averageMessageData3.setMsgRateIn(pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxMsgRate() * 2);
averageMessageData3.setMsgRateOut(1);
bundleData3.setLongTermData(averageMessageData3);
loadData.getBundleData().put("ten/ns/0x40000000_0x60000000", bundleData3);

int currentBundleCount = pulsar.getNamespaceService().getBundleCount(NamespaceName.get("ten/ns"));
final Set<String> bundlesToSplit = bundleSplitterTask.findBundlesToSplit(loadData, pulsar);
Assert.assertEquals(bundlesToSplit.size() + currentBundleCount,
pulsar.getConfiguration().getLoadBalancerNamespaceMaximumBundles());
}


@AfterMethod(alwaysRun = true)
void shutdown() throws Exception {
Expand Down

0 comments on commit 5698b08

Please sign in to comment.