Skip to content

Commit

Permalink
Patched
Browse files Browse the repository at this point in the history
  • Loading branch information
lishid committed Mar 25, 2014
1 parent 08aefa7 commit 9795c6d
Show file tree
Hide file tree
Showing 51 changed files with 9,343 additions and 464 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,6 @@ target
# Play
logs
RUNNING_PID


*.jar
42 changes: 42 additions & 0 deletions app/Global.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import com.linkedin.drelephant.DrElephant;
import com.sun.security.sasl.util.AbstractSaslImpl;
import play.Application;
import play.GlobalSettings;
import play.Logger;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.logging.Level;

public class Global extends GlobalSettings {

DrElephant drElephant;

public void onStart(Application app) {
Logger.info("Application has started");
try {
drElephant = new DrElephant();
drElephant.start();
} catch (IOException e) {
e.printStackTrace();
}
}

public void onStop(Application app) {
Logger.info("Application shutdown...");
if (drElephant != null) {
drElephant.kill();
}
}

static void setFinalStatic(Field field, Object newValue) throws Exception {
field.setAccessible(true);

Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);

field.set(null, newValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,14 @@
import java.io.File;
import java.io.IOException;

public class DrElephant {
public class DrElephant extends Thread {
private ElephantRunner elephant;

public static void main(String[] args) throws IOException {
if (args.length > 0) {
analyse(args[0]);
} else {
File storage = new File("results.txt");
if (!storage.exists()) {
storage.createNewFile();
}

ElephantRunner elephant = new ElephantRunner(storage);
elephant.run();
new DrElephant().start();
}
}

Expand All @@ -30,4 +26,27 @@ public static void analyse(String jobId) throws IOException {
HeuristicResult result = analyser.analyse(jobData);
System.out.println(result.getMessage());
}

public DrElephant() throws IOException {
File storage = new File("results.txt");
if (!storage.exists()) {
try {
storage.createNewFile();
} catch (IOException e) {
e.printStackTrace();
}
}
elephant = new ElephantRunner(storage);
}

@Override
public void run() {
elephant.run();
}

public void kill() {
if (elephant != null) {
elephant.kill();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public ElephantAnalyser() {

public HeuristicResult analyse(HadoopJobData data) {
if (data.getMapperData().length == 0 && data.getReducerData().length == 0) {
return new HeuristicResult("No mapper/reducer data received", false);
return new HeuristicResult("No mapper/reducer data received", true);
}

HeuristicResult result = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ public ElephantFetcher() throws IOException {

private void init() throws IOException {
logger.info("Connecting to the jobtracker");
jobClient = new JobClient(new JobConf(new Configuration()));
Configuration config = new Configuration();
logger.info(config);
jobClient = new JobClient(new JobConf(config));
}

public HadoopJobData getJobData(JobID job_id) throws IOException {
Expand All @@ -29,6 +31,12 @@ public HadoopJobData getJobData(JobID job_id) throws IOException {
return null;
}

JobStatus status = job.getJobStatus();

String username = status.getUsername();
long startTime = status.getStartTime();
String jobUrl = job.getTrackingURL();

TaskReport[] mapperTasks = getMapTaskReports(job_id);
TaskReport[] reducerTasks = getReduceTaskReports(job_id);

Expand All @@ -42,7 +50,7 @@ public HadoopJobData getJobData(JobID job_id) throws IOException {
reducers[i] = new HadoopTaskData(job, reducerTasks[i], true);
}

return new HadoopJobData(counterHolder, mappers, reducers);
return new HadoopJobData(counterHolder, mappers, reducers).setUsername(username).setStartTime(startTime).setUrl(jobUrl);
}

private RunningJob getJob(JobID job_id) throws IOException {
Expand Down
154 changes: 154 additions & 0 deletions app/com/linkedin/drelephant/ElephantRunner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package com.linkedin.drelephant;

import com.linkedin.drelephant.analysis.Constants;
import com.linkedin.drelephant.analysis.HeuristicResult;
import com.linkedin.drelephant.hadoop.HadoopJobData;
import model.AnalysisResult;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.log4j.Logger;

import java.io.File;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

public class ElephantRunner implements Runnable {
private static final long WAIT_INTERVAL = 5 * 60 * 1000;
private static final Logger logger = Logger.getLogger(ElephantRunner.class);
private AtomicBoolean running = new AtomicBoolean(true);
private File storage;
private boolean firstRun = true;

public ElephantRunner(File storage) {
this.storage = storage;
}

@Override
public void run() {
Constants.load();
try {
ElephantFetcher fetcher = new ElephantFetcher();
ElephantAnalyser analyser = new ElephantAnalyser();
Set<JobID> previousJobs = new HashSet<JobID>();
long lastRun;

while (running.get()) {
lastRun = System.currentTimeMillis();

try {
logger.info("Fetching job list.");
JobStatus[] jobs = fetcher.getJobList();
if (jobs == null) {
throw new IllegalArgumentException("Jobtracker returned 'null' for job list");
}

Set<JobID> successJobs = filterSuccessfulJobs(jobs);

successJobs = filterPreviousJobs(successJobs, previousJobs);

logger.info(successJobs.size() + " jobs to analyse.");

//Analyse all ready jobs
for (JobID jobId : successJobs) {
try {
analyzeJob(fetcher, analyser, jobId);
previousJobs.add(jobId);
} catch (Exception e) {
logger.error("Error analysing job", e);
}
}
logger.info("Finished all jobs. Waiting for refresh.");

} catch (Exception e) {
logger.error("Error getting job list", e);
}

//Wait for long enough
long nextRun = lastRun + WAIT_INTERVAL;
long waitTime = nextRun - System.currentTimeMillis();
while (running.get() && waitTime > 0) {
try {
Thread.sleep(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
waitTime = nextRun - System.currentTimeMillis();
}
}
} catch (Exception e) {
logger.error("Error in ElephantRunner", e);
}
}

private Set<JobID> filterSuccessfulJobs(JobStatus[] jobs) {
Set<JobID> successJobs = new HashSet<JobID>();
for (JobStatus job : jobs) {
if (job.getRunState() == JobStatus.SUCCEEDED && job.isJobComplete()) {
successJobs.add(job.getJobID());
}
}
return successJobs;
}

private Set<JobID> filterPreviousJobs(Set<JobID> jobs, Set<JobID> previousJobs) {
logger.info("Cleaning up previous runs.");
//On first run, check against DB
if (firstRun) {
Set<JobID> newJobs = new HashSet<JobID>();
for (JobID jobId : jobs) {
AnalysisResult prevResult = AnalysisResult.find.byId(jobId.toString());
if (prevResult == null) {
//Job not found, add to new jobs list
newJobs.add(jobId);
} else {
//Job found, add to old jobs list
previousJobs.add(jobId);
}
}
jobs = newJobs;
firstRun = false;
} else {
//Remove untracked jobs
previousJobs.retainAll(jobs);
//Remove previously analysed jobs
jobs.removeAll(previousJobs);
}

return jobs;
}

private void analyzeJob(ElephantFetcher fetcher, ElephantAnalyser analyser, JobID jobId) throws Exception {
logger.info("Looking at job " + jobId);
HadoopJobData jobData = fetcher.getJobData(jobId);

//Job wiped from jobtracker already.
if (jobData == null) {
return;
}

HeuristicResult analysisResult = analyser.analyse(jobData);

//Save to DB
AnalysisResult result = new AnalysisResult();
result.job_id = jobId.toString();
result.success = analysisResult.succeeded();
result.url = jobData.getUrl();
result.username = jobData.getUsername();
result.message = analysisResult.getMessage();
result.data = analysisResult.getDetailsCSV();
result.dataColumns = analysisResult.getDetailsColumns();
result.startTime = jobData.getStartTime();
result.analysisTime = System.currentTimeMillis();

if(result.dataColumns < 1) {
result.dataColumns = 1;
}

result.save();
}

public void kill() {
running.set(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,17 @@

public class Constants {
private static final Logger logger = Logger.getLogger(Constants.class);
public static final long HDFS_BLOCK_SIZE;
public static long HDFS_BLOCK_SIZE = 64 * 1024 * 1024;
public static final long DISK_READ_SPEED = 100 * 1024 * 1024;
public static final int SHUFFLE_SORT_MAX_SAMPLE_SIZE = 50;

static {
long block_size = 64 * 1024 * 1024;

public static void load() {
try {
block_size = FileSystem.get(new Configuration()).getDefaultBlockSize(new Path("/"));
HDFS_BLOCK_SIZE = FileSystem.get(new Configuration()).getDefaultBlockSize(new Path("/"));
} catch (IOException e) {
logger.error("Error getting FS Block Size!", e);
}

HDFS_BLOCK_SIZE = block_size;
}

public static void load() {
logger.info("HDFS BLock size: " + HDFS_BLOCK_SIZE);
}
}
Loading

0 comments on commit 9795c6d

Please sign in to comment.