Skip to content

Commit

Permalink
[FLINK-34731][runtime] Remove SpeculativeScheduler and incorporate it…
Browse files Browse the repository at this point in the history
…s features into AdaptiveBatchScheduler.

This closes apache#24524.
  • Loading branch information
JunRuiLee authored and zhuzhurk committed Mar 22, 2024
1 parent 709bf93 commit cf0d75c
Show file tree
Hide file tree
Showing 10 changed files with 549 additions and 358 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blocklist.BlocklistOperations;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
Expand All @@ -44,6 +46,7 @@
import org.apache.flink.runtime.executiongraph.ParallelismAndInputInfos;
import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.FailureHandlingResult;
import org.apache.flink.runtime.executiongraph.failover.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
Expand Down Expand Up @@ -75,6 +78,8 @@

import org.slf4j.Logger;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -113,6 +118,8 @@ public class AdaptiveBatchScheduler extends DefaultScheduler {
private final Map<JobVertexID, CompletableFuture<Integer>>
sourceParallelismFuturesByJobVertexId;

private final SpeculativeExecutionHandler speculativeExecutionHandler;

public AdaptiveBatchScheduler(
final Logger log,
final JobGraph jobGraph,
Expand All @@ -138,7 +145,8 @@ public AdaptiveBatchScheduler(
final ShuffleMaster<?> shuffleMaster,
final Time rpcTimeout,
final VertexParallelismAndInputInfosDecider vertexParallelismAndInputInfosDecider,
int defaultMaxParallelism,
final int defaultMaxParallelism,
final BlocklistOperations blocklistOperations,
final HybridPartitionDataConsumeConstraint hybridPartitionDataConsumeConstraint,
final Map<JobVertexID, ForwardGroup> forwardGroupsByJobVertexId)
throws Exception {
Expand Down Expand Up @@ -183,10 +191,40 @@ public AdaptiveBatchScheduler(
this.hybridPartitionDataConsumeConstraint = hybridPartitionDataConsumeConstraint;

this.sourceParallelismFuturesByJobVertexId = new HashMap<>();

speculativeExecutionHandler =
createSpeculativeExecutionHandler(
log, jobMasterConfiguration, executionVertexVersioner, blocklistOperations);
}

private SpeculativeExecutionHandler createSpeculativeExecutionHandler(
Logger log,
Configuration jobMasterConfiguration,
ExecutionVertexVersioner executionVertexVersioner,
BlocklistOperations blocklistOperations) {

if (jobMasterConfiguration.get(BatchExecutionOptions.SPECULATIVE_ENABLED)) {
return new DefaultSpeculativeExecutionHandler(
jobMasterConfiguration,
blocklistOperations,
this::getExecutionVertex,
() -> getExecutionGraph().getRegisteredExecutions(),
(newSpeculativeExecutions, verticesToDeploy) ->
executionDeployer.allocateSlotsAndDeploy(
newSpeculativeExecutions,
executionVertexVersioner.getExecutionVertexVersions(
verticesToDeploy)),
log);
} else {
return new DummySpeculativeExecutionHandler();
}
}

@Override
protected void startSchedulingInternal() {
speculativeExecutionHandler.init(
getExecutionGraph(), getMainThreadExecutor(), jobManagerJobMetricGroup);

tryComputeSourceParallelismThenRunAsync(
(Void value, Throwable throwable) -> {
if (getExecutionGraph().getState() == JobStatus.CREATED) {
Expand All @@ -196,8 +234,16 @@ protected void startSchedulingInternal() {
});
}

@Override
public CompletableFuture<Void> closeAsync() {
speculativeExecutionHandler.stopSlowTaskDetector();
return super.closeAsync();
}

@Override
protected void onTaskFinished(final Execution execution, final IOMetrics ioMetrics) {
speculativeExecutionHandler.notifyTaskFinished(execution, this::cancelPendingExecutions);

checkNotNull(ioMetrics);
updateResultPartitionBytesMetrics(ioMetrics.getResultPartitionBytes());
ExecutionVertexVersion currentVersion =
Expand All @@ -215,6 +261,66 @@ protected void onTaskFinished(final Execution execution, final IOMetrics ioMetri
});
}

private CompletableFuture<?> cancelPendingExecutions(
final ExecutionVertexID executionVertexId) {
final List<Execution> pendingExecutions =
getExecutionVertex(executionVertexId).getCurrentExecutions().stream()
.filter(
e ->
!e.getState().isTerminal()
&& e.getState() != ExecutionState.CANCELING)
.collect(Collectors.toList());
if (pendingExecutions.isEmpty()) {
return CompletableFuture.completedFuture(null);
}

log.info(
"Canceling {} un-finished executions of {} because one of its executions has finished.",
pendingExecutions.size(),
executionVertexId);

final CompletableFuture<?> future =
FutureUtils.combineAll(
pendingExecutions.stream()
.map(this::cancelExecution)
.collect(Collectors.toList()));
cancelAllPendingSlotRequestsForVertex(executionVertexId);
return future;
}

@Override
protected void onTaskFailed(final Execution execution) {
speculativeExecutionHandler.notifyTaskFailed(execution);

super.onTaskFailed(execution);
}

@Override
protected void handleTaskFailure(
final Execution failedExecution, @Nullable final Throwable error) {
if (!speculativeExecutionHandler.handleTaskFailure(
failedExecution, error, this::handleLocalExecutionAttemptFailure)) {
super.handleTaskFailure(failedExecution, error);
}
}

private void handleLocalExecutionAttemptFailure(
final Execution failedExecution, @Nullable final Throwable error) {
executionSlotAllocator.cancel(failedExecution.getAttemptId());

final FailureHandlingResult failureHandlingResult =
recordTaskFailure(failedExecution, error);
if (failureHandlingResult.canRestart()) {
archiveFromFailureHandlingResult(
createFailureHandlingResultSnapshot(failureHandlingResult));
} else {
failJob(
error,
failureHandlingResult.getTimestamp(),
failureHandlingResult.getFailureLabels());
}
}

private void updateResultPartitionBytesMetrics(
Map<IntermediateResultPartitionID, ResultPartitionBytes> resultPartitionBytes) {
checkNotNull(resultPartitionBytes);
Expand Down Expand Up @@ -251,6 +357,8 @@ public void allocateSlotsAndDeploy(final List<ExecutionVertexID> verticesToDeplo

@Override
protected void resetForNewExecution(final ExecutionVertexID executionVertexId) {
speculativeExecutionHandler.resetForNewExecution(executionVertexId);

final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
if (executionVertex.getExecutionState() == ExecutionState.FINISHED) {
executionVertex
Expand Down Expand Up @@ -641,4 +749,9 @@ private static BlockingResultInfo createFromIntermediateResult(IntermediateResul
BlockingResultInfo getBlockingResultInfo(IntermediateDataSetID resultId) {
return blockingResultInfos.get(resultId);
}

@VisibleForTesting
SpeculativeExecutionHandler getSpeculativeExecutionHandler() {
return speculativeExecutionHandler;
}
}
Loading

0 comments on commit cf0d75c

Please sign in to comment.