forked from linkedin/dr-elephant
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: lishid <[email protected]>
- Loading branch information
Showing
51 changed files
with
9,321 additions
and
464 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,3 +31,6 @@ target | |
# Play | ||
logs | ||
RUNNING_PID | ||
|
||
|
||
*.jar |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
import com.linkedin.drelephant.DrElephant; | ||
import com.sun.security.sasl.util.AbstractSaslImpl; | ||
import play.Application; | ||
import play.GlobalSettings; | ||
import play.Logger; | ||
|
||
import java.io.IOException; | ||
import java.lang.reflect.Field; | ||
import java.lang.reflect.Modifier; | ||
import java.util.logging.Level; | ||
|
||
public class Global extends GlobalSettings { | ||
|
||
DrElephant drElephant; | ||
|
||
public void onStart(Application app) { | ||
Logger.info("Application has started"); | ||
java.util.logging.Logger logger = java.util.logging.Logger.getLogger("javax.security.sasl"); | ||
logger.setLevel(Level.OFF); | ||
Logger.info("LOGGER LEVEL: " + logger.getLevel()); | ||
try { | ||
setFinalStatic(AbstractSaslImpl.class.getDeclaredField("logger"), logger); | ||
} catch (Exception e) { | ||
e.printStackTrace(); | ||
} | ||
try { | ||
drElephant = new DrElephant(); | ||
drElephant.start(); | ||
} catch (IOException e) { | ||
e.printStackTrace(); | ||
} | ||
} | ||
|
||
public void onStop(Application app) { | ||
Logger.info("Application shutdown..."); | ||
if (drElephant != null) { | ||
drElephant.kill(); | ||
} | ||
} | ||
|
||
static void setFinalStatic(Field field, Object newValue) throws Exception { | ||
field.setAccessible(true); | ||
|
||
Field modifiersField = Field.class.getDeclaredField("modifiers"); | ||
modifiersField.setAccessible(true); | ||
modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); | ||
|
||
field.set(null, newValue); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
package com.linkedin.drelephant; | ||
|
||
import com.linkedin.drelephant.analysis.Constants; | ||
import com.linkedin.drelephant.analysis.HeuristicResult; | ||
import com.linkedin.drelephant.hadoop.HadoopJobData; | ||
import model.AnalysisResult; | ||
import org.apache.hadoop.mapred.JobID; | ||
import org.apache.hadoop.mapred.JobStatus; | ||
import org.apache.log4j.Logger; | ||
|
||
import java.io.File; | ||
import java.util.HashSet; | ||
import java.util.Set; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
||
public class ElephantRunner implements Runnable { | ||
private static final long WAIT_INTERVAL = 5 * 60 * 1000; | ||
private static final Logger logger = Logger.getLogger(ElephantRunner.class); | ||
private AtomicBoolean running = new AtomicBoolean(true); | ||
private File storage; | ||
private boolean firstRun = true; | ||
|
||
public ElephantRunner(File storage) { | ||
this.storage = storage; | ||
} | ||
|
||
@Override | ||
public void run() { | ||
Constants.load(); | ||
try { | ||
ElephantFetcher fetcher = new ElephantFetcher(); | ||
ElephantAnalyser analyser = new ElephantAnalyser(); | ||
Set<JobID> previousJobs = new HashSet<JobID>(); | ||
long lastRun; | ||
|
||
while (running.get()) { | ||
lastRun = System.currentTimeMillis(); | ||
|
||
try { | ||
logger.info("Fetching job list."); | ||
JobStatus[] jobs = fetcher.getJobList(); | ||
if (jobs == null) { | ||
throw new IllegalArgumentException("Jobtracker returned 'null' for job list"); | ||
} | ||
|
||
Set<JobID> successJobs = filterSuccessfulJobs(jobs); | ||
|
||
successJobs = filterPreviousJobs(successJobs, previousJobs); | ||
|
||
logger.info(successJobs.size() + " jobs to analyse."); | ||
|
||
//Analyse all ready jobs | ||
for (JobID jobId : successJobs) { | ||
try { | ||
analyzeJob(fetcher, analyser, jobId); | ||
previousJobs.add(jobId); | ||
} catch (Exception e) { | ||
logger.error("Error analysing job", e); | ||
} | ||
} | ||
logger.info("Finished all jobs. Waiting for refresh."); | ||
|
||
} catch (Exception e) { | ||
logger.error("Error getting job list", e); | ||
} | ||
|
||
//Wait for long enough | ||
long nextRun = lastRun + WAIT_INTERVAL; | ||
long waitTime = nextRun - System.currentTimeMillis(); | ||
while (running.get() && waitTime > 0) { | ||
try { | ||
Thread.sleep(waitTime); | ||
} catch (InterruptedException e) { | ||
e.printStackTrace(); | ||
} | ||
waitTime = nextRun - System.currentTimeMillis(); | ||
} | ||
} | ||
} catch (Exception e) { | ||
logger.error("Error in ElephantRunner", e); | ||
} | ||
} | ||
|
||
private Set<JobID> filterSuccessfulJobs(JobStatus[] jobs) { | ||
Set<JobID> successJobs = new HashSet<JobID>(); | ||
for (JobStatus job : jobs) { | ||
if (job.getRunState() == JobStatus.SUCCEEDED && job.isJobComplete()) { | ||
successJobs.add(job.getJobID()); | ||
} | ||
} | ||
return successJobs; | ||
} | ||
|
||
private Set<JobID> filterPreviousJobs(Set<JobID> jobs, Set<JobID> previousJobs) { | ||
logger.info("Cleaning up previous runs."); | ||
//On first run, check against DB | ||
if (firstRun) { | ||
Set<JobID> newJobs = new HashSet<JobID>(); | ||
for (JobID jobId : jobs) { | ||
AnalysisResult prevResult = AnalysisResult.find.byId(jobId.toString()); | ||
if (prevResult == null) { | ||
//Job not found, add to new jobs list | ||
newJobs.add(jobId); | ||
} else { | ||
//Job found, add to old jobs list | ||
previousJobs.add(jobId); | ||
} | ||
} | ||
jobs = newJobs; | ||
firstRun = false; | ||
} else { | ||
//Remove untracked jobs | ||
previousJobs.retainAll(jobs); | ||
//Remove previously analysed jobs | ||
jobs.removeAll(previousJobs); | ||
} | ||
|
||
return jobs; | ||
} | ||
|
||
private void analyzeJob(ElephantFetcher fetcher, ElephantAnalyser analyser, JobID jobId) throws Exception { | ||
logger.info("Looking at job " + jobId); | ||
HadoopJobData jobData = fetcher.getJobData(jobId); | ||
|
||
//Job wiped from jobtracker already. | ||
if (jobData == null) { | ||
return; | ||
} | ||
|
||
HeuristicResult analysisResult = analyser.analyse(jobData); | ||
|
||
//Save to DB | ||
AnalysisResult result = new AnalysisResult(); | ||
result.job_id = jobId.toString(); | ||
result.success = analysisResult.succeeded(); | ||
result.url = jobData.getUrl(); | ||
result.username = jobData.getUsername(); | ||
result.message = analysisResult.getMessage(); | ||
result.data = analysisResult.getDetailsCSV(); | ||
result.dataColumns = analysisResult.getDetailsColumns(); | ||
result.startTime = jobData.getStartTime(); | ||
result.analysisTime = System.currentTimeMillis(); | ||
|
||
if(result.dataColumns < 1) { | ||
result.dataColumns = 1; | ||
} | ||
|
||
result.save(); | ||
} | ||
|
||
public void kill() { | ||
running.set(false); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
package com.linkedin.drelephant.analysis; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
public class HeuristicResult { | ||
public static final HeuristicResult SUCCESS = new HeuristicResult("Everything looks good", true); | ||
|
||
private String message; | ||
private List<String> details; | ||
private int detailsColumns = 0; | ||
private boolean success; | ||
|
||
public HeuristicResult(String message, boolean success) { | ||
this.message = message; | ||
this.details = new ArrayList<String>(); | ||
this.success = success; | ||
} | ||
|
||
public boolean succeeded() { | ||
return success; | ||
} | ||
|
||
public String getMessage() { | ||
return message; | ||
} | ||
|
||
public List<String> getDetails() { | ||
return details; | ||
} | ||
|
||
public String getDetailsCSV() { | ||
StringBuilder sb = new StringBuilder(); | ||
for (String line : details) { | ||
sb.append(line).append("\n"); | ||
} | ||
return sb.toString().trim(); | ||
} | ||
|
||
public int getDetailsColumns() { | ||
return detailsColumns; | ||
} | ||
|
||
public void addDetail(String... parts) { | ||
details.add(createLine(parts)); | ||
if (parts.length > detailsColumns) { | ||
detailsColumns = parts.length; | ||
} | ||
} | ||
|
||
public static String createLine(String... parts) { | ||
StringBuilder sb = new StringBuilder(); | ||
String quotes = "\""; | ||
String comma = ","; | ||
for (int i = 0; i < parts.length; i++) { | ||
sb.append(quotes).append(parts[i].replaceAll("\"", "\\\"")).append(quotes); | ||
if (i != parts.length - 1) { | ||
sb.append(comma); | ||
} | ||
} | ||
return sb.toString(); | ||
} | ||
} |
Oops, something went wrong.