Skip to content

Commit

Permalink
Merge pull request linkedin#109 from nntnag17/master
Browse files Browse the repository at this point in the history
Aggregated Metrics Feature
  • Loading branch information
nntnag17 authored Jul 20, 2016
2 parents ddaba87 + 27bcaf6 commit 600f330
Show file tree
Hide file tree
Showing 68 changed files with 3,341 additions and 168 deletions.
40 changes: 40 additions & 0 deletions app-conf/AggregatorConf.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2016 LinkedIn Corp.
Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy of
the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations under
the License.
-->

<!-- Data aggregators configurations
An Aggregator implements HadoopMetricsAggregator interface and help aggregate a certain application type data.
Example:
<aggregator>
# Choose the application type that this aggregator is for
<applicationtype>mapreduce</applicationtype>
# Specify the implementation class
<classname>com.linkedin.drelephant.mapreduce.MapReduceAggregator</classname>
</aggregator>
-->
<aggregators>
<aggregator>
<applicationtype>mapreduce</applicationtype>
<classname>com.linkedin.drelephant.mapreduce.MapReduceMetricsAggregator</classname>
</aggregator>
<aggregator>
<applicationtype>spark</applicationtype>
<classname>org.apache.spark.SparkMetricsAggregator</classname>
</aggregator>
</aggregators>
3 changes: 3 additions & 0 deletions app-conf/FetcherConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
<fetcher>
<applicationtype>mapreduce</applicationtype>
<classname>com.linkedin.drelephant.mapreduce.MapReduceFetcherHadoop2</classname>
<params>
<sampling_enabled>false</sampling_enabled>
</params>
</fetcher>
<fetcher>
<applicationtype>spark</applicationtype>
Expand Down
51 changes: 51 additions & 0 deletions app/com/linkedin/drelephant/ElephantContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@
import com.linkedin.drelephant.analysis.ApplicationType;
import com.linkedin.drelephant.analysis.ElephantFetcher;
import com.linkedin.drelephant.analysis.HadoopApplicationData;
import com.linkedin.drelephant.analysis.HadoopMetricsAggregator;
import com.linkedin.drelephant.analysis.Heuristic;
import com.linkedin.drelephant.analysis.HeuristicResult;
import com.linkedin.drelephant.analysis.JobType;
import com.linkedin.drelephant.configurations.aggregator.AggregatorConfiguration;
import com.linkedin.drelephant.configurations.aggregator.AggregatorConfigurationData;
import com.linkedin.drelephant.configurations.fetcher.FetcherConfiguration;
import com.linkedin.drelephant.configurations.fetcher.FetcherConfigurationData;
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfiguration;
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData;
import com.linkedin.drelephant.configurations.jobtype.JobTypeConfiguration;
import com.linkedin.drelephant.mapreduce.MapReduceMetricsAggregator;
import com.linkedin.drelephant.util.Utils;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
Expand All @@ -41,6 +45,7 @@
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.apache.spark.SparkMetricsAggregator;
import org.w3c.dom.Document;
import play.api.Play;
import play.api.templates.Html;
Expand All @@ -56,16 +61,19 @@ public class ElephantContext {
private static final Logger logger = Logger.getLogger(ElephantContext.class);
private static ElephantContext INSTANCE;

private static final String AGGREGATORS_CONF = "AggregatorConf.xml";
private static final String FETCHERS_CONF = "FetcherConf.xml";
private static final String HEURISTICS_CONF = "HeuristicConf.xml";
private static final String JOB_TYPES_CONF = "JobTypeConf.xml";

private final Map<String, List<String>> _heuristicGroupedNames = new HashMap<String, List<String>>();
private List<HeuristicConfigurationData> _heuristicsConfData;
private List<FetcherConfigurationData> _fetchersConfData;
private List<AggregatorConfigurationData> _aggregatorConfData;

private final Map<String, ApplicationType> _nameToType = new HashMap<String, ApplicationType>();
private final Map<ApplicationType, List<Heuristic>> _typeToHeuristics = new HashMap<ApplicationType, List<Heuristic>>();
private final Map<ApplicationType, HadoopMetricsAggregator> _typeToAggregator = new HashMap<ApplicationType, HadoopMetricsAggregator>();
private final Map<ApplicationType, ElephantFetcher> _typeToFetcher = new HashMap<ApplicationType, ElephantFetcher>();
private final Map<String, Html> _heuristicToView = new HashMap<String, Html>();
private Map<ApplicationType, List<JobType>> _appTypeToJobTypes = new HashMap<ApplicationType, List<JobType>>();
Expand All @@ -87,6 +95,7 @@ private ElephantContext() {
}

private void loadConfiguration() {
loadAggregators();
loadFetchers();
loadHeuristics();
loadJobTypes();
Expand All @@ -96,6 +105,42 @@ private void loadConfiguration() {
configureSupportedApplicationTypes();
}


private void loadAggregators() {
Document document = Utils.loadXMLDoc(AGGREGATORS_CONF);

_aggregatorConfData = new AggregatorConfiguration(document.getDocumentElement()).getAggregatorsConfigurationData();
for (AggregatorConfigurationData data : _aggregatorConfData) {
try {
Class<?> aggregatorClass = Play.current().classloader().loadClass(data.getClassName());
Object instance = aggregatorClass.getConstructor(AggregatorConfigurationData.class).newInstance(data);
if (!(instance instanceof HadoopMetricsAggregator)) {
throw new IllegalArgumentException(
"Class " + aggregatorClass.getName() + " is not an implementation of " + HadoopMetricsAggregator.class.getName());
}

ApplicationType type = data.getAppType();
if (_typeToAggregator.get(type) == null) {
_typeToAggregator.put(type, (HadoopMetricsAggregator) instance);
}

logger.info("Load Aggregator : " + data.getClassName());
} catch (ClassNotFoundException e) {
throw new RuntimeException("Could not find class " + data.getClassName(), e);
} catch (InstantiationException e) {
throw new RuntimeException("Could not instantiate class " + data.getClassName(), e);
} catch (IllegalAccessException e) {
throw new RuntimeException("Could not access constructor for class" + data.getClassName(), e);
} catch (RuntimeException e) {
throw new RuntimeException(data.getClassName() + " is not a valid Aggregator class.", e);
} catch (InvocationTargetException e) {
throw new RuntimeException("Could not invoke class " + data.getClassName(), e);
} catch (NoSuchMethodException e) {
throw new RuntimeException("Could not find constructor for class " + data.getClassName(), e);
}
}

}
/**
* Load all the fetchers configured in FetcherConf.xml
*/
Expand Down Expand Up @@ -216,7 +261,9 @@ private void loadHeuristics() {
private void configureSupportedApplicationTypes() {
Set<ApplicationType> supportedTypes = Sets.intersection(_typeToFetcher.keySet(), _typeToHeuristics.keySet());
supportedTypes = Sets.intersection(supportedTypes, _appTypeToJobTypes.keySet());
supportedTypes = Sets.intersection(supportedTypes, _typeToAggregator.keySet());

_typeToAggregator.keySet().retainAll(supportedTypes);
_typeToFetcher.keySet().retainAll(supportedTypes);
_typeToHeuristics.keySet().retainAll(supportedTypes);
_appTypeToJobTypes.keySet().retainAll(supportedTypes);
Expand Down Expand Up @@ -300,6 +347,10 @@ public ElephantFetcher getFetcherForApplicationType(ApplicationType type) {
return _typeToFetcher.get(type);
}

public HadoopMetricsAggregator getAggregatorForApplicationType(ApplicationType type) {
return _typeToAggregator.get(type);
}

/**
* Get the application type given a type name.
*
Expand Down
2 changes: 1 addition & 1 deletion app/com/linkedin/drelephant/ElephantRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class ElephantRunner implements Runnable {

private static final long FETCH_INTERVAL = 60 * 1000; // Interval between fetches
private static final long RETRY_INTERVAL = 60 * 1000; // Interval between retries
private static final int EXECUTOR_NUM = 3; // The number of executor threads to analyse the jobs
private static final int EXECUTOR_NUM = 5; // The number of executor threads to analyse the jobs

private static final String GENERAL_CONF = "GeneralConf.xml";
private static final String FETCH_INTERVAL_KEY = "drelephant.analysis.fetch.interval";
Expand Down
8 changes: 7 additions & 1 deletion app/com/linkedin/drelephant/analysis/AnalyticJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.linkedin.drelephant.util.InfoExtractor;
import com.linkedin.drelephant.util.Utils;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import models.AppHeuristicResult;
import models.AppHeuristicResultDetails;
Expand Down Expand Up @@ -250,6 +249,10 @@ public AppResult getAnalysis() throws Exception {
JobType jobType = ElephantContext.instance().matchJobType(data);
String jobTypeName = jobType == null ? UNKNOWN_JOB_TYPE : jobType.getName();

HadoopMetricsAggregator hadoopMetricsAggregator = ElephantContext.instance().getAggregatorForApplicationType(getAppType());
hadoopMetricsAggregator.aggregate(data);
HadoopAggregatedData hadoopAggregatedData = hadoopMetricsAggregator.getResult();

// Load app information
AppResult result = new AppResult();
result.id = Utils.truncateField(getAppId(), AppResult.ID_LIMIT, getAppId());
Expand All @@ -260,6 +263,9 @@ public AppResult getAnalysis() throws Exception {
result.finishTime = getFinishTime();
result.name = Utils.truncateField(getName(), AppResult.APP_NAME_LIMIT, getAppId());
result.jobType = Utils.truncateField(jobTypeName, AppResult.JOBTYPE_LIMIT, getAppId());
result.resourceUsed = hadoopAggregatedData.getResourceUsed();
result.totalDelay = hadoopAggregatedData.getTotalDelay();
result.resourceWasted = hadoopAggregatedData.getResourceWasted();

// Load App Heuristic information
int jobScore = 0;
Expand Down
76 changes: 76 additions & 0 deletions app/com/linkedin/drelephant/analysis/HadoopAggregatedData.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.linkedin.drelephant.analysis;

/**
* This class contains the aggregated data of a job
*/
public class HadoopAggregatedData {

private long resourceUsed = 0;
private long resourceWasted = 0;
private long totalDelay = 0;

/**
* Returns the resource usage of the job
* @return The resource usage of the job
*/
public long getResourceUsed() {
return resourceUsed;
}

/**
* Setter for the resource usage of the job
* @param resourceUsed The resource usage of the job
*/
public void setResourceUsed(long resourceUsed) {
this.resourceUsed = resourceUsed;
}

/**
* Returns the wasted resources of the job
* @return The wasted resources of the job
*/
public long getResourceWasted() {
return resourceWasted;
}

/**
* Setter for the wasted resources
* @param resourceWasted The wasted resources of the job
*/
public void setResourceWasted(long resourceWasted) {
this.resourceWasted = resourceWasted;
}

/**
* returns the total delay of the job
* @return The total delay of the job
*/
public long getTotalDelay() {
return totalDelay;
}

/**
* Setter for the total delay of the job
* @param totalDelay The total delay of the job
*/
public void setTotalDelay(long totalDelay) {
this.totalDelay = totalDelay;
}

}
22 changes: 22 additions & 0 deletions app/com/linkedin/drelephant/analysis/HadoopMetricsAggregator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.linkedin.drelephant.analysis;

public interface HadoopMetricsAggregator {
public void aggregate(HadoopApplicationData data);
public HadoopAggregatedData getResult();
}
60 changes: 60 additions & 0 deletions app/com/linkedin/drelephant/analysis/Metrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.linkedin.drelephant.analysis;

public enum Metrics {

// Currently supported metrics
USED_RESOURCES("Used Resources", "resources", "The resources used by the job"),
WASTED_RESOURCES("Wasted Resources", "resources", "The resources wasted by the job"),
RUNTIME("Run Time", "time", "The run time of the job"),
WAIT_TIME("Wait Time", "time", "The wait time of the job");

private String text;
private String type;
private String description;

Metrics(String text, String type, String description) {
this.text = text;
this.type = type;
this.description = description;
}

/**
* Returns the value of the text for the metrics
* @return The text value
*/
public String getText() {
return text;
}

/**
* Returns the type of the metrics. It can be one of resources or time
* @return The type of the metrics.
*/
public String getType() {
return type;
}

/**
* Returns the description of the metrics
* @return The description of the metrics
*/
public String getDescription() {
return description;
}
}
Loading

0 comments on commit 600f330

Please sign in to comment.