Skip to content

Commit

Permalink
Make flow search more powerful (linkedin#99)
Browse files Browse the repository at this point in the history
  • Loading branch information
paulbramsen authored and akshayrai committed Jul 21, 2016
1 parent 600f330 commit f22440f
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 56 deletions.
5 changes: 5 additions & 0 deletions app-conf/GeneralConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,9 @@
<value>60000</value>
<description>Interval between retries in milliseconds</description>
</property>
<property>
<name>drelephant.application.search.match.partial</name>
<value>true</value>
<description>If this property is "false", search will only make exact matches</description>
</property>
</configuration>
25 changes: 25 additions & 0 deletions app/com/linkedin/drelephant/ElephantContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.apache.spark.SparkMetricsAggregator;
import org.w3c.dom.Document;
Expand All @@ -65,10 +67,12 @@ public class ElephantContext {
private static final String FETCHERS_CONF = "FetcherConf.xml";
private static final String HEURISTICS_CONF = "HeuristicConf.xml";
private static final String JOB_TYPES_CONF = "JobTypeConf.xml";
private static final String GENERAL_CONF = "GeneralConf.xml";

private final Map<String, List<String>> _heuristicGroupedNames = new HashMap<String, List<String>>();
private List<HeuristicConfigurationData> _heuristicsConfData;
private List<FetcherConfigurationData> _fetchersConfData;
private Configuration _generalConf;
private List<AggregatorConfigurationData> _aggregatorConfData;

private final Map<String, ApplicationType> _nameToType = new HashMap<String, ApplicationType>();
Expand Down Expand Up @@ -100,6 +104,8 @@ private void loadConfiguration() {
loadHeuristics();
loadJobTypes();

loadGeneralConf();

// It is important to configure supported types in the LAST step so that we could have information from all
// configurable components.
configureSupportedApplicationTypes();
Expand Down Expand Up @@ -294,6 +300,16 @@ private void loadJobTypes() {
_appTypeToJobTypes = conf.getAppTypeToJobTypeList();
}

/**
* Load in the GeneralConf.xml file as a configuration object for other objects to access
*/
private void loadGeneralConf() {
logger.info("Loading configuration file " + GENERAL_CONF);

_generalConf = new Configuration();
_generalConf.addResource(this.getClass().getClassLoader().getResourceAsStream(GENERAL_CONF));
}

/**
* Given an application type, return the currently bound heuristics
*
Expand Down Expand Up @@ -360,6 +376,15 @@ public ApplicationType getApplicationTypeForName(String typeName) {
return _nameToType.get(typeName.toUpperCase());
}

/**
* Get the general configuration object.
*
* @return the genral configuration object.
*/
public Configuration getGeneralConf() {
return _generalConf;
}

/**
* Get the matched job type given a
*
Expand Down
15 changes: 5 additions & 10 deletions app/com/linkedin/drelephant/ElephantRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public class ElephantRunner implements Runnable {
private static final long RETRY_INTERVAL = 60 * 1000; // Interval between retries
private static final int EXECUTOR_NUM = 5; // The number of executor threads to analyse the jobs

private static final String GENERAL_CONF = "GeneralConf.xml";
private static final String FETCH_INTERVAL_KEY = "drelephant.analysis.fetch.interval";
private static final String RETRY_INTERVAL_KEY = "drelephant.analysis.retry.interval";
private static final String EXECUTOR_NUM_KEY = "drelephant.analysis.thread.count";
Expand All @@ -64,17 +63,13 @@ public class ElephantRunner implements Runnable {
private ExecutorService _service;
private BlockingQueue<AnalyticJob> _jobQueue;
private AnalyticJobGenerator _analyticJobGenerator;
private Configuration _configuration;

private void loadGeneralConfiguration() {
logger.info("Loading configuration file " + GENERAL_CONF);
Configuration configuration = ElephantContext.instance().getGeneralConf();

_configuration = new Configuration();
_configuration.addResource(this.getClass().getClassLoader().getResourceAsStream(GENERAL_CONF));

_executorNum = Utils.getNonNegativeInt(_configuration, EXECUTOR_NUM_KEY, EXECUTOR_NUM);
_fetchInterval = Utils.getNonNegativeLong(_configuration, FETCH_INTERVAL_KEY, FETCH_INTERVAL);
_retryInterval = Utils.getNonNegativeLong(_configuration, RETRY_INTERVAL_KEY, RETRY_INTERVAL);
_executorNum = Utils.getNonNegativeInt(configuration, EXECUTOR_NUM_KEY, EXECUTOR_NUM);
_fetchInterval = Utils.getNonNegativeLong(configuration, FETCH_INTERVAL_KEY, FETCH_INTERVAL);
_retryInterval = Utils.getNonNegativeLong(configuration, RETRY_INTERVAL_KEY, RETRY_INTERVAL);
}

private void loadAnalyticJobGenerator() {
Expand All @@ -85,7 +80,7 @@ private void loadAnalyticJobGenerator() {
}

try {
_analyticJobGenerator.configure(_configuration);
_analyticJobGenerator.configure(ElephantContext.instance().getGeneralConf());
} catch (Exception e) {
logger.error("Error occurred when configuring the analysis provider.", e);
throw new RuntimeException(e);
Expand Down
Loading

0 comments on commit f22440f

Please sign in to comment.