diff --git a/app/com/linkedin/drelephant/ElephantRunner.java b/app/com/linkedin/drelephant/ElephantRunner.java
index 537048745..e85d73015 100644
--- a/app/com/linkedin/drelephant/ElephantRunner.java
+++ b/app/com/linkedin/drelephant/ElephantRunner.java
@@ -192,11 +192,11 @@ public void run() {
           logger.error(ExceptionUtils.getStackTrace(e));
 
           if (_analyticJob != null && _analyticJob.retry()) {
-            logger.error("Add analytic job id [" + _analyticJob.getAppId() + "] into the retry list.");
+            logger.warn("Add analytic job id [" + _analyticJob.getAppId() + "] into the retry list.");
             _analyticJobGenerator.addIntoRetries(_analyticJob);
           } else if (_analyticJob != null && _analyticJob.isSecondPhaseRetry()) {
             //Putting the job into a second retry queue which fetches jobs after some interval. Some spark jobs may need more time than usual to process, hence the queue.
-            logger.error("Add analytic job id [" + _analyticJob.getAppId() + "] into the second retry list.");
+            logger.warn("Add analytic job id [" + _analyticJob.getAppId() + "] into the second retry list.");
             _analyticJobGenerator.addIntoSecondRetryQueue(_analyticJob);
           } else {
             if (_analyticJob != null) {
diff --git a/app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala b/app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala
index 7be4f33fb..a9ef1d815 100644
--- a/app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala
+++ b/app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala
@@ -16,11 +16,9 @@
 
 package com.linkedin.drelephant.spark.fetchers
 
-import scala.async.Async
-import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.{Await, ExecutionContext, Future, blocking}
 import scala.concurrent.duration.{Duration, SECONDS}
 import scala.util.{Try, Success, Failure}
-import scala.util.control.NonFatal
 
 import com.linkedin.drelephant.analysis.{AnalyticJob, ElephantFetcher}
 import com.linkedin.drelephant.configurations.fetcher.FetcherConfigurationData
@@ -35,9 +33,9 @@ import org.apache.spark.SparkConf
  * A fetcher that gets Spark-related data from a combination of the Spark monitoring REST API and Spark event logs.
  */
 class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData)
-    extends ElephantFetcher[SparkApplicationData] {
+  extends ElephantFetcher[SparkApplicationData] {
+
   import SparkFetcher._
-  import Async.{async, await}
   import ExecutionContext.Implicits.global
 
   private val logger: Logger = Logger.getLogger(classOf[SparkFetcher])
@@ -94,7 +92,7 @@ class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData)
         Success(data)
       },
       e => {
-        logger.error(s"Failed fetching data for ${appId}", e)
+        logger.warn(s"Failed fetching data for ${appId}." + " I will retry after some time! " + "Exception Message is: " + e.getMessage)
         Failure(e)
       }
     )
@@ -102,7 +100,7 @@
 
   private def doFetchSparkApplicationData(analyticJob: AnalyticJob): Future[SparkApplicationData] = {
     if (shouldProcessLogsLocally) {
-      async {
+      Future {
         sparkRestClient.fetchEventLogAndParse(analyticJob.getAppId)
       }
     } else {
@@ -110,19 +108,23 @@
     }
   }
 
-  private def doFetchDataUsingRestAndLogClients(analyticJob: AnalyticJob): Future[SparkApplicationData] = async {
-    val appId = analyticJob.getAppId
-    val restDerivedData = await(sparkRestClient.fetchData(appId, eventLogSource == EventLogSource.Rest))
-
-    val logDerivedData = eventLogSource match {
-      case EventLogSource.None => None
-      case EventLogSource.Rest => restDerivedData.logDerivedData
-      case EventLogSource.WebHdfs =>
-        val lastAttemptId = restDerivedData.applicationInfo.attempts.maxBy { _.startTime }.attemptId
-        Some(await(sparkLogClient.fetchData(appId, lastAttemptId)))
-    }
+  private def doFetchDataUsingRestAndLogClients(analyticJob: AnalyticJob): Future[SparkApplicationData] = Future {
+    blocking {
+      val appId = analyticJob.getAppId
+      val restDerivedData = Await.result(sparkRestClient.fetchData(appId, eventLogSource == EventLogSource.Rest), DEFAULT_TIMEOUT)
+
+      val logDerivedData = eventLogSource match {
+        case EventLogSource.None => None
+        case EventLogSource.Rest => restDerivedData.logDerivedData
+        case EventLogSource.WebHdfs =>
+          val lastAttemptId = restDerivedData.applicationInfo.attempts.maxBy {
+            _.startTime
+          }.attemptId
+          Some(Await.result(sparkLogClient.fetchData(appId, lastAttemptId), DEFAULT_TIMEOUT))
+      }
 
-    SparkApplicationData(appId, restDerivedData, logDerivedData)
+      SparkApplicationData(appId, restDerivedData, logDerivedData)
+    }
   }
 }
 
diff --git a/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala b/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala
index ce81ceb05..7a39454cd 100644
--- a/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala
+++ b/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala
@@ -16,7 +16,7 @@
 
 package com.linkedin.drelephant.spark.fetchers
 
-import java.io.{InputStream, BufferedInputStream}
+import java.io.{BufferedInputStream, InputStream}
 import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.zip.ZipInputStream
@@ -25,8 +25,7 @@ import java.util.{Calendar, SimpleTimeZone}
 
 import com.linkedin.drelephant.spark.legacydata.LegacyDataConverters
 import org.apache.spark.deploy.history.SparkDataCollection
-import scala.async.Async
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.{Await, ExecutionContext, Future, blocking}
 import scala.util.control.NonFatal
 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
@@ -41,6 +40,8 @@ import javax.ws.rs.core.MediaType
 import org.apache.log4j.Logger
 import org.apache.spark.SparkConf
 
+import scala.concurrent.duration.{Duration, SECONDS}
+
 /**
  * A client for getting data from the Spark monitoring REST API, e.g. .
  *
@@ -48,8 +49,8 @@ import org.apache.spark.SparkConf
  * or synchronous when needed.
  */
 class SparkRestClient(sparkConf: SparkConf) {
+
   import SparkRestClient._
-  import Async.{async, await}
 
   private val logger: Logger = Logger.getLogger(classOf[SparkRestClient])
 
@@ -58,8 +59,8 @@ class SparkRestClient(sparkConf: SparkConf) {
   private val historyServerUri: URI = sparkConf.getOption(HISTORY_SERVER_ADDRESS_KEY) match {
     case Some(historyServerAddress) =>
       val baseUri: URI =
-      // Latest versions of CDH include http in their history server address configuration.
-      // However, it is not recommended by Spark documentation(http://spark.apache.org/docs/latest/running-on-yarn.html)
+        // Latest versions of CDH include http in their history server address configuration.
+        // However, it is not recommended by Spark documentation(http://spark.apache.org/docs/latest/running-on-yarn.html)
         if (historyServerAddress.contains(s"http://")) {
           new URI(historyServerAddress)
         } else {
@@ -79,21 +80,39 @@
     val (applicationInfo, attemptTarget) = getApplicationMetaData(appId)
 
     // Limit the scope of async.
-    async {
-      val futureJobDatas = async { getJobDatas(attemptTarget) }
-      val futureStageDatas = async { getStageDatas(attemptTarget) }
-      val futureExecutorSummaries = async { getExecutorSummaries(attemptTarget) }
-      val futureLogData = if (fetchLogs) {
-        async { getLogData(attemptTarget)}
-      } else Future.successful(None)
-
-      SparkRestDerivedData(
-        applicationInfo,
-        await(futureJobDatas),
-        await(futureStageDatas),
-        await(futureExecutorSummaries),
-        await(futureLogData)
-      )
+    Future {
+      blocking {
+        val futureJobDatas = Future {
+          blocking {
+            getJobDatas(attemptTarget)
+          }
+        }
+        val futureStageDatas = Future {
+          blocking {
+            getStageDatas(attemptTarget)
+          }
+        }
+        val futureExecutorSummaries = Future {
+          blocking {
+            getExecutorSummaries(attemptTarget)
+          }
+        }
+        val futureLogData = if (fetchLogs) {
+          Future {
+            blocking {
+              getLogData(attemptTarget)
+            }
+          }
+        } else Future.successful(None)
+
+        SparkRestDerivedData(
+          applicationInfo,
+          Await.result(futureJobDatas, DEFAULT_TIMEOUT),
+          Await.result(futureStageDatas, DEFAULT_TIMEOUT),
+          Await.result(futureExecutorSummaries, Duration(5, SECONDS)),
+          Await.result(futureLogData, Duration(5, SECONDS))
+        )
+      }
     }
   }
 
@@ -101,7 +120,9 @@
     val (_, attemptTarget) = getApplicationMetaData(appId)
     val logTarget = attemptTarget.path("logs")
     logger.info(s"creating SparkApplication by calling REST API at ${logTarget.getUri} to get eventlogs")
-    resource.managed { getApplicationLogs(logTarget) }.acquireAndGet { zipInputStream =>
+    resource.managed {
+      getApplicationLogs(logTarget)
+    }.acquireAndGet { zipInputStream =>
       getLogInputStream(zipInputStream, logTarget) match {
         case (None, _) => throw new RuntimeException(s"Failed to read log for application ${appId}")
         case (Some(inputStream), fileName) => {
@@ -121,7 +142,9 @@
 
     // These are pure and cannot fail, therefore it is safe to have
     // them outside of the async block.
-    val lastAttemptId = applicationInfo.attempts.maxBy {_.startTime}.attemptId
+    val lastAttemptId = applicationInfo.attempts.maxBy {
+      _.startTime
+    }.attemptId
     val attemptTarget = lastAttemptId.map(appTarget.path).getOrElse(appTarget)
     (applicationInfo, attemptTarget)
   }
@@ -140,7 +163,9 @@
   private def getLogData(attemptTarget: WebTarget): Option[SparkLogDerivedData] = {
     val target = attemptTarget.path("logs")
     logger.info(s"calling REST API at ${target.getUri} to get eventlogs")
-    resource.managed { getApplicationLogs(target) }.acquireAndGet { zis =>
+    resource.managed {
+      getApplicationLogs(target)
+    }.acquireAndGet { zis =>
       val (inputStream, _) = getLogInputStream(zis, target)
       inputStream.map(SparkLogClient.findDerivedData(_))
     }
@@ -174,7 +199,9 @@
         throw new RuntimeException(s"Application for the log ${entryName} has not finished yet.")
       }
       val codec = SparkUtils.compressionCodecForLogName(sparkConf, entryName)
-      (Some(codec.map { _.compressedInputStream(zis)}.getOrElse(zis)), entryName)
+      (Some(codec.map {
+        _.compressedInputStream(zis)
+      }.getOrElse(zis)), entryName)
     }
   }
 
@@ -219,6 +246,7 @@ object SparkRestClient {
   val HISTORY_SERVER_ADDRESS_KEY = "spark.yarn.historyServer.address"
   val API_V1_MOUNT_PATH = "api/v1"
   val IN_PROGRESS = ".inprogress"
+  val DEFAULT_TIMEOUT = Duration(5, SECONDS);
 
   val SparkRestObjectMapper = {
     val dateFormat = {
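
Note: the patch above replaces scala-async's async/await blocks with Future { blocking { ... } } wrappers whose sub-results are joined via Await.result under a bounded timeout. The following is a minimal, self-contained sketch of that pattern only, not Dr. Elephant code; BlockingFetchSketch, fetchJobs, and fetchStages are hypothetical stand-ins for the blocking REST calls (getJobDatas, getStageDatas, etc.).

import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration.{Duration, SECONDS}

object BlockingFetchSketch {
  import ExecutionContext.Implicits.global

  // Hypothetical stand-ins for blocking REST calls; they just block briefly and return a value.
  private def fetchJobs(): String = { Thread.sleep(100); "jobs" }
  private def fetchStages(): String = { Thread.sleep(100); "stages" }

  // Each blocking call runs inside Future { blocking { ... } } so the global
  // ExecutionContext can spawn extra threads while a worker is blocked, and the
  // sub-results are joined with Await.result under a bounded timeout.
  def fetchAll(timeout: Duration = Duration(5, SECONDS)): Future[(String, String)] = Future {
    blocking {
      val futureJobs = Future { blocking { fetchJobs() } }
      val futureStages = Future { blocking { fetchStages() } }
      (Await.result(futureJobs, timeout), Await.result(futureStages, timeout))
    }
  }

  def main(args: Array[String]): Unit =
    println(Await.result(fetchAll(), Duration(30, SECONDS)))
}

Unlike async/await, this approach ties up a pool thread while waiting, which is why the waits are wrapped in blocking and bounded by a timeout (DEFAULT_TIMEOUT in the patch); that is the trade-off accepted in exchange for dropping the scala-async dependency.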