Skip to content

Commit

Permalink
[FLINK-25855] Allow JobMaster to accept excess slots when restarting …
Browse files Browse the repository at this point in the history
…the job

This commit allows the JobMaster to accept excess slots when the job is restarting.
The way it works is that the DefaultScheduler tells the DeclarativeSlotPoolBridge when
the job is restarting. If new slots are offered during this time, the
DeclarativeSlotPoolBridge calls DeclarativeSlotPool.registerSlots, which accepts all
slot offers. Once the job leaves the RESTARTING state, the DeclarativeSlotPoolBridge
calls DeclarativeSlotPool.offerSlots again, which only accepts those slots that are
currently required.

[FLINK-25855] Add DeclarativeSlotPool.registerSlots

This closes apache#18542.
  • Loading branch information
tillrohrmann committed Jan 31, 2022
1 parent d874b26 commit fb14d4d
Show file tree
Hide file tree
Showing 11 changed files with 305 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,21 @@ Collection<SlotOffer> offerSlots(
TaskManagerGateway taskManagerGateway,
long currentTime);

/**
* Registers the given set of slots at the slot pool.
*
* <p>In contrast to {@code offerSlots}, this method does not report back which of the given
* slots were accepted (it returns nothing). It is intended for situations in which all offered
* slots should be taken over, not only those that are currently required.
*
* @param slots slots to register
* @param taskManagerLocation location of the offering TaskExecutor
* @param taskManagerGateway gateway to talk to the offering TaskExecutor
* @param currentTime time at which the slots are being offered
*/
void registerSlots(
Collection<? extends SlotOffer> slots,
TaskManagerLocation taskManagerLocation,
TaskManagerGateway taskManagerGateway,
long currentTime);

/**
* Returns the slot information for all free slots (slots which can be allocated from the slot
* pool).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
Expand All @@ -39,6 +42,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -66,6 +70,8 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem
private final Time batchSlotTimeout;
private boolean isBatchSlotRequestTimeoutCheckDisabled;

private boolean isJobRestarting = false;

public DeclarativeSlotPoolBridge(
JobID jobId,
DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
Expand Down Expand Up @@ -121,6 +127,50 @@ protected void onClose() {
cancelPendingRequests(request -> true, cause);
}

/**
* Sets whether the underlying job is currently restarting or not.
*
* <p>While the flag is {@code true}, {@code offerSlots} hands every offer from a registered
* task manager to {@code DeclarativeSlotPool.registerSlots} and returns all of them; while it is
* {@code false}, offers are passed to {@code DeclarativeSlotPool.offerSlots} instead.
*
* @param isJobRestarting {@code true} if the job is restarting, {@code false} otherwise
*/
@Override
public void setIsJobRestarting(boolean isJobRestarting) {
this.isJobRestarting = isJobRestarting;
}

/**
 * Handles slots offered by a task manager.
 *
 * <p>Offers from task managers that are not registered are ignored. For registered task
 * managers the behaviour depends on whether the job is currently restarting: if it is not, the
 * offers are forwarded to {@code DeclarativeSlotPool.offerSlots} and its result is returned; if
 * it is, all offers are handed to {@code DeclarativeSlotPool.registerSlots} and the complete
 * input collection is returned.
 *
 * @param taskManagerLocation location of the offering task manager
 * @param taskManagerGateway gateway to talk to the offering task manager
 * @param offers slots offered by the task manager
 * @return the slot offers reported back as accepted; empty if the task manager is unknown
 */
@Override
public Collection<SlotOffer> offerSlots(
        TaskManagerLocation taskManagerLocation,
        TaskManagerGateway taskManagerGateway,
        Collection<SlotOffer> offers) {
    assertHasBeenStarted();

    final boolean isKnownTaskManager =
            isTaskManagerRegistered(taskManagerLocation.getResourceID());
    if (!isKnownTaskManager) {
        log.debug(
                "Ignoring offered slots from unknown task manager {}.",
                taskManagerLocation.getResourceID());
        return Collections.emptyList();
    }

    // Regular operation: let the declarative slot pool decide which offers it accepts.
    if (!isJobRestarting) {
        return getDeclarativeSlotPool()
                .offerSlots(
                        offers,
                        taskManagerLocation,
                        taskManagerGateway,
                        getRelativeTimeMillis());
    }

    // Job is restarting: register everything that was offered.
    // NOTE(review): registerSlots returns nothing, so the whole input collection is reported
    // as accepted here regardless of what the pool did with the individual offers.
    getDeclarativeSlotPool()
            .registerSlots(
                    offers, taskManagerLocation, taskManagerGateway, getRelativeTimeMillis());
    return offers;
}

private void cancelPendingRequests(
Predicate<PendingRequest> requestPredicate, FlinkException cancelCause) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -181,10 +182,22 @@ public Collection<SlotOffer> offerSlots(
TaskManagerGateway taskManagerGateway,
long currentTime) {

LOG.debug(
"Received {} slot offers from TaskExecutor {}.",
offers.size(),
taskManagerLocation);
LOG.debug("Received {} slot offers from TaskExecutor {}.", offers, taskManagerLocation);

return internalOfferSlots(
offers,
taskManagerLocation,
taskManagerGateway,
currentTime,
this::matchWithOutstandingRequirement);
}

private Collection<SlotOffer> internalOfferSlots(
Collection<? extends SlotOffer> offers,
TaskManagerLocation taskManagerLocation,
TaskManagerGateway taskManagerGateway,
long currentTime,
Function<ResourceProfile, Optional<ResourceProfile>> matchingCondition) {
final Collection<SlotOffer> acceptedSlotOffers = new ArrayList<>();
final Collection<AllocatedSlot> acceptedSlots = new ArrayList<>();

Expand All @@ -195,7 +208,7 @@ public Collection<SlotOffer> offerSlots(
} else {
Optional<AllocatedSlot> acceptedSlot =
matchOfferWithOutstandingRequirements(
offer, taskManagerLocation, taskManagerGateway);
offer, taskManagerLocation, taskManagerGateway, matchingCondition);
if (acceptedSlot.isPresent()) {
acceptedSlotOffers.add(offer);
acceptedSlots.add(acceptedSlot.get());
Expand All @@ -219,16 +232,40 @@ public Collection<SlotOffer> offerSlots(
return acceptedSlotOffers;
}

/**
 * Registers the given slots by running them through {@code internalOfferSlots} with a matching
 * condition that falls back to the slot's own resource profile when no outstanding requirement
 * matches. The collection returned by {@code internalOfferSlots} is discarded.
 *
 * @param slots slots to register
 * @param taskManagerLocation location of the offering TaskExecutor
 * @param taskManagerGateway gateway to talk to the offering TaskExecutor
 * @param currentTime time at which the slots are being offered
 */
@Override
public void registerSlots(
        Collection<? extends SlotOffer> slots,
        TaskManagerLocation taskManagerLocation,
        TaskManagerGateway taskManagerGateway,
        long currentTime) {
    LOG.debug("Register slots {} from TaskManager {}.", slots, taskManagerLocation);

    // Matching condition that always yields a profile: requirement if matched, else the slot's.
    final Function<ResourceProfile, Optional<ResourceProfile>> requirementOrOwnProfile =
            this::matchWithOutstandingRequirementOrSelf;

    internalOfferSlots(
            slots, taskManagerLocation, taskManagerGateway, currentTime, requirementOrOwnProfile);
}

/**
 * Matches the given resource profile against the outstanding requirements and falls back to
 * the profile itself if nothing matches.
 *
 * @param resourceProfile resource profile to match
 * @return the result of {@code matchWithOutstandingRequirement} if present, otherwise the
 *     given resource profile; never empty
 */
private Optional<ResourceProfile> matchWithOutstandingRequirementOrSelf(
        ResourceProfile resourceProfile) {
    final ResourceProfile requirementOrOwnProfile =
            matchWithOutstandingRequirement(resourceProfile).orElse(resourceProfile);

    return Optional.of(requirementOrOwnProfile);
}

private Optional<AllocatedSlot> matchOfferWithOutstandingRequirements(
SlotOffer slotOffer,
TaskManagerLocation taskManagerLocation,
TaskManagerGateway taskManagerGateway) {
TaskManagerGateway taskManagerGateway,
Function<ResourceProfile, Optional<ResourceProfile>> matchingCondition) {

final Optional<ResourceProfile> match =
requirementMatcher.match(
slotOffer.getResourceProfile(),
totalResourceRequirements,
fulfilledResourceRequirements::getResourceCount);
matchingCondition.apply(slotOffer.getResourceProfile());

if (match.isPresent()) {
final ResourceProfile matchedRequirement = match.get();
Expand All @@ -252,6 +289,14 @@ private Optional<AllocatedSlot> matchOfferWithOutstandingRequirements(
return Optional.empty();
}

/**
* Asks the {@code requirementMatcher} to match the given resource profile, passing it the
* total resource requirements and a lookup of the fulfilled resource count per profile.
*
* @param resourceProfile resource profile to match
* @return whatever the matcher returns; presumably the profile of the matched requirement, or
*     empty if nothing matched (TODO confirm against the matcher's contract)
*/
private Optional<ResourceProfile> matchWithOutstandingRequirement(
ResourceProfile resourceProfile) {
return requirementMatcher.match(
resourceProfile,
totalResourceRequirements,
fulfilledResourceRequirements::getResourceCount);
}

@VisibleForTesting
ResourceCounter calculateUnfulfilledResources() {
return totalResourceRequirements.subtract(fulfilledResourceRequirements);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,4 +218,11 @@ CompletableFuture<PhysicalSlot> requestNewAllocatedBatchSlot(
* @return the allocated slots on the task manager
*/
AllocatedSlotReport createAllocatedSlotReport(ResourceID taskManagerId);

/**
* Sets whether the underlying job is currently restarting or not.
*
* <p>Implementations may use this information to change how they handle slot offers, or they
* may ignore it.
*
* @param isJobRestarting {@code true} if the job is restarting, {@code false} otherwise
*/
void setIsJobRestarting(boolean isJobRestarting);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.flink.runtime.scheduler;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
Expand Down Expand Up @@ -134,7 +135,14 @@ public SchedulerNG createInstance(
schedulerComponents.getAllocatorFactory(),
initializationTimestamp,
mainThreadExecutor,
jobStatusListener,
(jobId, jobStatus, timestamp) -> {
if (jobStatus == JobStatus.RESTARTING) {
slotPool.setIsJobRestarting(true);
} else {
slotPool.setIsJobRestarting(false);
}
jobStatusListener.jobStatusChanges(jobId, jobStatus, timestamp);
},
executionGraphFactory,
shuffleMaster,
rpcTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,16 @@ public boolean equals(Object o) {
public int hashCode() {
return allocationId.hashCode();
}

/**
 * Returns a textual description of this slot offer containing its allocation id, slot index
 * and resource profile.
 */
@Override
public String toString() {
    final StringBuilder description = new StringBuilder("SlotOffer{");
    description.append("allocationId=").append(allocationId);
    description.append(", slotIndex=").append(slotIndex);
    description.append(", resourceProfile=").append(resourceProfile);
    return description.append('}').toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,9 @@ public AllocatedSlotReport createAllocatedSlotReport(ResourceID taskManagerId) {
return new AllocatedSlotReport(jobId, allocatedSlotInfos);
}

/** Intentionally a no-op: this implementation does not react to the restarting flag. */
@Override
public void setIsJobRestarting(boolean isJobRestarting) {}

@Override
public void releaseSlot(@Nonnull SlotRequestId slotRequestId, @Nullable Throwable cause) {
throw new UnsupportedOperationException(
Expand Down Expand Up @@ -1922,6 +1925,62 @@ public void testJobMasterOnlyTerminatesAfterTheSchedulerHasClosed() throws Excep
jobMasterTerminationFuture.get();
}

/**
* Verifies that all slots offered to the JobMaster are reported back by
* {@code registerSlotsAtJobMaster} while the job is in the RESTARTING state.
*
* <p>The test brings the job to RUNNING, registers one slot from a first task manager,
* disconnects that task manager, waits for the job to become RESTARTING and then offers three
* slots from a second task manager.
*/
@Test
public void testJobMasterAcceptsExcessSlotsWhenJobIsRestarting() throws Exception {
// Fixed-delay restart with a one day delay; presumably chosen so that the job stays in
// RESTARTING for the remainder of the test.
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
configuration.set(
RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofDays(1));
final JobMaster jobMaster =
new JobMasterBuilder(jobGraph, rpcService)
.withConfiguration(configuration)
.createJobMaster();

try {
jobMaster.start();

final JobMasterGateway jobMasterGateway =
jobMaster.getSelfGateway(JobMasterGateway.class);

assertThat(
jobMasterGateway.requestJobStatus(testingTimeout).get(), is(JobStatus.RUNNING));

// Register a single slot from the first task manager.
final LocalUnresolvedTaskManagerLocation unresolvedTaskManagerLocation =
new LocalUnresolvedTaskManagerLocation();
registerSlotsAtJobMaster(
1,
jobMasterGateway,
jobGraph.getJobID(),
new TestingTaskExecutorGatewayBuilder()
.setAddress("firstTaskManager")
.createTestingTaskExecutorGateway(),
unresolvedTaskManagerLocation);

// Disconnect the first task manager; the test then expects the job to go into RESTARTING.
jobMasterGateway.disconnectTaskManager(
unresolvedTaskManagerLocation.getResourceID(),
new FlinkException("Test exception."));

CommonTestUtils.waitUntilCondition(
() ->
jobMasterGateway.requestJobStatus(testingTimeout).get()
== JobStatus.RESTARTING,
Deadline.fromNow(TimeUtils.toDuration(testingTimeout)));

// Offer several slots from a second task manager while the job is restarting and expect
// the helper's result to contain all of them.
final int numberSlots = 3;
assertThat(
registerSlotsAtJobMaster(
numberSlots,
jobMasterGateway,
jobGraph.getJobID(),
new TestingTaskExecutorGatewayBuilder()
.setAddress("secondTaskManager")
.createTestingTaskExecutorGateway(),
new LocalUnresolvedTaskManagerLocation()),
hasSize(numberSlots));
} finally {
RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
}
}

private void runJobFailureWhenTaskExecutorTerminatesTest(
HeartbeatServices heartbeatServices,
BiConsumer<LocalUnresolvedTaskManagerLocation, JobMasterGateway> jobReachedRunningState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,41 @@ public void testAcceptingOfferedSlotsWithoutResourceManagerConnected() throws Ex
}
}

/**
* Verifies that the slot pool bridge calls {@code registerSlots} on the declarative slot pool
* when slots are offered while the restarting flag is set.
*/
@Test
public void testIfJobIsRestartingAllOfferedSlotsWillBeRegistered() throws Exception {
// Completed by the testing slot pool's register-slots function once it is invoked.
final CompletableFuture<Void> registerSlotsCalledFuture = new CompletableFuture<>();
final TestingDeclarativeSlotPoolFactory declarativeSlotPoolFactory =
new TestingDeclarativeSlotPoolFactory(
TestingDeclarativeSlotPool.builder()
.setRegisterSlotsFunction(
(slotOffers,
taskManagerLocation,
taskManagerGateway,
aLong) ->
registerSlotsCalledFuture.complete(null)));

try (DeclarativeSlotPoolBridge declarativeSlotPoolBridge =
createDeclarativeSlotPoolBridge(
declarativeSlotPoolFactory, requestSlotMatchingStrategy)) {
declarativeSlotPoolBridge.start(jobMasterId, "localhost", mainThreadExecutor);

declarativeSlotPoolBridge.setIsJobRestarting(true);

// The task manager is registered before its slots are offered.
final LocalTaskManagerLocation localTaskManagerLocation =
new LocalTaskManagerLocation();
declarativeSlotPoolBridge.registerTaskManager(localTaskManagerLocation.getResourceID());

declarativeSlotPoolBridge.offerSlots(
localTaskManagerLocation,
new SimpleAckingTaskManagerGateway(),
Collections.singleton(
new SlotOffer(new AllocationID(), 0, ResourceProfile.ANY)));

// make sure that the register slots method is called
registerSlotsCalledFuture.join();
}
}

@Nonnull
static DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge(
DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
Expand Down
Loading

0 comments on commit fb14d4d

Please sign in to comment.