[SPARK-10790] [YARN] Fix initial executor number not set issue and consolidate the codes

This bug was introduced in [SPARK-9092](https://issues.apache.org/jira/browse/SPARK-9092): `targetExecutorNumber` should fall back to `minExecutors` when `initialExecutors` is not set. Falling back to 0 instead triggers the problem described in [SPARK-10790](https://issues.apache.org/jira/browse/SPARK-10790).
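A minimal sketch of the regression this fixes (the configuration below is illustrative, not part of the patch): with dynamic allocation enabled and only `minExecutors` set, falling back to 0 means the application starts with no executors at all, while falling back to `minExecutors` gives the intended starting point.

```scala
import org.apache.spark.SparkConf

// Illustrative config: dynamic allocation on, minExecutors set,
// initialExecutors deliberately left unset.
val conf = new SparkConf(false)
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")

// Before the fix: the fallback default was 0, so the app requested no executors.
val buggy = conf.getInt("spark.dynamicAllocation.initialExecutors", 0)    // 0

// After the fix: the fallback is minExecutors, as originally intended.
val min = conf.getInt("spark.dynamicAllocation.minExecutors", 0)          // 2
val fixed = conf.getInt("spark.dynamicAllocation.initialExecutors", min)  // 2
```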

Also consolidate and simplify several similar code snippets so that all call sites keep consistent semantics, as sketched below.
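One of the unified semantics is the lookup order used when dynamic allocation is off: `spark.executor.instances` overrides the `SPARK_EXECUTOR_INSTANCES` environment variable, which overrides the built-in default. A self-contained sketch, where `resolveStatic` is a hypothetical stand-in for that branch of the new helper:

```scala
import org.apache.spark.SparkConf

// Hypothetical stand-in for the static-allocation branch of the new helper;
// the env map replaces a real System.getenv lookup so the sketch is testable.
def resolveStatic(conf: SparkConf, env: Map[String, String], default: Int): Int = {
  val fromEnv = env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(default)
  conf.getInt("spark.executor.instances", fromEnv)
}

resolveStatic(new SparkConf(false), Map.empty, 2)                               // 2 (default)
resolveStatic(new SparkConf(false), Map("SPARK_EXECUTOR_INSTANCES" -> "4"), 2)  // 4 (env var)
resolveStatic(
  new SparkConf(false).set("spark.executor.instances", "5"),
  Map("SPARK_EXECUTOR_INSTANCES" -> "4"), 2)                                    // 5 (conf wins)
```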

Author: jerryshao <[email protected]>

Closes #8910 from jerryshao/SPARK-10790.
jerryshao authored and Marcelo Vanzin committed Sep 28, 2015
1 parent d8d50ed commit 353c30b
Showing 4 changed files with 27 additions and 40 deletions.
yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -81,25 +81,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
       .orNull
     // If dynamic allocation is enabled, start at the configured initial number of executors.
     // Default to minExecutors if no initialExecutors is set.
-    if (isDynamicAllocationEnabled) {
-      val minExecutorsConf = "spark.dynamicAllocation.minExecutors"
-      val initialExecutorsConf = "spark.dynamicAllocation.initialExecutors"
-      val maxExecutorsConf = "spark.dynamicAllocation.maxExecutors"
-      val minNumExecutors = sparkConf.getInt(minExecutorsConf, 0)
-      val initialNumExecutors = sparkConf.getInt(initialExecutorsConf, minNumExecutors)
-      val maxNumExecutors = sparkConf.getInt(maxExecutorsConf, Integer.MAX_VALUE)
-
-      // If defined, initial executors must be between min and max
-      if (initialNumExecutors < minNumExecutors || initialNumExecutors > maxNumExecutors) {
-        throw new IllegalArgumentException(
-          s"$initialExecutorsConf must be between $minExecutorsConf and $maxNumExecutors!")
-      }
-
-      numExecutors = initialNumExecutors
-    } else {
-      val numExecutorsConf = "spark.executor.instances"
-      numExecutors = sparkConf.getInt(numExecutorsConf, numExecutors)
-    }
+    numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
     principal = Option(principal)
       .orElse(sparkConf.getOption("spark.yarn.principal"))
       .orNull
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -89,11 +89,7 @@ private[yarn] class YarnAllocator(
   @volatile private var numExecutorsFailed = 0

   @volatile private var targetNumExecutors =
-    if (Utils.isDynamicAllocationEnabled(sparkConf)) {
-      sparkConf.getInt("spark.dynamicAllocation.initialExecutors", 0)
-    } else {
-      sparkConf.getInt("spark.executor.instances", YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS)
-    }
+    YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)

   // Executor loss reason requests that are pending - maps from executor ID for inquiry to a
   // list of requesters that should be responded to once we find out why the given executor
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -314,5 +314,28 @@ object YarnSparkHadoopUtil {
   def getClassPathSeparator(): String = {
     classPathSeparatorField.get(null).asInstanceOf[String]
   }
+
+  /**
+   * Get the initial target number of executors. The value depends on whether dynamic
+   * allocation is enabled.
+   */
+  def getInitialTargetExecutorNumber(conf: SparkConf): Int = {
+    if (Utils.isDynamicAllocationEnabled(conf)) {
+      val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
+      val initialNumExecutors =
+        conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors)
+      val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", Int.MaxValue)
+      require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
+        s"initial executor number $initialNumExecutors must be between min executor number " +
+          s"$minNumExecutors and max executor number $maxNumExecutors")
+
+      initialNumExecutors
+    } else {
+      val targetNumExecutors =
+        sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(DEFAULT_NUMBER_EXECUTORS)
+      // System property can override environment variable.
+      conf.getInt("spark.executor.instances", targetNumExecutors)
+    }
+  }
 }
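For reference, a hedged usage sketch of the new helper under a few illustrative configurations; the expected results follow directly from the logic above (`new SparkConf(false)` simply avoids loading system properties):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

// Dynamic allocation on, initialExecutors unset: falls back to minExecutors.
val dynConf = new SparkConf(false)
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "3")
YarnSparkHadoopUtil.getInitialTargetExecutorNumber(dynConf)     // 3

// Dynamic allocation off: spark.executor.instances wins when set.
val staticConf = new SparkConf(false).set("spark.executor.instances", "5")
YarnSparkHadoopUtil.getInitialTargetExecutorNumber(staticConf)  // 5

// With neither set and no SPARK_EXECUTOR_INSTANCES in the environment,
// the helper falls back to DEFAULT_NUMBER_EXECUTORS.
```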
yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -17,21 +17,13 @@

 package org.apache.spark.scheduler.cluster

-import java.net.NetworkInterface
-
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment

-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.yarn.api.records.NodeState
-import org.apache.hadoop.yarn.client.api.YarnClient
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-
 import org.apache.spark.SparkContext
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
 import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.util.{IntParam, Utils}
+import org.apache.spark.util.Utils

 private[spark] class YarnClusterSchedulerBackend(
     scheduler: TaskSchedulerImpl,
@@ -40,13 +32,7 @@ private[spark] class YarnClusterSchedulerBackend(

   override def start() {
     super.start()
-    totalExpectedExecutors = DEFAULT_NUMBER_EXECUTORS
-    if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
-      totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES"))
-        .getOrElse(totalExpectedExecutors)
-    }
-    // System property can override environment variable.
-    totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors)
+    totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sc.conf)
   }

   override def applicationId(): String =
