diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java index 3329773b3752f..fe8442ee04f72 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java @@ -228,8 +228,8 @@ private static CPUResource getMaxTotalCpu( ? new CPUResource(Double.MAX_VALUE) : defaultWorkerResourceSpec .getCpuCores() - .divide(defaultWorkerResourceSpec.getNumSlots()) - .multiply(maxSlotNum)); + .multiply(maxSlotNum) + .divide(defaultWorkerResourceSpec.getNumSlots())); } private static MemorySize getMaxTotalMem( @@ -244,7 +244,12 @@ private static MemorySize getMaxTotalMem( ? MemorySize.MAX_VALUE : defaultWorkerResourceSpec .getTotalMemSize() - .divide(defaultWorkerResourceSpec.getNumSlots()) - .multiply(maxSlotNum)); + // In theory, the multiplication below could overflow + // a long. In practice it does not: even with 1 TB of + // TM memory and a very large maxSlotNum (e.g. + // 1_000_000), the product in bytes still fits in a + // long.
+ .multiply(maxSlotNum) + .divide(defaultWorkerResourceSpec.getNumSlots())); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationTest.java index 782392e63786d..136a48d2bad35 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfigurationTest.java @@ -20,11 +20,14 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec; import org.junit.jupiter.api.Test; +import java.math.BigDecimal; + import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link SlotManagerConfiguration}. 
*/
@@ -64,4 +67,60 @@ void testPreferLegacySlotRequestTimeout() throws Exception {
         assertThat(legacySlotIdleTimeout)
                 .isEqualTo(slotManagerConfiguration.getSlotRequestTimeout().toMilliseconds());
     }
+
+    /** Checks that max total CPU is (worker CPU cores * max slot number) / slots per worker. */
+    @Test
+    void testComputeMaxTotalCpu() throws Exception {
+        final int slotsPerWorker = 3;
+        final int slotLimit = 9;
+        final double workerCpuCores = 10;
+
+        final WorkerResourceSpec workerSpec =
+                new WorkerResourceSpec.Builder()
+                        .setNumSlots(slotsPerWorker)
+                        .setCpuCores(workerCpuCores)
+                        .build();
+        final Configuration conf = new Configuration();
+        conf.set(ResourceManagerOptions.MAX_SLOT_NUM, slotLimit);
+
+        final SlotManagerConfiguration slotManagerConfig =
+                SlotManagerConfiguration.fromConfiguration(conf, workerSpec);
+
+        final double expectedMaxCpu = workerCpuCores * slotLimit / slotsPerWorker;
+        assertThat(slotManagerConfig.getMaxTotalCpu().getValue().doubleValue())
+                .isEqualTo(expectedMaxCpu);
+    }
+
+    /**
+     * Checks the max total memory for a large worker memory size and a slot limit of 1_000_000
+     * against a reference value computed with {@link BigDecimal}.
+     */
+    @Test
+    void testComputeMaxTotalMemory() throws Exception {
+        final int slotsPerWorker = 10;
+        final int slotLimit = 1_000_000;
+        final int workerMemoryMB =
+                MemorySize.parse("1", MemorySize.MemoryUnit.TERA_BYTES).getMebiBytes();
+
+        final WorkerResourceSpec workerSpec =
+                new WorkerResourceSpec.Builder()
+                        .setNumSlots(slotsPerWorker)
+                        .setTaskHeapMemoryMB(workerMemoryMB)
+                        .build();
+        final Configuration conf = new Configuration();
+        conf.set(ResourceManagerOptions.MAX_SLOT_NUM, slotLimit);
+
+        final SlotManagerConfiguration slotManagerConfig =
+                SlotManagerConfiguration.fromConfiguration(conf, workerSpec);
+
+        // Reference value: worker bytes * slot limit / slots per worker, done in BigDecimal.
+        final BigDecimal workerMemoryBytes =
+                BigDecimal.valueOf(MemorySize.ofMebiBytes(workerMemoryMB).getBytes());
+        final long expectedMaxBytes =
+                workerMemoryBytes
+                        .multiply(BigDecimal.valueOf(slotLimit))
+                        .divide(BigDecimal.valueOf(slotsPerWorker))
+                        .longValue();
+        assertThat(slotManagerConfig.getMaxTotalMem().getBytes()).isEqualTo(expectedMaxBytes);
+    }
 }