IPSO changes to MR Tuning
pralabhkumar committed Jul 19, 2018
1 parent dd31ad5 commit 58f012c
Showing 15 changed files with 918 additions and 59 deletions.
app/com/linkedin/drelephant/mapreduce/heuristics/CommonConstantsHeuristic.java
@@ -1,7 +1,74 @@
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.linkedin.drelephant.mapreduce.heuristics;

public class CommonConstantsHeuristic {

public static final String MAPPER_SPEED = "Mapper Speed";
public static final String TOTAL_INPUT_SIZE_IN_MB = "Total input size in MB";

public enum UtilizedParameterKeys {
AVG_PHYSICAL_MEMORY("Avg Physical Memory (MB)"),
MAX_PHYSICAL_MEMORY("Max Physical Memory (MB)"),
MIN_PHYSICAL_MEMORY("Min Physical Memory (MB)"),
AVG_VIRTUAL_MEMORY("Avg Virtual Memory (MB)"),
MAX_VIRTUAL_MEMORY("Max Virtual Memory (MB)"),
MIN_VIRTUAL_MEMORY("Min Virtual Memory (MB)"),
AVG_TOTAL_COMMITTED_HEAP_USAGE_MEMORY("Avg Total Committed Heap Usage Memory (MB)"),
MAX_TOTAL_COMMITTED_HEAP_USAGE_MEMORY("Max Total Committed Heap Usage Memory (MB)"),
MIN_TOTAL_COMMITTED_HEAP_USAGE_MEMORY("Min Total Committed Heap Usage Memory (MB)");
private String value;

UtilizedParameterKeys(String value) {
this.value = value;
}

public String getValue() {
return value;
}
}

public enum ParameterKeys {
MAPPER_MEMORY_HADOOP_CONF("mapreduce.map.memory.mb"),
MAPPER_HEAP_HADOOP_CONF("mapreduce.map.java.opts"),
SORT_BUFFER_HADOOP_CONF("mapreduce.task.io.sort.mb"),
SORT_FACTOR_HADOOP_CONF("mapreduce.task.io.sort.factor"),
SORT_SPILL_HADOOP_CONF("mapreduce.map.sort.spill.percent"),
REDUCER_MEMORY_HADOOP_CONF("mapreduce.reduce.memory.mb"),
REDUCER_HEAP_HADOOP_CONF("mapreduce.reduce.java.opts"),
SPLIT_SIZE_HADOOP_CONF("mapreduce.input.fileinputformat.split.maxsize"),
CHILD_HEAP_SIZE_HADOOP_CONF("mapred.child.java.opts"),
PIG_SPLIT_SIZE_HADOOP_CONF("pig.maxCombinedSplitSize"),
MAPPER_MEMORY_HEURISTICS_CONF("Mapper Memory"),
MAPPER_HEAP_HEURISTICS_CONF("Mapper Heap"),
REDUCER_MEMORY_HEURISTICS_CONF("Reducer Memory"),
REDUCER_HEAP_HEURISTICS_CONF("Reducer heap"),
SORT_BUFFER_HEURISTICS_CONF("Sort Buffer"),
SORT_FACTOR_HEURISTICS_CONF("Sort Factor"),
SORT_SPILL_HEURISTICS_CONF("Sort Spill"),
SPLIT_SIZE_HEURISTICS_CONF("Split Size"),
PIG_MAX_SPLIT_SIZE_HEURISTICS_CONF("Pig Max Split Size");
private String value;
ParameterKeys(String value) {
this.value = value;
}

public String getValue() {
return value;
}
}
}
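The two enums above share the same display-name pattern: each constant wraps either a Hadoop configuration key or the human-readable label under which the heuristics report it. A minimal usage sketch (the demo class below is illustrative, not part of this commit):

import com.linkedin.drelephant.mapreduce.heuristics.CommonConstantsHeuristic.ParameterKeys;
import com.linkedin.drelephant.mapreduce.heuristics.CommonConstantsHeuristic.UtilizedParameterKeys;

public class ParameterKeyDemo {
  public static void main(String[] args) {
    // The Hadoop property behind the "Mapper Memory" heuristic row.
    System.out.println(ParameterKeys.MAPPER_MEMORY_HADOOP_CONF.getValue());
    // prints: mapreduce.map.memory.mb

    // Display labels for the memory counters the memory heuristic reports.
    for (UtilizedParameterKeys key : UtilizedParameterKeys.values()) {
      System.out.println(key.name() + " -> " + key.getValue());
    }
  }
}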
app/com/linkedin/drelephant/mapreduce/heuristics/ConfigurationHeuristic.java
@@ -0,0 +1,86 @@
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.linkedin.drelephant.mapreduce.heuristics;

import com.linkedin.drelephant.analysis.Heuristic;
import com.linkedin.drelephant.analysis.HeuristicResult;
import com.linkedin.drelephant.analysis.Severity;
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData;
import com.linkedin.drelephant.mapreduce.data.MapReduceApplicationData;

import static com.linkedin.drelephant.mapreduce.heuristics.CommonConstantsHeuristic.ParameterKeys.*;

import org.apache.log4j.Logger;


/*
Heuristic to collect memory data/counter values from the application's previous execution.
*/
public class ConfigurationHeuristic implements Heuristic<MapReduceApplicationData> {
private static final Logger logger = Logger.getLogger(ConfigurationHeuristic.class);

private HeuristicConfigurationData _heuristicConfData;

public ConfigurationHeuristic(HeuristicConfigurationData heuristicConfData) {
this._heuristicConfData = heuristicConfData;
}

@Override
public HeuristicConfigurationData getHeuristicConfData() {
return _heuristicConfData;
}

@Override
public HeuristicResult apply(MapReduceApplicationData data) {
if (!data.getSucceeded()) {
return null;
}
String mapperMemory = data.getConf().getProperty(MAPPER_MEMORY_HADOOP_CONF.getValue());
String mapperHeap = data.getConf().getProperty(MAPPER_HEAP_HADOOP_CONF.getValue());
if (mapperHeap == null) {
mapperHeap = data.getConf().getProperty(CHILD_HEAP_SIZE_HADOOP_CONF.getValue());
}
String sortBuffer = data.getConf().getProperty(SORT_BUFFER_HADOOP_CONF.getValue());
String sortFactor = data.getConf().getProperty(SORT_FACTOR_HADOOP_CONF.getValue());
String sortSpill = data.getConf().getProperty(SORT_SPILL_HADOOP_CONF.getValue());
String reducerMemory = data.getConf().getProperty(REDUCER_MEMORY_HADOOP_CONF.getValue());
String reducerHeap = data.getConf().getProperty(REDUCER_HEAP_HADOOP_CONF.getValue());
if (reducerHeap == null) {
reducerHeap = data.getConf().getProperty(CHILD_HEAP_SIZE_HADOOP_CONF.getValue());
}
String splitSize = data.getConf().getProperty(SPLIT_SIZE_HADOOP_CONF.getValue());
String pigSplitSize = data.getConf().getProperty(PIG_SPLIT_SIZE_HADOOP_CONF.getValue());
HeuristicResult result =
new HeuristicResult(_heuristicConfData.getClassName(), _heuristicConfData.getHeuristicName(), Severity.LOW, 0);

result.addResultDetail(MAPPER_MEMORY_HEURISTICS_CONF.getValue(), mapperMemory);
result.addResultDetail(MAPPER_HEAP_HEURISTICS_CONF.getValue(), mapperHeap.replaceAll("\\s+", "\n"));
result.addResultDetail(REDUCER_MEMORY_HEURISTICS_CONF.getValue(), reducerMemory);
result.addResultDetail(REDUCER_HEAP_HEURISTICS_CONF.getValue(), reducerHeap.replaceAll("\\s+", "\n"));
result.addResultDetail(SORT_BUFFER_HEURISTICS_CONF.getValue(), sortBuffer);
result.addResultDetail(SORT_FACTOR_HEURISTICS_CONF.getValue(), sortFactor);
result.addResultDetail(SORT_SPILL_HEURISTICS_CONF.getValue(), sortSpill);
if (splitSize != null) {
result.addResultDetail(SPLIT_SIZE_HEURISTICS_CONF.getValue(), splitSize);
}
if (pigSplitSize != null) {
result.addResultDetail(PIG_MAX_SPLIT_SIZE_HEURISTICS_CONF.getValue(), pigSplitSize);
}

return result;
}
}
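The heuristic above only records configuration values: it reads the mapper and reducer memory/heap settings, falls back to the legacy mapred.child.java.opts when the per-task Java opts are unset, and reflows whitespace-separated JVM flags onto one line each for display. A standalone sketch of the fallback and the reflow (assuming, as the heuristic itself does, that at least one of the two heap keys is present):

import java.util.Properties;

public class HeapOptsFallbackDemo {
  public static void main(String[] args) {
    Properties conf = new Properties();
    // Only the legacy key is set, as in older job configurations.
    conf.setProperty("mapred.child.java.opts", "-Xmx1536m -XX:+UseParallelGC");

    String mapperHeap = conf.getProperty("mapreduce.map.java.opts");
    if (mapperHeap == null) {
      // Same fallback the heuristic applies via CHILD_HEAP_SIZE_HADOOP_CONF.
      mapperHeap = conf.getProperty("mapred.child.java.opts");
    }

    // Reflow whitespace-separated JVM flags, one per line, as in apply().
    System.out.println(mapperHeap.replaceAll("\\s+", "\n"));
    // prints:
    // -Xmx1536m
    // -XX:+UseParallelGC
  }
}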
@@ -33,6 +33,8 @@
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;

import static com.linkedin.drelephant.mapreduce.heuristics.CommonConstantsHeuristic.UtilizedParameterKeys.*;


/**
* This heuristic deals with the efficiency of container size
@@ -59,8 +61,7 @@ private long getContainerMemDefaultMBytes() {
String strValue = paramMap.get(CONTAINER_MEM_DEFAULT_MB);
try {
return Long.valueOf(strValue);
} catch (NumberFormatException e) {
logger.warn(CONTAINER_MEM_DEFAULT_MB + ": expected number [" + strValue + "]");
}
}
@@ -80,7 +81,7 @@ private void loadParameters() {

long containerMemDefaultBytes = getContainerMemDefaultMBytes() * FileUtils.ONE_MB;
logger.info(heuristicName + " will use " + CONTAINER_MEM_DEFAULT_MB + " with the following threshold setting: "
+ containerMemDefaultBytes);

double[] confMemoryLimits = Utils.getParam(paramMap.get(CONTAINER_MEM_SEVERITY), memoryLimits.length);
if (confMemoryLimits != null) {
@@ -110,7 +111,7 @@ public HeuristicConfigurationData getHeuristicConfData() {
@Override
public HeuristicResult apply(MapReduceApplicationData data) {

if (!data.getSucceeded()) {
return null;
}

@@ -122,14 +123,13 @@ public HeuristicResult apply(MapReduceApplicationData data) {
containerMem = Long.parseLong(containerSizeStr);
} catch (NumberFormatException e0) {
// Some job has a string var like "${VAR}" for this config.
if (containerSizeStr.startsWith("$")) {
String realContainerConf =
containerSizeStr.substring(containerSizeStr.indexOf("{") + 1, containerSizeStr.indexOf("}"));
String realContainerSizeStr = data.getConf().getProperty(realContainerConf);
try {
containerMem = Long.parseLong(realContainerSizeStr);
} catch (NumberFormatException e1) {
logger.warn(realContainerConf + ": expected number [" + realContainerSizeStr + "]");
}
} else {
@@ -146,26 +146,40 @@ public HeuristicResult apply(MapReduceApplicationData data) {
List<Long> taskPMems = new ArrayList<Long>();
List<Long> taskVMems = new ArrayList<Long>();
List<Long> runtimesMs = new ArrayList<Long>();
List<Long> taskHeapUsages = new ArrayList<Long>();
long taskPMin = Long.MAX_VALUE, taskVMin = Long.MAX_VALUE, taskHUMin = Long.MAX_VALUE;
long taskPMax = 0, taskVMax = 0, taskHUMax = 0;
for (MapReduceTaskData task : tasks) {
if (task.isTimeAndCounterDataPresent()) {
runtimesMs.add(task.getTotalRunTimeMs());
long taskPMem = task.getCounters().get(MapReduceCounterData.CounterName.PHYSICAL_MEMORY_BYTES);
long taskVMem = task.getCounters().get(MapReduceCounterData.CounterName.VIRTUAL_MEMORY_BYTES);
long taskHeapUsage = task.getCounters().get(MapReduceCounterData.CounterName.COMMITTED_HEAP_BYTES);
taskPMin = Math.min(taskPMin, taskPMem);
taskPMax = Math.max(taskPMax, taskPMem);
taskVMin = Math.min(taskVMin, taskVMem);
taskVMax = Math.max(taskVMax, taskVMem);
taskHUMin = Math.min(taskHUMin, taskHeapUsage);
taskHUMax = Math.max(taskHUMax, taskHeapUsage);
taskVMems.add(taskVMem);
taskPMems.add(taskPMem);
taskHeapUsages.add(taskHeapUsage);
}
}

if (taskPMin == Long.MAX_VALUE) {
taskPMin = 0;
}
if (taskVMin == Long.MAX_VALUE) {
taskVMin = 0;
}
if (taskHUMin == Long.MAX_VALUE) {
taskHUMin = 0;
}

long taskPMemAvg = Statistics.average(taskPMems);
long taskVMemAvg = Statistics.average(taskVMems);
long taskHeapUsageAvg = Statistics.average(taskHeapUsages);
long averageTimeMs = Statistics.average(runtimesMs);

Severity severity;
@@ -175,37 +189,51 @@ public HeuristicResult apply(MapReduceApplicationData data) {
severity = getTaskMemoryUtilSeverity(taskPMemAvg, containerMem);
}

HeuristicResult result =
new HeuristicResult(_heuristicConfData.getClassName(), _heuristicConfData.getHeuristicName(), severity,
Utils.getHeuristicScore(severity, tasks.length));

result.addResultDetail("Number of tasks", Integer.toString(tasks.length));
result.addResultDetail("Avg task runtime", Statistics.readableTimespan(averageTimeMs));
result.addResultDetail("Avg Physical Memory (MB)", Long.toString(taskPMemAvg / FileUtils.ONE_MB));
result.addResultDetail("Max Physical Memory (MB)", Long.toString(taskPMax / FileUtils.ONE_MB));
result.addResultDetail("Min Physical Memory (MB)", Long.toString(taskPMin / FileUtils.ONE_MB));
result.addResultDetail("Avg Virtual Memory (MB)", Long.toString(taskVMemAvg / FileUtils.ONE_MB));
result.addResultDetail(AVG_PHYSICAL_MEMORY.getValue(),
Long.toString(taskPMemAvg / FileUtils.ONE_MB));
result.addResultDetail(MAX_PHYSICAL_MEMORY.getValue(),
Long.toString(taskPMax / FileUtils.ONE_MB));
result.addResultDetail(MIN_PHYSICAL_MEMORY.getValue(),
Long.toString(taskPMin / FileUtils.ONE_MB));
result.addResultDetail(AVG_VIRTUAL_MEMORY.getValue(),
Long.toString(taskVMemAvg / FileUtils.ONE_MB));
result.addResultDetail(MAX_VIRTUAL_MEMORY.getValue(),
Long.toString(taskVMax / FileUtils.ONE_MB));
result.addResultDetail(MIN_VIRTUAL_MEMORY.getValue(),
Long.toString(taskVMin / FileUtils.ONE_MB));
result.addResultDetail(AVG_TOTAL_COMMITTED_HEAP_USAGE_MEMORY.getValue(),
Long.toString(taskHeapUsageAvg / FileUtils.ONE_MB));
result.addResultDetail(MAX_TOTAL_COMMITTED_HEAP_USAGE_MEMORY.getValue(),
Long.toString(taskHUMax / FileUtils.ONE_MB));
result.addResultDetail(MIN_TOTAL_COMMITTED_HEAP_USAGE_MEMORY.getValue(),
Long.toString(taskHUMin / FileUtils.ONE_MB));
result.addResultDetail("Requested Container Memory", FileUtils.byteCountToDisplaySize(containerMem));

return result;
}

private Severity getTaskMemoryUtilSeverity(long taskMemAvg, long taskMemMax) {
double ratio = ((double) taskMemAvg) / taskMemMax;
Severity sevRatio = getMemoryRatioSeverity(ratio);
// Severity is reduced if the requested container memory is close to default
Severity sevMax = getContainerMemorySeverity(taskMemMax);

return Severity.min(sevRatio, sevMax);
}


private Severity getContainerMemorySeverity(long taskMemMax) {
return Severity.getSeverityAscending(taskMemMax, memoryLimits[0], memoryLimits[1], memoryLimits[2],
memoryLimits[3]);
}

private Severity getMemoryRatioSeverity(double ratio) {
return Severity.getSeverityDescending(ratio, memRatioLimits[0], memRatioLimits[1], memRatioLimits[2],
memRatioLimits[3]);
}
}
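Two details of the method above are worth unpacking: per-task minima start at Long.MAX_VALUE and are reset to 0 when no task had counter data, and the final grade takes the milder of two severities, so a wasteful avg/requested ratio is forgiven when the requested container is already small. A self-contained sketch with made-up thresholds (the real limits come from the heuristic's configured memoryLimits and memRatioLimits; Severity here is reduced to an int ordinal, 0 = NONE through 4 = CRITICAL):

public class MemorySeverityDemo {
  // Illustrative thresholds only, not the heuristic's configured defaults.
  private static final double[] RATIO_LIMITS = {0.6, 0.5, 0.4, 0.3};     // lower ratio = worse
  private static final long[] MEM_LIMITS_MB = {1100, 2200, 3300, 4400};  // higher request = worse

  // Counts ascending thresholds crossed, like Severity.getSeverityAscending.
  static int ascending(long value, long[] limits) {
    int sev = 0;
    for (long limit : limits) {
      if (value >= limit) {
        sev++;
      }
    }
    return sev;
  }

  // Counts descending thresholds crossed, like Severity.getSeverityDescending.
  static int descending(double value, double[] limits) {
    int sev = 0;
    for (double limit : limits) {
      if (value <= limit) {
        sev++;
      }
    }
    return sev;
  }

  public static void main(String[] args) {
    long requestedMb = 2048;   // mapreduce.map.memory.mb
    long avgPhysicalMb = 900;  // average task PHYSICAL_MEMORY_BYTES, in MB

    double ratio = (double) avgPhysicalMb / requestedMb;  // ~0.44, under-utilized
    int sevRatio = descending(ratio, RATIO_LIMITS);       // 2: moderate waste
    int sevMax = ascending(requestedMb, MEM_LIMITS_MB);   // 1: request is modest

    // Severity.min keeps the milder finding, mirroring getTaskMemoryUtilSeverity.
    System.out.println("final severity = " + Math.min(sevRatio, sevMax));  // 1
  }
}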
12 changes: 12 additions & 0 deletions app/com/linkedin/drelephant/tuning/AutoTuningAPIHelper.java
@@ -116,10 +116,12 @@ private void setMaxAllowedMetricIncreasePercentage(TuningInput tuningInput) {
*/
private void setTuningAlgorithm(TuningInput tuningInput) throws IllegalArgumentException {
//Todo: Handle algorithm version later
logger.info(" Optimization Algorithm " + tuningInput.getOptimizationAlgo());
TuningAlgorithm tuningAlgorithm = TuningAlgorithm.find.select("*")
.where()
.eq(TuningAlgorithm.TABLE.jobType, tuningInput.getJobType())
.eq(TuningAlgorithm.TABLE.optimizationMetric, tuningInput.getOptimizationMetric())
.eq(TuningAlgorithm.TABLE.optimizationAlgo, tuningInput.getOptimizationAlgo())
.findUnique();
if (tuningAlgorithm == null) {
throw new IllegalArgumentException(
@@ -437,6 +439,7 @@ private void insertParamSet(JobDefinition job, TuningAlgorithm tuningAlgorithm,
jobSuggestedParamSet.isParamSetBest = false;
jobSuggestedParamSet.save();
insertParameterValues(jobSuggestedParamSet, paramValueMap);
intializeOptimizationAlgoPrerequisite(tuningAlgorithm, jobSuggestedParamSet);
logger.debug("Default parameter set inserted for job: " + job.jobName);
}

@@ -457,6 +460,15 @@ private void insertParameterValues(JobSuggestedParamSet jobSuggestedParamSet, Ma
}
}

private void intializeOptimizationAlgoPrerequisite(TuningAlgorithm tuningAlgorithm,
JobSuggestedParamSet jobSuggestedParamSet) {
logger.info("Inserting parameter constraint " + tuningAlgorithm.optimizationAlgo.name());
AutoTuningOptimizeManager manager = OptimizationAlgoFactory.getOptimizationAlogrithm(tuningAlgorithm);
if (manager != null) {
manager.intializePrerequisite(tuningAlgorithm, jobSuggestedParamSet);
}
}
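The prerequisite hook above is a guarded factory lookup: the factory returns an algorithm-specific manager, or null when the tuning algorithm needs no extra setup, and the caller simply skips in that case. A simplified sketch of the pattern (the interface, factory, and algorithm name below are stand-ins, not the project's actual AutoTuningOptimizeManager and OptimizationAlgoFactory signatures):

interface OptimizeManager {
  void initializePrerequisite(String paramSetId);
}

class OptimizeManagerFactory {
  // Returns null when the algorithm needs no prerequisite setup, mirroring
  // the null that OptimizationAlgoFactory can hand back above.
  static OptimizeManager forAlgorithm(String optimizationAlgo) {
    if ("IPSO".equals(optimizationAlgo)) {  // illustrative algorithm name
      return paramSetId -> System.out.println("initializing constraints for " + paramSetId);
    }
    return null;
  }
}

public class PrerequisiteDemo {
  public static void main(String[] args) {
    OptimizeManager manager = OptimizeManagerFactory.forAlgorithm("IPSO");
    if (manager != null) {  // algorithms without prerequisites are skipped silently
      manager.initializePrerequisite("paramSet-1");
    }
  }
}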

/**
* Inserts parameter value in database
* @param jobSuggestedParamSet Parameter set to which the parameter belongs