[SPARK-9790][YARN] Expose in WebUI if NodeManager is the reason why executors were killed.

Author: Mark Grover <[email protected]>

Closes apache#8093 from markgrover/nm2.
markgrover authored and Marcelo Vanzin committed Nov 3, 2015
1 parent f54ff19 commit b2e4b31
Showing 8 changed files with 29 additions and 17 deletions.
8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -223,15 +223,19 @@ case class TaskCommitDenied(
  * the task crashed the JVM.
  */
 @DeveloperApi
-case class ExecutorLostFailure(execId: String, exitCausedByApp: Boolean = true)
-  extends TaskFailedReason {
+case class ExecutorLostFailure(
+    execId: String,
+    exitCausedByApp: Boolean = true,
+    reason: Option[String]) extends TaskFailedReason {
   override def toErrorString: String = {
     val exitBehavior = if (exitCausedByApp) {
       "caused by one of the running tasks"
     } else {
       "unrelated to the running tasks"
     }
-    s"ExecutorLostFailure (executor ${execId} exited due to an issue ${exitBehavior})"
+    s"ExecutorLostFailure (executor ${execId} exited ${exitBehavior})" +
+      reason.map { r => s" Reason: $r" }.getOrElse("")
   }
 
   override def countTowardsTaskFailures: Boolean = exitCausedByApp
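For illustration, a minimal sketch of how the new optional reason surfaces to callers. This is not part of the commit; it assumes a Spark build containing this patch on the classpath, and the executor ID and reason string are made up:

import org.apache.spark.ExecutorLostFailure

object LossReasonDemo {
  def main(args: Array[String]): Unit = {
    // A NodeManager-initiated kill: not the app's fault, so it does not count
    // against the task's failure budget, yet the reason reaches the error string.
    val preempted = ExecutorLostFailure("42", exitCausedByApp = false,
      reason = Some("Container preempted by scheduler."))
    println(preempted.toErrorString)
    // ExecutorLostFailure (executor 42 exited unrelated to the running tasks)
    //   Reason: Container preempted by scheduler.
    println(preempted.countTowardsTaskFailures)  // false, mirrors exitCausedByApp

    // With reason = None, nothing extra is appended to the message.
    println(ExecutorLostFailure("42", exitCausedByApp = true, reason = None).toErrorString)
  }
}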
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
@@ -67,7 +67,7 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
   * The default `timeout` will be used in every trial of calling `sendWithReply`. Because this
   * method retries, the message handling in the receiver side should be idempotent.
   *
-  * Note: this is a blocking action which may cost a lot of time, so don't call it in an message
+  * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
   * loop of [[RpcEndpoint]].
   *
   * @param message the message to send
@@ -82,7 +82,7 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
   * retries. `timeout` will be used in every trial of calling `sendWithReply`. Because this method
   * retries, the message handling in the receiver side should be idempotent.
   *
-  * Note: this is a blocking action which may cost a lot of time, so don't call it in an message
+  * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
   * loop of [[RpcEndpoint]].
   *
   * @param message the message to send
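The idempotency requirement in these comments is a contract on the receiving endpoint, not something the framework enforces. Below is a hypothetical sketch of an endpoint that is safe to ask with retries; RegisterWorker and RegistrationEndpoint are invented for illustration, and since the rpc classes are private[spark] the code would have to live under the org.apache.spark package:

package org.apache.spark.demo  // hypothetical package: the rpc API is private[spark]

import scala.collection.mutable

import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}

case class RegisterWorker(id: String)  // made-up message type

class RegistrationEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  private val registered = mutable.Set.empty[String]

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case RegisterWorker(id) =>
      // Set insertion is a no-op on duplicates, so a retried ask (for example,
      // after a lost reply) leaves the state unchanged and gets the same answer.
      registered += id
      context.reply(true)
  }
}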
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -802,8 +802,8 @@ private[spark] class TaskSetManager(
         case exited: ExecutorExited => exited.exitCausedByApp
         case _ => true
       }
-      handleFailedTask(
-        tid, TaskState.FAILED, ExecutorLostFailure(info.executorId, exitCausedByApp))
+      handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(info.executorId, exitCausedByApp,
+        Some(reason.toString)))
     }
     // recalculate valid locality levels and waits when executor is lost
     recomputeLocality()
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -125,7 +125,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           // Ignoring the task kill since the executor is not registered.
           logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
         }
-
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -195,7 +194,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     override def onDisconnected(remoteAddress: RpcAddress): Unit = {
       addressToExecutorId
         .get(remoteAddress)
-        .foreach(removeExecutor(_, SlaveLost("remote Rpc client disassociated")))
+        .foreach(removeExecutor(_, SlaveLost("Remote RPC client disassociated. Likely due to " +
+          "containers exceeding thresholds, or network issues. Check driver logs for WARN " +
+          "messages.")))
     }
 
     // Make fake resource offers on just one executor
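Note that the reworded SlaveLost message deliberately points users at the driver's WARN logs; the YarnSchedulerBackend change below is what writes the YARN-provided reason there when an executor is removed.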
core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -175,6 +175,7 @@ private[spark] abstract class YarnSchedulerBackend(
         addWebUIFilter(filterName, filterParams, proxyBase)
 
       case RemoveExecutor(executorId, reason) =>
+        logWarning(reason.toString)
         removeExecutor(executorId, reason)
     }

11 changes: 8 additions & 3 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -367,9 +367,10 @@ private[spark] object JsonProtocol {
("Job ID" -> taskCommitDenied.jobID) ~
("Partition ID" -> taskCommitDenied.partitionID) ~
("Attempt Number" -> taskCommitDenied.attemptNumber)
case ExecutorLostFailure(executorId, exitCausedByApp) =>
case ExecutorLostFailure(executorId, exitCausedByApp, reason) =>
("Executor ID" -> executorId) ~
("Exit Caused By App" -> exitCausedByApp)
("Exit Caused By App" -> exitCausedByApp) ~
("Loss Reason" -> reason.map(_.toString))
case _ => Utils.emptyJson
}
("Reason" -> reason) ~ json
@@ -812,7 +813,11 @@ private[spark] object JsonProtocol {
       case `executorLostFailure` =>
         val exitCausedByApp = Utils.jsonOption(json \ "Exit Caused By App").map(_.extract[Boolean])
         val executorId = Utils.jsonOption(json \ "Executor ID").map(_.extract[String])
-        ExecutorLostFailure(executorId.getOrElse("Unknown"), exitCausedByApp.getOrElse(true))
+        val reason = Utils.jsonOption(json \ "Loss Reason").map(_.extract[String])
+        ExecutorLostFailure(
+          executorId.getOrElse("Unknown"),
+          exitCausedByApp.getOrElse(true),
+          reason)
       case `unknownReason` => UnknownReason
     }
   }
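For context, a sketch of the wire format this produces, built with the same json4s DSL the protocol uses. The field names come from the hunk above; the executor ID and reason text are made up, and the fragment is rebuilt by hand here because JsonProtocol itself is private[spark]:

import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}

object LossReasonJson {
  def main(args: Array[String]): Unit = {
    val reason: Option[String] = Some("Container killed by YARN for exceeding memory limits.")
    // Mirrors the ExecutorLostFailure branch of taskEndReasonToJson. A None
    // reason renders as JNothing, so the "Loss Reason" field is simply omitted,
    // which keeps the output compatible with older consumers.
    val json = ("Reason" -> "ExecutorLostFailure") ~
      ("Executor ID" -> "7") ~
      ("Exit Caused By App" -> false) ~
      ("Loss Reason" -> reason.map(_.toString))
    println(compact(render(json)))
    // {"Reason":"ExecutorLostFailure","Executor ID":"7","Exit Caused By App":false,
    //  "Loss Reason":"Container killed by YARN for exceeding memory limits."}
  }
}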
core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -243,7 +243,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
       ExceptionFailure("Exception", "description", null, null, None, None),
       TaskResultLost,
       TaskKilled,
-      ExecutorLostFailure("0"),
+      ExecutorLostFailure("0", true, Some("Induced failure")),
       UnknownReason)
     var failCount = 0
     for (reason <- taskFailedReasons) {
core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -152,7 +152,7 @@ class JsonProtocolSuite extends SparkFunSuite {
     testTaskEndReason(TaskResultLost)
     testTaskEndReason(TaskKilled)
     testTaskEndReason(TaskCommitDenied(2, 3, 4))
-    testTaskEndReason(ExecutorLostFailure("100", true))
+    testTaskEndReason(ExecutorLostFailure("100", true, Some("Induced failure")))
     testTaskEndReason(UnknownReason)
 
     // BlockId
@@ -296,10 +296,10 @@

   test("ExecutorLostFailure backward compatibility") {
     // ExecutorLostFailure in Spark 1.1.0 does not have an "Executor ID" property.
-    val executorLostFailure = ExecutorLostFailure("100", true)
+    val executorLostFailure = ExecutorLostFailure("100", true, Some("Induced failure"))
     val oldEvent = JsonProtocol.taskEndReasonToJson(executorLostFailure)
       .removeField({ _._1 == "Executor ID" })
-    val expectedExecutorLostFailure = ExecutorLostFailure("Unknown", true)
+    val expectedExecutorLostFailure = ExecutorLostFailure("Unknown", true, Some("Induced failure"))
     assert(expectedExecutorLostFailure === JsonProtocol.taskEndReasonFromJson(oldEvent))
   }
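A hypothetical sketch of what that compatibility path accepts. The JSON literal is made up, and because JsonProtocol is private[spark] the snippet is placed under org.apache.spark.util, as the suite itself is:

package org.apache.spark.util  // JsonProtocol is private[spark]

import org.json4s.jackson.JsonMethods.parse

object OldEventCompatDemo {
  def main(args: Array[String]): Unit = {
    // A Spark 1.1.0-style event: no "Executor ID" and, from before this patch,
    // no "Loss Reason" either.
    val oldJson = parse("""{"Reason":"ExecutorLostFailure","Exit Caused By App":true}""")
    // Missing fields fall back to "Unknown", true, and None respectively.
    println(JsonProtocol.taskEndReasonFromJson(oldJson))
    // prints: ExecutorLostFailure(Unknown,true,None)
  }
}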

@@ -603,10 +603,11 @@
         assert(jobId1 === jobId2)
         assert(partitionId1 === partitionId2)
         assert(attemptNumber1 === attemptNumber2)
-      case (ExecutorLostFailure(execId1, exit1CausedByApp),
-          ExecutorLostFailure(execId2, exit2CausedByApp)) =>
+      case (ExecutorLostFailure(execId1, exit1CausedByApp, reason1),
+          ExecutorLostFailure(execId2, exit2CausedByApp, reason2)) =>
         assert(execId1 === execId2)
         assert(exit1CausedByApp === exit2CausedByApp)
+        assert(reason1 === reason2)
       case (UnknownReason, UnknownReason) =>
       case _ => fail("Task end reasons don't match in types!")
     }
