
Commit

HADOOP-5255: Azkaban integration
RB2 w/o space changes

Limit number of historical executions to display

Formatting
Mark Wagner committed Jul 2, 2014
1 parent 9e7bce2 commit cd0e734
Showing 12 changed files with 561 additions and 361 deletions.
4 changes: 4 additions & 0 deletions README.md
@@ -3,6 +3,10 @@
### Compiling & testing locally

* To be able to build & run the application, download and install [Play framework 2.2.2](http://downloads.typesafe.com/play/2.2.2/play-2.2.2.zip).
  * The pre-installed `play` command on our boxes will not work, as it is configured to look at LinkedIn's repos.
* If this is your first time working with Dr. Elephant, copy the deployed Hadoop jars into the `/lib` directory:

      scp eat1-magicgw01.grid.linkedin.com:/export/apps/hadoop/latest/hadoop-core-1.2.1-p3.jar ./lib/.

* To build and run the application in dev mode, run `play run` from the command line in the project directory.
  * We still need to investigate how parameters can be added to the classpath in dev mode.

352 changes: 172 additions & 180 deletions app/com/linkedin/drelephant/ElephantRunner.java
@@ -1,206 +1,198 @@
package com.linkedin.drelephant;

import com.linkedin.drelephant.analysis.Constants;
import com.linkedin.drelephant.analysis.HeuristicResult;
import com.linkedin.drelephant.analysis.Severity;
import com.linkedin.drelephant.hadoop.HadoopJobData;
import com.linkedin.drelephant.hadoop.HadoopSecurity;
import com.linkedin.drelephant.notifications.EmailThread;

import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import model.JobHeuristicResult;
import model.JobResult;
import model.JobType;

import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.log4j.Logger;

public class ElephantRunner implements Runnable {
  private static final long WAIT_INTERVAL = 10 * 1000;
  private static final Logger logger = Logger.getLogger(ElephantRunner.class);
  private AtomicBoolean running = new AtomicBoolean(true);
  private EmailThread emailer = new EmailThread();
  private boolean firstRun = true;
  private HadoopSecurity hadoopSecurity;
  private InfoExtractor urlRetriever = new InfoExtractor();

  @Override
  public void run() {
    hadoopSecurity = new HadoopSecurity();
    hadoopSecurity.doAs(new PrivilegedAction<Void>() {
      @Override
      public Void run() {
        Constants.load();
        emailer.start();
        Set<JobID> previousJobs = new HashSet<JobID>();
        long lastRun;
        ElephantFetcher fetcher;
        try {
          fetcher = new ElephantFetcher();
        } catch (Exception e) {
          logger.error("Error initializing fetcher", e);
          return null;
        }

        while (running.get()) {
          lastRun = System.currentTimeMillis();

          try {
            logger.info("Fetching job list.");
            hadoopSecurity.checkLogin();
            JobStatus[] jobs = fetcher.getJobList();

            Set<JobID> successJobs = filterSuccessfulJobs(jobs);
            successJobs = filterPreviousJobs(successJobs, previousJobs);

            logger.info(successJobs.size() + " jobs to analyse.");

            // Analyse all ready jobs
            for (JobID jobId : successJobs) {
              boolean success = analyzeJob(fetcher, jobId);
              if (success) {
                previousJobs.add(jobId);
              }
            }
            logger.info("Finished all jobs. Waiting for refresh.");

          } catch (Exception e) {
            logger.error("Error in Runner thread", e);
          }

          // Wait for long enough
          long nextRun = lastRun + WAIT_INTERVAL;
          long waitTime = nextRun - System.currentTimeMillis();
          while (running.get() && waitTime > 0) {
            try {
              Thread.sleep(waitTime);
            } catch (InterruptedException e) {
              logger.error("Thread interrupted", e);
            }
            waitTime = nextRun - System.currentTimeMillis();
          }
        }
        return null;
      }
    });
  }

  private Set<JobID> filterSuccessfulJobs(JobStatus[] jobs) {
    Set<JobID> successJobs = new HashSet<JobID>();
    for (JobStatus job : jobs) {
      if (job.getRunState() == JobStatus.SUCCEEDED && job.isJobComplete()) {
        successJobs.add(job.getJobID());
      }
    }
    return successJobs;
  }

  private Set<JobID> filterPreviousJobs(Set<JobID> jobs, Set<JobID> previousJobs) {
    logger.info("Cleaning up previous runs.");
    // On first run, check against DB
    if (firstRun) {
      Set<JobID> newJobs = new HashSet<JobID>();
      for (JobID jobId : jobs) {
        JobResult prevResult = JobResult.find.byId(jobId.toString());
        if (prevResult == null) {
          // Job not found, add to new jobs list
          newJobs.add(jobId);
        } else {
          // Job found, add to old jobs list
          previousJobs.add(jobId);
        }
      }
      jobs = newJobs;
      firstRun = false;
    } else {
      // Remove untracked jobs
      previousJobs.retainAll(jobs);
      // Remove previously analysed jobs
      jobs.removeAll(previousJobs);
    }

    return jobs;
  }

  /**
   * @param fetcher
   * @param jobId
   * @return true if analysis succeeded
   */
  private boolean analyzeJob(ElephantFetcher fetcher, JobID jobId) {
    ElephantAnalyser analyser = ElephantAnalyser.instance();
    try {
      logger.info("Looking at job " + jobId);
      HadoopJobData jobData = fetcher.getJobData(jobId);

      // Job wiped from jobtracker already.
      if (jobData == null) {
        return true;
      }

      HeuristicResult[] analysisResults = analyser.analyse(jobData);
      JobType jobType = analyser.getJobType(jobData);

      // Save to DB
      JobResult result = new JobResult();
      result.job_id = jobId.toString();
      result.url = jobData.getUrl();
      result.username = jobData.getUsername();
      result.startTime = jobData.getStartTime();
      result.analysisTime = System.currentTimeMillis();
      result.jobName = jobData.getJobName();
      result.jobType = jobType;

      // Truncate long names
      if (result.jobName.length() > 100) {
        result.jobName = result.jobName.substring(0, 97) + "...";
      }
      result.heuristicResults = new ArrayList<JobHeuristicResult>();

      Severity worstSeverity = Severity.NONE;

      for (HeuristicResult heuristicResult : analysisResults) {
        JobHeuristicResult detail = new JobHeuristicResult();
        detail.analysisName = heuristicResult.getAnalysis();
        detail.data = heuristicResult.getDetailsCSV();
        detail.dataColumns = heuristicResult.getDetailsColumns();
        detail.severity = heuristicResult.getSeverity();
        if (detail.dataColumns < 1) {
          detail.dataColumns = 1;
        }
        result.heuristicResults.add(detail);
        worstSeverity = Severity.max(worstSeverity, detail.severity);
      }

      result.severity = worstSeverity;
      urlRetriever.retrieveURLs(result, jobData);

      result.save();

      emailer.enqueue(result);
      return true;
    } catch (Exception e) {
      logger.error("Error analysing job", e);
    }
    return false;
  }

  public void kill() {
    running.set(false);
    emailer.kill();
  }
}
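The new urlRetriever field refers to an InfoExtractor class that this commit adds in another file, which is not shown in this view. Below is a rough, self-contained sketch of the idea only: the Azkaban property names and the UrlResult holder are assumptions made for illustration, not the API shipped in the commit. The point is that scheduler links (e.g. Azkaban workflow/execution URLs) are read from the job's configuration and attached to the result before it is saved.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Illustrative sketch only: shows how scheduler (e.g. Azkaban) links could be pulled
// out of a job's configuration. The property names and the UrlResult holder are
// assumptions for this example, not the InfoExtractor class shipped in the commit.
public class InfoExtractorSketch {

  // Configuration keys that an Azkaban-launched job is assumed to carry (illustrative).
  private static final String[] AZKABAN_URL_KEYS = {
      "azkaban.link.workflow.url",
      "azkaban.link.execution.url",
      "azkaban.link.job.url"
  };

  /** Minimal stand-in for the saved result object. */
  public static class UrlResult {
    public final Map<String, String> urls = new LinkedHashMap<String, String>();
  }

  /** Copy any known scheduler URLs from the job configuration onto the result. */
  public void retrieveUrls(UrlResult result, Properties jobConf) {
    for (String key : AZKABAN_URL_KEYS) {
      String url = jobConf.getProperty(key);
      if (url != null && !url.isEmpty()) {
        result.urls.put(key, url);
      }
    }
  }

  // Small usage example.
  public static void main(String[] args) {
    Properties conf = new Properties();
    conf.setProperty("azkaban.link.workflow.url",
        "http://azkaban.example.com:8081/manager?project=p&flow=f");

    UrlResult result = new UrlResult();
    new InfoExtractorSketch().retrieveUrls(result, conf);
    System.out.println(result.urls); // prints the captured Azkaban workflow link
  }
}

In the runner above, the equivalent step is the urlRetriever.retrieveURLs(result, jobData) call, which records the discovered links on the JobResult before result.save() persists it.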

0 comments on commit cd0e734
