From 63d526d398d76b426e35fca47cb3edcb27345202 Mon Sep 17 00:00:00 2001
From: tison
Date: Tue, 3 Dec 2019 16:46:39 +0800
Subject: [PATCH] [FLINK-14854][client] Add executeAsync() method to execution environments

---
 .../flink/api/java/ExecutionEnvironment.java  | 70 +++++++++++++---
 .../api/scala/ExecutionEnvironment.scala      | 45 +++++++++-
 .../StreamExecutionEnvironment.java           | 83 ++++++++++++++++---
 .../environment/StreamPlanEnvironment.java    |  5 --
 .../scala/StreamExecutionEnvironment.scala    | 44 ++++++++++
 .../test/util/CollectionTestEnvironment.java  |  5 --
 6 files changed, 217 insertions(+), 35 deletions(-)

diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 497690f773b80..f8e2d8a4d1942 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -55,7 +55,6 @@
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.execution.DetachedJobExecutionResult;
-import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
 import org.apache.flink.core.execution.ExecutorServiceLoader;
 import org.apache.flink.core.execution.JobClient;
@@ -80,6 +79,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -804,26 +804,70 @@ public JobExecutionResult execute() throws Exception {
	 * @throws Exception Thrown, if the program executions fails.
	 */
	public JobExecutionResult execute(String jobName) throws Exception {
-		if (configuration.get(DeploymentOptions.TARGET) == null) {
-			throw new RuntimeException("No execution.target specified in your configuration file.");
+		try (final JobClient jobClient = executeAsync(jobName).get()) {
+			lastJobExecutionResult = configuration.getBoolean(DeploymentOptions.ATTACHED)
+				? jobClient.getJobExecutionResult(userClassloader).get()
+				: new DetachedJobExecutionResult(jobClient.getJobID());
+
+			return lastJobExecutionResult;
 		}
+	}
+
+	/**
+	 * Triggers the program execution asynchronously. The environment will execute all parts of the program that have
+	 * resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
+	 * writing results (e.g. {@link DataSet#writeAsText(String)},
+	 * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic
+	 * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
+	 *
+	 * <p>The program execution will be logged and displayed with a generated default name.
+	 *
+	 * <p>ATTENTION: The caller of this method is responsible for managing the lifecycle of
+	 * the returned {@link JobClient}. This means calling {@link JobClient#close()} at the end of
+	 * its usage. Otherwise, there may be resource leaks depending on the JobClient implementation.
+	 *
+	 * @return A future of {@link JobClient} that can be used to communicate with the submitted job; it completes once submission succeeds.
+	 * @throws Exception Thrown, if the program submission fails.
+	 */
+	@PublicEvolving
+	public final CompletableFuture<JobClient> executeAsync() throws Exception {
+		return executeAsync(getDefaultName());
+	}
+
+	/**
+	 * Triggers the program execution asynchronously. The environment will execute all parts of the program that have
+	 * resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
+	 * writing results (e.g. {@link DataSet#writeAsText(String)},
+	 * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic
+	 * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
+	 *
+	 * <p>The program execution will be logged and displayed with the given job name.
+	 *
+	 * <p>ATTENTION: The caller of this method is responsible for managing the lifecycle of
+	 * the returned {@link JobClient}. This means calling {@link JobClient#close()} at the end of
+	 * its usage. Otherwise, there may be resource leaks depending on the JobClient implementation.
+	 *
+	 * @return A future of {@link JobClient} that can be used to communicate with the submitted job; it completes once submission succeeds.
+	 * @throws Exception Thrown, if the program submission fails.
+	 */
+	@PublicEvolving
+	public CompletableFuture<JobClient> executeAsync(String jobName) throws Exception {
+		checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");
 
 		consolidateParallelismDefinitionsInConfiguration();
 
 		final Plan plan = createProgramPlan(jobName);
 		final ExecutorFactory executorFactory =
-			executorServiceLoader.getExecutorFactory(configuration);
-
-		final Executor executor = executorFactory.getExecutor(configuration);
+				executorServiceLoader.getExecutorFactory(configuration);
 
-		try (final JobClient jobClient = executor.execute(plan, configuration).get()) {
+		checkNotNull(
+			executorFactory,
+			"Cannot find compatible factory for specified execution.target (=%s)",
+			configuration.get(DeploymentOptions.TARGET));
 
-			lastJobExecutionResult = configuration.getBoolean(DeploymentOptions.ATTACHED)
-				? jobClient.getJobExecutionResult(userClassloader).get()
-				: new DetachedJobExecutionResult(jobClient.getJobID());
-
-			return lastJobExecutionResult;
-		}
+		return executorFactory
+			.getExecutor(configuration)
+			.execute(plan, configuration);
 	}
 
 	private void consolidateParallelismDefinitionsInConfiguration() {
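
Illustrative usage, not part of the patch: the new batch executeAsync() decouples submission from waiting. The returned future completes once the job has been accepted, and the JobClient exposes the job id and, on demand, the blocking result. The sketch below shows how a user program might drive it; the class name, pipeline, and job name are made up, and only executeAsync(), JobClient#getJobID(), JobClient#getJobExecutionResult(ClassLoader), and JobClient#close() come from the API touched in this patch.

// Hypothetical user program, for illustration only.
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.core.execution.JobClient;

import java.util.concurrent.CompletableFuture;

public class ExecuteAsyncBatchExample {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
		// executeAsync() only submits what has a sink, exactly like execute().
		numbers.map(n -> n * 2).output(new DiscardingOutputFormat<>());

		// Returns as soon as submission succeeds; the job keeps running in the cluster.
		CompletableFuture<JobClient> clientFuture = env.executeAsync("executeAsync-batch-example");

		// The caller owns the JobClient, hence the try-with-resources around it.
		try (JobClient jobClient = clientFuture.get()) {
			System.out.println("Submitted job " + jobClient.getJobID());

			// Block only when the result is actually needed.
			JobExecutionResult result = jobClient
				.getJobExecutionResult(Thread.currentThread().getContextClassLoader())
				.get();
			System.out.println("Job finished after " + result.getNetRuntime() + " ms");
		}
	}
}

The reworked execute(String) above is this same pattern with the waiting folded in: it blocks on the result in attached mode and returns a DetachedJobExecutionResult otherwise.
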
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index bd2c805d69146..d0fd44d0f542f 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -17,8 +17,10 @@
  */
 package org.apache.flink.api.scala
 
+import java.util.concurrent.CompletableFuture
+
 import com.esotericsoftware.kryo.Serializer
-import org.apache.flink.annotation.{PublicEvolving, Public}
+import org.apache.flink.annotation.{Public, PublicEvolving}
 import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
 import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
@@ -30,6 +32,7 @@ import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfoBase, ValueTypeInfo}
 import org.apache.flink.api.java.{CollectionEnvironment, ExecutionEnvironment => JavaEnv}
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.core.execution.JobClient
 import org.apache.flink.core.fs.Path
 import org.apache.flink.types.StringValue
 import org.apache.flink.util.{NumberSequenceIterator, Preconditions, SplittableIterator}
@@ -491,6 +494,46 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
     javaEnv.execute(jobName)
   }
 
+  /**
+   * Triggers the program execution asynchronously.
+   * The environment will execute all parts of the program that have
+   * resulted in a "sink" operation. Sink operations are for example printing results
+   * [[DataSet.print]], writing results (e.g. [[DataSet.writeAsText]], [[DataSet.write]], or other
+   * generic data sinks created with [[DataSet.output]].
+   *
+   * The program execution will be logged and displayed with a generated default name.
+   *
+   * ATTENTION: The caller of this method is responsible for managing the lifecycle
+   * of the returned [[JobClient]]. This means calling [[JobClient#close()]] at the end of
+   * its usage. Otherwise, there may be resource leaks depending on the JobClient
+   * implementation.
+   *
+   * @return A future of [[JobClient]] that can be used to communicate with the submitted job;
+   *         it completes once submission succeeds.
+   */
+  @PublicEvolving
+  def executeAsync(): CompletableFuture[JobClient] = javaEnv.executeAsync()
+
+  /**
+   * Triggers the program execution asynchronously.
+   * The environment will execute all parts of the program that have
+   * resulted in a "sink" operation. Sink operations are for example printing results
+   * [[DataSet.print]], writing results (e.g. [[DataSet.writeAsText]], [[DataSet.write]], or other
+   * generic data sinks created with [[DataSet.output]].
+   *
+   * The program execution will be logged and displayed with the given name.
+   *
+   * ATTENTION: The caller of this method is responsible for managing the lifecycle
+   * of the returned [[JobClient]]. This means calling [[JobClient#close()]] at the end of
+   * its usage. Otherwise, there may be resource leaks depending on the JobClient
+   * implementation.
+   *
+   * @return A future of [[JobClient]] that can be used to communicate with the submitted job;
+   *         it completes once submission succeeds.
+   */
+  @PublicEvolving
+  def executeAsync(jobName: String): CompletableFuture[JobClient] = javaEnv.executeAsync(jobName)
+
  /**
   * Creates the plan with which the system will execute the program, and returns it as a String
   * using a JSON representation of the execution data flow graph.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 97a65c55e8dc3..8654dd2d92ebc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -55,7 +55,6 @@
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.execution.DetachedJobExecutionResult;
-import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
 import org.apache.flink.core.execution.ExecutorServiceLoader;
 import org.apache.flink.core.execution.JobClient;
@@ -101,6 +100,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -1619,23 +1619,84 @@ public JobExecutionResult execute(String jobName) throws Exception {
	 */
	@Internal
	public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
-		if (configuration.get(DeploymentOptions.TARGET) == null) {
-			throw new RuntimeException("No execution.target specified in your configuration file.");
+		try (final JobClient jobClient = executeAsync(streamGraph).get()) {
+			return configuration.getBoolean(DeploymentOptions.ATTACHED)
+				? jobClient.getJobExecutionResult(userClassloader).get()
+				: new DetachedJobExecutionResult(jobClient.getJobID());
 		}
+	}
+
+	/**
+	 * Triggers the program execution asynchronously. The environment will execute all parts of
+	 * the program that have resulted in a "sink" operation. Sink operations are
+	 * for example printing results or forwarding them to a message queue.
+	 *
+	 * <p>The program execution will be logged and displayed with a generated
+	 * default name.
+	 *
+	 * <p>ATTENTION: The caller of this method is responsible for managing the lifecycle of
+	 * the returned {@link JobClient}. This means calling {@link JobClient#close()} at the end of
+	 * its usage. Otherwise, there may be resource leaks depending on the JobClient implementation.
+	 *
+	 * @return A future of {@link JobClient} that can be used to communicate with the submitted job; it completes once submission succeeds.
+	 * @throws Exception which occurs during job execution.
+	 */
+	@PublicEvolving
+	public final CompletableFuture<JobClient> executeAsync() throws Exception {
+		return executeAsync(DEFAULT_JOB_NAME);
+	}
+
+	/**
+	 * Triggers the program execution asynchronously. The environment will execute all parts of
+	 * the program that have resulted in a "sink" operation. Sink operations are
+	 * for example printing results or forwarding them to a message queue.
+	 *
+	 * <p>The program execution will be logged and displayed with the provided name.
+	 *
+	 * <p>ATTENTION: The caller of this method is responsible for managing the lifecycle of
+	 * the returned {@link JobClient}. This means calling {@link JobClient#close()} at the end of
+	 * its usage. Otherwise, there may be resource leaks depending on the JobClient implementation.
+	 *
+	 * @param jobName desired name of the job
+	 * @return A future of {@link JobClient} that can be used to communicate with the submitted job; it completes once submission succeeds.
+	 * @throws Exception which occurs during job execution.
+	 */
+	@PublicEvolving
+	public CompletableFuture<JobClient> executeAsync(String jobName) throws Exception {
+		return executeAsync(getStreamGraph(checkNotNull(jobName)));
+	}
+
+	/**
+	 * Triggers the program execution asynchronously. The environment will execute all parts of
+	 * the program that have resulted in a "sink" operation. Sink operations are
+	 * for example printing results or forwarding them to a message queue.
+	 *
+	 * <p>ATTENTION: The caller of this method is responsible for managing the lifecycle of
+	 * the returned {@link JobClient}. This means calling {@link JobClient#close()} at the end of
+	 * its usage. Otherwise, there may be resource leaks depending on the JobClient implementation.
+	 *
+	 * @param streamGraph the stream graph representing the transformations
+	 * @return A future of {@link JobClient} that can be used to communicate with the submitted job; it completes once submission succeeds.
+	 * @throws Exception which occurs during job execution.
+	 */
+	@Internal
+	public CompletableFuture<JobClient> executeAsync(StreamGraph streamGraph) throws Exception {
+		checkNotNull(streamGraph, "StreamGraph cannot be null.");
+		checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");
 
 		consolidateParallelismDefinitionsInConfiguration();
 
 		final ExecutorFactory executorFactory =
-			executorServiceLoader.getExecutorFactory(configuration);
+				executorServiceLoader.getExecutorFactory(configuration);
 
-		final Executor executor = executorFactory.getExecutor(configuration);
+		checkNotNull(
+			executorFactory,
+			"Cannot find compatible factory for specified execution.target (=%s)",
+			configuration.get(DeploymentOptions.TARGET));
 
-		try (final JobClient jobClient = executor.execute(streamGraph, configuration).get()) {
-
-			return configuration.getBoolean(DeploymentOptions.ATTACHED)
-				? jobClient.getJobExecutionResult(userClassloader).get()
-				: new DetachedJobExecutionResult(jobClient.getJobID());
-		}
+		return executorFactory
+			.getExecutor(configuration)
+			.execute(streamGraph, configuration);
 	}
 
 	private void consolidateParallelismDefinitionsInConfiguration() {
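
Illustrative usage, not part of the patch: for streaming jobs the asynchronous variant matters most, because in the usual attached setting execute() blocks until the (possibly unbounded) job finishes, while executeAsync() returns as soon as submission succeeds. The sketch below is hypothetical; the pipeline, class name, and job name are invented, and the JobClient calls are limited to those introduced above.

// Hypothetical streaming program, for illustration only.
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.CompletableFuture;

public class ExecuteAsyncStreamingExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		env.fromElements("a", "b", "c")
			.map(String::toUpperCase)
			.print();

		// Unlike execute(), this does not wait for the job to finish.
		CompletableFuture<JobClient> clientFuture = env.executeAsync("executeAsync-streaming-example");

		// React to successful submission without blocking the submitting thread.
		clientFuture.thenAccept(jobClient ->
			System.out.println("Job " + jobClient.getJobID() + " was submitted"));

		// ... the caller is free to do other work here while the job runs ...

		// Eventually take ownership of the client and close it to release its resources.
		try (JobClient jobClient = clientFuture.get()) {
			System.out.println("Holding client for job " + jobClient.getJobID());
		}
	}
}

The blocking execute(StreamGraph) above is now a thin wrapper around exactly this future.
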
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
index 54ef3e292367e..89ede4f5962da 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
@@ -47,11 +47,6 @@ protected StreamPlanEnvironment(ExecutionEnvironment env) {
 		}
 	}
 
-	@Override
-	public JobExecutionResult execute() throws Exception {
-		return execute("");
-	}
-
 	@Override
 	public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
 		transformations.clear();
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index f32d12f2dc26c..2ce8ce7607ffb 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.api.scala
 
+import java.util.concurrent.CompletableFuture
+
 import com.esotericsoftware.kryo.Serializer
 import org.apache.flink.annotation.{Internal, Public, PublicEvolving}
 import org.apache.flink.api.common.io.{FileInputFormat, FilePathFilter, InputFormat}
@@ -27,6 +29,7 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.flink.configuration.{Configuration, ReadableConfig}
+import org.apache.flink.core.execution.JobClient
 import org.apache.flink.runtime.state.AbstractStateBackend
 import org.apache.flink.runtime.state.StateBackend
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
@@ -660,6 +663,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    *
    * The program execution will be logged and displayed with a generated
    * default name.
+   *
+   * @return The result of the job execution, containing elapsed time and accumulators.
    */
   def execute() = javaEnv.execute()
 
@@ -669,9 +674,48 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * for example printing results or forwarding them to a message queue.
    *
    * The program execution will be logged and displayed with the provided name.
+   *
+   * @return The result of the job execution, containing elapsed time and accumulators.
    */
   def execute(jobName: String) = javaEnv.execute(jobName)
 
+  /**
+   * Triggers the program execution asynchronously. The environment will execute all parts of
+   * the program that have resulted in a "sink" operation. Sink operations are
+   * for example printing results or forwarding them to a message queue.
+   *
+   * The program execution will be logged and displayed with a generated
+   * default name.
+   *
+   * ATTENTION: The caller of this method is responsible for managing the lifecycle
+   * of the returned [[JobClient]]. This means calling [[JobClient#close()]] at the end of
+   * its usage. Otherwise, there may be resource leaks depending on the JobClient
+   * implementation.
+   *
+   * @return A future of [[JobClient]] that can be used to communicate with the submitted job;
+   *         it completes once submission succeeds.
+   */
+  @PublicEvolving
+  def executeAsync(): CompletableFuture[JobClient] = javaEnv.executeAsync()
+
+  /**
+   * Triggers the program execution asynchronously. The environment will execute all parts of
+   * the program that have resulted in a "sink" operation. Sink operations are
+   * for example printing results or forwarding them to a message queue.
+   *
+   * The program execution will be logged and displayed with the provided name.
+   *
+   * ATTENTION: The caller of this method is responsible for managing the lifecycle
+   * of the returned [[JobClient]]. This means calling [[JobClient#close()]] at the end of
+   * its usage. Otherwise, there may be resource leaks depending on the JobClient
+   * implementation.
+   *
+   * @return A future of [[JobClient]] that can be used to communicate with the submitted job;
+   *         it completes once submission succeeds.
+   */
+  @PublicEvolving
+  def executeAsync(jobName: String): CompletableFuture[JobClient] = javaEnv.executeAsync(jobName)
+
  /**
   * Creates the plan with which the system will execute the program, and
   * returns it as a String using a JSON representation of the execution data
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/CollectionTestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/CollectionTestEnvironment.java
index 50db127fcd420..8397a9771505a 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/CollectionTestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/CollectionTestEnvironment.java
@@ -42,11 +42,6 @@ public JobExecutionResult getLastJobExecutionResult() {
 		}
 	}
 
-	@Override
-	public JobExecutionResult execute() throws Exception {
-		return execute("test job");
-	}
-
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
 		JobExecutionResult result = super.execute(jobName);