Skip to content

Commit

Permalink
[FLINK-15093][streaming-java] StreamExecutionEnvironment#getStreamGra…
Browse files Browse the repository at this point in the history
…ph should clear transformations

* Add internal interface StreamExecutionEnvironment#getStreamGraph(String, boolean) with the ability to clean existing transformations;
* Add tests for this new interface;
* Keep StreamExecutionEnvironment#getExecutionPlan semantic unchanged
  • Loading branch information
danny0405 authored and kl0u committed Dec 10, 2019
1 parent 6576245 commit 9c44413
Show file tree
Hide file tree
Showing 12 changed files with 288 additions and 212 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,16 @@ public void testAppendTableSink() throws IOException {
DataStream<Row> ds = env.fromCollection(Collections.singleton(Row.of("foo")), ROW_TYPE);
sink.emitDataStream(ds);

Collection<Integer> sinkIds = env.getStreamGraph().getSinkIDs();
Collection<Integer> sinkIds = env
.getStreamGraph(StreamExecutionEnvironment.DEFAULT_JOB_NAME, false)
.getSinkIDs();
assertEquals(1, sinkIds.size());
int sinkId = sinkIds.iterator().next();

StreamSink planSink = (StreamSink) env.getStreamGraph().getStreamNode(sinkId).getOperator();
StreamSink planSink = (StreamSink) env
.getStreamGraph(StreamExecutionEnvironment.DEFAULT_JOB_NAME, false)
.getStreamNode(sinkId)
.getOperator();
assertTrue(planSink.getUserFunction() instanceof JDBCSinkFunction);

JDBCSinkFunction sinkFunction = (JDBCSinkFunction) planSink.getUserFunction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@

import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.graph.StreamGraph;

import javax.annotation.Nonnull;

Expand Down Expand Up @@ -68,19 +66,4 @@ private static Configuration validateAndGetConfiguration(final Configuration con
effectiveConfiguration.set(DeploymentOptions.ATTACHED, true);
return effectiveConfiguration;
}

/**
* Executes the JobGraph of the on a mini cluster of ClusterUtil with a user
* specified name.
*
* @return The result of the job execution, containing elapsed time and accumulators.
*/
@Override
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
try {
return super.execute(streamGraph);
} finally {
transformations.clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ private static Configuration getEffectiveConfiguration(

@Override
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
transformations.clear();
try {
return super.execute(streamGraph);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {

@Override
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
transformations.clear();

JobClient jobClient = executeAsync(streamGraph);

JobExecutionResult jobExecutionResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1760,24 +1760,45 @@ private void consolidateParallelismDefinitionsInConfiguration() {
}

/**
* Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.
* Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job. This call
* clears previously registered {@link Transformation transformations}.
*
* @return The streamgraph representing the transformations
*/
@Internal
public StreamGraph getStreamGraph() {
return getStreamGraphGenerator().generate();
return getStreamGraph(DEFAULT_JOB_NAME);
}

/**
* Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.
* Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job. This call
* clears previously registered {@link Transformation transformations}.
*
* @param jobName Desired name of the job
* @return The streamgraph representing the transformations
*/
@Internal
public StreamGraph getStreamGraph(String jobName) {
return getStreamGraphGenerator().setJobName(jobName).generate();
return getStreamGraph(jobName, true);
}

/**
* Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph StreamGraph} of the streaming job
* with the option to clear previously registered {@link Transformation transformations}. Clearing the
* transformations allows, for example, to not re-execute the same operations when calling
* {@link #execute()} multiple times.
*
* @param jobName Desired name of the job
* @param clearTransformations Whether or not to clear previously registered transformations
* @return The streamgraph representing the transformations
*/
@Internal
public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
if (clearTransformations) {
this.transformations.clear();
}
return streamGraph;
}

private StreamGraphGenerator getStreamGraphGenerator() {
Expand All @@ -1801,7 +1822,7 @@ private StreamGraphGenerator getStreamGraphGenerator() {
* @return The execution plan of the program, as a JSON String.
*/
public String getExecutionPlan() {
return getStreamGraph().getStreamingPlanAsJSON();
return getStreamGraph(DEFAULT_JOB_NAME, false).getStreamingPlanAsJSON();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ protected StreamPlanEnvironment(ExecutionEnvironment env) {

@Override
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
transformations.clear();

if (env instanceof OptimizerPlanEnvironment) {
((OptimizerPlanEnvironment) env).setPipeline(streamGraph);
}
Expand Down
Loading

0 comments on commit 9c44413

Please sign in to comment.