Skip to content

Commit

Permalink
[FLINK-31843][runtime] remove redundant SlotPool#getFreeSlotsInformation
Browse files Browse the repository at this point in the history
  • Loading branch information
huwh authored and KarmaGYZ committed Jul 23, 2023
1 parent 3cdf228 commit 72bff2a
Show file tree
Hide file tree
Showing 14 changed files with 48 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,6 @@ public interface AllocatedSlotPool {
*/
Optional<SlotInfo> getSlotInformation(AllocationID allocationID);

/**
* Returns information about all currently free slots.
*
* @return collection of free slot information
*/
Collection<FreeSlotInfo> getFreeSlotsInformation();

/**
* Returns information about all currently free slots.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,6 @@ Collection<SlotOffer> registerSlots(
TaskManagerGateway taskManagerGateway,
long currentTime);

/**
* Returns the slot information for all free slots (slots which can be allocated from the slot
* pool).
*
* @return collection of free slot information
*/
Collection<SlotInfo> getFreeSlotsInformation();

/**
* Returns the free slot tracker.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,23 +422,13 @@ public Collection<SlotInfo> getAllocatedSlotsInformation() {
final Collection<? extends SlotInfo> allSlotsInformation =
getDeclarativeSlotPool().getAllSlotsInformation();
final Set<AllocationID> freeSlots =
getDeclarativeSlotPool().getFreeSlotsInformation().stream()
.map(SlotInfo::getAllocationId)
.collect(Collectors.toSet());
getDeclarativeSlotPool().getFreeSlotInfoTracker().getAvailableSlots();

return allSlotsInformation.stream()
.filter(slotInfo -> !freeSlots.contains(slotInfo.getAllocationId()))
.collect(Collectors.toList());
}

@Override
@Nonnull
public Collection<SlotInfo> getAvailableSlotsInformation() {
assertRunningInMainThread();

return getDeclarativeSlotPool().getFreeSlotsInformation();
}

@Override
public FreeSlotInfoTracker getFreeSlotInfoTracker() {
assertRunningInMainThread();
Expand Down Expand Up @@ -519,7 +509,7 @@ void checkBatchSlotTimeout() {

private Set<ResourceProfile> getResourceProfilesFromAllSlots() {
return Stream.concat(
getAvailableSlotsInformation().stream(),
getFreeSlotInfoTracker().getFreeSlotsInformation().stream(),
getAllocatedSlotsInformation().stream())
.map(SlotInfo::getResourceProfile)
.collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ public void releaseFreeSlotsOnTaskManager(ResourceID taskManagerId, Exception ca
if (isTaskManagerRegistered(taskManagerId)) {

Collection<AllocationID> freeSlots =
declarativeSlotPool.getFreeSlotsInformation().stream()
declarativeSlotPool.getFreeSlotInfoTracker().getFreeSlotsInformation().stream()
.filter(
slotInfo ->
slotInfo.getTaskManagerLocation()
Expand Down Expand Up @@ -338,6 +338,6 @@ protected String getSlotServiceStatus() {
"Registered TMs: %d, registered slots: %d free slots: %d",
registeredTaskManagers.size(),
declarativeSlotPool.getAllSlotsInformation().size(),
declarativeSlotPool.getFreeSlotsInformation().size());
declarativeSlotPool.getFreeSlotInfoTracker().getAvailableSlots().size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,28 +187,6 @@ public Optional<SlotInfo> getSlotInformation(AllocationID allocationID) {
return Optional.ofNullable(registeredSlots.get(allocationID));
}

@Override
public Collection<FreeSlotInfo> getFreeSlotsInformation() {
final Collection<FreeSlotInfo> freeSlotInfos = new ArrayList<>();

for (Map.Entry<AllocationID, Long> freeSlot : freeSlots.getFreeSlotsSince().entrySet()) {
final AllocatedSlot allocatedSlot =
Preconditions.checkNotNull(registeredSlots.get(freeSlot.getKey()));

freeSlotInfos.add(DefaultFreeSlotInfo.create(allocatedSlot, freeSlot.getValue()));
}

return freeSlotInfos;
}

private FreeSlotInfo getFreeSlotInfo(AllocationID allocationId) {
final AllocatedSlot allocatedSlot =
Preconditions.checkNotNull(registeredSlots.get(allocationId));
final Long idleSince =
Preconditions.checkNotNull(freeSlots.getFreeSlotsSince().get(allocationId));
return DefaultFreeSlotInfo.create(allocatedSlot, idleSince);
}

@Override
public FreeSlotInfoTracker getFreeSlotInfoTracker() {
return new DefaultFreeSlotInfoTracker(
Expand All @@ -218,17 +196,25 @@ public FreeSlotInfoTracker getFreeSlotInfoTracker() {
this::getTaskExecutorUtilization);
}

public double getTaskExecutorUtilization(ResourceID resourceId) {
@Override
public Collection<? extends SlotInfo> getAllSlotsInformation() {
return registeredSlots.values();
}

private double getTaskExecutorUtilization(ResourceID resourceId) {
Set<AllocationID> slots = slotsPerTaskExecutor.get(resourceId);
Preconditions.checkNotNull(slots, "There is no slots on %s", resourceId);

return (double) (slots.size() - freeSlots.getFreeSlotsNumberOfTaskExecutor(resourceId))
/ slots.size();
}

@Override
public Collection<? extends SlotInfo> getAllSlotsInformation() {
return registeredSlots.values();
private FreeSlotInfo getFreeSlotInfo(AllocationID allocationId) {
final AllocatedSlot allocatedSlot =
Preconditions.checkNotNull(registeredSlots.get(allocationId));
final Long idleSince =
Preconditions.checkNotNull(freeSlots.getFreeSlotsSince().get(allocationId));
return DefaultFreeSlotInfo.create(allocatedSlot, idleSince);
}

private static final class FreeSlots {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* Default {@link DeclarativeSlotPool} implementation.
Expand Down Expand Up @@ -487,7 +486,7 @@ private void releasePayload(Iterable<? extends AllocatedSlot> allocatedSlots, Th
@Override
public void releaseIdleSlots(long currentTimeMillis) {
final Collection<AllocatedSlotPool.FreeSlotInfo> freeSlotsInformation =
slotPool.getFreeSlotsInformation();
slotPool.getFreeSlotInfoTracker().getFreeSlotsWithIdleSinceInformation();

ResourceCounter excessResources =
fulfilledResourceRequirements.subtract(totalResourceRequirements);
Expand Down Expand Up @@ -563,13 +562,6 @@ private void releaseSlots(Iterable<AllocatedSlot> slotsToReturnToOwner, Throwabl
}
}

@Override
public Collection<SlotInfo> getFreeSlotsInformation() {
return slotPool.getFreeSlotsInformation().stream()
.map(AllocatedSlotPool.FreeSlotInfo::asSlotInfo)
.collect(Collectors.toList());
}

@Override
public FreeSlotInfoTracker getFreeSlotInfoTracker() {
return slotPool.getFreeSlotInfoTracker();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public static PhysicalSlotRequestBulkCheckerImpl createFromSlotPool(

private static Set<SlotInfo> getAllSlotInfos(SlotPool slotPool) {
return Stream.concat(
slotPool.getAvailableSlotsInformation().stream(),
slotPool.getFreeSlotInfoTracker().getFreeSlotsInformation().stream(),
slotPool.getAllocatedSlotsInformation().stream())
.collect(Collectors.toSet());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,6 @@ Collection<SlotOffer> offerSlots(
// allocating and disposing slots
// ------------------------------------------------------------------------

/**
* Returns a list of {@link SlotInfo} objects about all slots that are currently available in
* the slot pool.
*
* @return a list of {@link SlotInfo} objects about all slots that are currently available in
* the slot pool.
*/
Collection<SlotInfo> getAvailableSlotsInformation();

/**
* Returns all free slot tracker.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,7 @@ public void updateJobResourceRequirements(JobResourceRequirements jobResourceReq
@Override
public boolean hasDesiredResources() {
final Collection<? extends SlotInfo> freeSlots =
declarativeSlotPool.getFreeSlotsInformation();
declarativeSlotPool.getFreeSlotInfoTracker().getFreeSlotsInformation();
return hasDesiredResources(desiredResources, freeSlots);
}

Expand Down Expand Up @@ -873,7 +873,7 @@ private JobSchedulingPlan determineParallelism(
return slotAllocator
.determineParallelismAndCalculateAssignment(
jobInformation,
declarativeSlotPool.getFreeSlotsInformation(),
declarativeSlotPool.getFreeSlotInfoTracker().getFreeSlotsInformation(),
JobAllocationsInformation.fromGraph(previousExecutionGraph))
.orElseThrow(
() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,17 +572,6 @@ public Optional<ResourceID> failAllocation(
"TestingSlotPool does not support this operation.");
}

@Nonnull
@Override
public Collection<SlotInfo> getAvailableSlotsInformation() {
final Collection<SlotInfo> allSlotInfos =
registeredSlots.values().stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());

return Collections.unmodifiableCollection(allSlotInfos);
}

@Override
public FreeSlotInfoTracker getFreeSlotInfoTracker() {
Map<AllocationID, SlotInfo> freeSlots =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ void testReleaseFreeSlotsOnTaskManager() throws Exception {
slotPoolService.releaseFreeSlotsOnTaskManager(
taskManagerLocation.getResourceID(), new FlinkException("Test cause"));

assertThat(slotPool.getFreeSlotsInformation()).isEmpty();
assertThat(slotPool.getFreeSlotInfoTracker().getAvailableSlots()).isEmpty();
assertThat(
Iterables.getOnlyElement(slotPool.getAllSlotsInformation())
.getAllocationId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ void testFreeingOfReservedSlot() {
assertThat(slotPool.freeReservedSlot(slot.getAllocationId(), releaseTime)).isPresent();
assertSlotPoolContainsFreeSlots(slotPool, slots);

for (AllocatedSlotPool.FreeSlotInfo freeSlotInfo : slotPool.getFreeSlotsInformation()) {
for (AllocatedSlotPool.FreeSlotInfo freeSlotInfo :
slotPool.getFreeSlotInfoTracker().getFreeSlotsWithIdleSinceInformation()) {
final long time;
if (freeSlotInfo.getAllocationId().equals(slot.getAllocationId())) {
time = releaseTime;
Expand All @@ -208,7 +209,8 @@ void testFreeingOfFreeSlotIsIgnored() {
assertThat(slotPool.freeReservedSlot(slot.getAllocationId(), 1)).isNotPresent();

final AllocatedSlotPool.FreeSlotInfo freeSlotInfo =
Iterables.getOnlyElement(slotPool.getFreeSlotsInformation());
Iterables.getOnlyElement(
slotPool.getFreeSlotInfoTracker().getFreeSlotsWithIdleSinceInformation());

assertThat(freeSlotInfo.getFreeSince()).isEqualTo(0L);
}
Expand All @@ -221,32 +223,32 @@ void testSlotUtilizationCalculation() {

slotPool.addSlots(slots, 0);

assertThat(slotPool.getFreeSlotsInformation())
FreeSlotInfoTracker freeSlotInfoTracker = slotPool.getFreeSlotInfoTracker();

assertThat(freeSlotInfoTracker.getAvailableSlots())
.allSatisfy(
freeSlotInfo ->
allocationId ->
assertThat(
slotPool.getTaskExecutorUtilization(
freeSlotInfo
.asSlotInfo()
.getTaskManagerLocation()
.getResourceID()))
freeSlotInfoTracker.getTaskExecutorUtilization(
freeSlotInfoTracker.getSlotInfo(
allocationId)))
.isCloseTo(0, offset(0.1)));

int numAllocatedSlots = 0;
for (AllocatedSlot slot : slots) {
assertThat(slotPool.reserveFreeSlot(slot.getAllocationId())).isEqualTo(slot);
freeSlotInfoTracker.reserveSlot(slot.getAllocationId());
numAllocatedSlots++;

for (AllocatedSlotPool.FreeSlotInfo freeSlotInfo : slotPool.getFreeSlotsInformation()) {
final double utilization = (double) numAllocatedSlots / slots.size();
assertThat(
slotPool.getTaskExecutorUtilization(
freeSlotInfo
.asSlotInfo()
.getTaskManagerLocation()
.getResourceID()))
.isCloseTo(utilization, offset(0.1));
}
final double utilization = (double) numAllocatedSlots / slots.size();

assertThat(freeSlotInfoTracker.getAvailableSlots())
.allSatisfy(
allocationId ->
assertThat(
freeSlotInfoTracker.getTaskExecutorUtilization(
freeSlotInfoTracker.getSlotInfo(
allocationId)))
.isCloseTo(utilization, offset(0.1)));
}
}

Expand Down Expand Up @@ -305,7 +307,7 @@ private void assertSlotPoolContainsSlots(
private void assertSlotPoolContainsFreeSlots(
DefaultAllocatedSlotPool slotPool, Collection<AllocatedSlot> allocatedSlots) {
final Collection<AllocatedSlotPool.FreeSlotInfo> freeSlotsInformation =
slotPool.getFreeSlotsInformation();
slotPool.getFreeSlotInfoTracker().getFreeSlotsWithIdleSinceInformation();

assertThat(freeSlotsInformation).hasSize(allocatedSlots.size());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,8 @@ void testFreedSlotWillRemainAssignedToMatchedResourceProfile() {
createSlotOffersForResourceRequirements(
ResourceCounter.withResource(ResourceProfile.ANY, 1)));

final SlotInfo slot = slotPool.getFreeSlotsInformation().iterator().next();
final SlotInfo slot =
slotPool.getFreeSlotInfoTracker().getFreeSlotsInformation().iterator().next();

slotPool.reserveFreeSlot(slot.getAllocationId(), largeResourceProfile);
assertThat(
Expand Down Expand Up @@ -615,7 +616,7 @@ void testReserveFreeSlotForResourceUpdatesAvailableResourcesAndRequirements() {
ResourceCounter.withResource(smallResourceProfile, 1));

final SlotInfo largeSlot =
slotPool.getFreeSlotsInformation().stream()
slotPool.getFreeSlotInfoTracker().getFreeSlotsInformation().stream()
.filter(slot -> slot.getResourceProfile().equals(largeResourceProfile))
.findFirst()
.get();
Expand Down Expand Up @@ -677,7 +678,7 @@ void testRegisterSlotsDoesNotAffectRequirements() {
0L);

final AllocationID allocationId =
slotPool.getFreeSlotsInformation().iterator().next().getAllocationId();
slotPool.getFreeSlotInfoTracker().getAvailableSlots().iterator().next();

assertThat(slotPool.getResourceRequirements()).isEmpty();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,6 @@ public Collection<SlotOffer> registerSlots(
slots, taskManagerLocation, taskManagerGateway, currentTime);
}

@Override
public Collection<SlotInfo> getFreeSlotsInformation() {
return getFreeSlotsInformationSupplier.get();
}

@Override
public FreeSlotInfoTracker getFreeSlotInfoTracker() {
return getFreeSlotInfoTrackerSupplier.get();
Expand Down

0 comments on commit 72bff2a

Please sign in to comment.