Skip to content

Commit

Permalink
[FLINK-3863] Yarn Cluster shutdown may fail if leader changed recently
Browse files Browse the repository at this point in the history
  • Loading branch information
mxm committed Jun 17, 2016
1 parent f9b52a3 commit 9e98424
Showing 1 changed file with 12 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -176,29 +176,22 @@ class ApplicationClient(
         }
       }

-    case LocalStopYarnSession(status, diagnostics) =>
+    case msg @ LocalStopYarnSession(status, diagnostics) =>
       log.info("Sending StopCluster request to JobManager.")

-      val clusterStatus =
-        status match {
-          case FinalApplicationStatus.SUCCEEDED => ApplicationStatus.SUCCEEDED
-          case FinalApplicationStatus.KILLED => ApplicationStatus.CANCELED
-          case FinalApplicationStatus.FAILED => ApplicationStatus.FAILED
-          case _ => ApplicationStatus.UNKNOWN
-        }
-
-      yarnJobManager foreach {
-        // forward to preserve the sender's address
-        _ forward decorateMessage(new StopCluster(clusterStatus, diagnostics))
+      // preserve the original sender so we can reply
+      val originalSender = sender()
+
+      yarnJobManager match {
+        case Some(jm) =>
+          jm.tell(decorateMessage(new StopCluster(status, diagnostics)), originalSender)
+        case None =>
+          context.system.scheduler.scheduleOnce(1 second) {
+            // try once more; we might have been connected in the meantime
+            self.tell(msg, originalSender)
+          }(context.dispatcher)
       }

-    case msg: StopClusterSuccessful =>
-      log.info("Remote JobManager has been stopped successfully. " +
-        "Stopping local application client")
-
-      // poison ourselves
-      self ! decorateMessage(PoisonPill)
-
     // handle the responses from the PollYarnClusterStatus messages to the yarn job mgr
     case status: GetClusterStatusResponse =>
       latestClusterStatus = Some(status)
Expand Down

0 comments on commit 9e98424

Please sign in to comment.