diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 35e5846797edf..92870178c086f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -62,10 +62,12 @@ public JobExecutionResult execute(String jobName) throws Exception {
 		JobSubmissionResult result = this.client.run(toRun, getParallelism(), wait);
 		if(result instanceof JobExecutionResult) {
+			this.lastJobExecutionResult = (JobExecutionResult) result;
 			return (JobExecutionResult) result;
 		} else {
 			LOG.warn("The Client didn't return a JobExecutionResult");
-			return new JobExecutionResult(result.getJobID(), -1, null);
+			this.lastJobExecutionResult = new JobExecutionResult(result.getJobID(), -1, null);
+			return this.lastJobExecutionResult;
 		}
 	}
 
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
index bd74b20c40fff..827bb2594f2b1 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
@@ -111,12 +111,12 @@ public static void main(String... args) throws Exception {
 		// emit result
 		if(fileOutput) {
 			result.writeAsCsv(outputPath, "\n", " ");
+
+			// execute program
+			env.execute("Connected Components Example");
 		} else {
 			result.print();
 		}
-
-		// execute program
-		env.execute("Connected Components Example");
 	}
 
 	// *************************************************************************
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
index 5af60be9cb24f..fdbe19766abd2 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
@@ -106,13 +106,13 @@ public static void main(String[] args) throws Exception {
 		// emit result
 		if(fileOutput) {
 			triangles.writeAsCsv(outputPath, "\n", ",");
+
+			// execute program
+			env.execute("Basic Triangle Enumeration Example");
 		} else {
 			triangles.print();
 		}
 
-		// execute program
-		env.execute("Basic Triangle Enumeration Example");
-
 	}
 
 	// *************************************************************************
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
index fb1e6f5784374..56b448e95fdf1 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
@@ -121,12 +121,12 @@ public static void main(String[] args) throws Exception {
 		// emit result
 		if(fileOutput) {
 			triangles.writeAsCsv(outputPath, "\n", ",");
+			// execute program
+			env.execute("Triangle Enumeration Example");
 		} else {
 			triangles.print();
 		}
-
-		// execute program
-		env.execute("Triangle Enumeration Example");
+
 	}
 
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
index d622799d6efe3..a374d0cac36d1 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
@@ -122,12 +122,12 @@ public static void main(String[] args) throws Exception {
 		// emit result
 		if(fileOutput) {
 			finalPageRanks.writeAsCsv(outputPath, "\n", " ");
+			// execute program
+			env.execute("Basic Page Rank Example");
 		} else {
 			finalPageRanks.print();
 		}
-
-		// execute program
-		env.execute("Basic Page Rank Example");
 	}
 
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
index 83cf9d9d69654..2780bb11d2a70 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
@@ -70,8 +70,6 @@ public Double map(Long value) {
 		System.out.println("We estimate Pi to be:");
 		pi.print();
-
-		env.execute();
 	}
 
 	//*************************************************************************
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
index 86b7de24c0401..46873f6214427 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
@@ -104,12 +104,13 @@ public static void main(String[] args) throws Exception{
 		// emit result
 		if(fileOutput) {
 			result.writeAsText(outputPath);
+			// execute program
+			env.execute("Linear Regression example");
 		} else {
 			result.print();
 		}
 
-		// execute program
-		env.execute("Linear Regression example");
+
 	}
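Every example hunk above follows the same rule: print() now triggers program execution by itself, so the explicit env.execute(...) call moves into, and only survives on, the file-output branch. A minimal self-contained sketch of that driver pattern (class name, data, and job name are placeholders, not part of the patch):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class ExecuteOnFileOutputSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<String> result = env.fromElements("a", "b", "c");
            boolean fileOutput = args.length > 0;

            if (fileOutput) {
                result.writeAsText(args[0]);
                // file sinks stay lazy, so this branch still needs execute()
                env.execute("Sketch Example");
            } else {
                // print() collects the data set and runs the program on its own
                result.print();
            }
        }
    }

The accumulator example that follows applies the same split where a JobExecutionResult is needed on both branches.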
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
index 2016eaa853053..9f6f567a4cf55 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
@@ -72,16 +72,17 @@ public static void main(final String[] args) throws Exception {
 		final DataSet filteredLines = file.filter(new EmptyFieldFilter());
 
 		// Here, we could do further processing with the filtered lines...
-
+		JobExecutionResult result = null;
 		// output the filtered lines
 		if (outputPath == null) {
 			filteredLines.print();
+			result = env.getLastJobExecutionResult();
 		} else {
 			filteredLines.writeAsCsv(outputPath);
+			// execute program
+			result = env.execute("Accumulator example");
 		}
 
-		// execute program
-		final JobExecutionResult result = env.execute("Accumulator example");
 
 		// get the accumulator result via its registration key
 		final List emptyFields = result.getAccumulatorResult(EMPTY_FIELD_ACCUMULATOR);
 
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
index 890af65d7a000..9425291ef98d3 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/WebLogAnalysis.java
@@ -140,13 +140,11 @@ public static void main(String[] args) throws Exception {
 		// emit result
 		if(fileOutput) {
 			result.writeAsCsv(outputPath, "\n", "|");
+			// execute program
+			env.execute("WebLogAnalysis Example");
 		} else {
 			result.print();
 		}
-
-		// execute program
-		env.execute("WebLogAnalysis Example");
-
 	}
 
 	// *************************************************************************
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java
index f3364fd44d8e9..b001d12b7b434 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/PojoExample.java
@@ -97,12 +97,12 @@ public Word reduce(Word value1, Word value2) throws Exception {
 		if(fileOutput) {
 			counts.writeAsText(outputPath, WriteMode.OVERWRITE);
+			// execute program
+			env.execute("WordCount-Pojo Example");
 		} else {
 			counts.print();
 		}
-
-		// execute program
-		env.execute("WordCount-Pojo Example");
+
 	}
 
 	// *************************************************************************
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
index 7db79466b12e7..82c3ad80c0a1b 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
@@ -74,12 +74,13 @@ public static void main(String[] args) throws Exception {
 		// emit result
 		if(fileOutput) {
 			counts.writeAsCsv(outputPath, "\n", " ");
+			// execute program
+			env.execute("WordCount Example");
 		} else {
 			counts.print();
 		}
 
-		// execute program
-		env.execute("WordCount Example");
+
 	}
 
 	// *************************************************************************
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
index 26d01c32f4dc1..08a3e62517421 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
+++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala @@ -96,12 +96,12 @@ object KMeans { if (fileOutput) { clusteredPoints.writeAsCsv(outputPath, "\n", " ") + env.execute("Scala KMeans Example") } else { clusteredPoints.print() } - env.execute("Scala KMeans Example") } private def parseParameters(programArguments: Array[String]): Boolean = { diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala index e75c862965341..9e23ed7c3f85f 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala @@ -98,11 +98,11 @@ object ConnectedComponents { } if (fileOutput) { verticesWithComponents.writeAsCsv(outputPath, "\n", " ") + env.execute("Scala Connected Components Example") } else { verticesWithComponents.print() } - env.execute("Scala Connected Components Example") } private def parseParameters(args: Array[String]): Boolean = { diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala index b4955edf2d3e9..ae8a98210f10e 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala @@ -100,6 +100,5 @@ object DeltaPageRank { iteration.print() - env.execute("Page Rank") } } diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala index a62786ca87275..a9000b3642eae 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala @@ -91,12 +91,13 @@ object EnumTrianglesBasic { // emit result if (fileOutput) { triangles.writeAsCsv(outputPath, "\n", ",") + // execute program + env.execute("TriangleEnumeration Example") } else { triangles.print() } - // execute program - env.execute("TriangleEnumeration Example") + } // ************************************************************************* diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala index 244e9681a6d61..cc7c33ff05473 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala @@ -117,12 +117,12 @@ object EnumTrianglesOpt { // emit result if (fileOutput) { triangles.writeAsCsv(outputPath, "\n", ",") + // execute program + env.execute("TriangleEnumeration Example") } else { triangles.print() } - // execute program - env.execute("TriangleEnumeration 
Example") } // ************************************************************************* diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala index a3ea4b33ee366..5b5f6c21923f1 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala @@ -128,12 +128,11 @@ object PageRankBasic { // emit result if (fileOutput) { result.writeAsCsv(outputPath, "\n", " ") + // execute program + env.execute("Basic PageRank Example") } else { result.print() } - - // execute program - env.execute("Basic PageRank Example") } // ************************************************************************* diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala index 727cb47979c86..3de0f2e5e0320 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala @@ -57,11 +57,11 @@ object TransitiveClosureNaive { if (fileOutput) { paths.writeAsCsv(outputPath, "\n", " ") + env.execute("Scala Transitive Closure Example") } else { paths.print() } - env.execute("Scala Transitive Closure Example") } diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala index 582dd4fbb6f6e..3453ee8bf8bd7 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala @@ -47,8 +47,6 @@ object PiEstimation { println("We estimate Pi to be:") pi.print() - - env.execute("PiEstimation example") } } diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala index fb99a009f6993..2a7b786d1ac46 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala @@ -82,11 +82,11 @@ object LinearRegression { if (fileOutput) { result.writeAsText(outputPath) + env.execute("Scala Linear Regression example") } else { result.print() } - env.execute("Scala Linear Regression example") } /** diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala index 950e3c86fc512..53925949e5fd9 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala +++ 
b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala @@ -123,11 +123,11 @@ object WebLogAnalysis { // emit result if (fileOutput) { result.writeAsCsv(outputPath, "\n", "|") + env.execute("Scala WebLogAnalysis Example") } else { result.print() } - env.execute("Scala WebLogAnalysis Example") } private var fileOutput: Boolean = false diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala index b5c2ee2f1a823..7d5db7e7fd6cb 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala @@ -57,11 +57,11 @@ object WordCount { if (fileOutput) { counts.writeAsCsv(outputPath, "\n", " ") + env.execute("Scala WordCount Example") } else { counts.print() } - env.execute("Scala WordCount Example") } private def parseParameters(args: Array[String]): Boolean = { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java index b48debc381cd5..51e91d7332ad9 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java @@ -30,7 +30,8 @@ public JobExecutionResult execute(String jobName) throws Exception { // We need to reverse here. Object-Reuse enabled, means safe mode is disabled. CollectionExecutor exec = new CollectionExecutor(getConfig()); - return exec.execute(p); + this.lastJobExecutionResult = exec.execute(p); + return this.lastJobExecutionResult; } /** diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index 4f2942ebf35e2..133a083741496 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -47,6 +47,7 @@ import org.apache.flink.api.java.functions.SelectByMinFunction; import org.apache.flink.api.java.io.CsvOutputFormat; import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.io.PrintingOutputFormat; import org.apache.flink.api.java.io.TextOutputFormat; import org.apache.flink.api.java.io.TextOutputFormat.TextFormatter; import org.apache.flink.api.java.operators.AggregateOperator; @@ -82,7 +83,6 @@ import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.core.fs.Path; import org.apache.flink.util.AbstractID; -import org.apache.flink.util.ExceptionUtils; import java.io.IOException; import java.util.ArrayList; @@ -1337,14 +1337,10 @@ private DataSink internalWriteAsCsv(Path filePath, String r * For each element of the DataSet the result of {@link Object#toString()} is written. * This triggers execute() automatically. 
 	 */
-	public void print() {
-		try {
-			List<T> elements = this.collect();
-			for (T e: elements) {
-				System.out.println(e);
-			}
-		} catch (Exception e) {
-			System.out.println("Could not retrieve values for printing: " + ExceptionUtils.stringifyException(e));
+	public void print() throws Exception {
+		List<T> elements = this.collect();
+		for (T e: elements) {
+			System.out.println(e);
 		}
 	}
 
@@ -1363,14 +1359,10 @@ public DataSink<T> print(String sinkIdentifier) {
 	 * Writes a DataSet to the standard error stream (stderr).
 	 * For each element of the DataSet the result of {@link Object#toString()} is written.
 	 */
-	public void printToErr() {
-		try {
-			List<T> elements = this.collect();
-			for (T e: elements) {
-				System.err.println(e);
-			}
-		} catch (Exception e) {
-			System.err.println("Could not retrieve values for printing: " + e);
+	public void printToErr() throws Exception {
+		List<T> elements = this.collect();
+		for (T e: elements) {
+			System.err.println(e);
 		}
 	}
 
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 75d4387a69005..0f65b797ce99f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -94,6 +94,9 @@
  */
 public abstract class ExecutionEnvironment {
+
+	protected JobExecutionResult lastJobExecutionResult;
+
 	private static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);
 
 	/** The environment of the context (local by default, cluster if invoked through command line) */
@@ -234,7 +237,15 @@ public int getNumberOfExecutionRetries() {
 	public UUID getId() {
 		return this.executionId;
 	}
-
+
+	/**
+	 * Returns the {@link org.apache.flink.api.common.JobExecutionResult} of the last executed job.
+	 */
+	public JobExecutionResult getLastJobExecutionResult() {
+		return this.lastJobExecutionResult;
+	}
+
+
 	/**
 	 * Gets the UUID by which this environment is identified, as a string.
 	 *
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
index 25042b6b7edef..27b6254c42160 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
@@ -51,7 +51,8 @@ public JobExecutionResult execute(String jobName) throws Exception {
 		PlanExecutor executor = PlanExecutor.createLocalExecutor(configuration);
 		executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
-		return executor.executePlan(p);
+		this.lastJobExecutionResult = executor.executePlan(p);
+		return this.lastJobExecutionResult;
 	}
 
 	@Override
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
index c9a4fe047a904..515037ce823f6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
@@ -67,7 +67,9 @@ public JobExecutionResult execute(String jobName) throws Exception {
 		PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFiles);
 		executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
-		return executor.executePlan(p);
+
+		this.lastJobExecutionResult = executor.executePlan(p);
+		return this.lastJobExecutionResult;
 	}
 
 	@Override
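The flink-java hunks above are the core of this change: print() and printToErr() now execute the program eagerly through collect() and propagate failures as a checked exception instead of logging them, and every environment caches the result of its last run. A minimal usage sketch (class name and element values are placeholders; getNetRuntime() is the existing accessor on JobExecutionResult):

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class EagerPrintSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Integer> data = env.fromElements(1, 2, 3);

            // print() runs the job itself; no env.execute() follows it
            data.print();

            // the implicitly executed job's result is cached on the environment
            JobExecutionResult result = env.getLastJobExecutionResult();
            System.out.println("net runtime: " + result.getNetRuntime() + " ms");
        }
    }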
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java b/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java
index c0ca6c2d84846..4fc51bb7e870f 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/MultipleInvokationsTest.java
@@ -36,7 +36,8 @@ public void testMultipleInvocationsGetPlan() {
 		// ----------- Execution 1 ---------------
 		DataSet<String> data = env.fromElements("Some", "test", "data").name("source1");
-		data.print();
+		//data.print();
+		data.output(new DiscardingOutputFormat<String>()).name("print1");
 		data.output(new DiscardingOutputFormat<String>()).name("output1");
 
 		{
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
index 341d87e1a62f5..e890b4e59729f 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
@@ -28,11 +28,9 @@
 import org.apache.flink.api.common.operators.base.CrossOperatorBase;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.translation.PlanProjectOperator;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.*;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.junit.Test;
 
@@ -73,7 +71,7 @@ public void testProjectionSemProps1() {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
-		tupleDs.project(1, 3, 2).project(0, 3).print();
+		tupleDs.project(1, 3, 2).project(0, 3).output(new DiscardingOutputFormat());
 
 		Plan plan = env.createProgramPlan();
 
@@ -99,7 +97,7 @@
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet, Tuple2, String>> tupleDs = env.fromCollection(emptyNestedTupleData, nestedTupleTypeInfo);
 
-		tupleDs.project(2, 3, 1).project(2).print();
+		tupleDs.project(2, 3, 1).project(2).output(new DiscardingOutputFormat());
 
 		Plan plan = env.createProgramPlan();
 
@@ -135,7 +133,7 @@ public void testJoinProjectionSemProps1() {
 		tupleDs.join(tupleDs).where(0).equalTo(0)
 			.projectFirst(2, 3)
 			.projectSecond(1, 4)
-			.print();
+			.output(new DiscardingOutputFormat());
 
 		Plan plan = env.createProgramPlan();
 
@@ -163,7 +161,7 @@
 		tupleDs.join(tupleDs).where(0).equalTo(0)
 			.projectFirst(2,0)
 			.projectSecond(1,3)
-			.print();
+			.output(new DiscardingOutputFormat());
 
 		Plan plan = env.createProgramPlan();
 
@@ -212,7 +210,7 @@ public void testCrossProjectionSemProps1() {
 		tupleDs.cross(tupleDs)
 			.projectFirst(2, 3)
 			.projectSecond(1, 4)
-			.print();
+			.output(new DiscardingOutputFormat());
 
 		Plan plan = env.createProgramPlan();
 
@@ -240,7 +238,7 @@ public void testCrossProjectionSemProps2() {
 		tupleDs.cross(tupleDs)
 			.projectFirst(2, 0)
 			.projectSecond(1,3)
-			.print();
+			.output(new DiscardingOutputFormat());
 
 		Plan plan = env.createProgramPlan();
 
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
index f0124e3466715..33b39588eacd3 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.junit.Test; @@ -55,7 +56,7 @@ public void testUnaryFunctionWildcardForwardedAnnotation() { @SuppressWarnings("unchecked") DataSet> input = env.fromElements(new Tuple3(3l, "test", 42)); - input.map(new WildcardForwardedMapper>()).print(); + input.map(new WildcardForwardedMapper>()).output(new DiscardingOutputFormat>()); Plan plan = env.createProgramPlan(); GenericDataSinkBase sink = plan.getDataSinks().iterator().next(); @@ -80,7 +81,7 @@ public void testUnaryFunctionInPlaceForwardedAnnotation() { @SuppressWarnings("unchecked") DataSet> input = env.fromElements(new Tuple3(3l, "test", 42)); - input.map(new IndividualForwardedMapper()).print(); + input.map(new IndividualForwardedMapper()).output(new DiscardingOutputFormat>()); Plan plan = env.createProgramPlan(); GenericDataSinkBase sink = plan.getDataSinks().iterator().next(); @@ -102,7 +103,7 @@ public void testUnaryFunctionMovingForwardedAnnotation() { @SuppressWarnings("unchecked") DataSet> input = env.fromElements(new Tuple3(3l, 2l, 1l)); - input.map(new ShufflingMapper()).print(); + input.map(new ShufflingMapper()).output(new DiscardingOutputFormat>()); Plan plan = env.createProgramPlan(); GenericDataSinkBase sink = plan.getDataSinks().iterator().next(); @@ -128,7 +129,7 @@ public void testUnaryFunctionForwardedInLine1() { @SuppressWarnings("unchecked") DataSet> input = env.fromElements(new Tuple3(3l, 2l, 1l)); input.map(new NoAnnotationMapper>()).withForwardedFields("0->1; 2") - .print(); + .output(new DiscardingOutputFormat>()); Plan plan = env.createProgramPlan(); GenericDataSinkBase sink = plan.getDataSinks().iterator().next(); @@ -151,7 +152,7 @@ public void testUnaryFunctionForwardedInLine2() { @SuppressWarnings("unchecked") DataSet> input = env.fromElements(new Tuple3(3l, 2l, 1l)); input.map(new ReadSetMapper>()).withForwardedFields("0->1; 2") - .print(); + .output(new DiscardingOutputFormat>()); Plan plan = env.createProgramPlan(); GenericDataSinkBase sink = plan.getDataSinks().iterator().next(); @@ -174,7 +175,7 @@ public void testUnaryFunctionForwardedInLine3() { @SuppressWarnings("unchecked") DataSet> input = env.fromElements(new Tuple3(3l, 2l, 1l)); input.map(new ReadSetMapper>()).withForwardedFields("0->1; 2") - .print(); + .output(new DiscardingOutputFormat>()); Plan plan = env.createProgramPlan(); GenericDataSinkBase sink = plan.getDataSinks().iterator().next(); @@ -196,7 +197,7 @@ public void testUnaryFunctionAllForwardedExceptAnnotation() { @SuppressWarnings("unchecked") DataSet> input = env.fromElements(new Tuple3(3l, 2l, 1l)); - input.map(new AllForwardedExceptMapper>()).print(); + input.map(new AllForwardedExceptMapper>()).output(new DiscardingOutputFormat>()); Plan plan = env.createProgramPlan(); GenericDataSinkBase sink = plan.getDataSinks().iterator().next(); @@ -218,7 +219,7 @@ public void testUnaryFunctionReadFieldsAnnotation() { @SuppressWarnings("unchecked") DataSet> input = env.fromElements(new Tuple3(3l, 2l, 1l)); - input.map(new ReadSetMapper>()).print(); + input.map(new ReadSetMapper>()).output(new DiscardingOutputFormat>()); Plan plan = env.createProgramPlan(); GenericDataSinkBase sink = 
plan.getDataSinks().iterator().next(); @@ -260,7 +261,7 @@ public void testBinaryForwardedAnnotation() { @SuppressWarnings("unchecked") DataSet> input2 = env.fromElements(new Tuple2(3l, 3.1415)); input1.join(input2).where(0).equalTo(0).with(new ForwardedBothAnnotationJoin()) - .print(); + .output(new DiscardingOutputFormat>()); Plan plan = env.createProgramPlan(); GenericDataSinkBase sink = plan.getDataSinks().iterator().next(); @@ -287,7 +288,7 @@ public void testBinaryForwardedInLine1() { DataSet> input2 = env.fromElements(new Tuple2(3l, 2l)); input1.join(input2).where(0).equalTo(0).with(new NoAnnotationJoin()) .withForwardedFieldsFirst("0->1; 1->2").withForwardedFieldsSecond("1->0") - .print(); + .output(new DiscardingOutputFormat>()); Plan plan = env.createProgramPlan(); GenericDataSinkBase sink = plan.getDataSinks().iterator().next(); @@ -314,7 +315,7 @@ public void testBinaryForwardedInLine2() { DataSet> input2 = env.fromElements(new Tuple2(3l, 2l)); input1.join(input2).where(0).equalTo(0).with(new ReadSetJoin()) .withForwardedFieldsFirst("0->1; 1->2").withForwardedFieldsSecond("1->0") - .print(); + .output(new DiscardingOutputFormat>()); Plan plan = env.createProgramPlan(); GenericDataSinkBase sink = plan.getDataSinks().iterator().next(); @@ -347,7 +348,7 @@ public void testBinaryForwardedAnnotationInLineMixed1() { DataSet> input2 = env.fromElements(new Tuple2(3l, 2l)); input1.join(input2).where(0).equalTo(0).with(new ForwardedFirstAnnotationJoin()) .withForwardedFieldsSecond("1") - .print(); + .output(new DiscardingOutputFormat>()); Plan plan = env.createProgramPlan(); GenericDataSinkBase sink = plan.getDataSinks().iterator().next(); @@ -377,7 +378,7 @@ public void testBinaryForwardedAnnotationInLineMixed2() { DataSet> input2 = env.fromElements(new Tuple2(3l, 2l)); input1.join(input2).where(0).equalTo(0).with(new ForwardedSecondAnnotationJoin()) .withForwardedFieldsFirst("0->1") - .print(); + .output(new DiscardingOutputFormat>()); Plan plan = env.createProgramPlan(); GenericDataSinkBase sink = plan.getDataSinks().iterator().next(); @@ -405,7 +406,7 @@ public void testBinaryAllForwardedExceptAnnotation() { @SuppressWarnings("unchecked") DataSet> input2 = env.fromElements(new Tuple3(3l, 2l, 1l)); input1.join(input2).where(0).equalTo(0).with(new AllForwardedExceptJoin()) - .print(); + .output(new DiscardingOutputFormat>()); Plan plan = env.createProgramPlan(); GenericDataSinkBase sink = plan.getDataSinks().iterator().next(); @@ -435,7 +436,7 @@ public void testBinaryReadFieldsAnnotation() { @SuppressWarnings("unchecked") DataSet> input2 = env.fromElements(new Tuple2(3l, 2l)); input1.join(input2).where(0).equalTo(0).with(new ReadSetJoin()) - .print(); + .output(new DiscardingOutputFormat>()); Plan plan = env.createProgramPlan(); GenericDataSinkBase sink = plan.getDataSinks().iterator().next(); diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java index 63b40527ca615..0ce79e3e353b3 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java @@ -28,6 +28,7 @@ import org.apache.flink.api.common.operators.GenericDataSourceBase; import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; import org.apache.flink.api.java.aggregation.Aggregations; 
+import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.types.StringValue; import org.junit.Test; @@ -46,7 +47,7 @@ public void translateAggregate() { DataSet> initialData = env.fromElements(new Tuple3(3.141592, new StringValue("foobar"), Long.valueOf(77))); - initialData.groupBy(0).aggregate(Aggregations.MIN, 1).and(Aggregations.SUM, 2).print(); + initialData.groupBy(0).aggregate(Aggregations.MIN, 1).and(Aggregations.SUM, 2).output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java index 2fe9965a2159c..887173d8e6696 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java @@ -27,6 +27,7 @@ import org.apache.flink.api.common.operators.base.CoGroupOperatorBase; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.util.Collector; @@ -54,7 +55,7 @@ public void coGroup(Iterable> first, Iterable out) {} }) - .print(); + .output(new DiscardingOutputFormat()); Plan p = env.createProgramPlan(); @@ -98,7 +99,7 @@ public void testSortTuplesAndPojos() { public void coGroup(Iterable> first, Iterable second, Collector out) {} }) - .print(); + .output(new DiscardingOutputFormat()); Plan p = env.createProgramPlan(); diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java index ae89780c33b28..f9ce82f988b6c 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java @@ -33,6 +33,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase; import org.apache.flink.api.common.operators.base.MapOperatorBase; import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.operators.DeltaIteration; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.common.functions.RichCoGroupFunction; @@ -91,7 +92,7 @@ public void testCorrectTranslation() { joined, joined.map(new NextWorksetMapper()).name(BEFORE_NEXT_WORKSET_MAP)); - result.print(); + result.output(new DiscardingOutputFormat>()); result.writeAsText("/dev/null"); } diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java index b7fbb78806a28..9824ee14b7cc8 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java @@ -27,6 +27,7 @@ import org.apache.flink.api.java.DataSet; import 
org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.operators.DistinctOperator; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; @@ -60,7 +61,7 @@ public void testCombinable() { public String getKey(String value) { return value; } }); - op.print(); + op.output(new DiscardingOutputFormat()); Plan p = env.createProgramPlan(); @@ -81,7 +82,7 @@ public void translateDistinctPlain() { DataSet> initialData = getSourceDataSet(env); - initialData.distinct().print(); + initialData.distinct().output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); @@ -117,7 +118,7 @@ public void translateDistinctPlain2() { DataSet initialData = getSourcePojoDataSet(env); - initialData.distinct().print(); + initialData.distinct().output(new DiscardingOutputFormat()); Plan p = env.createProgramPlan(); @@ -153,7 +154,7 @@ public void translateDistinctPosition() { DataSet> initialData = getSourceDataSet(env); - initialData.distinct(1, 2).print(); + initialData.distinct(1, 2).output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); @@ -193,7 +194,7 @@ public void translateDistinctKeySelector() { public StringValue getKey(Tuple3 value) { return value.f1; } - }).setParallelism(4).print(); + }).setParallelism(4).output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); @@ -237,7 +238,7 @@ public void translateDistinctExpressionKey() { DataSet initialData = getSourcePojoDataSet(env); - initialData.distinct("myInt").print(); + initialData.distinct("myInt").output(new DiscardingOutputFormat()); Plan p = env.createProgramPlan(); diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java index b578eb781c57a..b555844c26567 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java @@ -29,6 +29,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.TupleTypeInfo; @@ -54,7 +55,7 @@ public void translateNonGroupedReduce() { public Tuple3 reduce(Tuple3 value1, Tuple3 value2) { return value1; } - }).print(); + }).output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); @@ -96,7 +97,7 @@ public Tuple3 reduce(Tuple3>()); Plan p = env.createProgramPlan(); @@ -143,7 +144,7 @@ public Tuple3 reduce(Tuple3>()); Plan p = env.createProgramPlan(); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java index 973f4027678c5..20a4ef67def88 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.java.DataSet; import 
org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.operators.DistinctOperator; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.plan.OptimizedPlan; diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java index 32bd6e9dfaaf9..ccc9b13c1856c 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java @@ -30,7 +30,6 @@ import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.WorksetIterationPlanNode; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; -import org.apache.flink.optimizer.testfunctions.IdentityMapper; import org.apache.flink.optimizer.util.CompilerTestBase; import org.junit.Test; @@ -42,13 +41,13 @@ public void testWorksetIterationNotDependingOnSolutionSet() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet> input = env.readCsvFile("/tmp/some.csv").types(Long.class, Long.class); + DataSet> input = env.generateSequence(1, 100).map(new Duplicator()); DeltaIteration, Tuple2> iteration = input.iterateDelta(input, 100, 1); - DataSet> iterEnd = iteration.getWorkset().map(new IdentityMapper>()); + DataSet> iterEnd = iteration.getWorkset().map(new TestMapper>()); iteration.closeWith(iterEnd, iterEnd) - .output(new DiscardingOutputFormat>()); + .output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -64,5 +63,18 @@ public void testWorksetIterationNotDependingOnSolutionSet() { fail(e.getMessage()); } } - + + private static final class Duplicator implements MapFunction> { + @Override + public Tuple2 map(T value) { + return new Tuple2(value, value); + } + } + + private static final class TestMapper implements MapFunction { + @Override + public T map(T value) { + return value; + } + } } diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java index b4e95fbb6385a..9a609432ea7f9 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.optimizer.plan.DualInputPlanNode; @@ -57,7 +58,7 @@ public int partition(Long key, int numPartitions) { input1.partitionCustom(partitioner, 1) .join(input2.partitionCustom(partitioner, 0)) .where(1).equalTo(0) - .print(); + .output(new DiscardingOutputFormat, Tuple3>>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -102,7 +103,7 @@ public int partition(Long key, int numPartitions) { 
.coGroup(input2.partitionCustom(partitioner, 0)) .where(1).equalTo(0) .with(new DummyCoGroupFunction, Tuple3>()) - .print(); + .output(new DiscardingOutputFormat, Tuple3>>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java index 346e70233fffd..14d2a960e36b5 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java @@ -27,6 +27,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.optimizer.plan.DualInputPlanNode; @@ -57,7 +58,7 @@ public void testCoGroupWithTuples() { .where(1).equalTo(0) .withPartitioner(partitioner) .with(new DummyCoGroupFunction, Tuple3>()) - .print(); + .output(new DiscardingOutputFormat, Tuple3>>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -118,7 +119,7 @@ public void testCoGroupWithPojos() { .where("b").equalTo("a") .withPartitioner(partitioner) .with(new DummyCoGroupFunction()) - .print(); + .output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -180,7 +181,7 @@ public void testCoGroupWithKeySelectors() { .where(new Pojo2KeySelector()).equalTo(new Pojo3KeySelector()) .withPartitioner(partitioner) .with(new DummyCoGroupFunction()) - .print(); + .output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -251,7 +252,7 @@ public void testIncompatibleHashAndCustomPartitioning() { grouped .coGroup(partitioned).where(0).equalTo(0) .with(new DummyCoGroupFunction, Tuple3>()) - .print(); + .output(new DiscardingOutputFormat, Tuple3>>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java index 17a76590450bd..bc2eb82e64c5a 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.optimizer.plan.DualInputPlanNode; @@ -57,8 +58,8 @@ public void testJoinReduceCombination() { .withPartitioner(partitioner); joined.groupBy(1).withPartitioner(partitioner) - .reduceGroup(new IdentityGroupReducerCombinable>()) - .print(); + .reduceGroup(new IdentityGroupReducerCombinable>()) + .output(new DiscardingOutputFormat>()); Plan p = 
env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java index 00fd5878cce0e..1aca046d28864 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java @@ -26,6 +26,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; @@ -53,7 +54,7 @@ public void testPartitionTuples() { data .partitionCustom(part, 0) .mapPartition(new IdentityPartitionerMapper>()) - .print(); + .output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -123,7 +124,7 @@ public void testPartitionPojo() { data .partitionCustom(part, "a") .mapPartition(new IdentityPartitionerMapper()) - .print(); + .output(new DiscardingOutputFormat()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -193,7 +194,7 @@ public void testPartitionKeySelector() { data .partitionCustom(part, new TestKeySelectorInt()) .mapPartition(new IdentityPartitionerMapper()) - .print(); + .output(new DiscardingOutputFormat()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java index 23f4812f78ac9..b2bfc67365a49 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java @@ -27,6 +27,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; @@ -53,7 +54,7 @@ public void testCustomPartitioningKeySelectorReduce() { data.groupBy(new TestKeySelector>()) .withPartitioner(new TestPartitionerInt()) .reduce(new DummyReducer>()) - .print(); + .output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -84,8 +85,8 @@ public void testCustomPartitioningKeySelectorGroupReduce() { data.groupBy(new TestKeySelector>()) .withPartitioner(new TestPartitionerInt()) - .reduceGroup(new IdentityGroupReducerCombinable>()) - .print(); + .reduceGroup(new IdentityGroupReducerCombinable>()) + .output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -116,7 +117,7 @@ public void testCustomPartitioningKeySelectorGroupReduceSorted() { .withPartitioner(new TestPartitionerInt()) .sortGroup(new TestKeySelector>(), Order.ASCENDING) .reduceGroup(new 
IdentityGroupReducerCombinable>()) - .print(); + .output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java index 54033ac6becef..dc2b14767063c 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; @@ -37,26 +38,26 @@ @SuppressWarnings("serial") public class GroupingPojoTranslationTest extends CompilerTestBase { - + @Test public void testCustomPartitioningTupleReduce() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - + DataSet data = env.fromElements(new Pojo2()) .rebalance().setParallelism(4); - + data.groupBy("a").withPartitioner(new TestPartitionerInt()) - .reduce(new DummyReducer()) - .print(); - + .reduce(new DummyReducer()) + .output(new DiscardingOutputFormat()); + Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); - + SinkPlanNode sink = op.getDataSinks().iterator().next(); SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); - + assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); @@ -66,26 +67,26 @@ public void testCustomPartitioningTupleReduce() { fail(e.getMessage()); } } - + @Test public void testCustomPartitioningTupleGroupReduce() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - + DataSet data = env.fromElements(new Pojo2()) .rebalance().setParallelism(4); - + data.groupBy("a").withPartitioner(new TestPartitionerInt()) - .reduceGroup(new IdentityGroupReducerCombinable()) - .print(); - + .reduceGroup(new IdentityGroupReducerCombinable()) + .output(new DiscardingOutputFormat()); + Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); - + SinkPlanNode sink = op.getDataSinks().iterator().next(); SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); - + assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); @@ -95,27 +96,27 @@ public void testCustomPartitioningTupleGroupReduce() { fail(e.getMessage()); } } - + @Test public void testCustomPartitioningTupleGroupReduceSorted() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - + DataSet data = env.fromElements(new Pojo3()) .rebalance().setParallelism(4); - + 
data.groupBy("a").withPartitioner(new TestPartitionerInt()) - .sortGroup("b", Order.ASCENDING) - .reduceGroup(new IdentityGroupReducerCombinable()) - .print(); - + .sortGroup("b", Order.ASCENDING) + .reduceGroup(new IdentityGroupReducerCombinable()) + .output(new DiscardingOutputFormat()); + Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); - + SinkPlanNode sink = op.getDataSinks().iterator().next(); SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); - + assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); @@ -125,28 +126,28 @@ public void testCustomPartitioningTupleGroupReduceSorted() { fail(e.getMessage()); } } - + @Test public void testCustomPartitioningTupleGroupReduceSorted2() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - + DataSet data = env.fromElements(new Pojo4()) .rebalance().setParallelism(4); - + data.groupBy("a").withPartitioner(new TestPartitionerInt()) - .sortGroup("b", Order.ASCENDING) - .sortGroup("c", Order.DESCENDING) - .reduceGroup(new IdentityGroupReducerCombinable()) - .print(); - + .sortGroup("b", Order.ASCENDING) + .sortGroup("c", Order.DESCENDING) + .reduceGroup(new IdentityGroupReducerCombinable()) + .output(new DiscardingOutputFormat()); + Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); - + SinkPlanNode sink = op.getDataSinks().iterator().next(); SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource(); SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource(); - + assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy()); assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy()); @@ -156,15 +157,15 @@ public void testCustomPartitioningTupleGroupReduceSorted2() { fail(e.getMessage()); } } - + @Test public void testCustomPartitioningTupleInvalidType() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - + DataSet data = env.fromElements(new Pojo2()) .rebalance().setParallelism(4); - + try { data.groupBy("a").withPartitioner(new TestPartitionerLong()); fail("Should throw an exception"); @@ -176,19 +177,19 @@ public void testCustomPartitioningTupleInvalidType() { fail(e.getMessage()); } } - + @Test public void testCustomPartitioningTupleInvalidTypeSorted() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - + DataSet data = env.fromElements(new Pojo3()) .rebalance().setParallelism(4); - + try { data.groupBy("a") - .sortGroup("b", Order.ASCENDING) - .withPartitioner(new TestPartitionerLong()); + .sortGroup("b", Order.ASCENDING) + .withPartitioner(new TestPartitionerLong()); fail("Should throw an exception"); } catch (InvalidProgramException e) {} @@ -198,18 +199,18 @@ public void testCustomPartitioningTupleInvalidTypeSorted() { fail(e.getMessage()); } } - + @Test public void testCustomPartitioningTupleRejectCompositeKey() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - + DataSet data = env.fromElements(new Pojo2()) .rebalance().setParallelism(4); - + try { data.groupBy("a", "b") - .withPartitioner(new 
@Test public void testCustomPartitioningTupleRejectCompositeKey() { try { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - + DataSet<Pojo2> data = env.fromElements(new Pojo2()) .rebalance().setParallelism(4); - + try { data.groupBy("a", "b") - .withPartitioner(new TestPartitionerInt()); + .withPartitioner(new TestPartitionerInt()); fail("Should throw an exception"); } catch (InvalidProgramException e) {} @@ -219,39 +220,39 @@ public void testCustomPartitioningTupleRejectCompositeKey() { fail(e.getMessage()); } } - + // -------------------------------------------------------------------------------------------- - + public static class Pojo2 { public int a; public int b; - + } - + public static class Pojo3 { public int a; public int b; public int c; } - + public static class Pojo4 { public int a; public int b; public int c; public int d; } - + private static class TestPartitionerInt implements Partitioner<Integer> { @Override public int partition(Integer key, int numPartitions) { return 0; } } - + private static class TestPartitionerLong implements Partitioner<Long> { @Override public int partition(Long key, int numPartitions) { return 0; } } -} +} \ No newline at end of file diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java index 49f44f55a74de..6eb5ad5de214c 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; @@ -51,7 +52,7 @@ public void testCustomPartitioningTupleAgg() { data.groupBy(0).withPartitioner(new TestPartitionerInt()) .sum(1) - .print(); + .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -80,7 +81,7 @@ public void testCustomPartitioningTupleReduce() { data.groupBy(0).withPartitioner(new TestPartitionerInt()) .reduce(new DummyReducer<Tuple2<Integer, Integer>>()) - .print(); + .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -109,7 +110,7 @@ public void testCustomPartitioningTupleGroupReduce() { data.groupBy(0).withPartitioner(new TestPartitionerInt()) .reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer, Integer>>()) - .print(); + .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -139,7 +140,7 @@ public void testCustomPartitioningTupleGroupReduceSorted() { data.groupBy(0).withPartitioner(new TestPartitionerInt()) .sortGroup(1, Order.ASCENDING) .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer, Integer, Integer>>()) - .print(); + .output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -170,7 +171,7 @@ public void testCustomPartitioningTupleGroupReduceSorted2() { .sortGroup(1, Order.ASCENDING) .sortGroup(2, Order.DESCENDING) .reduceGroup(new IdentityGroupReducerCombinable<Tuple4<Integer, Integer, Integer, Integer>>()) - .print(); + .output(new DiscardingOutputFormat<Tuple4<Integer, Integer, Integer, Integer>>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java index 
ff429b85439f5..65b9756bc8d6f 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java @@ -28,6 +28,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.optimizer.plan.DualInputPlanNode; @@ -55,7 +56,7 @@ public void testJoinWithTuples() { input1 .join(input2, JoinHint.REPARTITION_HASH_FIRST).where(1).equalTo(0).withPartitioner(partitioner) - .print(); + .output(new DiscardingOutputFormat, Tuple3>>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -114,7 +115,7 @@ public void testJoinWithPojos() { input1 .join(input2, JoinHint.REPARTITION_HASH_FIRST) .where("b").equalTo("a").withPartitioner(partitioner) - .print(); + .output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -176,7 +177,7 @@ public void testJoinWithKeySelectors() { .where(new Pojo2KeySelector()) .equalTo(new Pojo3KeySelector()) .withPartitioner(partitioner) - .print(); + .output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -248,7 +249,7 @@ public void testIncompatibleHashAndCustomPartitioning() { grouped .join(partitioned, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(0) .with(new DummyFlatJoinFunction>()) - .print(); + .output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java index ab83dbab5ebf2..25b17f87814be 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.fail; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.optimizer.util.CompilerTestBase; import org.junit.Test; @@ -54,7 +55,7 @@ public void testExceptionWhenNewWorksetNotDependentOnWorkset() { DataSet> result = deltaIteration.closeWith(delta, nextWorkset); - result.print(); + result.output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); try { diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java index 96758b1b31927..e5b6ad52e0376 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.*; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.optimizer.util.CompilerTestBase; import org.junit.Test; import org.apache.flink.api.common.Plan; @@ -48,7 +49,7 @@ public void testDistinctPreservesPartitioningOfDistinctFields() { data.distinct(0) .groupBy(0) 
.sum(1) - .print(); + .output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -84,7 +85,7 @@ public void testDistinctDestroysPartitioningOfNonDistinctFields() { data.distinct(1) .groupBy(0) .sum(1) - .print(); + .output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java index 8fb4ef04aa5c5..6b49dd4033e0c 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.operators.GroupReduceOperator; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.util.CompilerTestBase; @@ -51,7 +52,7 @@ public void testAllGroupReduceNoCombiner() { data.reduceGroup(new RichGroupReduceFunction() { public void reduce(Iterable values, Collector out) {} }).name("reducer") - .print(); + .output(new DiscardingOutputFormat()).name("sink"); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -97,7 +98,7 @@ public void reduce(Iterable values, Collector out) {} }).name("reducer"); reduced.setCombinable(true); - reduced.print(); + reduced.output(new DiscardingOutputFormat()).name("sink"); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -148,7 +149,7 @@ public void testGroupedReduceWithFieldPositionKeyNonCombinable() { .reduceGroup(new RichGroupReduceFunction, Tuple2>() { public void reduce(Iterable> values, Collector> out) {} }).name("reducer") - .print(); + .output(new DiscardingOutputFormat>()).name("sink"); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -199,7 +200,7 @@ public void reduce(Iterable> values, Collector>()).name("sink"); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -257,7 +258,7 @@ public void testGroupedReduceWithSelectorFunctionKeyNoncombinable() { .reduceGroup(new RichGroupReduceFunction, Tuple2>() { public void reduce(Iterable> values, Collector> out) {} }).name("reducer") - .print(); + .output(new DiscardingOutputFormat>()).name("sink"); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -317,7 +318,7 @@ public void reduce(Iterable> values, Collector>()).name("sink"); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java index 796d4ab5355d7..bcfb2ef134cf0 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import 
org.apache.flink.api.java.operators.DeltaIteration; import org.apache.flink.api.java.operators.IterativeDataSet; import org.apache.flink.api.java.tuple.Tuple2; @@ -48,7 +49,7 @@ public void testIdentityIteration() { env.setParallelism(43); IterativeDataSet iteration = env.generateSequence(-4, 1000).iterate(100); - iteration.closeWith(iteration).print(); + iteration.closeWith(iteration).output(new DiscardingOutputFormat()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -76,7 +77,7 @@ public void testEmptyWorksetIteration() { DeltaIteration, Tuple2> iter = input.iterateDelta(input, 100, 0); iter.closeWith(iter.getWorkset(), iter.getWorkset()) - .print(); + .output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -99,7 +100,7 @@ public void testIterationWithUnionRoot() { iteration.closeWith( iteration.map(new IdentityMapper()).union(iteration.map(new IdentityMapper()))) - .print(); + .output(new DiscardingOutputFormat()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -150,7 +151,7 @@ public void testWorksetIterationWithUnionRoot() { .union( iter.getWorkset().map(new IdentityMapper>())) ) - .print(); + .output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java index 14d863d77707f..b3718b01ca725 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java @@ -27,6 +27,8 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.plan.DualInputPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SinkPlanNode; @@ -131,7 +133,7 @@ private DualInputPlanNode createPlanAndGetJoinNode(JoinHint hint) { DataSet i1 = env.generateSequence(1, 1000); DataSet i2 = env.generateSequence(1, 1000); - i1.join(i2, hint).where(new IdentityKeySelector()).equalTo(new IdentityKeySelector()).print(); + i1.join(i2, hint).where(new IdentityKeySelector()).equalTo(new IdentityKeySelector()).output(new DiscardingOutputFormat>()); Plan plan = env.createProgramPlan(); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java index 3f18e627f97ec..e1e6b5f07786e 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.Plan; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.operators.DeltaIteration; import org.apache.flink.api.java.operators.IterativeDataSet; import org.apache.flink.api.java.tuple.Tuple2; @@ -45,7 +46,7 @@ public void testSinkInOpenBulkIteration() { DataSet mapped = iteration.map(new IdentityMapper()); - 
mapped.print(); + mapped.output(new DiscardingOutputFormat()); try { env.createProgramPlan(); @@ -72,9 +73,9 @@ public void testSinkInClosedBulkIteration() { DataSet mapped = iteration.map(new IdentityMapper()); - iteration.closeWith(mapped).print(); + iteration.closeWith(mapped).output(new DiscardingOutputFormat()); - mapped.print(); + mapped.output(new DiscardingOutputFormat()); Plan p = env.createProgramPlan(); @@ -104,7 +105,7 @@ public void testSinkOnSolutionSetDeltaIteration() { DataSet> mapped = iteration.getSolutionSet().map(new IdentityMapper>()); - mapped.print(); + mapped.output(new DiscardingOutputFormat>()); try { env.createProgramPlan(); @@ -132,7 +133,7 @@ public void testSinkOnWorksetDeltaIteration() { DataSet> mapped = iteration.getWorkset().map(new IdentityMapper>()); - mapped.print(); + mapped.output(new DiscardingOutputFormat>()); try { env.createProgramPlan(); @@ -164,7 +165,7 @@ public void testOperationOnSolutionSet() { .where(0).equalTo(0).projectFirst(1).projectSecond(0); iteration.closeWith(joined, joined) - .print(); + .output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); try { diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java index 9c2d0d2933f11..7f5c2090c27e0 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; @@ -49,8 +50,8 @@ public void testPartitionOperatorPreservesFields() { public int partition(Long key, int numPartitions) { return key.intValue(); } }, 1) .groupBy(1) - .reduceGroup(new IdentityGroupReducerCombinable>()) - .print(); + .reduceGroup(new IdentityGroupReducerCombinable>()) + .output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java index 2958f1a268d58..942aa472f8a00 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.util.CompilerTestBase; import org.junit.Test; @@ -53,7 +54,7 @@ public Double reduce(Double value1, Double value2){ return value1 + value2; } }).name("reducer") - .print(); + .output(new DiscardingOutputFormat()).name("sink"); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -98,7 +99,7 @@ public Long reduce(Long value1, Long value2){ return value1 + value2; } }).name("reducer") - .print(); + 
.output(new DiscardingOutputFormat()).name("sink"); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -151,7 +152,7 @@ public Tuple2 reduce(Tuple2 value1, Tuple2>()).name("sink"); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); @@ -211,7 +212,7 @@ public Tuple2 reduce(Tuple2 value1, Tuple2>()).name("sink"); Plan p = env.createProgramPlan(); OptimizedPlan op = compileNoStats(p); diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java index 46eb48a0c4b98..3d6d90bfc4106 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java @@ -27,6 +27,7 @@ import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.operators.DeltaIteration; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.common.functions.RichGroupReduceFunction; @@ -294,7 +295,7 @@ public void reduce(Iterable> values, Collector>()); return env.createProgramPlan(); } diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java index fb7a80fce5cd4..b23bf359b11ee 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.Plan; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.CompilerException; import org.apache.flink.optimizer.Optimizer; @@ -48,7 +49,7 @@ public void testRejectCoGroupOnHashAndRangePartitioning() { input.coGroup(input).where(0).equalTo(0) .with(new DummyCoGroupFunction, Tuple2>()) .withParameters(cfg) - .print(); + .output(new DiscardingOutputFormat, Tuple2>>()); Plan p = env.createProgramPlan(); try { diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java index 8a4786f345904..a4e520b652145 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.Plan; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.CompilerException; import org.apache.flink.optimizer.Optimizer; @@ -46,7 +47,7 @@ public void testRejectJoinOnHashAndRangePartitioning() { 
input.join(input).where(0).equalTo(0) .withParameters(cfg) - .print(); + .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>()); Plan p = env.createProgramPlan(); try { diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala index efa1e8840b368..6cc327a91ba50 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala @@ -131,6 +131,13 @@ class ExecutionEnvironment(javaEnv: JavaEnv) { javaEnv.getId } + + /** + * Retrieves the JobExecutionResult of the last executed job (used by the "eager" print) + * @return The JobExecutionResult from the last job execution + */ + def getLastJobExecutionResult = javaEnv.getLastJobExecutionResult + /** * Gets the UUID by which this environment is identified, as a string. */ diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java index a1bd2e0fc6d57..b4a27b66c1c3e 100644 --- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java +++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java @@ -19,6 +19,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.aggregation.Aggregations; import org.apache.flink.api.java.io.CsvInputFormat; +import org.apache.flink.api.java.io.PrintingOutputFormat; import org.apache.flink.api.java.operators.AggregateOperator; import org.apache.flink.api.java.operators.CrossOperator.DefaultCross; import org.apache.flink.api.java.operators.CrossOperator.ProjectCross; @@ -296,7 +297,7 @@ private void createPrintSink() throws IOException { int parentID = (Integer) receiver.getRecord(true); DataSet parent = (DataSet) sets.get(parentID); boolean toError = (Boolean) receiver.getRecord(); - (toError ? 
parent.printToErr() : parent.print()).name("PrintSink"); + parent.output(new PrintingOutputFormat(toError)); } private void createBroadcastVariable() throws IOException { diff --git a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java index d0b01643da3cd..018daf8a961c4 100644 --- a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java +++ b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.util.CompilerTestBase; import org.junit.Test; @@ -59,7 +60,7 @@ public void testSpargelCompiler() { DataSet> initialVertices = vertexIds.map(new IdAssigner()); DataSet> result = initialVertices.runOperation(VertexCentricIteration.withPlainEdges(edges, new CCUpdater(), new CCMessager(), 100)); - result.print(); + result.output(new DiscardingOutputFormat>()); } Plan p = env.createProgramPlan("Spargel Connected Components"); @@ -134,7 +135,7 @@ public void testSpargelCompilerWithBroadcastVariable() { DataSet> result = initialVertices.runOperation(vcIter); - result.print(); + result.output(new DiscardingOutputFormat>()); } Plan p = env.createProgramPlan("Spargel Connected Components"); diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/CollectionTestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/CollectionTestEnvironment.java index 28409142284dc..7189bbebde9bf 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/CollectionTestEnvironment.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/CollectionTestEnvironment.java @@ -25,8 +25,6 @@ public class CollectionTestEnvironment extends CollectionEnvironment { - protected JobExecutionResult latestResult; - @Override public JobExecutionResult execute() throws Exception { return execute("test job"); @@ -35,7 +33,7 @@ public JobExecutionResult execute() throws Exception { @Override public JobExecutionResult execute(String jobName) throws Exception { JobExecutionResult result = super.execute(jobName); - this.latestResult = result; + this.lastJobExecutionResult = result; return result; } diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java index 22140000f0e65..e639c80805da6 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java @@ -119,7 +119,7 @@ public void testJobWithObjectReuse() throws Exception { // call the test program try { testProgram(); - this.latestExecutionResult = env.latestResult; + this.latestExecutionResult = env.getLastJobExecutionResult(); } catch (Exception e) { System.err.println(e.getMessage()); @@ -171,7 +171,7 @@ public void testJobWithoutObjectReuse() throws Exception { // call the test program try { testProgram(); - this.latestExecutionResult = env.latestResult; + this.latestExecutionResult = env.getLastJobExecutionResult(); } catch (Exception e) { System.err.println(e.getMessage()); @@ -224,7 +224,7 @@ public void 
testJobCollectionExecution() throws Exception { // call the test program try { testProgram(); - this.latestExecutionResult = env.latestResult; + this.latestExecutionResult = env.getLastJobExecutionResult(); } catch (Exception e) { System.err.println(e.getMessage()); diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java index cf1caeb1db9d8..25f2c83af1173 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java @@ -36,9 +36,6 @@ public class TestEnvironment extends ExecutionEnvironment { private final ForkableFlinkMiniCluster executor; - protected JobExecutionResult latestResult; - - public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism) { this.executor = executor; setParallelism(parallelism); @@ -54,8 +51,8 @@ public JobExecutionResult execute(String jobName) throws Exception { SerializedJobExecutionResult result = executor.submitJobAndWait(jobGraph, false); - this.latestResult = result.toJobExecutionResult(getClass().getClassLoader()); - return this.latestResult; + this.lastJobExecutionResult = result.toJobExecutionResult(getClass().getClassLoader()); + return this.lastJobExecutionResult; } catch (Exception e) { System.err.println(e.getMessage()); diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java index 27c164405896b..aea448fd614e0 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichJoinFunction; import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.runtime.operators.DriverStrategy; @@ -58,8 +59,8 @@ public void testMultiSolutionSetJoinPlan() { DataSet> result = constructPlan(inputData, 10); // add two sinks, to test the case of branching after an iteration - result.print(); - result.print(); + result.output(new DiscardingOutputFormat>()); + result.output(new DiscardingOutputFormat>()); Plan p = env.createProgramPlan(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java index 4edd68ec0fab6..a3b757251e1fa 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.fail; import org.apache.flink.api.common.Plan; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.plan.BulkIterationPlanNode; @@ -85,7 +86,7 @@ public void testPageRank() { // termination condition 
.filter(new EpsilonFilter())); - finalPageRanks.print(); + finalPageRanks.output(new DiscardingOutputFormat>()); // get the plan and compile it Plan p = env.createProgramPlan(); diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala index 25cc089ba72ad..2775d093b40c2 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala @@ -18,6 +18,7 @@ package org.apache.flink.api.scala +import org.apache.flink.api.java.io.DiscardingOutputFormat import org.junit.Test import org.apache.flink.api.common.InvalidProgramException @@ -37,7 +38,7 @@ class DeltaIterationSanityCheckTest extends Serializable { (result, ws) } - iteration.print() + iteration.output(new DiscardingOutputFormat[(Int, String)]) } @Test @@ -51,7 +52,7 @@ class DeltaIterationSanityCheckTest extends Serializable { (result, ws) } - iteration.print() + iteration.output(new DiscardingOutputFormat[(Int,String)]) } @Test(expected = classOf[InvalidProgramException]) @@ -65,7 +66,7 @@ class DeltaIterationSanityCheckTest extends Serializable { (result, ws) } - iteration.print() + iteration.output(new DiscardingOutputFormat[(Int,String)]) } @Test(expected = classOf[InvalidProgramException]) @@ -79,7 +80,8 @@ class DeltaIterationSanityCheckTest extends Serializable { (result, ws) } - iteration.print() } + iteration.output(new DiscardingOutputFormat[(Int,String)]) + } @Test(expected = classOf[InvalidProgramException]) def testIncorrectJoinWithSolution3(): Unit = { @@ -92,7 +94,7 @@ class DeltaIterationSanityCheckTest extends Serializable { (result, ws) } - iteration.print() + iteration.output(new DiscardingOutputFormat[(Int,String)]) } @Test @@ -106,7 +108,7 @@ class DeltaIterationSanityCheckTest extends Serializable { (result, ws) } - iteration.print() + iteration.output(new DiscardingOutputFormat[(Int,String)]) } @Test @@ -120,7 +122,7 @@ class DeltaIterationSanityCheckTest extends Serializable { (result, ws) } - iteration.print() + iteration.output(new DiscardingOutputFormat[(Int,String)]) } @Test(expected = classOf[InvalidProgramException]) @@ -134,7 +136,7 @@ class DeltaIterationSanityCheckTest extends Serializable { (result, ws) } - iteration.print() + iteration.output(new DiscardingOutputFormat[(Int,String)]) } @Test(expected = classOf[InvalidProgramException]) @@ -148,7 +150,8 @@ class DeltaIterationSanityCheckTest extends Serializable { (result, ws) } - iteration.print() } + iteration.output(new DiscardingOutputFormat[(Int,String)]) + } @Test(expected = classOf[InvalidProgramException]) def testIncorrectCoGroupWithSolution3(): Unit = { @@ -161,6 +164,6 @@ class DeltaIterationSanityCheckTest extends Serializable { (result, ws) } - iteration.print() + iteration.output(new DiscardingOutputFormat[(Int,String)]) } } diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala index 3fefa01a829c3..97a0f8726a228 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala @@ -18,6 +18,7 @@ package org.apache.flink.api.scala.compiler +import 
org.apache.flink.api.java.io.DiscardingOutputFormat import org.apache.flink.optimizer.util.CompilerTestBase import org.junit.Test import org.junit.Assert._ @@ -38,8 +39,8 @@ class PartitionOperatorTranslationTest extends CompilerTestBase { def partition(key: Long, numPartitions: Int): Int = key.intValue() }, 1) .groupBy(1).reduceGroup( x => x) - .print() - + .output(new DiscardingOutputFormat[Iterator[(Long, Long)]]) + val p = env.createProgramPlan() val op = compileNoStats(p) diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala index eecc347e739e8..cc2c81e7c158a 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.api.scala.functions +import org.apache.flink.api.java.io.DiscardingOutputFormat import org.junit.Assert._ import org.apache.flink.api.common.functions.RichJoinFunction import org.apache.flink.api.common.functions.RichMapFunction @@ -46,7 +47,8 @@ class SemanticPropertiesTranslationTest { val env = ExecutionEnvironment.getExecutionEnvironment val input = env.fromElements((3L, "test", 42)) - input.map(new WildcardForwardMapper[(Long, String, Int)]).print() + input.map(new WildcardForwardMapper[(Long, String, Int)]) + .output(new DiscardingOutputFormat[(Long, String, Int)]) val plan = env.createProgramPlan() @@ -83,7 +85,8 @@ class SemanticPropertiesTranslationTest { val env = ExecutionEnvironment.getExecutionEnvironment val input = env.fromElements((3L, "test", 42)) - input.map(new IndividualForwardMapper[Long, String, Int]).print() + input.map(new IndividualForwardMapper[Long, String, Int]) + .output(new DiscardingOutputFormat[(Long, String, Int)]) val plan = env.createProgramPlan() @@ -120,7 +123,8 @@ class SemanticPropertiesTranslationTest { val env = ExecutionEnvironment.getExecutionEnvironment val input = env.fromElements((3L, "test", 42)) - input.map(new FieldTwoForwardMapper[Long, String, Int]).print() + input.map(new FieldTwoForwardMapper[Long, String, Int]) + .output(new DiscardingOutputFormat[(Long, String, Int)]) val plan = env.createProgramPlan() @@ -160,7 +164,8 @@ class SemanticPropertiesTranslationTest { val input2 = env.fromElements((3L, 3.1415)) input1.join(input2).where(0).equalTo(0)( - new ForwardingTupleJoin[Long, String, Long, Double]).print() + new ForwardingTupleJoin[Long, String, Long, Double]) + .output(new DiscardingOutputFormat[(String, Long)]) val plan = env.createProgramPlan() val sink: GenericDataSinkBase[_] = plan.getDataSinks.iterator.next @@ -204,7 +209,8 @@ class SemanticPropertiesTranslationTest { val input2 = env.fromElements((3L, 42)) input1.join(input2).where(0).equalTo(0)( - new ForwardingBasicJoin[(Long, String), (Long, Int)]).print() + new ForwardingBasicJoin[(Long, String), (Long, Int)]) + .output(new DiscardingOutputFormat[((Long, String), (Long, Int))]) val plan = env.createProgramPlan() val sink: GenericDataSinkBase[_] = plan.getDataSinks.iterator.next diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala index 425cff657e939..6babbe7992b26 100644 --- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala @@ -23,6 +23,7 @@ import org.apache.flink.api.common.operators.GenericDataSourceBase import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase import org.apache.flink.api.java.aggregation.Aggregations +import org.apache.flink.api.java.io.DiscardingOutputFormat import org.apache.flink.api.scala._ import org.junit.Assert.{assertEquals, assertTrue, fail} import org.junit.Test @@ -37,7 +38,8 @@ class AggregateTranslationTest { val initialData = env.fromElements((3.141592, "foobar", 77L)) - initialData.groupBy(0).aggregate(Aggregations.MIN, 1).and(Aggregations.SUM, 2).print() + initialData.groupBy(0).aggregate(Aggregations.MIN, 1).and(Aggregations.SUM, 2) + .output(new DiscardingOutputFormat[(Double, String, Long)]) val p: Plan = env.createProgramPlan() val sink = p.getDataSinks.iterator.next @@ -55,6 +57,7 @@ class AggregateTranslationTest { System.err.println(e.getMessage) e.printStackTrace() fail("Test caused an error: " + e.getMessage) + } } } diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala index 8d75f2e525a9d..4d85c5863f592 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala @@ -18,6 +18,7 @@ package org.apache.flink.api.scala.operators.translation +import org.apache.flink.api.java.io.DiscardingOutputFormat import org.apache.flink.optimizer.util.CompilerTestBase import org.junit.Assert._ import org.junit.Test @@ -46,7 +47,7 @@ class CoGroupCustomPartitioningTest extends CompilerTestBase { .coGroup(input2) .where(1).equalTo(0) .withPartitioner(partitioner) - .print() + .output(new DiscardingOutputFormat[(Array[(Long, Long)], Array[(Long, Long, Long)])]) val p = env.createProgramPlan() val op = compileNoStats(p) @@ -110,7 +111,7 @@ class CoGroupCustomPartitioningTest extends CompilerTestBase { .coGroup(input2) .where("b").equalTo("a") .withPartitioner(partitioner) - .print() + .output(new DiscardingOutputFormat[(Array[Pojo2], Array[Pojo3])]) val p = env.createProgramPlan() val op = compileNoStats(p) @@ -174,7 +175,7 @@ class CoGroupCustomPartitioningTest extends CompilerTestBase { .coGroup(input2) .where( _.a ).equalTo( _.b ) .withPartitioner(partitioner) - .print() + .output(new DiscardingOutputFormat[(Array[Pojo2], Array[Pojo3])]) val p = env.createProgramPlan() val op = compileNoStats(p) diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala index 8d816ee01a206..a0f93dd5dcb35 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala @@ -18,6 +18,7 @@ package org.apache.flink.api.scala.operators.translation +import org.apache.flink.api.java.io.DiscardingOutputFormat import 
org.apache.flink.optimizer.util.CompilerTestBase import org.junit.Assert._ import org.junit.Test @@ -49,7 +50,7 @@ class CoGroupGroupSortTranslationTest { .sortSecondGroup(1, Order.ASCENDING).sortSecondGroup(0, Order.DESCENDING) { (first, second) => first.buffered.head } - .print() + .output(new DiscardingOutputFormat[(Long, Long)]) val p = env.createProgramPlan() @@ -92,7 +93,7 @@ class CoGroupGroupSortTranslationTest { .sortSecondGroup("c", Order.ASCENDING).sortSecondGroup("a", Order.DESCENDING) { (first, second) => first.buffered.head } - .print() + .output(new DiscardingOutputFormat[(Long, Long)]) val p = env.createProgramPlan() diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala index 395f36a8cbcca..f81cb84985ed7 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala @@ -18,6 +18,7 @@ package org.apache.flink.api.scala.operators.translation +import org.apache.flink.api.java.io.DiscardingOutputFormat import org.apache.flink.optimizer.util.CompilerTestBase import org.junit.Assert._ import org.junit.Test @@ -40,7 +41,7 @@ class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase { data .groupBy( _._1 ).withPartitioner(new TestPartitionerInt()) .reduce( (a,b) => a ) - .print() + .output(new DiscardingOutputFormat[(Int, Int)]) val p = env.createProgramPlan() val op = compileNoStats(p) @@ -73,7 +74,7 @@ class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase { data .groupBy( _._1 ).withPartitioner(new TestPartitionerInt()) .reduce( (a, b) => a) - .print() + .output(new DiscardingOutputFormat[(Int, Int)]) val p = env.createProgramPlan() val op = compileNoStats(p) @@ -107,7 +108,7 @@ class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase { .withPartitioner(new TestPartitionerInt()) .sortGroup(1, Order.ASCENDING) .reduce( (a,b) => a) - .print() + .output(new DiscardingOutputFormat[(Int, Int, Int)]) val p = env.createProgramPlan() val op = compileNoStats(p) @@ -141,7 +142,7 @@ class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase { .withPartitioner(new TestPartitionerInt()) .sortGroup(_._2, Order.ASCENDING) .reduce( (a,b) => a) - .print() + .output(new DiscardingOutputFormat[(Int, Int, Int)]) val p = env.createProgramPlan() val op = compileNoStats(p) @@ -175,7 +176,7 @@ class CustomPartitioningGroupingKeySelectorTest extends CompilerTestBase { .sortGroup(1, Order.ASCENDING) .sortGroup(2, Order.DESCENDING) .reduce( (a,b) => a) - .print() + .output(new DiscardingOutputFormat[(Int, Int, Int, Int)]) val p = env.createProgramPlan() val op = compileNoStats(p) diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala index a02d2af617b07..6e40ea5a2643c 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala @@ -18,6 +18,7 @@ package 
org.apache.flink.api.scala.operators.translation +import org.apache.flink.api.java.io.DiscardingOutputFormat import org.apache.flink.optimizer.util.CompilerTestBase import org.junit.Assert._ import org.junit.Test @@ -41,7 +42,7 @@ class CustomPartitioningGroupingPojoTest extends CompilerTestBase { data .groupBy("a").withPartitioner(new TestPartitionerInt()) .reduce( (a,b) => a ) - .print() + .output(new DiscardingOutputFormat[Pojo2]) val p = env.createProgramPlan() val op = compileNoStats(p) @@ -72,7 +73,7 @@ class CustomPartitioningGroupingPojoTest extends CompilerTestBase { data .groupBy("a").withPartitioner(new TestPartitionerInt()) .reduceGroup( iter => Seq(iter.next) ) - .print() + .output(new DiscardingOutputFormat[Seq[Pojo2]]) val p = env.createProgramPlan() val op = compileNoStats(p) @@ -102,7 +103,7 @@ class CustomPartitioningGroupingPojoTest extends CompilerTestBase { .groupBy("a").withPartitioner(new TestPartitionerInt()) .sortGroup("b", Order.ASCENDING) .reduceGroup( iter => Seq(iter.next) ) - .print() + .output(new DiscardingOutputFormat[Seq[Pojo3]]) val p = env.createProgramPlan() val op = compileNoStats(p) @@ -133,7 +134,7 @@ class CustomPartitioningGroupingPojoTest extends CompilerTestBase { .sortGroup("b", Order.ASCENDING) .sortGroup("c", Order.DESCENDING) .reduceGroup( iter => Seq(iter.next) ) - .print() + .output(new DiscardingOutputFormat[Seq[Pojo4]]) val p = env.createProgramPlan() val op = compileNoStats(p) diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala index 25efe486244fc..b103e9cff2c01 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala @@ -18,6 +18,7 @@ package org.apache.flink.api.scala.operators.translation +import org.apache.flink.api.java.io.DiscardingOutputFormat import org.junit.Assert._ import org.junit.Test @@ -42,7 +43,7 @@ class CustomPartitioningGroupingTupleTest extends CompilerTestBase { data.groupBy(0) .withPartitioner(new TestPartitionerInt()) .sum(1) - .print() + .output(new DiscardingOutputFormat[(Int, Int)]) val p = env.createProgramPlan() val op = compileNoStats(p) @@ -73,7 +74,7 @@ class CustomPartitioningGroupingTupleTest extends CompilerTestBase { data .groupBy(0).withPartitioner(new TestPartitionerInt()) .reduce( (a,b) => a ) - .print() + .output(new DiscardingOutputFormat[(Int, Int)]) val p = env.createProgramPlan() val op = compileNoStats(p) @@ -104,7 +105,7 @@ class CustomPartitioningGroupingTupleTest extends CompilerTestBase { data .groupBy(0).withPartitioner(new TestPartitionerInt()) .reduceGroup( iter => Seq(iter.next) ) - .print() + .output(new DiscardingOutputFormat[Seq[(Int, Int)]]) val p = env.createProgramPlan() val op = compileNoStats(p) @@ -134,7 +135,7 @@ class CustomPartitioningGroupingTupleTest extends CompilerTestBase { .groupBy(0).withPartitioner(new TestPartitionerInt()) .sortGroup(1, Order.ASCENDING) .reduceGroup( iter => Seq(iter.next) ) - .print() + .output(new DiscardingOutputFormat[Seq[(Int, Int, Int)]]) val p = env.createProgramPlan() val op = compileNoStats(p) @@ -165,7 +166,7 @@ class CustomPartitioningGroupingTupleTest extends CompilerTestBase { .sortGroup(1, Order.ASCENDING) .sortGroup(2, Order.DESCENDING) 
.reduceGroup( iter => Seq(iter.next) ) - .print() + .output(new DiscardingOutputFormat[Seq[(Int, Int, Int, Int)]]) val p = env.createProgramPlan() val op = compileNoStats(p) diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala index fe3037624bcb8..7ebf378c96471 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala @@ -18,6 +18,7 @@ package org.apache.flink.api.scala.operators.translation +import org.apache.flink.api.java.io.DiscardingOutputFormat import org.apache.flink.api.scala._ import org.apache.flink.optimizer.util.CompilerTestBase import org.junit.Test @@ -43,7 +44,7 @@ class CustomPartitioningTest extends CompilerTestBase { data.partitionCustom(part, 0) .mapPartition( x => x ) - .print() + .output(new DiscardingOutputFormat[(Int, Int)]) val p = env.createProgramPlan() val op = compileNoStats(p) @@ -113,7 +114,7 @@ class CustomPartitioningTest extends CompilerTestBase { data .partitionCustom(part, "a") .mapPartition( x => x) - .print() + .output(new DiscardingOutputFormat[Pojo]) val p = env.createProgramPlan() val op = compileNoStats(p) @@ -184,7 +185,7 @@ class CustomPartitioningTest extends CompilerTestBase { data .partitionCustom(part, pojo => pojo.a) .mapPartition( x => x) - .print() + .output(new DiscardingOutputFormat[Pojo]) val p = env.createProgramPlan() val op = compileNoStats(p) diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala index 6aa4d75859b43..9a400c5357a2c 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala @@ -20,6 +20,7 @@ package org.apache.flink.api.scala.operators.translation import org.apache.flink.api.common.functions.{RichCoGroupFunction, RichMapFunction, RichJoinFunction} import org.apache.flink.api.common.operators.GenericDataSinkBase +import org.apache.flink.api.java.io.DiscardingOutputFormat import org.apache.flink.api.java.operators.translation.WrappingFunction import org.junit.Assert.assertArrayEquals import org.junit.Assert.assertEquals @@ -67,7 +68,7 @@ class DeltaIterationTranslationTest { .setParallelism(ITERATION_PARALLELISM) .registerAggregator(AGGREGATOR_NAME, new LongSumAggregator) - result.print() + result.output(new DiscardingOutputFormat[(Double, Long, String)]) result.writeAsText("/dev/null") val p: Plan = env.createProgramPlan(JOB_NAME) diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala index 783640072c2b9..c540f6182ae1d 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala @@ -18,6 +18,7 @@ package org.apache.flink.api.scala.operators.translation import 
org.apache.flink.api.common.operators.base.GroupReduceOperatorBase +import org.apache.flink.api.java.io.DiscardingOutputFormat import org.junit.Assert import org.junit.Test @@ -31,7 +32,7 @@ class DistinctTranslationTest { val input = env.fromElements("1", "2", "1", "3") val op = input.distinct { x => x} - op.print() + op.output(new DiscardingOutputFormat[String]) val p = env.createProgramPlan() diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala index 2467596b6ea0c..eae3db161777a 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala @@ -18,6 +18,7 @@ package org.apache.flink.api.scala.operators.translation +import org.apache.flink.api.java.io.DiscardingOutputFormat import org.apache.flink.optimizer.util.CompilerTestBase import org.junit.Assert._ import org.junit.Test @@ -46,7 +47,7 @@ class JoinCustomPartitioningTest extends CompilerTestBase { .join(input2, JoinHint.REPARTITION_HASH_FIRST) .where(1).equalTo(0) .withPartitioner(partitioner) - .print() + .output(new DiscardingOutputFormat[((Long, Long), (Long, Long, Long))]) val p = env.createProgramPlan() val op = compileNoStats(p) @@ -110,7 +111,7 @@ class JoinCustomPartitioningTest extends CompilerTestBase { .join(input2, JoinHint.REPARTITION_HASH_FIRST) .where("b").equalTo("a") .withPartitioner(partitioner) - .print() + .output(new DiscardingOutputFormat[(Pojo2, Pojo3)]) val p = env.createProgramPlan() val op = compileNoStats(p) @@ -174,7 +175,7 @@ class JoinCustomPartitioningTest extends CompilerTestBase { .join(input2, JoinHint.REPARTITION_HASH_FIRST) .where( _.a ).equalTo( _.b ) .withPartitioner(partitioner) - .print() + .output(new DiscardingOutputFormat[(Pojo2, Pojo3)]) val p = env.createProgramPlan() val op = compileNoStats(p) diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala index e97fc218153d1..5d3878c28e084 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala @@ -18,6 +18,7 @@ package org.apache.flink.api.scala.operators.translation import org.apache.flink.api.common.operators.{GenericDataSourceBase, GenericDataSinkBase} +import org.apache.flink.api.java.io.DiscardingOutputFormat import org.apache.flink.api.java.operators.translation.{KeyExtractingMapper, PlanUnwrappingReduceOperator} import org.apache.flink.api.common.typeinfo.BasicTypeInfo @@ -39,7 +40,8 @@ class ReduceTranslationTest { val initialData = env.fromElements((3.141592, "foobar", 77L)).setParallelism(1) - initialData reduce { (v1, v2) => v1 } print() + initialData reduce { (v1, v2) => v1 } output( + new DiscardingOutputFormat[(Double, String, Long)]) val p = env.createProgramPlan( @@ -70,7 +72,8 @@ class ReduceTranslationTest { val initialData = env.fromElements((3.141592, "foobar", 77L)).setParallelism(1) - initialData.groupBy(2) reduce { (v1, v2) => v1 } print() + initialData.groupBy(2) reduce { (v1, v2) => v1 } output( + new DiscardingOutputFormat[(Double, String, 
Long)]) val p = env.createProgramPlan() val sink: GenericDataSinkBase[_] = p.getDataSinks.iterator.next diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java index 07300dad63ece..e22e0efdc4da3 100644 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java +++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient; import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster; import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus; +import org.apache.flink.test.testdata.WordCountData; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.yarn.appMaster.YarnTaskManagerRunner; import org.apache.hadoop.fs.Path; @@ -445,13 +446,32 @@ private void testDetachedPerJobYarnClusterInternal(String job) { yc.init(yarnConfiguration); yc.start(); + // get a temporary folder for writing the output of the wordcount example + File tmpOutFolder = null; + try { + tmpOutFolder = tmp.newFolder(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + + // get a temporary file holding the input data for the wordcount example + File tmpInFile = null; + try { + tmpInFile = tmp.newFile(); + FileUtils.writeStringToFile(tmpInFile, WordCountData.TEXT); + } + catch (IOException e) { + throw new RuntimeException(e); + } + Runner runner = startWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(), "-yn", "1", "-yjm", "512", "-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly "-ytm", "1024", "-ys", "2", // test requesting slots from YARN. - "--yarndetached", job}, + "--yarndetached", job, tmpInFile.getAbsoluteFile().toString(), tmpOutFolder.getAbsoluteFile().toString()}, "The Job has been submitted with JobID", RunTypes.CLI_FRONTEND); 
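The second hunk below no longer greps a TaskManager stdout file; it concatenates all files that the detached WordCount job wrote into its own output folder. A small sketch of that folder-reading pattern (the helper is illustrative, not part of the patch):

    import java.io.File;
    import java.io.IOException;
    import org.apache.commons.io.FileUtils;

    public class OutputFolderSketch {
        // Concatenate every regular file of a result folder into one string,
        // mirroring how the test assembles the WordCount output for its asserts.
        static String readAllFiles(File folder) throws IOException {
            StringBuilder content = new StringBuilder();
            File[] files = folder.listFiles();
            if (files != null) {
                for (File f : files) {
                    if (f.isFile()) {
                        content.append(FileUtils.readFileToString(f)).append("\n");
                    }
                }
            }
            return content.toString();
        }
    }
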
@@ -490,19 +510,25 @@ public int compare(ApplicationReport o1, ApplicationReport o2) { final ApplicationId id = tmpAppId; // now it has finished. - // check the output. - File taskmanagerOut = YarnTestBase.findFile("..", new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return name.contains("taskmanager") && name.contains("stdout") && dir.getAbsolutePath().contains(id.toString()); + // check the output files. + File[] listOfOutputFiles = tmpOutFolder.listFiles(); + + + Assert.assertNotNull("Taskmanager output not found", listOfOutputFiles); + LOG.info("The job has finished. TaskManager output files found in {}", tmpOutFolder); + + // read all output files in the output folder into one string + String content = ""; + for (File f : listOfOutputFiles) + { + if (f.isFile()) + { + content += FileUtils.readFileToString(f) + "\n"; } - }); - Assert.assertNotNull("Taskmanager output not found", taskmanagerOut); - LOG.info("The job has finished. TaskManager output file found {}", taskmanagerOut.getAbsolutePath()); - String content = FileUtils.readFileToString(taskmanagerOut); + } // check for some of the wordcount outputs. - Assert.assertTrue("Expected string '(all,2)' not found in string '"+content+"'", content.contains("(all,2)")); - Assert.assertTrue("Expected string '(mind,1)' not found in string'"+content+"'", content.contains("(mind,1)")); + Assert.assertTrue("Expected string 'da 5', '(da,5)' or '(all,2)' not found in string '"+content+"'", content.contains("da 5") || content.contains("(da,5)") || content.contains("(all,2)")); + Assert.assertTrue("Expected string 'der 29', '(der,29)' or '(mind,1)' not found in string '"+content+"'", content.contains("der 29") || content.contains("(der,29)") || content.contains("(mind,1)")); // check if the heap size for the TaskManager was set correctly File jobmanagerLog = YarnTestBase.findFile("..", new FilenameFilter() {