Skip to content

Commit

Permalink
Add custom flowtime per scheduler (linkedin#268)
Browse files Browse the repository at this point in the history
  • Loading branch information
Maxime Kestemont authored and akshayrai committed Jul 28, 2017
1 parent 1d6f3f6 commit 9a65e0e
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 9 deletions.
2 changes: 2 additions & 0 deletions app-conf/SchedulerConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
<!--<private_key>/home/key/private_key</private_key>-->
<!--<userame>elephant</userame>-->
<!--<password></password>-->
<!--<flowtimefield>flowExecId</flowtimefield>-->
<!--<flowtimetype>yyyy-MM-dd'T'HH:mm:ss</flowtimetype>-->
</params>

</scheduler>
Expand Down
70 changes: 70 additions & 0 deletions app/com/linkedin/drelephant/util/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,17 @@

package com.linkedin.drelephant.util;

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.linkedin.drelephant.analysis.Severity;
import com.linkedin.drelephant.configurations.scheduler.SchedulerConfigurationData;
import com.linkedin.drelephant.math.Statistics;
import java.io.IOException;
import java.io.InputStream;
import java.text.DecimalFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -499,4 +505,68 @@ public int compare(AppResult a, AppResult b) {
}
return totalWaittime;
}

/**
* Get the flowtime corresponding to the job.
* The flowtime will be used in the front-end as the entries under the 'Flow Executions' column (in the Flow/Job history view).
*
* By default, the finishTime value of the job is used. However, this can be configured in the scheduler configuration,
* so that another field is used instead, with the appropriate formatting to convert it to a long value.
*
* This method can be extended to include other flowtimefield and flowtimetype.
*/
public static long getFlowTime(AppResult result) {
String schedulerName = result.scheduler;
SchedulerConfigurationData schedulerData = InfoExtractor.getSchedulerData(schedulerName);

String flowTimeField = null;
String flowTimeType = null;
if (schedulerData != null) {
flowTimeField = schedulerData.getParamMap().get("flowtimefield");
flowTimeType = schedulerData.getParamMap().get("flowtimetype");
}

if (flowTimeField != null && flowTimeType != null && flowTimeField.equals("flowExecId")) {
SimpleDateFormat DATE_FORMAT = new SimpleDateFormat(flowTimeType);
long flowTime;
try {
flowTime = DATE_FORMAT.parse(result.flowExecId).getTime();
} catch (ParseException e) {
logger.warn("Could not parse " + result.flowExecId +" for application " + result.id);
flowTime = result.finishTime;
}
return flowTime;
} else {
return result.finishTime;
}

}

/**
* Sort the JsonArray given in parameters, based on the flowtime property,
* from the most recent to the oldest.
*/
public static JsonArray sortJsonArray(JsonArray datasets) {
ArrayList<JsonObject> datasetsList = new ArrayList<JsonObject>();
for (JsonElement element : datasets) {
datasetsList.add(element.getAsJsonObject());
}

Collections.sort( datasetsList, new Comparator<JsonObject>() {
private String KEY_NAME = "flowtime";

@Override
public int compare(JsonObject a, JsonObject b) {
Long valA = a.get(KEY_NAME).getAsLong();
Long valB = b.get(KEY_NAME).getAsLong();
return valA.compareTo(valB);
}
});

datasets = new JsonArray();
for (JsonObject element : datasetsList) {
datasets.add(element);
}
return datasets;
}
}
28 changes: 19 additions & 9 deletions app/controllers/Application.java
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ public static Result compare() {
* @return A map of Job Urls to the list of jobs corresponding to the 2 flow execution urls
*/
private static Map<IdUrlPair, Map<IdUrlPair, List<AppResult>>> compareFlows(List<AppResult> results1, List<AppResult> results2) {

Map<IdUrlPair, Map<IdUrlPair, List<AppResult>>> jobDefMap = new HashMap<IdUrlPair, Map<IdUrlPair, List<AppResult>>>();

if (results1 != null && !results1.isEmpty() && results2 != null && !results2.isEmpty()) {
Expand Down Expand Up @@ -1137,16 +1137,20 @@ public static Result restFlowGraphData(String flowDefId) {

// Execution record
JsonObject dataset = new JsonObject();
dataset.addProperty("flowtime", mrJobsList.get(mrJobsList.size() - 1).finishTime);
dataset.addProperty("flowtime", Utils.getFlowTime(mrJobsList.get(mrJobsList.size() - 1)));
dataset.addProperty("score", flowPerfScore);
dataset.add("jobscores", jobScores);

datasets.add(dataset);
}

return ok(new Gson().toJson(datasets));
JsonArray sortedDatasets = Utils.sortJsonArray(datasets);

return ok(new Gson().toJson(sortedDatasets));
}



/**
* The data for plotting the job history graph. While plotting the job history
* graph an ajax call is made to this to fetch the graph data.
Expand Down Expand Up @@ -1228,14 +1232,16 @@ public static Result restJobGraphData(String jobDefId) {

// Execution record
JsonObject dataset = new JsonObject();
dataset.addProperty("flowtime", mrJobsList.get(mrJobsList.size() - 1).finishTime);
dataset.addProperty("flowtime", Utils.getFlowTime(mrJobsList.get(mrJobsList.size() - 1)));
dataset.addProperty("score", jobPerfScore);
dataset.add("stagescores", stageScores);

datasets.add(dataset);
}

return ok(new Gson().toJson(datasets));
JsonArray sortedDatasets = Utils.sortJsonArray(datasets);

return ok(new Gson().toJson(sortedDatasets));
}

/**
Expand Down Expand Up @@ -1330,7 +1336,7 @@ public static Result restJobMetricsGraphData(String jobDefId) {

// Execution record
JsonObject dataset = new JsonObject();
dataset.addProperty("flowtime", mrJobsList.get(mrJobsList.size() - 1).finishTime);
dataset.addProperty("flowtime", Utils.getFlowTime(mrJobsList.get(mrJobsList.size() - 1)));
dataset.addProperty("runtime", Utils.getTotalRuntime(mrJobsList));
dataset.addProperty("waittime", Utils.getTotalWaittime(mrJobsList));
dataset.addProperty("resourceused", totalMemoryUsed);
Expand All @@ -1340,7 +1346,9 @@ public static Result restJobMetricsGraphData(String jobDefId) {
datasets.add(dataset);
}

return ok(new Gson().toJson(datasets));
JsonArray sortedDatasets = Utils.sortJsonArray(datasets);

return ok(new Gson().toJson(sortedDatasets));
}

/**
Expand Down Expand Up @@ -1480,7 +1488,7 @@ public static Result restFlowMetricsGraphData(String flowDefId) {

// Execution record
JsonObject dataset = new JsonObject();
dataset.addProperty("flowtime", mrJobsList.get(mrJobsList.size() - 1).finishTime);
dataset.addProperty("flowtime", Utils.getFlowTime(mrJobsList.get(mrJobsList.size() - 1)));
dataset.addProperty("runtime", totalFlowRuntime);
dataset.addProperty("waittime", totalFlowDelay);
dataset.addProperty("resourceused", totalFlowMemoryUsed);
Expand All @@ -1490,7 +1498,9 @@ public static Result restFlowMetricsGraphData(String flowDefId) {
datasets.add(dataset);
}

return ok(new Gson().toJson(datasets));
JsonArray sortedDatasets = Utils.sortJsonArray(datasets);

return ok(new Gson().toJson(sortedDatasets));
}

/**
Expand Down

0 comments on commit 9a65e0e

Please sign in to comment.