Skip to content

Commit

Permalink
Fix linkedin#553: Add support for TonY jobs (linkedin#563)
Browse files Browse the repository at this point in the history
* Checkpoint

* Skeleton code for TonY jobtype support in Dr. Elephant linkedin#553

* Removed some commented-out code, removed "TonY" from "TonY Task Memory"

* Implement TonyFetcher

* Check if config file exists before trying to open it

* Add comment explaining yyyy/MM/dd calculation issue

* Implement TonY TaskMemoryHeuristic

* Minor tweaks to test case in TaskMemoryHeuristicTest

* Implement TonyMetricsAggregator

* Remove unused imports, update compile.sh to overwrite when unzipping without prompting

* Fix NPE bug and print task information even when no metrics

* Add license headers and remove JDK7 from .travis.yml

* Fix Checkstyle link

* Fix Checkstyle issues

* Add <isDefault/>' for TonY jobType in test JobTypeConf.xml

* Add comments explain the events we parse and what information we extract from them
  • Loading branch information
erwa authored and mkumar1984 committed May 14, 2019
1 parent 45b9c9f commit a045b64
Show file tree
Hide file tree
Showing 25 changed files with 1,027 additions and 30 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
# general
*~
*.log
dr_elephant.log.*
tmp
dump
.history
/*.iml
/dr-elephant-*/
/out

# Eclipse
Expand Down Expand Up @@ -58,3 +60,4 @@ public/assets/ember/
public/assets/fonts/
web/bower_components/
web/node_modules/
web/package-lock.json
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ language: scala
sudo: true
jdk:
- oraclejdk8
- openjdk7
python: "2.6"
install:
- sudo pip install inspyred
Expand Down
4 changes: 4 additions & 0 deletions app-conf/AggregatorConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,8 @@
<allocated_memory_waste_buffer_percentage>0.5</allocated_memory_waste_buffer_percentage>
</params>
</aggregator>
<aggregator>
<applicationtype>tony</applicationtype>
<classname>com.linkedin.drelephant.tony.TonyMetricsAggregator</classname>
</aggregator>
</aggregators>
9 changes: 9 additions & 0 deletions app-conf/FetcherConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,13 @@
</params>
</fetcher>
-->

<!--
Fetcher for TonY jobs. To use this, you must set the TONY_CONF_DIR environment variable to the directory
containing the tony-site.xml file.
-->
<!--fetcher>
<applicationtype>tony</applicationtype>
<classname>com.linkedin.drelephant.tony.fetchers.TonyFetcher</classname>
</fetcher-->
</fetchers>
16 changes: 16 additions & 0 deletions app-conf/HeuristicConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -354,4 +354,20 @@
<viewname>views.html.help.spark.helpExecutorGcHeuristic</viewname>
</heuristic>


<!-- TONY HEURISTICS -->
<heuristic>
<applicationtype>tony</applicationtype>
<heuristicname>Task Memory</heuristicname>
<classname>com.linkedin.drelephant.tony.heuristics.TaskMemoryHeuristic</classname>
<viewname>views.html.help.tony.helpTaskMemory</viewname>
<params>
<!-- 2048 is the default set in TaskMemoryHeuristic -->
<!--container_memory_default_mb>2048</container_memory_default_mb-->
<!-- These are the default max memory limits defiend in TaskMemoryHeuristic -->
<!--task_memory_thresholds>0.8, 0.7, 0.6, 0.5</task_memory_thresholds-->
</params>
</heuristic>
<!-- END TONY HEURISTICS -->

</heuristics>
6 changes: 6 additions & 0 deletions app-conf/JobTypeConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,10 @@
<conf>mapred.child.java.opts</conf>
<isDefault/>
</jobType>
<jobType>
<name>TonY</name>
<applicationtype>TONY</applicationtype>
<conf>tony.application.name</conf>
<isDefault/>
</jobType>
</jobTypes>
4 changes: 2 additions & 2 deletions app/com/linkedin/drelephant/analysis/HeuristicResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class HeuristicResult {
* Heuristic Result Constructor
*
* @param heuristicClass The Heuristic class
* @param heuristicName The name of the Heursitic
* @param heuristicName The name of the Heuristic
* @param severity The severity of the result
* @param score The computed score
*/
Expand Down Expand Up @@ -73,7 +73,7 @@ public HeuristicResult(String heuristicClass, String heuristicName, Severity sev
/**
* Returns the heuristic analyser class name
*
* @return the heursitic class name
* @return the heuristic class name
*/
public String getHeuristicClassName() {
return _heuristicClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ private void compute(MapReduceTaskData[] taskDatas, long containerSize, long ide
}

// wastedResources
long wastedMemory = containerSize - (long) (peakMemoryNeed * MEMORY_BUFFER); // give a 50% buffer
long wastedMemory = containerSize - (long) (peakMemoryNeed * MEMORY_BUFFER); // give a 50% buffer
if(wastedMemory > 0) {
for (long duration : durations) {
_resourceWasted += (wastedMemory) * (duration / Statistics.SECOND_IN_MS); // MB Seconds
Expand Down
89 changes: 89 additions & 0 deletions app/com/linkedin/drelephant/tony/TonyMetricsAggregator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2019 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.linkedin.drelephant.tony;

import com.google.common.annotations.VisibleForTesting;
import com.linkedin.drelephant.analysis.HadoopAggregatedData;
import com.linkedin.drelephant.analysis.HadoopApplicationData;
import com.linkedin.drelephant.analysis.HadoopMetricsAggregator;
import com.linkedin.drelephant.configurations.aggregator.AggregatorConfigurationData;
import com.linkedin.drelephant.math.Statistics;
import com.linkedin.drelephant.tony.data.TonyApplicationData;
import com.linkedin.drelephant.tony.data.TonyTaskData;
import com.linkedin.drelephant.tony.util.TonyUtils;
import com.linkedin.tony.Constants;
import com.linkedin.tony.TonyConfigurationKeys;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;


public class TonyMetricsAggregator implements HadoopMetricsAggregator {
@VisibleForTesting
static final double MEMORY_BUFFER = 1.5;

private HadoopAggregatedData _hadoopAggregatedData;

/**
* Creates a new {@code TonyMetricsAggregator}.
* @param unused Dr. Elephant expects a constructor of this form but {@code TonyMetricsAggregator} does not need this
*/
public TonyMetricsAggregator(AggregatorConfigurationData unused) { }

@Override
public void aggregate(HadoopApplicationData data) {
_hadoopAggregatedData = new HadoopAggregatedData();

TonyApplicationData tonyData = (TonyApplicationData) data;
Configuration tonyConf = tonyData.getConfiguration();

long mbSecUsed = 0;
long mbSecWasted = 0;

Map<String, Map<Integer, TonyTaskData>> taskMap = tonyData.getTaskMap();
for (Map.Entry<String, Map<Integer, TonyTaskData>> entry : taskMap.entrySet()) {
String taskType = entry.getKey();

String memoryString = tonyConf.get(TonyConfigurationKeys.getResourceKey(taskType, Constants.MEMORY));
String memoryStringMB = com.linkedin.tony.util.Utils.parseMemoryString(memoryString);
long mbRequested = Long.parseLong(memoryStringMB);
double maxMemoryMBUsed = TonyUtils.getMaxMemoryBytesUsedForTaskType(taskMap, taskType) / FileUtils.ONE_MB;

for (TonyTaskData taskData : entry.getValue().values()) {
long taskDurationSec = (taskData.getTaskEndTime() - taskData.getTaskStartTime()) / Statistics.SECOND_IN_MS;
mbSecUsed += mbRequested * taskDurationSec;

if (maxMemoryMBUsed <= 0) {
// If we don't have max memory metrics, don't calculate wasted memory.
continue;
}
long wastedMemory = (long) (mbRequested - maxMemoryMBUsed * MEMORY_BUFFER);
if (wastedMemory > 0) {
mbSecWasted += wastedMemory * taskDurationSec;
}
}
}

_hadoopAggregatedData.setResourceUsed(mbSecUsed);
_hadoopAggregatedData.setResourceWasted(mbSecWasted);
// TODO: Calculate and set delay
}

@Override
public HadoopAggregatedData getResult() {
return _hadoopAggregatedData;
}
}
123 changes: 123 additions & 0 deletions app/com/linkedin/drelephant/tony/data/TonyApplicationData.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright 2019 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.linkedin.drelephant.tony.data;

import com.linkedin.drelephant.analysis.ApplicationType;
import com.linkedin.drelephant.analysis.HadoopApplicationData;
import com.linkedin.tony.events.Event;
import com.linkedin.tony.events.EventType;
import com.linkedin.tony.events.TaskFinished;
import com.linkedin.tony.events.TaskStarted;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;


public class TonyApplicationData implements HadoopApplicationData {
private String _appId;
private ApplicationType _appType;
private Configuration _configuration;
private Properties _props;
private Map<String, Map<Integer, TonyTaskData>> _taskMap;

/**
* Constructor for {@code TonyApplicationData}.
* @param appId application id
* @param appType application type (should be TONY)
* @param configuration the configuration for this application
* @param events the events emitted by this application
*/
public TonyApplicationData(String appId, ApplicationType appType, Configuration configuration, List<Event> events) {
_appId = appId;
_appType = appType;

_configuration = configuration;
_props = new Properties();
for (Map.Entry<String, String> entry : configuration) {
_props.setProperty(entry.getKey(), entry.getValue());
}

_taskMap = new HashMap<>();
processEvents(events);
}

@Override
public String getAppId() {
return _appId;
}

/**
* Returns the {@link Configuration} for this application.
* @return the configuration for this application
*/
public Configuration getConfiguration() {
return _configuration;
}

@Override
public Properties getConf() {
return _props;
}

@Override
public ApplicationType getApplicationType() {
return _appType;
}

/**
* Returns a map of task data.
* @return a map from task type to a map of task index to task data
*/
public Map<String, Map<Integer, TonyTaskData>> getTaskMap() {
return _taskMap;
}

@Override
public boolean isEmpty() {
return false;
}

private void initTaskMap(String taskType, int taskIndex) {
if (!_taskMap.containsKey(taskType)) {
_taskMap.put(taskType, new HashMap<>());
}

if (!_taskMap.get(taskType).containsKey(taskIndex)) {
_taskMap.get(taskType).put(taskIndex, new TonyTaskData(taskType, taskIndex));
}
}

private void processEvents(List<Event> events) {
for (Event event : events) {
if (event.getType().equals(EventType.TASK_STARTED)) {
TaskStarted taskStartedEvent = (TaskStarted) event.getEvent();
String taskType = taskStartedEvent.getTaskType();
int taskIndex = taskStartedEvent.getTaskIndex();
initTaskMap(taskType, taskIndex);
_taskMap.get(taskType).get(taskIndex).setTaskStartTime(event.getTimestamp());
} else if (event.getType().equals(EventType.TASK_FINISHED)) {
TaskFinished taskFinishedEvent = (TaskFinished) event.getEvent();
String taskType = taskFinishedEvent.getTaskType();
int taskIndex = taskFinishedEvent.getTaskIndex();
initTaskMap(taskType, taskIndex);
_taskMap.get(taskType).get(taskIndex).setTaskEndTime(event.getTimestamp());
_taskMap.get(taskType).get(taskIndex).setMetrics(taskFinishedEvent.getMetrics());
}
}
}
}
Loading

0 comments on commit a045b64

Please sign in to comment.