LIHADOOP-18028: Dr. Elephant's Spark parser on Nertz is not working
RB=677178

G=superfriends-reviewers
R=annag,fli,shanm,viramach
A=shanm
akshayrai committed Mar 8, 2016
1 parent 3eeb997 commit 86d2aaf
Showing 13 changed files with 167 additions and 86 deletions.
34 changes: 18 additions & 16 deletions app/com/linkedin/drelephant/analysis/AnalyticJob.java
@@ -263,36 +263,38 @@ public AppResult getAnalysis() throws Exception {
 
     // Load job information
     AppResult result = new AppResult();
-    result.id = getAppId();
-    result.trackingUrl = getTrackingUrl();
-    result.queueName = getQueueName();
-    result.username = getUser();
+    result.id = Utils.truncateField(getAppId(), AppResult.ID_LIMIT, getAppId());
+    result.trackingUrl = Utils.truncateField(getTrackingUrl(), AppResult.TRACKING_URL_LIMIT, getAppId());
+    result.queueName = Utils.truncateField(getQueueName(), AppResult.QUEUE_NAME_LIMIT, getAppId());
+    result.username = Utils.truncateField(getUser(), AppResult.USERNAME_LIMIT, getAppId());
     result.startTime = new Date(getStartTime());
     result.finishTime = new Date(getFinishTime());
-    result.name = getName();
-    result.jobType = jobTypeName;
-
-    // Truncate long names
-    if (result.name.length() > 100) {
-      result.name = result.name.substring(0, 97) + "...";
-    }
+    result.name = Utils.truncateField(getName(), AppResult.APP_NAME_LIMIT, getAppId());
+    result.jobType = Utils.truncateField(jobTypeName, AppResult.JOBTYPE_LIMIT, getAppId());
 
     // Load App Heuristic information
     int jobScore = 0;
     result.yarnAppHeuristicResults = new ArrayList<AppHeuristicResult>();
     Severity worstSeverity = Severity.NONE;
     for (HeuristicResult heuristicResult : analysisResults) {
       AppHeuristicResult detail = new AppHeuristicResult();
-      detail.heuristicClass = heuristicResult.getHeuristicClassName();
-      detail.heuristicName = heuristicResult.getHeuristicName();
+      detail.heuristicClass = Utils.truncateField(heuristicResult.getHeuristicClassName(),
+          AppHeuristicResult.HEURISTIC_CLASS_LIMIT, getAppId());
+      detail.heuristicName = Utils.truncateField(heuristicResult.getHeuristicName(),
+          AppHeuristicResult.HEURISTIC_NAME_LIMIT, getAppId());
       detail.severity = heuristicResult.getSeverity();
       detail.score = heuristicResult.getScore();
 
       // Load Heuristic Details
       for (HeuristicResultDetails heuristicResultDetails : heuristicResult.getHeuristicResultDetails()) {
         AppHeuristicResultDetails heuristicDetail = new AppHeuristicResultDetails();
         heuristicDetail.yarnAppHeuristicResult = detail;
-        heuristicDetail.name = heuristicResultDetails.getName();
-        heuristicDetail.value = heuristicResultDetails.getValue();
-        heuristicDetail.details = heuristicResultDetails.getDetails();
+        heuristicDetail.name = Utils.truncateField(heuristicResultDetails.getName(),
+            AppHeuristicResultDetails.NAME_LIMIT, getAppId());
+        heuristicDetail.value = Utils.truncateField(heuristicResultDetails.getValue(),
+            AppHeuristicResultDetails.VALUE_LIMIT, getAppId());
+        heuristicDetail.details = Utils.truncateField(heuristicResultDetails.getDetails(),
+            AppHeuristicResultDetails.DETAILS_LIMIT, getAppId());
         detail.yarnAppHeuristicResultDetails.add(heuristicDetail);
       }
       result.yarnAppHeuristicResults.add(detail);
@@ -112,9 +112,9 @@ public HeuristicResult apply(SparkApplicationData data) {
 
     result.addResultDetail("Spark completed jobs number", String.valueOf(completedJobs.size()));
     result.addResultDetail("Spark failed jobs number", String.valueOf(failedJobs.size()));
-    result.addResultDetail("Spark failed jobs list", getJobListString(jobProgressData.getFailedJobDescriptions()));
+    result.addResultDetail("Spark failed jobs list", getJobsAsString(jobProgressData.getFailedJobDescriptions()));
     result.addResultDetail("Spark average job failure rate", String.format("%.3f", avgJobFailureRate));
-    result.addResultDetail("Spark jobs with high task failure rate", getJobListString(highFailureRateJobs));
+    result.addResultDetail("Spark jobs with high task failure rate", getJobsAsString(highFailureRateJobs));
 
     return result;
   }
@@ -129,7 +129,7 @@ private Severity getSingleJobFailureRateSeverity(double rate) {
         rate, jobFailureLimits[0], jobFailureLimits[1], jobFailureLimits[2], jobFailureLimits[3]);
   }
 
-  private static String getJobListString(Collection<String> names) {
-    return "[" + StringUtils.join(names, ",") + "]";
+  private static String getJobsAsString(Collection<String> names) {
+    return StringUtils.join(names, "\n");
   }
 }
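
The rename from getJobListString to getJobsAsString also drops the bracketed, comma-separated formatting in favor of one job description per line, which reads better in the rendered heuristic details. A minimal before/after sketch (the job descriptions are invented; assumes commons-lang StringUtils, which the file already imports):

    import java.util.Arrays;
    import java.util.Collection;
    import org.apache.commons.lang.StringUtils;

    public class JoinFormatDemo {
      public static void main(String[] args) {
        Collection<String> failed = Arrays.asList(
            "Job 12: collect at App.scala:45",
            "Job 17: saveAsTextFile at App.scala:90");
        // Old format: a single bracketed line
        System.out.println("[" + StringUtils.join(failed, ",") + "]");
        // New format: one job description per line
        System.out.println(StringUtils.join(failed, "\n"));
      }
    }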
@@ -134,27 +134,27 @@ public HeuristicResult apply(SparkApplicationData data) {
     result.addResultDetail("Spark stage completed", String.valueOf(completedStages.size()));
     result.addResultDetail("Spark stage failed", String.valueOf(failedStages.size()));
     result.addResultDetail("Spark average stage failure rate", String.format("%.3f", avgStageFailureRate));
-    result.addResultDetail("Spark problematic stages:", getStageListString(problematicStages));
+    result.addResultDetail("Spark problematic stages", getStagesAsString(problematicStages));
 
     return result;
   }
 
   private Severity getStageRuntimeSeverity(long runtime) {
-    return Severity.getSeverityDescending(
+    return Severity.getSeverityAscending(
         runtime, stageRuntimeLimits[0], stageRuntimeLimits[1], stageRuntimeLimits[2], stageRuntimeLimits[3]);
   }
 
   private Severity getStageFailureRateSeverity(double rate) {
-    return Severity.getSeverityDescending(
+    return Severity.getSeverityAscending(
         rate, stageFailRateLimits[0], stageFailRateLimits[1], stageFailRateLimits[2], stageFailRateLimits[3]);
   }
 
   private Severity getSingleStageTasksFailureRate(double rate) {
-    return Severity.getSeverityDescending(
+    return Severity.getSeverityAscending(
         rate, singleStageFailLimits[0], singleStageFailLimits[1], singleStageFailLimits[2], singleStageFailLimits[3]);
   }
 
-  private static String getStageListString(Collection<String> names) {
-    return "[" + StringUtils.join(names, ",") + "]";
+  private static String getStagesAsString(Collection<String> names) {
+    return StringUtils.join(names, "\n");
  }
 }
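
The behavioral fix in this file is the switch from Severity.getSeverityDescending to Severity.getSeverityAscending: stage runtime and failure rates are metrics where a larger value should mean a worse severity. An illustrative sketch of the ascending semantics (a simplification; the real Severity class lives in com.linkedin.drelephant.analysis and its exact boundary comparisons may differ):

    enum Severity { NONE, LOW, MODERATE, SEVERE, CRITICAL }

    class SeverityDemo {
      // Higher values map to higher severity, as getSeverityAscending is expected to do.
      static Severity ascending(double value, double low, double moderate, double severe, double critical) {
        if (value >= critical) return Severity.CRITICAL;
        if (value >= severe) return Severity.SEVERE;
        if (value >= moderate) return Severity.MODERATE;
        if (value >= low) return Severity.LOW;
        return Severity.NONE;
      }

      public static void main(String[] args) {
        // With descending semantics these would have been rated backwards:
        // a 90% task failure rate scored as harmless, a near-zero rate as critical.
        System.out.println(ascending(0.9, 0.1, 0.3, 0.5, 0.7));   // CRITICAL
        System.out.println(ascending(0.05, 0.1, 0.3, 0.5, 0.7));  // NONE
      }
    }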
73 changes: 29 additions & 44 deletions app/com/linkedin/drelephant/util/InfoExtractor.java
@@ -58,32 +58,28 @@ public static void retrieveURLs(AppResult result, MapReduceApplicationData appData
     Properties jobConf = appData.getConf();
     String jobId = appData.getJobId();
 
-    result.jobExecId = jobConf.getProperty(AZKABAN_ATTEMPT_URL) != null ?
-        truncate(jobConf.getProperty(AZKABAN_ATTEMPT_URL), jobId) : "";
+    result.jobExecUrl = jobConf.getProperty(AZKABAN_ATTEMPT_URL) != null ?
+        Utils.truncateField(jobConf.getProperty(AZKABAN_ATTEMPT_URL), AppResult.URL_LEN_LIMIT, jobId) : "";
     // For jobs launched by Azkaban, we consider different attempts to be different jobs
-    result.jobDefId = jobConf.getProperty(AZKABAN_JOB_URL) != null ?
-        truncate(jobConf.getProperty(AZKABAN_JOB_URL), jobId) : "";
-    result.flowExecId = jobConf.getProperty(AZKABAN_EXECUTION_URL) != null ?
-        truncate(jobConf.getProperty(AZKABAN_EXECUTION_URL), jobId) : "";
-    result.flowDefId = jobConf.getProperty(AZKABAN_WORKFLOW_URL) != null ?
-        truncate(jobConf.getProperty(AZKABAN_WORKFLOW_URL), jobId) : "";
+    result.jobDefUrl = jobConf.getProperty(AZKABAN_JOB_URL) != null ?
+        Utils.truncateField(jobConf.getProperty(AZKABAN_JOB_URL), AppResult.URL_LEN_LIMIT, jobId) : "";
+    result.flowExecUrl = jobConf.getProperty(AZKABAN_EXECUTION_URL) != null ?
+        Utils.truncateField(jobConf.getProperty(AZKABAN_EXECUTION_URL), AppResult.URL_LEN_LIMIT, jobId) : "";
+    result.flowDefUrl = jobConf.getProperty(AZKABAN_WORKFLOW_URL) != null ?
+        Utils.truncateField(jobConf.getProperty(AZKABAN_WORKFLOW_URL), AppResult.URL_LEN_LIMIT, jobId) : "";
 
     // For Azkaban, the urls and ids are the same
-    result.jobExecUrl = result.jobExecId;
-    result.jobDefUrl = result.jobDefId;
-    result.flowExecUrl = result.flowExecId;
-    result.flowDefUrl = result.flowDefId;
+    result.jobDefId = result.jobDefUrl;
+    result.jobExecId = result.jobExecUrl;
+    result.flowDefId = result.flowDefUrl;
+    result.flowExecId = Utils.truncateField(result.flowExecUrl, AppResult.FLOW_EXEC_ID_LIMIT, jobId);
 
     if (!result.jobExecId.isEmpty()) {
-      result.scheduler = "azkaban";
+      result.scheduler = Utils.truncateField("azkaban", AppResult.SCHEDULER_LIMIT, jobId);
       result.workflowDepth = 0;
     }
-    result.jobName = jobConf.getProperty(AZKABAN_JOB_NAME) != null ? jobConf.getProperty(AZKABAN_JOB_NAME) : "";
-
-    // Truncate long job names
-    if (result.jobName.length() > 255) {
-      result.jobName = result.jobName.substring(0, 252) + "...";
-    }
+    result.jobName = jobConf.getProperty(AZKABAN_JOB_NAME) != null ?
+        Utils.truncateField(jobConf.getProperty(AZKABAN_JOB_NAME), AppResult.JOB_NAME_LIMIT, jobId) : "";
   }
 
   public static void retrieveURLs(AppResult result, SparkApplicationData appData) {
@@ -100,30 +96,27 @@ public static void retrieveURLs(AppResult result, SparkApplicationData appData)
       }
      logger.info("Parsed options:" + StringUtils.join(s, ","));
 
-      result.jobExecId = options.get(AZKABAN_ATTEMPT_URL) != null ?
-          truncate(unescapeString(options.get(AZKABAN_ATTEMPT_URL)), appId) : "";
-      result.jobDefId = options.get(AZKABAN_JOB_URL) != null ?
-          truncate(unescapeString(options.get(AZKABAN_JOB_URL)), appId) : "";
-      result.flowExecId = options.get(AZKABAN_EXECUTION_URL) != null ?
-          truncate(unescapeString(options.get(AZKABAN_EXECUTION_URL)), appId) : "";
-      result.flowDefId = options.get(AZKABAN_WORKFLOW_URL) != null ?
-          truncate(unescapeString(options.get(AZKABAN_WORKFLOW_URL)), appId) : "";
+      result.jobExecUrl = options.get(AZKABAN_ATTEMPT_URL) != null ?
+          Utils.truncateField(unescapeString(options.get(AZKABAN_ATTEMPT_URL)), AppResult.URL_LEN_LIMIT, appId) : "";
+      result.jobDefUrl = options.get(AZKABAN_JOB_URL) != null ?
+          Utils.truncateField(unescapeString(options.get(AZKABAN_JOB_URL)), AppResult.URL_LEN_LIMIT, appId) : "";
+      result.flowExecUrl = options.get(AZKABAN_EXECUTION_URL) != null ?
+          Utils.truncateField(unescapeString(options.get(AZKABAN_EXECUTION_URL)), AppResult.URL_LEN_LIMIT, appId) : "";
+      result.flowDefUrl = options.get(AZKABAN_WORKFLOW_URL) != null ?
+          Utils.truncateField(unescapeString(options.get(AZKABAN_WORKFLOW_URL)), AppResult.URL_LEN_LIMIT, appId) : "";
 
-      result.jobExecUrl = result.jobExecId;
-      result.jobDefUrl = result.jobDefId;
-      result.flowExecUrl = result.flowExecId;
-      result.flowDefUrl = result.flowDefId;
+      result.jobDefId = result.jobDefUrl;
+      result.jobExecId = result.jobExecUrl;
+      result.flowDefId = result.flowDefUrl;
+      result.flowExecId = Utils.truncateField(result.flowExecUrl, AppResult.FLOW_EXEC_ID_LIMIT, appId);
 
       if (!result.jobExecId.isEmpty()) {
         result.scheduler = "azkaban";
         result.workflowDepth = 0;
       }
-      result.jobName = options.get(AZKABAN_JOB_NAME) != null ? unescapeString(options.get(AZKABAN_JOB_NAME)) : "";
-
-      // Truncate long job names
-      if (result.jobName.length() > 255) {
-        result.jobName = result.jobName.substring(0, 252) + "...";
-      }
+      result.jobName = options.get(AZKABAN_JOB_NAME) != null ?
+          Utils.truncateField(unescapeString(options.get(AZKABAN_JOB_NAME)), AppResult.JOB_NAME_LIMIT, appId) : "";
 
     } catch (IllegalArgumentException e) {
       logger.error("Encountered error while parsing java options into urls: " + e.getMessage());
@@ -160,12 +153,4 @@ private static String unescapeString(String s) {
     }
     return s.replaceAll("\\\\\\&", "\\&");
   }
-
-  public static String truncate(String value, String jobId) {
-    if (value != null && value.length() > AppResult.URL_LEN_LIMIT) {
-      logger.info("Truncate long URL in job result for job: " + jobId + ". Original Url: " + value);
-      value = value.substring(0, AppResult.URL_LEN_LIMIT);
-    }
-    return value;
-  }
 }
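
Every Azkaban property now goes through the same null-check-then-truncate ternary, in both retrieveURLs overloads. A hypothetical helper that would collapse that repetition (not part of this commit; the name truncatedOrEmpty is invented for illustration):

    import java.util.Properties;

    // Hypothetical refactoring sketch, not in this commit.
    final class AzkabanProps {
      static String truncatedOrEmpty(Properties conf, String key, int limit, String jobId) {
        String value = conf.getProperty(key);
        return value != null ? Utils.truncateField(value, limit, jobId) : "";
      }
    }

    // Usage inside retrieveURLs would then read:
    //   result.jobExecUrl = truncatedOrEmpty(jobConf, AZKABAN_ATTEMPT_URL, AppResult.URL_LEN_LIMIT, jobId);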
16 changes: 16 additions & 0 deletions app/com/linkedin/drelephant/util/Utils.java
@@ -39,6 +39,8 @@
 public final class Utils {
   private static final Logger logger = Logger.getLogger(Utils.class);
 
+  private static final String TRUNCATE_SUFFIX = "...";
+
   private Utils() {
     // do nothing
   }
@@ -191,4 +193,18 @@ public static int getHeuristicScore(Severity severity, int tasks) {
     return score;
   }
 
+  /**
+   * Truncate the field to the specified limit
+   *
+   * @param field the field to be truncated
+   * @param limit the truncation limit
+   * @return The truncated field
+   */
+  public static String truncateField(String field, int limit, String appId) {
+    if (field != null && limit > TRUNCATE_SUFFIX.length() && field.length() > limit) {
+      logger.info("Truncating " + field + " to " + limit + " characters for " + appId);
+      field = field.substring(0, limit - 3) + "...";
+    }
+    return field;
+  }
 }
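
The new truncateField helper centralizes the substring-plus-ellipsis logic that AnalyticJob and InfoExtractor previously each hand-rolled. A quick sketch of its behavior (the values and application id here are made up):

    String name = "a-very-long-azkaban-job-name-that-overflows-the-column";

    // Keeps the first (limit - 3) characters and appends "...", so the
    // result is exactly `limit` characters long; an INFO line is also logged.
    String truncated = Utils.truncateField(name, 20, "application_1457000000000_0001");
    // truncated -> "a-very-long-azkab..."

    // Values at or under the limit, null values, and limits of 3 or less
    // are returned unchanged.
    assert Utils.truncateField("short", 20, "app").equals("short");
    assert Utils.truncateField(null, 20, "app") == null;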
8 changes: 5 additions & 3 deletions app/models/AppHeuristicResult.java
@@ -27,7 +27,6 @@
 
 import javax.persistence.OneToMany;
 import javax.persistence.Table;
-import org.apache.commons.lang.StringUtils;
 
 import com.fasterxml.jackson.annotation.JsonBackReference;
 import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -43,6 +42,9 @@ public class AppHeuristicResult extends Model {
 
   private static final long serialVersionUID = 2L;
 
+  public static final int HEURISTIC_NAME_LIMIT = 128;
+  public static final int HEURISTIC_CLASS_LIMIT = 255;
+
   public static class TABLE {
     public static final String TABLE_NAME = "yarn_app_heuristic_result";
     public static final String ID = "id";
@@ -65,10 +67,10 @@ public static String getSearchFields() {
   @ManyToOne(cascade = CascadeType.ALL)
   public AppResult yarnAppResult;
 
-  @Column(nullable = false)
+  @Column(length = HEURISTIC_CLASS_LIMIT, nullable = false)
   public String heuristicClass;
 
-  @Column(nullable = false)
+  @Column(length = HEURISTIC_NAME_LIMIT, nullable = false)
   public String heuristicName;
 
   @Column(nullable = false)
9 changes: 6 additions & 3 deletions app/models/AppHeuristicResultDetails.java
@@ -37,6 +37,10 @@ public class AppHeuristicResultDetails extends Model {
 
   private static final long serialVersionUID = 3L;
 
+  public static final int NAME_LIMIT = 128;
+  public static final int VALUE_LIMIT = 255;
+  public static final int DETAILS_LIMIT = 65535;
+
   public static class TABLE {
     public static final String TABLE_NAME = "yarn_app_heuristic_result_details";
     public static final String APP_HEURISTIC_RESULT_ID = "yarnAppHeuristicResult";
@@ -49,13 +53,12 @@ public static class TABLE {
   @ManyToOne(cascade = CascadeType.ALL)
   public AppHeuristicResult yarnAppHeuristicResult;
 
-  @Column(length=128, nullable = false)
+  @Column(length=NAME_LIMIT, nullable = false)
   public String name;
 
-  @Column(length=255, nullable = false)
+  @Column(length=VALUE_LIMIT, nullable = false)
   public String value;
 
-  @Column(nullable = true)
   public String details;
 
 }
28 changes: 19 additions & 9 deletions app/models/AppResult.java
@@ -38,7 +38,17 @@
 public class AppResult extends Model {
 
   private static final long serialVersionUID = 1L;
 
+  public static final int ID_LIMIT = 50;
+  public static final int USERNAME_LIMIT = 50;
+  public static final int QUEUE_NAME_LIMIT = 50;
+  public static final int APP_NAME_LIMIT = 100;
+  public static final int JOB_NAME_LIMIT = 255;
+  public static final int TRACKING_URL_LIMIT = 255;
+  public static final int JOBTYPE_LIMIT = 20;
+  public static final int SCHEDULER_LIMIT = 20;
+  public static final int URL_LEN_LIMIT = 800;
+  public static final int FLOW_EXEC_ID_LIMIT = 255;
 
   // Note that the Table column constants are actually the java variable names defined in this model.
   // This is because ebean operations require the model variable names to be passed as strings.
@@ -73,16 +83,16 @@ public static String getSearchFields() {
   }
 
   @Id
-  @Column(length = 50, unique = true, nullable = false)
+  @Column(length = ID_LIMIT, unique = true, nullable = false)
   public String id;
 
-  @Column(length = 100, nullable = false)
+  @Column(length = APP_NAME_LIMIT, nullable = false)
   public String name;
 
-  @Column(length = 50, nullable = false)
+  @Column(length = USERNAME_LIMIT, nullable = false)
   public String username;
 
-  @Column(length = 50, nullable = false)
+  @Column(length = QUEUE_NAME_LIMIT, nullable = false)
   public String queueName;
 
   @Column(nullable = false)
@@ -91,10 +101,10 @@ public static String getSearchFields() {
   @Column(nullable = false)
   public Date finishTime;
 
-  @Column(length = 255, nullable = false)
+  @Column(length = TRACKING_URL_LIMIT, nullable = false)
   public String trackingUrl;
 
-  @Column(length = 20, nullable = false)
+  @Column(length = JOBTYPE_LIMIT, nullable = false)
   public String jobType;
 
   @Column(nullable = false)
@@ -106,16 +116,16 @@ public static String getSearchFields() {
   @Column(nullable = false)
   public int workflowDepth;
 
-  @Column(length = 20, nullable = true)
+  @Column(length = SCHEDULER_LIMIT, nullable = true)
   public String scheduler;
 
-  @Column(length = 255, nullable = false)
+  @Column(length = JOB_NAME_LIMIT, nullable = false)
   public String jobName;
 
   @Column(length = URL_LEN_LIMIT, nullable = false)
   public String jobExecId;
 
-  @Column(length = 255, nullable = false)
+  @Column(length = FLOW_EXEC_ID_LIMIT, nullable = false)
   public String flowExecId;
 
   @Column(length = URL_LEN_LIMIT, nullable = false)
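
Promoting the column lengths to public constants lets the JPA mappings and the truncation call sites read the same limit, so the schema and the writers cannot silently drift apart. The pattern, in miniature (field and variable names illustrative):

    // Schema side: the column is declared with the shared constant...
    @Column(length = AppResult.JOB_NAME_LIMIT, nullable = false)
    public String jobName;

    // ...and the writer side truncates with the same constant, so the
    // stored value is guaranteed to fit the column:
    result.jobName = Utils.truncateField(rawJobName, AppResult.JOB_NAME_LIMIT, appId);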
2 changes: 1 addition & 1 deletion app/org/apache/spark/deploy/history/SparkFSFetcher.scala
@@ -221,7 +221,7 @@ class SparkFSFetcher(fetcherConfData: FetcherConfigurationData) extends Elephant
    * @return If the event log parsing should be throttled
    */
   private def shouldThrottle(eventLogPath: Path): Boolean = {
-    fs.getFileStatus(eventLogPath).getLen() > EVENT_LOG_SIZE_LIMIT_MB
+    fs.getFileStatus(eventLogPath).getLen() > (EVENT_LOG_SIZE_LIMIT_MB * 1024 * 1024)
   }
 
 }
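
The Scala one-liner fixes a unit mismatch: FileStatus.getLen() returns bytes, while EVENT_LOG_SIZE_LIMIT_MB is expressed in megabytes, so the old comparison throttled nearly every event log and the Spark parser never ran. A worked example (assuming a limit of 100 MB; the constant's actual value is defined elsewhere in SparkFSFetcher):

    long lenBytes = 150L * 1024 * 1024;  // a 150 MB event log
    long limitMb = 100;                  // EVENT_LOG_SIZE_LIMIT_MB (assumed value)

    boolean oldCheck = lenBytes > limitMb;                // true for any log over 100 bytes
    boolean newCheck = lenBytes > limitMb * 1024 * 1024;  // true only for logs over 100 MB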