From e5c4e3f519f364b5951e7cac331eb8af48f0ed84 Mon Sep 17 00:00:00 2001 From: Zhu Zhu Date: Wed, 6 Jul 2022 15:11:45 +0800 Subject: [PATCH] [FLINK-28392][runtime] DefaultExecutionDeployer avoid retrieving executions from ExecutionGraph#currentExecutions This closes #20178. --- .../scheduler/DefaultExecutionDeployer.java | 88 ++++++++----------- .../runtime/scheduler/DefaultScheduler.java | 1 - .../runtime/scheduler/ExecutionDeployer.java | 4 - .../DefaultExecutionDeployerTest.java | 25 +++--- 4 files changed, 49 insertions(+), 69 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployer.java index e5b7d18b2092d..a12d0cba06c40 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployer.java @@ -39,7 +39,6 @@ import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; @@ -65,8 +64,6 @@ public class DefaultExecutionDeployer implements ExecutionDeployer { private final Time partitionRegistrationTimeout; - private final Function executionRetriever; - private final BiConsumer allocationReservationFunc; private final ComponentMainThreadExecutor mainThreadExecutor; @@ -77,7 +74,6 @@ private DefaultExecutionDeployer( final ExecutionOperations executionOperations, final ExecutionVertexVersioner executionVertexVersioner, final Time partitionRegistrationTimeout, - final Function executionRetriever, final BiConsumer allocationReservationFunc, final ComponentMainThreadExecutor mainThreadExecutor) { @@ -86,7 +82,6 @@ private DefaultExecutionDeployer( this.executionOperations = checkNotNull(executionOperations); this.executionVertexVersioner = checkNotNull(executionVertexVersioner); this.partitionRegistrationTimeout = checkNotNull(partitionRegistrationTimeout); - this.executionRetriever = checkNotNull(executionRetriever); this.allocationReservationFunc = checkNotNull(allocationReservationFunc); this.mainThreadExecutor = checkNotNull(mainThreadExecutor); } @@ -103,7 +98,8 @@ public void allocateSlotsAndDeploy( allocateSlotsFor(executionsToDeploy); final List deploymentHandles = - createDeploymentHandles(requiredVersionByVertex, executionSlotAssignments); + createDeploymentHandles( + executionsToDeploy, requiredVersionByVertex, executionSlotAssignments); waitForAllSlotsAndDeploy(deploymentHandles); } @@ -132,22 +128,25 @@ private List allocateSlotsFor( } private List createDeploymentHandles( + final List executionsToDeploy, final Map requiredVersionByVertex, final List executionSlotAssignments) { - return executionSlotAssignments.stream() - .map( - executionSlotAssignment -> { - final Execution execution = - getExecutionOrThrow( - executionSlotAssignment.getExecutionAttemptId()); - final ExecutionVertexID executionVertexId = - execution.getVertex().getID(); - return new ExecutionDeploymentHandle( - executionSlotAssignment, - requiredVersionByVertex.get(executionVertexId)); - }) - .collect(Collectors.toList()); + final List deploymentHandles = + new ArrayList<>(executionsToDeploy.size()); + for (int i = 0; i < executionsToDeploy.size(); i++) { + final Execution execution = executionsToDeploy.get(i); + final ExecutionSlotAssignment assignment = executionSlotAssignments.get(i); + checkState(execution.getAttemptId().equals(assignment.getExecutionAttemptId())); + + final ExecutionVertexID executionVertexId = execution.getVertex().getID(); + final ExecutionDeploymentHandle deploymentHandle = + new ExecutionDeploymentHandle( + execution, assignment, requiredVersionByVertex.get(executionVertexId)); + deploymentHandles.add(deploymentHandle); + } + + return deploymentHandles; } private void waitForAllSlotsAndDeploy(final List deploymentHandles) { @@ -169,8 +168,7 @@ private CompletableFuture assignAllResourcesAndRegisterProducedPartitions( (ignore, throwable) -> { if (throwable != null) { handleTaskDeploymentFailure( - deploymentHandle.getExecutionAttemptId(), - throwable); + deploymentHandle.getExecution(), throwable); } return null; }); @@ -208,11 +206,9 @@ private BiFunction assignResource( return (logicalSlot, throwable) -> { final ExecutionVertexVersion requiredVertexVersion = deploymentHandle.getRequiredVertexVersion(); - final Optional optionalExecution = - getExecution(deploymentHandle.getExecutionAttemptId()); + final Execution execution = deploymentHandle.getExecution(); - if (!optionalExecution.isPresent() - || optionalExecution.get().getState() != ExecutionState.SCHEDULED + if (execution.getState() != ExecutionState.SCHEDULED || executionVertexVersioner.isModified(requiredVertexVersion)) { if (throwable == null) { log.debug( @@ -231,7 +227,6 @@ private BiFunction assignResource( throw new CompletionException(maybeWrapWithNoResourceAvailableException(throwable)); } - final Execution execution = optionalExecution.get(); if (!execution.tryAssignResource(logicalSlot)) { throw new IllegalStateException( "Could not assign resource " @@ -277,8 +272,7 @@ private Function> registerProducedPartition // a null logicalSlot means the slot assignment is skipped, in which case // the produced partition registration process can be skipped as well if (logicalSlot != null) { - final Execution execution = - getExecutionOrThrow(deploymentHandle.getExecutionAttemptId()); + final Execution execution = deploymentHandle.getExecution(); final CompletableFuture partitionRegistrationFuture = execution.registerProducedPartitions(logicalSlot.getTaskManagerLocation()); @@ -299,11 +293,9 @@ private BiFunction deployOrHandleError( return (ignored, throwable) -> { final ExecutionVertexVersion requiredVertexVersion = deploymentHandle.getRequiredVertexVersion(); - final Optional optionalExecution = - getExecution(deploymentHandle.getExecutionAttemptId()); + final Execution execution = deploymentHandle.getExecution(); - if (!optionalExecution.isPresent() - || optionalExecution.get().getState() != ExecutionState.SCHEDULED + if (execution.getState() != ExecutionState.SCHEDULED || executionVertexVersioner.isModified(requiredVertexVersion)) { if (throwable == null) { log.debug( @@ -314,11 +306,10 @@ private BiFunction deployOrHandleError( return null; } - final Execution execution = optionalExecution.get(); if (throwable == null) { deployTaskSafe(execution); } else { - handleTaskDeploymentFailure(execution.getAttemptId(), throwable); + handleTaskDeploymentFailure(execution, throwable); } return null; }; @@ -328,40 +319,37 @@ private void deployTaskSafe(final Execution execution) { try { executionOperations.deploy(execution); } catch (Throwable e) { - handleTaskDeploymentFailure(execution.getAttemptId(), e); + handleTaskDeploymentFailure(execution, e); } } - private void handleTaskDeploymentFailure( - final ExecutionAttemptID executionAttemptId, final Throwable error) { - - final Execution execution = getExecutionOrThrow(executionAttemptId); + private void handleTaskDeploymentFailure(final Execution execution, final Throwable error) { executionOperations.markFailed(execution, error); } - private Execution getExecutionOrThrow(ExecutionAttemptID executionAttemptId) { - return getExecution(executionAttemptId).get(); - } - - private Optional getExecution(ExecutionAttemptID executionAttemptId) { - return Optional.ofNullable(executionRetriever.apply(executionAttemptId)); - } - private static class ExecutionDeploymentHandle { + private final Execution execution; + private final ExecutionSlotAssignment executionSlotAssignment; private final ExecutionVertexVersion requiredVertexVersion; ExecutionDeploymentHandle( - ExecutionSlotAssignment executionSlotAssignment, + final Execution execution, + final ExecutionSlotAssignment executionSlotAssignment, final ExecutionVertexVersion requiredVertexVersion) { + this.execution = checkNotNull(execution); this.executionSlotAssignment = checkNotNull(executionSlotAssignment); this.requiredVertexVersion = checkNotNull(requiredVertexVersion); } + Execution getExecution() { + return execution; + } + ExecutionAttemptID getExecutionAttemptId() { - return executionSlotAssignment.getExecutionAttemptId(); + return execution.getAttemptId(); } CompletableFuture getLogicalSlotFuture() { @@ -383,7 +371,6 @@ public DefaultExecutionDeployer createInstance( ExecutionOperations executionOperations, ExecutionVertexVersioner executionVertexVersioner, Time partitionRegistrationTimeout, - Function executionRetriever, BiConsumer allocationReservationFunc, ComponentMainThreadExecutor mainThreadExecutor) { return new DefaultExecutionDeployer( @@ -392,7 +379,6 @@ public DefaultExecutionDeployer createInstance( executionOperations, executionVertexVersioner, partitionRegistrationTimeout, - executionRetriever, allocationReservationFunc, mainThreadExecutor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java index 081cfa26987bf..6cf1cbe628e7c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java @@ -189,7 +189,6 @@ protected DefaultScheduler( executionOperations, executionVertexVersioner, rpcTimeout, - id -> getExecutionGraph().getRegisteredExecutions().get(id), this::startReserveAllocation, mainThreadExecutor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionDeployer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionDeployer.java index 4fc979e80ba6f..adb5ff76c8749 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionDeployer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionDeployer.java @@ -23,7 +23,6 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.executiongraph.Execution; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.slf4j.Logger; @@ -31,7 +30,6 @@ import java.util.List; import java.util.Map; import java.util.function.BiConsumer; -import java.util.function.Function; /** This deployer is responsible for deploying executions. */ interface ExecutionDeployer { @@ -63,7 +61,6 @@ interface Factory { * @param executionVertexVersioner the versioner which records the versions of execution * vertices. * @param partitionRegistrationTimeout timeout of partition registration - * @param executionRetriever retriever to get executions * @param allocationReservationFunc function to reserve allocations for local recovery * @param mainThreadExecutor the main thread executor * @return an instantiated {@link ExecutionDeployer} @@ -74,7 +71,6 @@ ExecutionDeployer createInstance( final ExecutionOperations executionOperations, final ExecutionVertexVersioner executionVertexVersioner, final Time partitionRegistrationTimeout, - final Function executionRetriever, final BiConsumer allocationReservationFunc, final ComponentMainThreadExecutor mainThreadExecutor); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployerTest.java index 3525fea6eae8e..ce86ffd695fb7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployerTest.java @@ -108,7 +108,7 @@ void tearDown() { void testDeployTasks() throws Exception { final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); final ExecutionGraph executionGraph = createExecutionGraph(jobGraph); - final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph); + final ExecutionDeployer executionDeployer = createExecutionDeployer(); deployTasks(executionDeployer, executionGraph); @@ -120,7 +120,7 @@ void testDeployTasks() throws Exception { void testDeployTasksOnlyIfAllSlotRequestsAreFulfilled() throws Exception { final JobGraph jobGraph = singleJobVertexJobGraph(4); final ExecutionGraph executionGraph = createExecutionGraph(jobGraph); - final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph); + final ExecutionDeployer executionDeployer = createExecutionDeployer(); testExecutionSlotAllocator.disableAutoCompletePendingRequests(); @@ -143,7 +143,7 @@ void testDeploymentFailures() throws Exception { testExecutionOperations.enableFailDeploy(); final ExecutionGraph executionGraph = createExecutionGraph(jobGraph); - final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph); + final ExecutionDeployer executionDeployer = createExecutionDeployer(); deployTasks(executionDeployer, executionGraph); assertThat(testExecutionOperations.getFailedExecutions()) @@ -157,7 +157,7 @@ void testSlotAllocationTimeout() throws Exception { testExecutionSlotAllocator.disableAutoCompletePendingRequests(); final ExecutionGraph executionGraph = createExecutionGraph(jobGraph); - final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph); + final ExecutionDeployer executionDeployer = createExecutionDeployer(); deployTasks(executionDeployer, executionGraph); assertThat(testExecutionSlotAllocator.getPendingRequests()).hasSize(2); @@ -175,7 +175,7 @@ void testSkipDeploymentIfVertexVersionOutdated() throws Exception { testExecutionSlotAllocator.disableAutoCompletePendingRequests(); final ExecutionGraph executionGraph = createExecutionGraph(jobGraph); - final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph); + final ExecutionDeployer executionDeployer = createExecutionDeployer(); deployTasks(executionDeployer, executionGraph); final ExecutionAttemptID attemptId = getAnyExecution(executionGraph).getAttemptId(); @@ -193,7 +193,7 @@ void testReleaseSlotIfVertexVersionOutdated() throws Exception { testExecutionSlotAllocator.disableAutoCompletePendingRequests(); final ExecutionGraph executionGraph = createExecutionGraph(jobGraph); - final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph); + final ExecutionDeployer executionDeployer = createExecutionDeployer(); deployTasks(executionDeployer, executionGraph); final ExecutionAttemptID attemptId = getAnyExecution(executionGraph).getAttemptId(); @@ -208,7 +208,7 @@ void testReleaseSlotIfVertexVersionOutdated() throws Exception { void testDeployOnlyIfVertexIsCreated() throws Exception { final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); final ExecutionGraph executionGraph = createExecutionGraph(jobGraph); - final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph); + final ExecutionDeployer executionDeployer = createExecutionDeployer(); // deploy once to transition the tasks out from CREATED state deployTasks(executionDeployer, executionGraph); @@ -233,7 +233,7 @@ void testDeploymentWaitForProducedPartitionRegistration() throws Exception { final JobGraph jobGraph = nonParallelSourceSinkJobGraph(); final ExecutionGraph executionGraph = createExecutionGraph(jobGraph); - final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph); + final ExecutionDeployer executionDeployer = createExecutionDeployer(); deployTasks(executionDeployer, executionGraph); @@ -251,7 +251,7 @@ void testFailedProducedPartitionRegistration() throws Exception { final JobGraph jobGraph = nonParallelSourceSinkJobGraph(); final ExecutionGraph executionGraph = createExecutionGraph(jobGraph); - final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph); + final ExecutionDeployer executionDeployer = createExecutionDeployer(); deployTasks(executionDeployer, executionGraph); @@ -267,7 +267,7 @@ void testDirectExceptionOnProducedPartitionRegistration() throws Exception { final JobGraph jobGraph = nonParallelSourceSinkJobGraph(); final ExecutionGraph executionGraph = createExecutionGraph(jobGraph); - final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph); + final ExecutionDeployer executionDeployer = createExecutionDeployer(); deployTasks(executionDeployer, executionGraph); @@ -289,7 +289,7 @@ void testProducedPartitionRegistrationTimeout() throws Exception { final JobGraph jobGraph = nonParallelSourceSinkJobGraph(); final ExecutionGraph executionGraph = createExecutionGraph(jobGraph); - final ExecutionDeployer executionDeployer = createExecutionDeployer(executionGraph); + final ExecutionDeployer executionDeployer = createExecutionDeployer(); deployTasks(executionDeployer, executionGraph); @@ -339,7 +339,7 @@ private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exce return executionGraph; } - private ExecutionDeployer createExecutionDeployer(ExecutionGraph executionGraph) { + private ExecutionDeployer createExecutionDeployer() { return new DefaultExecutionDeployer.Factory() .createInstance( LoggerFactory.getLogger(DefaultExecutionDeployer.class), @@ -347,7 +347,6 @@ private ExecutionDeployer createExecutionDeployer(ExecutionGraph executionGraph) testExecutionOperations, executionVertexVersioner, partitionRegistrationTimeout, - id -> executionGraph.getRegisteredExecutions().get(id), (ignored1, ignored2) -> {}, mainThreadExecutor); }