From cf0d75c4bb324825a057dc72243bb6a2046f8479 Mon Sep 17 00:00:00 2001 From: JunRuiLee Date: Tue, 19 Mar 2024 15:27:52 +0800 Subject: [PATCH] [FLINK-34731][runtime] Remove SpeculativeScheduler and incorporate its features into AdaptiveBatchScheduler. This closes #24524. --- .../adaptivebatch/AdaptiveBatchScheduler.java | 115 +++++++- .../AdaptiveBatchSchedulerFactory.java | 185 +++++++------ ...> DefaultSpeculativeExecutionHandler.java} | 245 +++++------------- .../DummySpeculativeExecutionHandler.java | 72 +++++ .../SpeculativeExecutionHandler.java | 82 ++++++ .../scheduler/DefaultSchedulerBuilder.java | 88 +++---- .../scheduler/DefaultSchedulerTest.java | 11 + ...est.java => SpeculativeExecutionTest.java} | 97 ++++--- ...xecutionTimeBasedSlowTaskDetectorTest.java | 4 +- ...e.java => SpeculativeExecutionITCase.java} | 8 +- 10 files changed, 549 insertions(+), 358 deletions(-) rename flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/{SpeculativeScheduler.java => DefaultSpeculativeExecutionHandler.java} (59%) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DummySpeculativeExecutionHandler.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeExecutionHandler.java rename flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/{SpeculativeSchedulerTest.java => SpeculativeExecutionTest.java} (85%) rename flink-tests/src/test/java/org/apache/flink/test/scheduling/{SpeculativeSchedulerITCase.java => SpeculativeExecutionITCase.java} (99%) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java index 83fb50f15148c..3d95135e4e938 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java @@ -23,10 +23,12 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.BatchExecutionOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint; import org.apache.flink.core.failure.FailureEnricher; import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.blocklist.BlocklistOperations; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; @@ -44,6 +46,7 @@ import org.apache.flink.runtime.executiongraph.ParallelismAndInputInfos; import org.apache.flink.runtime.executiongraph.ResultPartitionBytes; import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy; +import org.apache.flink.runtime.executiongraph.failover.FailureHandlingResult; import org.apache.flink.runtime.executiongraph.failover.RestartBackoffTimeStrategy; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; @@ -75,6 +78,8 @@ import org.slf4j.Logger; +import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -113,6 +118,8 @@ public class AdaptiveBatchScheduler extends DefaultScheduler { private final Map> sourceParallelismFuturesByJobVertexId; + private final SpeculativeExecutionHandler speculativeExecutionHandler; + public AdaptiveBatchScheduler( final Logger log, final JobGraph jobGraph, @@ -138,7 +145,8 @@ public AdaptiveBatchScheduler( final ShuffleMaster shuffleMaster, final Time rpcTimeout, final VertexParallelismAndInputInfosDecider vertexParallelismAndInputInfosDecider, - int defaultMaxParallelism, + final int defaultMaxParallelism, + final BlocklistOperations blocklistOperations, final HybridPartitionDataConsumeConstraint hybridPartitionDataConsumeConstraint, final Map forwardGroupsByJobVertexId) throws Exception { @@ -183,10 +191,40 @@ public AdaptiveBatchScheduler( this.hybridPartitionDataConsumeConstraint = hybridPartitionDataConsumeConstraint; this.sourceParallelismFuturesByJobVertexId = new HashMap<>(); + + speculativeExecutionHandler = + createSpeculativeExecutionHandler( + log, jobMasterConfiguration, executionVertexVersioner, blocklistOperations); + } + + private SpeculativeExecutionHandler createSpeculativeExecutionHandler( + Logger log, + Configuration jobMasterConfiguration, + ExecutionVertexVersioner executionVertexVersioner, + BlocklistOperations blocklistOperations) { + + if (jobMasterConfiguration.get(BatchExecutionOptions.SPECULATIVE_ENABLED)) { + return new DefaultSpeculativeExecutionHandler( + jobMasterConfiguration, + blocklistOperations, + this::getExecutionVertex, + () -> getExecutionGraph().getRegisteredExecutions(), + (newSpeculativeExecutions, verticesToDeploy) -> + executionDeployer.allocateSlotsAndDeploy( + newSpeculativeExecutions, + executionVertexVersioner.getExecutionVertexVersions( + verticesToDeploy)), + log); + } else { + return new DummySpeculativeExecutionHandler(); + } } @Override protected void startSchedulingInternal() { + speculativeExecutionHandler.init( + getExecutionGraph(), getMainThreadExecutor(), jobManagerJobMetricGroup); + tryComputeSourceParallelismThenRunAsync( (Void value, Throwable throwable) -> { if (getExecutionGraph().getState() == JobStatus.CREATED) { @@ -196,8 +234,16 @@ protected void startSchedulingInternal() { }); } + @Override + public CompletableFuture closeAsync() { + speculativeExecutionHandler.stopSlowTaskDetector(); + return super.closeAsync(); + } + @Override protected void onTaskFinished(final Execution execution, final IOMetrics ioMetrics) { + speculativeExecutionHandler.notifyTaskFinished(execution, this::cancelPendingExecutions); + checkNotNull(ioMetrics); updateResultPartitionBytesMetrics(ioMetrics.getResultPartitionBytes()); ExecutionVertexVersion currentVersion = @@ -215,6 +261,66 @@ protected void onTaskFinished(final Execution execution, final IOMetrics ioMetri }); } + private CompletableFuture cancelPendingExecutions( + final ExecutionVertexID executionVertexId) { + final List pendingExecutions = + getExecutionVertex(executionVertexId).getCurrentExecutions().stream() + .filter( + e -> + !e.getState().isTerminal() + && e.getState() != ExecutionState.CANCELING) + .collect(Collectors.toList()); + if (pendingExecutions.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + + log.info( + "Canceling {} un-finished executions of {} because one of its executions has finished.", + pendingExecutions.size(), + executionVertexId); + + final CompletableFuture future = + FutureUtils.combineAll( + pendingExecutions.stream() + .map(this::cancelExecution) + .collect(Collectors.toList())); + cancelAllPendingSlotRequestsForVertex(executionVertexId); + return future; + } + + @Override + protected void onTaskFailed(final Execution execution) { + speculativeExecutionHandler.notifyTaskFailed(execution); + + super.onTaskFailed(execution); + } + + @Override + protected void handleTaskFailure( + final Execution failedExecution, @Nullable final Throwable error) { + if (!speculativeExecutionHandler.handleTaskFailure( + failedExecution, error, this::handleLocalExecutionAttemptFailure)) { + super.handleTaskFailure(failedExecution, error); + } + } + + private void handleLocalExecutionAttemptFailure( + final Execution failedExecution, @Nullable final Throwable error) { + executionSlotAllocator.cancel(failedExecution.getAttemptId()); + + final FailureHandlingResult failureHandlingResult = + recordTaskFailure(failedExecution, error); + if (failureHandlingResult.canRestart()) { + archiveFromFailureHandlingResult( + createFailureHandlingResultSnapshot(failureHandlingResult)); + } else { + failJob( + error, + failureHandlingResult.getTimestamp(), + failureHandlingResult.getFailureLabels()); + } + } + private void updateResultPartitionBytesMetrics( Map resultPartitionBytes) { checkNotNull(resultPartitionBytes); @@ -251,6 +357,8 @@ public void allocateSlotsAndDeploy(final List verticesToDeplo @Override protected void resetForNewExecution(final ExecutionVertexID executionVertexId) { + speculativeExecutionHandler.resetForNewExecution(executionVertexId); + final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId); if (executionVertex.getExecutionState() == ExecutionState.FINISHED) { executionVertex @@ -641,4 +749,9 @@ private static BlockingResultInfo createFromIntermediateResult(IntermediateResul BlockingResultInfo getBlockingResultInfo(IntermediateDataSetID resultId) { return blockingResultInfos.get(resultId); } + + @VisibleForTesting + SpeculativeExecutionHandler getSpeculativeExecutionHandler() { + return speculativeExecutionHandler; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java index 226a6b644a7f8..947851daa415c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.scheduler.adaptivebatch; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.BatchShuffleMode; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.ExecutionMode; @@ -59,6 +60,7 @@ import org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory; import org.apache.flink.runtime.scheduler.DefaultExecutionOperations; import org.apache.flink.runtime.scheduler.ExecutionGraphFactory; +import org.apache.flink.runtime.scheduler.ExecutionOperations; import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory; import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner; import org.apache.flink.runtime.scheduler.SchedulerNG; @@ -73,18 +75,16 @@ import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.runtime.util.SlotSelectionStrategyUtils; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.concurrent.ScheduledExecutor; import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.Collection; -import java.util.List; import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; -import java.util.function.Consumer; import static org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint.ONLY_FINISHED_PRODUCERS; import static org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint.UNFINISHED_PRODUCERS; @@ -120,10 +120,6 @@ public SchedulerNG createInstance( BlocklistOperations blocklistOperations) throws Exception { - checkState( - jobGraph.getJobType() == JobType.BATCH, - "Adaptive batch scheduler only supports batch jobs"); - checkAllExchangesAreSupported(jobGraph); final SlotPool slotPool = slotPoolService .castInto(SlotPool.class) @@ -132,17 +128,6 @@ public SchedulerNG createInstance( new IllegalStateException( "The AdaptiveBatchScheduler requires a SlotPool.")); - final boolean enableSpeculativeExecution = - jobMasterConfiguration.get(BatchExecutionOptions.SPECULATIVE_ENABLED); - - final HybridPartitionDataConsumeConstraint hybridPartitionDataConsumeConstraint = - getOrDecideHybridPartitionDataConsumeConstraint( - jobMasterConfiguration, enableSpeculativeExecution); - - final List> startUpActions = new ArrayList<>(); - final Consumer combinedStartUpActions = - m -> startUpActions.forEach(a -> a.accept(m)); - final ExecutionSlotAllocatorFactory allocatorFactory = createExecutionSlotAllocatorFactory(jobMasterConfiguration, slotPool); @@ -162,6 +147,75 @@ public SchedulerNG createInstance( jobGraph.getName(), jobGraph.getJobID()); + return createScheduler( + log, + jobGraph, + executionConfig, + ioExecutor, + jobMasterConfiguration, + futureExecutor, + userCodeLoader, + checkpointRecoveryFactory, + rpcTimeout, + blobWriter, + jobManagerJobMetricGroup, + shuffleMaster, + partitionTracker, + executionDeploymentTracker, + initializationTimestamp, + mainThreadExecutor, + jobStatusListener, + failureEnrichers, + blocklistOperations, + new DefaultExecutionOperations(), + allocatorFactory, + restartBackoffTimeStrategy, + new ScheduledExecutorServiceAdapter(futureExecutor), + DefaultVertexParallelismAndInputInfosDecider.from( + getDefaultMaxParallelism(jobMasterConfiguration, executionConfig), + jobMasterConfiguration)); + } + + @VisibleForTesting + public static AdaptiveBatchScheduler createScheduler( + Logger log, + JobGraph jobGraph, + ExecutionConfig executionConfig, + Executor ioExecutor, + Configuration jobMasterConfiguration, + ScheduledExecutorService futureExecutor, + ClassLoader userCodeLoader, + CheckpointRecoveryFactory checkpointRecoveryFactory, + Time rpcTimeout, + BlobWriter blobWriter, + JobManagerJobMetricGroup jobManagerJobMetricGroup, + ShuffleMaster shuffleMaster, + JobMasterPartitionTracker partitionTracker, + ExecutionDeploymentTracker executionDeploymentTracker, + long initializationTimestamp, + ComponentMainThreadExecutor mainThreadExecutor, + JobStatusListener jobStatusListener, + Collection failureEnrichers, + BlocklistOperations blocklistOperations, + ExecutionOperations executionOperations, + ExecutionSlotAllocatorFactory allocatorFactory, + RestartBackoffTimeStrategy restartBackoffTimeStrategy, + ScheduledExecutor delayExecutor, + VertexParallelismAndInputInfosDecider vertexParallelismAndInputInfosDecider) + throws Exception { + + checkState( + jobGraph.getJobType() == JobType.BATCH, + "Adaptive batch scheduler only supports batch jobs"); + checkAllExchangesAreSupported(jobGraph); + + final boolean enableSpeculativeExecution = + jobMasterConfiguration.get(BatchExecutionOptions.SPECULATIVE_ENABLED); + + final HybridPartitionDataConsumeConstraint hybridPartitionDataConsumeConstraint = + getOrDecideHybridPartitionDataConsumeConstraint( + jobMasterConfiguration, enableSpeculativeExecution); + final ExecutionGraphFactory executionGraphFactory = new DefaultExecutionGraphFactory( jobMasterConfiguration, @@ -189,70 +243,35 @@ public SchedulerNG createInstance( ForwardGroupComputeUtil.computeForwardGroupsAndCheckParallelism( jobGraph.getVerticesSortedTopologicallyFromSources()); - if (enableSpeculativeExecution) { - return new SpeculativeScheduler( - log, - jobGraph, - ioExecutor, - jobMasterConfiguration, - combinedStartUpActions, - new ScheduledExecutorServiceAdapter(futureExecutor), - userCodeLoader, - new CheckpointsCleaner(), - checkpointRecoveryFactory, - jobManagerJobMetricGroup, - schedulingStrategyFactory, - FailoverStrategyFactoryLoader.loadFailoverStrategyFactory( - jobMasterConfiguration), - restartBackoffTimeStrategy, - new DefaultExecutionOperations(), - new ExecutionVertexVersioner(), - allocatorFactory, - initializationTimestamp, - mainThreadExecutor, - jobStatusListener, - failureEnrichers, - executionGraphFactory, - shuffleMaster, - rpcTimeout, - DefaultVertexParallelismAndInputInfosDecider.from( - defaultMaxParallelism, jobMasterConfiguration), - defaultMaxParallelism, - blocklistOperations, - hybridPartitionDataConsumeConstraint, - forwardGroupsByJobVertexId); - } else { - return new AdaptiveBatchScheduler( - log, - jobGraph, - ioExecutor, - jobMasterConfiguration, - combinedStartUpActions, - new ScheduledExecutorServiceAdapter(futureExecutor), - userCodeLoader, - new CheckpointsCleaner(), - checkpointRecoveryFactory, - jobManagerJobMetricGroup, - schedulingStrategyFactory, - FailoverStrategyFactoryLoader.loadFailoverStrategyFactory( - jobMasterConfiguration), - restartBackoffTimeStrategy, - new DefaultExecutionOperations(), - new ExecutionVertexVersioner(), - allocatorFactory, - initializationTimestamp, - mainThreadExecutor, - jobStatusListener, - failureEnrichers, - executionGraphFactory, - shuffleMaster, - rpcTimeout, - DefaultVertexParallelismAndInputInfosDecider.from( - defaultMaxParallelism, jobMasterConfiguration), - defaultMaxParallelism, - hybridPartitionDataConsumeConstraint, - forwardGroupsByJobVertexId); - } + return new AdaptiveBatchScheduler( + log, + jobGraph, + ioExecutor, + jobMasterConfiguration, + componentMainThreadExecutor -> {}, + delayExecutor, + userCodeLoader, + new CheckpointsCleaner(), + checkpointRecoveryFactory, + jobManagerJobMetricGroup, + schedulingStrategyFactory, + FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration), + restartBackoffTimeStrategy, + executionOperations, + new ExecutionVertexVersioner(), + allocatorFactory, + initializationTimestamp, + mainThreadExecutor, + jobStatusListener, + failureEnrichers, + executionGraphFactory, + shuffleMaster, + rpcTimeout, + vertexParallelismAndInputInfosDecider, + defaultMaxParallelism, + blocklistOperations, + hybridPartitionDataConsumeConstraint, + forwardGroupsByJobVertexId); } public static InputConsumableDecider.Factory loadInputConsumableDeciderFactory( @@ -318,7 +337,7 @@ private static ExecutionJobVertex.Factory createExecutionJobVertexFactory( } } - private void checkAllExchangesAreSupported(final JobGraph jobGraph) { + private static void checkAllExchangesAreSupported(final JobGraph jobGraph) { for (JobVertex jobVertex : jobGraph.getVertices()) { for (IntermediateDataSet dataSet : jobVertex.getProducedDataSets()) { checkState( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultSpeculativeExecutionHandler.java similarity index 59% rename from flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultSpeculativeExecutionHandler.java index bdbc191c346d9..f4449e33a94f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultSpeculativeExecutionHandler.java @@ -20,49 +20,29 @@ package org.apache.flink.runtime.scheduler.adaptivebatch; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.BatchExecutionOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint; -import org.apache.flink.core.failure.FailureEnricher; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.blocklist.BlockedNode; import org.apache.flink.runtime.blocklist.BlocklistOperations; -import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; -import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.executiongraph.IOMetrics; -import org.apache.flink.runtime.executiongraph.JobStatusListener; import org.apache.flink.runtime.executiongraph.SpeculativeExecutionVertex; -import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy; -import org.apache.flink.runtime.executiongraph.failover.FailureHandlingResult; -import org.apache.flink.runtime.executiongraph.failover.RestartBackoffTimeStrategy; import org.apache.flink.runtime.io.network.partition.PartitionException; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroup; import org.apache.flink.runtime.metrics.MetricNames; -import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; -import org.apache.flink.runtime.scheduler.ExecutionGraphFactory; -import org.apache.flink.runtime.scheduler.ExecutionOperations; -import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory; -import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner; import org.apache.flink.runtime.scheduler.slowtaskdetector.ExecutionTimeBasedSlowTaskDetector; import org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetector; import org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetectorListener; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; -import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory; -import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.concurrent.FutureUtils; -import org.apache.flink.util.concurrent.ScheduledExecutor; import org.slf4j.Logger; @@ -76,8 +56,9 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.function.Consumer; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -85,9 +66,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; -/** The speculative scheduler. */ -public class SpeculativeScheduler extends AdaptiveBatchScheduler - implements SlowTaskDetectorListener { +/** The default implementation of {@link SpeculativeExecutionHandler}. */ +public class DefaultSpeculativeExecutionHandler + implements SpeculativeExecutionHandler, SlowTaskDetectorListener { private final int maxConcurrentExecutions; @@ -101,70 +82,26 @@ public class SpeculativeScheduler extends AdaptiveBatchScheduler private final Counter numEffectiveSpeculativeExecutionsCounter; - public SpeculativeScheduler( - final Logger log, - final JobGraph jobGraph, - final Executor ioExecutor, - final Configuration jobMasterConfiguration, - final Consumer startUpAction, - final ScheduledExecutor delayExecutor, - final ClassLoader userCodeLoader, - final CheckpointsCleaner checkpointsCleaner, - final CheckpointRecoveryFactory checkpointRecoveryFactory, - final JobManagerJobMetricGroup jobManagerJobMetricGroup, - final SchedulingStrategyFactory schedulingStrategyFactory, - final FailoverStrategy.Factory failoverStrategyFactory, - final RestartBackoffTimeStrategy restartBackoffTimeStrategy, - final ExecutionOperations executionOperations, - final ExecutionVertexVersioner executionVertexVersioner, - final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory, - long initializationTimestamp, - final ComponentMainThreadExecutor mainThreadExecutor, - final JobStatusListener jobStatusListener, - final Collection failureEnrichers, - final ExecutionGraphFactory executionGraphFactory, - final ShuffleMaster shuffleMaster, - final Time rpcTimeout, - final VertexParallelismAndInputInfosDecider vertexParallelismAndInputInfosDecider, - final int defaultMaxParallelism, - final BlocklistOperations blocklistOperations, - final HybridPartitionDataConsumeConstraint hybridPartitionDataConsumeConstraint, - final Map forwardGroupsByJobVertexId) - throws Exception { - - super( - log, - jobGraph, - ioExecutor, - jobMasterConfiguration, - startUpAction, - delayExecutor, - userCodeLoader, - checkpointsCleaner, - checkpointRecoveryFactory, - jobManagerJobMetricGroup, - schedulingStrategyFactory, - failoverStrategyFactory, - restartBackoffTimeStrategy, - executionOperations, - executionVertexVersioner, - executionSlotAllocatorFactory, - initializationTimestamp, - mainThreadExecutor, - jobStatusListener, - failureEnrichers, - executionGraphFactory, - shuffleMaster, - rpcTimeout, - vertexParallelismAndInputInfosDecider, - defaultMaxParallelism, - hybridPartitionDataConsumeConstraint, - forwardGroupsByJobVertexId); + private final Function executionVertexRetriever; + private final Supplier> registerExecutionsSupplier; + + private final BiConsumer, Collection> + allocateSlotsAndDeployFunction; + + private final Logger log; + + public DefaultSpeculativeExecutionHandler( + Configuration jobMasterConfiguration, + BlocklistOperations blocklistOperations, + Function executionVertexRetriever, + Supplier> registerExecutionsSupplier, + BiConsumer, Collection> + allocateSlotsAndDeployFunction, + Logger log) { this.maxConcurrentExecutions = jobMasterConfiguration.get( BatchExecutionOptions.SPECULATIVE_MAX_CONCURRENT_EXECUTIONS); - this.blockSlowNodeDuration = jobMasterConfiguration.get(BatchExecutionOptions.BLOCK_SLOW_NODE_DURATION); checkArgument( @@ -172,126 +109,77 @@ public SpeculativeScheduler( "The blocking duration should not be negative."); this.blocklistOperations = checkNotNull(blocklistOperations); - this.slowTaskDetector = new ExecutionTimeBasedSlowTaskDetector(jobMasterConfiguration); - this.numEffectiveSpeculativeExecutionsCounter = new SimpleCounter(); + this.executionVertexRetriever = checkNotNull(executionVertexRetriever); + this.registerExecutionsSupplier = checkNotNull(registerExecutionsSupplier); + this.allocateSlotsAndDeployFunction = checkNotNull(allocateSlotsAndDeployFunction); + this.log = checkNotNull(log); } @Override - protected void startSchedulingInternal() { - registerMetrics(jobManagerJobMetricGroup); - - super.startSchedulingInternal(); - slowTaskDetector.start(getExecutionGraph(), this, getMainThreadExecutor()); - } - - private void registerMetrics(MetricGroup metricGroup) { + public void init( + ExecutionGraph executionGraph, + ComponentMainThreadExecutor mainThreadExecutor, + MetricGroup metricGroup) { metricGroup.gauge(MetricNames.NUM_SLOW_EXECUTION_VERTICES, () -> numSlowExecutionVertices); metricGroup.counter( MetricNames.NUM_EFFECTIVE_SPECULATIVE_EXECUTIONS, numEffectiveSpeculativeExecutionsCounter); - } - @Override - public CompletableFuture closeAsync() { - slowTaskDetector.stop(); - return super.closeAsync(); + slowTaskDetector.start(executionGraph, this, mainThreadExecutor); } @Override - public SpeculativeExecutionVertex getExecutionVertex(ExecutionVertexID executionVertexId) { - return (SpeculativeExecutionVertex) super.getExecutionVertex(executionVertexId); + public void stopSlowTaskDetector() { + slowTaskDetector.stop(); } @Override - protected void onTaskFinished(final Execution execution, final IOMetrics ioMetrics) { + public void notifyTaskFinished( + final Execution execution, + Function> cancelPendingExecutionsFunction) { if (!isOriginalAttempt(execution)) { numEffectiveSpeculativeExecutionsCounter.inc(); } // cancel all un-terminated executions because the execution vertex has finished - FutureUtils.assertNoException(cancelPendingExecutions(execution.getVertex().getID())); - - super.onTaskFinished(execution, ioMetrics); + FutureUtils.assertNoException( + cancelPendingExecutionsFunction.apply(execution.getVertex().getID())); } - private static boolean isOriginalAttempt(final Execution execution) { - return ((SpeculativeExecutionVertex) execution.getVertex()) + private boolean isOriginalAttempt(final Execution execution) { + return getExecutionVertex(execution.getVertex().getID()) .isOriginalAttempt(execution.getAttemptNumber()); } - private CompletableFuture cancelPendingExecutions( - final ExecutionVertexID executionVertexId) { - final List pendingExecutions = - getExecutionVertex(executionVertexId).getCurrentExecutions().stream() - .filter( - e -> - !e.getState().isTerminal() - && e.getState() != ExecutionState.CANCELING) - .collect(Collectors.toList()); - if (pendingExecutions.isEmpty()) { - return CompletableFuture.completedFuture(null); - } - - log.info( - "Canceling {} un-finished executions of {} because one of its executions has finished.", - pendingExecutions.size(), - executionVertexId); - - final CompletableFuture future = - FutureUtils.combineAll( - pendingExecutions.stream() - .map(this::cancelExecution) - .collect(Collectors.toList())); - cancelAllPendingSlotRequestsForVertex(executionVertexId); - return future; - } - @Override - protected void onTaskFailed(final Execution execution) { + public void notifyTaskFailed(final Execution execution) { final SpeculativeExecutionVertex executionVertex = getExecutionVertex(execution.getVertex().getID()); // when an execution fails, remove it from current executions to make room for future // speculative executions executionVertex.archiveFailedExecution(execution.getAttemptId()); - - super.onTaskFailed(execution); } @Override - protected void handleTaskFailure( - final Execution failedExecution, @Nullable final Throwable error) { - + public boolean handleTaskFailure( + final Execution failedExecution, + @Nullable final Throwable error, + BiConsumer handleLocalExecutionAttemptFailure) { final SpeculativeExecutionVertex executionVertex = getExecutionVertex(failedExecution.getVertex().getID()); - // if the execution vertex is not possible finish or a PartitionException occurred, trigger - // an execution vertex failover to recover + // if the execution vertex is not possible finish or a PartitionException occurred, + // trigger an execution vertex failover to recover if (!isExecutionVertexPossibleToFinish(executionVertex) || ExceptionUtils.findThrowable(error, PartitionException.class).isPresent()) { - super.handleTaskFailure(failedExecution, error); + return false; } else { // this is just a local failure and the execution vertex will not be fully restarted - handleLocalExecutionAttemptFailure(failedExecution, error); - } - } - - private void handleLocalExecutionAttemptFailure( - final Execution failedExecution, @Nullable final Throwable error) { - executionSlotAllocator.cancel(failedExecution.getAttemptId()); - - final FailureHandlingResult failureHandlingResult = - recordTaskFailure(failedExecution, error); - if (failureHandlingResult.canRestart()) { - archiveFromFailureHandlingResult( - createFailureHandlingResultSnapshot(failureHandlingResult)); - } else { - failJob( - error, - failureHandlingResult.getTimestamp(), - failureHandlingResult.getFailureLabels()); + handleLocalExecutionAttemptFailure.accept(failedExecution, error); + return true; } } @@ -314,17 +202,6 @@ private static boolean isExecutionVertexPossibleToFinish( return anyExecutionPossibleToFinish; } - @Override - protected void resetForNewExecution(final ExecutionVertexID executionVertexId) { - final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId); - final Execution execution = executionVertex.getCurrentExecutionAttempt(); - if (execution.getState() == ExecutionState.FINISHED && !isOriginalAttempt(execution)) { - numEffectiveSpeculativeExecutionsCounter.dec(); - } - - super.resetForNewExecution(executionVertexId); - } - @Override public void notifySlowTasks(Map> slowTasks) { final long currentTimestamp = System.currentTimeMillis(); @@ -367,9 +244,7 @@ public void notifySlowTasks(Map getSlowNodeIds( slowTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()); return slowExecutions.stream() - .map(id -> getExecutionGraph().getRegisteredExecutions().get(id)) + .map(id -> registerExecutionsSupplier.get().get(id)) .map( e -> { checkNotNull( @@ -410,6 +285,11 @@ private Set getSlowNodeIds( .collect(Collectors.toSet()); } + private SpeculativeExecutionVertex getExecutionVertex( + final ExecutionVertexID executionVertexId) { + return (SpeculativeExecutionVertex) executionVertexRetriever.apply(executionVertexId); + } + private void setupSubtaskGatewayForAttempts( final SpeculativeExecutionVertex executionVertex, final Collection attempts) { @@ -426,6 +306,15 @@ private void setupSubtaskGatewayForAttempts( executionVertex.getParallelSubtaskIndex(), attemptNumbers)); } + @Override + public void resetForNewExecution(final ExecutionVertexID executionVertexId) { + final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId); + final Execution execution = executionVertex.getCurrentExecutionAttempt(); + if (execution.getState() == ExecutionState.FINISHED && !isOriginalAttempt(execution)) { + numEffectiveSpeculativeExecutionsCounter.dec(); + } + } + @VisibleForTesting long getNumSlowExecutionVertices() { return numSlowExecutionVertices; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DummySpeculativeExecutionHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DummySpeculativeExecutionHandler.java new file mode 100644 index 0000000000000..80cc4b92ea78d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DummySpeculativeExecutionHandler.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import java.util.function.Function; + +/** The dummy implementation of {@link SpeculativeExecutionHandler}. */ +public class DummySpeculativeExecutionHandler implements SpeculativeExecutionHandler { + @Override + public void init( + ExecutionGraph executionGraph, + ComponentMainThreadExecutor mainThreadExecutor, + MetricGroup metricGroup) { + // do nothing + } + + @Override + public void stopSlowTaskDetector() { + // do nothing + } + + @Override + public void notifyTaskFinished( + Execution execution, + Function> cancelPendingExecutionsFunction) { + // do nothing + } + + @Override + public void notifyTaskFailed(Execution execution) { + // do nothing + } + + @Override + public boolean handleTaskFailure( + Execution failedExecution, + @Nullable Throwable error, + BiConsumer handleLocalExecutionAttemptFailure) { + return false; + } + + @Override + public void resetForNewExecution(ExecutionVertexID executionVertexId) { + // do nothing + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeExecutionHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeExecutionHandler.java new file mode 100644 index 0000000000000..7444b970935f8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeExecutionHandler.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import javax.annotation.Nullable; + +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import java.util.function.Function; + +/** Interface for managing speculative execution of tasks and handling slow task detection. */ +public interface SpeculativeExecutionHandler { + + /** Initial speculative execution handler. */ + void init( + ExecutionGraph executionGraph, + ComponentMainThreadExecutor mainThreadExecutor, + MetricGroup metricGroup); + + /** Stops the slow task detector. */ + void stopSlowTaskDetector(); + + /** + * Notifies that a task has finished its execution. + * + * @param execution the execution that has finished + * @param cancelPendingExecutionsFunction the function to cancel pending executions + */ + void notifyTaskFinished( + Execution execution, + Function> cancelPendingExecutionsFunction); + + /** + * Notifies that a task has failed its execution. + * + * @param execution the execution that has failed + */ + void notifyTaskFailed(Execution execution); + + /** + * Handles a task failure. + * + * @param failedExecution the execution that failed + * @param error the error that caused the failure, if available + * @param handleLocalExecutionAttemptFailure a consumer that handles local execution attempt + * failure + * @return true if the failure was handled as a local failure, false otherwise + */ + boolean handleTaskFailure( + Execution failedExecution, + @Nullable Throwable error, + BiConsumer handleLocalExecutionAttemptFailure); + + /** + * Resets the state of the component for a new execution of a specific execution vertex. + * + * @param executionVertexId the ID of the execution vertex to reset + */ + void resetForNewExecution(ExecutionVertexID executionVertexId); +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java index 7df0a95bc9b2a..1183fe327be66 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.BatchExecutionOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint; import org.apache.flink.core.failure.FailureEnricher; import org.apache.flink.runtime.blob.BlobWriter; @@ -33,7 +34,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.JobStatusListener; import org.apache.flink.runtime.executiongraph.ParallelismAndInputInfos; -import org.apache.flink.runtime.executiongraph.SpeculativeExecutionJobVertex; import org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils; import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy; import org.apache.flink.runtime.executiongraph.failover.NoRestartBackoffTimeStrategy; @@ -43,19 +43,17 @@ import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroupComputeUtil; import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler; +import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerFactory; import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingResultInfo; -import org.apache.flink.runtime.scheduler.adaptivebatch.SpeculativeScheduler; import org.apache.flink.runtime.scheduler.adaptivebatch.VertexParallelismAndInputInfosDecider; import org.apache.flink.runtime.scheduler.strategy.AllFinishedInputConsumableDecider; import org.apache.flink.runtime.scheduler.strategy.InputConsumableDecider; import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy; import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory; -import org.apache.flink.runtime.scheduler.strategy.VertexwiseSchedulingStrategy; import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.runtime.shuffle.ShuffleTestUtils; import org.apache.flink.util.concurrent.ScheduledExecutor; @@ -72,6 +70,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.function.Function; +import static org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint.ALL_PRODUCERS_FINISHED; import static org.apache.flink.runtime.scheduler.SchedulerBase.computeVertexParallelismStore; /** A builder to create {@link DefaultScheduler} or its subclass instances for testing. */ @@ -322,68 +321,51 @@ public DefaultScheduler build() throws Exception { } public AdaptiveBatchScheduler buildAdaptiveBatchJobScheduler() throws Exception { - return new AdaptiveBatchScheduler( + return buildAdaptiveBatchJobScheduler(false); + } + + public AdaptiveBatchScheduler buildAdaptiveBatchJobScheduler(boolean enableSpeculativeExecution) + throws Exception { + jobMasterConfiguration.set( + BatchExecutionOptions.SPECULATIVE_ENABLED, enableSpeculativeExecution); + jobMasterConfiguration.set( + BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM, + defaultMaxParallelism); + if (enableSpeculativeExecution) { + jobMasterConfiguration.set( + JobManagerOptions.HYBRID_PARTITION_DATA_CONSUME_CONSTRAINT, + ALL_PRODUCERS_FINISHED); + } else { + jobMasterConfiguration.set( + JobManagerOptions.HYBRID_PARTITION_DATA_CONSUME_CONSTRAINT, + hybridPartitionDataConsumeConstraint); + } + + return AdaptiveBatchSchedulerFactory.createScheduler( log, jobGraph, + jobGraph.getSerializedExecutionConfig().deserializeValue(userCodeLoader), ioExecutor, jobMasterConfiguration, - componentMainThreadExecutor -> {}, - delayExecutor, + futureExecutor, userCodeLoader, - checkpointCleaner, checkpointRecoveryFactory, - jobManagerJobMetricGroup, - new VertexwiseSchedulingStrategy.Factory(inputConsumableDeciderFactory), - failoverStrategyFactory, - restartBackoffTimeStrategy, - executionOperations, - executionVertexVersioner, - executionSlotAllocatorFactory, - System.currentTimeMillis(), - mainThreadExecutor, - jobStatusListener, - failureEnrichers, - createExecutionGraphFactory(true), - shuffleMaster, rpcTimeout, - vertexParallelismAndInputInfosDecider, - defaultMaxParallelism, - hybridPartitionDataConsumeConstraint, - ForwardGroupComputeUtil.computeForwardGroupsAndCheckParallelism( - jobGraph.getVerticesSortedTopologicallyFromSources())); - } - - public SpeculativeScheduler buildSpeculativeScheduler() throws Exception { - return new SpeculativeScheduler( - log, - jobGraph, - ioExecutor, - jobMasterConfiguration, - componentMainThreadExecutor -> {}, - delayExecutor, - userCodeLoader, - checkpointCleaner, - checkpointRecoveryFactory, + blobWriter, jobManagerJobMetricGroup, - new VertexwiseSchedulingStrategy.Factory(inputConsumableDeciderFactory), - failoverStrategyFactory, - restartBackoffTimeStrategy, - executionOperations, - executionVertexVersioner, - executionSlotAllocatorFactory, + shuffleMaster, + partitionTracker, + new DefaultExecutionDeploymentTracker(), System.currentTimeMillis(), mainThreadExecutor, jobStatusListener, failureEnrichers, - createExecutionGraphFactory(true, new SpeculativeExecutionJobVertex.Factory()), - shuffleMaster, - rpcTimeout, - vertexParallelismAndInputInfosDecider, - defaultMaxParallelism, blocklistOperations, - HybridPartitionDataConsumeConstraint.ALL_PRODUCERS_FINISHED, - ForwardGroupComputeUtil.computeForwardGroupsAndCheckParallelism( - jobGraph.getVerticesSortedTopologicallyFromSources())); + executionOperations, + executionSlotAllocatorFactory, + restartBackoffTimeStrategy, + delayExecutor, + vertexParallelismAndInputInfosDecider); } private ExecutionGraphFactory createExecutionGraphFactory(boolean isDynamicGraph) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java index 03cf41c12a5a7..1887607c27d54 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java @@ -2044,6 +2044,17 @@ public static JobGraph singleNonParallelJobVertexJobGraph() { return singleJobVertexJobGraph(1); } + public static JobGraph singleNonParallelJobVertexJobGraphForBatch() { + return singleJobVertexJobGraphForBatch(1); + } + + private static JobGraph singleJobVertexJobGraphForBatch(final int parallelism) { + final JobVertex vertex = new JobVertex("source"); + vertex.setInvokableClass(NoOpInvokable.class); + vertex.setParallelism(parallelism); + return JobGraphTestUtils.batchJobGraph(vertex); + } + private static JobGraph singleJobVertexJobGraph(final int parallelism) { final JobVertex vertex = new JobVertex("source"); vertex.setInvokableClass(NoOpInvokable.class); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeExecutionTest.java similarity index 85% rename from flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeExecutionTest.java index 1bde7878c4e9e..5c2664a33a1f5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeExecutionTest.java @@ -47,6 +47,7 @@ import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocator; import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocatorFactory; import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; import org.apache.flink.testutils.TestingUtils; @@ -69,6 +70,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -78,15 +80,15 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.completeCancellingForAllVertices; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex; import static org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder.createCustomParallelismDecider; -import static org.apache.flink.runtime.scheduler.DefaultSchedulerTest.singleNonParallelJobVertexJobGraph; +import static org.apache.flink.runtime.scheduler.DefaultSchedulerTest.singleNonParallelJobVertexJobGraphForBatch; import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.createCanceledTaskExecutionState; import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.createFailedTaskExecutionState; import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.createFinishedTaskExecutionState; import static org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerTest.createResultPartitionBytesForExecution; import static org.assertj.core.api.Assertions.assertThat; -/** Tests for {@link SpeculativeScheduler}. */ -class SpeculativeSchedulerTest { +/** Tests for {@link AdaptiveBatchScheduler} with speculative execution enabled. */ +class SpeculativeExecutionTest { @RegisterExtension private static final TestExecutorExtension EXECUTOR_RESOURCE = @@ -132,7 +134,7 @@ void testStartScheduling() { @Test void testNotifySlowTasks() { - final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling(); + final AdaptiveBatchScheduler scheduler = createSchedulerAndStartScheduling(); final ExecutionVertex ev = getOnlyExecutionVertex(scheduler); final Execution attempt1 = ev.getCurrentExecutionAttempt(); @@ -153,7 +155,7 @@ void testNotifySlowTasks() { @Test void testNotifyDuplicatedSlowTasks() { - final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling(); + final AdaptiveBatchScheduler scheduler = createSchedulerAndStartScheduling(); final ExecutionVertex ev = getOnlyExecutionVertex(scheduler); final Execution attempt1 = ev.getCurrentExecutionAttempt(); @@ -178,7 +180,7 @@ void testNotifyDuplicatedSlowTasks() { @Test void testRestartVertexIfAllSpeculativeExecutionFailed() { - final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling(); + final AdaptiveBatchScheduler scheduler = createSchedulerAndStartScheduling(); final ExecutionVertex ev = getOnlyExecutionVertex(scheduler); final Execution attempt1 = ev.getCurrentExecutionAttempt(); @@ -198,7 +200,7 @@ void testRestartVertexIfAllSpeculativeExecutionFailed() { @Test void testNoRestartIfNotAllSpeculativeExecutionFailed() { - final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling(); + final AdaptiveBatchScheduler scheduler = createSchedulerAndStartScheduling(); final ExecutionVertex ev = getOnlyExecutionVertex(scheduler); final Execution attempt1 = ev.getCurrentExecutionAttempt(); @@ -211,7 +213,7 @@ void testNoRestartIfNotAllSpeculativeExecutionFailed() { @Test void testRestartVertexIfPartitionExceptionHappened() { - final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling(); + final AdaptiveBatchScheduler scheduler = createSchedulerAndStartScheduling(); final ExecutionVertex ev = getOnlyExecutionVertex(scheduler); final Execution attempt1 = ev.getCurrentExecutionAttempt(); @@ -232,7 +234,7 @@ void testRestartVertexIfPartitionExceptionHappened() { @Test void testCancelOtherDeployedCurrentExecutionsWhenAnyExecutionFinished() { - final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling(); + final AdaptiveBatchScheduler scheduler = createSchedulerAndStartScheduling(); final ExecutionVertex ev = getOnlyExecutionVertex(scheduler); final Execution attempt1 = ev.getCurrentExecutionAttempt(); @@ -248,7 +250,7 @@ void testCancelOtherDeployedCurrentExecutionsWhenAnyExecutionFinished() { void testCancelOtherScheduledCurrentExecutionsWhenAnyExecutionFinished() { testExecutionSlotAllocator.disableAutoCompletePendingRequests(); - final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling(); + final AdaptiveBatchScheduler scheduler = createSchedulerAndStartScheduling(); final ExecutionVertex ev = getOnlyExecutionVertex(scheduler); final Execution attempt1 = ev.getCurrentExecutionAttempt(); @@ -263,7 +265,7 @@ void testCancelOtherScheduledCurrentExecutionsWhenAnyExecutionFinished() { @Test void testExceptionHistoryIfPartitionExceptionHappened() { - final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling(); + final AdaptiveBatchScheduler scheduler = createSchedulerAndStartScheduling(); final ExecutionVertex ev = getOnlyExecutionVertex(scheduler); final Execution attempt1 = ev.getCurrentExecutionAttempt(); @@ -288,7 +290,7 @@ void testExceptionHistoryIfPartitionExceptionHappened() { @Test void testLocalExecutionAttemptFailureIsCorrectlyRecorded() { - final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling(); + final AdaptiveBatchScheduler scheduler = createSchedulerAndStartScheduling(); final ExecutionVertex ev = getOnlyExecutionVertex(scheduler); final Execution attempt1 = ev.getCurrentExecutionAttempt(); @@ -300,7 +302,7 @@ void testLocalExecutionAttemptFailureIsCorrectlyRecorded() { createFailedTaskExecutionState(attempt1.getAttemptId()); scheduler.updateTaskExecutionState(failedState); - final ClassLoader classLoader = SpeculativeSchedulerTest.class.getClassLoader(); + final ClassLoader classLoader = this.getClass().getClassLoader(); assertThat(scheduler.getExecutionGraph().getFailureInfo()).isNotNull(); assertThat(scheduler.getExecutionGraph().getFailureInfo().getExceptionAsString()) .contains(failedState.getError(classLoader).getMessage()); @@ -313,7 +315,7 @@ void testLocalExecutionAttemptFailureIsCorrectlyRecorded() { @Test void testUnrecoverableLocalExecutionAttemptFailureWillFailJob() { - final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling(); + final AdaptiveBatchScheduler scheduler = createSchedulerAndStartScheduling(); final ExecutionVertex ev = getOnlyExecutionVertex(scheduler); final Execution attempt1 = ev.getCurrentExecutionAttempt(); @@ -333,7 +335,7 @@ void testUnrecoverableLocalExecutionAttemptFailureWillFailJob() { void testLocalExecutionAttemptFailureAndForbiddenRestartWillFailJob() { restartStrategy.setCanRestart(false); - final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling(); + final AdaptiveBatchScheduler scheduler = createSchedulerAndStartScheduling(); final ExecutionVertex ev = getOnlyExecutionVertex(scheduler); final Execution attempt1 = ev.getCurrentExecutionAttempt(); @@ -364,10 +366,10 @@ void testSpeculativeExecutionCombinedWithAdaptiveScheduling( final ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread(); - final SpeculativeScheduler scheduler = + final AdaptiveBatchScheduler scheduler = createSchedulerBuilder(jobGraph, mainThreadExecutor) .setVertexParallelismAndInputInfosDecider(createCustomParallelismDecider(3)) - .buildSpeculativeScheduler(); + .buildAdaptiveBatchJobScheduler(true); mainThreadExecutor.execute(scheduler::startScheduling); final DefaultExecutionGraph graph = (DefaultExecutionGraph) scheduler.getExecutionGraph(); @@ -401,25 +403,25 @@ void testSpeculativeExecutionCombinedWithAdaptiveScheduling( @Test void testNumSlowExecutionVerticesMetric() { - final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling(); + final AdaptiveBatchScheduler scheduler = createSchedulerAndStartScheduling(); final ExecutionVertex ev = getOnlyExecutionVertex(scheduler); final Execution attempt1 = ev.getCurrentExecutionAttempt(); notifySlowTask(scheduler, attempt1); - assertThat(scheduler.getNumSlowExecutionVertices()).isEqualTo(1); + assertThat(getNumSlowExecutionVertices(scheduler)).isEqualTo(1); // notify a slow vertex twice notifySlowTask(scheduler, attempt1); - assertThat(scheduler.getNumSlowExecutionVertices()).isEqualTo(1); + assertThat(getNumSlowExecutionVertices(scheduler)).isEqualTo(1); // vertex no longer slow - scheduler.notifySlowTasks(Collections.emptyMap()); - assertThat(scheduler.getNumSlowExecutionVertices()).isZero(); + notifySlowTask(scheduler, Collections.emptyMap()); + assertThat(getNumSlowExecutionVertices(scheduler)).isZero(); } @Test void testEffectiveSpeculativeExecutionsMetric() { - final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling(); + final AdaptiveBatchScheduler scheduler = createSchedulerAndStartScheduling(); final ExecutionVertex ev = getOnlyExecutionVertex(scheduler); final Execution attempt1 = ev.getCurrentExecutionAttempt(); @@ -430,7 +432,7 @@ void testEffectiveSpeculativeExecutionsMetric() { final Execution attempt2 = getExecution(ev, 1); scheduler.updateTaskExecutionState( createFinishedTaskExecutionState(attempt2.getAttemptId())); - assertThat(scheduler.getNumEffectiveSpeculativeExecutions()).isEqualTo(1); + assertThat(getNumEffectiveSpeculativeExecutions(scheduler)).isEqualTo(1); // complete cancellation scheduler.updateTaskExecutionState( @@ -441,7 +443,7 @@ void testEffectiveSpeculativeExecutionsMetric() { // numEffectiveSpeculativeExecutions will be decreased accordingly. scheduler.handleGlobalFailure(new Exception()); taskRestartExecutor.triggerScheduledTasks(); - assertThat(scheduler.getNumEffectiveSpeculativeExecutions()).isZero(); + assertThat(getNumEffectiveSpeculativeExecutions(scheduler)).isZero(); final Execution attempt3 = getExecution(ev, 2); notifySlowTask(scheduler, attempt3); @@ -450,7 +452,7 @@ void testEffectiveSpeculativeExecutionsMetric() { // finishes first scheduler.updateTaskExecutionState( createFinishedTaskExecutionState(attempt3.getAttemptId())); - assertThat(scheduler.getNumEffectiveSpeculativeExecutions()).isZero(); + assertThat(getNumEffectiveSpeculativeExecutions(scheduler)).isZero(); } private static Execution getExecution(ExecutionVertex executionVertex, int attemptNumber) { @@ -460,20 +462,20 @@ private static Execution getExecution(ExecutionVertex executionVertex, int attem .get(); } - private static ExecutionVertex getOnlyExecutionVertex(SpeculativeScheduler scheduler) { + private static ExecutionVertex getOnlyExecutionVertex(AdaptiveBatchScheduler scheduler) { return Iterables.getOnlyElement(scheduler.getExecutionGraph().getAllExecutionVertices()); } - private SpeculativeScheduler createSchedulerAndStartScheduling() { - return createSchedulerAndStartScheduling(singleNonParallelJobVertexJobGraph()); + private AdaptiveBatchScheduler createSchedulerAndStartScheduling() { + return createSchedulerAndStartScheduling(singleNonParallelJobVertexJobGraphForBatch()); } - private SpeculativeScheduler createSchedulerAndStartScheduling(final JobGraph jobGraph) { + private AdaptiveBatchScheduler createSchedulerAndStartScheduling(final JobGraph jobGraph) { final ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread(); try { - final SpeculativeScheduler scheduler = createScheduler(jobGraph, mainThreadExecutor); + final AdaptiveBatchScheduler scheduler = createScheduler(jobGraph, mainThreadExecutor); mainThreadExecutor.execute(scheduler::startScheduling); return scheduler; } catch (Exception e) { @@ -481,10 +483,11 @@ private SpeculativeScheduler createSchedulerAndStartScheduling(final JobGraph jo } } - private SpeculativeScheduler createScheduler( + private AdaptiveBatchScheduler createScheduler( final JobGraph jobGraph, final ComponentMainThreadExecutor mainThreadExecutor) throws Exception { - return createSchedulerBuilder(jobGraph, mainThreadExecutor).buildSpeculativeScheduler(); + return createSchedulerBuilder(jobGraph, mainThreadExecutor) + .buildAdaptiveBatchJobScheduler(true); } private DefaultSchedulerBuilder createSchedulerBuilder( @@ -505,11 +508,29 @@ private DefaultSchedulerBuilder createSchedulerBuilder( } private static void notifySlowTask( - final SpeculativeScheduler scheduler, final Execution slowTask) { - scheduler.notifySlowTasks( - ImmutableMap.of( - slowTask.getVertex().getID(), - Collections.singleton(slowTask.getAttemptId()))); + final AdaptiveBatchScheduler scheduler, final Execution slowTask) { + ((DefaultSpeculativeExecutionHandler) scheduler.getSpeculativeExecutionHandler()) + .notifySlowTasks( + ImmutableMap.of( + slowTask.getVertex().getID(), + Collections.singleton(slowTask.getAttemptId()))); + } + + private static void notifySlowTask( + final AdaptiveBatchScheduler scheduler, + final Map> slowTasks) { + ((DefaultSpeculativeExecutionHandler) scheduler.getSpeculativeExecutionHandler()) + .notifySlowTasks(slowTasks); + } + + private long getNumSlowExecutionVertices(AdaptiveBatchScheduler scheduler) { + return ((DefaultSpeculativeExecutionHandler) scheduler.getSpeculativeExecutionHandler()) + .getNumSlowExecutionVertices(); + } + + private long getNumEffectiveSpeculativeExecutions(AdaptiveBatchScheduler scheduler) { + return ((DefaultSpeculativeExecutionHandler) scheduler.getSpeculativeExecutionHandler()) + .getNumEffectiveSpeculativeExecutions(); } private static class TestBlocklistOperations implements BlocklistOperations { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java index aa506f729b911..62113cf9e937c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java @@ -214,7 +214,7 @@ void testFinishedTaskExceedRatioInDynamicGraph() throws Exception { final JobVertex jobVertex2 = new JobVertex("vertex2"); jobVertex2.setInvokableClass(NoOpInvokable.class); jobVertex2.connectNewDataSetAsInput( - jobVertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + jobVertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); final ExecutionGraph executionGraph = createDynamicExecutionGraph(jobVertex1, jobVertex2); final ExecutionTimeBasedSlowTaskDetector slowTaskDetector = @@ -431,7 +431,7 @@ private ExecutionGraph createExecutionGraph(JobVertex... jobVertices) throws Exc } private ExecutionGraph createDynamicExecutionGraph(JobVertex... jobVertices) throws Exception { - final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertices); + final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertices); final SchedulerBase scheduler = new DefaultSchedulerBuilder( diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeExecutionITCase.java similarity index 99% rename from flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java rename to flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeExecutionITCase.java index 12fcc9d7f0c4a..9f3df4502cb3c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeExecutionITCase.java @@ -56,7 +56,6 @@ import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.core.io.InputStatus; import org.apache.flink.core.io.SimpleVersionedSerializer; -import org.apache.flink.runtime.scheduler.adaptivebatch.SpeculativeScheduler; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -93,8 +92,11 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** IT cases for {@link SpeculativeScheduler}. */ -class SpeculativeSchedulerITCase { +/** + * IT cases for {@link org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler} with + * speculative execution enabled. + */ +class SpeculativeExecutionITCase { @TempDir private Path temporaryFolder; private static final int MAX_PARALLELISM = 4;