Skip to content

Commit

Permalink
[FLINK-14854][client] Add executeAsync() method to execution environm…
Browse files Browse the repository at this point in the history
…ents
  • Loading branch information
tisonkun authored and kl0u committed Dec 3, 2019
1 parent f257cff commit 63d526d
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.ExecutorFactory;
import org.apache.flink.core.execution.ExecutorServiceLoader;
import org.apache.flink.core.execution.JobClient;
Expand All @@ -80,6 +79,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -804,26 +804,70 @@ public JobExecutionResult execute() throws Exception {
* @throws Exception Thrown, if the program execution fails.
*/
public JobExecutionResult execute(String jobName) throws Exception {
if (configuration.get(DeploymentOptions.TARGET) == null) {
throw new RuntimeException("No execution.target specified in your configuration file.");
try (final JobClient jobClient = executeAsync(jobName).get()) {
lastJobExecutionResult = configuration.getBoolean(DeploymentOptions.ATTACHED)
? jobClient.getJobExecutionResult(userClassloader).get()
: new DetachedJobExecutionResult(jobClient.getJobID());

return lastJobExecutionResult;
}
}

/**
 * Asynchronously triggers the execution of the program under a generated default job name.
 *
 * <p>The environment executes all parts of the program that have resulted in a "sink" operation.
 * Examples of sink operations are printing results ({@link DataSet#print()}), writing results
 * (e.g. {@link DataSet#writeAsText(String)} or
 * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}), and other generic
 * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
 *
 * <p>This overload forwards the value of {@code getDefaultName()} to {@link #executeAsync(String)}.
 *
 * <p><b>ATTENTION:</b> The caller of this method is responsible for managing the lifecycle of
 * the returned {@link JobClient}. This means calling {@link JobClient#close()} once it is no longer
 * used. Otherwise, there may be resource leaks depending on the JobClient implementation.
 *
 * @return A future of {@link JobClient} that can be used to communicate with the submitted job;
 *         the future completes once the submission has succeeded.
 * @throws Exception Thrown, if the program submission fails.
 */
@PublicEvolving
public final CompletableFuture<JobClient> executeAsync() throws Exception {
	final String defaultJobName = getDefaultName();
	return executeAsync(defaultJobName);
}

/**
* Triggers the program execution asynchronously. The environment will execute all parts of the program that have
* resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()}),
* writing results (e.g. {@link DataSet#writeAsText(String)} or
* {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}), or other generic
* data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
*
* <p>The program execution will be logged and displayed with the given job name.
*
* <p><b>ATTENTION:</b> The caller of this method is responsible for managing the lifecycle of
* the returned {@link JobClient}. This means calling {@link JobClient#close()} at the end of
* its usage. Otherwise, there may be resource leaks depending on the JobClient implementation.
*
* @return A future of {@link JobClient} that can be used to communicate with the submitted job; the future completes once the submission has succeeded.
* @throws Exception Thrown, if the program submission fails.
*/
@PublicEvolving
public CompletableFuture<JobClient> executeAsync(String jobName) throws Exception {
checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");

consolidateParallelismDefinitionsInConfiguration();

final Plan plan = createProgramPlan(jobName);
final ExecutorFactory executorFactory =
executorServiceLoader.getExecutorFactory(configuration);

final Executor executor = executorFactory.getExecutor(configuration);
executorServiceLoader.getExecutorFactory(configuration);

try (final JobClient jobClient = executor.execute(plan, configuration).get()) {
checkNotNull(
executorFactory,
"Cannot find compatible factory for specified execution.target (=%s)",
configuration.get(DeploymentOptions.TARGET));

lastJobExecutionResult = configuration.getBoolean(DeploymentOptions.ATTACHED)
? jobClient.getJobExecutionResult(userClassloader).get()
: new DetachedJobExecutionResult(jobClient.getJobID());

return lastJobExecutionResult;
}
return executorFactory
.getExecutor(configuration)
.execute(plan, configuration);
}

private void consolidateParallelismDefinitionsInConfiguration() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
*/
package org.apache.flink.api.scala

import java.util.concurrent.CompletableFuture

import com.esotericsoftware.kryo.Serializer
import org.apache.flink.annotation.{PublicEvolving, Public}
import org.apache.flink.annotation.{Public, PublicEvolving}
import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
Expand All @@ -30,6 +32,7 @@ import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfoBase, ValueTypeInfo}
import org.apache.flink.api.java.{CollectionEnvironment, ExecutionEnvironment => JavaEnv}
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.execution.JobClient
import org.apache.flink.core.fs.Path
import org.apache.flink.types.StringValue
import org.apache.flink.util.{NumberSequenceIterator, Preconditions, SplittableIterator}
Expand Down Expand Up @@ -491,6 +494,46 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
javaEnv.execute(jobName)
}

/**
 * Asynchronously triggers the execution of the program under a generated default name.
 *
 * The environment executes all parts of the program that have resulted in a "sink" operation.
 * Examples of sink operations are printing results ([[DataSet.print]]), writing results
 * (e.g. [[DataSet.writeAsText]] or [[DataSet.write]]), and other generic data sinks created
 * with [[DataSet.output]].
 *
 * <b>ATTENTION:</b> The caller of this method is responsible for managing the lifecycle
 * of the returned [[JobClient]]. This means calling [[JobClient#close()]] once it is no
 * longer used. Otherwise, there may be resource leaks depending on the JobClient
 * implementation.
 *
 * @return A future of [[JobClient]] that can be used to communicate with the submitted job;
 *         the future completes once the submission has succeeded.
 */
@PublicEvolving
def executeAsync(): CompletableFuture[JobClient] = {
  // Forward to the wrapped Java environment.
  javaEnv.executeAsync()
}

/**
 * Asynchronously triggers the execution of the program under the given name.
 *
 * The environment executes all parts of the program that have resulted in a "sink" operation.
 * Examples of sink operations are printing results ([[DataSet.print]]), writing results
 * (e.g. [[DataSet.writeAsText]] or [[DataSet.write]]), and other generic data sinks created
 * with [[DataSet.output]].
 *
 * <b>ATTENTION:</b> The caller of this method is responsible for managing the lifecycle
 * of the returned [[JobClient]]. This means calling [[JobClient#close()]] once it is no
 * longer used. Otherwise, there may be resource leaks depending on the JobClient
 * implementation.
 *
 * @param jobName the name passed on to the wrapped Java environment
 * @return A future of [[JobClient]] that can be used to communicate with the submitted job;
 *         the future completes once the submission has succeeded.
 */
@PublicEvolving
def executeAsync(jobName: String): CompletableFuture[JobClient] = {
  // Forward to the wrapped Java environment.
  javaEnv.executeAsync(jobName)
}

/**
* Creates the plan with which the system will execute the program, and returns it as a String
* using a JSON representation of the execution data flow graph.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.ExecutorFactory;
import org.apache.flink.core.execution.ExecutorServiceLoader;
import org.apache.flink.core.execution.JobClient;
Expand Down Expand Up @@ -101,6 +100,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -1619,23 +1619,84 @@ public JobExecutionResult execute(String jobName) throws Exception {
*/
@Internal
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
if (configuration.get(DeploymentOptions.TARGET) == null) {
throw new RuntimeException("No execution.target specified in your configuration file.");
try (final JobClient jobClient = executeAsync(streamGraph).get()) {
return configuration.getBoolean(DeploymentOptions.ATTACHED)
? jobClient.getJobExecutionResult(userClassloader).get()
: new DetachedJobExecutionResult(jobClient.getJobID());
}
}

/**
 * Asynchronously triggers the execution of the program under a generated default name.
 *
 * <p>The environment executes all parts of the program that have resulted in a "sink"
 * operation. Sink operations are for example printing results or forwarding them to a
 * message queue.
 *
 * <p>This overload forwards {@code DEFAULT_JOB_NAME} to {@link #executeAsync(String)}.
 *
 * <p><b>ATTENTION:</b> The caller of this method is responsible for managing the lifecycle of
 * the returned {@link JobClient}. This means calling {@link JobClient#close()} once it is no longer
 * used. Otherwise, there may be resource leaks depending on the JobClient implementation.
 *
 * @return A future of {@link JobClient} that can be used to communicate with the submitted job;
 *         the future completes once the submission has succeeded.
 * @throws Exception if triggering the execution fails.
 */
@PublicEvolving
public final CompletableFuture<JobClient> executeAsync() throws Exception {
	final String jobName = DEFAULT_JOB_NAME;
	return executeAsync(jobName);
}

/**
 * Asynchronously triggers the execution of the program under the provided name.
 *
 * <p>The environment executes all parts of the program that have resulted in a "sink"
 * operation. Sink operations are for example printing results or forwarding them to a
 * message queue.
 *
 * <p>The job name is null-checked, turned into a stream graph via {@code getStreamGraph},
 * and that graph is handed to {@link #executeAsync(StreamGraph)}.
 *
 * <p><b>ATTENTION:</b> The caller of this method is responsible for managing the lifecycle of
 * the returned {@link JobClient}. This means calling {@link JobClient#close()} once it is no longer
 * used. Otherwise, there may be resource leaks depending on the JobClient implementation.
 *
 * @param jobName desired name of the job; checked with {@code checkNotNull}
 * @return A future of {@link JobClient} that can be used to communicate with the submitted job;
 *         the future completes once the submission has succeeded.
 * @throws Exception if triggering the execution fails.
 */
@PublicEvolving
public CompletableFuture<JobClient> executeAsync(String jobName) throws Exception {
	final String checkedJobName = checkNotNull(jobName);
	return executeAsync(getStreamGraph(checkedJobName));
}

/**
* Triggers the program execution asynchronously. The environment will execute all parts of
* the program that have resulted in a "sink" operation. Sink operations are
* for example printing results or forwarding them to a message queue.
*
* <p><b>ATTENTION:</b> The caller of this method is responsible for managing the lifecycle of
* the returned {@link JobClient}. This means calling {@link JobClient#close()} at the end of
* its usage. Otherwise, there may be resource leaks depending on the JobClient implementation.
*
* @param streamGraph the stream graph representing the transformations
* @return A future of {@link JobClient} that can be used to communicate with the submitted job; the future completes once the submission has succeeded.
* @throws Exception which occurs during job execution.
*/
@Internal
public CompletableFuture<JobClient> executeAsync(StreamGraph streamGraph) throws Exception {
checkNotNull(streamGraph, "StreamGraph cannot be null.");
checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");

consolidateParallelismDefinitionsInConfiguration();

final ExecutorFactory executorFactory =
executorServiceLoader.getExecutorFactory(configuration);
executorServiceLoader.getExecutorFactory(configuration);

final Executor executor = executorFactory.getExecutor(configuration);
checkNotNull(
executorFactory,
"Cannot find compatible factory for specified execution.target (=%s)",
configuration.get(DeploymentOptions.TARGET));

try (final JobClient jobClient = executor.execute(streamGraph, configuration).get()) {

return configuration.getBoolean(DeploymentOptions.ATTACHED)
? jobClient.getJobExecutionResult(userClassloader).get()
: new DetachedJobExecutionResult(jobClient.getJobID());
}
return executorFactory
.getExecutor(configuration)
.execute(streamGraph, configuration);
}

private void consolidateParallelismDefinitionsInConfiguration() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,6 @@ protected StreamPlanEnvironment(ExecutionEnvironment env) {
}
}

/**
 * Executes the program with an empty job name by delegating to {@code execute(String)}.
 *
 * @return the result returned by {@code execute(String)}
 * @throws Exception propagated from {@code execute(String)}
 */
@Override
public JobExecutionResult execute() throws Exception {
	final String emptyJobName = "";
	return execute(emptyJobName);
}

@Override
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
transformations.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.streaming.api.scala

import java.util.concurrent.CompletableFuture

import com.esotericsoftware.kryo.Serializer
import org.apache.flink.annotation.{Internal, Public, PublicEvolving}
import org.apache.flink.api.common.io.{FileInputFormat, FilePathFilter, InputFormat}
Expand All @@ -27,6 +29,7 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.api.scala.ClosureCleaner
import org.apache.flink.configuration.{Configuration, ReadableConfig}
import org.apache.flink.core.execution.JobClient
import org.apache.flink.runtime.state.AbstractStateBackend
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
Expand Down Expand Up @@ -660,6 +663,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
*
* The program execution will be logged and displayed with a generated
* default name.
*
* @return The result of the job execution, containing elapsed time and accumulators.
*/
def execute() = {
  // Forward to the wrapped Java environment; the result type is inferred from its execute().
  javaEnv.execute()
}

Expand All @@ -669,9 +674,48 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* for example printing results or forwarding them to a message queue.
*
* The program execution will be logged and displayed with the provided name.
*
* @return The result of the job execution, containing elapsed time and accumulators.
*/
def execute(jobName: String) = {
  // Forward to the wrapped Java environment; the result type is inferred from its execute(String).
  javaEnv.execute(jobName)
}

/**
 * Asynchronously triggers the execution of the program under a generated default name.
 *
 * The environment executes all parts of the program that have resulted in a "sink"
 * operation. Sink operations are for example printing results or forwarding them to a
 * message queue.
 *
 * <b>ATTENTION:</b> The caller of this method is responsible for managing the lifecycle
 * of the returned [[JobClient]]. This means calling [[JobClient#close()]] once it is no
 * longer used. Otherwise, there may be resource leaks depending on the JobClient
 * implementation.
 *
 * @return A future of [[JobClient]] that can be used to communicate with the submitted job;
 *         the future completes once the submission has succeeded.
 */
@PublicEvolving
def executeAsync(): CompletableFuture[JobClient] = {
  // Forward to the wrapped Java environment.
  javaEnv.executeAsync()
}

/**
 * Asynchronously triggers the execution of the program under the provided name.
 *
 * The environment executes all parts of the program that have resulted in a "sink"
 * operation. Sink operations are for example printing results or forwarding them to a
 * message queue.
 *
 * <b>ATTENTION:</b> The caller of this method is responsible for managing the lifecycle
 * of the returned [[JobClient]]. This means calling [[JobClient#close()]] once it is no
 * longer used. Otherwise, there may be resource leaks depending on the JobClient
 * implementation.
 *
 * @param jobName the name passed on to the wrapped Java environment
 * @return A future of [[JobClient]] that can be used to communicate with the submitted job;
 *         the future completes once the submission has succeeded.
 */
@PublicEvolving
def executeAsync(jobName: String): CompletableFuture[JobClient] = {
  // Forward to the wrapped Java environment.
  javaEnv.executeAsync(jobName)
}

/**
* Creates the plan with which the system will execute the program, and
* returns it as a String using a JSON representation of the execution data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ public JobExecutionResult getLastJobExecutionResult() {
}
}

/**
 * Executes the program under the fixed job name "test job" by delegating to
 * {@code execute(String)}.
 *
 * @return the result returned by {@code execute(String)}
 * @throws Exception propagated from {@code execute(String)}
 */
@Override
public JobExecutionResult execute() throws Exception {
	final String testJobName = "test job";
	return execute(testJobName);
}

@Override
public JobExecutionResult execute(String jobName) throws Exception {
JobExecutionResult result = super.execute(jobName);
Expand Down

0 comments on commit 63d526d

Please sign in to comment.