Skip to content

Commit

Permalink
Merge pull request linkedin#602 from linkedin/merge-tony-changes
Browse files Browse the repository at this point in the history
Merge TonY fixes to master
  • Loading branch information
erwa authored Jun 22, 2019
2 parents 7f648c9 + dcb8090 commit 96e2786
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.linkedin.drelephant.tony.heuristics;

import com.google.common.base.Strings;
import com.linkedin.drelephant.analysis.Heuristic;
import com.linkedin.drelephant.analysis.HeuristicResult;
import com.linkedin.drelephant.analysis.HeuristicResultDetails;
Expand All @@ -32,6 +33,7 @@
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.log4j.Logger;


Expand All @@ -51,6 +53,10 @@ public class TaskMemoryHeuristic implements Heuristic<TonyApplicationData> {
// Initialized to default max memory thresholds
private double[] maxMemoryLimits = {0.8, 0.7, 0.6, 0.5};

// If the requested memory is within this amount of the max memory usage, automatic pass.
// This is to prevent Dr. Elephant flagging container sizes of 3 GB when max memory usage is 2 GB.
private long graceMemoryHeadroomBytes;

/**
* Constructor for {@link TaskMemoryHeuristic}.
* @param heuristicConfData the configuration for this heuristic
Expand All @@ -67,6 +73,11 @@ public TaskMemoryHeuristic(HeuristicConfigurationData heuristicConfData) {
if (params.containsKey(TASK_MEMORY_THRESHOLDS_CONF)) {
maxMemoryLimits = Utils.getParam(params.get(TASK_MEMORY_THRESHOLDS_CONF), maxMemoryLimits.length);
}

Configuration yarnConf = new YarnConfiguration();
int minimumMBAllocation = yarnConf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
graceMemoryHeadroomBytes = 2 * minimumMBAllocation * FileUtils.ONE_MB;
}

@Override
Expand All @@ -78,13 +89,18 @@ public HeuristicResult apply(TonyApplicationData data) {
Set<String> taskTypes = com.linkedin.tony.util.Utils.getAllJobTypes(conf);
Severity finalSeverity = Severity.NONE;
List<HeuristicResultDetails> details = new ArrayList<>();
int severityScore = 0;

for (String taskType : taskTypes) {
details.add(new HeuristicResultDetails("Number of " + taskType + " tasks",
Integer.toString(taskMap.get(taskType).size())));
int taskInstances = conf.getInt(TonyConfigurationKeys.getInstancesKey(taskType), 0);
details.add(new HeuristicResultDetails("Number of " + taskType + " tasks", String.valueOf(taskInstances)));
if (taskInstances == 0) {
continue;
}

// get per task memory requested
String memoryString = conf.get(TonyConfigurationKeys.getResourceKey(taskType, Constants.MEMORY));
String memoryString = conf.get(TonyConfigurationKeys.getResourceKey(taskType, Constants.MEMORY),
TonyConfigurationKeys.DEFAULT_MEMORY);
String memoryStringMB = com.linkedin.tony.util.Utils.parseMemoryString(memoryString);
long taskBytesRequested = Long.parseLong(memoryStringMB) * FileUtils.ONE_MB;
details.add(new HeuristicResultDetails("Requested memory (MB) per " + taskType + " task",
Expand All @@ -100,18 +116,20 @@ public HeuristicResult apply(TonyApplicationData data) {
Long.toString((long) maxMemoryBytesUsed / FileUtils.ONE_MB)));

// compare to threshold and update severity
if (taskBytesRequested <= defaultContainerMemoryBytes) {
// If using default container memory, automatic pass
if (taskBytesRequested <= defaultContainerMemoryBytes
|| taskBytesRequested <= maxMemoryBytesUsed + graceMemoryHeadroomBytes) {
// If using default container memory or within grace headroom, automatic pass
continue;
}
double maxMemoryRatio = maxMemoryBytesUsed / taskBytesRequested;
Severity taskMemorySeverity = Severity.getSeverityDescending(maxMemoryRatio, maxMemoryLimits[0],
maxMemoryLimits[1], maxMemoryLimits[2], maxMemoryLimits[3]);
severityScore += Utils.getHeuristicScore(taskMemorySeverity, taskInstances);
finalSeverity = Severity.max(finalSeverity, taskMemorySeverity);
}

return new HeuristicResult(_heuristicConfData.getClassName(), _heuristicConfData.getHeuristicName(), finalSeverity,
0, details);
severityScore, details);
}

@Override
Expand Down
30 changes: 4 additions & 26 deletions app/com/linkedin/drelephant/util/InfoExtractor.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@
import com.linkedin.drelephant.clients.WorkflowClient;
import com.linkedin.drelephant.configurations.scheduler.SchedulerConfiguration;
import com.linkedin.drelephant.configurations.scheduler.SchedulerConfigurationData;

import com.linkedin.drelephant.tez.data.TezApplicationData;
import com.linkedin.drelephant.clients.WorkflowClient;

import com.linkedin.drelephant.mapreduce.data.MapReduceApplicationData;
import com.linkedin.drelephant.schedulers.Scheduler;
import com.linkedin.drelephant.spark.data.SparkApplicationData;

Expand All @@ -41,7 +36,6 @@

import models.AppResult;
import scala.Option;
import scala.Some;


/**
Expand Down Expand Up @@ -114,14 +108,11 @@ public static Scheduler getSchedulerInstance(String appId, Properties properties
* @param data The Hadoop application data
*/
public static void loadInfo(AppResult result, HadoopApplicationData data) {
Properties properties = new Properties();
if( data instanceof MapReduceApplicationData) {
properties = retrieveMapreduceProperties((MapReduceApplicationData) data);
} else if ( data instanceof SparkApplicationData) {
Properties properties;
if (data instanceof SparkApplicationData) {
properties = retrieveSparkProperties((SparkApplicationData) data);
}
else if(data instanceof TezApplicationData){
properties = retrieveTezProperties((TezApplicationData) data);
} else {
properties = data.getConf();
}
Scheduler scheduler = getSchedulerInstance(data.getAppId(), properties);

Expand Down Expand Up @@ -164,19 +155,6 @@ public static Properties retrieveSparkProperties(SparkApplicationData appData) {
return properties;
}

/**
* Retrieve the mapreduce application properties
* @param appData the mapReduce Application Data
* @return the retrieve mapreduce properties
*/
public static Properties retrieveMapreduceProperties(MapReduceApplicationData appData) {
return appData.getConf();
}

public static Properties retrieveTezProperties(TezApplicationData appData) {
return appData.getConf();
}

/**
* Populates the given app result with the info from the given application data and scheduler.
*
Expand Down
12 changes: 11 additions & 1 deletion app/views/help/tony/helpTaskMemory.scala.html
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,15 @@
* the License.
*@
<p>
Don't use so much memory!
This heuristic shows how much memory you requested for each task type and the maximum memory
actually used by any instance of that task type. If the requested memory is within a grace amount
(twice the YARN scheduler minimum allocation; 2 GB by default) of the max memory used, then the
severity will be none. Otherwise, if the max memory used is less than some percentage (by default
80, 70, 60, or 50 percent) of the requested memory, then Dr. Elephant will indicate some severity
(low, moderate, severe, or critical, respectively).
</p>
<p>
To reduce the amount of memory requested for a task, you can update <code>tony.X.memory</code>, where
<code>X</code> is your task type. For example, to request 4 GB for your worker tasks, you can set
<code>tony.worker.memory=4g</code>. For more information on TonY configurations, please visit the
<a href="https://github.com/linkedin/TonY/wiki/TonY-Configurations">TonY Configurations Wiki page</a>.
</p>
2 changes: 1 addition & 1 deletion baseline.conf
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ readonly SCALA_CPD_THRESHOLD=0

# ******************* Baseline and Threshold numbers for Checkstyle *********************
# Threshold for Checkstyle errors beyond which the build will fail
readonly CHECKSTYLE_ERROR_THRESHOLD=1388
readonly CHECKSTYLE_ERROR_THRESHOLD=1385
# Baseline for Checkstyle warnings (the build won't fail for warnings)
readonly CHECKSTYLE_WARNING_BASELINE=730

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import com.linkedin.drelephant.analysis.Severity;
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData;
import com.linkedin.drelephant.tony.data.TonyApplicationData;
import com.linkedin.drelephant.util.Utils;
import com.linkedin.tony.Constants;
import com.linkedin.tony.TonyConfigurationKeys;
import com.linkedin.tony.events.Event;
import com.linkedin.tony.events.EventType;
import com.linkedin.tony.events.Metric;
import com.linkedin.tony.events.TaskFinished;
import com.linkedin.tony.rpc.impl.TaskStatus;
import controllers.Application;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -41,7 +43,7 @@
public class TaskMemoryHeuristicTest {

/**
* 3g workers requested, max worker memory < 50%
* 10g workers requested, max worker memory < 50%
*/
@Test
public void testCritical() {
Expand All @@ -52,68 +54,73 @@ public void testCritical() {
1e9,
1.3e9
}, Constants.PS_JOB_NAME, new double[]{0.5e9}),
ImmutableMap.of(Constants.WORKER_JOB_NAME, "3g", Constants.PS_JOB_NAME, "2g"),
Severity.CRITICAL
ImmutableMap.of(Constants.WORKER_JOB_NAME, "10g", Constants.PS_JOB_NAME, "2g"),
Severity.CRITICAL,
Severity.CRITICAL.getValue() * 4
);
}

/**
* 3g ps requested, max ps memory < 60%
* 10g workers requested, max worker memory < 70%; 10g ps requested, max ps memory < 60%
*/
@Test
public void testSevere() {
testHelper(
ImmutableMap.of(Constants.WORKER_JOB_NAME, new double[]{
1.5e9,
1.6e9,
}, Constants.PS_JOB_NAME, new double[]{1.84e9}),
ImmutableMap.of(Constants.WORKER_JOB_NAME, "2g", Constants.PS_JOB_NAME, "3g"),
Severity.SEVERE
6.5e9,
6.6e9,
}, Constants.PS_JOB_NAME, new double[]{5.84e9}),
ImmutableMap.of(Constants.WORKER_JOB_NAME, "10g", Constants.PS_JOB_NAME, "10g"),
Severity.SEVERE,
Severity.MODERATE.getValue() * 2 /* workers */ + Severity.SEVERE.getValue() * 1
);
}

/**
* 3g workers requested, max worker memory < 70%
* 10g workers requested, max worker memory < 70%
*/
@Test
public void testModerate() {
testHelper(
ImmutableMap.of(Constants.WORKER_JOB_NAME, new double[]{
2.14e9,
2e9,
6.5e9,
6.6e9,
}),
ImmutableMap.of(Constants.WORKER_JOB_NAME, "3g"),
Severity.MODERATE
ImmutableMap.of(Constants.WORKER_JOB_NAME, "10g"),
Severity.MODERATE,
Severity.MODERATE.getValue() * 2
);
}

/**
* 3g workers requested, max worker memory < 80%
* 10g workers requested, max worker memory < 80%
*/
@Test
public void testLow() {
testHelper(
ImmutableMap.of(Constants.WORKER_JOB_NAME, new double[]{
2e9,
2.45e9,
7.56e9,
7.45e9,
}),
ImmutableMap.of(Constants.WORKER_JOB_NAME, "3g"),
Severity.LOW
ImmutableMap.of(Constants.WORKER_JOB_NAME, "10g"),
Severity.LOW,
0
);
}

/**
* 3g workers requested, max worker memory > 80%
* 10g workers requested, max worker memory > 80%
*/
@Test
public void testNone() {
testHelper(
ImmutableMap.of(Constants.WORKER_JOB_NAME, new double[]{
2.5e9,
2.6e9,
8.5e9,
8.6e9,
}),
ImmutableMap.of(Constants.WORKER_JOB_NAME, "3g"),
Severity.NONE
ImmutableMap.of(Constants.WORKER_JOB_NAME, "10g"),
Severity.NONE,
0
);
}

Expand All @@ -128,11 +135,46 @@ public void testLowUtilizationDefaultContainerSize() {
0.6e9,
}),
ImmutableMap.of(Constants.WORKER_JOB_NAME, "2g"),
Severity.NONE
Severity.NONE,
0
);
}

public void testHelper(Map<String, double[]> memUsed, Map<String, String> memRequested, Severity expectedSeverity) {
/**
 * Workers request 3g while their peak usage is only about half of that. Severity is still
 * expected to be NONE (score 0), because the requested memory falls within the default
 * 2 GB grace headroom of the max used memory.
 */
@Test
public void testRequestedSizeWithinGraceHeadroomSeverity() {
  // Memory used by each worker instance, in bytes.
  double[] workerMemoryUsed = {1.5e9, 1.6e9};
  Map<String, double[]> memUsed = ImmutableMap.of(Constants.WORKER_JOB_NAME, workerMemoryUsed);
  Map<String, String> memRequested = ImmutableMap.of(Constants.WORKER_JOB_NAME, "3g");

  testHelper(memUsed, memRequested, Severity.NONE, 0);
}

/**
 * Verifies that applying the heuristic to application data with an empty event list (and
 * therefore an empty task map) and zero configured ps instances throws no exception, and
 * that the result carries severity NONE with a score of 0.
 */
@Test
public void testEmptyTaskMap() {
  ApplicationType appType = new ApplicationType(Constants.APP_TYPE);
  // Skip loading default resources so only the key set below is present in the configuration.
  Configuration conf = new Configuration(false);
  conf.setInt(TonyConfigurationKeys.getInstancesKey(Constants.PS_JOB_NAME), 0);
  TonyApplicationData data = new TonyApplicationData("application_123_456",
      appType, conf, Collections.EMPTY_LIST);
  HeuristicResult result = new TaskMemoryHeuristic(new HeuristicConfigurationData("ignored",
      "ignored", "ignored", appType, Collections.EMPTY_MAP)).apply(data);

  // Task types with zero instances are skipped by the heuristic, so nothing may be flagged.
  Assert.assertEquals(Severity.NONE, result.getSeverity());
  Assert.assertEquals(0, result.getScore());
}

public void testHelper(Map<String, double[]> memUsed, Map<String, String> memRequested, Severity expectedSeverity,
int expectedScore) {
Configuration conf = new Configuration(false);
List<Event> events = new ArrayList<>();
for (Map.Entry<String, String> entry : memRequested.entrySet()) {
Expand All @@ -155,5 +197,6 @@ public void testHelper(Map<String, double[]> memUsed, Map<String, String> memReq
"ignored", "ignored", appType, Collections.EMPTY_MAP));
HeuristicResult result = heuristic.apply(data);
Assert.assertEquals(expectedSeverity, result.getSeverity());
Assert.assertEquals(expectedScore, result.getScore());
}
}

0 comments on commit 96e2786

Please sign in to comment.