LIHADOOP-19609: Make Spark Event Log Directory configurable
akshayrai committed May 4, 2016
1 parent 8647478 commit 1e64cba
Showing 8 changed files with 325 additions and 9 deletions.
1 change: 1 addition & 0 deletions app-conf/FetcherConf.xml
@@ -39,6 +39,7 @@
<!--
<params>
<event_log_size_limit_in_mb>100</event_log_size_limit_in_mb>
<event_log_dir>/system/spark-history</event_log_dir>
</params>
-->
</fetcher>
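Both params are optional: leaving the block commented out, or leaving a value empty, makes the fetcher fall back to its defaults of 100 MB and /system/spark-history (see the resolution sketch after the SparkFSFetcher.scala diff below).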
34 changes: 26 additions & 8 deletions app/org/apache/spark/deploy/history/SparkFSFetcher.scala
@@ -25,7 +25,9 @@ import com.linkedin.drelephant.configurations.fetcher.FetcherConfigurationData
import com.linkedin.drelephant.security.HadoopSecurity
import com.linkedin.drelephant.spark.data.SparkApplicationData
import com.linkedin.drelephant.util.{MemoryFormatUtils, Utils}
import com.linkedin.drelephant.analysis.{ApplicationType, AnalyticJob, ElephantFetcher};
import com.linkedin.drelephant.analysis.{ApplicationType, AnalyticJob, ElephantFetcher}
import org.apache.commons.io.FileUtils

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem
@@ -47,13 +49,20 @@ class SparkFSFetcher(fetcherConfData: FetcherConfigurationData) extends Elephant

import SparkFSFetcher._

var confEventLogSizeInMb = defEventLogSizeInMb
if (fetcherConfData.getParamMap.get(LOG_SIZE_XML_FIELD) != null) {
val logLimitSize = Utils.getParam(fetcherConfData.getParamMap.get(LOG_SIZE_XML_FIELD), 1)
if (logLimitSize != null) {
EVENT_LOG_SIZE_LIMIT_MB = MemoryFormatUtils.stringToBytes(logLimitSize(0) + "M");
confEventLogSizeInMb = logLimitSize(0)
}
}
logger.info("The event log limit of Spark application is set to " + EVENT_LOG_SIZE_LIMIT_MB + " MB")
logger.info("The event log limit of Spark application is set to " + confEventLogSizeInMb + " MB")

var confEventLogDir = fetcherConfData.getParamMap.get(LOG_DIR_XML_FIELD)
if (confEventLogDir == null || confEventLogDir.isEmpty) {
confEventLogDir = defEventLogDir
}
logger.info("The event log directory of Spark application is set to " + confEventLogDir)

private val _sparkConf = new SparkConf()

@@ -65,7 +74,7 @@ class SparkFSFetcher(fetcherConfData: FetcherConfigurationData) extends Elephant
val nodeAddress = conf.get("dfs.namenode.http-address", null)
val hdfsAddress = if (nodeAddress == null) "" else "webhdfs://" + nodeAddress

val uri = new URI(_sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR))
val uri = new URI(_sparkConf.get("spark.eventLog.dir", confEventLogDir))
val logDir = hdfsAddress + uri.getPath
logger.info("Looking for spark logs at logDir: " + logDir)
logDir
@@ -154,7 +163,7 @@ class SparkFSFetcher(fetcherConfData: FetcherConfigurationData) extends Elephant
dataCollection.getConf().setProperty("spark.app.id", appId)

logger.info("The event log of Spark application: " + appId + " is over the limit size of "
+ EVENT_LOG_SIZE_LIMIT_MB + " MB, the parsing process gets throttled.")
+ defEventLogSizeInMb + " MB, the parsing process gets throttled.")
} else {
logger.info("Replaying Spark logs for application: " + appId)

@@ -221,18 +230,27 @@ class SparkFSFetcher(fetcherConfData: FetcherConfigurationData) extends Elephant
* @return If the event log parsing should be throttled
*/
private def shouldThrottle(eventLogPath: Path): Boolean = {
fs.getFileStatus(eventLogPath).getLen() > (EVENT_LOG_SIZE_LIMIT_MB * 1024 * 1024)
fs.getFileStatus(eventLogPath).getLen() > (confEventLogSizeInMb * FileUtils.ONE_MB)
}

def getEventLogSize(): Double = {
confEventLogSizeInMb
}

def getEventLogDir(): String = {
confEventLogDir
}

}

private object SparkFSFetcher {
private val logger = Logger.getLogger(SparkFSFetcher.getClass)

val DEFAULT_LOG_DIR = "/system/spark-history"
var defEventLogDir = "/system/spark-history"
var defEventLogSizeInMb = 100d; // 100MB

val LOG_SIZE_XML_FIELD = "event_log_size_limit_in_mb"
var EVENT_LOG_SIZE_LIMIT_MB = 100d // 100MB
val LOG_DIR_XML_FIELD = "event_log_dir"

// Constants used to parse <= Spark 1.2.0 log directories.
val LOG_PREFIX = "EVENT_LOG_"
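Taken together, the SparkFSFetcher.scala changes resolve the event log directory and size limit in a fixed order: the fetcher's <event_log_dir> and <event_log_size_limit_in_mb> params override the built-in defaults (/system/spark-history, 100 MB), and a spark.eventLog.dir value from SparkConf, when set, still takes precedence for the directory. The Scala sketch below is a minimal, standalone illustration of that precedence and of the size-based throttle check; it is not the project's code, and the object and method names are invented for the example.

import org.apache.commons.io.FileUtils

// Hypothetical helper, not part of the commit; names are illustrative only.
object EventLogConfigSketch {
  val DefaultEventLogDir = "/system/spark-history"  // defEventLogDir in SparkFSFetcher
  val DefaultEventLogSizeMb = 100d                  // defEventLogSizeInMb in SparkFSFetcher

  // paramDir / paramSizeMb: optional values from the fetcher's <params> block;
  // sparkConfDir: optional spark.eventLog.dir from SparkConf.
  def resolveEventLogDir(paramDir: Option[String], sparkConfDir: Option[String]): String = {
    val configured = paramDir.filter(_.nonEmpty).getOrElse(DefaultEventLogDir)
    sparkConfDir.filter(_.nonEmpty).getOrElse(configured)
  }

  def resolveEventLogSizeMb(paramSizeMb: Option[String]): Double =
    paramSizeMb.filter(_.nonEmpty).map(_.toDouble).getOrElse(DefaultEventLogSizeMb)

  // Mirrors shouldThrottle: parsing is skipped when the log exceeds the size limit.
  def shouldThrottle(logLengthBytes: Long, sizeLimitMb: Double): Boolean =
    logLengthBytes > sizeLimitMb * FileUtils.ONE_MB

  def main(args: Array[String]): Unit = {
    println(resolveEventLogDir(Some("/custom/configured"), None))  // /custom/configured
    println(resolveEventLogDir(None, None))                        // /system/spark-history
    println(shouldThrottle(120 * FileUtils.ONE_MB, resolveEventLogSizeMb(Some("50"))))  // true
  }
}

Under those rules the FetcherConfTest5.xml fixture below (50 MB, /custom/configured) overrides both defaults, while a missing or empty <params> block falls back to 100 MB and /system/spark-history, which is what the new SparkFsFetcherTest cases assert.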
1 change: 1 addition & 0 deletions project/Dependencies.scala
@@ -67,6 +67,7 @@ object Dependencies {
"org.apache.hadoop" % "hadoop-common" % hadoopVersion % "compileonly",
"org.apache.hadoop" % "hadoop-common" % hadoopVersion % Test,
"org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion % "compileonly",
"org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion % Test,
"org.codehaus.jackson" % "jackson-mapper-asl" % jacksonMapperAslVersion,
"org.jsoup" % "jsoup" % jsoupVersion,
"org.mockito" % "mockito-core" % "1.10.19"
FetcherConfigurationTest.java
@@ -20,6 +20,8 @@
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;


import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -36,6 +38,14 @@ public class FetcherConfigurationTest {
private static Document document2 = null;
private static Document document3 = null;
private static Document document4 = null;
private static Document document5 = null;

private static final String spark = "SPARK";
private static final String logDirField = "event_log_dir";
private static final String logDirValue = "/custom/configured";
private static final String logSizeField = "event_log_size_limit_in_mb";
private static final String logSizeValue = "50";


@BeforeClass
public static void runBeforeClass() {
@@ -54,6 +64,9 @@ public static void runBeforeClass() {
document4 = builder.parse(
FetcherConfigurationTest.class.getClassLoader().getResourceAsStream(
"configurations/fetcher/FetcherConfTest4.xml"));
document5 = builder.parse(
FetcherConfigurationTest.class.getClassLoader().getResourceAsStream(
"configurations/fetcher/FetcherConfTest5.xml"));
} catch (ParserConfigurationException e) {
throw new RuntimeException("XML Parser could not be created.", e);
} catch (SAXException e) {
@@ -102,8 +115,22 @@ public void testParseFetcherConf3() {
public void testParseFetcherConf4() {
expectedEx.expect(RuntimeException.class);
expectedEx.expectMessage("No tag or invalid tag 'applicationtype' in fetcher 1"
+ " classname com.linkedin.drelephant.mapreduce.MapReduceFetcherHadoop2");
+ " classname com.linkedin.drelephant.mapreduce.MapReduceFetcherHadoop2");
FetcherConfiguration fetcherConf = new FetcherConfiguration(document4.getDocumentElement());
}

/**
* Test Spark fetcher params, Event log size and log directory
*/
@Test
public void testParseFetcherConf5() {
FetcherConfiguration fetcherConf = new FetcherConfiguration(document5.getDocumentElement());
assertEquals(fetcherConf.getFetchersConfigurationData().size(), 1);
assertEquals(fetcherConf.getFetchersConfigurationData().get(0).getAppType().getName(), spark);
assertEquals(fetcherConf.getFetchersConfigurationData().get(0).getParamMap().size(), 3);
assertEquals(fetcherConf.getFetchersConfigurationData().get(0).getParamMap().get(logSizeField), logSizeValue);
assertEquals(fetcherConf.getFetchersConfigurationData().get(0).getParamMap().get(logDirField), logDirValue);
}

}

190 changes: 190 additions & 0 deletions test/org/apache/spark/deploy/history/SparkFsFetcherTest.java
@@ -0,0 +1,190 @@
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.spark.deploy.history;

import com.linkedin.drelephant.analysis.ElephantFetcher;
import com.linkedin.drelephant.configurations.fetcher.FetcherConfiguration;
import com.linkedin.drelephant.configurations.fetcher.FetcherConfigurationData;
import org.junit.BeforeClass;
import org.junit.Test;
import org.w3c.dom.Document;
import org.xml.sax.SAXException;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;

import static org.junit.Assert.assertEquals;

public class SparkFsFetcherTest {

private static Document document1 = null;
private static Document document2 = null;
private static Document document3 = null;

private static final String spark = "SPARK";
private static final String defEventLogDir = "/system/spark-history";
private static final String confEventLogDir = "/custom/configured";
private static final double defEventLogSize = 100;
private static final double confEventLogSize = 50;

@BeforeClass
public static void runBeforeClass() {
try {
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
DocumentBuilder builder = factory.newDocumentBuilder();
document1 = builder.parse(
SparkFsFetcherTest.class.getClassLoader().getResourceAsStream(
"configurations/fetcher/FetcherConfTest5.xml"));
document2 = builder.parse(
SparkFsFetcherTest.class.getClassLoader().getResourceAsStream(
"configurations/fetcher/FetcherConfTest6.xml"));
document3 = builder.parse(
SparkFsFetcherTest.class.getClassLoader().getResourceAsStream(
"configurations/fetcher/FetcherConfTest7.xml"));
} catch (ParserConfigurationException e) {
throw new RuntimeException("XML Parser could not be created.", e);
} catch (SAXException e) {
throw new RuntimeException("Test files are not properly formed", e);
} catch (IOException e) {
throw new RuntimeException("Unable to read test files ", e);
}
}

/**
* Test for verifying the configured event log directory and log size
*
* <params>
* <event_log_size_limit_in_mb>50</event_log_size_limit_in_mb>
* <event_log_dir>/custom/configured</event_log_dir>
* </params>
*/
@Test
public void testSparkFetcherConfig() {
FetcherConfiguration fetcherConf = new FetcherConfiguration(document1.getDocumentElement());
assertEquals(fetcherConf.getFetchersConfigurationData().size(), 1);
assertEquals(fetcherConf.getFetchersConfigurationData().get(0).getAppType().getName(), spark);

Class<?> fetcherClass = null;
FetcherConfigurationData data = fetcherConf.getFetchersConfigurationData().get(0);
try {
fetcherClass = SparkFsFetcherTest.class.getClassLoader().loadClass(data.getClassName());
Object sparkFetcherInstance = fetcherClass.getConstructor(FetcherConfigurationData.class).newInstance(data);
if (!(sparkFetcherInstance instanceof ElephantFetcher)) {
throw new IllegalArgumentException(
"Class " + fetcherClass.getName() + " is not an implementation of " + ElephantFetcher.class.getName());
}

// Check if the configurations are picked up correctly
assertEquals(confEventLogSize, ((SparkFSFetcher) sparkFetcherInstance).getEventLogSize(), 0);
assertEquals(confEventLogDir, ((SparkFSFetcher) sparkFetcherInstance).getEventLogDir());

} catch (InstantiationException e) {
throw new RuntimeException("Could not instantiate class " + data.getClassName(), e);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Could not find class " + data.getClassName(), e);
} catch (IllegalAccessException e) {
throw new RuntimeException("Could not access constructor for class" + data.getClassName(), e);
} catch (InvocationTargetException e) {
throw new RuntimeException("Could not invoke class " + data.getClassName(), e);
} catch (NoSuchMethodException e) {
throw new RuntimeException("Could not find constructor for class " + data.getClassName(), e);
}
}

/**
* Test for verifying unspecified log directory and log size configs
*
* <params>
* </params>
*/
@Test
public void testSparkFetcherUnspecifiedConfig() {
FetcherConfiguration fetcherConf = new FetcherConfiguration(document3.getDocumentElement());
assertEquals(fetcherConf.getFetchersConfigurationData().size(), 1);
assertEquals(fetcherConf.getFetchersConfigurationData().get(0).getAppType().getName(), spark);

Class<?> fetcherClass = null;
FetcherConfigurationData data = fetcherConf.getFetchersConfigurationData().get(0);
try {
fetcherClass = SparkFsFetcherTest.class.getClassLoader().loadClass(data.getClassName());
Object sparkFetcherInstance = fetcherClass.getConstructor(FetcherConfigurationData.class).newInstance(data);
if (!(sparkFetcherInstance instanceof ElephantFetcher)) {
throw new IllegalArgumentException(
"Class " + fetcherClass.getName() + " is not an implementation of " + ElephantFetcher.class.getName());
}

// Check if the default values are used
assertEquals(defEventLogSize, ((SparkFSFetcher) sparkFetcherInstance).getEventLogSize(), 0);
assertEquals(defEventLogDir, ((SparkFSFetcher) sparkFetcherInstance).getEventLogDir());

} catch (InstantiationException e) {
throw new RuntimeException("Could not instantiate class " + data.getClassName(), e);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Could not find class " + data.getClassName(), e);
} catch (IllegalAccessException e) {
throw new RuntimeException("Could not access constructor for class" + data.getClassName(), e);
} catch (InvocationTargetException e) {
throw new RuntimeException("Could not invoke class " + data.getClassName(), e);
} catch (NoSuchMethodException e) {
throw new RuntimeException("Could not find constructor for class " + data.getClassName(), e);
}
}

/**
* Test for verifying empty log directory and log size configs
*
* <params>
* <event_log_size_limit_in_mb></event_log_size_limit_in_mb>
* <event_log_dir>/system/spark-history</event_log_dir>
* </params>
*/
@Test
public void testSparkFetcherEmptyConfig() {
FetcherConfiguration fetcherConf = new FetcherConfiguration(document2.getDocumentElement());
assertEquals(fetcherConf.getFetchersConfigurationData().size(), 1);
assertEquals(fetcherConf.getFetchersConfigurationData().get(0).getAppType().getName(), spark);

Class<?> fetcherClass = null;
FetcherConfigurationData data = fetcherConf.getFetchersConfigurationData().get(0);
try {
fetcherClass = SparkFsFetcherTest.class.getClassLoader().loadClass(data.getClassName());
Object sparkFetcherInstance = fetcherClass.getConstructor(FetcherConfigurationData.class).newInstance(data);
if (!(sparkFetcherInstance instanceof ElephantFetcher)) {
throw new IllegalArgumentException(
"Class " + fetcherClass.getName() + " is not an implementation of " + ElephantFetcher.class.getName());
}

// Check if the default values are used
assertEquals(defEventLogSize, ((SparkFSFetcher) sparkFetcherInstance).getEventLogSize(), 0);
assertEquals(defEventLogDir, ((SparkFSFetcher) sparkFetcherInstance).getEventLogDir());

} catch (InstantiationException e) {
throw new RuntimeException("Could not instantiate class " + data.getClassName(), e);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Could not find class " + data.getClassName(), e);
} catch (IllegalAccessException e) {
throw new RuntimeException("Could not access constructor for class" + data.getClassName(), e);
} catch (InvocationTargetException e) {
throw new RuntimeException("Could not invoke class " + data.getClassName(), e);
} catch (NoSuchMethodException e) {
throw new RuntimeException("Could not find constructor for class " + data.getClassName(), e);
}
}
}
27 changes: 27 additions & 0 deletions test/resources/configurations/fetcher/FetcherConfTest5.xml
@@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2016 LinkedIn Corp.
Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy of
the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations under
the License.
-->

<fetchers>
<fetcher>
<applicationtype>spark</applicationtype>
<classname>org.apache.spark.deploy.history.SparkFSFetcher</classname>
<params>
<event_log_size_limit_in_mb>50</event_log_size_limit_in_mb>
<event_log_dir>/custom/configured</event_log_dir>
</params>
</fetcher>
</fetchers>
