Commit

Minor changes to the help page, adding hadoop security to work with keytab
lishid committed Apr 18, 2014
1 parent 7a82ecf commit 33a5d99
Showing 24 changed files with 308 additions and 127 deletions.
21 changes: 15 additions & 6 deletions README.md
@@ -1,11 +1,20 @@
## Dr Elephant

### Compiling
### Compiling & testing locally

* This project uses Ant to compile and package the jar.
* Running ant in the project directory will package the jar file ready for use.
* To be able to build & run the application, download and install [Play framework 2.2.2](http://downloads.typesafe.com/play/2.2.2/play-2.2.2.zip).
* To build and run the application in dev mode, run "play run" from the command line in the project directory.
* The framework still needs to be investigated to see how parameters can be added to the classpath in dev mode.

### Running
### Deployment

* To run the project, copy the package "dr-elephant.jar" to a machine that has Hadoop set up.
* Use the command 'hadoop jar dr-elephant.jar job_id' to start execution.
* To create a deployment package, use "play dist" to create a zip package, or use "play universal:package-zip-tarball" to create a tarball.
* To run the deployed package with Hadoop properly, some changes need to be made to the startup script located at ./bin/dr-elephant:

* In the classpath declaration ("declare -r app\_classpath=..."), append the following to the end of the string, before the closing quote:

:$HADOOP_HOME/*:$HADOOP_HOME/lib/*:$HADOOP_HOME/conf

* After the next line ("addJava ... ;"), add a new line:

addJava "-Djava.library.path=$HADOOP_HOME/lib/native/Linux-amd64-64"
1 change: 0 additions & 1 deletion app/com/linkedin/drelephant/DrElephant.java
@@ -23,7 +23,6 @@ public static void analyse(String jobId) throws IOException {
HadoopJobData jobData = fetcher.getJobData(job_id);
ElephantAnalyser analyser = new ElephantAnalyser();
HeuristicResult[] result = analyser.analyse(jobData);
//System.out.println(result);
}

public DrElephant() throws IOException {
13 changes: 13 additions & 0 deletions app/com/linkedin/drelephant/ElephantAnalyser.java
@@ -48,6 +48,19 @@ public HeuristicResult[] analyse(HadoopJobData data) {
return results.toArray(new HeuristicResult[results.size()]);
}

public String getJobType(HadoopJobData data) {
String pigVersion = data.getJobConf().getProperty("pig.version");
if (pigVersion != null && !pigVersion.isEmpty()) {
return "Pig";
}
String hiveMapredMode = data.getJobConf().getProperty("hive.mapred.mode");
if (hiveMapredMode != null && !hiveMapredMode.isEmpty()) {
return "Hive";
}

return "Unknown/Hadoop";
}

public static ElephantAnalyser instance() {
return instance;
}
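The new getJobType() simply keys off well-known job configuration properties. Below is a self-contained sketch (not part of the commit) that exercises the same decision rules against a bare Properties object; the pig.version value is made up for illustration.

```java
import java.util.Properties;

public class JobTypeSketch {
  // Same detection logic as ElephantAnalyser.getJobType, applied to a plain Properties.
  static String jobType(Properties conf) {
    String pigVersion = conf.getProperty("pig.version");
    if (pigVersion != null && !pigVersion.isEmpty()) {
      return "Pig";
    }
    String hiveMapredMode = conf.getProperty("hive.mapred.mode");
    if (hiveMapredMode != null && !hiveMapredMode.isEmpty()) {
      return "Hive";
    }
    return "Unknown/Hadoop";
  }

  public static void main(String[] args) {
    Properties pigConf = new Properties();
    pigConf.setProperty("pig.version", "0.11.1"); // hypothetical value
    System.out.println(jobType(pigConf));          // prints "Pig"
    System.out.println(jobType(new Properties())); // prints "Unknown/Hadoop"
  }
}
```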
54 changes: 53 additions & 1 deletion app/com/linkedin/drelephant/ElephantFetcher.java
@@ -3,11 +3,21 @@
import com.linkedin.drelephant.hadoop.HadoopCounterHolder;
import com.linkedin.drelephant.hadoop.HadoopJobData;
import com.linkedin.drelephant.hadoop.HadoopTaskData;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.log4j.Logger;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Properties;

public class ElephantFetcher {
private static final Logger logger = Logger.getLogger(ElephantFetcher.class);
@@ -49,7 +59,18 @@ public HadoopJobData getJobData(JobID job_id) throws IOException {
reducers[i] = new HadoopTaskData(job, reducerTasks[i], true);
}

return new HadoopJobData(counterHolder, mappers, reducers)
Properties jobConf = null;
try {
jobConf = getJobConf(job);
} catch (AuthenticationException e) {
e.printStackTrace();
}

if (jobConf == null) {
jobConf = new Properties();
}

return new HadoopJobData(counterHolder, mappers, reducers, jobConf)
.setUsername(username).setStartTime(startTime).setUrl(jobUrl).setJobName(jobName);
}

@@ -68,4 +89,35 @@ private TaskReport[] getReduceTaskReports(JobID job_id) throws IOException {
public JobStatus[] getJobList() throws IOException {
return jobClient.getAllJobs();
}

public Properties getJobConf(RunningJob job) throws IOException, AuthenticationException {
Properties properties = new Properties();
String jobconfUrl = getJobconfUrl(job);
if (jobconfUrl == null) {
return properties;
}

URL url = new URL(jobconfUrl);
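// Use Hadoop's AuthenticatedURL so the request is SPNEGO-authenticated on a secured (Kerberos) cluster.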
AuthenticatedURL.Token token = new AuthenticatedURL.Token();
HttpURLConnection conn = new AuthenticatedURL().openConnection(url, token);
String data = IOUtils.toString(conn.getInputStream());
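// jobconf.jsp renders the configuration as an HTML table; scrape each two-cell row into a key/value pair.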
Document doc = Jsoup.parse(data);
Elements rows = doc.select("table").select("tr");
for (int i = 1; i < rows.size(); i++) {
Element row = rows.get(i);
Elements cells = row.select("> td");
if (cells.size() == 2) {
String key = cells.get(0).text().trim();
String value = cells.get(1).text().trim();
properties.put(key, value);
}
}
return properties;
}

private String getJobconfUrl(RunningJob job) {
String jobDetails = job.getTrackingURL();
String root = jobDetails.substring(0, jobDetails.indexOf("jobdetails.jsp"));
return root + "jobconf.jsp?jobid=" + job.getID().toString();
}
}
100 changes: 57 additions & 43 deletions app/com/linkedin/drelephant/ElephantRunner.java
@@ -4,13 +4,15 @@
import com.linkedin.drelephant.analysis.HeuristicResult;
import com.linkedin.drelephant.analysis.Severity;
import com.linkedin.drelephant.hadoop.HadoopJobData;
import com.linkedin.drelephant.hadoop.HadoopSecurity;
import com.linkedin.drelephant.notifications.EmailThread;
import model.JobHeuristicResult;
import model.JobResult;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.log4j.Logger;

import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
@@ -22,62 +24,71 @@ public class ElephantRunner implements Runnable {
private AtomicBoolean running = new AtomicBoolean(true);
private EmailThread emailer = new EmailThread();
private boolean firstRun = true;
private HadoopSecurity hadoopSecurity;

@Override
public void run() {
Constants.load();
emailer.start();
try {
ElephantFetcher fetcher = new ElephantFetcher();
Set<JobID> previousJobs = new HashSet<JobID>();
long lastRun;

while (running.get()) {
lastRun = System.currentTimeMillis();

hadoopSecurity = new HadoopSecurity();
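// Run the whole fetch-and-analyse loop as the keytab-authenticated Hadoop user.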
hadoopSecurity.doAs(new PrivilegedAction<Void>() {
@Override
public Void run() {
Constants.load();
emailer.start();
try {
logger.info("Fetching job list.");
JobStatus[] jobs = fetcher.getJobList();
if (jobs == null) {
throw new IllegalArgumentException("Jobtracker returned 'null' for job list");
}

Set<JobID> successJobs = filterSuccessfulJobs(jobs);
ElephantFetcher fetcher = new ElephantFetcher();
Set<JobID> previousJobs = new HashSet<JobID>();
long lastRun;

successJobs = filterPreviousJobs(successJobs, previousJobs);
while (running.get()) {
lastRun = System.currentTimeMillis();

logger.info(successJobs.size() + " jobs to analyse.");

//Analyse all ready jobs
for (JobID jobId : successJobs) {
try {
analyzeJob(fetcher, jobId);
previousJobs.add(jobId);
logger.info("Fetching job list.");
hadoopSecurity.checkLogin();
JobStatus[] jobs = fetcher.getJobList();
if (jobs == null) {
throw new IllegalArgumentException("Jobtracker returned 'null' for job list");
}

Set<JobID> successJobs = filterSuccessfulJobs(jobs);

successJobs = filterPreviousJobs(successJobs, previousJobs);

logger.info(successJobs.size() + " jobs to analyse.");

//Analyse all ready jobs
for (JobID jobId : successJobs) {
try {
analyzeJob(fetcher, jobId);
previousJobs.add(jobId);
} catch (Exception e) {
logger.error("Error analysing job", e);
}
}
logger.info("Finished all jobs. Waiting for refresh.");

} catch (Exception e) {
logger.error("Error analysing job", e);
logger.error("Error getting job list", e);
}
}
logger.info("Finished all jobs. Waiting for refresh.");

} catch (Exception e) {
logger.error("Error getting job list", e);
}

//Wait for long enough
long nextRun = lastRun + WAIT_INTERVAL;
long waitTime = nextRun - System.currentTimeMillis();
while (running.get() && waitTime > 0) {
try {
Thread.sleep(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
//Wait for long enough
long nextRun = lastRun + WAIT_INTERVAL;
long waitTime = nextRun - System.currentTimeMillis();
while (running.get() && waitTime > 0) {
try {
Thread.sleep(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
waitTime = nextRun - System.currentTimeMillis();
}
}
waitTime = nextRun - System.currentTimeMillis();
} catch (Exception e) {
logger.error("Error in ElephantRunner", e);
}
return null;
}
} catch (Exception e) {
logger.error("Error in ElephantRunner", e);
}
});
}

private Set<JobID> filterSuccessfulJobs(JobStatus[] jobs) {
@@ -162,6 +173,9 @@ private void analyzeJob(ElephantFetcher fetcher, JobID jobId) throws Exception {

result.save();

//TODO: Save this
logger.info("Job Type: " + ElephantAnalyser.instance().getJobType(jobData));

emailer.enqueue(result);
}

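ElephantRunner now constructs a HadoopSecurity helper, calls checkLogin() before each fetch cycle, and runs the whole loop inside doAs(). The HadoopSecurity.java file added by this commit is not shown in the visible diff, so the following is only a minimal sketch of what such a wrapper around UserGroupInformation keytab login could look like; the keytab.user and keytab.location property names and the method bodies are assumptions inferred from how ElephantRunner uses the class.

```java
import java.io.IOException;
import java.security.PrivilegedAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

// Sketch only: the real HadoopSecurity in this commit may differ.
public class HadoopSecurity {
  private UserGroupInformation loginUser;

  public HadoopSecurity() throws IOException {
    Configuration conf = new Configuration();
    UserGroupInformation.setConfiguration(conf);
    if (UserGroupInformation.isSecurityEnabled()) {
      // Hypothetical property names for the service principal and keytab file.
      String user = System.getProperty("keytab.user");
      String keytab = System.getProperty("keytab.location");
      UserGroupInformation.loginUserFromKeytab(user, keytab);
    }
    loginUser = UserGroupInformation.getLoginUser();
  }

  public void checkLogin() throws IOException {
    // Re-login from the keytab if the Kerberos ticket is about to expire.
    loginUser.checkTGTAndReloginFromKeytab();
  }

  public <T> T doAs(PrivilegedAction<T> action) {
    // Execute the action as the logged-in (keytab) user.
    return loginUser.doAs(action);
  }
}
```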
1 change: 1 addition & 0 deletions app/com/linkedin/drelephant/analysis/Heuristic.java
@@ -4,5 +4,6 @@

public interface Heuristic {
public HeuristicResult apply(HadoopJobData data);

public String getHeuristicName();
}
3 changes: 2 additions & 1 deletion app/com/linkedin/drelephant/analysis/HeuristicResult.java
@@ -1,6 +1,7 @@
package com.linkedin.drelephant.analysis;

import java.util.*;
import java.util.ArrayList;
import java.util.List;

public class HeuristicResult {
private String analysis;
10 changes: 5 additions & 5 deletions app/com/linkedin/drelephant/analysis/Severity.java
@@ -20,12 +20,12 @@ public enum Severity {

private int value;
private String text;
private String color;
private String bootstrapColor;

Severity(int value, String text, String color) {
Severity(int value, String text, String bootstrapColor) {
this.value = value;
this.text = text;
this.color = color;
this.bootstrapColor = bootstrapColor;
}

public int getValue() {
@@ -36,8 +36,8 @@ public String getText() {
return text;
}

public String getColor() {
return color;
public String getBootstrapColor() {
return bootstrapColor;
}

public static Severity byValue(int value) {
ShuffleSortHeuristic.java
@@ -8,9 +8,6 @@
import com.linkedin.drelephant.hadoop.HadoopTaskData;
import com.linkedin.drelephant.math.Statistics;

import java.util.Arrays;
import java.util.Collections;

public class ShuffleSortHeuristic implements Heuristic {
public static final String heuristicName = "Shuffle & Sort";

@@ -21,7 +18,7 @@ public String getHeuristicName() {

@Override
public HeuristicResult apply(HadoopJobData data) {
HadoopTaskData[] tasks = createSample(data.getReducerData());
HadoopTaskData[] tasks = Statistics.createSample(HadoopTaskData.class, data.getReducerData(), Constants.SHUFFLE_SORT_MAX_SAMPLE_SIZE);

//Gather data
fetchShuffleSort(tasks);
@@ -58,26 +55,6 @@ public HeuristicResult apply(HadoopJobData data) {
return result;
}

private HadoopTaskData[] createSample(HadoopTaskData[] reducers) {
int MAX_SAMPLE_SIZE = Constants.SHUFFLE_SORT_MAX_SAMPLE_SIZE;

//Skip this process if number of items already smaller than sample size
if (reducers.length <= MAX_SAMPLE_SIZE) {
return reducers;
}

HadoopTaskData[] result = new HadoopTaskData[MAX_SAMPLE_SIZE];

//Shuffle a clone copy
HadoopTaskData[] clone = reducers.clone();
Collections.shuffle(Arrays.asList(clone));

//Take the first n items
System.arraycopy(clone, 0, result, 0, MAX_SAMPLE_SIZE);

return result;
}

private void fetchShuffleSort(HadoopTaskData[] reducers) {
for (HadoopTaskData reducer : reducers) {
reducer.fetchTaskDetails();
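The per-heuristic createSample() removed above is replaced by a call to a shared Statistics.createSample(Class, array, size). The corresponding change to Statistics.java is not part of the visible diff, so the following is only a minimal sketch, assuming the helper simply generalises the removed sampling code and uses reflection to allocate the typed result array.

```java
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.Collections;

// Sketch only: the real Statistics.createSample in this commit may differ.
public class Statistics {
  public static <T> T[] createSample(Class<T> clazz, T[] items, int sampleSize) {
    // Nothing to do if the population is already no larger than the sample size.
    if (items.length <= sampleSize) {
      return items;
    }

    @SuppressWarnings("unchecked")
    T[] result = (T[]) Array.newInstance(clazz, sampleSize);

    // Shuffle a clone and take the first n items, as the removed per-heuristic code did.
    T[] clone = items.clone();
    Collections.shuffle(Arrays.asList(clone));
    System.arraycopy(clone, 0, result, 0, sampleSize);

    return result;
  }
}
```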
9 changes: 8 additions & 1 deletion app/com/linkedin/drelephant/hadoop/HadoopJobData.java
@@ -1,6 +1,7 @@
package com.linkedin.drelephant.hadoop;

import java.io.IOException;
import java.util.Properties;

public class HadoopJobData {
private String username = "";
@@ -10,11 +11,13 @@ public class HadoopJobData {
private HadoopCounterHolder counterHolder;
private HadoopTaskData[] mapperData;
private HadoopTaskData[] reducerData;
private Properties jobConf;

public HadoopJobData(HadoopCounterHolder counters, HadoopTaskData[] mappers, HadoopTaskData[] reducers) throws IOException {
public HadoopJobData(HadoopCounterHolder counters, HadoopTaskData[] mappers, HadoopTaskData[] reducers, Properties jobConf) throws IOException {
counterHolder = counters;
mapperData = mappers;
reducerData = reducers;
this.jobConf = jobConf;
}

public HadoopJobData setUsername(String username) {
@@ -49,6 +52,10 @@ public HadoopTaskData[] getReducerData() {
return reducerData;
}

public Properties getJobConf() {
return jobConf;
}

public String getUsername() {
return username;
}
(Remaining changed files not shown.)
