Skip to content

Commit

Permalink
[FLINK-32254][runtime] FineGrainedSlotManager may not allocate enough…
Browse files Browse the repository at this point in the history
… taskmanagers if maxSlotNum is configured

This closes apache#22714
  • Loading branch information
reswqa committed Jun 7, 2023
1 parent 9e12704 commit a008f25
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,8 @@ private static CPUResource getMaxTotalCpu(
? new CPUResource(Double.MAX_VALUE)
: defaultWorkerResourceSpec
.getCpuCores()
.divide(defaultWorkerResourceSpec.getNumSlots())
.multiply(maxSlotNum));
.multiply(maxSlotNum)
.divide(defaultWorkerResourceSpec.getNumSlots()));
}

private static MemorySize getMaxTotalMem(
Expand All @@ -244,7 +244,12 @@ private static MemorySize getMaxTotalMem(
? MemorySize.MAX_VALUE
: defaultWorkerResourceSpec
.getTotalMemSize()
.divide(defaultWorkerResourceSpec.getNumSlots())
.multiply(maxSlotNum));
// In theory, this multiplication could overflow a long.
// In practice it does not: even with 1 TB of TM memory and
// a very large maxSlotNum (e.g. 1_000_000), the result
// still fits without overflow.
.multiply(maxSlotNum)
.divide(defaultWorkerResourceSpec.getNumSlots()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;

import org.junit.jupiter.api.Test;

import java.math.BigDecimal;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link SlotManagerConfiguration}. */
Expand Down Expand Up @@ -64,4 +67,45 @@ void testPreferLegacySlotRequestTimeout() throws Exception {
assertThat(legacySlotIdleTimeout)
.isEqualTo(slotManagerConfiguration.getSlotRequestTimeout().toMilliseconds());
}

@Test
void testComputeMaxTotalCpu() throws Exception {
final Configuration configuration = new Configuration();
final int maxSlotNum = 9;
final int numSlots = 3;
final double cpuCores = 10;
configuration.set(ResourceManagerOptions.MAX_SLOT_NUM, maxSlotNum);
final SlotManagerConfiguration slotManagerConfiguration =
SlotManagerConfiguration.fromConfiguration(
configuration,
new WorkerResourceSpec.Builder()
.setNumSlots(numSlots)
.setCpuCores(cpuCores)
.build());
assertThat(slotManagerConfiguration.getMaxTotalCpu().getValue().doubleValue())
.isEqualTo(cpuCores * maxSlotNum / numSlots);
}

@Test
void testComputeMaxTotalMemory() throws Exception {
final Configuration configuration = new Configuration();
final int maxSlotNum = 1_000_000;
final int numSlots = 10;
final int totalTaskManagerMB =
MemorySize.parse("1", MemorySize.MemoryUnit.TERA_BYTES).getMebiBytes();
configuration.set(ResourceManagerOptions.MAX_SLOT_NUM, maxSlotNum);
final SlotManagerConfiguration slotManagerConfiguration =
SlotManagerConfiguration.fromConfiguration(
configuration,
new WorkerResourceSpec.Builder()
.setNumSlots(numSlots)
.setTaskHeapMemoryMB(totalTaskManagerMB)
.build());
assertThat(slotManagerConfiguration.getMaxTotalMem().getBytes())
.isEqualTo(
BigDecimal.valueOf(MemorySize.ofMebiBytes(totalTaskManagerMB).getBytes())
.multiply(BigDecimal.valueOf(maxSlotNum))
.divide(BigDecimal.valueOf(numSlots))
.longValue());
}
}

0 comments on commit a008f25

Please sign in to comment.