Skip to content

Commit

Permalink
Dr. Elephant Tez Support working patch (linkedin#313)
Browse files Browse the repository at this point in the history
  • Loading branch information
chinmayms authored and akshayrai committed Apr 4, 2018
1 parent d5a6897 commit a0470a3
Show file tree
Hide file tree
Showing 39 changed files with 3,971 additions and 83 deletions.
4 changes: 4 additions & 0 deletions app-conf/AggregatorConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
<applicationtype>mapreduce</applicationtype>
<classname>com.linkedin.drelephant.mapreduce.MapReduceMetricsAggregator</classname>
</aggregator>
<aggregator>
<applicationtype>tez</applicationtype>
<classname>com.linkedin.drelephant.tez.TezMetricsAggregator</classname>
</aggregator>
<aggregator>
<applicationtype>spark</applicationtype>
<classname>com.linkedin.drelephant.spark.SparkMetricsAggregator</classname>
Expand Down
7 changes: 7 additions & 0 deletions app-conf/FetcherConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@
</fetcher>
-->
<fetchers>
  <!--
    REST-based fetcher for Tez jobs that pulls job metrics and task data from the YARN Timeline Server API.
  -->
<fetcher>
<applicationtype>tez</applicationtype>
<classname>com.linkedin.drelephant.tez.fetchers.TezFetcher</classname>
</fetcher>
<!--
<fetcher>
<applicationtype>mapreduce</applicationtype>
Expand Down
117 changes: 117 additions & 0 deletions app-conf/HeuristicConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,123 @@
<!-- Heuristics configurations, each heuristic will be loaded by a particular analyser -->
<heuristics>

  <!-- TEZ HEURISTICS.
       NOTE: every Tez heuristic below reuses the mapreduce helpMapperSpill help view
       as a placeholder; dedicated Tez help views still need to be written. -->

<heuristic>
<applicationtype>tez</applicationtype>
<heuristicname>Mapper Data Skew</heuristicname>
<classname>com.linkedin.drelephant.tez.heuristics.MapperDataSkewHeuristic</classname>
<viewname>views.html.help.mapreduce.helpMapperSpill</viewname>
<!--<params>
<num_tasks_severity>10, 50, 100, 200</num_tasks_severity>
<deviation_severity>2, 4, 8, 16</deviation_severity>
<files_severity>1/8, 1/4, 1/2, 1</files_severity>
</params>-->
</heuristic>

<heuristic>
<applicationtype>tez</applicationtype>
<heuristicname>Mapper GC</heuristicname>
<classname>com.linkedin.drelephant.tez.heuristics.MapperGCHeuristic</classname>
<viewname>views.html.help.mapreduce.helpMapperSpill</viewname>
<!--<params>
<exclude_jobtypes_filter>OozieLauncher</exclude_jobtypes_filter>
<gc_ratio_severity>0.01, 0.02, 0.03, 0.04</gc_ratio_severity>
<runtime_severity_in_min>5, 10, 12, 15</runtime_severity_in_min>
</params>-->
</heuristic>

<heuristic>
<applicationtype>tez</applicationtype>
<heuristicname>Mapper Time</heuristicname>
<classname>com.linkedin.drelephant.tez.heuristics.MapperTimeHeuristic</classname>
<viewname>views.html.help.mapreduce.helpMapperSpill</viewname>
<!--<params>
<exclude_jobtypes_filter>OozieLauncher</exclude_jobtypes_filter>
<short_runtime_severity_in_min>10, 4, 2, 1</short_runtime_severity_in_min>
<long_runtime_severity_in_min>15, 30, 60, 120</long_runtime_severity_in_min>
<num_tasks_severity>50, 101, 500, 1000</num_tasks_severity>
</params>-->
</heuristic>

<heuristic>
<applicationtype>tez</applicationtype>
<heuristicname>Mapper Speed</heuristicname>
<classname>com.linkedin.drelephant.tez.heuristics.MapperSpeedHeuristic</classname>
<viewname>views.html.help.mapreduce.helpMapperSpill</viewname>
<!--<params>
<exclude_jobtypes_filter>OozieLauncher</exclude_jobtypes_filter>
<disk_speed_severity>1/2, 1/4, 1/8, 1/32</disk_speed_severity>
<runtime_severity_in_min>5, 10, 15, 30</runtime_severity_in_min>
</params>-->
</heuristic>

<heuristic>
<applicationtype>tez</applicationtype>
<heuristicname>Mapper Memory</heuristicname>
<classname>com.linkedin.drelephant.tez.heuristics.MapperMemoryHeuristic</classname>
<viewname>views.html.help.mapreduce.helpMapperSpill</viewname>
<!--<params>
<memory_ratio_severity>0.6, 0.5, 0.4, 0.3</memory_ratio_severity>
</params>-->
</heuristic>

<heuristic>
<applicationtype>tez</applicationtype>
<heuristicname>Mapper Spill</heuristicname>
<classname>com.linkedin.drelephant.tez.heuristics.MapperSpillHeuristic</classname>
<viewname>views.html.help.mapreduce.helpMapperSpill</viewname>
<!--<params>
<spill_severity>2.01, 2.2, 2.5, 3</spill_severity>
<num_tasks_severity>50, 100, 500, 1000</num_tasks_severity>
</params>-->
</heuristic>

<heuristic>
<applicationtype>tez</applicationtype>
<heuristicname>Reducer Data Skew</heuristicname>
<classname>com.linkedin.drelephant.tez.heuristics.ReducerDataSkewHeuristic</classname>
<viewname>views.html.help.mapreduce.helpMapperSpill</viewname>
<!--<params>
<num_tasks_severity>10, 50, 100, 200</num_tasks_severity>
<deviation_severity>2, 4, 8, 16</deviation_severity>
<files_severity>1/8, 1/4, 1/2, 1</files_severity>
</params>-->
</heuristic>

<heuristic>
<applicationtype>tez</applicationtype>
<heuristicname>Reducer GC</heuristicname>
<classname>com.linkedin.drelephant.tez.heuristics.ReducerGCHeuristic</classname>
<viewname>views.html.help.mapreduce.helpMapperSpill</viewname>
<!--<params>
<gc_ratio_severity>0.01, 0.02, 0.03, 0.04</gc_ratio_severity>
<runtime_severity_in_min>5, 10, 12, 15</runtime_severity_in_min>
</params>-->
</heuristic>

<heuristic>
<applicationtype>tez</applicationtype>
<heuristicname>Reducer Time</heuristicname>
<classname>com.linkedin.drelephant.tez.heuristics.ReducerTimeHeuristic</classname>
<viewname>views.html.help.mapreduce.helpMapperSpill</viewname>
<!--<params>
<short_runtime_severity_in_min>10, 4, 2, 1</short_runtime_severity_in_min>
<long_runtime_severity_in_min>15, 30, 60, 120</long_runtime_severity_in_min>
<num_tasks_severity>50, 101, 500, 1000</num_tasks_severity>
</params>-->
</heuristic>

<heuristic>
<applicationtype>tez</applicationtype>
<heuristicname>Reducer Memory</heuristicname>
<classname>com.linkedin.drelephant.tez.heuristics.ReducerMemoryHeuristic</classname>
<viewname>views.html.help.mapreduce.helpMapperSpill</viewname>
<!--<params>
<memory_ratio_severity>0.6, 0.5, 0.4, 0.3</memory_ratio_severity>
</params>-->
</heuristic>

<!-- MAP-REDUCE HEURISTICS -->

<heuristic>
Expand Down
6 changes: 6 additions & 0 deletions app-conf/JobTypeConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@
<applicationtype>mapreduce</applicationtype>
<conf>pig.script</conf>
</jobType>
<jobType>
<name>Tez</name>
<applicationtype>tez</applicationtype>
<conf>hive.mapred.mode</conf>
<isDefault/>
</jobType>
<jobType>
<name>Hive</name>
<applicationtype>mapreduce</applicationtype>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,23 @@
import com.linkedin.drelephant.mapreduce.data.MapReduceApplicationData;
import com.linkedin.drelephant.mapreduce.data.MapReduceCounterData;
import com.linkedin.drelephant.mapreduce.data.MapReduceTaskData;
import com.linkedin.drelephant.math.Statistics;
import com.linkedin.drelephant.configurations.fetcher.FetcherConfigurationData;
import com.linkedin.drelephant.util.ThreadContextMR2;
import com.linkedin.drelephant.util.Utils;

import java.io.IOException;
import java.lang.Integer;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.log4j.Logger;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;


/**
Expand Down Expand Up @@ -387,78 +380,3 @@ private JsonNode getTaskFirstFailedAttempt(URL taskAllAttemptsUrl) throws IOExce
}
}
}

final class ThreadContextMR2 {
private static final Logger logger = Logger.getLogger(ThreadContextMR2.class);
private static final AtomicInteger THREAD_ID = new AtomicInteger(1);

private static final ThreadLocal<Integer> _LOCAL_THREAD_ID = new ThreadLocal<Integer>() {
@Override
public Integer initialValue() {
return THREAD_ID.getAndIncrement();
}
};

private static final ThreadLocal<Long> _LOCAL_LAST_UPDATED = new ThreadLocal<Long>();
private static final ThreadLocal<Long> _LOCAL_UPDATE_INTERVAL = new ThreadLocal<Long>();

private static final ThreadLocal<Pattern> _LOCAL_DIAGNOSTIC_PATTERN = new ThreadLocal<Pattern>() {
@Override
public Pattern initialValue() {
// Example: "Task task_1443068695259_9143_m_000475 failed 1 times"
return Pattern.compile(
".*[\\s\\u00A0]+(task_[0-9]+_[0-9]+_[m|r]_[0-9]+)[\\s\\u00A0]+.*");
}
};

private static final ThreadLocal<AuthenticatedURL.Token> _LOCAL_AUTH_TOKEN =
new ThreadLocal<AuthenticatedURL.Token>() {
@Override
public AuthenticatedURL.Token initialValue() {
_LOCAL_LAST_UPDATED.set(System.currentTimeMillis());
// Random an interval for each executor to avoid update token at the same time
_LOCAL_UPDATE_INTERVAL.set(Statistics.MINUTE_IN_MS * 30 + new Random().nextLong()
% (3 * Statistics.MINUTE_IN_MS));
logger.info("Executor " + _LOCAL_THREAD_ID.get() + " update interval " + _LOCAL_UPDATE_INTERVAL.get() * 1.0
/ Statistics.MINUTE_IN_MS);
return new AuthenticatedURL.Token();
}
};

private static final ThreadLocal<AuthenticatedURL> _LOCAL_AUTH_URL = new ThreadLocal<AuthenticatedURL>() {
@Override
public AuthenticatedURL initialValue() {
return new AuthenticatedURL();
}
};

private static final ThreadLocal<ObjectMapper> _LOCAL_MAPPER = new ThreadLocal<ObjectMapper>() {
@Override
public ObjectMapper initialValue() {
return new ObjectMapper();
}
};

private ThreadContextMR2() {
// Empty on purpose
}

public static Matcher getDiagnosticMatcher(String diagnosticInfo) {
return _LOCAL_DIAGNOSTIC_PATTERN.get().matcher(diagnosticInfo);
}

public static JsonNode readJsonNode(URL url) throws IOException, AuthenticationException {
HttpURLConnection conn = _LOCAL_AUTH_URL.get().openConnection(url, _LOCAL_AUTH_TOKEN.get());
return _LOCAL_MAPPER.get().readTree(conn.getInputStream());
}

public static void updateAuthToken() {
long curTime = System.currentTimeMillis();
if (curTime - _LOCAL_LAST_UPDATED.get() > _LOCAL_UPDATE_INTERVAL.get()) {
logger.info("Executor " + _LOCAL_THREAD_ID.get() + " updates its AuthenticatedToken.");
_LOCAL_AUTH_TOKEN.set(new AuthenticatedURL.Token());
_LOCAL_AUTH_URL.set(new AuthenticatedURL());
_LOCAL_LAST_UPDATED.set(curTime);
}
}
}
109 changes: 109 additions & 0 deletions app/com/linkedin/drelephant/tez/TezMetricsAggregator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package com.linkedin.drelephant.tez;

import com.linkedin.drelephant.analysis.*;
import com.linkedin.drelephant.configurations.aggregator.AggregatorConfigurationData;
import com.linkedin.drelephant.tez.data.TezApplicationData;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;

/**
* Aggregates task level metrics to application
*/

public class TezMetricsAggregator implements HadoopMetricsAggregator {

private static final Logger logger = Logger.getLogger(TezMetricsAggregator.class);

private static final String TEZ_CONTAINER_CONFIG = "hive.tez.container.size";
private static final String MAP_CONTAINER_CONFIG = "mapreduce.map.memory.mb";
private static final String REDUCER_CONTAINER_CONFIG = "mapreduce.reduce.memory.mb";
private static final String REDUCER_SLOW_START_CONFIG = "mapreduce.job.reduce.slowstart.completedmaps";
private static final long CONTAINER_MEMORY_DEFAULT_BYTES = 2048L * FileUtils.ONE_MB;

private HadoopAggregatedData _hadoopAggregatedData = null;
private TezTaskLevelAggregatedMetrics _mapTasks;
private TezTaskLevelAggregatedMetrics _reduceTasks;

private AggregatorConfigurationData _aggregatorConfigurationData;

public TezMetricsAggregator(AggregatorConfigurationData _aggregatorConfigurationData) {
this._aggregatorConfigurationData = _aggregatorConfigurationData;
_hadoopAggregatedData = new HadoopAggregatedData();
}

@Override
public void aggregate(HadoopApplicationData hadoopData) {

TezApplicationData data = (TezApplicationData) hadoopData;

long mapTaskContainerSize = getMapContainerSize(data);
long reduceTaskContainerSize = getReducerContainerSize(data);

int reduceTaskSlowStartPercentage =
(int) (Double.parseDouble(data.getConf().getProperty(REDUCER_SLOW_START_CONFIG)) * 100);


//overwrite reduceTaskSlowStartPercentage to 100%. TODO: make use of the slow start percent
reduceTaskSlowStartPercentage = 100;

_mapTasks = new TezTaskLevelAggregatedMetrics(data.getMapTaskData(), mapTaskContainerSize, data.getStartTime());

long reduceIdealStartTime = _mapTasks.getNthPercentileFinishTime(reduceTaskSlowStartPercentage);

// Mappers list is empty
if(reduceIdealStartTime == -1) {
// ideal start time for reducer is infinite since it cannot start
reduceIdealStartTime = Long.MAX_VALUE;
}

_reduceTasks = new TezTaskLevelAggregatedMetrics(data.getReduceTaskData(), reduceTaskContainerSize, reduceIdealStartTime);

_hadoopAggregatedData.setResourceUsed(_mapTasks.getResourceUsed() + _reduceTasks.getResourceUsed());
_hadoopAggregatedData.setTotalDelay(_mapTasks.getDelay() + _reduceTasks.getDelay());
_hadoopAggregatedData.setResourceWasted(_mapTasks.getResourceWasted() + _reduceTasks.getResourceWasted());
}

@Override
public HadoopAggregatedData getResult() {
return _hadoopAggregatedData;
}

private long getMapContainerSize(HadoopApplicationData data) {
try {
long mapContainerSize = Long.parseLong(data.getConf().getProperty(TEZ_CONTAINER_CONFIG));
if (mapContainerSize > 0)
return mapContainerSize;
else
return Long.parseLong(data.getConf().getProperty(MAP_CONTAINER_CONFIG));
} catch ( NumberFormatException ex) {
return CONTAINER_MEMORY_DEFAULT_BYTES;
}
}

private long getReducerContainerSize(HadoopApplicationData data) {
try {
long reducerContainerSize = Long.parseLong(data.getConf().getProperty(TEZ_CONTAINER_CONFIG));
if (reducerContainerSize > 0)
return reducerContainerSize;
else
return Long.parseLong(data.getConf().getProperty(REDUCER_CONTAINER_CONFIG));
} catch ( NumberFormatException ex) {
return CONTAINER_MEMORY_DEFAULT_BYTES;
}
}
}
Loading

0 comments on commit a0470a3

Please sign in to comment.