Skip to content

Commit

Permalink
[FLINK-25031][runtime] Job finishes iff all job vertices finish
Browse files Browse the repository at this point in the history
This closes apache#17969.
  • Loading branch information
wanglijie95 authored and zhuzhurk committed Dec 6, 2021
1 parent 6e8efc2 commit caf72ab
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
/** Blob writer used to offload RPC messages. */
private final BlobWriter blobWriter;

/** The total number of vertices currently in the execution graph. */
private int numVerticesTotal;
/** Number of total job vertices. */
private int numJobVerticesTotal;

private final PartitionGroupReleaseStrategy.Factory partitionGroupReleaseStrategyFactory;

Expand All @@ -199,7 +199,8 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
// ------ Execution status and progress. These values are volatile, and accessed under the lock
// -------

private int numFinishedVertices;
/** Number of finished job vertices. */
private int numFinishedJobVertices;

/** Current status of the job execution. */
private volatile JobStatus state = JobStatus.CREATED;
Expand Down Expand Up @@ -587,7 +588,10 @@ public long getNumberOfRestarts() {

@Override
public int getNumFinishedVertices() {
return numFinishedVertices;
return IterableUtils.toStream(getVerticesTopologically())
.map(ExecutionJobVertex::getNumExecutionVertexFinished)
.mapToInt(Integer::intValue)
.sum();
}

@Override
Expand Down Expand Up @@ -809,7 +813,7 @@ public void attachJobGraph(List<JobVertex> topologicallySorted) throws JobExcept
}

this.verticesInCreationOrder.add(ejv);
this.numVerticesTotal += ejv.getParallelism();
this.numJobVerticesTotal++;
}

registerExecutionVerticesAndResultPartitions(this.verticesInCreationOrder);
Expand Down Expand Up @@ -1063,14 +1067,14 @@ public void initFailureCause(Throwable t, long timestamp) {
// ------------------------------------------------------------------------

/**
* Called whenever a vertex reaches state FINISHED (completed successfully). Once all vertices
* are in the FINISHED state, the program is successfully done.
* Called whenever a job vertex reaches state FINISHED (completed successfully). Once all job
* vertices are in the FINISHED state, the program is successfully done.
*/
@Override
public void vertexFinished() {
public void jobVertexFinished() {
assertRunningInJobMasterMainThread();
final int numFinished = ++numFinishedVertices;
if (numFinished == numVerticesTotal) {
final int numFinished = ++numFinishedJobVertices;
if (numFinished == numJobVerticesTotal) {
// done :-)

// check whether we are still in "RUNNING" and trigger the final cleanup
Expand Down Expand Up @@ -1099,9 +1103,9 @@ public void vertexFinished() {
}

@Override
public void vertexUnFinished() {
public void jobVertexUnFinished() {
assertRunningInJobMasterMainThread();
numFinishedVertices--;
numFinishedJobVertices--;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ public class ExecutionJobVertex

private final ResourceProfile resourceProfile;

private int numExecutionVertexFinished;

/**
* Either store a serialized task information, which is for all sub tasks the same, or the
* permanent blob key of the offloaded task information BLOB containing the serialized task
Expand Down Expand Up @@ -333,6 +335,10 @@ public Collection<OperatorCoordinatorHolder> getOperatorCoordinators() {
return operatorCoordinators;
}

int getNumExecutionVertexFinished() {
return numExecutionVertexFinished;
}

public Either<SerializedValue<TaskInformation>, PermanentBlobKey> getTaskInformationOrBlobKey()
throws IOException {
// only one thread should offload the task information, so let's also let only one thread
Expand Down Expand Up @@ -459,6 +465,20 @@ public void fail(Throwable t) {
}
}

void executionVertexFinished() {
numExecutionVertexFinished++;
if (numExecutionVertexFinished == parallelismInfo.getParallelism()) {
getGraph().jobVertexFinished();
}
}

void executionVertexUnFinished() {
if (numExecutionVertexFinished == parallelismInfo.getParallelism()) {
getGraph().jobVertexUnFinished();
}
numExecutionVertexFinished--;
}

// --------------------------------------------------------------------------------------------
// Accumulators / Metrics
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ private void resetForNewExecutionInternal(final long timestamp) {
// if the execution was 'FINISHED' before, tell the ExecutionGraph that
// we take one step back on the road to reaching global FINISHED
if (oldState == FINISHED) {
getExecutionGraphAccessor().vertexUnFinished();
getJobVertex().executionVertexUnFinished();
}

// reset the intermediate results
Expand Down Expand Up @@ -522,7 +522,7 @@ public List<IntermediateResultPartition> finishAllBlockingPartitions() {
// --------------------------------------------------------------------------------------------

void executionFinished(Execution execution) {
getExecutionGraphAccessor().vertexFinished();
getJobVertex().executionVertexFinished();
}

// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ public interface InternalExecutionGraphAccessor {

PartitionGroupReleaseStrategy getPartitionGroupReleaseStrategy();

void vertexFinished();
void jobVertexFinished();

void vertexUnFinished();
void jobVertexUnFinished();

ExecutionDeploymentListener getExecutionDeploymentListener();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.executiongraph;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import static org.junit.Assert.assertEquals;

/** Tests the finish behaviour of the {@link ExecutionGraph}. */
public class ExecutionGraphFinishTest extends TestLogger {

@Test
public void testJobFinishes() throws Exception {

JobGraph jobGraph =
JobGraphTestUtils.streamingJobGraph(
ExecutionGraphTestUtils.createJobVertex("Task1", 2, NoOpInvokable.class),
ExecutionGraphTestUtils.createJobVertex("Task2", 2, NoOpInvokable.class));

SchedulerBase scheduler =
SchedulerTestingUtils.newSchedulerBuilder(
jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread())
.build();

ExecutionGraph eg = scheduler.getExecutionGraph();

scheduler.startScheduling();
ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);

Iterator<ExecutionJobVertex> jobVertices = eg.getVerticesTopologically().iterator();

ExecutionJobVertex sender = jobVertices.next();
ExecutionJobVertex receiver = jobVertices.next();

List<ExecutionVertex> senderVertices = Arrays.asList(sender.getTaskVertices());
List<ExecutionVertex> receiverVertices = Arrays.asList(receiver.getTaskVertices());

// test getNumExecutionVertexFinished
senderVertices.get(0).getCurrentExecutionAttempt().markFinished();
assertEquals(1, sender.getNumExecutionVertexFinished());
assertEquals(JobStatus.RUNNING, eg.getState());

senderVertices.get(1).getCurrentExecutionAttempt().markFinished();
assertEquals(2, sender.getNumExecutionVertexFinished());
assertEquals(JobStatus.RUNNING, eg.getState());

// test job finishes
receiverVertices.get(0).getCurrentExecutionAttempt().markFinished();
receiverVertices.get(1).getCurrentExecutionAttempt().markFinished();
assertEquals(4, eg.getNumFinishedVertices());
assertEquals(JobStatus.FINISHED, eg.getState());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -899,10 +899,10 @@ public void notifySchedulerNgAboutInternalTaskFailure(
boolean releasePartitions) {}

@Override
public void vertexFinished() {}
public void jobVertexFinished() {}

@Override
public void vertexUnFinished() {}
public void jobVertexUnFinished() {}

@Override
public ExecutionDeploymentListener getExecutionDeploymentListener() {
Expand Down

0 comments on commit caf72ab

Please sign in to comment.