Skip to content

Commit

Permalink
[FLINK-1974] Fix getNetRuntime() of JobExecutionResult and add docume…
Browse files Browse the repository at this point in the history
…ntation

- Fix JobInfo to report milliseconds
- Added documentation to indicate that the return type is in milliseconds
- Added an getNetRuntime method which accepts a desired time unit for easy conversion

This closes apache#652
  • Loading branch information
jkirsch authored and fhueske committed May 9, 2015
1 parent 170366c commit 7e5a970
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ protected int executeProgram(PackagedProgram program, Client client, int paralle
}
if (execResult instanceof JobExecutionResult) {
JobExecutionResult result = (JobExecutionResult) execResult;
System.out.println("Job Runtime: " + result.getNetRuntime());
System.out.println("Job Runtime: " + result.getNetRuntime() + " ms");
Map<String, Object> accumulatorsResult = result.getAllAccumulatorResults();
if (accumulatorsResult.size() > 0) {
System.out.println("Accumulator Results: ");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.api.common;

import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* The result of a job execution. Gives access to the execution time of the job,
Expand All @@ -34,7 +35,7 @@ public class JobExecutionResult extends JobSubmissionResult {
* Creates a new JobExecutionResult.
*
* @param jobID The job's ID.
* @param netRuntime The net runtime of the job (excluding pre-flight phase like the optimizer)
* @param netRuntime The net runtime of the job (excluding pre-flight phase like the optimizer) in milliseconds
* @param accumulators A map of all accumulators produced by the job.
*/
public JobExecutionResult(JobID jobID, long netRuntime, Map<String, Object> accumulators) {
Expand All @@ -47,12 +48,23 @@ public JobExecutionResult(JobID jobID, long netRuntime, Map<String, Object> accu
* Gets the net execution time of the job, i.e., the execution time in the parallel system,
* without the pre-flight steps like the optimizer.
*
* @return The net execution time.
* @return The net execution time in milliseconds.
*/
public long getNetRuntime() {
return this.netRuntime;
}

/**
* Gets the net execution time of the job, i.e., the execution time in the parallel system,
* without the pre-flight steps like the optimizer in a desired time unit.
*
* @param desiredUnit the unit of the <tt>NetRuntime</tt>
* @return The net execution time in the desired unit.
*/
public long getNetRuntime(TimeUnit desiredUnit) {
return desiredUnit.convert(getNetRuntime(), TimeUnit.MILLISECONDS);
}

/**
* Gets the accumulator with the given name. Returns {@code null}, if no accumulator with
* that name was produced.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* A variant of the {@link org.apache.flink.api.common.JobExecutionResult} that holds
Expand All @@ -45,7 +46,7 @@ public class SerializedJobExecutionResult implements java.io.Serializable {
* Creates a new SerializedJobExecutionResult.
*
* @param jobID The job's ID.
* @param netRuntime The net runtime of the job (excluding pre-flight phase like the optimizer)
* @param netRuntime The net runtime of the job (excluding pre-flight phase like the optimizer) in milliseconds
* @param accumulators A map of all accumulator results produced by the job, in serialized form
*/
public SerializedJobExecutionResult(JobID jobID, long netRuntime,
Expand All @@ -63,6 +64,17 @@ public long getNetRuntime() {
return netRuntime;
}

/**
* Gets the net execution time of the job, i.e., the execution time in the parallel system,
* without the pre-flight steps like the optimizer in a desired time unit.
*
* @param desiredUnit the unit of the <tt>NetRuntime</tt>
* @return The net execution time in the desired unit.
*/
public long getNetRuntime(TimeUnit desiredUnit) {
return desiredUnit.convert(getNetRuntime(), TimeUnit.MILLISECONDS);
}

public Map<String, SerializedValue<Object>> getSerializedAccumulatorResults() {
return this.accumulatorResults;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class JobInfo(val client: ActorRef, val start: Long){

def duration: Long = {
if(end != -1){
(end - start)/1000
end - start
}else{
-1
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.*;

Expand Down Expand Up @@ -53,6 +54,7 @@ public void testSerialization() {

assertEquals(origJobId, cloned.getJobId());
assertEquals(origTime, cloned.getNetRuntime());
assertEquals(origTime, cloned.getNetRuntime(TimeUnit.MILLISECONDS));
assertEquals(origMap, cloned.getSerializedAccumulatorResults());

// convert to deserialized result
Expand All @@ -62,7 +64,9 @@ public void testSerialization() {
assertEquals(origJobId, jResult.getJobID());
assertEquals(origJobId, jResultCopied.getJobID());
assertEquals(origTime, jResult.getNetRuntime());
assertEquals(origTime, jResult.getNetRuntime(TimeUnit.MILLISECONDS));
assertEquals(origTime, jResultCopied.getNetRuntime());
assertEquals(origTime, jResultCopied.getNetRuntime(TimeUnit.MILLISECONDS));

for (Map.Entry<String, SerializedValue<Object>> entry : origMap.entrySet()) {
String name = entry.getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public static void main(String[] args) throws Exception {
prepareTestDb();
JDBCExample tut = new JDBCExample();
JobExecutionResult res = LocalExecutor.execute(tut, args);
System.out.println("runtime: " + res.getNetRuntime());
System.out.println("runtime: " + res.getNetRuntime() + " ms");

System.exit(0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,13 @@

package org.apache.flink.test.recordJobs.wordcount;

import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.Program;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
import org.apache.flink.api.java.record.functions.MapFunction;
import org.apache.flink.api.java.record.functions.ReduceFunction;
import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
import org.apache.flink.api.java.record.io.CsvOutputFormat;
import org.apache.flink.api.java.record.io.TextInputFormat;
import org.apache.flink.api.java.record.operators.FileDataSink;
Expand All @@ -41,6 +38,10 @@
import org.apache.flink.types.StringValue;
import org.apache.flink.util.Collector;

import java.util.Iterator;
import java.util.StringTokenizer;
import java.util.concurrent.TimeUnit;

/**
* Implements a word count which takes the input file and counts the number of
* the occurrences of each word in the file.
Expand Down Expand Up @@ -154,6 +155,6 @@ public static void main(String[] args) throws Exception {
// This will execute the word-count embedded in a local context. replace this line by the commented
// succeeding line to send the job to a local installation or to a cluster for execution
JobExecutionResult result = LocalExecutor.execute(plan);
System.err.println("Total runtime: " + result.getNetRuntime());
System.err.println("Total runtime: " + result.getNetRuntime(TimeUnit.MILLISECONDS) + " ms");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public Long reduce(Long value1, Long value2) {

try {
JobExecutionResult res = env.execute();
String msg = res == null ? "null result" : "result in " + res.getNetRuntime();
String msg = res == null ? "null result" : "result in " + res.getNetRuntime() + " ms";
fail("The program should have failed, but returned " + msg);
}
catch (ProgramInvocationException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

@Ignore
public class NetworkStackThroughputITCase {
Expand Down Expand Up @@ -143,7 +144,7 @@ public void calculateThroughput() {
int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);

long dataVolumeMbit = dataVolumeGb * 8192;
long runtimeSecs = getJobExecutionResult().getNetRuntime() / 1000;
long runtimeSecs = getJobExecutionResult().getNetRuntime(TimeUnit.SECONDS);

int mbitPerSecond = (int) (((double) dataVolumeMbit) / runtimeSecs);

Expand Down

0 comments on commit 7e5a970

Please sign in to comment.