Skip to content

Commit

Permalink
[FLINK-1418] [apis] Fix eager print() and adjust all tests and exampl…
Browse files Browse the repository at this point in the history
…es to not fail due to "eager" print method

 - Add lastJobExecutionResult for getting the result of the last execution, when executing "eager" execution methods

This closes apache#699
  • Loading branch information
nikste authored and StephanEwen committed May 21, 2015
1 parent e9c1562 commit 78d954b
Show file tree
Hide file tree
Showing 79 changed files with 396 additions and 294 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,12 @@ public JobExecutionResult execute(String jobName) throws Exception {

JobSubmissionResult result = this.client.run(toRun, getParallelism(), wait);
if(result instanceof JobExecutionResult) {
this.lastJobExecutionResult = (JobExecutionResult) result;
return (JobExecutionResult) result;
} else {
LOG.warn("The Client didn't return a JobExecutionResult");
return new JobExecutionResult(result.getJobID(), -1, null);
this.lastJobExecutionResult = new JobExecutionResult(result.getJobID(), -1, null);
return this.lastJobExecutionResult;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,12 @@ public static void main(String... args) throws Exception {
// emit result
if(fileOutput) {
result.writeAsCsv(outputPath, "\n", " ");

// execute program
env.execute("Connected Components Example");
} else {
result.print();
}

// execute program
env.execute("Connected Components Example");
}

// *************************************************************************
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,13 @@ public static void main(String[] args) throws Exception {
// emit result
if(fileOutput) {
triangles.writeAsCsv(outputPath, "\n", ",");

// execute program
env.execute("Basic Triangle Enumeration Example");
} else {
triangles.print();
}

// execute program
env.execute("Basic Triangle Enumeration Example");

}

// *************************************************************************
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,12 @@ public static void main(String[] args) throws Exception {
// emit result
if(fileOutput) {
triangles.writeAsCsv(outputPath, "\n", ",");
// execute program
env.execute("Triangle Enumeration Example");
} else {
triangles.print();
}

// execute program
env.execute("Triangle Enumeration Example");


}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,12 @@ public static void main(String[] args) throws Exception {
// emit result
if(fileOutput) {
finalPageRanks.writeAsCsv(outputPath, "\n", " ");
// execute program
env.execute("Basic Page Rank Example");
} else {
finalPageRanks.print();
}

// execute program
env.execute("Basic Page Rank Example");

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ public Double map(Long value) {

System.out.println("We estimate Pi to be:");
pi.print();

env.execute();
}

//*************************************************************************
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,13 @@ public static void main(String[] args) throws Exception{
// emit result
if(fileOutput) {
result.writeAsText(outputPath);
// execute program
env.execute("Linear Regression example");
} else {
result.print();
}

// execute program
env.execute("Linear Regression example");


}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,17 @@ public static void main(final String[] args) throws Exception {
final DataSet<Tuple> filteredLines = file.filter(new EmptyFieldFilter());

// Here, we could do further processing with the filtered lines...

JobExecutionResult result = null;
// output the filtered lines
if (outputPath == null) {
filteredLines.print();
result = env.getLastJobExecutionResult();
} else {
filteredLines.writeAsCsv(outputPath);
// execute program
result = env.execute("Accumulator example");
}

// execute program
final JobExecutionResult result = env.execute("Accumulator example");

// get the accumulator result via its registration key
final List<Integer> emptyFields = result.getAccumulatorResult(EMPTY_FIELD_ACCUMULATOR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,11 @@ public static void main(String[] args) throws Exception {
// emit result
if(fileOutput) {
result.writeAsCsv(outputPath, "\n", "|");
// execute program
env.execute("WebLogAnalysis Example");
} else {
result.print();
}

// execute program
env.execute("WebLogAnalysis Example");

}

// *************************************************************************
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,12 @@ public Word reduce(Word value1, Word value2) throws Exception {

if(fileOutput) {
counts.writeAsText(outputPath, WriteMode.OVERWRITE);
// execute program
env.execute("WordCount-Pojo Example");
} else {
counts.print();
}

// execute program
env.execute("WordCount-Pojo Example");

}

// *************************************************************************
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,13 @@ public static void main(String[] args) throws Exception {
// emit result
if(fileOutput) {
counts.writeAsCsv(outputPath, "\n", " ");
// execute program
env.execute("WordCount Example");
} else {
counts.print();
}

// execute program
env.execute("WordCount Example");

}

// *************************************************************************
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@ object KMeans {

if (fileOutput) {
clusteredPoints.writeAsCsv(outputPath, "\n", " ")
env.execute("Scala KMeans Example")
}
else {
clusteredPoints.print()
}

env.execute("Scala KMeans Example")
}

private def parseParameters(programArguments: Array[String]): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,11 @@ object ConnectedComponents {
}
if (fileOutput) {
verticesWithComponents.writeAsCsv(outputPath, "\n", " ")
env.execute("Scala Connected Components Example")
} else {
verticesWithComponents.print()
}

env.execute("Scala Connected Components Example")
}

private def parseParameters(args: Array[String]): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,5 @@ object DeltaPageRank {

iteration.print()

env.execute("Page Rank")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,13 @@ object EnumTrianglesBasic {
// emit result
if (fileOutput) {
triangles.writeAsCsv(outputPath, "\n", ",")
// execute program
env.execute("TriangleEnumeration Example")
} else {
triangles.print()
}

// execute program
env.execute("TriangleEnumeration Example")

}

// *************************************************************************
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,12 @@ object EnumTrianglesOpt {
// emit result
if (fileOutput) {
triangles.writeAsCsv(outputPath, "\n", ",")
// execute program
env.execute("TriangleEnumeration Example")
} else {
triangles.print()
}

// execute program
env.execute("TriangleEnumeration Example")
}

// *************************************************************************
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,11 @@ object PageRankBasic {
// emit result
if (fileOutput) {
result.writeAsCsv(outputPath, "\n", " ")
// execute program
env.execute("Basic PageRank Example")
} else {
result.print()
}

// execute program
env.execute("Basic PageRank Example")
}

// *************************************************************************
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ object TransitiveClosureNaive {

if (fileOutput) {
paths.writeAsCsv(outputPath, "\n", " ")
env.execute("Scala Transitive Closure Example")
} else {
paths.print()
}

env.execute("Scala Transitive Closure Example")


}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ object PiEstimation {
println("We estimate Pi to be:")

pi.print()

env.execute("PiEstimation example")
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ object LinearRegression {

if (fileOutput) {
result.writeAsText(outputPath)
env.execute("Scala Linear Regression example")
}
else {
result.print()
}
env.execute("Scala Linear Regression example")
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,11 @@ object WebLogAnalysis {
// emit result
if (fileOutput) {
result.writeAsCsv(outputPath, "\n", "|")
env.execute("Scala WebLogAnalysis Example")
} else {
result.print()
}

env.execute("Scala WebLogAnalysis Example")
}

private var fileOutput: Boolean = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ object WordCount {

if (fileOutput) {
counts.writeAsCsv(outputPath, "\n", " ")
env.execute("Scala WordCount Example")
} else {
counts.print()
}

env.execute("Scala WordCount Example")
}

private def parseParameters(args: Array[String]): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public JobExecutionResult execute(String jobName) throws Exception {

// We need to reverse here. Object-Reuse enabled, means safe mode is disabled.
CollectionExecutor exec = new CollectionExecutor(getConfig());
return exec.execute(p);
this.lastJobExecutionResult = exec.execute(p);
return this.lastJobExecutionResult;
}

/**
Expand Down
26 changes: 9 additions & 17 deletions flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.flink.api.java.functions.SelectByMinFunction;
import org.apache.flink.api.java.io.CsvOutputFormat;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.io.PrintingOutputFormat;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.io.TextOutputFormat.TextFormatter;
import org.apache.flink.api.java.operators.AggregateOperator;
Expand Down Expand Up @@ -82,7 +83,6 @@
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.ExceptionUtils;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -1337,14 +1337,10 @@ private <X extends Tuple> DataSink<T> internalWriteAsCsv(Path filePath, String r
* For each element of the DataSet the result of {@link Object#toString()} is written.
* This triggers execute() automatically.
*/
public void print() {
try {
List<T> elements = this.collect();
for (T e: elements) {
System.out.println(e);
}
} catch (Exception e) {
System.out.println("Could not retrieve values for printing: " + ExceptionUtils.stringifyException(e));
public void print() throws Exception{
List<T> elements = this.collect();
for (T e: elements) {
System.out.println(e);
}
}

Expand All @@ -1363,14 +1359,10 @@ public DataSink<T> print(String sinkIdentifier) {
* Writes a DataSet to the standard error stream (stderr).<br/>
* For each element of the DataSet the result of {@link Object#toString()} is written.
*/
public void printToErr() {
try {
List<T> elements = this.collect();
for (T e: elements) {
System.err.println(e);
}
} catch (Exception e) {
System.err.println("Could not retrieve values for printing: " + e);
public void printToErr() throws Exception{
List<T> elements = this.collect();
for (T e: elements) {
System.err.println(e);
}
}

Expand Down
Loading

0 comments on commit 78d954b

Please sign in to comment.