Changed async to future/blocking and changed the error to warn (linkedin#353)

Changed the usage of async to Future/blocking. With the async keyword, the execution context was not able to allocate threads to the blocking fetch calls, so the fetches were timing out very frequently.
The number of concurrently blocking computations can exceed the parallelism level only if each blocking call is wrapped inside a blocking call. Otherwise, there is a risk that the thread pool in the global execution context is starved, and no computation can proceed.
Reference: https://docs.scala-lang.org/overviews/core/futures.html#the-global-execution-context
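
As an illustration of the pattern this change relies on, here is a minimal, self-contained sketch; the names, timings, and counts below are hypothetical and not part of this commit:

```scala
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

object BlockingFetchSketch extends App {
  import ExecutionContext.Implicits.global

  // Stand-in for a slow, blocking fetch (e.g. a REST or HDFS call).
  def slowFetch(id: Int): String = { Thread.sleep(500); s"data-$id" }

  // Wrapping the body in `blocking` tells the global ForkJoinPool that this
  // task will block, so it may add threads instead of letting the pool starve.
  val futures = (1 to 16).map(id => Future { blocking { slowFetch(id) } })

  println(Await.result(Future.sequence(futures), 10.seconds).size) // 16
}
```

Without the blocking wrapper, all of the pool's worker threads (typically one per core) can end up parked inside slowFetch, and the remaining tasks queue up until they time out.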
skakker authored and akshayrai committed Mar 19, 2018
1 parent c89bafe commit 977623d
Showing 3 changed files with 76 additions and 46 deletions.
4 changes: 2 additions & 2 deletions app/com/linkedin/drelephant/ElephantRunner.java
@@ -192,11 +192,11 @@ public void run() {
logger.error(ExceptionUtils.getStackTrace(e));

if (_analyticJob != null && _analyticJob.retry()) {
logger.error("Add analytic job id [" + _analyticJob.getAppId() + "] into the retry list.");
logger.warn("Add analytic job id [" + _analyticJob.getAppId() + "] into the retry list.");
_analyticJobGenerator.addIntoRetries(_analyticJob);
} else if (_analyticJob != null && _analyticJob.isSecondPhaseRetry()) {
//Putting the job into a second retry queue which fetches jobs after some interval. Some spark jobs may need more time than usual to process, hence the queue.
logger.error("Add analytic job id [" + _analyticJob.getAppId() + "] into the second retry list.");
logger.warn("Add analytic job id [" + _analyticJob.getAppId() + "] into the second retry list.");
_analyticJobGenerator.addIntoSecondRetryQueue(_analyticJob);
} else {
if (_analyticJob != null) {
40 changes: 21 additions & 19 deletions app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala
@@ -16,11 +16,9 @@

package com.linkedin.drelephant.spark.fetchers

import scala.async.Async
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration.{Duration, SECONDS}
import scala.util.{Try, Success, Failure}
import scala.util.control.NonFatal

import com.linkedin.drelephant.analysis.{AnalyticJob, ElephantFetcher}
import com.linkedin.drelephant.configurations.fetcher.FetcherConfigurationData
@@ -35,9 +33,9 @@ import org.apache.spark.SparkConf
* A fetcher that gets Spark-related data from a combination of the Spark monitoring REST API and Spark event logs.
*/
class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData)
extends ElephantFetcher[SparkApplicationData] {
extends ElephantFetcher[SparkApplicationData] {

import SparkFetcher._
import Async.{async, await}
import ExecutionContext.Implicits.global

private val logger: Logger = Logger.getLogger(classOf[SparkFetcher])
@@ -94,35 +92,39 @@ class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData)
Success(data)
},
e => {
logger.error(s"Failed fetching data for ${appId}", e)
logger.warn(s"Failed fetching data for ${appId}." + " I will retry after some time! " + "Exception Message is: " + e.getMessage)
Failure(e)
}
)
}

private def doFetchSparkApplicationData(analyticJob: AnalyticJob): Future[SparkApplicationData] = {
if (shouldProcessLogsLocally) {
async {
Future {
sparkRestClient.fetchEventLogAndParse(analyticJob.getAppId)
}
} else {
doFetchDataUsingRestAndLogClients(analyticJob)
}
}

private def doFetchDataUsingRestAndLogClients(analyticJob: AnalyticJob): Future[SparkApplicationData] = async {
val appId = analyticJob.getAppId
val restDerivedData = await(sparkRestClient.fetchData(appId, eventLogSource == EventLogSource.Rest))

val logDerivedData = eventLogSource match {
case EventLogSource.None => None
case EventLogSource.Rest => restDerivedData.logDerivedData
case EventLogSource.WebHdfs =>
val lastAttemptId = restDerivedData.applicationInfo.attempts.maxBy { _.startTime }.attemptId
Some(await(sparkLogClient.fetchData(appId, lastAttemptId)))
}
private def doFetchDataUsingRestAndLogClients(analyticJob: AnalyticJob): Future[SparkApplicationData] = Future {
blocking {
val appId = analyticJob.getAppId
val restDerivedData = Await.result(sparkRestClient.fetchData(appId, eventLogSource == EventLogSource.Rest), DEFAULT_TIMEOUT)

val logDerivedData = eventLogSource match {
case EventLogSource.None => None
case EventLogSource.Rest => restDerivedData.logDerivedData
case EventLogSource.WebHdfs =>
val lastAttemptId = restDerivedData.applicationInfo.attempts.maxBy {
_.startTime
}.attemptId
Some(Await.result(sparkLogClient.fetchData(appId, lastAttemptId), DEFAULT_TIMEOUT))
}

SparkApplicationData(appId, restDerivedData, logDerivedData)
SparkApplicationData(appId, restDerivedData, logDerivedData)
}
}

}
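
For readers unfamiliar with scala-async, the rewrite above boils down to the following sketch (Data, fetchRemote, and the 30-second timeout are placeholders, not identifiers from this repository):

```scala
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

object MigrationSketch {
  import ExecutionContext.Implicits.global

  case class Data(payload: String)

  // Placeholder for an asynchronous client call such as a REST fetch.
  def fetchRemote(id: String): Future[String] =
    Future { blocking { Thread.sleep(100); s"raw-$id" } }

  // Old shape (scala-async), shown for comparison only:
  //   async { Data(await(fetchRemote(id))) }
  //
  // New shape: run the body on a pool thread, mark it as blocking, and join
  // the inner future with an explicit timeout.
  def fetch(id: String): Future[Data] = Future {
    blocking {
      Data(Await.result(fetchRemote(id), 30.seconds))
    }
  }
}
```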
78 changes: 53 additions & 25 deletions app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala
@@ -16,7 +16,7 @@

package com.linkedin.drelephant.spark.fetchers

import java.io.{InputStream, BufferedInputStream}
import java.io.{BufferedInputStream, InputStream}
import java.net.URI
import java.text.SimpleDateFormat
import java.util.zip.ZipInputStream
@@ -25,8 +25,7 @@ import java.util.{Calendar, SimpleTimeZone}
import com.linkedin.drelephant.spark.legacydata.LegacyDataConverters
import org.apache.spark.deploy.history.SparkDataCollection

import scala.async.Async
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.util.control.NonFatal
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
@@ -41,15 +40,17 @@ import javax.ws.rs.core.MediaType
import org.apache.log4j.Logger
import org.apache.spark.SparkConf

import scala.concurrent.duration.{Duration, SECONDS}

/**
* A client for getting data from the Spark monitoring REST API, e.g. <https://spark.apache.org/docs/1.4.1/monitoring.html#rest-api>.
*
* Jersey classloading seems to be brittle (at least when testing in the console), so some of the implementation is non-lazy
* or synchronous when needed.
*/
class SparkRestClient(sparkConf: SparkConf) {

import SparkRestClient._
import Async.{async, await}

private val logger: Logger = Logger.getLogger(classOf[SparkRestClient])

@@ -58,8 +59,8 @@ class SparkRestClient(sparkConf: SparkConf) {
private val historyServerUri: URI = sparkConf.getOption(HISTORY_SERVER_ADDRESS_KEY) match {
case Some(historyServerAddress) =>
val baseUri: URI =
// Latest versions of CDH include http in their history server address configuration.
// However, it is not recommended by Spark documentation(http://spark.apache.org/docs/latest/running-on-yarn.html)
// Latest versions of CDH include http in their history server address configuration.
// However, it is not recommended by Spark documentation(http://spark.apache.org/docs/latest/running-on-yarn.html)
if (historyServerAddress.contains(s"http://")) {
new URI(historyServerAddress)
} else {
@@ -79,29 +80,49 @@ class SparkRestClient(sparkConf: SparkConf) {
val (applicationInfo, attemptTarget) = getApplicationMetaData(appId)

// Limit the scope of async.
async {
val futureJobDatas = async { getJobDatas(attemptTarget) }
val futureStageDatas = async { getStageDatas(attemptTarget) }
val futureExecutorSummaries = async { getExecutorSummaries(attemptTarget) }
val futureLogData = if (fetchLogs) {
async { getLogData(attemptTarget)}
} else Future.successful(None)

SparkRestDerivedData(
applicationInfo,
await(futureJobDatas),
await(futureStageDatas),
await(futureExecutorSummaries),
await(futureLogData)
)
Future {
blocking {
val futureJobDatas = Future {
blocking {
getJobDatas(attemptTarget)
}
}
val futureStageDatas = Future {
blocking {
getStageDatas(attemptTarget)
}
}
val futureExecutorSummaries = Future {
blocking {
getExecutorSummaries(attemptTarget)
}
}
val futureLogData = if (fetchLogs) {
Future {
blocking {
getLogData(attemptTarget)
}
}
} else Future.successful(None)

SparkRestDerivedData(
applicationInfo,
Await.result(futureJobDatas, DEFAULT_TIMEOUT),
Await.result(futureStageDatas, DEFAULT_TIMEOUT),
Await.result(futureExecutorSummaries, Duration(5, SECONDS)),
Await.result(futureLogData, Duration(5, SECONDS))
)
}
}
}

def fetchEventLogAndParse(appId: String): SparkApplicationData = {
val (_, attemptTarget) = getApplicationMetaData(appId)
val logTarget = attemptTarget.path("logs")
logger.info(s"creating SparkApplication by calling REST API at ${logTarget.getUri} to get eventlogs")
resource.managed { getApplicationLogs(logTarget) }.acquireAndGet { zipInputStream =>
resource.managed {
getApplicationLogs(logTarget)
}.acquireAndGet { zipInputStream =>
getLogInputStream(zipInputStream, logTarget) match {
case (None, _) => throw new RuntimeException(s"Failed to read log for application ${appId}")
case (Some(inputStream), fileName) => {
@@ -121,7 +142,9 @@ class SparkRestClient(sparkConf: SparkConf) {

// These are pure and cannot fail, therefore it is safe to have
// them outside of the async block.
val lastAttemptId = applicationInfo.attempts.maxBy {_.startTime}.attemptId
val lastAttemptId = applicationInfo.attempts.maxBy {
_.startTime
}.attemptId
val attemptTarget = lastAttemptId.map(appTarget.path).getOrElse(appTarget)
(applicationInfo, attemptTarget)
}
@@ -140,7 +163,9 @@ class SparkRestClient(sparkConf: SparkConf) {
private def getLogData(attemptTarget: WebTarget): Option[SparkLogDerivedData] = {
val target = attemptTarget.path("logs")
logger.info(s"calling REST API at ${target.getUri} to get eventlogs")
resource.managed { getApplicationLogs(target) }.acquireAndGet { zis =>
resource.managed {
getApplicationLogs(target)
}.acquireAndGet { zis =>
val (inputStream, _) = getLogInputStream(zis, target)
inputStream.map(SparkLogClient.findDerivedData(_))
}
@@ -174,7 +199,9 @@ class SparkRestClient(sparkConf: SparkConf) {
throw new RuntimeException(s"Application for the log ${entryName} has not finished yet.")
}
val codec = SparkUtils.compressionCodecForLogName(sparkConf, entryName)
(Some(codec.map { _.compressedInputStream(zis)}.getOrElse(zis)), entryName)
(Some(codec.map {
_.compressedInputStream(zis)
}.getOrElse(zis)), entryName)
}
}

@@ -219,6 +246,7 @@ object SparkRestClient {
val HISTORY_SERVER_ADDRESS_KEY = "spark.yarn.historyServer.address"
val API_V1_MOUNT_PATH = "api/v1"
val IN_PROGRESS = ".inprogress"
val DEFAULT_TIMEOUT = Duration(5, SECONDS);

val SparkRestObjectMapper = {
val dateFormat = {
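
The fetchData change above follows a fork/join shape: an outer future forks several inner blocking futures and then joins them with Await.result. Reduced to a generic sketch (fetchA, fetchB, and the 5-second timeout are placeholders, not code from this repository):

```scala
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration.{Duration, SECONDS}

object ForkJoinSketch {
  import ExecutionContext.Implicits.global

  private val timeout = Duration(5, SECONDS)

  // Placeholders for independent blocking REST calls.
  def fetchA(): Int = { Thread.sleep(200); 1 }
  def fetchB(): Int = { Thread.sleep(200); 2 }

  def fetchAll(): Future[(Int, Int)] = Future {
    blocking {
      // The inner futures start immediately and run concurrently; each marks
      // itself as blocking so the pool can grow while they wait.
      val fa = Future { blocking { fetchA() } }
      val fb = Future { blocking { fetchB() } }
      (Await.result(fa, timeout), Await.result(fb, timeout))
    }
  }
}
```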
