Skip to content

Commit

Permalink
added logic for map reduce time-skew heuristic (linkedin#267)
Browse files Browse the repository at this point in the history
  • Loading branch information
skakker authored and akshayrai committed Jul 28, 2017
1 parent f777264 commit d2c603b
Show file tree
Hide file tree
Showing 22 changed files with 230 additions and 80 deletions.
12 changes: 6 additions & 6 deletions app-conf/HeuristicConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

<heuristic>
<applicationtype>mapreduce</applicationtype>
<heuristicname>Mapper Data Skew</heuristicname>
<classname>com.linkedin.drelephant.mapreduce.heuristics.MapperDataSkewHeuristic</classname>
<viewname>views.html.help.mapreduce.helpMapperDataSkew</viewname>
<heuristicname>Mapper Skew</heuristicname>
<classname>com.linkedin.drelephant.mapreduce.heuristics.MapperSkewHeuristic</classname>
<viewname>views.html.help.mapreduce.helpMapperSkew</viewname>
<!--<params>
<num_tasks_severity>10, 50, 100, 200</num_tasks_severity>
<deviation_severity>2, 4, 8, 16</deviation_severity>
Expand Down Expand Up @@ -93,9 +93,9 @@

<heuristic>
<applicationtype>mapreduce</applicationtype>
<heuristicname>Reducer Data Skew</heuristicname>
<classname>com.linkedin.drelephant.mapreduce.heuristics.ReducerDataSkewHeuristic</classname>
<viewname>views.html.help.mapreduce.helpReducerDataSkew</viewname>
<heuristicname>Reducer Skew</heuristicname>
<classname>com.linkedin.drelephant.mapreduce.heuristics.ReducerSkewHeuristic</classname>
<viewname>views.html.help.mapreduce.helpReducerSkew</viewname>
<!--<params>
<num_tasks_severity>10, 50, 100, 200</num_tasks_severity>
<deviation_severity>2, 4, 8, 16</deviation_severity>
Expand Down
2 changes: 1 addition & 1 deletion app-conf/elephant.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ enable_analytics=false

# Additional Configuration
# Check https://www.playframework.com/documentation/2.2.x/ProductionConfiguration
jvm_args="-Devolutionplugin=enabled -DapplyEvolutions.default=true"
jvm_props="-Devolutionplugin=enabled -DapplyEvolutions.default=true"

# Property enables dropwizard metrics for the application.
# More info on Dropwizard metrics at http://metrics.dropwizard.io
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,18 @@ public class MapReduceTaskData {
private String _taskId;
// The successful attempt id
private String _attemptId;

public void setTotalTimeMs(long totalTimeMs, boolean isTimeDataPresent) {
this._totalTimeMs = totalTimeMs;
this._isTimeDataPresent = isTimeDataPresent;
}

private long _totalTimeMs = 0;
private long _shuffleTimeMs = 0;
private long _sortTimeMs = 0;
private long _startTimeMs = 0;
private long _finishTimeMs = 0;

// This flag will only be true when successfully setting time and counter values.
private boolean _isTimeDataPresent = false;
private boolean _isCounterDataPresent = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,23 @@

import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData;
import com.linkedin.drelephant.util.Utils;

import java.util.Arrays;
import java.util.ArrayList;
import java.util.Map;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.time.DurationFormatUtils;
import org.apache.log4j.Logger;


/**
* This Heuristic analyses the skewness in the task input data
*/
public abstract class GenericDataSkewHeuristic implements Heuristic<MapReduceApplicationData> {
private static final Logger logger = Logger.getLogger(GenericDataSkewHeuristic.class);
public abstract class GenericSkewHeuristic implements Heuristic<MapReduceApplicationData> {
private static final Logger logger = Logger.getLogger(GenericSkewHeuristic.class);

// Severity Parameters
private static final String NUM_TASKS_SEVERITY = "num_tasks_severity";
Expand All @@ -51,7 +54,7 @@ public abstract class GenericDataSkewHeuristic implements Heuristic<MapReduceApp
// Default value of parameters
private double[] numTasksLimits = {10, 50, 100, 200}; // Number of map or reduce tasks
private double[] deviationLimits = {2, 4, 8, 16}; // Deviation in i/p bytes btw 2 groups
private double[] filesLimits = {1d/8, 1d/4, 1d/2, 1d}; // Fraction of HDFS Block Size
private double[] filesLimits = {1d / 8, 1d / 4, 1d / 2, 1d}; // Fraction of HDFS Block Size

private List<MapReduceCounterData.CounterName> _counterNames;
private HeuristicConfigurationData _heuristicConfData;
Expand All @@ -78,15 +81,16 @@ private void loadParameters() {
if (confFilesThreshold != null) {
filesLimits = confFilesThreshold;
}
logger.info(heuristicName + " will use " + FILES_SEVERITY + " with the following threshold settings: "
+ Arrays.toString(filesLimits));
logger.info(
heuristicName + " will use " + FILES_SEVERITY + " with the following threshold settings: " + Arrays.toString(
filesLimits));
for (int i = 0; i < filesLimits.length; i++) {
filesLimits[i] = filesLimits[i] * HDFSContext.HDFS_BLOCK_SIZE;
}
}

protected GenericDataSkewHeuristic(List<MapReduceCounterData.CounterName> counterNames,
HeuristicConfigurationData heuristicConfData) {
protected GenericSkewHeuristic(List<MapReduceCounterData.CounterName> counterNames,
HeuristicConfigurationData heuristicConfData) {
this._counterNames = counterNames;
this._heuristicConfData = heuristicConfData;

Expand All @@ -103,27 +107,56 @@ public HeuristicConfigurationData getHeuristicConfData() {
@Override
public HeuristicResult apply(MapReduceApplicationData data) {

if(!data.getSucceeded()) {
if (!data.getSucceeded()) {
return null;
}

MapReduceTaskData[] tasks = getTasks(data);

//Gathering data for checking time skew
List<Long> timeTaken = new ArrayList<Long>();

for (int i = 0; i < tasks.length; i++) {
if (tasks[i].isTimeDataPresent()) {
timeTaken.add(tasks[i].getTotalRunTimeMs());
}
}

long[][] groupsTime = Statistics.findTwoGroups(Longs.toArray(timeTaken));

long timeAvg1 = Statistics.average(groupsTime[0]);
long timeAvg2 = Statistics.average(groupsTime[1]);

//seconds are used for calculating deviation as they provide a better idea than millisecond.
long timeAvgSec1 = TimeUnit.MILLISECONDS.toSeconds(timeAvg1);
long timeAvgSec2 = TimeUnit.MILLISECONDS.toSeconds(timeAvg2);

long minTime = Math.min(timeAvgSec1, timeAvgSec2);
long diffTime = Math.abs(timeAvgSec1 - timeAvgSec2);

//using the same deviation limits for time skew as for data skew. It can be changed in the fututre.
Severity severityTime = getDeviationSeverity(minTime, diffTime);

//This reduces severity if number of tasks is insignificant
severityTime = Severity.min(severityTime,
Severity.getSeverityAscending(groupsTime[0].length, numTasksLimits[0], numTasksLimits[1], numTasksLimits[2],
numTasksLimits[3]));

//Gather data
List<Long> inputBytes = new ArrayList<Long>();

for (int i = 0; i < tasks.length; i++) {
if (tasks[i].isCounterDataPresent()) {
long inputByte = 0;
for (MapReduceCounterData.CounterName counterName: _counterNames) {
for (MapReduceCounterData.CounterName counterName : _counterNames) {
inputByte += tasks[i].getCounters().get(counterName);
}
inputBytes.add(inputByte);
}
}

// Ratio of total tasks / sampled tasks
double scale = ((double)tasks.length) / inputBytes.size();
double scale = ((double) tasks.length) / inputBytes.size();
//Analyze data. TODO: This is a temp fix. findTwogroups should support list as input
long[][] groups = Statistics.findTwoGroups(Longs.toArray(inputBytes));

Expand All @@ -133,37 +166,54 @@ public HeuristicResult apply(MapReduceApplicationData data) {
long min = Math.min(avg1, avg2);
long diff = Math.abs(avg2 - avg1);

Severity severity = getDeviationSeverity(min, diff);
Severity severityData = getDeviationSeverity(min, diff);

//This reduces severity if the largest file sizes are insignificant
severity = Severity.min(severity, getFilesSeverity(avg2));
severityData = Severity.min(severityData, getFilesSeverity(avg2));

//This reduces severity if number of tasks is insignificant
severity = Severity.min(severity, Severity.getSeverityAscending(
groups[0].length, numTasksLimits[0], numTasksLimits[1], numTasksLimits[2], numTasksLimits[3]));
severityData = Severity.min(severityData,
Severity.getSeverityAscending(groups[0].length, numTasksLimits[0], numTasksLimits[1], numTasksLimits[2],
numTasksLimits[3]));

Severity severity = Severity.max(severityData, severityTime);

HeuristicResult result =
new HeuristicResult(_heuristicConfData.getClassName(), _heuristicConfData.getHeuristicName(), severity,
Utils.getHeuristicScore(severityData, tasks.length));

HeuristicResult result = new HeuristicResult(_heuristicConfData.getClassName(),
_heuristicConfData.getHeuristicName(), severity, Utils.getHeuristicScore(severity, tasks.length));
result.addResultDetail("Data skew (Number of tasks)", Integer.toString(tasks.length));
result.addResultDetail("Data skew (Group A)",
groups[0].length + " tasks @ " + FileUtils.byteCountToDisplaySize(avg1) + " avg");
result.addResultDetail("Data skew (Group B)",
groups[1].length + " tasks @ " + FileUtils.byteCountToDisplaySize(avg2) + " avg");

result.addResultDetail("Number of tasks", Integer.toString(tasks.length));
result.addResultDetail("Group A", groups[0].length + " tasks @ " + FileUtils.byteCountToDisplaySize(avg1) + " avg");
result.addResultDetail("Group B", groups[1].length + " tasks @ " + FileUtils.byteCountToDisplaySize(avg2) + " avg");
result.addResultDetail("Time skew (Number of tasks)", Integer.toString(tasks.length));
result.addResultDetail("Time skew (Group A)",
groupsTime[0].length + " tasks @ " + convertTimeMs(timeAvg1) + " avg");
result.addResultDetail("Time skew (Group B)",
groupsTime[1].length + " tasks @ " + convertTimeMs(timeAvg2) + " avg");

return result;
}

private String convertTimeMs(long timeMs) {
if (timeMs < 1000) {
return Long.toString(timeMs) + " msec";
}
return DurationFormatUtils.formatDuration(timeMs, "HH:mm:ss") + " HH:MM:SS";
}

private Severity getDeviationSeverity(long averageMin, long averageDiff) {
if (averageMin <= 0) {
averageMin = 1;
}
long value = averageDiff / averageMin;
return Severity.getSeverityAscending(
value, deviationLimits[0], deviationLimits[1], deviationLimits[2], deviationLimits[3]);
return Severity.getSeverityAscending(value, deviationLimits[0], deviationLimits[1], deviationLimits[2],
deviationLimits[3]);
}

private Severity getFilesSeverity(long value) {
return Severity.getSeverityAscending(
value, filesLimits[0], filesLimits[1], filesLimits[2], filesLimits[3]);
return Severity.getSeverityAscending(value, filesLimits[0], filesLimits[1], filesLimits[2], filesLimits[3]);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
/**
* This Heuristic analyses the skewness in the mapper input data
*/
public class MapperDataSkewHeuristic extends GenericDataSkewHeuristic {
public class MapperSkewHeuristic extends GenericSkewHeuristic {

public MapperDataSkewHeuristic(HeuristicConfigurationData heuristicConfData) {
public MapperSkewHeuristic(HeuristicConfigurationData heuristicConfData) {
super(Arrays.asList(
MapReduceCounterData.CounterName.HDFS_BYTES_READ,
MapReduceCounterData.CounterName.S3_BYTES_READ,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
/**
* This Heuristic analyses the skewness in the reducer input data
*/
public class ReducerDataSkewHeuristic extends GenericDataSkewHeuristic {
public class ReducerSkewHeuristic extends GenericSkewHeuristic {

public ReducerDataSkewHeuristic(HeuristicConfigurationData heuristicConfData) {
public ReducerSkewHeuristic(HeuristicConfigurationData heuristicConfData) {
super(Arrays.asList(MapReduceCounterData.CounterName.REDUCE_SHUFFLE_BYTES), heuristicConfData);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*@

<p>
This analysis shows whether there is a data-skew for the data entering mapper tasks.
This analysis shows whether there is a data/time-skew for the data/time entering mapper tasks.
</p>
<p>
This result of the analysis shows two groups of the spectrum, where the first group has significantly less input data compared to the second group.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*@

<p>
This analysis shows whether there is a data-skew for the data entering reducer tasks.
This analysis shows whether there is a data/time-skew for the data/time entering reducer tasks.
</p>
<p>
This result of the analysis shows two groups of the spectrum, where the first group has significantly less input data compared to the second group.
Expand All @@ -24,7 +24,7 @@ <h5>Example</h5>
<p>
<div class="list-group">
<a class="list-group-item list-group-item-danger" href="">
<h4 class="list-group-item-heading">Reducer Data Skew</h4>
<h4 class="list-group-item-heading">Reducer Skew</h4>
<table class="list-group-item-text table table-condensed left-table">
<thead><tr><th colspan="2">Severity: Critical</th></tr></thead>
<tbody>
Expand Down
34 changes: 17 additions & 17 deletions test/com/linkedin/drelephant/analysis/AnalyticJobTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.linkedin.drelephant.mapreduce.data.MapReduceApplicationData;
import com.linkedin.drelephant.mapreduce.data.MapReduceCounterData;
import com.linkedin.drelephant.mapreduce.data.MapReduceTaskData;
import com.linkedin.drelephant.mapreduce.heuristics.MapperDataSkewHeuristic;
import com.linkedin.drelephant.mapreduce.heuristics.MapperSkewHeuristic;
import common.TestUtil;
import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -160,40 +160,40 @@ private List<Heuristic> loadHeuristics() {
List<Heuristic> heuristics = new ArrayList<Heuristic>();
// dummy hash map
Map<String, String> paramsMap = new HashMap<String, String>();
heuristics.add(new MapperDataSkewHeuristic(new HeuristicConfigurationData("Mapper Data Skew",
"com.linkedin.drelephant.mapreduce.heuristics.MapperDataSkewHeuristic",
"views.html.help.mapreduce.helpMapperDataSkew", new ApplicationType("mapreduce"), paramsMap)));
heuristics.add(new MapperDataSkewHeuristic(
heuristics.add(new MapperSkewHeuristic(new HeuristicConfigurationData("Mapper Skew",
"com.linkedin.drelephant.mapreduce.heuristics.MapperSkewHeuristic",
"views.html.help.mapreduce.helpMapperSkew", new ApplicationType("mapreduce"), paramsMap)));
heuristics.add(new MapperSkewHeuristic(
new HeuristicConfigurationData("Mapper GC", "com.linkedin.drelephant.mapreduce.heuristics.MapperGCHeuristic",
"views.html.help.mapreduce.helpGC", new ApplicationType("mapreduce"), paramsMap)));
heuristics.add(new MapperDataSkewHeuristic(new HeuristicConfigurationData("Mapper Time",
heuristics.add(new MapperSkewHeuristic(new HeuristicConfigurationData("Mapper Time",
"com.linkedin.drelephant.mapreduce.heuristics.MapperTimeHeuristic", "views.html.help.mapreduce.helpMapperTime",
new ApplicationType("mapreduce"), paramsMap)));
heuristics.add(new MapperDataSkewHeuristic(new HeuristicConfigurationData("Mapper Speed",
heuristics.add(new MapperSkewHeuristic(new HeuristicConfigurationData("Mapper Speed",
"com.linkedin.drelephant.mapreduce.heuristics.MapperSpeedHeuristic",
"views.html.help.mapreduce.helpMapperSpeed", new ApplicationType("mapreduce"), paramsMap)));
heuristics.add(new MapperDataSkewHeuristic(new HeuristicConfigurationData("Mapper Spill",
heuristics.add(new MapperSkewHeuristic(new HeuristicConfigurationData("Mapper Spill",
"com.linkedin.drelephant.mapreduce.heuristics.MapperSpillHeuristic",
"views.html.help.mapreduce.helpMapperSpill", new ApplicationType("mapreduce"), paramsMap)));
heuristics.add(new MapperDataSkewHeuristic(new HeuristicConfigurationData("Mapper Memory",
heuristics.add(new MapperSkewHeuristic(new HeuristicConfigurationData("Mapper Memory",
"com.linkedin.drelephant.mapreduce.heuristics.MapperMemoryHeuristic",
"views.html.help.mapreduce.helpMapperMemory", new ApplicationType("mapreduce"), paramsMap)));
heuristics.add(new MapperDataSkewHeuristic(new HeuristicConfigurationData("Reducer Data Skew",
"com.linkedin.drelephant.mapreduce.heuristics.ReducerDataSkewHeuristic",
"views.html.help.mapreduce.helpReducerDataSkew", new ApplicationType("mapreduce"), paramsMap)));
heuristics.add(new MapperDataSkewHeuristic(
heuristics.add(new MapperSkewHeuristic(new HeuristicConfigurationData("Reducer Skew",
"com.linkedin.drelephant.mapreduce.heuristics.ReducerSkewHeuristic",
"views.html.help.mapreduce.helpReducerSkew", new ApplicationType("mapreduce"), paramsMap)));
heuristics.add(new MapperSkewHeuristic(
new HeuristicConfigurationData("Reducer GC", "com.linkedin.drelephant.mapreduce.heuristics.ReducerGCHeuristic",
"views.html.help.mapreduce.helpGC", new ApplicationType("mapreduce"), paramsMap)));
heuristics.add(new MapperDataSkewHeuristic(new HeuristicConfigurationData("Reducer Time",
heuristics.add(new MapperSkewHeuristic(new HeuristicConfigurationData("Reducer Time",
"com.linkedin.drelephant.mapreduce.heuristics.ReducerTimeHeuristic",
"views.html.help.mapreduce.helpReducerTime", new ApplicationType("mapreduce"), paramsMap)));
heuristics.add(new MapperDataSkewHeuristic(new HeuristicConfigurationData("Reducer Memory",
heuristics.add(new MapperSkewHeuristic(new HeuristicConfigurationData("Reducer Memory",
"com.linkedin.drelephant.mapreduce.heuristics.ReducerMemoryHeuristic",
"views.html.help.mapreduce.helpReducerMemory", new ApplicationType("mapreduce"), paramsMap)));
heuristics.add(new MapperDataSkewHeuristic(new HeuristicConfigurationData("Shuffle &#38; Sort",
heuristics.add(new MapperSkewHeuristic(new HeuristicConfigurationData("Shuffle &#38; Sort",
"com.linkedin.drelephant.mapreduce.heuristics.ShuffleSortHeuristic",
"views.html.help.mapreduce.helpShuffleSort", new ApplicationType("mapreduce"), paramsMap)));
heuristics.add(new MapperDataSkewHeuristic(
heuristics.add(new MapperSkewHeuristic(
new HeuristicConfigurationData("Exception", "com.linkedin.drelephant.mapreduce.heuristics.ExceptionHeuristic",
"views.html.help.mapreduce.helpException", new ApplicationType("mapreduce"), paramsMap)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void testParseFetcherConf2() {
public void testParseFetcherConf3() {
expectedEx.expect(RuntimeException.class);
expectedEx.expectMessage("No tag 'heuristicname' in heuristic 1 classname"
+ " com.linkedin.drelephant.mapreduce.heuristics.MapperDataSkewHeuristic");
+ " com.linkedin.drelephant.mapreduce.heuristics.MapperSkewHeuristic");
HeuristicConfiguration heuristicConf = new HeuristicConfiguration(document3.getDocumentElement());
}

Expand All @@ -106,7 +106,7 @@ public void testParseFetcherConf3() {
public void testParseFetcherConf4() {
expectedEx.expect(RuntimeException.class);
expectedEx.expectMessage("No tag 'viewname' in heuristic 1 classname"
+ " com.linkedin.drelephant.mapreduce.heuristics.MapperDataSkewHeuristic");
+ " com.linkedin.drelephant.mapreduce.heuristics.MapperSkewHeuristic");
HeuristicConfiguration heuristicConf = new HeuristicConfiguration(document4.getDocumentElement());
}

Expand Down
Loading

0 comments on commit d2c603b

Please sign in to comment.