Skip to content

Commit

Permalink
[improve][broker]add NamespacePolicies and AntiAffinity check before …
Browse files Browse the repository at this point in the history
…unload in checkNamespaceBundleSplit (apache#16780)

Co-authored-by: nicklixinyang <[email protected]>
  • Loading branch information
Nicklee007 and nicklixinyang authored Jan 11, 2023
1 parent 932023d commit 0feaa45
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
package org.apache.pulsar.broker.loadbalance;

import java.util.Set;
import java.util.Map;
import org.apache.pulsar.broker.PulsarService;

/**
Expand All @@ -33,7 +33,7 @@ public interface BundleSplitStrategy {
* leader broker).
* @param pulsar
* Service to use.
* @return A set of the bundles that should be split.
* @return A map of the bundles that should be split and the brokers on which they reside.
*/
Set<String> findBundlesToSplit(LoadData loadData, PulsarService pulsar);
Map<String, String> findBundlesToSplit(LoadData loadData, PulsarService pulsar);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
package org.apache.pulsar.broker.loadbalance.impl;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BundleSplitStrategy;
Expand All @@ -38,7 +36,7 @@
*/
public class BundleSplitterTask implements BundleSplitStrategy {
private static final Logger log = LoggerFactory.getLogger(BundleSplitStrategy.class);
private final Set<String> bundleCache;
private final Map<String, String> bundleCache;

private final Map<String, Integer> namespaceBundleCount;

Expand All @@ -48,7 +46,7 @@ public class BundleSplitterTask implements BundleSplitStrategy {
*
*/
public BundleSplitterTask() {
bundleCache = new HashSet<>();
bundleCache = new HashMap<>();
namespaceBundleCount = new HashMap<>();
}

Expand All @@ -61,10 +59,10 @@ public BundleSplitterTask() {
* @param pulsar
* Service to use.
* @return All bundles who have exceeded configured thresholds in number of topics, number of sessions, total
* message rates, or total throughput.
* message rates, or total throughput and the brokers on which they reside.
*/
@Override
public Set<String> findBundlesToSplit(final LoadData loadData, final PulsarService pulsar) {
public Map<String, String> findBundlesToSplit(final LoadData loadData, final PulsarService pulsar) {
bundleCache.clear();
namespaceBundleCount.clear();
final ServiceConfiguration conf = pulsar.getConfiguration();
Expand Down Expand Up @@ -108,7 +106,7 @@ public Set<String> findBundlesToSplit(final LoadData loadData, final PulsarServi
maxBundleSessions, totalMessageRate, maxBundleMsgRate,
totalMessageThroughput / LoadManagerShared.MIBI,
maxBundleBandwidth / LoadManagerShared.MIBI);
bundleCache.add(bundle);
bundleCache.put(bundle, broker);
int bundleNum = namespaceBundleCount.getOrDefault(namespace, 0);
namespaceBundleCount.put(namespace, bundleNum + 1);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -748,10 +748,10 @@ public void checkNamespaceBundleSplit() {
}
final boolean unloadSplitBundles = pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled();
synchronized (bundleSplitStrategy) {
final Set<String> bundlesToBeSplit = bundleSplitStrategy.findBundlesToSplit(loadData, pulsar);
final Map<String, String> bundlesToBeSplit = bundleSplitStrategy.findBundlesToSplit(loadData, pulsar);
NamespaceBundleFactory namespaceBundleFactory = pulsar.getNamespaceService().getNamespaceBundleFactory();
int splitCount = 0;
for (String bundleName : bundlesToBeSplit) {
for (String bundleName : bundlesToBeSplit.keySet()) {
try {
final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundleName);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundleName);
Expand All @@ -768,9 +768,17 @@ public void checkNamespaceBundleSplit() {
.invalidateBundleCache(NamespaceName.get(namespaceName));
deleteBundleDataFromMetadataStore(bundleName);

log.info("Load-manager splitting bundle {} and unloading {}", bundleName, unloadSplitBundles);
// Check NamespacePolicies and AntiAffinityNamespace support unload bundle.
boolean isUnload = false;
String broker = bundlesToBeSplit.get(bundleName);
if (unloadSplitBundles
&& shouldNamespacePoliciesUnload(namespaceName, bundleRange, broker)
&& shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
isUnload = true;
}
log.info("Load-manager splitting bundle {} and unloading {}", bundleName, isUnload);
pulsar.getAdminClient().namespaces().splitNamespaceBundle(namespaceName, bundleRange,
unloadSplitBundles, null);
isUnload, null);

splitCount++;
log.info("Successfully split namespace bundle {}", bundleName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
* @author hezhangjian
Expand Down Expand Up @@ -92,7 +91,7 @@ public void testSplitTaskWhenTopicJustOne() {
bundleData.setLongTermData(averageMessageData);
loadData.getBundleData().put("ten/ns/0x00000000_0x80000000", bundleData);

final Set<String> bundlesToSplit = bundleSplitterTask.findBundlesToSplit(loadData, pulsar);
final Map<String, String> bundlesToSplit = bundleSplitterTask.findBundlesToSplit(loadData, pulsar);
Assert.assertEquals(bundlesToSplit.size(), 0);
}

Expand Down Expand Up @@ -142,7 +141,7 @@ public void testLoadBalancerNamespaceMaximumBundles() throws Exception {
loadData.getBundleData().put("ten/ns/0x40000000_0x60000000", bundleData3);

int currentBundleCount = pulsar.getNamespaceService().getBundleCount(NamespaceName.get("ten/ns"));
final Set<String> bundlesToSplit = bundleSplitterTask.findBundlesToSplit(loadData, pulsar);
final Map<String, String> bundlesToSplit = bundleSplitterTask.findBundlesToSplit(loadData, pulsar);
Assert.assertEquals(bundlesToSplit.size() + currentBundleCount,
pulsar.getConfiguration().getLoadBalancerNamespaceMaximumBundles());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,11 @@ public void testModularLoadManagerSplitBundle() throws Exception {
assertNotEquals(pulsar2.getNamespaceService().getBundle(topicName), bundleInBroker2);
});

// Unload the NamespacePolicies and AntiAffinity check.
String currentBroker = String.format("%s:%d", "localhost", pulsar.getListenPortHTTP().get());
assertTrue(loadManager.shouldNamespacePoliciesUnload(namespace,"0x00000000_0xffffffff", currentBroker));
assertTrue(loadManager.shouldAntiAffinityNamespaceUnload(namespace,"0x00000000_0xffffffff", currentBroker));

// (7) Make lookup request again to Broker-2 which should succeed.
final String topic3 = "persistent://" + namespace + "/topic3";
@Cleanup
Expand Down

0 comments on commit 0feaa45

Please sign in to comment.