From 72bff2a2d0072602e4e625476bf5480dc50dc76c Mon Sep 17 00:00:00 2001 From: Weihua Hu Date: Tue, 18 Jul 2023 14:47:28 +0800 Subject: [PATCH] [FLINK-31843][runtime] remove redundant SlotPool#getFreeSlotsInformation --- .../jobmaster/slotpool/AllocatedSlotPool.java | 7 --- .../slotpool/DeclarativeSlotPool.java | 8 ---- .../slotpool/DeclarativeSlotPoolBridge.java | 14 +----- .../slotpool/DeclarativeSlotPoolService.java | 4 +- .../slotpool/DefaultAllocatedSlotPool.java | 38 +++++----------- .../slotpool/DefaultDeclarativeSlotPool.java | 10 +---- .../PhysicalSlotRequestBulkCheckerImpl.java | 2 +- .../runtime/jobmaster/slotpool/SlotPool.java | 9 ---- .../scheduler/adaptive/AdaptiveScheduler.java | 4 +- .../runtime/jobmaster/JobMasterTest.java | 11 ----- .../DeclarativeSlotPoolServiceTest.java | 2 +- .../DefaultAllocatedSlotPoolTest.java | 44 ++++++++++--------- .../DefaultDeclarativeSlotPoolTest.java | 7 +-- .../slotpool/TestingDeclarativeSlotPool.java | 5 --- 14 files changed, 48 insertions(+), 117 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java index 4339e0b8f49ce..9e2c29d68da3a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotPool.java @@ -110,13 +110,6 @@ public interface AllocatedSlotPool { */ Optional getSlotInformation(AllocationID allocationID); - /** - * Returns information about all currently free slots. - * - * @return collection of free slot information - */ - Collection getFreeSlotsInformation(); - /** * Returns information about all currently free slots. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java index 981d668277715..5b8921aa12f44 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPool.java @@ -107,14 +107,6 @@ Collection registerSlots( TaskManagerGateway taskManagerGateway, long currentTime); - /** - * Returns the slot information for all free slots (slots which can be allocated from the slot - * pool). - * - * @return collection of free slot information - */ - Collection getFreeSlotsInformation(); - /** * Returns the free slot tracker. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java index 1d4af3db7acc5..1a77528fa2ddc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java @@ -422,23 +422,13 @@ public Collection getAllocatedSlotsInformation() { final Collection allSlotsInformation = getDeclarativeSlotPool().getAllSlotsInformation(); final Set freeSlots = - getDeclarativeSlotPool().getFreeSlotsInformation().stream() - .map(SlotInfo::getAllocationId) - .collect(Collectors.toSet()); + getDeclarativeSlotPool().getFreeSlotInfoTracker().getAvailableSlots(); return allSlotsInformation.stream() .filter(slotInfo -> !freeSlots.contains(slotInfo.getAllocationId())) .collect(Collectors.toList()); } - @Override - @Nonnull - public Collection getAvailableSlotsInformation() { - assertRunningInMainThread(); - - return getDeclarativeSlotPool().getFreeSlotsInformation(); - } - @Override public FreeSlotInfoTracker getFreeSlotInfoTracker() { assertRunningInMainThread(); @@ -519,7 +509,7 @@ void checkBatchSlotTimeout() { private Set getResourceProfilesFromAllSlots() { return Stream.concat( - getAvailableSlotsInformation().stream(), + getFreeSlotInfoTracker().getFreeSlotsInformation().stream(), getAllocatedSlotsInformation().stream()) .map(SlotInfo::getResourceProfile) .collect(Collectors.toSet()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java index ee5fadbc788fd..6088d8b45f7dd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java @@ -241,7 +241,7 @@ public void releaseFreeSlotsOnTaskManager(ResourceID taskManagerId, Exception ca if (isTaskManagerRegistered(taskManagerId)) { Collection freeSlots = - declarativeSlotPool.getFreeSlotsInformation().stream() + declarativeSlotPool.getFreeSlotInfoTracker().getFreeSlotsInformation().stream() .filter( slotInfo -> slotInfo.getTaskManagerLocation() @@ -338,6 +338,6 @@ protected String getSlotServiceStatus() { "Registered TMs: %d, registered slots: %d free slots: %d", registeredTaskManagers.size(), declarativeSlotPool.getAllSlotsInformation().size(), - declarativeSlotPool.getFreeSlotsInformation().size()); + declarativeSlotPool.getFreeSlotInfoTracker().getAvailableSlots().size()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java index a8020152b6641..d3a7b21c6ee9c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java @@ -187,28 +187,6 @@ public Optional getSlotInformation(AllocationID allocationID) { return Optional.ofNullable(registeredSlots.get(allocationID)); } - @Override - public Collection getFreeSlotsInformation() { - final Collection freeSlotInfos = new ArrayList<>(); - - for (Map.Entry freeSlot : freeSlots.getFreeSlotsSince().entrySet()) { - final AllocatedSlot allocatedSlot = - Preconditions.checkNotNull(registeredSlots.get(freeSlot.getKey())); - - freeSlotInfos.add(DefaultFreeSlotInfo.create(allocatedSlot, freeSlot.getValue())); - } - - return freeSlotInfos; - } - - private FreeSlotInfo getFreeSlotInfo(AllocationID allocationId) { - final AllocatedSlot allocatedSlot = - Preconditions.checkNotNull(registeredSlots.get(allocationId)); - final Long idleSince = - Preconditions.checkNotNull(freeSlots.getFreeSlotsSince().get(allocationId)); - return DefaultFreeSlotInfo.create(allocatedSlot, idleSince); - } - @Override public FreeSlotInfoTracker getFreeSlotInfoTracker() { return new DefaultFreeSlotInfoTracker( @@ -218,7 +196,12 @@ public FreeSlotInfoTracker getFreeSlotInfoTracker() { this::getTaskExecutorUtilization); } - public double getTaskExecutorUtilization(ResourceID resourceId) { + @Override + public Collection getAllSlotsInformation() { + return registeredSlots.values(); + } + + private double getTaskExecutorUtilization(ResourceID resourceId) { Set slots = slotsPerTaskExecutor.get(resourceId); Preconditions.checkNotNull(slots, "There is no slots on %s", resourceId); @@ -226,9 +209,12 @@ public double getTaskExecutorUtilization(ResourceID resourceId) { / slots.size(); } - @Override - public Collection getAllSlotsInformation() { - return registeredSlots.values(); + private FreeSlotInfo getFreeSlotInfo(AllocationID allocationId) { + final AllocatedSlot allocatedSlot = + Preconditions.checkNotNull(registeredSlots.get(allocationId)); + final Long idleSince = + Preconditions.checkNotNull(freeSlots.getFreeSlotsSince().get(allocationId)); + return DefaultFreeSlotInfo.create(allocatedSlot, idleSince); } private static final class FreeSlots { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java index 9b2e188c074d5..ebec6f3426489 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java @@ -52,7 +52,6 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.function.Function; -import java.util.stream.Collectors; /** * Default {@link DeclarativeSlotPool} implementation. @@ -487,7 +486,7 @@ private void releasePayload(Iterable allocatedSlots, Th @Override public void releaseIdleSlots(long currentTimeMillis) { final Collection freeSlotsInformation = - slotPool.getFreeSlotsInformation(); + slotPool.getFreeSlotInfoTracker().getFreeSlotsWithIdleSinceInformation(); ResourceCounter excessResources = fulfilledResourceRequirements.subtract(totalResourceRequirements); @@ -563,13 +562,6 @@ private void releaseSlots(Iterable slotsToReturnToOwner, Throwabl } } - @Override - public Collection getFreeSlotsInformation() { - return slotPool.getFreeSlotsInformation().stream() - .map(AllocatedSlotPool.FreeSlotInfo::asSlotInfo) - .collect(Collectors.toList()); - } - @Override public FreeSlotInfoTracker getFreeSlotInfoTracker() { return slotPool.getFreeSlotInfoTracker(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java index 8be7cff3e3c11..28a2a906767c8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImpl.java @@ -200,7 +200,7 @@ public static PhysicalSlotRequestBulkCheckerImpl createFromSlotPool( private static Set getAllSlotInfos(SlotPool slotPool) { return Stream.concat( - slotPool.getAvailableSlotsInformation().stream(), + slotPool.getFreeSlotInfoTracker().getFreeSlotsInformation().stream(), slotPool.getAllocatedSlotsInformation().stream()) .collect(Collectors.toSet()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java index 8bd2383891d07..edff664625c70 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java @@ -115,15 +115,6 @@ Collection offerSlots( // allocating and disposing slots // ------------------------------------------------------------------------ - /** - * Returns a list of {@link SlotInfo} objects about all slots that are currently available in - * the slot pool. - * - * @return a list of {@link SlotInfo} objects about all slots that are currently available in - * the slot pool. - */ - Collection getAvailableSlotsInformation(); - /** * Returns all free slot tracker. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index d4b9d65d403d6..4ee22c9584840 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -835,7 +835,7 @@ public void updateJobResourceRequirements(JobResourceRequirements jobResourceReq @Override public boolean hasDesiredResources() { final Collection freeSlots = - declarativeSlotPool.getFreeSlotsInformation(); + declarativeSlotPool.getFreeSlotInfoTracker().getFreeSlotsInformation(); return hasDesiredResources(desiredResources, freeSlots); } @@ -873,7 +873,7 @@ private JobSchedulingPlan determineParallelism( return slotAllocator .determineParallelismAndCalculateAssignment( jobInformation, - declarativeSlotPool.getFreeSlotsInformation(), + declarativeSlotPool.getFreeSlotInfoTracker().getFreeSlotsInformation(), JobAllocationsInformation.fromGraph(previousExecutionGraph)) .orElseThrow( () -> diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index da7154c2ba4f6..e9889970a6c5d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -572,17 +572,6 @@ public Optional failAllocation( "TestingSlotPool does not support this operation."); } - @Nonnull - @Override - public Collection getAvailableSlotsInformation() { - final Collection allSlotInfos = - registeredSlots.values().stream() - .flatMap(Collection::stream) - .collect(Collectors.toList()); - - return Collections.unmodifiableCollection(allSlotInfos); - } - @Override public FreeSlotInfoTracker getFreeSlotInfoTracker() { Map freeSlots = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java index 7d15fd1e32094..811406695f3ed 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolServiceTest.java @@ -338,7 +338,7 @@ void testReleaseFreeSlotsOnTaskManager() throws Exception { slotPoolService.releaseFreeSlotsOnTaskManager( taskManagerLocation.getResourceID(), new FlinkException("Test cause")); - assertThat(slotPool.getFreeSlotsInformation()).isEmpty(); + assertThat(slotPool.getFreeSlotInfoTracker().getAvailableSlots()).isEmpty(); assertThat( Iterables.getOnlyElement(slotPool.getAllSlotsInformation()) .getAllocationId()) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPoolTest.java index 9a539176ea6fc..2fe2b9e645ee9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPoolTest.java @@ -186,7 +186,8 @@ void testFreeingOfReservedSlot() { assertThat(slotPool.freeReservedSlot(slot.getAllocationId(), releaseTime)).isPresent(); assertSlotPoolContainsFreeSlots(slotPool, slots); - for (AllocatedSlotPool.FreeSlotInfo freeSlotInfo : slotPool.getFreeSlotsInformation()) { + for (AllocatedSlotPool.FreeSlotInfo freeSlotInfo : + slotPool.getFreeSlotInfoTracker().getFreeSlotsWithIdleSinceInformation()) { final long time; if (freeSlotInfo.getAllocationId().equals(slot.getAllocationId())) { time = releaseTime; @@ -208,7 +209,8 @@ void testFreeingOfFreeSlotIsIgnored() { assertThat(slotPool.freeReservedSlot(slot.getAllocationId(), 1)).isNotPresent(); final AllocatedSlotPool.FreeSlotInfo freeSlotInfo = - Iterables.getOnlyElement(slotPool.getFreeSlotsInformation()); + Iterables.getOnlyElement( + slotPool.getFreeSlotInfoTracker().getFreeSlotsWithIdleSinceInformation()); assertThat(freeSlotInfo.getFreeSince()).isEqualTo(0L); } @@ -221,32 +223,32 @@ void testSlotUtilizationCalculation() { slotPool.addSlots(slots, 0); - assertThat(slotPool.getFreeSlotsInformation()) + FreeSlotInfoTracker freeSlotInfoTracker = slotPool.getFreeSlotInfoTracker(); + + assertThat(freeSlotInfoTracker.getAvailableSlots()) .allSatisfy( - freeSlotInfo -> + allocationId -> assertThat( - slotPool.getTaskExecutorUtilization( - freeSlotInfo - .asSlotInfo() - .getTaskManagerLocation() - .getResourceID())) + freeSlotInfoTracker.getTaskExecutorUtilization( + freeSlotInfoTracker.getSlotInfo( + allocationId))) .isCloseTo(0, offset(0.1))); int numAllocatedSlots = 0; for (AllocatedSlot slot : slots) { assertThat(slotPool.reserveFreeSlot(slot.getAllocationId())).isEqualTo(slot); + freeSlotInfoTracker.reserveSlot(slot.getAllocationId()); numAllocatedSlots++; - - for (AllocatedSlotPool.FreeSlotInfo freeSlotInfo : slotPool.getFreeSlotsInformation()) { - final double utilization = (double) numAllocatedSlots / slots.size(); - assertThat( - slotPool.getTaskExecutorUtilization( - freeSlotInfo - .asSlotInfo() - .getTaskManagerLocation() - .getResourceID())) - .isCloseTo(utilization, offset(0.1)); - } + final double utilization = (double) numAllocatedSlots / slots.size(); + + assertThat(freeSlotInfoTracker.getAvailableSlots()) + .allSatisfy( + allocationId -> + assertThat( + freeSlotInfoTracker.getTaskExecutorUtilization( + freeSlotInfoTracker.getSlotInfo( + allocationId))) + .isCloseTo(utilization, offset(0.1))); } } @@ -305,7 +307,7 @@ private void assertSlotPoolContainsSlots( private void assertSlotPoolContainsFreeSlots( DefaultAllocatedSlotPool slotPool, Collection allocatedSlots) { final Collection freeSlotsInformation = - slotPool.getFreeSlotsInformation(); + slotPool.getFreeSlotInfoTracker().getFreeSlotsWithIdleSinceInformation(); assertThat(freeSlotsInformation).hasSize(allocatedSlots.size()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java index 5f0a9cb6df401..bf57aab2b667b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java @@ -569,7 +569,8 @@ void testFreedSlotWillRemainAssignedToMatchedResourceProfile() { createSlotOffersForResourceRequirements( ResourceCounter.withResource(ResourceProfile.ANY, 1))); - final SlotInfo slot = slotPool.getFreeSlotsInformation().iterator().next(); + final SlotInfo slot = + slotPool.getFreeSlotInfoTracker().getFreeSlotsInformation().iterator().next(); slotPool.reserveFreeSlot(slot.getAllocationId(), largeResourceProfile); assertThat( @@ -615,7 +616,7 @@ void testReserveFreeSlotForResourceUpdatesAvailableResourcesAndRequirements() { ResourceCounter.withResource(smallResourceProfile, 1)); final SlotInfo largeSlot = - slotPool.getFreeSlotsInformation().stream() + slotPool.getFreeSlotInfoTracker().getFreeSlotsInformation().stream() .filter(slot -> slot.getResourceProfile().equals(largeResourceProfile)) .findFirst() .get(); @@ -677,7 +678,7 @@ void testRegisterSlotsDoesNotAffectRequirements() { 0L); final AllocationID allocationId = - slotPool.getFreeSlotsInformation().iterator().next().getAllocationId(); + slotPool.getFreeSlotInfoTracker().getAvailableSlots().iterator().next(); assertThat(slotPool.getResourceRequirements()).isEmpty(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java index 6fc7ef32e3ec4..a713a4b2b04e8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java @@ -174,11 +174,6 @@ public Collection registerSlots( slots, taskManagerLocation, taskManagerGateway, currentTime); } - @Override - public Collection getFreeSlotsInformation() { - return getFreeSlotsInformationSupplier.get(); - } - @Override public FreeSlotInfoTracker getFreeSlotInfoTracker() { return getFreeSlotInfoTrackerSupplier.get();