[HADOOP-7814] Prepare to add metrics library to dr-elephant
Changed HadoopJobData to include finishTime since that is needed for
metrics.
Changed the signature of getJobCounter to include jobConf and jobData
so that it can publish metrics
Updated README.md

Tested locally on my box and on spades

RB=406817
BUGS=HADOOP-7814
R=fli,mwagner
A=fli
mcvsubbu committed Dec 15, 2014
1 parent 089281a commit ba05463
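
Roughly, the pattern this change sets up looks like the sketch below. This is a hypothetical illustration only: the real call sites are in the diffs that follow, and the metrics library itself gets wired up in a later change. The counter group/name strings and values here are made up.

    import com.linkedin.drelephant.hadoop.HadoopCounterHolder;
    import com.linkedin.drelephant.hadoop.HadoopJobData;

    // Hypothetical illustration of the pattern this change enables.
    public class MetricsPrepSketch {
      static HadoopJobData buildJobData(String user, long start, long finish, String url, String name) {
        // finishTime is now carried on HadoopJobData so a metrics library can report job duration.
        return new HadoopJobData()
            .setUsername(user).setStartTime(start).setFinishTime(finish).setUrl(url).setJobName(name);
      }

      static HadoopCounterHolder buildCounters() {
        // Counters are now keyed by (group, name) instead of an EnumMap<CounterName, Long>.
        HadoopCounterHolder holder = new HadoopCounterHolder();
        holder.set("FileSystemCounters", "HDFS_BYTES_READ", 1024L);  // example values only
        return holder;
      }
    }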
Showing 13 changed files with 207 additions and 145 deletions.
60 changes: 27 additions & 33 deletions README.md
@@ -5,30 +5,26 @@
* To be able to build & run the application, download and install [Play framework 2.2.2](http://downloads.typesafe.com/play/2.2.2/play-2.2.2.zip).
* The pre-installed play command on our boxes will not work as it is configured to look at LinkedIn's repos
* If this is your first time working with Dr. Elephant, take the deployed Hadoop jars and put them in the /lib directory:
scp eat1-magicgw01.grid.linkedin.com:/export/apps/hadoop/latest/hadoop-core-1.2.1-p3.jar ./lib/.

* To build and run the application in dev mode, run "play run" from the command line in the project directory.
* We still need to investigate the framework to see how one can add parameters to the classpath in dev mode.

### Deployment

* To create a deployment package, use "play dist" to create a zip package, or use "play universal:package-zip-tarball" to create a tarball
* To run the deployed package with Hadoop properly, some changes need to be made to the startup script located at ./bin/dr-elephant

* In the classpath ("declare -r app\_classpath=..."), add the following to the end of the string, before the closing quote:

:$HADOOP_HOME/*:$HADOOP_HOME/lib/*:$HADOOP_HOME/conf

* After the next line ("addJava ... ;"), add a new line:

addJava "-Djava.library.path=$HADOOP_HOME/lib/native/Linux-amd64-64"

### New Deployment (All previous instructions are deprecated!)

* ./compile.sh will create two zips under the 'dist' dir which can be deployed with h1 and h2 directly without changing the classpath
* When testing dr.e on hadoop2.x locally, HADOOP_HOME and HADOOP_CONF_DIR need to be set properly
* Upon deployment on a cluster, we can specify the keytab and database location at runtime: ./bin/dr-elephant -Dhttp.port=xxxx -Dkeytab.user="xxxx" -Dkeytab.location="xxxx" -Ddb.default.url="jdbc:mysql://xxxx" -Ddb.default.user=xxxx -Ddb.default.password=xxxx so that we don't have to change application.conf at compile time

scp eat1-magicgw01.grid.linkedin.com:/export/apps/hadoop/latest/hadoop-core-*.jar ./lib/.
NOTE: When building dr-elephant, this jar file should be removed or renamed. Otherwise, the hadoop-2 executable that is built will not work.
* To compile dr-elephant, run the script ./compile.sh. A zip file is created in the 'dist' directory.
* You can run dr-elephant on your desktop and have it point to a cluster. Do the following:
- cd dist; unzip *.zip  # then cd into the dr-elephant release directory that was created.
- Copy the cluster's configuration onto a local directory on your machine.
- Copy the hadoop distribution onto a local directory (or, build hadoop locally).
- For Hadoop-2, the following setup is needed:
export HADOOP_HOME=/path/to/project/hadoop/hadoop-dist/target/hadoop-2.3.0_li-SNAPSHOT
export HADOOP_CONF_DIR=/path/to/config/etc/hadoop
export PATH=$HADOOP_HOME/bin:$PATH # because dr-elephant uses 'hadoop classpath' to load the right classes
vim $HADOOP_CONF_DIR/mapred-site.xml
# Make sure to set the job history server co-ordinates if it is not already set
# <property><name>mapreduce.jobhistory.webapp.address</name><value>eat1-hcl0764.grid:19888</value></property>
- Set up and start mysql locally on your box (the default port is 3306). Create a database called 'drelephant' if it does not exist.
- You can now start dr-elephant with the following command (assuming you are running mysql locally on port 3306):
./bin/dr-elephant -Dhttp.port=8089 -Ddb.default.url="jdbc:mysql://ssubrama-ld1.linkedin.biz:3306/drelephant?characterEncoding=UTF-8" -Devolutionplugin=enabled -DapplyEvolutions.default=true
- Note that you can add other properties with the -D option, e.g. -Dprop.name=value
- You can debug dr-elephant using Eclipse or IDEA by adding the argument -Djvm_args="-Xdebug -Xrunjdwp:transport=dt_socket,address=8876,server=y,suspend=y"
This will start dr-elephant and wait until you attach Eclipse/IDEA to port 8876. You can then set breakpoints and debug interactively.


### DB Schema evolutions
@@ -47,18 +43,16 @@ When the schema in the model package changes, play will need to be ran to automa
* Remove the header in the sql file so it does not get overwritten
* Browse the page again to refresh the schema to add the indices.

### Running on the cluster
### Deployment on the cluster

* SSH into the machine
* sudo as elephant
* go to /export/apps/elephant/
* To start: ./run.sh
* To kill: ./kill.sh
* To deploy new version:
* scp machine:location-to-drelephant.zip /export/apps/elephant/
* ./kill.sh
* unzip dr-elephant-0.1-SNAPSHOT.zip
* ./run.sh
* unzip the file
* cd dr-elephant*/bin
* To start: ./start.sh
* To stop: ./stop.sh
* To deploy a new version, be sure to stop the running process first

### Adding new heuristics

@@ -68,4 +62,4 @@ When the schema in the model package changes, play will need to be ran to automa
* heuristicname: Name of the heuristic. (e.g. Mapper Spill)
* classname: This should be the fully qualified name of the class (e.g. com.linkedin.drelephant.analysis.heuristics.MapperSpillHeuristic)
* viewname: This should be the fully qualified name of the view. ( e.g. views.html.helpMapperSpill )
* Run Dr. Elephant; it should now include the new heuristics (a rough class skeleton is sketched below).
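
For reference, a new heuristic class might look roughly like the skeleton below. This is a hypothetical sketch: it assumes the Heuristic interface exposes apply(HadoopJobData) and getHeuristicName(), and that HeuristicResult takes a message and a Severity; check the actual types under com.linkedin.drelephant.analysis before using it.

    package com.linkedin.drelephant.analysis.heuristics;

    import com.linkedin.drelephant.analysis.Heuristic;
    import com.linkedin.drelephant.analysis.HeuristicResult;
    import com.linkedin.drelephant.analysis.Severity;
    import com.linkedin.drelephant.hadoop.HadoopJobData;

    // Hypothetical skeleton; the interface methods and HeuristicResult constructor are assumptions.
    public class MapperSpillHeuristic implements Heuristic {
      public static final String HEURISTIC_NAME = "Mapper Spill";

      @Override
      public String getHeuristicName() {
        return HEURISTIC_NAME;
      }

      @Override
      public HeuristicResult apply(HadoopJobData data) {
        // Inspect the job's counters and task data here and choose a severity.
        return new HeuristicResult("No significant spill detected", Severity.NONE);
      }
    }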
81 changes: 47 additions & 34 deletions app/com/linkedin/drelephant/ElephantFetcherClassic.java
@@ -6,9 +6,20 @@
import com.linkedin.drelephant.hadoop.HadoopJobData;
import com.linkedin.drelephant.hadoop.HadoopTaskData;
import com.linkedin.drelephant.math.Statistics;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import model.JobResult;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Counters.Counter;
@@ -27,21 +38,6 @@
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.text.SimpleDateFormat;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;


public class ElephantFetcherClassic implements ElephantFetcher {
private static final Logger logger = Logger.getLogger(ElephantFetcher.class);
@@ -133,9 +129,10 @@ public void fetchJobData(HadoopJobData jobData) throws IOException, Authenticati
JobStatus status = job.getJobStatus();
String username = status.getUsername();
long startTime = status.getStartTime();
long finishTime = status.getFinishTime();
String jobUrl = job.getTrackingURL();
String jobName = job.getJobName();
jobData.setUsername(username).setStartTime(startTime).setUrl(jobUrl).setJobName(jobName);
jobData.setUsername(username).setStartTime(startTime).setFinishTime(finishTime).setUrl(jobUrl).setJobName(jobName);

// Fetch job counter
HadoopCounterHolder counterHolder = fetchCounter(job.getCounters());
@@ -315,16 +312,16 @@ private long[] tryExtractDetailFromRow(Element row, boolean isMapper) throws Par
}

private HadoopCounterHolder fetchCounter(Counters counters) {
Map<CounterName, Long> counterMap = new EnumMap<CounterName, Long>(CounterName.class);
HadoopCounterHolder holder = new HadoopCounterHolder();
for (Counters.Group group : counters) {
for (Counter ctr : group) {
CounterName cn = CounterName.getCounterFromName(ctr.getName());
if (cn != null) {
counterMap.put(cn, ctr.getValue());
holder.set(cn.getGroupName(), cn.getName(), ctr.getValue());
}
}
}
return new HadoopCounterHolder(counterMap);
return holder;
}

private void addJobToRetryList(HadoopJobData job) {
@@ -400,41 +397,43 @@ private boolean checkRetiredAndFetchJobData(HadoopJobData jobData) throws IOExce
HadoopTaskData[] mapperData = fetchAllTaskDataForRetiredJob(mapperUrl, true);
HadoopTaskData[] reducerData = fetchAllTaskDataForRetiredJob(reducerUrl, false);

long startTime = fetchStartTimeForRetiredJob(doc, jobUrl);
StartAndEndTimes times = fetchTimesForRetiredJob(doc, jobUrl);
String username = jobConf.getProperty("user.name");
String jobName = jobConf.getProperty("mapred.job.name");
jobData.setUsername(username).setStartTime(startTime).setUrl(jobUrl).setJobName(jobName);
jobData.setUsername(username).setStartTime(times._startTime).setFinishTime(times._finishTime).setUrl(jobUrl).setJobName(jobName);
jobData.setCounters(jobCounter).setJobConf(jobConf).setMapperData(mapperData).setReducerData(reducerData);
return true;
}

// Fetch job counter from job's main page
private HadoopCounterHolder fetchJobCounterForRetiredJob(Document doc) throws IOException, AuthenticationException {
Map<CounterName, Long> counterMap = new EnumMap<CounterName, Long>(CounterName.class);
HadoopCounterHolder holder = new HadoopCounterHolder();
Elements rows = doc.select("table").select("tr");
for (Element row : rows) {
Elements cells = row.select("> td");
if (cells.size() == 5) {
String countername = cells.get(1).text().trim();
CounterName cn = CounterName.getCounterFromDisplayName(countername);
if (cn != null) {
counterMap.put(cn, Long.parseLong(cells.get(4).text().trim().replace(",", "")));
long value = Long.parseLong(cells.get(4).text().trim().replace(",", ""));
holder.set(cn.getGroupName(), cn.getName(), value);
}
} else if (cells.size() == 4) {
String countername = cells.get(0).text().trim();
CounterName cn = CounterName.getCounterFromDisplayName(countername);
if (cn != null) {
counterMap.put(cn, Long.parseLong(cells.get(3).text().trim().replace(",", "")));
long value = Long.parseLong(cells.get(3).text().trim().replace(",", ""));
holder.set(cn.getGroupName(), cn.getName(), value);
}
}
}
return new HadoopCounterHolder(counterMap);
return holder;
}

// Fetch task counter from task's counter page
public HadoopCounterHolder fetchTaskCounterForRetiredJob(String taskCounterUrl) throws IOException,
AuthenticationException {
Map<CounterName, Long> counterMap = new EnumMap<CounterName, Long>(CounterName.class);
HadoopCounterHolder holder = new HadoopCounterHolder();
Document doc = ThreadContextMR1.fetchHtmlDoc(taskCounterUrl);
Elements rows = doc.select("table").select("tr");
for (Element row : rows) {
@@ -443,26 +442,40 @@ public HadoopCounterHolder fetchTaskCounterForRetiredJob(String taskCounterUrl)
String countername = cells.get(1).text().trim();
CounterName cn = CounterName.getCounterFromDisplayName(countername);
if (cn != null) {
counterMap.put(cn, Long.parseLong(cells.get(2).text().trim().replace(",", "")));
long value = Long.parseLong(cells.get(2).text().trim().replace(",", ""));
holder.set(cn.getGroupName(), cn.getName(), value);
}
}
}
return new HadoopCounterHolder(counterMap);
return holder;
}

// We fetch the start and finish times from the job's Setup task row, shown in the job's main page
private long fetchStartTimeForRetiredJob(Document doc, String jobUrl) throws IOException {

static class StartAndEndTimes {
public long _startTime;
public long _finishTime;
}

private StartAndEndTimes fetchTimesForRetiredJob(Document doc, String jobUrl)
throws IOException {
Elements rows = doc.select("table").select("tr");
for (Element row : rows) {
Elements cells = row.select("> td");
if (cells.size() == 7 && cells.get(0).text().trim().equals("Setup")) {
SimpleDateFormat dateFormat = new SimpleDateFormat("d-MMM-yyyy HH:mm:ss");
StartAndEndTimes times = new StartAndEndTimes();
try {
long time = dateFormat.parse(cells.get(5).text().trim()).getTime();
return time;
times._startTime = dateFormat.parse(cells.get(5).text().trim()).getTime();
} catch (ParseException e) {
throw new IOException("Error in fetching start time from job page in URL : " + jobUrl);
}
try {
times._finishTime = dateFormat.parse(cells.get(6).text().trim()).getTime();
} catch (ParseException e) {
throw new IOException("Error in fetching start time data from job page in URL : " + jobUrl);
throw new IOException("Error in fetching finish time from job page in URL : " + jobUrl);
}
return times;
}
}
throw new IOException("Unable to fetch start time data from job page in URL : " + jobUrl);
71 changes: 39 additions & 32 deletions app/com/linkedin/drelephant/ElephantFetcherYarn.java
@@ -6,28 +6,24 @@
import com.linkedin.drelephant.hadoop.HadoopJobData;
import com.linkedin.drelephant.hadoop.HadoopTaskData;
import com.linkedin.drelephant.math.Statistics;

import model.JobResult;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.log4j.Logger;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import model.JobResult;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.log4j.Logger;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;


public class ElephantFetcherYarn implements ElephantFetcher {
@@ -221,10 +217,27 @@ private List<HadoopJobData> getJobData(URL url, boolean checkDB) throws IOExcept
}

// New job
/*
{
"finishTime": 1415816989533,
"id": "job_1415656299984_0014",
"mapsCompleted": 1,
"mapsTotal": 1,
"name": "word count",
"queue": "default",
"reducesCompleted": 1,
"reducesTotal": 1,
"startTime": 1415816979666,
"state": "SUCCEEDED",
"submitTime": 1415816976300,
"user": "ssubrama"
}
*/
HadoopJobData jobData = new HadoopJobData();
jobData.setJobId(jobId).setUsername(job.get("user").getValueAsText())
.setJobName(job.get("name").getValueAsText()).setUrl(getJobDetailURL(jobId))
.setStartTime(job.get("startTime").getLongValue());
.setStartTime(job.get("startTime").getLongValue())
.setFinishTime(job.get("finishTime").getLongValue());

jobList.add(jobData);
}
@@ -245,46 +258,40 @@ private Properties getProperties(URL url) throws IOException, AuthenticationExce
return jobConf;
}

private HadoopCounterHolder getJobCounter(URL url) throws IOException, AuthenticationException {
Map<CounterName, Long> counterMap = new EnumMap<CounterName, Long>(CounterName.class);
private HadoopCounterHolder getJobCounter(URL url)
throws IOException, AuthenticationException {
HadoopCounterHolder holder = new HadoopCounterHolder();

JsonNode rootNode = ThreadContextMR2.readJsonNode(url);
JsonNode groups = rootNode.path("jobCounters").path("counterGroup");

for (JsonNode group : groups) {
for (JsonNode counter : group.path("counter")) {
String name = counter.get("name").getValueAsText();
CounterName cn = CounterName.getCounterFromName(name);
if (cn != null) {
counterMap.put(cn, counter.get("totalCounterValue").getLongValue());
}
String counterName = counter.get("name").getValueAsText();
CounterName cn = CounterName.getCounterFromName(counterName);
Long counterValue = counter.get("totalCounterValue").getLongValue();
String groupName = group.get("counterGroupName").getValueAsText();
holder.set(groupName, counterName, counterValue);
}
}
return new HadoopCounterHolder(counterMap);
return holder;
}

private HadoopCounterHolder getTaskCounter(URL url) throws IOException, AuthenticationException {
Map<CounterName, Long> counterMap = new EnumMap<CounterName, Long>(CounterName.class);

JsonNode rootNode = ThreadContextMR2.readJsonNode(url);
JsonNode groups = rootNode.path("jobTaskCounters").path("taskCounterGroup");
HadoopCounterHolder holder = new HadoopCounterHolder();

for (JsonNode group : groups) {
for (JsonNode counter : group.path("counter")) {
String name = counter.get("name").getValueAsText();
CounterName cn = CounterName.getCounterFromName(name);
if (cn != null) {
counterMap.put(cn, counter.get("value").getLongValue());
}
String groupName = group.get("counterGroupName").getValueAsText();
Long value = counter.get("value").getLongValue();
holder.set(groupName, name, value);
}
}

for (CounterName name : CounterName.values()) {
if (!counterMap.containsKey(name)) {
counterMap.put(name, 0L);
}
}
return new HadoopCounterHolder(counterMap);
return holder;
}

private long[] getTaskExecTime(URL url) throws IOException, AuthenticationException {