Skip to content

Commit

Permalink
Add REST endpoint for new user interface
Browse files Browse the repository at this point in the history
  • Loading branch information
nntnag17 committed Oct 3, 2016
1 parent eaf55bb commit e0b91a1
Show file tree
Hide file tree
Showing 7 changed files with 1,616 additions and 149 deletions.
131 changes: 14 additions & 117 deletions app/controllers/Application.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ public static Result search() {
.where()
.eq(AppResult.TABLE.FLOW_EXEC_ID, flowExecPair.getId())
.findList();
Map<IdUrlPair, List<AppResult>> map = groupJobs(results, GroupBy.JOB_EXECUTION_ID);
Map<IdUrlPair, List<AppResult>> map = ControllerUtil.groupJobs(results, ControllerUtil.GroupBy.JOB_EXECUTION_ID);
return ok(searchPage.render(null, flowDetails.render(flowExecPair, map)));
}

Expand Down Expand Up @@ -454,8 +454,8 @@ private static Map<IdUrlPair, Map<IdUrlPair, List<AppResult>>> compareFlows(List
IdUrlPair flow1 = new IdUrlPair(results1.get(0).flowExecId, results1.get(0).flowExecUrl);
IdUrlPair flow2 = new IdUrlPair(results2.get(0).flowExecId, results2.get(0).flowExecUrl);

Map<IdUrlPair, List<AppResult>> map1 = groupJobs(results1, GroupBy.JOB_DEFINITION_ID);
Map<IdUrlPair, List<AppResult>> map2 = groupJobs(results2, GroupBy.JOB_DEFINITION_ID);
Map<IdUrlPair, List<AppResult>> map1 = ControllerUtil.groupJobs(results1, ControllerUtil.GroupBy.JOB_DEFINITION_ID);
Map<IdUrlPair, List<AppResult>> map2 = ControllerUtil.groupJobs(results2, ControllerUtil.GroupBy.JOB_DEFINITION_ID);

final Set<IdUrlPair> group1 = new TreeSet<IdUrlPair>(new Comparator<IdUrlPair>() {
public int compare(final IdUrlPair o1, final IdUrlPair o2) {
Expand Down Expand Up @@ -546,7 +546,7 @@ public static Result flowHistory() {
}

Map<IdUrlPair, List<AppResult>> flowExecIdToJobsMap =
limitHistoryResults(groupJobs(results, GroupBy.FLOW_EXECUTION_ID), results.size(), MAX_HISTORY_LIMIT);
ControllerUtil.limitHistoryResults(ControllerUtil.groupJobs(results, ControllerUtil.GroupBy.FLOW_EXECUTION_ID), results.size(), MAX_HISTORY_LIMIT);

// Compute flow execution data
List<AppResult> filteredResults = new ArrayList<AppResult>(); // All jobs starting from latest execution
Expand All @@ -563,13 +563,13 @@ public static Result flowHistory() {
flowExecTimeList.add(mrJobsList.get(mrJobsList.size() - 1).finishTime);

filteredResults.addAll(mrJobsList);
executionMap.put(entry.getKey(), groupJobs(mrJobsList, GroupBy.JOB_DEFINITION_ID));
executionMap.put(entry.getKey(), ControllerUtil.groupJobs(mrJobsList, ControllerUtil.GroupBy.JOB_DEFINITION_ID));
}

// Calculate unique list of jobs (job def url) to maintain order across executions. List will contain job def urls
// from latest execution first followed by any other extra job def url that may appear in previous executions.
Map<IdUrlPair, String> idPairToJobNameMap = new HashMap<IdUrlPair, String>();
Map<IdUrlPair, List<AppResult>> filteredMap = groupJobs(filteredResults, GroupBy.JOB_DEFINITION_ID);
Map<IdUrlPair, List<AppResult>> filteredMap = ControllerUtil.groupJobs(filteredResults, ControllerUtil.GroupBy.JOB_DEFINITION_ID);
for (Map.Entry<IdUrlPair, List<AppResult>> entry : filteredMap.entrySet()) {
idPairToJobNameMap.put(entry.getKey(), filteredMap.get(entry.getKey()).get(0).jobName);
}
Expand Down Expand Up @@ -649,7 +649,7 @@ public static Result jobHistory() {
return notFound("Unable to find record for job def id: " + jobDefPair.getId());
}
Map<IdUrlPair, List<AppResult>> flowExecIdToJobsMap =
limitHistoryResults(groupJobs(results, GroupBy.FLOW_EXECUTION_ID), results.size(), MAX_HISTORY_LIMIT);
ControllerUtil.limitHistoryResults(ControllerUtil.groupJobs(results, ControllerUtil.GroupBy.FLOW_EXECUTION_ID), results.size(), MAX_HISTORY_LIMIT);

// Compute job execution data
List<Long> flowExecTimeList = new ArrayList<Long>();
Expand Down Expand Up @@ -691,55 +691,6 @@ public static Result jobHistory() {
return notFound("Unable to find graph type: " + graphType);
}

/**
* Applies a limit on the number of executions to be displayed after trying to maximize the correctness.
*
* Correctness:
* When the number of jobs are less than the JOB_HISTORY_LIMIT, we can show all the executions correctly. However,
* when the number of jobs are greater than the JOB_HISTORY_LIMIT, we cannot simply prune the jobs at that point and
* show the history because we may skip some jobs which belong to the last flow execution. For the flow executions
* we display, we want to ensure we show all the jobs belonging to that flow.
*
* So, when the number of executions are less than 10, we skip the last execution and when the number of executions
* are greater than 10, we skip the last 3 executions just to maximise the correctness.
*
* @param map The results map to be pruned.
* @param size Total number of jobs in the map
* @param execLimit The upper limit on the number of executions to be displayed.
* @return A map after applying the limit.
*/
private static Map<IdUrlPair, List<AppResult>> limitHistoryResults(Map<IdUrlPair, List<AppResult>> map,int size,
int execLimit) {

Map<IdUrlPair, List<AppResult>> resultMap = new LinkedHashMap<IdUrlPair, List<AppResult>>();

int limit;
if (size < JOB_HISTORY_LIMIT) {
// No pruning needed. 100% correct.
limit = execLimit;
} else {
Set<IdUrlPair> keySet = map.keySet();
if (keySet.size() > 10) {
// Prune last 3 executions
limit = keySet.size() > (execLimit + 3) ? execLimit : keySet.size() - 3;
} else {
// Prune the last execution
limit = keySet.size() - 1;
}
}

// Filtered results
int i = 1;
for (Map.Entry<IdUrlPair, List<AppResult>> entry : map.entrySet()) {
if (i > limit) {
break;
}
resultMap.put(entry.getKey(), entry.getValue());
i++;
}

return resultMap;
}

/**
* Controls the Help Page
Expand Down Expand Up @@ -861,7 +812,7 @@ public static Result restFlowExecResult(String flowExecId) {
return notFound("Unable to find record on flow exec url: " + flowExecId);
}

Map<IdUrlPair, List<AppResult>> groupMap = groupJobs(results, GroupBy.JOB_EXECUTION_ID);
Map<IdUrlPair, List<AppResult>> groupMap = ControllerUtil.groupJobs(results, ControllerUtil.GroupBy.JOB_EXECUTION_ID);

Map<String, List<AppResult>> resMap = new HashMap<String, List<AppResult>>();
for (Map.Entry<IdUrlPair, List<AppResult>> entry : groupMap.entrySet()) {
Expand All @@ -873,63 +824,9 @@ public static Result restFlowExecResult(String flowExecId) {
return ok(Json.toJson(resMap));
}

static enum GroupBy {
JOB_EXECUTION_ID,
JOB_DEFINITION_ID,
FLOW_EXECUTION_ID
}

/**
* Grouping a list of AppResult by GroupBy enum.
*
* @param results The list of jobs of type AppResult to be grouped.
* @param groupBy The field by which the results have to be grouped.
* @return A map with the grouped field as the key and the list of jobs as the value.
*/
private static Map<IdUrlPair, List<AppResult>> groupJobs(List<AppResult> results, GroupBy groupBy) {
Map<String, List<AppResult>> groupMap = new LinkedHashMap<String, List<AppResult>>();
Map<String, String> idUrlMap = new HashMap<String, String>();

for (AppResult result : results) {
String idField = null;
String urlField = null;
switch (groupBy) {
case JOB_EXECUTION_ID:
idField = result.jobExecId;
urlField = result.jobExecUrl;
break;
case JOB_DEFINITION_ID:
idField = result.jobDefId;
urlField = result.jobDefUrl;
break;
case FLOW_EXECUTION_ID:
idField = result.flowExecId;
urlField = result.flowExecUrl;
break;
}
if (!idUrlMap.containsKey(idField)) {
idUrlMap.put(idField, urlField);
}

if (groupMap.containsKey(idField)) {
groupMap.get(idField).add(result);
} else {
List<AppResult> list = new ArrayList<AppResult>();
list.add(result);
groupMap.put(idField, list);
}
}

// Construct the final result map with the key as a (id, url) pair.
Map<IdUrlPair, List<AppResult>> resultMap = new LinkedHashMap<IdUrlPair, List<AppResult>>();
for (Map.Entry<String, List<AppResult>> entry : groupMap.entrySet()) {
String key = entry.getKey();
List<AppResult> value = entry.getValue();
resultMap.put(new IdUrlPair(key, idUrlMap.get(key)), value);
}

return resultMap;
}

/**
* The Rest API for Search Feature
Expand Down Expand Up @@ -1100,7 +997,7 @@ public static Result restFlowGraphData(String flowDefId) {
logger.info("No results for Job url");
}
Map<IdUrlPair, List<AppResult>> flowExecIdToJobsMap =
limitHistoryResults(groupJobs(results, GroupBy.FLOW_EXECUTION_ID), results.size(), MAX_HISTORY_LIMIT);
ControllerUtil.limitHistoryResults(ControllerUtil.groupJobs(results, ControllerUtil.GroupBy.FLOW_EXECUTION_ID), results.size(), MAX_HISTORY_LIMIT);

// Compute the graph data starting from the earliest available execution to latest
List<IdUrlPair> keyList = new ArrayList<IdUrlPair>(flowExecIdToJobsMap.keySet());
Expand All @@ -1109,7 +1006,7 @@ public static Result restFlowGraphData(String flowDefId) {
int flowPerfScore = 0;
JsonArray jobScores = new JsonArray();
List<AppResult> mrJobsList = Lists.reverse(flowExecIdToJobsMap.get(flowExecPair));
Map<IdUrlPair, List<AppResult>> jobDefIdToJobsMap = groupJobs(mrJobsList, GroupBy.JOB_DEFINITION_ID);
Map<IdUrlPair, List<AppResult>> jobDefIdToJobsMap = ControllerUtil.groupJobs(mrJobsList, ControllerUtil.GroupBy.JOB_DEFINITION_ID);

// Compute the execution records. Note that each entry in the jobDefIdToJobsMap will have at least one AppResult
for (IdUrlPair jobDefPair : jobDefIdToJobsMap.keySet()) {
Expand Down Expand Up @@ -1194,7 +1091,7 @@ public static Result restJobGraphData(String jobDefId) {
logger.info("No results for Job url");
}
Map<IdUrlPair, List<AppResult>> flowExecIdToJobsMap =
limitHistoryResults(groupJobs(results, GroupBy.FLOW_EXECUTION_ID), results.size(), MAX_HISTORY_LIMIT);
ControllerUtil.limitHistoryResults(ControllerUtil.groupJobs(results, ControllerUtil.GroupBy.FLOW_EXECUTION_ID), results.size(), MAX_HISTORY_LIMIT);

// Compute the graph data starting from the earliest available execution to latest
List<IdUrlPair> keyList = new ArrayList<IdUrlPair>(flowExecIdToJobsMap.keySet());
Expand Down Expand Up @@ -1285,7 +1182,7 @@ public static Result restJobMetricsGraphData(String jobDefId) {
logger.info("No results for Job url");
}
Map<IdUrlPair, List<AppResult>> flowExecIdToJobsMap =
limitHistoryResults(groupJobs(results, GroupBy.FLOW_EXECUTION_ID), results.size(), MAX_HISTORY_LIMIT);
ControllerUtil.limitHistoryResults(ControllerUtil.groupJobs(results, ControllerUtil.GroupBy.FLOW_EXECUTION_ID), results.size(), MAX_HISTORY_LIMIT);

// Compute the graph data starting from the earliest available execution to latest
List<IdUrlPair> keyList = new ArrayList<IdUrlPair>(flowExecIdToJobsMap.keySet());
Expand Down Expand Up @@ -1424,7 +1321,7 @@ public static Result restFlowMetricsGraphData(String flowDefId) {
logger.info("No results for Job url");
}
Map<IdUrlPair, List<AppResult>> flowExecIdToJobsMap =
limitHistoryResults(groupJobs(results, GroupBy.FLOW_EXECUTION_ID), results.size(), MAX_HISTORY_LIMIT);
ControllerUtil.limitHistoryResults(ControllerUtil.groupJobs(results, ControllerUtil.GroupBy.FLOW_EXECUTION_ID), results.size(), MAX_HISTORY_LIMIT);

// Compute the graph data starting from the earliest available execution to latest
List<IdUrlPair> keyList = new ArrayList<IdUrlPair>(flowExecIdToJobsMap.keySet());
Expand All @@ -1433,7 +1330,7 @@ public static Result restFlowMetricsGraphData(String flowDefId) {
int flowPerfScore = 0;
JsonArray jobScores = new JsonArray();
List<AppResult> mrJobsList = Lists.reverse(flowExecIdToJobsMap.get(flowExecPair));
Map<IdUrlPair, List<AppResult>> jobDefIdToJobsMap = groupJobs(mrJobsList, GroupBy.JOB_DEFINITION_ID);
Map<IdUrlPair, List<AppResult>> jobDefIdToJobsMap = ControllerUtil.groupJobs(mrJobsList, ControllerUtil.GroupBy.JOB_DEFINITION_ID);

long totalFlowMemoryUsed = 0;
long totalFlowMemoryWasted = 0;
Expand Down
140 changes: 140 additions & 0 deletions app/controllers/ControllerUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package controllers;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import models.AppResult;


/**
 * Static helper methods shared by the controller layer for grouping and
 * pruning lists of {@link AppResult} records keyed by an (id, url) pair.
 */
public class ControllerUtil {

  // Threshold on the total number of jobs below which no execution pruning is needed.
  private static final int JOB_HISTORY_LIMIT = 5000;

  // Utility class; not meant to be instantiated.
  private ControllerUtil() {
  }

  /**
   * The field of {@link AppResult} by which a list of results can be grouped.
   */
  public enum GroupBy {
    JOB_EXECUTION_ID,
    JOB_DEFINITION_ID,
    FLOW_EXECUTION_ID
  }

  /**
   * Applies a limit on the number of executions to be displayed after trying to maximize the correctness.
   *
   * Correctness:
   * When the number of jobs is less than JOB_HISTORY_LIMIT, we can show all the executions correctly. However,
   * when the number of jobs is greater than JOB_HISTORY_LIMIT, we cannot simply prune the jobs at that point and
   * show the history because we may skip some jobs which belong to the last flow execution. For the flow executions
   * we display, we want to ensure we show all the jobs belonging to that flow.
   *
   * So, when the number of executions is less than 10, we skip the last execution and when the number of executions
   * is greater than 10, we skip the last 3 executions just to maximize the correctness.
   *
   * @param map The results map to be pruned.
   * @param size Total number of jobs in the map.
   * @param execLimit The upper limit on the number of executions to be displayed.
   * @return A map (insertion-ordered) after applying the limit.
   */
  public static Map<IdUrlPair, List<AppResult>> limitHistoryResults(Map<IdUrlPair, List<AppResult>> map, int size,
      int execLimit) {

    Map<IdUrlPair, List<AppResult>> resultMap = new LinkedHashMap<IdUrlPair, List<AppResult>>();

    int limit;
    if (size < JOB_HISTORY_LIMIT) {
      // No pruning needed. 100% correct.
      limit = execLimit;
    } else {
      Set<IdUrlPair> keySet = map.keySet();
      if (keySet.size() > 10) {
        // Prune the last 3 executions, unless more than (execLimit + 3) executions exist,
        // in which case the display limit itself is the tighter bound.
        limit = keySet.size() > (execLimit + 3) ? execLimit : keySet.size() - 3;
      } else {
        // Prune only the last execution.
        limit = keySet.size() - 1;
      }
    }

    // Copy the first `limit` entries, preserving iteration order of the input map.
    int count = 0;
    for (Map.Entry<IdUrlPair, List<AppResult>> entry : map.entrySet()) {
      if (count >= limit) {
        break;
      }
      resultMap.put(entry.getKey(), entry.getValue());
      count++;
    }

    return resultMap;
  }

  /**
   * Groups a list of AppResult by the GroupBy enum.
   *
   * @param results The list of jobs of type AppResult to be grouped.
   * @param groupBy The field by which the results have to be grouped.
   * @return A map with the grouped field as the key and the list of jobs as the value.
   *         Key order follows the first appearance of each id in {@code results}.
   */
  public static Map<IdUrlPair, List<AppResult>> groupJobs(List<AppResult> results, GroupBy groupBy) {
    Map<String, List<AppResult>> groupMap = new LinkedHashMap<String, List<AppResult>>();
    Map<String, String> idUrlMap = new HashMap<String, String>();

    for (AppResult result : results) {
      String idField = null;
      String urlField = null;
      switch (groupBy) {
        case JOB_EXECUTION_ID:
          idField = result.jobExecId;
          urlField = result.jobExecUrl;
          break;
        case JOB_DEFINITION_ID:
          idField = result.jobDefId;
          urlField = result.jobDefUrl;
          break;
        case FLOW_EXECUTION_ID:
          idField = result.flowExecId;
          urlField = result.flowExecUrl;
          break;
      }

      // Remember the url of the first result seen for each id.
      if (!idUrlMap.containsKey(idField)) {
        idUrlMap.put(idField, urlField);
      }

      // Single lookup instead of containsKey + get/put.
      List<AppResult> list = groupMap.get(idField);
      if (list == null) {
        list = new ArrayList<AppResult>();
        groupMap.put(idField, list);
      }
      list.add(result);
    }

    // Construct the final result map with the key as an (id, url) pair.
    Map<IdUrlPair, List<AppResult>> resultMap = new LinkedHashMap<IdUrlPair, List<AppResult>>();
    for (Map.Entry<String, List<AppResult>> entry : groupMap.entrySet()) {
      String key = entry.getKey();
      resultMap.put(new IdUrlPair(key, idUrlMap.get(key)), entry.getValue());
    }

    return resultMap;
  }
}
Loading

0 comments on commit e0b91a1

Please sign in to comment.