Skip to content

Commit

Permalink
changed async for LogClient (linkedin#354)
Browse files Browse the repository at this point in the history
  • Loading branch information
skakker authored and akshayrai committed Mar 19, 2018
1 parent 977623d commit fe7bfea
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ package com.linkedin.drelephant.spark.fetchers
import java.io.InputStream
import java.security.PrivilegedAction

import scala.async.Async
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future, blocking}
import scala.io.Source

import com.linkedin.drelephant.security.HadoopSecurity
Expand All @@ -39,7 +38,6 @@ import org.json4s.jackson.JsonMethods
*/
class SparkLogClient(hadoopConfiguration: Configuration, sparkConf: SparkConf, eventLogUri: Option[String]) {
import SparkLogClient._
import Async.async

private val logger: Logger = Logger.getLogger(classOf[SparkLogClient])

Expand All @@ -64,8 +62,9 @@ class SparkLogClient(hadoopConfiguration: Configuration, sparkConf: SparkConf, e
val (eventLogPath, eventLogCodec) =
sparkUtils.pathAndCodecforEventLog(sparkConf, eventLogFileSystem, baseEventLogPath, appId, attemptId)

async {
sparkUtils.withEventLog(eventLogFileSystem, eventLogPath, eventLogCodec)(findDerivedData(_))
Future { blocking {
sparkUtils.withEventLog(eventLogFileSystem, eventLogPath, eventLogCodec)(findDerivedData(_))
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ class SparkRestClient(sparkConf: SparkConf) {
): Future[SparkRestDerivedData] = {
val (applicationInfo, attemptTarget) = getApplicationMetaData(appId)

// Limit the scope of async.
Future {
blocking {
val futureJobDatas = Future {
Expand Down Expand Up @@ -140,8 +139,6 @@ class SparkRestClient(sparkConf: SparkConf) {

val applicationInfo = getApplicationInfo(appTarget)

// These are pure and cannot fail, therefore it is safe to have
// them outside of the async block.
val lastAttemptId = applicationInfo.attempts.maxBy {
_.startTime
}.attemptId
Expand Down
3 changes: 1 addition & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,9 @@ object Dependencies {
"io.dropwizard.metrics" % "metrics-healthchecks" % "3.1.2",
"org.mockito" % "mockito-core" % "1.10.19" exclude ("org.hamcrest", "hamcrest-core"),
"org.jmockit" % "jmockit" % "1.23" % Test,
"org.scala-lang.modules" %% "scala-async" % "0.9.5",
"org.apache.httpcomponents" % "httpclient" % "4.5.2",
"org.apache.httpcomponents" % "httpcore" % "4.4.4",
"org.scalatest" %% "scalatest" % "3.0.0" % Test,
"org.scalatest" %% "scalatest" % "3.0.0" % Test,
"com.h2database" % "h2" % "1.4.196" % Test

) :+ sparkExclusion
Expand Down

0 comments on commit fe7bfea

Please sign in to comment.