Skip to content

Commit

Permalink
[FLINK-17744] Make (Stream)ContextEnvironment#execute call JobListene…
Browse files Browse the repository at this point in the history
…r#onJobExecuted

This closes apache#12339.
  • Loading branch information
EchoLee5 authored and kl0u committed May 28, 2020
1 parent 8e07ca0 commit 7dcfd90
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,21 @@
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.ShutdownHookUtil;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Execution Environment for remote execution with the Client.
*/
Expand Down Expand Up @@ -64,7 +69,26 @@ public ContextEnvironment(

@Override
public JobExecutionResult execute(String jobName) throws Exception {
JobClient jobClient = executeAsync(jobName);
final JobClient jobClient = executeAsync(jobName);
final List<JobListener> jobListeners = getJobListeners();

try {
final JobExecutionResult jobExecutionResult = getJobExecutionResult(jobClient);
jobListeners.forEach(jobListener ->
jobListener.onJobExecuted(jobExecutionResult, null));
return jobExecutionResult;
} catch (Throwable t) {
jobListeners.forEach(jobListener ->
jobListener.onJobExecuted(null, ExceptionUtils.stripExecutionException(t)));
ExceptionUtils.rethrowException(t);

// never reached, only make javac happy
return null;
}
}

private JobExecutionResult getJobExecutionResult(final JobClient jobClient) throws Exception {
checkNotNull(jobClient);

JobExecutionResult jobExecutionResult;
if (getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) {
Expand All @@ -81,7 +105,8 @@ public JobExecutionResult execute(String jobName) throws Exception {
ContextEnvironment.class.getSimpleName(),
LOG);
jobExecutionResultFuture.whenComplete((ignored, throwable) ->
ShutdownHookUtil.removeShutdownHook(shutdownHook, ContextEnvironment.class.getSimpleName(), LOG));
ShutdownHookUtil.removeShutdownHook(
shutdownHook, ContextEnvironment.class.getSimpleName(), LOG));
}

jobExecutionResult = jobExecutionResultFuture.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,24 @@
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.ShutdownHookUtil;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Special {@link StreamExecutionEnvironment} that will be used in cases where the CLI client or
* testing utilities create a {@link StreamExecutionEnvironment} that should be used when
Expand Down Expand Up @@ -68,7 +73,26 @@ public StreamContextEnvironment(

@Override
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
JobClient jobClient = executeAsync(streamGraph);
final JobClient jobClient = executeAsync(streamGraph);
final List<JobListener> jobListeners = getJobListeners();

try {
final JobExecutionResult jobExecutionResult = getJobExecutionResult(jobClient);
jobListeners.forEach(jobListener ->
jobListener.onJobExecuted(jobExecutionResult, null));
return jobExecutionResult;
} catch (Throwable t) {
jobListeners.forEach(jobListener ->
jobListener.onJobExecuted(null, ExceptionUtils.stripExecutionException(t)));
ExceptionUtils.rethrowException(t);

// never reached, only make javac happy
return null;
}
}

private JobExecutionResult getJobExecutionResult(final JobClient jobClient) throws Exception {
checkNotNull(jobClient);

JobExecutionResult jobExecutionResult;
if (getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,13 @@ public ExecutionConfig getConfig() {
return config;
}

/**
* Gets the config JobListeners.
*/
protected List<JobListener> getJobListeners() {
return jobListeners;
}

/**
* Gets the parallelism with which operation are executed by default. Operations can
* individually override this value to use a specific parallelism via
Expand Down

0 comments on commit 7dcfd90

Please sign in to comment.