LIHADOOP-17538: Code changes to support new schema
RB=666697

G=superfriends-reviewers
R=annag,fli,rratti,shanm,viramach
A=fli
akshayrai committed Feb 26, 2016
1 parent 5cfd449 commit c467979
Showing 91 changed files with 2,796 additions and 1,277 deletions.
4 changes: 2 additions & 2 deletions NOTICE
@@ -75,8 +75,8 @@ Bootstrap (http://getbootstrap.com/)
Copyright 2011-2016 Twitter, Inc.
License: MIT License (https://github.com/twbs/bootstrap/blob/master/LICENSE)

Bootstrap Datepicker (http://www.eyecon.ro/bootstrap-datepicker)
Copyright 2012 Stefan Petre
Bootstrap Datepicker (https://github.com/eternicode/bootstrap-datepicker)
Copyright 2012 Stefan Petre, Improvements by Andrew Rowls
License: Apache 2.0

D3.js (http://d3js.org)
2 changes: 2 additions & 0 deletions app-conf/elephant.conf
@@ -1,8 +1,10 @@
port=8080

db_url=localhost
db_name=drelephant
db_user=root
db_password=

#jvm_props="-Devolutionplugin=enabled -DapplyEvolutions.default=true"
keytab_location="/export/apps/hadoop/keytabs/dr_elephant-service.keytab"
keytab_user="elephant/eat1-magicaz01.grid.linkedin.com"
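The added db_* settings are plain key=value pairs. As a hypothetical illustration only (the reader class and file path below are assumptions; Dr. Elephant's own startup scripts may consume this file differently), such a file can be read from Java with java.util.Properties:

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public class ElephantConfReader {
  public static void main(String[] args) throws IOException {
    Properties conf = new Properties();
    // Example path; the real location depends on the deployment.
    try (FileInputStream in = new FileInputStream("app-conf/elephant.conf")) {
      conf.load(in);   // lines starting with '#' are treated as comments
    }
    // Note: quoted values (e.g. keytab_location="...") keep their surrounding quotes.
    String dbUrl = conf.getProperty("db_url", "localhost");
    String dbName = conf.getProperty("db_name", "drelephant");
    String dbUser = conf.getProperty("db_user", "root");
    System.out.println("database target: " + dbUrl + "/" + dbName + " as user " + dbUser);
  }
}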
11 changes: 6 additions & 5 deletions app/com/linkedin/drelephant/ElephantContext.java
@@ -28,7 +28,7 @@
import com.linkedin.drelephant.configurations.fetcher.FetcherConfigurationData;
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfiguration;
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData;
import com.linkedin.drelephant.configurations.jobtype.JobTypeConf;
import com.linkedin.drelephant.configurations.jobtype.JobTypeConfiguration;
import com.linkedin.drelephant.util.Utils;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
@@ -175,8 +175,8 @@ private void loadHeuristics() {

// Bind No_DATA heuristic to its helper pages, no need to add any real configurations
_heuristicsConfData.add(
new HeuristicConfigurationData(HeuristicResult.NO_DATA.getAnalysis(), null, "views.html.help.helpNoData", null,
null));
new HeuristicConfigurationData(HeuristicResult.NO_DATA.getHeuristicName(),
HeuristicResult.NO_DATA.getHeuristicClassName(), "views.html.help.helpNoData", null, null));
}

/**
@@ -216,7 +216,8 @@ private void configureSupportedApplicationTypes() {
* Load all the job types configured in JobTypeConf.xml
*/
private void loadJobTypes() {
JobTypeConf conf = new JobTypeConf(JOB_TYPES_CONF);
Document document = Utils.loadXMLDoc(JOB_TYPES_CONF);
JobTypeConfiguration conf = new JobTypeConfiguration(document.getDocumentElement());
_appTypeToJobTypes = conf.getAppTypeToJobTypeList();
}

@@ -243,7 +244,7 @@ public Map<String, List<String>> getAllHeuristicNames() {

List<String> nameList = new ArrayList<String>();
for (Heuristic heuristic : list) {
nameList.add(heuristic.getHeuristicName());
nameList.add(heuristic.getHeuristicConfData().getHeuristicName());
}

Collections.sort(nameList);
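The reworked loadJobTypes() above assumes a helper that parses JobTypeConf.xml into a DOM Document and hands its root element to JobTypeConfiguration. A minimal sketch of what such a helper could look like, using only the standard JDK DOM API (an illustration, not the project's actual Utils.loadXMLDoc; the XmlConfLoader class name is made up):

import java.io.File;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;

public final class XmlConfLoader {
  /** Parses an XML configuration file (e.g. JobTypeConf.xml) into a DOM Document. */
  public static Document loadXMLDoc(String filePath) {
    try {
      DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
      Document document = builder.parse(new File(filePath));
      document.getDocumentElement().normalize();
      return document;
    } catch (Exception e) {
      throw new RuntimeException("Failed to parse configuration file: " + filePath, e);
    }
  }
}

The document's root element is then what the new JobTypeConfiguration(document.getDocumentElement()) call consumes.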
11 changes: 6 additions & 5 deletions app/com/linkedin/drelephant/ElephantRunner.java
@@ -32,7 +32,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import model.JobResult;
import models.AppResult;

import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
@@ -149,7 +149,7 @@ public void run() {
analyticJob = _jobQueue.take();
logger.info("Executor thread " + _threadId + " analyzing " + analyticJob.getAppType().getName() + " "
+ analyticJob.getAppId());
JobResult result = analyticJob.getAnalysis();
AppResult result = analyticJob.getAnalysis();
result.save();

} catch (InterruptedException ex) {
@@ -162,9 +162,10 @@ public void run() {
logger.error("Add analytic job id [" + analyticJob.getAppId() + "] into the retry list.");
_analyticJobGenerator.addIntoRetries(analyticJob);
} else {
logger.error(
"Drop the analytic job. Reason: reached the max retries for application id = [" + analyticJob.getAppId()
+ "].");
if (analyticJob != null) {
logger.error("Drop the analytic job. Reason: reached the max retries for application id = ["
+ analyticJob.getAppId() + "].");
}
}
}
}
93 changes: 61 additions & 32 deletions app/com/linkedin/drelephant/analysis/AnalyticJob.java
@@ -20,9 +20,11 @@
import com.linkedin.drelephant.util.InfoExtractor;
import com.linkedin.drelephant.util.Utils;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import model.JobHeuristicResult;
import model.JobResult;
import models.AppHeuristicResult;
import models.AppHeuristicResultDetails;
import models.AppResult;
import org.apache.log4j.Logger;


@@ -41,6 +43,7 @@ public class AnalyticJob {
private String _appId;
private String _jobId;
private String _name;
private String _queueName;
private String _user;
private String _trackingUrl;
private long _startTime;
@@ -79,25 +82,24 @@ public AnalyticJob setAppId(String appId) {
}

/**
* Set the id of the job
* jobId is the appId with the prefix 'application_' replaced by 'job_'
* Set the name of the analytic job
*
* @param jobId The job id
* @param name
* @return The analytic job
*/
public AnalyticJob setJobId(String jobId) {
_jobId = jobId;
public AnalyticJob setName(String name) {
_name = name;
return this;
}

/**
* Set the name of the analytic job
* Set the queue name in which the analytic job was submitted
*
* @param name
* @param name the name of the queue
* @return The analytic job
*/
public AnalyticJob setName(String name) {
_name = name;
public AnalyticJob setQueueName(String name) {
_queueName = name;
return this;
}

@@ -120,6 +122,10 @@ public AnalyticJob setUser(String user) {
* @return The analytic job
*/
public AnalyticJob setStartTime(long startTime) {
// TIMESTAMP range starts from FROM_UNIXTIME(1) = 1970-01-01 00:00:01
if (startTime <= 0) {
startTime = 1;
}
_startTime = startTime;
return this;
}
@@ -131,6 +137,10 @@ public AnalyticJob setStartTime(long startTime) {
* @return The analytic job
*/
public AnalyticJob setFinishTime(long finishTime) {
// TIMESTAMP range starts from FROM_UNIXTIME(1) = 1970-01-01 00:00:01
if (finishTime <= 0) {
finishTime = 1;
}
_finishTime = finishTime;
return this;
}
@@ -198,6 +208,15 @@ public String getTrackingUrl() {
return _trackingUrl;
}

/**
* Returns the queue in which the application was submitted
*
* @return The queue name
*/
public String getQueueName() {
return _queueName;
}

/**
* Sets the tracking url for the job
*
@@ -210,15 +229,15 @@ public AnalyticJob setTrackingUrl(String trackingUrl) {
}

/**
* Returns the analysed JobResult that could be directly serialized into DB.
* Returns the analysed AppResult that could be directly serialized into DB.
*
* This method fetches the data using the appropriate application fetcher, runs all the heuristics on them and
* loads it into the JobResult model.
* loads it into the AppResult model.
*
* @throws Exception if the analysis process encountered a problem.
* @return the analysed JobResult
* @return the analysed AppResult
*/
public JobResult getAnalysis() throws Exception {
public AppResult getAnalysis() throws Exception {
ElephantFetcher fetcher = ElephantContext.instance().getFetcherForApplicationType(getAppType());
HadoopApplicationData data = fetcher.fetchData(this);

@@ -227,6 +246,7 @@ public JobResult getAnalysis() throws Exception {
if (data == null || data.isEmpty()) {
// Example: a MR job has 0 mappers and 0 reducers
logger.info("No Data Received for analytic job: " + getAppId());
HeuristicResult.NO_DATA.addResultDetail("No Data Received", "");
analysisResults.add(HeuristicResult.NO_DATA);
} else {
List<Heuristic> heuristics = ElephantContext.instance().getHeuristicsForApplicationType(getAppType());
@@ -242,36 +262,45 @@ public JobResult getAnalysis() throws Exception {
String jobTypeName = jobType == null ? UNKNOWN_JOB_TYPE : jobType.getName();

// Load job information
JobResult result = new JobResult();
result.jobId = Utils.getJobIdFromApplicationId(getAppId());
result.url = getTrackingUrl();
AppResult result = new AppResult();
result.id = getAppId();
result.trackingUrl = getTrackingUrl();
result.queueName = getQueueName();
result.username = getUser();
result.startTime = getStartTime();
result.analysisTime = getFinishTime();
result.jobName = getName();
result.startTime = new Date(getStartTime());
result.finishTime = new Date(getFinishTime());
result.name = getName();
result.jobType = jobTypeName;

// Truncate long names
if (result.jobName.length() > 100) {
result.jobName = result.jobName.substring(0, 97) + "...";
if (result.name.length() > 255) {
result.name = result.name.substring(0, 252) + "...";
}

// Load Job Heuristic information
result.heuristicResults = new ArrayList<JobHeuristicResult>();
// Load App Heuristic information
int jobScore = 0;
result.yarnAppHeuristicResults = new ArrayList<AppHeuristicResult>();
Severity worstSeverity = Severity.NONE;
for (HeuristicResult heuristicResult : analysisResults) {
JobHeuristicResult detail = new JobHeuristicResult();
detail.analysisName = heuristicResult.getAnalysis();
detail.data = heuristicResult.getDetailsCSV();
detail.dataColumns = heuristicResult.getDetailsColumns();
AppHeuristicResult detail = new AppHeuristicResult();
detail.heuristicClass = heuristicResult.getHeuristicClassName();
detail.heuristicName = heuristicResult.getHeuristicName();
detail.severity = heuristicResult.getSeverity();
if (detail.dataColumns < 1) {
detail.dataColumns = 1;
detail.score = heuristicResult.getScore();
for (HeuristicResultDetails heuristicResultDetails : heuristicResult.getHeuristicResultDetails()) {
AppHeuristicResultDetails heuristicDetail = new AppHeuristicResultDetails();
heuristicDetail.yarnAppHeuristicResult = detail;
heuristicDetail.name = heuristicResultDetails.getName();
heuristicDetail.value = heuristicResultDetails.getValue();
heuristicDetail.details = heuristicResultDetails.getDetails();
detail.yarnAppHeuristicResultDetails.add(heuristicDetail);
}
result.heuristicResults.add(detail);
result.yarnAppHeuristicResults.add(detail);
worstSeverity = Severity.max(worstSeverity, detail.severity);
jobScore += detail.score;
}
result.severity = worstSeverity;
result.score = jobScore;

// Retrieve Azkaban execution, flow and jobs URLs from jobData and store them into result.
InfoExtractor.retrieveURLs(result, data);
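Under the new schema an AppResult carries nested AppHeuristicResult rows, each with its own AppHeuristicResultDetails. A hypothetical read-side sketch of walking a persisted result (AppResultPrinter is a made-up class; the Ebean finder AppResult.find.byId and the public fields used here are the ones shown in this commit):

import models.AppHeuristicResult;
import models.AppHeuristicResultDetails;
import models.AppResult;

public class AppResultPrinter {
  /** Prints the per-heuristic breakdown of an analyzed application, if one is stored. */
  public static void printResult(String appId) {
    AppResult result = AppResult.find.byId(appId);
    if (result == null) {
      System.out.println("No analysis stored for " + appId);
      return;
    }
    System.out.println(result.name + " [" + result.jobType + "] score=" + result.score
        + " severity=" + result.severity);
    for (AppHeuristicResult heuristic : result.yarnAppHeuristicResults) {
      System.out.println("  " + heuristic.heuristicName + " -> severity=" + heuristic.severity
          + ", score=" + heuristic.score);
      for (AppHeuristicResultDetails detail : heuristic.yarnAppHeuristicResultDetails) {
        System.out.println("    " + detail.name + " = " + detail.value);
      }
    }
  }
}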
@@ -27,7 +27,7 @@
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import model.JobResult;
import models.AppResult;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
@@ -161,14 +161,14 @@ private List<AnalyticJob> readApps(URL url) throws IOException, AuthenticationEx
JsonNode apps = rootNode.path("apps").path("app");

for (JsonNode app : apps) {
String id = app.get("id").getValueAsText();
String jobId = Utils.getJobIdFromApplicationId(id);
String appId = app.get("id").getValueAsText();

// When called first time after launch, hit the DB and avoid duplicated analytic jobs that have been analyzed
// before.
if (_lastTime > 0 || _lastTime == 0 && JobResult.find.byId(jobId) == null && JobResult.find.byId(id) == null) {
if (_lastTime > 0 || (_lastTime == 0 && AppResult.find.byId(appId) == null)) {
String user = app.get("user").getValueAsText();
String name = app.get("name").getValueAsText();
String queueName = app.get("queue").getValueAsText();
String trackingUrl = app.get("trackingUrl") != null? app.get("trackingUrl").getValueAsText() : null;
long startTime = app.get("startedTime").getLongValue();
long finishTime = app.get("finishedTime").getLongValue();
@@ -179,7 +179,7 @@ private List<AnalyticJob> readApps(URL url) throws IOException, AuthenticationEx
// If the application type is supported
if (type != null) {
AnalyticJob analyticJob = new AnalyticJob();
analyticJob.setAppId(id).setAppType(type).setJobId(jobId).setUser(user).setName(name)
analyticJob.setAppId(appId).setAppType(type).setUser(user).setName(name).setQueueName(queueName)
.setTrackingUrl(trackingUrl).setStartTime(startTime).setFinishTime(finishTime);

appList.add(analyticJob);
9 changes: 6 additions & 3 deletions app/com/linkedin/drelephant/analysis/Heuristic.java
@@ -16,6 +16,9 @@

package com.linkedin.drelephant.analysis;

import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData;


/**
* This interface defines the Heuristic rule interface.
*
@@ -31,9 +34,9 @@ public interface Heuristic<T extends HadoopApplicationData> {
public HeuristicResult apply(T data);

/**
* Get the heuristic name
* Get the heuristic Configuration
*
* @return the name
* @return the heuristic configuration data
*/
public String getHeuristicName();
public HeuristicConfigurationData getHeuristicConfData();
}
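With the interface change above, a heuristic no longer exposes a bare name; callers go through getHeuristicConfData() (as ElephantContext now does via heuristic.getHeuristicConfData().getHeuristicName()). A minimal, hypothetical implementation of the updated contract (NoOpHeuristic is a made-up stub; its apply() just reports NO_DATA, where a real heuristic would build a proper HeuristicResult from the fetched data):

import com.linkedin.drelephant.analysis.HadoopApplicationData;
import com.linkedin.drelephant.analysis.Heuristic;
import com.linkedin.drelephant.analysis.HeuristicResult;
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData;

public class NoOpHeuristic implements Heuristic<HadoopApplicationData> {
  private final HeuristicConfigurationData _heuristicConfData;

  public NoOpHeuristic(HeuristicConfigurationData heuristicConfData) {
    _heuristicConfData = heuristicConfData;
  }

  @Override
  public HeuristicResult apply(HadoopApplicationData data) {
    // A real heuristic would inspect the fetched data and build a HeuristicResult
    // with a severity, score and result details; this stub just reports NO_DATA.
    return HeuristicResult.NO_DATA;
  }

  @Override
  public HeuristicConfigurationData getHeuristicConfData() {
    return _heuristicConfData;
  }
}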