Skip to content

Commit

Permalink
[SPARK-11872] Prevent the call to SparkContext#stop() in the listener…
Browse files Browse the repository at this point in the history
… bus's thread

This is continuation of SPARK-11761

Andrew suggested adding this protection. See tail of apache#9741

Author: tedyu <[email protected]>

Closes apache#9852 from tedyu/master.
  • Loading branch information
tedyu authored and zsxwing committed Nov 24, 2015
1 parent 19530da commit 8101254
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 0 deletions.
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1694,6 +1694,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

// Shut down the SparkContext.
def stop() {
if (AsynchronousListenerBus.withinListenerThread.value) {
throw new SparkException("Cannot stop SparkContext within listener thread of" +
" AsynchronousListenerBus")
}
// Use the stopping variable to ensure no contention for the stop scenario.
// Still track the stopped variable for use elsewhere in the code.
if (!stopped.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.collection.JavaConverters._

import org.scalatest.Matchers

import org.apache.spark.SparkException
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.util.ResetSystemProperties
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
Expand All @@ -36,6 +37,21 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match

val jobCompletionTime = 1421191296660L

test("don't call sc.stop in listener") {
sc = new SparkContext("local", "SparkListenerSuite")
val listener = new SparkContextStoppingListener(sc)
val bus = new LiveListenerBus
bus.addListener(listener)

// Starting listener bus should flush all buffered events
bus.start(sc)
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)

bus.stop()
assert(listener.sparkExSeen)
}

test("basic creation and shutdown of LiveListenerBus") {
val counter = new BasicJobCounter
val bus = new LiveListenerBus
Expand Down Expand Up @@ -443,6 +459,21 @@ private class BasicJobCounter extends SparkListener {
override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1
}

/**
* A simple listener that tries to stop SparkContext.
*/
private class SparkContextStoppingListener(val sc: SparkContext) extends SparkListener {
@volatile var sparkExSeen = false
override def onJobEnd(job: SparkListenerJobEnd): Unit = {
try {
sc.stop()
} catch {
case se: SparkException =>
sparkExSeen = true
}
}
}

private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListener {
var count = 0
override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1
Expand Down

0 comments on commit 8101254

Please sign in to comment.