[SPARK-9092] Fixed incompatibility when both num-executors and dynamic allocation are set

Dynamic allocation is now set to false when num-executors is explicitly specified as an argument. Consequently, executorAllocationManager is not initialized in the SparkContext.

Author: Niranjan Padmanabhan <[email protected]>

Closes #7657 from neurons/SPARK-9092.

(cherry picked from commit 738f353)
Signed-off-by: Marcelo Vanzin <[email protected]>
Niranjan Padmanabhan authored and Marcelo Vanzin committed Aug 12, 2015
1 parent b28295f commit 8537e51
Showing 14 changed files with 64 additions and 26 deletions.
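To make the resolved behavior concrete, here is a minimal sketch (mirroring the new SparkContextSuite test below; the local master and the count of 6 are arbitrary illustration values):

import org.apache.spark.{SparkConf, SparkContext}

// Both knobs set: before this fix this raised an IllegalArgumentException
// on YARN; now the explicit executor count wins and dynamic allocation is
// disabled with an informational log message.
val conf = new SparkConf()
  .setAppName("test")
  .setMaster("local")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.executor.instances", "6")
val sc = new SparkContext(conf)
// executorAllocationManager is private[spark], so this check compiles only
// from Spark-internal code such as the test in this commit.
assert(sc.executorAllocationManager.isEmpty)
assert(sc.getConf.getInt("spark.executor.instances", 0) == 6)
sc.stop()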
19 changes: 19 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -389,6 +389,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
val driverOptsKey = "spark.driver.extraJavaOptions"
val driverClassPathKey = "spark.driver.extraClassPath"
val driverLibraryPathKey = "spark.driver.extraLibraryPath"
+ val sparkExecutorInstances = "spark.executor.instances"

// Used by Yarn in 1.1 and before
sys.props.get("spark.driver.libraryPath").foreach { value =>
@@ -476,6 +477,24 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
}
}
}

+ if (!contains(sparkExecutorInstances)) {
+   sys.env.get("SPARK_WORKER_INSTANCES").foreach { value =>
+     val warning =
+       s"""
+          |SPARK_WORKER_INSTANCES was detected (set to '$value').
+          |This is deprecated in Spark 1.0+.
+          |
+          |Please instead use:
+          | - ./spark-submit with --num-executors to specify the number of executors
+          | - Or set SPARK_EXECUTOR_INSTANCES
+          | - spark.executor.instances to configure the number of instances in the spark config.
+        """.stripMargin
+     logWarning(warning)
+
+     set("spark.executor.instances", value)
+   }
+ }
}

/**
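Since the fallback above runs only when spark.executor.instances is absent, an explicitly configured property always takes precedence over the deprecated environment variable. A standalone sketch of that precedence (illustrative logic, not a Spark API):

// Property wins; the deprecated env var is only a fallback.
def resolveInstances(fromConf: Option[String], fromWorkerInstancesEnv: Option[String]): Option[String] =
  fromConf.orElse(fromWorkerInstancesEnv)

resolveInstances(Some("6"), Some("4"))  // Some("6"): explicit property kept
resolveInstances(None, Some("4"))       // Some("4"): env var migrated, with a warning
resolveInstances(None, None)            // None: nothing to set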
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -528,7 +528,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}

// Optionally scale number of executors dynamically based on workload. Exposed for testing.
- val dynamicAllocationEnabled = _conf.getBoolean("spark.dynamicAllocation.enabled", false)
+ val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
+ if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
+   logInfo("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
+ }

_executorAllocationManager =
if (dynamicAllocationEnabled) {
Some(new ExecutorAllocationManager(this, listenerBus, _conf))
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -422,7 +422,8 @@ object SparkSubmit {

// Yarn client only
OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
- OptionAssigner(args.numExecutors, YARN, CLIENT, sysProp = "spark.executor.instances"),
+ OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
+   sysProp = "spark.executor.instances"),
OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"),
OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),
OptionAssigner(args.principal, YARN, CLIENT, sysProp = "spark.yarn.principal"),
@@ -433,7 +434,6 @@
OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
OptionAssigner(args.driverCores, YARN, CLUSTER, clOption = "--driver-cores"),
OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
- OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"),
OptionAssigner(args.executorCores, YARN, CLUSTER, clOption = "--executor-cores"),
OptionAssigner(args.files, YARN, CLUSTER, clOption = "--files"),
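The effect of the two hunks above: --num-executors is now routed to the spark.executor.instances system property in both YARN deploy modes, instead of a sysProp in client mode plus a --num-executors child argument in cluster mode. A simplified standalone sketch of the OptionAssigner routing (the constants and filtering here are illustrative; the real table lives in SparkSubmit):

case class OptionAssigner(value: String, deployMode: Int, sysProp: String = null, clOption: String = null)

val CLIENT = 1
val CLUSTER = 2
val ALL_DEPLOY_MODES = CLIENT | CLUSTER

val numExecutors = "6"  // hypothetical parsed --num-executors value
val rules = Seq(
  // One rule for both modes, targeting the system property.
  OptionAssigner(numExecutors, ALL_DEPLOY_MODES, sysProp = "spark.executor.instances"))

val deployMode = CLUSTER  // same result for CLIENT
val sysProps = rules.collect {
  case r if r.value != null && (r.deployMode & deployMode) != 0 && r.sysProp != null =>
    r.sysProp -> r.value
}.toMap
// sysProps == Map("spark.executor.instances" -> "6")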
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2286,6 +2286,17 @@ private[spark] object Utils extends Logging {
isInDirectory(parent, child.getParentFile)
}

+ /**
+  * Return whether dynamic allocation is enabled in the given conf.
+  * Dynamic allocation and explicitly setting the number of executors are inherently
+  * incompatible. In environments where dynamic allocation is turned on by default,
+  * the latter should override the former (SPARK-9092).
+  */
+ def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
+   conf.contains("spark.dynamicAllocation.enabled") &&
+     conf.getInt("spark.executor.instances", 0) == 0
+ }

}

private [util] class SparkShutdownHookManager {
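A few examples of what the new helper returns (Utils is private[spark], so these calls compile only from Spark-internal code; note that the check uses contains, so setting the key at all counts as enabled here):

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

Utils.isDynamicAllocationEnabled(new SparkConf())
// false: the key is absent

Utils.isDynamicAllocationEnabled(
  new SparkConf().set("spark.dynamicAllocation.enabled", "true"))
// true: no explicit executor count

Utils.isDynamicAllocationEnabled(
  new SparkConf()
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.executor.instances", "6"))
// false: the explicit count overrides dynamic allocation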
8 changes: 8 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -285,4 +285,12 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
}
}

test("No exception when both num-executors and dynamic allocation set.") {
noException should be thrownBy {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")
.set("spark.dynamicAllocation.enabled", "true").set("spark.executor.instances", "6"))
assert(sc.executorAllocationManager.isEmpty)
assert(sc.getConf.getInt("spark.executor.instances", 0) === 6)
}
}
}
1 change: 0 additions & 1 deletion core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -159,7 +159,6 @@ class SparkSubmitSuite
childArgsStr should include ("--executor-cores 5")
childArgsStr should include ("--arg arg1 --arg arg2")
childArgsStr should include ("--queue thequeue")
childArgsStr should include ("--num-executors 6")
childArgsStr should include regex ("--jar .*thejar.jar")
childArgsStr should include regex ("--addJars .*one.jar,.*two.jar,.*three.jar")
childArgsStr should include regex ("--files .*file1.txt,.*file2.txt")
2 changes: 1 addition & 1 deletion docs/running-on-yarn.md
@@ -199,7 +199,7 @@ If you need a reference to the proper location to put log files in the YARN so t
<td><code>spark.executor.instances</code></td>
<td>2</td>
<td>
- The number of executors. Note that this property is incompatible with <code>spark.dynamicAllocation.enabled</code>.
+ The number of executors. Note that this property is incompatible with <code>spark.dynamicAllocation.enabled</code>. If both <code>spark.dynamicAllocation.enabled</code> and <code>spark.executor.instances</code> are specified, dynamic allocation is turned off and the specified number of <code>spark.executor.instances</code> is used.
</td>
</tr>
<tr>
4 changes: 2 additions & 2 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -64,7 +64,8 @@ private[spark] class ApplicationMaster(

// Default to numExecutors * 2, with minimum of 3
private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
sparkConf.getInt("spark.yarn.max.worker.failures",
math.max(sparkConf.getInt("spark.executor.instances", 0) * 2, 3)))

@volatile private var exitCode = 0
@volatile private var unregistered = false
@@ -493,7 +494,6 @@ private[spark] class ApplicationMaster(
*/
private def startUserApplication(): Thread = {
logInfo("Starting the user application in a separate Thread")
- System.setProperty("spark.executor.instances", args.numExecutors.toString)

val classpath = Client.getUserClasspath(sparkConf)
val urls = classpath.map { entry =>
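With --num-executors gone from the AM arguments, the failure-tolerance default now reads spark.executor.instances from the conf: twice the executor count, floored at 3. A worked example of the arithmetic:

def defaultMaxFailures(executorInstances: Int): Int =
  math.max(executorInstances * 2, 3)

defaultMaxFailures(6)  // 12, with spark.executor.instances = 6
defaultMaxFailures(0)  // 3, when dynamic allocation leaves the key unset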
5 changes: 0 additions & 5 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -29,7 +29,6 @@ class ApplicationMasterArguments(val args: Array[String]) {
var userArgs: Seq[String] = Nil
var executorMemory = 1024
var executorCores = 1
- var numExecutors = DEFAULT_NUMBER_EXECUTORS
var propertiesFile: String = null

parseArgs(args.toList)
@@ -63,10 +62,6 @@ class ApplicationMasterArguments(val args: Array[String]) {
userArgsBuffer += value
args = tail

case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
numExecutors = value
args = tail

case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail =>
executorMemory = value
args = tail
5 changes: 4 additions & 1 deletion yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -751,7 +751,6 @@ private[spark] class Client(
userArgs ++ Seq(
"--executor-memory", args.executorMemory.toString + "m",
"--executor-cores", args.executorCores.toString,
"--num-executors ", args.numExecutors.toString,
"--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
LOCALIZED_CONF_DIR, SPARK_CONF_FILE))

@@ -960,6 +959,10 @@ object Client extends Logging {
val sparkConf = new SparkConf

val args = new ClientArguments(argStrings, sparkConf)
+ // to maintain backwards-compatibility
+ if (!Utils.isDynamicAllocationEnabled(sparkConf)) {
+   sparkConf.setIfMissing("spark.executor.instances", args.numExecutors.toString)
+ }
new Client(args, sparkConf).run()
}

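Because the seeding above uses setIfMissing, a spark.executor.instances value already present in the conf still wins over the parsed --num-executors argument; the argument only fills the gap when dynamic allocation is off. A short sketch (numExecutorsFromArgs stands in for the parsed CLI value; Utils is private[spark]):

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

val conf = new SparkConf().set("spark.executor.instances", "8")
val numExecutorsFromArgs = 4  // hypothetical --num-executors
if (!Utils.isDynamicAllocationEnabled(conf)) {
  conf.setIfMissing("spark.executor.instances", numExecutorsFromArgs.toString)
}
// conf.get("spark.executor.instances") == "8": the explicit setting is kept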
8 changes: 1 addition & 7 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -53,8 +53,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
private val amMemOverheadKey = "spark.yarn.am.memoryOverhead"
private val driverCoresKey = "spark.driver.cores"
private val amCoresKey = "spark.yarn.am.cores"
- private val isDynamicAllocationEnabled =
-   sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)
+ private val isDynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(sparkConf)

parseArgs(args.toList)
loadEnvironmentArgs()
@@ -196,11 +195,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
if (args(0) == "--num-workers") {
println("--num-workers is deprecated. Use --num-executors instead.")
}
- // Dynamic allocation is not compatible with this option
- if (isDynamicAllocationEnabled) {
-   throw new IllegalArgumentException("Explicitly setting the number " +
-     "of executors is not compatible with spark.dynamicAllocation.enabled!")
- }
numExecutors = value
args = tail

9 changes: 8 additions & 1 deletion yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -21,6 +21,8 @@ import java.util.Collections
import java.util.concurrent._
import java.util.regex.Pattern

+ import org.apache.spark.util.Utils

import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

@@ -86,7 +88,12 @@ private[yarn] class YarnAllocator(
private var executorIdCounter = 0
@volatile private var numExecutorsFailed = 0

- @volatile private var targetNumExecutors = args.numExecutors
+ @volatile private var targetNumExecutors =
+   if (Utils.isDynamicAllocationEnabled(sparkConf)) {
+     sparkConf.getInt("spark.dynamicAllocation.initialExecutors", 0)
+   } else {
+     sparkConf.getInt("spark.executor.instances", YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS)
+   }

// Keep track of which container is running which executor to remove the executors later
// Visible for testing.
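The allocator's initial target now comes from the conf rather than from ApplicationMasterArguments. In sketch form (assuming YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS is 2, the default documented in running-on-yarn.md above; Utils is private[spark]):

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

def initialTargetNumExecutors(conf: SparkConf): Int =
  if (Utils.isDynamicAllocationEnabled(conf)) {
    // Dynamic allocation: start from the configured initial pool (default 0).
    conf.getInt("spark.dynamicAllocation.initialExecutors", 0)
  } else {
    // Static allocation: the explicit count, else the framework default of 2.
    conf.getInt("spark.executor.instances", 2)
  }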
3 changes: 0 additions & 3 deletions yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -81,8 +81,6 @@ private[spark] class YarnClientSchedulerBackend(
// List of (target Client argument, environment variable, Spark property)
val optionTuples =
List(
("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"),
("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
@@ -92,7 +90,6 @@
)
// Warn against the following deprecated environment variables: env var -> suggestion
val deprecatedEnvVars = Map(
"SPARK_WORKER_INSTANCES" -> "SPARK_WORKER_INSTANCES or --num-executors through spark-submit",
"SPARK_WORKER_MEMORY" -> "SPARK_EXECUTOR_MEMORY or --executor-memory through spark-submit",
"SPARK_WORKER_CORES" -> "SPARK_EXECUTOR_CORES or --executor-cores through spark-submit")
optionTuples.foreach { case (optionName, envVar, sparkProp) =>
5 changes: 3 additions & 2 deletions yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -87,16 +87,17 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter

def createAllocator(maxExecutors: Int = 5): YarnAllocator = {
val args = Array(
"--num-executors", s"$maxExecutors",
"--executor-cores", "5",
"--executor-memory", "2048",
"--jar", "somejar.jar",
"--class", "SomeClass")
+ val sparkConfClone = sparkConf.clone()
+ sparkConfClone.set("spark.executor.instances", maxExecutors.toString)
new YarnAllocator(
"not used",
mock(classOf[RpcEndpointRef]),
conf,
- sparkConf,
+ sparkConfClone,
rmClient,
appAttemptId,
new ApplicationMasterArguments(args),
