Skip to content

Commit

Permalink
Cleanes up MapReduceTaskData class by removing unnecessary constructo…
Browse files Browse the repository at this point in the history
…rs (linkedin#202)
  • Loading branch information
shkhrgpt authored and akshayrai committed Feb 7, 2017
1 parent 0fc5ac2 commit d2eea3c
Show file tree
Hide file tree
Showing 25 changed files with 57 additions and 57 deletions.
32 changes: 8 additions & 24 deletions app/com/linkedin/drelephant/mapreduce/data/MapReduceTaskData.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
* This class manages the MapReduce Tasks
*/
public class MapReduceTaskData {

private MapReduceCounterData _counterHolder;
private String _taskId;
// The successful attempt id
Expand All @@ -30,39 +31,22 @@ public class MapReduceTaskData {
private long _sortTimeMs = 0;
private long _startTimeMs = 0;
private long _finishTimeMs = 0;
private boolean _sampled = false;

public MapReduceTaskData(MapReduceCounterData counterHolder, long[] time) {
this._counterHolder = counterHolder;
this._totalTimeMs = time[0];
this._shuffleTimeMs = time[1];
this._sortTimeMs = time[2];
this._startTimeMs = time[3];
this._finishTimeMs = time[4];
this._sampled = true;
}

public MapReduceTaskData(MapReduceCounterData counterHolder) {
this._counterHolder = counterHolder;
}
// This flag will only be true when successfully setting time and counter values.
private boolean _isTimeAndCounterDataPresent = false;

public MapReduceTaskData(String taskId, String taskAttemptId) {
this._taskId = taskId;
this._attemptId = taskAttemptId;
}

public void setCounter(MapReduceCounterData counterHolder) {
this._counterHolder = counterHolder;
this._sampled = true;
}

public void setTime(long[] time) {
public void setTimeAndCounter(long[] time, MapReduceCounterData counterHolder) {
this._totalTimeMs = time[0];
this._shuffleTimeMs = time[1];
this._sortTimeMs = time[2];
this._startTimeMs = time[3];
this._finishTimeMs = time[4];
this._sampled = true;
this._counterHolder = counterHolder;
this._isTimeAndCounterDataPresent = true;
}

public MapReduceCounterData getCounters() {
Expand Down Expand Up @@ -93,8 +77,8 @@ public long getFinishTimeMs() {
return _finishTimeMs;
}

public boolean isSampled() {
return _sampled;
public boolean isTimeAndCounterDataPresent() {
return _isTimeAndCounterDataPresent;
}

public String getTaskId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,7 @@ private MapReduceTaskData[] getTaskData(String jobId, List<JobHistoryParser.Task
MapReduceCounterData taskCounterData = getCounterData(tInfo.getCounters());
long[] taskExecTime = getTaskExecTime(tInfo.getAllTaskAttempts().get(attemptId));

taskList[i].setCounter(taskCounterData);
taskList[i].setTime(taskExecTime);
taskList[i].setTimeAndCounter(taskExecTime, taskCounterData);
}
return taskList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,7 @@ private void getTaskData(String jobId, List<MapReduceTaskData> taskList) throws
URL taskAttemptURL = getTaskAttemptURL(jobId, data.getTaskId(), data.getAttemptId());
long[] taskExecTime = getTaskExecTime(taskAttemptURL);

data.setCounter(taskCounter);
data.setTime(taskExecTime);
data.setTimeAndCounter(taskExecTime, taskCounter);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public HeuristicResult apply(MapReduceApplicationData data) {
List<Long> inputBytes = new ArrayList<Long>();

for (int i = 0; i < tasks.length; i++) {
if (tasks[i].isSampled()) {
if (tasks[i].isTimeAndCounterDataPresent()) {
inputBytes.add(tasks[i].getCounters().get(_counterName));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public HeuristicResult apply(MapReduceApplicationData data) {
List<Long> runtimesMs = new ArrayList<Long>();

for (MapReduceTaskData task : tasks) {
if (task.isSampled()) {
if (task.isTimeAndCounterDataPresent()) {
runtimesMs.add(task.getTotalRunTimeMs());
gcMs.add(task.getCounters().get(MapReduceCounterData.CounterName.GC_MILLISECONDS));
cpuMs.add(task.getCounters().get(MapReduceCounterData.CounterName.CPU_MILLISECONDS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public HeuristicResult apply(MapReduceApplicationData data) {
long taskPMin = Long.MAX_VALUE;
long taskPMax = 0;
for (MapReduceTaskData task : tasks) {
if (task.isSampled()) {
if (task.isTimeAndCounterDataPresent()) {
runtimesMs.add(task.getTotalRunTimeMs());
long taskPMem = task.getCounters().get(MapReduceCounterData.CounterName.PHYSICAL_MEMORY_BYTES);
long taskVMem = task.getCounters().get(MapReduceCounterData.CounterName.VIRTUAL_MEMORY_BYTES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public HeuristicResult apply(MapReduceApplicationData data) {

for (MapReduceTaskData task : tasks) {

if (task.isSampled()) {
if (task.isTimeAndCounterDataPresent()) {
long inputBytes = task.getCounters().get(MapReduceCounterData.CounterName.HDFS_BYTES_READ);
long runtimeMs = task.getTotalRunTimeMs();
inputByteSizes.add(inputBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public HeuristicResult apply(MapReduceApplicationData data) {

for (MapReduceTaskData task : tasks) {

if (task.isSampled()) {
if (task.isTimeAndCounterDataPresent()) {
totalSpills += task.getCounters().get(MapReduceCounterData.CounterName.SPILLED_RECORDS);
totalOutputRecords += task.getCounters().get(MapReduceCounterData.CounterName.MAP_OUTPUT_RECORDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public HeuristicResult apply(MapReduceApplicationData data) {

for (MapReduceTaskData task : tasks) {

if (task.isSampled()) {
if (task.isTimeAndCounterDataPresent()) {
inputBytes.add(task.getCounters().get(MapReduceCounterData.CounterName.HDFS_BYTES_READ));
long taskTime = task.getTotalRunTimeMs();
runtimesMs.add(taskTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public HeuristicResult apply(MapReduceApplicationData data) {
long taskMaxMs = 0;

for (MapReduceTaskData task : tasks) {
if (task.isSampled()) {
if (task.isTimeAndCounterDataPresent()) {
long taskTime = task.getTotalRunTimeMs();
runTimesMs.add(taskTime);
taskMinMs = Math.min(taskMinMs, taskTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public HeuristicResult apply(MapReduceApplicationData data) {
List<Long> sortTimeMs = new ArrayList<Long>();

for (MapReduceTaskData task : tasks) {
if (task.isSampled()) {
if (task.isTimeAndCounterDataPresent()) {
execTimeMs.add(task.getCodeExecutionTimeMs());
shuffleTimeMs.add(task.getShuffleTimeMs());
sortTimeMs.add(task.getSortTimeMs());
Expand Down
6 changes: 4 additions & 2 deletions test/com/linkedin/drelephant/analysis/AnalyticJobTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public void testGetAnalysis()
for (int i = 1; i <= mappers.length; i++) {
MapReduceCounterData taskCounter = new MapReduceCounterData();
setCounterData(taskCounter, FILENAME_MAPPERTASK.replaceFirst("\\$", Integer.toString(i)));
mappers[i - 1] = new MapReduceTaskData(taskCounter, mapperTasksTime[i - 1]);
mappers[i - 1 ] = new MapReduceTaskData("task-id-"+(i-1), "task-attempt-id-"+(i-1));
mappers[i - 1].setTimeAndCounter(mapperTasksTime[i - 1], taskCounter);
}

// Setup reducer data
Expand All @@ -84,7 +85,8 @@ public void testGetAnalysis()
for (int i = 1; i <= reducers.length; i++) {
MapReduceCounterData taskCounter = new MapReduceCounterData();
setCounterData(taskCounter, FILENAME_REDUCERTASK.replaceFirst("\\$", Integer.toString(i)));
reducers[i - 1] = new MapReduceTaskData(taskCounter, reducerTasksTime[i - 1]);
reducers[i - 1] = new MapReduceTaskData("task-id-"+(i-1), "task-attempt-id-"+(i-1));
reducers[i - 1].setTimeAndCounter(reducerTasksTime[i - 1], taskCounter);
}

// Setup job configuration data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ public void testTaskLevelData() {
counterData.set(MapReduceCounterData.CounterName.PHYSICAL_MEMORY_BYTES, 655577088L);
counterData.set(MapReduceCounterData.CounterName.VIRTUAL_MEMORY_BYTES, 3051589632L);
long time[] = {0,0,0,1464218501117L, 1464218534148L};
taskData[0] = new MapReduceTaskData(counterData);
taskData[0].setTime(time);
taskData[1] = new MapReduceTaskData(counterData);
taskData[0] = new MapReduceTaskData("task", "id");
taskData[0].setTimeAndCounter(time, counterData);
taskData[1] = new MapReduceTaskData("task", "id");
taskData[1].setTimeAndCounter(new long[5], counterData);
TaskLevelAggregatedMetrics taskMetrics = new TaskLevelAggregatedMetrics(taskData, 4096L, 1463218501117L);
Assert.assertEquals(taskMetrics.getDelay(), 1000000000L);
Assert.assertEquals(taskMetrics.getResourceUsed(), 135168L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,12 @@ private Severity analyzeJob(long runtimeMs, String queueName) throws IOException
jobConf.put("mapred.job.queue.name", queueName);
int i = 0;
for (; i < 2 * NUM_TASKS / 3; i++) {
mappers[i] = new MapReduceTaskData(dummyCounter, new long[] { runtimeMs, 0, 0 ,0, 0 });
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
mappers[i].setTimeAndCounter(new long[] { runtimeMs, 0, 0, 0, 0 }, dummyCounter);
}
for (i = 0; i < NUM_TASKS / 3; i++) {
reducers[i] = new MapReduceTaskData(dummyCounter, new long[] { runtimeMs, 0, 0 ,0, 0 });
reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
reducers[i].setTimeAndCounter(new long[] { runtimeMs, 0, 0, 0, 0 }, dummyCounter);
}
MapReduceApplicationData data =
new MapReduceApplicationData().setCounters(dummyCounter).setReducerData(reducers).setMapperData(mappers)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,12 @@ private Severity analyzeJob(int numSmallTasks, int numLargeTasks, long smallInpu

int i = 0;
for (; i < numSmallTasks; i++) {
mappers[i] = new MapReduceTaskData(smallCounter, new long[5]);
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
mappers[i].setTimeAndCounter(new long[5], smallCounter);
}
for (; i < numSmallTasks + numLargeTasks; i++) {
mappers[i] = new MapReduceTaskData(largeCounter, new long[5]);
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
mappers[i].setTimeAndCounter(new long[5], largeCounter);
}

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(jobCounter).setMapperData(mappers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ private Severity analyzeJob(long runtimeMs, long cpuMs, long gcMs) throws IOExce

int i = 0;
for (; i < NUMTASKS; i++) {
mappers[i] = new MapReduceTaskData(counter, new long[]{runtimeMs, 0 , 0, 0, 0});
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
mappers[i].setTimeAndCounter(new long[]{runtimeMs, 0 , 0, 0, 0}, counter);
}

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(jobCounter).setMapperData(mappers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ private Severity analyzeJob(long taskAvgMemMB, long containerMemMB) throws IOExc

int i = 0;
for (; i < NUMTASKS; i++) {
mappers[i] = new MapReduceTaskData(counter, new long[5]);
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
mappers[i].setTimeAndCounter(new long[5], counter);
}

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(jobCounter).setMapperData(mappers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ private Severity analyzeJob(long runtimeMs, long readBytes) throws IOException {

int i = 0;
for (; i < NUMTASKS; i++) {
mappers[i] = new MapReduceTaskData(counter, new long[] { runtimeMs, 0, 0 ,0, 0});
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
mappers[i].setTimeAndCounter(new long[] { runtimeMs, 0, 0 ,0, 0}, counter);
}

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(jobCounter).setMapperData(mappers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ private Severity analyzeJob(long spilledRecords, long mapRecords, int numTasks)
counter.set(MapReduceCounterData.CounterName.MAP_OUTPUT_RECORDS, mapRecords);

for (int i=0; i < numTasks; i++) {
mappers[i] = new MapReduceTaskData(counter, new long[5]);
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
mappers[i].setTimeAndCounter(new long[5], counter);
}

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(jobCounter).setMapperData(mappers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ private Severity analyzeJob(int numTasks, long runtime) throws IOException {

int i = 0;
for (; i < numTasks; i++) {
mappers[i] = new MapReduceTaskData(taskCounter, new long[] { runtime, 0, 0 ,0, 0});
mappers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
mappers[i].setTimeAndCounter(new long[] { runtime, 0, 0, 0, 0 }, taskCounter);
}

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(jobCounter).setMapperData(mappers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,12 @@ private Severity analyzeJob(int numSmallTasks, int numLargeTasks, long smallInpu

int i = 0;
for (; i < numSmallTasks; i++) {
reducers[i] = new MapReduceTaskData(smallCounter, new long[5]);
reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
reducers[i].setTimeAndCounter(new long[5], smallCounter);
}
for (; i < numSmallTasks + numLargeTasks; i++) {
reducers[i] = new MapReduceTaskData(largeCounter, new long[5]);
reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
reducers[i].setTimeAndCounter(new long[5], largeCounter);
}

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(jobCounter).setReducerData(reducers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ private Severity analyzeJob(long runtimeMs, long cpuMs, long gcMs) throws IOExce

int i = 0;
for (; i < NUMTASKS; i++) {
reducers[i] = new MapReduceTaskData(counter, new long[]{runtimeMs, 0 , 0, 0, 0});
reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
reducers[i].setTimeAndCounter(new long[] { runtimeMs, 0, 0, 0, 0 }, counter);
}

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(jobCounter).setReducerData(reducers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ private Severity analyzeJob(long taskAvgMemMB, long containerMemMB) throws IOExc

int i = 0;
for (; i < NUMTASKS; i++) {
reducers[i] = new MapReduceTaskData(counter, new long[5]);
reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
reducers[i].setTimeAndCounter(new long[5], counter);
}

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(jobCounter).setReducerData(reducers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ private Severity analyzeJob(long runtimeMs, int numTasks) throws IOException {

int i = 0;
for (; i < numTasks; i++) {
reducers[i] = new MapReduceTaskData(dummyCounter, new long[] { runtimeMs, 0, 0, 0, 0 });
reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
reducers[i].setTimeAndCounter(new long[] { runtimeMs, 0, 0, 0, 0 }, dummyCounter);
}

MapReduceApplicationData data = new MapReduceApplicationData().setCounters(dummyCounter).setReducerData(reducers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,9 @@ private Severity analyzeJob(long shuffleTimeMs, long sortTimeMs, long reduceTime

int i = 0;
for (; i < NUMTASKS; i++) {
reducers[i] = new MapReduceTaskData(dummyCounter,
new long[] { shuffleTimeMs + sortTimeMs + reduceTimeMs, shuffleTimeMs, sortTimeMs, 0, 0});
reducers[i] = new MapReduceTaskData("task-id-"+i, "task-attempt-id-"+i);
reducers[i].setTimeAndCounter(
new long[] { shuffleTimeMs + sortTimeMs + reduceTimeMs, shuffleTimeMs, sortTimeMs, 0, 0}, dummyCounter);
}
MapReduceApplicationData data = new MapReduceApplicationData().setCounters(dummyCounter).setReducerData(reducers);
HeuristicResult result = _heuristic.apply(data);
Expand Down

0 comments on commit d2eea3c

Please sign in to comment.