Skip to content

Commit

Permalink
Resources used (linkedin#173)
Browse files Browse the repository at this point in the history
* Fix default (mb) and verify memory size >= 0.
* Parse the string that's already been retrieved from the config.
  • Loading branch information
bpitman authored and akshayrai committed Jan 5, 2017
1 parent 36a0dda commit 467eae2
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class MapReduceMetricsAggregator implements HadoopMetricsAggregator {
private static final String MAP_CONTAINER_CONFIG = "mapreduce.map.memory.mb";
private static final String REDUCER_CONTAINER_CONFIG = "mapreduce.reduce.memory.mb";
private static final String REDUCER_SLOW_START_CONFIG = "mapreduce.job.reduce.slowstart.completedmaps";
private static final long CONTAINER_MEMORY_DEFAULT_BYTES = 2048L * FileUtils.ONE_MB;
private static final long CONTAINER_MEMORY_DEFAULT_MBYTES = 2048L;

private HadoopAggregatedData _hadoopAggregatedData = null;
private TaskLevelAggregatedMetrics mapTasks;
Expand Down Expand Up @@ -83,17 +83,19 @@ public HadoopAggregatedData getResult() {

private long getMapContainerSize(HadoopApplicationData data) {
try {
return Long.parseLong(data.getConf().getProperty(MAP_CONTAINER_CONFIG));
long value = Long.parseLong(data.getConf().getProperty(MAP_CONTAINER_CONFIG));
return (value < 0) ? CONTAINER_MEMORY_DEFAULT_MBYTES : value;
} catch ( NumberFormatException ex) {
return CONTAINER_MEMORY_DEFAULT_BYTES;
return CONTAINER_MEMORY_DEFAULT_MBYTES;
}
}

private long getReducerContainerSize(HadoopApplicationData data) {
try {
return Long.parseLong(data.getConf().getProperty(REDUCER_CONTAINER_CONFIG));
long value = Long.parseLong(data.getConf().getProperty(REDUCER_CONTAINER_CONFIG));
return (value < 0) ? CONTAINER_MEMORY_DEFAULT_MBYTES : value;
} catch ( NumberFormatException ex) {
return CONTAINER_MEMORY_DEFAULT_BYTES;
return CONTAINER_MEMORY_DEFAULT_MBYTES;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
*/
public abstract class GenericMemoryHeuristic implements Heuristic<MapReduceApplicationData> {
private static final Logger logger = Logger.getLogger(GenericMemoryHeuristic.class);
private static final long CONTAINER_MEMORY_DEFAULT_BYTES = 2048L * FileUtils.ONE_MB;
private static final long CONTAINER_MEMORY_DEFAULT_MBYTES = 2048L;

// Severity Parameters
private static final String MEM_RATIO_SEVERITY = "memory_ratio_severity";
Expand All @@ -53,6 +53,20 @@ public abstract class GenericMemoryHeuristic implements Heuristic<MapReduceAppli
private String _containerMemConf;
private HeuristicConfigurationData _heuristicConfData;

private long getContainerMemDefaultMBytes() {
Map<String, String> paramMap = _heuristicConfData.getParamMap();
if (paramMap.containsKey(CONTAINER_MEM_DEFAULT_MB)) {
String strValue = paramMap.get(CONTAINER_MEM_DEFAULT_MB);
try {
return Long.valueOf(strValue);
}
catch (NumberFormatException e) {
logger.warn(CONTAINER_MEM_DEFAULT_MB + ": expected number [" + strValue + "]");
}
}
return CONTAINER_MEMORY_DEFAULT_MBYTES;
}

private void loadParameters() {
Map<String, String> paramMap = _heuristicConfData.getParamMap();
String heuristicName = _heuristicConfData.getHeuristicName();
Expand All @@ -64,10 +78,7 @@ private void loadParameters() {
logger.info(heuristicName + " will use " + MEM_RATIO_SEVERITY + " with the following threshold settings: "
+ Arrays.toString(memRatioLimits));

long containerMemDefaultBytes = CONTAINER_MEMORY_DEFAULT_BYTES;
if (paramMap.containsKey(CONTAINER_MEM_DEFAULT_MB)) {
containerMemDefaultBytes = Long.valueOf(paramMap.get(CONTAINER_MEM_DEFAULT_MB)) * FileUtils.ONE_MB;
}
long containerMemDefaultBytes = getContainerMemDefaultMBytes() * FileUtils.ONE_MB;
logger.info(heuristicName + " will use " + CONTAINER_MEM_DEFAULT_MB + " with the following threshold setting: "
+ containerMemDefaultBytes);

Expand Down Expand Up @@ -104,23 +115,31 @@ public HeuristicResult apply(MapReduceApplicationData data) {
}

String containerSizeStr = data.getConf().getProperty(_containerMemConf);
if (containerSizeStr == null) {
return null;
}

long containerMem;
try {
containerMem = Long.parseLong(containerSizeStr);
} catch (NumberFormatException e) {
// Some job has a string var like "${VAR}" for this config.
if(containerSizeStr.startsWith("$")) {
String realContainerConf = containerSizeStr.substring(containerSizeStr.indexOf("{")+1,
containerSizeStr.indexOf("}"));
containerMem = Long.parseLong(data.getConf().getProperty(realContainerConf));
} else {
throw e;
long containerMem = -1L;

if (containerSizeStr != null) {
try {
containerMem = Long.parseLong(containerSizeStr);
} catch (NumberFormatException e0) {
// Some job has a string var like "${VAR}" for this config.
if(containerSizeStr.startsWith("$")) {
String realContainerConf = containerSizeStr.substring(containerSizeStr.indexOf("{")+1,
containerSizeStr.indexOf("}"));
String realContainerSizeStr = data.getConf().getProperty(realContainerConf);
try {
containerMem = Long.parseLong(realContainerSizeStr);
}
catch (NumberFormatException e1) {
logger.warn(realContainerConf + ": expected number [" + realContainerSizeStr + "]");
}
} else {
logger.warn(_containerMemConf + ": expected number [" + containerSizeStr + "]");
}
}
}
if (containerMem < 0) {
containerMem = getContainerMemDefaultMBytes();
}
containerMem *= FileUtils.ONE_MB;

MapReduceTaskData[] tasks = getTasks(data);
Expand Down

0 comments on commit 467eae2

Please sign in to comment.