diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java index 9493fefbad255..4763eaf23daa8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LeastResourceUsageWithWeight.java @@ -39,6 +39,7 @@ */ @Slf4j public class LeastResourceUsageWithWeight implements ModularLoadManagerStrategy { + private static final double MAX_RESOURCE_USAGE = 1.0d; // Maintain this list to reduce object creation. private final ArrayList bestBrokers; private final Map brokerAvgResourceUsageWithWeight; @@ -144,7 +145,7 @@ public Optional selectBroker(Set candidates, BundleData bundleTo final double diffThreshold = conf.getLoadBalancerAverageResourceUsageDifferenceThresholdPercentage() / 100.0; candidates.forEach(broker -> { - Double avgResUsage = brokerAvgResourceUsageWithWeight.getOrDefault(broker, Double.MAX_VALUE); + Double avgResUsage = brokerAvgResourceUsageWithWeight.getOrDefault(broker, MAX_RESOURCE_USAGE); if ((avgResUsage + diffThreshold <= avgUsage)) { bestBrokers.add(broker); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java index 7177e5369b6a4..e9b442698f97f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java @@ -20,6 +20,8 @@ import static org.testng.Assert.assertEquals; +import java.lang.reflect.Field; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Optional; @@ -131,6 +133,46 @@ public void testLeastResourceUsageWithWeight() { assertEquals(strategy.selectBroker(candidates, bundleData, loadData, conf), Optional.of("2")); } + public void testLeastResourceUsageWithWeightWithArithmeticException() + throws NoSuchFieldException, IllegalAccessException { + BundleData bundleData = new BundleData(); + BrokerData brokerData1 = initBrokerData(10, 100); + BrokerData brokerData2 = initBrokerData(30, 100); + BrokerData brokerData3 = initBrokerData(60, 100); + BrokerData brokerData4 = initBrokerData(5, 100); + LoadData loadData = new LoadData(); + Map brokerDataMap = loadData.getBrokerData(); + brokerDataMap.put("1", brokerData1); + brokerDataMap.put("2", brokerData2); + brokerDataMap.put("3", brokerData3); + brokerDataMap.put("4", brokerData4); + + ServiceConfiguration conf = new ServiceConfiguration(); + conf.setLoadBalancerCPUResourceWeight(1.0); + conf.setLoadBalancerMemoryResourceWeight(0.1); + conf.setLoadBalancerDirectMemoryResourceWeight(0.1); + conf.setLoadBalancerBandwithInResourceWeight(1.0); + conf.setLoadBalancerBandwithOutResourceWeight(1.0); + conf.setLoadBalancerHistoryResourcePercentage(0.5); + conf.setLoadBalancerAverageResourceUsageDifferenceThresholdPercentage(5); + + LeastResourceUsageWithWeight strategy = new LeastResourceUsageWithWeight(); + + // Should choice broker from broker1 2 3. + Set candidates = new HashSet<>(); + candidates.add("1"); + candidates.add("2"); + candidates.add("3"); + Field strategyUpdater = LeastResourceUsageWithWeight.class.getDeclaredField("brokerAvgResourceUsageWithWeight"); + strategyUpdater.setAccessible(true); + Map brokerAvgResourceUsageWithWeight = new HashMap<>(); + brokerAvgResourceUsageWithWeight.put("1", 0.1d); + brokerAvgResourceUsageWithWeight.put("2", 0.3d); + brokerAvgResourceUsageWithWeight.put("4", 0.05d); + strategyUpdater.set(strategy, brokerAvgResourceUsageWithWeight); + assertEquals(strategy.selectBroker(candidates, bundleData, loadData, conf), Optional.of("1")); + } + private BrokerData initBrokerData(double usage, double limit) { LocalBrokerData localBrokerData = new LocalBrokerData(); localBrokerData.setCpu(new ResourceUsage(usage, limit));