Skip to content

Commit

Permalink
[FLINK-15959][flink-runtime] Support min number of slots configuratio…
Browse files Browse the repository at this point in the history
…n in DefaultResourceAllocationStrategy
  • Loading branch information
xiangyuf authored and KarmaGYZ committed Oct 8, 2023
1 parent 51cdb34 commit eb046b6
Show file tree
Hide file tree
Showing 4 changed files with 245 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ private static SlotManager createSlotManager(
slotManagerConfiguration.getNumSlotsPerWorker(),
slotManagerConfiguration.isEvenlySpreadOutSlots(),
slotManagerConfiguration.getTaskManagerTimeout(),
slotManagerConfiguration.getRedundantTaskManagerNum()));
slotManagerConfiguration.getRedundantTaskManagerNum(),
slotManagerConfiguration.getMinTotalCpu(),
slotManagerConfiguration.getMinTotalMem()));
} else {
return new DeclarativeSlotManager(
scheduledExecutor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
package org.apache.flink.runtime.resourcemanager.slotmanager;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.resources.CPUResource;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.blocklist.BlockedTaskManagerChecker;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.slots.ResourceRequirement;
Expand Down Expand Up @@ -63,6 +65,8 @@ public class DefaultResourceAllocationStrategy implements ResourceAllocationStra
private final ResourceProfile defaultSlotResourceProfile;
private final ResourceProfile totalResourceProfile;
private final int numSlotsPerWorker;
private final CPUResource minTotalCPU;
private final MemorySize minTotalMemory;
private final ResourceMatchingStrategy availableResourceMatchingStrategy;

/**
Expand All @@ -82,7 +86,9 @@ public DefaultResourceAllocationStrategy(
int numSlotsPerWorker,
boolean evenlySpreadOutSlots,
Time taskManagerTimeout,
int redundantTaskManagerNum) {
int redundantTaskManagerNum,
CPUResource minTotalCPU,
MemorySize minTotalMemory) {
this.totalResourceProfile = totalResourceProfile;
this.numSlotsPerWorker = numSlotsPerWorker;
this.defaultSlotResourceProfile =
Expand All @@ -94,6 +100,8 @@ public DefaultResourceAllocationStrategy(
: AnyMatchingResourceMatchingStrategy.INSTANCE;
this.taskManagerTimeout = taskManagerTimeout;
this.redundantTaskManagerNum = redundantTaskManagerNum;
this.minTotalCPU = minTotalCPU;
this.minTotalMemory = minTotalMemory;
}

@Override
Expand All @@ -109,6 +117,11 @@ public ResourceAllocationResult tryFulfillRequirements(
final List<InternalResourceInfo> pendingResources =
getPendingResources(taskManagerResourceInfoProvider, resultBuilder);

ResourceProfile totalCurrentResources =
Stream.concat(registeredResources.stream(), pendingResources.stream())
.map(internalResourceInfo -> internalResourceInfo.totalProfile)
.reduce(ResourceProfile.ZERO, ResourceProfile::merge);

for (Map.Entry<JobID, Collection<ResourceRequirement>> resourceRequirements :
missingResources.entrySet()) {
final JobID jobId = resourceRequirements.getKey();
Expand All @@ -119,29 +132,27 @@ public ResourceAllocationResult tryFulfillRequirements(

if (!unfulfilledJobRequirements.isEmpty()) {
tryFulfillRequirementsForJobWithPendingResources(
jobId, unfulfilledJobRequirements, pendingResources, resultBuilder);
jobId,
unfulfilledJobRequirements,
pendingResources,
totalCurrentResources,
resultBuilder);
}
}

// Unlike tryFulfillRequirementsForJobWithPendingResources, which updates pendingResources
// to the latest state after a new PendingTaskManager is created,
// tryFulFillRedundantResources will not update pendingResources even after new
// tryFulFillRequiredResources will not update pendingResources even after new
// PendingTaskManagers are created.
// This is because the pendingResources are no longer needed afterwards.
tryFulFillRedundantResources(
totalResourceProfile.multiply(redundantTaskManagerNum),
registeredResources,
pendingResources,
resultBuilder);

tryFulFillRequiredResources(
registeredResources, pendingResources, totalCurrentResources, resultBuilder);
return resultBuilder.build();
}

@Override
public ResourceReconcileResult tryReconcileClusterResources(
TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
ResourceProfile requiredRedundantResources =
totalResourceProfile.multiply(redundantTaskManagerNum);
ResourceReconcileResult.Builder builder = ResourceReconcileResult.builder();

List<TaskManagerInfo> taskManagersIdleTimeout = new ArrayList<>();
Expand Down Expand Up @@ -173,52 +184,59 @@ public ResourceReconcileResult tryReconcileClusterResources(
}
});

if (taskManagersIdleTimeout.isEmpty() && pendingTaskManagersNonUse.isEmpty()) {
// short-cut for nothing to release
return builder.build();
}

ResourceProfile resourcesToKeep = ResourceProfile.ZERO;
boolean redundantFulfilled = false;
ResourceProfile resourcesInTotal = ResourceProfile.ZERO;
boolean resourceFulfilled = false;

// check whether available resources of used (pending) task manager is enough.
ResourceProfile availableResourcesOfNonIdle =
ResourceProfile resourcesAvailableOfNonIdle =
getAvailableResourceOfTaskManagers(taskManagersNonTimeout);
resourcesToKeep = resourcesToKeep.merge(availableResourcesOfNonIdle);
if (canFulfillRequirement(requiredRedundantResources, resourcesToKeep)) {
redundantFulfilled = true;

ResourceProfile resourcesInTotalOfNonIdle =
getTotalResourceOfTaskManagers(taskManagersNonTimeout);

resourcesToKeep = resourcesToKeep.merge(resourcesAvailableOfNonIdle);
resourcesInTotal = resourcesInTotal.merge(resourcesInTotalOfNonIdle);

if (isRequiredResourcesFulfilled(resourcesToKeep, resourcesInTotal)) {
resourceFulfilled = true;
} else {
ResourceProfile availableResourcesOfNonIdlePendingTaskManager =
ResourceProfile resourcesAvailableOfNonIdlePendingTaskManager =
getAvailableResourceOfPendingTaskManagers(pendingTaskManagersInuse);
resourcesToKeep = resourcesToKeep.merge(availableResourcesOfNonIdlePendingTaskManager);
ResourceProfile resourcesInTotalOfNonIdlePendingTaskManager =
getTotalResourceOfPendingTaskManagers(pendingTaskManagersInuse);

resourcesToKeep = resourcesToKeep.merge(resourcesAvailableOfNonIdlePendingTaskManager);
resourcesInTotal = resourcesInTotal.merge(resourcesInTotalOfNonIdlePendingTaskManager);
}

// try reserve or release unused (pending) task managers
for (TaskManagerInfo taskManagerInfo : taskManagersIdleTimeout) {
if (redundantFulfilled
|| canFulfillRequirement(requiredRedundantResources, resourcesToKeep)) {
redundantFulfilled = true;
if (resourceFulfilled
|| isRequiredResourcesFulfilled(resourcesToKeep, resourcesInTotal)) {
resourceFulfilled = true;
builder.addTaskManagerToRelease(taskManagerInfo);
} else {
resourcesToKeep = resourcesToKeep.merge(taskManagerInfo.getAvailableResource());
resourcesInTotal = resourcesInTotal.merge(taskManagerInfo.getTotalResource());
}
}
for (PendingTaskManager pendingTaskManager : pendingTaskManagersNonUse) {
if (redundantFulfilled
|| canFulfillRequirement(requiredRedundantResources, resourcesToKeep)) {
redundantFulfilled = true;
if (resourceFulfilled
|| isRequiredResourcesFulfilled(resourcesToKeep, resourcesInTotal)) {
resourceFulfilled = true;
builder.addPendingTaskManagerToRelease(pendingTaskManager);
} else {
resourcesToKeep = resourcesToKeep.merge(pendingTaskManager.getUnusedResource());
resourcesInTotal =
resourcesInTotal.merge(pendingTaskManager.getTotalResourceProfile());
}
}

if (!redundantFulfilled) {
// fulfill redundant resources
tryFulFillRedundantResourcesWithAction(
requiredRedundantResources,
resourcesToKeep,
builder::addPendingTaskManagerToAllocate);
if (!resourceFulfilled) {
// fulfill required resources
tryFulFillRequiredResourcesWithAction(
resourcesToKeep, resourcesInTotal, builder::addPendingTaskManagerToAllocate);
}

return builder.build();
Expand Down Expand Up @@ -297,6 +315,7 @@ private void tryFulfillRequirementsForJobWithPendingResources(
JobID jobId,
Collection<ResourceRequirement> unfulfilledRequirements,
List<InternalResourceInfo> availableResources,
ResourceProfile totalCurrentResources,
ResourceAllocationResult.Builder resultBuilder) {
for (ResourceRequirement missingResource : unfulfilledRequirements) {
// for this strategy, all pending resources should have the same default slot resource
Expand All @@ -321,6 +340,7 @@ private void tryFulfillRequirementsForJobWithPendingResources(
final PendingTaskManager newPendingTaskManager =
new PendingTaskManager(totalResourceProfile, numSlotsPerWorker);
resultBuilder.addPendingTaskManagerAllocate(newPendingTaskManager);
totalCurrentResources.merge(totalResourceProfile);
ResourceProfile remainResource = totalResourceProfile;
while (numUnfulfilled > 0
&& canFulfillRequirement(effectiveProfile, remainResource)) {
Expand All @@ -347,10 +367,26 @@ && canFulfillRequirement(effectiveProfile, remainResource)) {
}
}

private void tryFulFillRedundantResources(
ResourceProfile requiredRedundantResource,
/**
 * Tells whether the cluster currently satisfies both resource targets of this strategy.
 *
 * @param resourcesAvailable profile checked against the redundant-resource target
 * @param resourcesInTotal profile checked against the configured minimum CPU and memory
 * @return {@code true} only if the redundant-resource check and the minimum-resource check
 *     both pass
 */
private boolean isRequiredResourcesFulfilled(
        ResourceProfile resourcesAvailable, ResourceProfile resourcesInTotal) {
    // The redundancy check runs first; the minimum check is only consulted when it passes.
    if (!isRedundantResourcesFulfilled(resourcesAvailable)) {
        return false;
    }
    return isMinRequiredResourcesFulfilled(resourcesInTotal);
}

/**
 * Checks the given available resources against the redundant-resource target.
 *
 * <p>The target is {@code totalResourceProfile} multiplied by {@code
 * redundantTaskManagerNum}.
 *
 * @param resourcesAvailable the available resources to test
 * @return whether every field of {@code resourcesAvailable} is no less than the target
 */
private boolean isRedundantResourcesFulfilled(ResourceProfile resourcesAvailable) {
    final ResourceProfile redundantTarget =
            totalResourceProfile.multiply(redundantTaskManagerNum);
    return resourcesAvailable.allFieldsNoLessThan(redundantTarget);
}

/**
 * Checks the given total resources against the configured minimums.
 *
 * @param resourcesInTotal the total resources to test
 * @return {@code true} if its CPU cores are at least {@code minTotalCPU} and its total memory
 *     is at least {@code minTotalMemory}
 */
private boolean isMinRequiredResourcesFulfilled(ResourceProfile resourcesInTotal) {
    // CPU is compared first; memory is only inspected when the CPU minimum is met.
    if (resourcesInTotal.getCpuCores().compareTo(minTotalCPU) < 0) {
        return false;
    }
    return resourcesInTotal.getTotalMemory().compareTo(minTotalMemory) >= 0;
}

private void tryFulFillRequiredResources(
List<InternalResourceInfo> availableRegisteredResources,
List<InternalResourceInfo> availablePendingResources,
ResourceProfile totalCurrentResources,
ResourceAllocationResult.Builder resultBuilder) {
ResourceProfile totalAvailableResources =
Stream.concat(
Expand All @@ -359,24 +395,34 @@ private void tryFulFillRedundantResources(
.map(internalResourceInfo -> internalResourceInfo.availableProfile)
.reduce(ResourceProfile.ZERO, ResourceProfile::merge);

tryFulFillRedundantResourcesWithAction(
requiredRedundantResource,
tryFulFillRequiredResourcesWithAction(
totalAvailableResources,
totalCurrentResources,
resultBuilder::addPendingTaskManagerAllocate);
}

private void tryFulFillRedundantResourcesWithAction(
ResourceProfile requiredRedundantResource,
ResourceProfile totalAvailableResources,
/**
 * Keeps creating new {@link PendingTaskManager}s, handing each one to {@code fulfillAction},
 * until {@code isRequiredResourcesFulfilled} accepts the running totals.
 *
 * <p>Each new pending task manager is built from {@code totalResourceProfile} and {@code
 * numSlotsPerWorker}, and its full profile is added to both running totals.
 *
 * @param resourcesAvailable starting value for the available-resources running total
 * @param resourcesInTotal starting value for the total-resources running total
 * @param fulfillAction callback invoked once per newly created pending task manager
 */
private void tryFulFillRequiredResourcesWithAction(
ResourceProfile resourcesAvailable,
ResourceProfile resourcesInTotal,
Consumer<? super PendingTaskManager> fulfillAction) {
// NOTE(review): the next two lines look like the pre-change and post-change variants of the
// same loop header as rendered by the diff view; only the second one uses names declared in
// this signature -- confirm against the real file.
while (!canFulfillRequirement(requiredRedundantResource, totalAvailableResources)) {
while (!isRequiredResourcesFulfilled(resourcesAvailable, resourcesInTotal)) {
PendingTaskManager pendingTaskManager =
new PendingTaskManager(totalResourceProfile, numSlotsPerWorker);
fulfillAction.accept(pendingTaskManager);
// NOTE(review): the first assignment below appears to be the removed counterpart of the
// two that follow it -- confirm against the real file.
totalAvailableResources = totalAvailableResources.merge(totalResourceProfile);
// The parameters are reassigned locally, so the caller's references are not affected.
resourcesAvailable = resourcesAvailable.merge(totalResourceProfile);
resourcesInTotal = resourcesInTotal.merge(totalResourceProfile);
}
}

/**
 * Sums the total resources of the given registered task managers.
 *
 * <p>Each task manager contributes the profile it reports through {@link
 * TaskManagerInfo#getTotalResource()}, rather than being assumed to match {@code
 * totalResourceProfile}. This is the same per-task-manager total that the idle-timeout loop
 * in {@code tryReconcileClusterResources} merges, so both code paths now account registered
 * task managers identically even when their sizes differ from the default worker profile.
 *
 * @param taskManagers the registered task managers to aggregate; may be empty
 * @return the merged total profile, or {@link ResourceProfile#ZERO} for an empty list
 */
private ResourceProfile getTotalResourceOfTaskManagers(List<TaskManagerInfo> taskManagers) {
    return taskManagers.stream()
            .map(TaskManagerInfo::getTotalResource)
            .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
}

/**
 * Computes the total resources of the given pending task managers.
 *
 * <p>The result is {@code totalResourceProfile} multiplied by the number of entries in the
 * list; the individual entries are not inspected.
 *
 * @param pendingTaskManagers the pending task managers to account for
 * @return {@code totalResourceProfile} scaled by the list size
 */
private ResourceProfile getTotalResourceOfPendingTaskManagers(
        List<PendingTaskManager> pendingTaskManagers) {
    final int pendingCount = pendingTaskManagers.size();
    return totalResourceProfile.multiply(pendingCount);
}

private ResourceProfile getAvailableResourceOfTaskManagers(List<TaskManagerInfo> taskManagers) {
return taskManagers.stream()
.map(TaskManagerInfo::getAvailableResource)
Expand Down
Loading

0 comments on commit eb046b6

Please sign in to comment.