diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 56ff9bed45be2..4c6d41d271a97 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -1209,7 +1209,6 @@ public CreatingExecutionGraph.AssignmentResult tryToAssignSlots( executionGraphWithVertexParallelism.getExecutionGraph(); executionGraph.start(componentMainThreadExecutor); - executionGraph.transitionToRunning(); executionGraph.setInternalTaskFailuresListener( new UpdateSchedulerNgOnInternalFailuresListener(this)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java index da90ef1468d1f..e9b1317e46e62 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java @@ -123,7 +123,6 @@ private void handleExecutionGraphCreation( operatorCoordinatorHandlerFactory.create(executionGraph, context); operatorCoordinatorHandler.initializeOperatorCoordinators( context.getMainThreadExecutor()); - operatorCoordinatorHandler.startAllOperatorCoordinators(); final String updatedPlan = JsonPlanGenerator.generatePlan( executionGraph.getJobID(), @@ -137,6 +136,10 @@ private void handleExecutionGraphCreation( .iterator(), executionGraphWithVertexParallelism.getVertexParallelism()); executionGraph.setJsonPlan(updatedPlan); + + executionGraph.transitionToRunning(); + operatorCoordinatorHandler.startAllOperatorCoordinators(); + context.goToExecuting( result.getExecutionGraph(), executionGraphHandler, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java index b831b3bb62f90..0f89cdf7e1282 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java @@ -93,8 +93,12 @@ void testNotPossibleSlotAssignmentTransitionsToWaitingForResources() { ignored -> CreatingExecutionGraph.AssignmentResult.notPossible()); context.setExpectWaitingForResources(); - executionGraphWithVertexParallelismFuture.complete( - getGraph(new StateTrackingMockExecutionGraph())); + final StateTrackingMockExecutionGraph executionGraph = + new StateTrackingMockExecutionGraph(); + + executionGraphWithVertexParallelismFuture.complete(getGraph(executionGraph)); + + assertThat(executionGraph.getState()).isEqualTo(JobStatus.INITIALIZING); } @Test