Skip to content

Commit

Permalink
[feature][broker] add config maxUnloadBundleNumPerShedding for Unifor…
Browse files Browse the repository at this point in the history
…mLoadShedder (apache#16409)
  • Loading branch information
lordcheng10 authored Jul 29, 2022
1 parent 4d64e2e commit 0f2060d
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2184,6 +2184,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private double loadBalancerMsgThroughputMultiplierDifferenceShedderThreshold = 4;

@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "For each uniform balanced unload, the maximum number of bundles that can be unloaded."
+ " The default value is -1, which means no limit"
)
private int maxUnloadBundleNumPerShedding = -1;

@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
return Triple.of(bundle, bundleData, throughput);
}).filter(e -> !recentlyUnloadedBundles.containsKey(e.getLeft()))
.sorted((e1, e2) -> Double.compare(e2.getRight(), e1.getRight())).forEach((e) -> {
if (conf.getMaxUnloadBundleNumPerShedding() != -1
&& selectedBundlesCache.size() >= conf.getMaxUnloadBundleNumPerShedding()) {
return;
}
String bundle = e.getLeft();
BundleData bundleData = e.getMiddle();
TimeAverageMessageData shortTermData = bundleData.getShortTermData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.pulsar.policies.data.loadbalancer.*;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;

@Test(groups = "broker")
Expand All @@ -41,6 +42,45 @@ public void setup() {
uniformLoadShedder = new UniformLoadShedder();
}

@Test
public void testMaxUnloadBundleNumPerShedding(){
conf.setMaxUnloadBundleNumPerShedding(2);
int numBundles = 20;
LoadData loadData = new LoadData();

LocalBrokerData broker1 = new LocalBrokerData();
LocalBrokerData broker2 = new LocalBrokerData();

String broker2Name = "broker2";

double brokerThroughput = 0;

for (int i = 1; i <= numBundles; ++i) {
broker1.getBundles().add("bundle-" + i);

BundleData bundle = new BundleData();

TimeAverageMessageData timeAverageMessageData = new TimeAverageMessageData();

double throughput = 1 * 1024 * 1024;
timeAverageMessageData.setMsgThroughputIn(throughput);
timeAverageMessageData.setMsgThroughputOut(throughput);
bundle.setShortTermData(timeAverageMessageData);
loadData.getBundleData().put("bundle-" + i, bundle);

brokerThroughput += throughput;
}

broker1.setMsgThroughputIn(brokerThroughput);
broker1.setMsgThroughputOut(brokerThroughput);

loadData.getBrokerData().put("broker-1", new BrokerData(broker1));
loadData.getBrokerData().put(broker2Name, new BrokerData(broker2));

Multimap<String, String> bundlesToUnload = uniformLoadShedder.findBundlesForUnloading(loadData, conf);
assertEquals(bundlesToUnload.size(),2);
}

@Test
public void testBrokerWithMultipleBundles() {
int numBundles = 10;
Expand Down

0 comments on commit 0f2060d

Please sign in to comment.