Skip to content

Commit

Permalink
Removing mentions in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pwendell committed Jan 13, 2014
1 parent 0d4886c commit 0bb3307
Show file tree
Hide file tree
Showing 12 changed files with 2 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo
}
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
System.clearProperty("spark.hostPort")
}

test("halting by voting") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ private[spark] object CoarseGrainedExecutorBackend {
indestructible = true, conf = new SparkConf)
// set it
val sparkHostPort = hostname + ":" + boundPort
// conf.set("spark.hostPort", sparkHostPort)
actorSystem.actorOf(
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
name = "Executor")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
override def start() {
val properties = new ArrayBuffer[(String, String)]
for ((key, value) <- scheduler.sc.conf.getAll) {
if (key.startsWith("spark.") && !key.equals("spark.hostPort")) {
if (key.startsWith("spark.")) {
properties += ((key, value))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ object LocalSparkContext {
}
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
System.clearProperty("spark.hostPort")
}

/** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
val hostname = "localhost"
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf)
System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
System.setProperty("spark.hostPort", hostname + ":" + boundPort)

val masterTracker = new MapOutputTrackerMaster(conf)
masterTracker.trackerActor = actorSystem.actorOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, conf = conf)
this.actorSystem = actorSystem
conf.set("spark.driver.port", boundPort.toString)
conf.set("spark.hostPort", "localhost:" + boundPort)

master = new BlockManagerMaster(
actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf)
Expand All @@ -65,13 +64,10 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
conf.set("spark.storage.disableBlockManagerHeartBeat", "true")
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
// Set some value ...
conf.set("spark.hostPort", Utils.localHostName() + ":" + 1111)
}

after {
System.clearProperty("spark.driver.port")
System.clearProperty("spark.hostPort")

if (store != null) {
store.stop()
Expand Down
2 changes: 0 additions & 2 deletions repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ class ReplSuite extends FunSuite {
}
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
System.clearProperty("spark.hostPort")
return out.toString
}

Expand Down Expand Up @@ -75,7 +74,6 @@ class ReplSuite extends FunSuite {

interp.sparkContext.stop()
System.clearProperty("spark.driver.port")
System.clearProperty("spark.hostPort")
}

test("simple foreach with accumulator") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)

// These should be unset when a checkpoint is deserialized,
// otherwise the SparkContext won't initialize correctly.
sparkConf.remove("spark.hostPort").remove("spark.driver.host").remove("spark.driver.port")
sparkConf.remove("spark.driver.host").remove("spark.driver.port")

def validate() {
assert(master != null, "Checkpoint.master is null")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ object MasterFailureTest extends Logging {

// Setup the streaming computation with the given operation
System.clearProperty("spark.driver.port")
System.clearProperty("spark.hostPort")
val ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map())
ssc.checkpoint(checkpointDir.toString)
val inputStream = ssc.textFileStream(testDir.toString)
Expand Down Expand Up @@ -233,7 +232,6 @@ object MasterFailureTest extends Logging {
// (iii) Its not timed out yet
System.clearProperty("spark.streaming.clock")
System.clearProperty("spark.driver.port")
System.clearProperty("spark.hostPort")
ssc.start()
val startTime = System.currentTimeMillis()
while (!killed && !isLastOutputGenerated && !isTimedOut) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public abstract class LocalJavaStreamingContext {
@Before
public void setUp() {
System.clearProperty("spark.driver.port");
System.clearProperty("spark.hostPort");
System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
ssc.checkpoint("checkpoint");
Expand All @@ -41,6 +40,5 @@ public void tearDown() {

// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port");
System.clearProperty("spark.hostPort");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,6 @@ class CheckpointSuite extends TestSuiteBase {
)
ssc = new StreamingContext(checkpointDir)
System.clearProperty("spark.driver.port")
System.clearProperty("spark.hostPort")
ssc.start()
val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches)
// the first element will be re-processed data of the last batch before restart
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
def afterFunction() {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
System.clearProperty("spark.hostPort")
}

before(beforeFunction)
Expand Down

0 comments on commit 0bb3307

Please sign in to comment.