Skip to content

Commit

Permalink
Add GeneralConf.xml to make number of executors configurable (linkedi…
Browse files Browse the repository at this point in the history
  • Loading branch information
stiga-huang authored and akshayrai committed Jun 22, 2016
1 parent f57b4f5 commit a616a69
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 12 deletions.
35 changes: 35 additions & 0 deletions app-conf/GeneralConf.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2016 LinkedIn Corp.
Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy of
the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations under
the License.
-->

<!-- General configurations -->
<configuration>
<!-- Size of the executor pool that ElephantRunner uses to analyse jobs.
     ElephantRunner falls back to its built-in default of 3 when this is unset
     or not an integer. A negative value is reset to 0, and with 0 no executor
     threads are started. -->
<property>
<name>drelephant.analysis.thread.count</name>
<value>3</value>
<description>Number of threads to analyze the completed jobs</description>
</property>
<!-- Wait applied after a successful fetch before the next one, in milliseconds.
     Built-in default is 60000; a negative value is reset to 0. -->
<property>
<name>drelephant.analysis.fetch.interval</name>
<value>60000</value>
<description>Interval between fetches in milliseconds</description>
</property>
<!-- Wait applied before retrying after a Kerberos login error or a failure to
     fetch the job list, in milliseconds. Built-in default is 60000; a negative
     value is reset to 0. -->
<property>
<name>drelephant.analysis.retry.interval</name>
<value>60000</value>
<description>Interval between retries in milliseconds</description>
</property>
</configuration>
47 changes: 36 additions & 11 deletions app/com/linkedin/drelephant/ElephantRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import com.linkedin.drelephant.util.Utils;
import models.AppResult;

import org.apache.commons.lang.exception.ExceptionUtils;
Expand All @@ -45,26 +46,46 @@
public class ElephantRunner implements Runnable {
private static final Logger logger = Logger.getLogger(ElephantRunner.class);

private static final long WAIT_INTERVAL = 60 * 1000; // Interval between fetches and retries
private static final long FETCH_INTERVAL = 60 * 1000; // Interval between fetches
private static final long RETRY_INTERVAL = 60 * 1000; // Interval between retries
private static final int EXECUTOR_NUM = 3; // The number of executor threads to analyse the jobs

private static final String GENERAL_CONF = "GeneralConf.xml";
private static final String FETCH_INTERVAL_KEY = "drelephant.analysis.fetch.interval";
private static final String RETRY_INTERVAL_KEY = "drelephant.analysis.retry.interval";
private static final String EXECUTOR_NUM_KEY = "drelephant.analysis.thread.count";

private AtomicBoolean _running = new AtomicBoolean(true);
private long lastRun;
private long _fetchInterval;
private long _retryInterval;
private int _executorNum;
private HadoopSecurity _hadoopSecurity;
private ExecutorService _service;
private BlockingQueue<AnalyticJob> _jobQueue;
private AnalyticJobGenerator _analyticJobGenerator;
private Configuration _configuration;

/**
 * Loads GeneralConf.xml from the classpath and caches the runner settings read from it:
 * the executor thread count, the fetch interval and the retry interval.
 *
 * When the file cannot be found on the classpath, a warning is logged and a plain
 * Configuration is used, so each setting resolves to its compiled-in default.
 */
private void loadGeneralConfiguration() {
  logger.info("Loading configuration file " + GENERAL_CONF);

  _configuration = new Configuration();

  // getResourceAsStream returns null for a missing resource, so guard against it
  // instead of registering a null resource with the Configuration.
  java.io.InputStream generalConfStream = this.getClass().getClassLoader().getResourceAsStream(GENERAL_CONF);
  if (generalConfStream != null) {
    _configuration.addResource(generalConfStream);
  } else {
    logger.warn("Configuration file " + GENERAL_CONF + " was not found on the classpath. Using default values.");
  }

  // Each getter substitutes the supplied default for unset or malformed values
  // and resets negative values to 0.
  _executorNum = Utils.getNonNegativeInt(_configuration, EXECUTOR_NUM_KEY, EXECUTOR_NUM);
  _fetchInterval = Utils.getNonNegativeLong(_configuration, FETCH_INTERVAL_KEY, FETCH_INTERVAL);
  _retryInterval = Utils.getNonNegativeLong(_configuration, RETRY_INTERVAL_KEY, RETRY_INTERVAL);
}

private void loadAnalyticJobGenerator() {
Configuration configuration = new Configuration();
if (HadoopSystemContext.isHadoop2Env()) {
_analyticJobGenerator = new AnalyticJobGeneratorHadoop2();
} else {
throw new RuntimeException("Unsupported Hadoop major version detected. It is not 2.x.");
}

try {
_analyticJobGenerator.configure(configuration);
_analyticJobGenerator.configure(_configuration);
} catch (Exception e) {
logger.error("Error occurred when configuring the analysis provider.", e);
throw new RuntimeException(e);
Expand All @@ -80,13 +101,17 @@ public void run() {
@Override
public Void run() {
HDFSContext.load();
loadGeneralConfiguration();
loadAnalyticJobGenerator();
ElephantContext.init();

_service = Executors.newFixedThreadPool(EXECUTOR_NUM);
_jobQueue = new LinkedBlockingQueue<AnalyticJob>();
for (int i = 0; i < EXECUTOR_NUM; i++) {
_service.submit(new ExecutorThread(i + 1, _jobQueue));
logger.info("executor num is " + _executorNum);
if (_executorNum > 0) {
_service = Executors.newFixedThreadPool(_executorNum);
for (int i = 0; i < _executorNum; i++) {
_service.submit(new ExecutorThread(i + 1, _jobQueue));
}
}

while (_running.get() && !Thread.currentThread().isInterrupted()) {
Expand All @@ -100,7 +125,7 @@ public Void run() {
} catch (IOException e) {
logger.info("Error with hadoop kerberos login", e);
//Wait for a while before retry
waitInterval();
waitInterval(_retryInterval);
continue;
}

Expand All @@ -110,15 +135,15 @@ public Void run() {
} catch (Exception e) {
logger.error("Error fetching job list. Try again later...", e);
//Wait for a while before retry
waitInterval();
waitInterval(_retryInterval);
continue;
}

_jobQueue.addAll(todos);
logger.info("Job queue size is " + _jobQueue.size());

//Wait for a while before next fetch
waitInterval();
waitInterval(_fetchInterval);
}
logger.info("Main thread is terminated.");
return null;
Expand Down Expand Up @@ -172,9 +197,9 @@ public void run() {
}
}

private void waitInterval() {
private void waitInterval(long interval) {
// Wait for long enough
long nextRun = lastRun + WAIT_INTERVAL;
long nextRun = lastRun + interval;
long waitTime = nextRun - System.currentTimeMillis();

if (waitTime <= 0) {
Expand Down
54 changes: 53 additions & 1 deletion app/com/linkedin/drelephant/util/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import models.AppResult;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.w3c.dom.Document;
import org.xml.sax.SAXException;
Expand Down Expand Up @@ -242,4 +242,56 @@ public static String truncateField(String field, int limit, String appId) {
public static boolean isSet(String property) {
  // A missing (null) property is never considered set.
  if (property == null) {
    return false;
  }
  // Otherwise it is set exactly when it holds at least one character.
  return property.length() > 0;
}

/**
 * Reads an int property from a Configuration, never returning a negative number.
 *
 * The supplied default is returned when the property is absent or cannot be parsed
 * as an int. A negative result is replaced by 0 and a warning is logged.
 *
 * @param conf Configuration to read from
 * @param key name of the property
 * @param defaultValue value used when the property is absent or malformed
 * @return the property value, clamped to be at least 0
 */
public static int getNonNegativeInt(Configuration conf, String key, int defaultValue) {
  int parsed;
  try {
    parsed = conf.getInt(key, defaultValue);
  } catch (NumberFormatException e) {
    // The raw text could not be parsed as an int: report it and use the default.
    logger.error("Invalid configuration " + key + ". Value is " + conf.get(key)
        + ". Resetting it to default value: " + defaultValue);
    return defaultValue;
  }

  if (parsed >= 0) {
    return parsed;
  }

  logger.warn("Configuration " + key + " is negative. Resetting it to 0");
  return 0;
}

/**
 * Reads a long property from a Configuration, never returning a negative number.
 *
 * The supplied default is returned when the property is absent or cannot be parsed
 * as a long. A negative result is replaced by 0 and a warning is logged.
 *
 * @param conf Configuration to read from
 * @param key name of the property
 * @param defaultValue value used when the property is absent or malformed
 * @return the property value, clamped to be at least 0
 */
public static long getNonNegativeLong(Configuration conf, String key, long defaultValue) {
  long parsed;
  try {
    parsed = conf.getLong(key, defaultValue);
  } catch (NumberFormatException e) {
    // The raw text could not be parsed as a long: report it and use the default.
    logger.error("Invalid configuration " + key + ". Value is " + conf.get(key)
        + ". Resetting it to default value: " + defaultValue);
    return defaultValue;
  }

  if (parsed >= 0) {
    return parsed;
  }

  logger.warn("Configuration " + key + " is negative. Resetting it to 0");
  return 0;
}
}
42 changes: 42 additions & 0 deletions test/com/linkedin/drelephant/util/UtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -120,4 +121,45 @@ public void testParseCsKeyValue() {
assertEquals("bar2", properties2.get("foo2"));
assertEquals("bar3", properties2.get("foo3"));
}

@Test
public void testGetNonNegativeInt() {
  final int fallback = 50;

  Configuration settings = new Configuration();
  settings.set("foo1", "100");
  settings.set("foo2", "-100");
  settings.set("foo3", "0");
  settings.set("foo4", "0.5");
  settings.set("foo5", "9999999999999999");
  settings.set("foo6", "bar");
  // "foo7" is deliberately left unset.

  // Valid non-negative values come back unchanged.
  assertEquals(100, Utils.getNonNegativeInt(settings, "foo1", fallback));
  assertEquals(0, Utils.getNonNegativeInt(settings, "foo3", fallback));

  // Negative values are clamped to zero.
  assertEquals(0, Utils.getNonNegativeInt(settings, "foo2", fallback));

  // Fractional, out-of-int-range and non-numeric values yield the default.
  assertEquals(fallback, Utils.getNonNegativeInt(settings, "foo4", fallback));
  assertEquals(fallback, Utils.getNonNegativeInt(settings, "foo5", fallback));
  assertEquals(fallback, Utils.getNonNegativeInt(settings, "foo6", fallback));

  // A missing key yields the default.
  assertEquals(fallback, Utils.getNonNegativeInt(settings, "foo7", fallback));
}

@Test
public void testGetNonNegativeLong() {
  final long fallback = 50;

  Configuration settings = new Configuration();
  settings.set("foo1", "100");
  settings.set("foo2", "-100");
  settings.set("foo3", "0");
  settings.set("foo4", "0.5");
  settings.set("foo5", "9999999999999999");
  settings.set("foo6", "bar");
  // "foo7" is deliberately left unset.

  // Valid non-negative values come back unchanged, including ones too large for an int.
  assertEquals(100, Utils.getNonNegativeLong(settings, "foo1", fallback));
  assertEquals(0, Utils.getNonNegativeLong(settings, "foo3", fallback));
  assertEquals(9999999999999999L, Utils.getNonNegativeLong(settings, "foo5", fallback));

  // Negative values are clamped to zero.
  assertEquals(0, Utils.getNonNegativeLong(settings, "foo2", fallback));

  // Fractional and non-numeric values yield the default.
  assertEquals(fallback, Utils.getNonNegativeLong(settings, "foo4", fallback));
  assertEquals(fallback, Utils.getNonNegativeLong(settings, "foo6", fallback));

  // A missing key yields the default.
  assertEquals(fallback, Utils.getNonNegativeLong(settings, "foo7", fallback));
}
}

0 comments on commit a616a69

Please sign in to comment.