
Commit 25f3580

[SPARK-4011] tighten the visibility of the members in Master/Worker class

https://issues.apache.org/jira/browse/SPARK-4011

Currently, most of the members in Master/Worker have public accessibility. We might wish to tighten their accessibility.

A bit more discussion is here:

apache#2828
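
For background on the modifiers used throughout this change: Scala's scoped access modifiers restrict a definition to an enclosing package, so private[deploy] confines it to org.apache.spark.deploy and its subpackages, while the previous private[spark] left it visible to everything under org.apache.spark. A minimal sketch of the difference, using a hypothetical class that is not part of this commit:

package org.apache.spark.deploy

// Hypothetical example class, not part of this commit.
private[deploy] class VisibilityDemo {
  // Old, looser scope: visible anywhere under org.apache.spark.
  private[spark] val looselyScoped: Int = 1

  // Tightened scope: visible only under org.apache.spark.deploy.
  private[deploy] val tightlyScoped: Int = 2

  // Plain private: visible only inside this class.
  private def sum(): Int = looselyScoped + tightlyScoped
}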

Author: CodingCat <[email protected]>

Closes apache#4844 from CodingCat/SPARK-4011 and squashes the following commits:

1a64175 [CodingCat] fix compilation issue
e7fd375 [CodingCat] Sean is right....
f5034a4 [CodingCat] fix rebase mistake
8d5b0c0 [CodingCat] loose more fields
0072f96 [CodingCat] lose some restrictions based on the possible design intention
de77286 [CodingCat] tighten accessibility of deploy package
12b4fd3 [CodingCat] tighten accessibility of deploy.worker
1243bc7 [CodingCat] tighten accessibility of deploy.rest
c5f622c [CodingCat] tighten the accessibility of deploy.history
d441e20 [CodingCat] tighten accessibility of deploy.client
4e0ce4a [CodingCat] tighten the accessibility of the members of classes in master
23cddbb [CodingCat] stylistic fix
9a3a340 [CodingCat] tighten the access of worker class
67a0559 [CodingCat] tighten the access permission in Master
CodingCat authored and srowen committed Mar 17, 2015
1 parent b2d8c02 commit 25f3580
Showing 49 changed files with 277 additions and 265 deletions.
@@ -28,7 +28,7 @@ import org.apache.spark.util.{IntParam, MemoryParam}
/**
* Command-line parser for the driver client.
*/
-private[spark] class ClientArguments(args: Array[String]) {
+private[deploy] class ClientArguments(args: Array[String]) {
import ClientArguments._

var cmd: String = "" // 'launch' or 'kill'
@@ -96,7 +96,7 @@ private[spark] class ClientArguments(args: Array[String]) {
/**
* Print usage and exit JVM with the given exit code.
*/
-def printUsageAndExit(exitCode: Int) {
+private def printUsageAndExit(exitCode: Int) {
// TODO: It wouldn't be too hard to allow users to submit their app and dependency jars
// separately similar to in the YARN client.
val usage =
@@ -116,10 +116,10 @@ private[spark] class ClientArguments(args: Array[String]) {
}
}

-object ClientArguments {
-private[spark] val DEFAULT_CORES = 1
-private[spark] val DEFAULT_MEMORY = 512 // MB
-private[spark] val DEFAULT_SUPERVISE = false
+private[deploy] object ClientArguments {
+val DEFAULT_CORES = 1
+val DEFAULT_MEMORY = 512 // MB
+val DEFAULT_SUPERVISE = false

def isValidJarUrl(s: String): Boolean = {
try {
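
In the ClientArguments hunk above, the per-member private[spark] qualifiers on the default constants are dropped rather than tightened: once the enclosing object is itself private[deploy], its members can never be reached from outside org.apache.spark.deploy anyway, so the extra qualifiers are redundant. A minimal sketch with hypothetical names, not taken from this commit:

package org.apache.spark.deploy

// Hypothetical object mirroring the ClientArguments change: the object's
// own visibility already caps what outside packages can reach.
private[deploy] object Defaults {
  // Effectively private[deploy]: code outside org.apache.spark.deploy
  // cannot reference Defaults at all, so no per-member qualifier is needed.
  val DEFAULT_CORES = 1
  val DEFAULT_MEMORY = 512 // MB
}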
@@ -17,7 +17,7 @@

package org.apache.spark.deploy

-private[spark] class DriverDescription(
+private[deploy] class DriverDescription(
val jarUrl: String,
val mem: Int,
val cores: Int,
@@ -22,7 +22,7 @@ package org.apache.spark.deploy
* This state is sufficient for the Master to reconstruct its internal data structures during
* failover.
*/
-private[spark] class ExecutorDescription(
+private[deploy] class ExecutorDescription(
val appId: String,
val execId: Int,
val cores: Int,
@@ -17,7 +17,7 @@

package org.apache.spark.deploy

-private[spark] object ExecutorState extends Enumeration {
+private[deploy] object ExecutorState extends Enumeration {

val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value

@@ -55,29 +55,29 @@ import org.apache.spark.deploy.master.{RecoveryState, SparkCuratorUtil}
* - The docker images tagged spark-test-master and spark-test-worker are built from the
* docker/ directory. Run 'docker/spark-test/build' to generate these.
*/
-private[spark] object FaultToleranceTest extends App with Logging {
+private object FaultToleranceTest extends App with Logging {

-val conf = new SparkConf()
-val ZK_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark")
+private val conf = new SparkConf()
+private val ZK_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark")

-val masters = ListBuffer[TestMasterInfo]()
-val workers = ListBuffer[TestWorkerInfo]()
-var sc: SparkContext = _
+private val masters = ListBuffer[TestMasterInfo]()
+private val workers = ListBuffer[TestWorkerInfo]()
+private var sc: SparkContext = _

-val zk = SparkCuratorUtil.newClient(conf)
+private val zk = SparkCuratorUtil.newClient(conf)

-var numPassed = 0
-var numFailed = 0
+private var numPassed = 0
+private var numFailed = 0

-val sparkHome = System.getenv("SPARK_HOME")
+private val sparkHome = System.getenv("SPARK_HOME")
assertTrue(sparkHome != null, "Run with a valid SPARK_HOME")

-val containerSparkHome = "/opt/spark"
-val dockerMountDir = "%s:%s".format(sparkHome, containerSparkHome)
+private val containerSparkHome = "/opt/spark"
+private val dockerMountDir = "%s:%s".format(sparkHome, containerSparkHome)

System.setProperty("spark.driver.host", "172.17.42.1") // default docker host ip

-def afterEach() {
+private def afterEach() {
if (sc != null) {
sc.stop()
sc = null
@@ -179,7 +179,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
}
}

-def test(name: String)(fn: => Unit) {
+private def test(name: String)(fn: => Unit) {
try {
fn
numPassed += 1
@@ -197,19 +197,19 @@ private[spark] object FaultToleranceTest extends App with Logging {
afterEach()
}

-def addMasters(num: Int) {
+private def addMasters(num: Int) {
logInfo(s">>>>> ADD MASTERS $num <<<<<")
(1 to num).foreach { _ => masters += SparkDocker.startMaster(dockerMountDir) }
}

-def addWorkers(num: Int) {
+private def addWorkers(num: Int) {
logInfo(s">>>>> ADD WORKERS $num <<<<<")
val masterUrls = getMasterUrls(masters)
(1 to num).foreach { _ => workers += SparkDocker.startWorker(dockerMountDir, masterUrls) }
}

/** Creates a SparkContext, which constructs a Client to interact with our cluster. */
-def createClient() = {
+private def createClient() = {
logInfo(">>>>> CREATE CLIENT <<<<<")
if (sc != null) { sc.stop() }
// Counter-hack: Because of a hack in SparkEnv#create() that changes this
@@ -218,27 +218,27 @@ private[spark] object FaultToleranceTest extends App with Logging {
sc = new SparkContext(getMasterUrls(masters), "fault-tolerance", containerSparkHome)
}

-def getMasterUrls(masters: Seq[TestMasterInfo]): String = {
+private def getMasterUrls(masters: Seq[TestMasterInfo]): String = {
"spark://" + masters.map(master => master.ip + ":7077").mkString(",")
}

-def getLeader: TestMasterInfo = {
+private def getLeader: TestMasterInfo = {
val leaders = masters.filter(_.state == RecoveryState.ALIVE)
assertTrue(leaders.size == 1)
leaders(0)
}

-def killLeader(): Unit = {
+private def killLeader(): Unit = {
logInfo(">>>>> KILL LEADER <<<<<")
masters.foreach(_.readState())
val leader = getLeader
masters -= leader
leader.kill()
}

-def delay(secs: Duration = 5.seconds) = Thread.sleep(secs.toMillis)
+private def delay(secs: Duration = 5.seconds) = Thread.sleep(secs.toMillis)

-def terminateCluster() {
+private def terminateCluster() {
logInfo(">>>>> TERMINATE CLUSTER <<<<<")
masters.foreach(_.kill())
workers.foreach(_.kill())
@@ -247,7 +247,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
}

/** This includes Client retry logic, so it may take a while if the cluster is recovering. */
-def assertUsable() = {
+private def assertUsable() = {
val f = future {
try {
val res = sc.parallelize(0 until 10).collect()
@@ -269,7 +269,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
* Asserts that the cluster is usable and that the expected masters and workers
* are all alive in a proper configuration (e.g., only one leader).
*/
-def assertValidClusterState() = {
+private def assertValidClusterState() = {
logInfo(">>>>> ASSERT VALID CLUSTER STATE <<<<<")
assertUsable()
var numAlive = 0
@@ -325,7 +325,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
}
}

-def assertTrue(bool: Boolean, message: String = "") {
+private def assertTrue(bool: Boolean, message: String = "") {
if (!bool) {
throw new IllegalStateException("Assertion failed: " + message)
}
@@ -335,7 +335,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
numFailed))
}

-private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile: File)
+private class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile: File)
extends Logging {

implicit val formats = org.json4s.DefaultFormats
@@ -377,7 +377,7 @@ private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val
format(ip, dockerId.id, logFile.getAbsolutePath, state)
}

-private[spark] class TestWorkerInfo(val ip: String, val dockerId: DockerId, val logFile: File)
+private class TestWorkerInfo(val ip: String, val dockerId: DockerId, val logFile: File)
extends Logging {

implicit val formats = org.json4s.DefaultFormats
@@ -390,7 +390,7 @@ private[spark] class TestWorkerInfo(val ip: String, val dockerId: DockerId, val
"[ip=%s, id=%s, logFile=%s]".format(ip, dockerId, logFile.getAbsolutePath)
}

-private[spark] object SparkDocker {
+private object SparkDocker {
def startMaster(mountDir: String): TestMasterInfo = {
val cmd = Docker.makeRunCmd("spark-test-master", mountDir = mountDir)
val (ip, id, outFile) = startNode(cmd)
@@ -425,11 +425,11 @@ private[spark] object SparkDocker {
}
}

-private[spark] class DockerId(val id: String) {
+private class DockerId(val id: String) {
override def toString = id
}

-private[spark] object Docker extends Logging {
+private object Docker extends Logging {
def makeRunCmd(imageTag: String, args: String = "", mountDir: String = ""): ProcessBuilder = {
val mountCmd = if (mountDir != "") { " -v " + mountDir } else ""

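
In the FaultToleranceTest hunks above, the object and its helpers need no cross-package visibility at all, so they drop the package qualifier and become plain private: for the top-level object that amounts to visibility within the enclosing package (org.apache.spark.deploy), and for its members it means access only from inside the object. A minimal runnable sketch with hypothetical names, not taken from this commit:

package org.apache.spark.deploy

// Hypothetical stand-in for a self-contained test harness: a top-level
// `private` object is visible only within org.apache.spark.deploy.
private object HarnessDemo {
  // Mutable test state, reachable only from inside this object.
  private var numPassed = 0

  // Helper used only by the harness itself.
  private def assertTrue(bool: Boolean, message: String = ""): Unit = {
    if (!bool) throw new IllegalStateException("Assertion failed: " + message)
    numPassed += 1
  }

  def main(args: Array[String]): Unit = {
    assertTrue(1 + 1 == 2)
    println(s"passed: $numPassed")
  }
}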
@@ -23,7 +23,7 @@ import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateR
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
import org.apache.spark.deploy.worker.ExecutorRunner

-private[spark] object JsonProtocol {
+private[deploy] object JsonProtocol {
def writeWorkerInfo(obj: WorkerInfo) = {
("id" -> obj.id) ~
("host" -> obj.host) ~
