Skip to content

Commit

Permalink
Fix arithmetic exception for uniform load shedder (apache#13914)
Browse files Browse the repository at this point in the history
  • Loading branch information
hangc0276 authored Jan 24, 2022
1 parent 1568a7a commit a07f029
Showing 1 changed file with 22 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class UniformLoadShedder implements LoadSheddingStrategy {
private static final Logger log = LoggerFactory.getLogger(UniformLoadShedder.class);

private final Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create();
private static final double EPS = 1e-6;

/**
* Attempt to shed some bundles off every broker which is overloaded.
Expand All @@ -70,31 +71,40 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
MutableDouble maxMsgRate = new MutableDouble(-1);
MutableDouble maxThroughputRate = new MutableDouble(-1);
MutableDouble minMsgRate = new MutableDouble(Integer.MAX_VALUE);
MutableDouble minThroughputgRate = new MutableDouble(Integer.MAX_VALUE);
MutableDouble minThroughputRate = new MutableDouble(Integer.MAX_VALUE);
brokersData.forEach((broker, data) -> {
//broker with one bundle can't be considered for bundle unloading
if (data.getLocalData().getBundles().size() <= 1) {
return;
}

double msgRate = data.getLocalData().getMsgRateIn() + data.getLocalData().getMsgRateOut();
double throughputRate = data.getLocalData().getMsgThroughputIn()
+ data.getLocalData().getMsgThroughputOut();
if (data.getLocalData().getBundles().size() > 1 // broker with one bundle can't be considered for
// bundle unloading
&& (msgRate > maxMsgRate.getValue() || throughputRate > maxThroughputRate.getValue())) {
if (msgRate > maxMsgRate.getValue() || throughputRate > maxThroughputRate.getValue()) {
overloadedBroker.setValue(broker);
maxMsgRate.setValue(msgRate);
maxThroughputRate.setValue(throughputRate);
}
if (msgRate < minMsgRate.getValue() || throughputRate < minThroughputgRate.getValue()) {
if (msgRate < minMsgRate.getValue() || throughputRate < minThroughputRate.getValue()) {
underloadedBroker.setValue(broker);
minMsgRate.setValue(msgRate);
minThroughputgRate.setValue(throughputRate);
minThroughputRate.setValue(throughputRate);
}
});

// find the difference between two brokers based on msgRate and throughout and check if the load distribution
// discrepancy is higher than threshold. if that matches then try to unload bundle from overloaded brokers to
// give chance of uniform load distribution.
if (minMsgRate.getValue() <= EPS && minMsgRate.getValue() >= -EPS) {
minMsgRate.setValue(1.0);
}
if (minThroughputRate.getValue() <= EPS && minThroughputRate.getValue() >= -EPS) {
minThroughputRate.setValue(1.0);
}
double msgRateDifferencePercentage = ((maxMsgRate.getValue() - minMsgRate.getValue()) * 100)
/ (minMsgRate.getValue());
double msgThroughputDifferenceRate = maxThroughputRate.getValue() / minThroughputgRate.getValue();
double msgThroughputDifferenceRate = maxThroughputRate.getValue() / minThroughputRate.getValue();

// if the threshold matches then find out how much load needs to be unloaded by considering number of msgRate
// and throughput.
Expand All @@ -112,12 +122,12 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
+ "overloaded broker {} with (msgRate,throughput)= ({},{}) "
+ "and underloaded broker {} with (msgRate,throughput)= ({},{})",
overloadedBroker.getValue(), maxMsgRate.getValue(), maxThroughputRate.getValue(),
underloadedBroker.getValue(), minMsgRate.getValue(), minThroughputgRate.getValue());
underloadedBroker.getValue(), minMsgRate.getValue(), minThroughputRate.getValue());
}
MutableInt msgRateRequiredFromUnloadedBundles = new MutableInt(
(int) ((maxMsgRate.getValue() - minMsgRate.getValue()) / 2));
MutableInt msgThroughtputRequiredFromUnloadedBundles = new MutableInt(
(int) ((maxThroughputRate.getValue() - minThroughputgRate.getValue()) / 2));
MutableInt msgThroughputRequiredFromUnloadedBundles = new MutableInt(
(int) ((maxThroughputRate.getValue() - minThroughputRate.getValue()) / 2));
LocalBrokerData overloadedBrokerData = brokersData.get(overloadedBroker.getValue()).getLocalData();

if (overloadedBrokerData.getBundles().size() > 1) {
Expand Down Expand Up @@ -150,9 +160,9 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
selectedBundlesCache.put(overloadedBroker.getValue(), bundle);
}
} else {
if (throughput <= (msgThroughtputRequiredFromUnloadedBundles.getValue())) {
if (throughput <= (msgThroughputRequiredFromUnloadedBundles.getValue())) {
log.info("Found bundle to unload with throughput {}", throughput);
msgThroughtputRequiredFromUnloadedBundles.add(-throughput);
msgThroughputRequiredFromUnloadedBundles.add(-throughput);
selectedBundlesCache.put(overloadedBroker.getValue(), bundle);
}
}
Expand Down

0 comments on commit a07f029

Please sign in to comment.