Skip to content

Commit

Permalink
Fixes MapReduce aggregator and heuristic to correctly handle task dat…
Browse files Browse the repository at this point in the history
…a when sampling is enabled (linkedin#222)
  • Loading branch information
shkhrgpt authored and akshayrai committed Mar 28, 2017
1 parent cbd1622 commit fdfb643
Show file tree
Hide file tree
Showing 15 changed files with 56 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public long getResourceUsed() {

/**
* Computes the aggregated metrics -> peakMemory, delay, total task duration, wasted resources and memory usage.
* Aggregated metrics are expected to be approximations when sampling is enabled.
* @param taskDatas
* @param containerSize
* @param idealStartTime
Expand All @@ -106,6 +107,9 @@ private void compute(MapReduceTaskData[] taskDatas, long containerSize, long ide
}

for (MapReduceTaskData taskData: taskDatas) {
if (!taskData.isTimeAndCounterDataPresent()) {
continue;
}
long taskMemory = taskData.getCounters().get(MapReduceCounterData.CounterName.PHYSICAL_MEMORY_BYTES)/ FileUtils.ONE_MB; // MB
long taskVM = taskData.getCounters().get(MapReduceCounterData.CounterName.VIRTUAL_MEMORY_BYTES)/ FileUtils.ONE_MB; // MB
long taskDuration = taskData.getFinishTimeMs() - taskData.getStartTimeMs(); // Milliseconds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@
import com.linkedin.drelephant.mapreduce.data.MapReduceApplicationData;
import com.linkedin.drelephant.mapreduce.data.MapReduceTaskData;
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import com.linkedin.drelephant.analysis.Heuristic;
import com.linkedin.drelephant.analysis.HeuristicResult;
import com.linkedin.drelephant.analysis.Severity;


public class JobQueueLimitHeuristic implements Heuristic<MapReduceApplicationData> {

private HeuristicConfigurationData _heuristicConfData;
Expand Down Expand Up @@ -89,13 +91,13 @@ public HeuristicResult apply(MapReduceApplicationData data) {
}

/**
 * Computes the queue-limit severity of each task that carries time and counter data.
 *
 * Tasks for which {@code isTimeAndCounterDataPresent()} is false (for example, tasks that were not
 * sampled) are left out, so the returned array may be shorter than {@code tasks}.
 *
 * @param tasks the tasks to evaluate
 * @param queueTimeout the queue timeout forwarded to {@code getQueueLimitSeverity}
 * @return one severity per task that has time and counter data, in the order the tasks were given
 */
private Severity[] getTasksSeverity(MapReduceTaskData[] tasks, long queueTimeout) {
  // A list is used because the number of tasks with data is not known up front.
  List<Severity> taskSeverityList = new ArrayList<Severity>();
  for (MapReduceTaskData task : tasks) {
    // Only tasks with time and counter data have a meaningful total run time.
    if (task.isTimeAndCounterDataPresent()) {
      taskSeverityList.add(getQueueLimitSeverity(task.getTotalRunTimeMs(), queueTimeout));
    }
  }
  return taskSeverityList.toArray(new Severity[taskSeverityList.size()]);
}

private long getSeverityFrequency(Severity severity, Severity[] tasksSeverity) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void testNullTaskArray() {

@Test
public void testTaskLevelData() {
MapReduceTaskData taskData[] = new MapReduceTaskData[2];
MapReduceTaskData taskData[] = new MapReduceTaskData[3];
MapReduceCounterData counterData = new MapReduceCounterData();
counterData.set(MapReduceCounterData.CounterName.PHYSICAL_MEMORY_BYTES, 655577088L);
counterData.set(MapReduceCounterData.CounterName.VIRTUAL_MEMORY_BYTES, 3051589632L);
Expand All @@ -51,6 +51,8 @@ public void testTaskLevelData() {
taskData[0].setTimeAndCounter(time, counterData);
taskData[1] = new MapReduceTaskData("task", "id");
taskData[1].setTimeAndCounter(new long[5], counterData);
// Non-sampled task, which does not contain time and counter data
taskData[2] = new MapReduceTaskData("task", "id");
TaskLevelAggregatedMetrics taskMetrics = new TaskLevelAggregatedMetrics(taskData, 4096L, 1463218501117L);
Assert.assertEquals(taskMetrics.getDelay(), 1000000000L);
Assert.assertEquals(taskMetrics.getResourceUsed(), 135168L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,23 @@ public void testNonDefaultRuntimeNone() throws IOException {

private Severity analyzeJob(long runtimeMs, String queueName) throws IOException {
MapReduceCounterData dummyCounter = new MapReduceCounterData();
MapReduceTaskData[] mappers = new MapReduceTaskData[2 * NUM_TASKS / 3];
MapReduceTaskData[] reducers = new MapReduceTaskData[NUM_TASKS / 3];
MapReduceTaskData[] mappers = new MapReduceTaskData[(2 * NUM_TASKS / 3) + 1];
MapReduceTaskData[] reducers = new MapReduceTaskData[(NUM_TASKS / 3) + 1];
Properties jobConf = new Properties();
jobConf.put("mapred.job.queue.name", queueName);
int i = 0;
for (; i < 2 * NUM_TASKS / 3; i++) {
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
mappers[i].setTimeAndCounter(new long[] { runtimeMs, 0, 0, 0, 0 }, dummyCounter);
}
// Non-sampled task, which does not contain time and counter data
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
for (i = 0; i < NUM_TASKS / 3; i++) {
reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
reducers[i].setTimeAndCounter(new long[] { runtimeMs, 0, 0, 0, 0 }, dummyCounter);
}
// Non-sampled task, which does not contain time and counter data
reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
MapReduceApplicationData data =
new MapReduceApplicationData().setCounters(dummyCounter).setReducerData(reducers).setMapperData(mappers)
.setJobConf(jobConf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void testSmallTasks() throws IOException {
private Severity analyzeJob(int numSmallTasks, int numLargeTasks, long smallInputSize, long largeInputSize)
throws IOException {
MapReduceCounterData jobCounter = new MapReduceCounterData();
MapReduceTaskData[] mappers = new MapReduceTaskData[numSmallTasks + numLargeTasks];
MapReduceTaskData[] mappers = new MapReduceTaskData[numSmallTasks + numLargeTasks + 1];

MapReduceCounterData smallCounter = new MapReduceCounterData();
smallCounter.set(MapReduceCounterData.CounterName.HDFS_BYTES_READ, smallInputSize);
Expand All @@ -89,6 +89,8 @@ private Severity analyzeJob(int numSmallTasks, int numLargeTasks, long smallInpu
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
mappers[i].setTimeAndCounter(new long[5], largeCounter);
}
// Non-sampled task, which does not contain time and counter data
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(jobCounter).setMapperData(mappers);
HeuristicResult result = _heuristic.apply(data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void testShortTasksNone() throws IOException {

private Severity analyzeJob(long runtimeMs, long cpuMs, long gcMs) throws IOException {
MapReduceCounterData jobCounter = new MapReduceCounterData();
MapReduceTaskData[] mappers = new MapReduceTaskData[NUMTASKS];
MapReduceTaskData[] mappers = new MapReduceTaskData[NUMTASKS + 1];

MapReduceCounterData counter = new MapReduceCounterData();
counter.set(MapReduceCounterData.CounterName.CPU_MILLISECONDS, cpuMs);
Expand All @@ -73,6 +73,8 @@ private Severity analyzeJob(long runtimeMs, long cpuMs, long gcMs) throws IOExce
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
mappers[i].setTimeAndCounter(new long[]{runtimeMs, 0 , 0, 0, 0}, counter);
}
// Non-sampled task, which does not contain time and counter data
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(jobCounter).setMapperData(mappers);
HeuristicResult result = _heuristic.apply(data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void testDefaultContainerNoneMore() throws IOException {

private Severity analyzeJob(long taskAvgMemMB, long containerMemMB) throws IOException {
MapReduceCounterData jobCounter = new MapReduceCounterData();
MapReduceTaskData[] mappers = new MapReduceTaskData[NUMTASKS];
MapReduceTaskData[] mappers = new MapReduceTaskData[NUMTASKS + 1];

MapReduceCounterData counter = new MapReduceCounterData();
counter.set(MapReduceCounterData.CounterName.PHYSICAL_MEMORY_BYTES, taskAvgMemMB* FileUtils.ONE_MB);
Expand All @@ -83,6 +83,8 @@ private Severity analyzeJob(long taskAvgMemMB, long containerMemMB) throws IOExc
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
mappers[i].setTimeAndCounter(new long[5], counter);
}
// Non-sampled task, which does not contain time and counter data
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(jobCounter).setMapperData(mappers);
data.setJobConf(p);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void testShortTask() throws IOException {

private Severity analyzeJob(long runtimeMs, long readBytes) throws IOException {
MapReduceCounterData jobCounter = new MapReduceCounterData();
MapReduceTaskData[] mappers = new MapReduceTaskData[NUMTASKS];
MapReduceTaskData[] mappers = new MapReduceTaskData[NUMTASKS + 1];

MapReduceCounterData counter = new MapReduceCounterData();
counter.set(MapReduceCounterData.CounterName.HDFS_BYTES_READ, readBytes);
Expand All @@ -92,6 +92,8 @@ private Severity analyzeJob(long runtimeMs, long readBytes) throws IOException {
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
mappers[i].setTimeAndCounter(new long[] { runtimeMs, 0, 0 ,0, 0}, counter);
}
// Non-sampled task, which does not contain time and counter data
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(jobCounter).setMapperData(mappers);
HeuristicResult result = _heuristic.apply(data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,19 @@ public void testSmallNumTasks() throws IOException {

private Severity analyzeJob(long spilledRecords, long mapRecords, int numTasks) throws IOException {
MapReduceCounterData jobCounter = new MapReduceCounterData();
MapReduceTaskData[] mappers = new MapReduceTaskData[numTasks];
MapReduceTaskData[] mappers = new MapReduceTaskData[numTasks + 1];

MapReduceCounterData counter = new MapReduceCounterData();
counter.set(MapReduceCounterData.CounterName.SPILLED_RECORDS, spilledRecords);
counter.set(MapReduceCounterData.CounterName.MAP_OUTPUT_RECORDS, mapRecords);

for (int i=0; i < numTasks; i++) {
int i = 0;
for (; i < numTasks; i++) {
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
mappers[i].setTimeAndCounter(new long[5], counter);
}
// Non-sampled task, which does not contain time and counter data
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(jobCounter).setMapperData(mappers);
HeuristicResult result = _heuristic.apply(data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void testShortRuntimeTasksNone() throws IOException {

private Severity analyzeJob(int numTasks, long runtime) throws IOException {
MapReduceCounterData jobCounter = new MapReduceCounterData();
MapReduceTaskData[] mappers = new MapReduceTaskData[numTasks];
MapReduceTaskData[] mappers = new MapReduceTaskData[numTasks + 1];

MapReduceCounterData taskCounter = new MapReduceCounterData();
taskCounter.set(MapReduceCounterData.CounterName.HDFS_BYTES_READ, DUMMY_INPUT_SIZE);
Expand All @@ -98,6 +98,8 @@ private Severity analyzeJob(int numTasks, long runtime) throws IOException {
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
mappers[i].setTimeAndCounter(new long[] { runtime, 0, 0, 0, 0 }, taskCounter);
}
// Non-sampled task, which does not contain time and counter data
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(jobCounter).setMapperData(mappers);
HeuristicResult result = _heuristic.apply(data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void testSmallTasks() throws IOException {
private Severity analyzeJob(int numSmallTasks, int numLargeTasks, long smallInputSize, long largeInputSize)
throws IOException {
MapReduceCounterData jobCounter = new MapReduceCounterData();
MapReduceTaskData[] reducers = new MapReduceTaskData[numSmallTasks + numLargeTasks];
MapReduceTaskData[] reducers = new MapReduceTaskData[numSmallTasks + numLargeTasks + 1];

MapReduceCounterData smallCounter = new MapReduceCounterData();
smallCounter.set(MapReduceCounterData.CounterName.REDUCE_SHUFFLE_BYTES, smallInputSize);
Expand All @@ -88,6 +88,8 @@ private Severity analyzeJob(int numSmallTasks, int numLargeTasks, long smallInpu
reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
reducers[i].setTimeAndCounter(new long[5], largeCounter);
}
// Non-sampled task, which does not contain time and counter data
reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(jobCounter).setReducerData(reducers);
HeuristicResult result = _heuristic.apply(data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void testShortTasksNone() throws IOException {

private Severity analyzeJob(long runtimeMs, long cpuMs, long gcMs) throws IOException {
MapReduceCounterData jobCounter = new MapReduceCounterData();
MapReduceTaskData[] reducers = new MapReduceTaskData[NUMTASKS];
MapReduceTaskData[] reducers = new MapReduceTaskData[NUMTASKS + 1];

MapReduceCounterData counter = new MapReduceCounterData();
counter.set(MapReduceCounterData.CounterName.CPU_MILLISECONDS, cpuMs);
Expand All @@ -72,6 +72,8 @@ private Severity analyzeJob(long runtimeMs, long cpuMs, long gcMs) throws IOExce
reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
reducers[i].setTimeAndCounter(new long[] { runtimeMs, 0, 0, 0, 0 }, counter);
}
// Non-sampled task, which does not contain time and counter data
reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(jobCounter).setReducerData(reducers);
HeuristicResult result = _heuristic.apply(data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void testDefaultContainerNoneMore() throws IOException {

private Severity analyzeJob(long taskAvgMemMB, long containerMemMB) throws IOException {
MapReduceCounterData jobCounter = new MapReduceCounterData();
MapReduceTaskData[] reducers = new MapReduceTaskData[NUMTASKS];
MapReduceTaskData[] reducers = new MapReduceTaskData[NUMTASKS + 1];

MapReduceCounterData counter = new MapReduceCounterData();
counter.set(MapReduceCounterData.CounterName.PHYSICAL_MEMORY_BYTES, taskAvgMemMB* FileUtils.ONE_MB);
Expand All @@ -82,6 +82,8 @@ private Severity analyzeJob(long taskAvgMemMB, long containerMemMB) throws IOExc
reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
reducers[i].setTimeAndCounter(new long[5], counter);
}
// Non-sampled task, which does not contain time and counter data
reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(jobCounter).setReducerData(reducers);
data.setJobConf(p);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,15 @@ public void testLongRunetimeSevereMore() throws IOException {

private Severity analyzeJob(long runtimeMs, int numTasks) throws IOException {
MapReduceCounterData dummyCounter = new MapReduceCounterData();
MapReduceTaskData[] reducers = new MapReduceTaskData[numTasks];
MapReduceTaskData[] reducers = new MapReduceTaskData[numTasks + 1];

int i = 0;
for (; i < numTasks; i++) {
reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
reducers[i].setTimeAndCounter(new long[] { runtimeMs, 0, 0, 0, 0 }, dummyCounter);
}
// Non-sampled task, which does not contain time and counter data
reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(dummyCounter).setReducerData(reducers);
HeuristicResult result = _heuristic.apply(data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,16 @@ public void testShortSort() throws IOException {

private Severity analyzeJob(long shuffleTimeMs, long sortTimeMs, long reduceTimeMs) throws IOException {
MapReduceCounterData dummyCounter = new MapReduceCounterData();
MapReduceTaskData[] reducers = new MapReduceTaskData[NUMTASKS];
MapReduceTaskData[] reducers = new MapReduceTaskData[NUMTASKS + 1];

int i = 0;
for (; i < NUMTASKS; i++) {
reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
reducers[i].setTimeAndCounter(
new long[] { shuffleTimeMs + sortTimeMs + reduceTimeMs, shuffleTimeMs, sortTimeMs, 0, 0}, dummyCounter);
}
// Non-sampled task, which does not contain time and counter data
reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
MapReduceApplicationData data = new MapReduceApplicationData().setCounters(dummyCounter).setReducerData(reducers);
HeuristicResult result = _heuristic.apply(data);
return result.getSeverity();
Expand Down

0 comments on commit fdfb643

Please sign in to comment.