[SPARK-1930] The Container is running beyond physical memory limits, so as to be killed

Author: witgo <[email protected]>

Closes apache#894 from witgo/SPARK-1930 and squashes the following commits:

564307e [witgo] Update the running-on-yarn.md
3747515 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1930
172647b [witgo] add memoryOverhead docs
a0ff545 [witgo] leaving only two configs
a17bda2 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1930
478ca15 [witgo] Merge branch 'master' into SPARK-1930
d1244a1 [witgo] Merge branch 'master' into SPARK-1930
8b967ae [witgo] Merge branch 'master' into SPARK-1930
655a820 [witgo] review commit
71859a7 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1930
e3c531d [witgo] review commit
e16f190 [witgo] different memoryOverhead
ffa7569 [witgo] review commit
5c9581f [witgo] Merge branch 'master' into SPARK-1930
9a6bcf2 [witgo] review commit
8fae45a [witgo] fix NullPointerException
e0dcc16 [witgo] Adding  configuration items
b6a989c [witgo] Fix container memory beyond limit, were killed
witgo authored and tgravescs committed Jun 16, 2014
1 parent 4fdb491 commit cdf2b04
Showing 7 changed files with 46 additions and 18 deletions.
14 changes: 14 additions & 0 deletions docs/running-on-yarn.md
@@ -67,6 +67,20 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
The address of the Spark history server (i.e. host.com:18080). The address should not contain a scheme (http://). Defaults to not being set since the history server is an optional service. This address is given to the YARN ResourceManager when the Spark application finishes to link the application from the ResourceManager UI to the Spark history server UI.
</td>
</tr>
<tr>
<td><code>spark.yarn.executor.memoryOverhead</code></td>
<td><code>384</code></td>
<td>
The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
</td>
</tr>
<tr>
<td><code>spark.yarn.driver.memoryOverhead</code></td>
<td><code>384</code></td>
<td>
The amount of off-heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
</td>
</tr>
</table>
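To make the new settings concrete, here is a minimal sketch of how an application might raise both overheads, assuming the standard `SparkConf`/`SparkContext` API; the property names come from this patch, while the application name and values are invented for illustration:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical job whose executors need extra native (off-heap) headroom.
// The numbers below are illustrative, not recommendations.
val conf = new SparkConf()
  .setAppName("memory-overhead-example")            // made-up name
  .set("spark.executor.memory", "4g")               // executor JVM heap
  .set("spark.yarn.executor.memoryOverhead", "768") // MB reserved beyond the heap, per executor
  .set("spark.yarn.driver.memoryOverhead", "512")   // MB reserved beyond the heap, for the driver/AM

val sc = new SparkContext(conf)
```

Raising `spark.yarn.executor.memoryOverhead` is the knob this change introduces for jobs that YARN kills with the "running beyond physical memory limits" error.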

By default, Spark on YARN will use a Spark jar installed locally, but the Spark JAR can also be in a world-readable location on HDFS. This allows YARN to cache it on nodes so that it doesn't need to be distributed each time an application runs. To point to a JAR on HDFS, `export SPARK_JAR=hdfs:///some/path`.
@@ -71,7 +71,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa

val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
// Memory for the ApplicationMaster.
capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
capability.setMemory(args.amMemory + memoryOverhead)
amContainer.setResource(capability)

appContext.setQueue(args.amQueue)
@@ -115,7 +115,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
val minResMemory = newApp.getMinimumResourceCapability().getMemory()
val amMemory = ((args.amMemory / minResMemory) * minResMemory) +
((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
YarnAllocationHandler.MEMORY_OVERHEAD)
memoryOverhead)
amMemory
}
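The arithmetic above is easier to see with numbers. A self-contained sketch of the same calculation, using illustrative values (a 1000 MB request, a 1024 MB YARN minimum allocation, the 384 MB default overhead) that are not taken from the patch:

```scala
// Mirrors the AM memory calculation above: round the request up to a multiple
// of YARN's minimum allocation, then size the heap so heap + overhead fills it.
def roundedAmMemory(requested: Int, minResMemory: Int, memoryOverhead: Int): Int =
  ((requested / minResMemory) * minResMemory) +
    ((if (requested % minResMemory == 0) 0 else minResMemory) - memoryOverhead)

// 1000 MB requested, 1024 MB minimum, 384 MB overhead:
// rounds up to one 1024 MB allocation, leaving 1024 - 384 = 640 MB of AM heap.
assert(roundedAmMemory(1000, 1024, 384) == 640)
```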

@@ -92,13 +92,15 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp

appAttemptId = getApplicationAttemptId()
resourceManager = registerWithResourceManager()

val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()

// Compute number of threads for akka
val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()

if (minimumMemory > 0) {
val mem = args.executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
val mem = args.executorMemory + sparkConf.getInt("spark.yarn.executor.memoryOverhead",
YarnAllocationHandler.MEMORY_OVERHEAD)
val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)

if (numCore > 0) {
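The `numCore` expression in the ExecutorLauncher hunk above is a ceiling division: the executor's total footprint (heap plus the configured overhead) divided by YARN's minimum allocation, rounded up, and then used to size the akka dispatcher. A small sketch with made-up numbers:

```scala
// Ceiling division as used above to pick a thread count (illustrative values).
def akkaThreads(executorMemory: Int, memoryOverhead: Int, minimumMemory: Int): Int = {
  val mem = executorMemory + memoryOverhead
  (mem / minimumMemory) + (if (mem % minimumMemory != 0) 1 else 0)
}

// 4096 MB heap + 384 MB overhead over a 1024 MB minimum => ceil(4480 / 1024) = 5
assert(akkaThreads(4096, 384, 1024) == 5)
```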
@@ -88,6 +88,10 @@ private[yarn] class YarnAllocationHandler(
// Containers to be released in next request to RM
private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]

// Additional memory overhead - in mb.
private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
YarnAllocationHandler.MEMORY_OVERHEAD)

private val numExecutorsRunning = new AtomicInteger()
// Used to generate a unique id per executor
private val executorIdCounter = new AtomicInteger()
@@ -99,7 +103,7 @@ private[yarn] class YarnAllocationHandler(
def getNumExecutorsFailed: Int = numExecutorsFailed.intValue

def isResourceConstraintSatisfied(container: Container): Boolean = {
container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
container.getResource.getMemory >= (executorMemory + memoryOverhead)
}

def allocateContainers(executorsToRequest: Int) {
@@ -229,7 +233,7 @@ private[yarn] class YarnAllocationHandler(
val containerId = container.getId

assert( container.getResource.getMemory >=
(executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
(executorMemory + memoryOverhead))

if (numExecutorsRunningNow > maxExecutors) {
logInfo("""Ignoring container %s at host %s, since we already have the required number of
@@ -450,7 +454,7 @@ private[yarn] class YarnAllocationHandler(

if (numExecutors > 0) {
logInfo("Allocating %d executor containers with %d of memory each.".format(numExecutors,
executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
executorMemory + memoryOverhead))
} else {
logDebug("Empty allocation req .. release : " + releasedContainerList)
}
@@ -505,7 +509,7 @@ private[yarn] class YarnAllocationHandler(
val rsrcRequest = Records.newRecord(classOf[ResourceRequest])
val memCapability = Records.newRecord(classOf[Resource])
// There probably is some overhead here, let's reserve a bit more memory.
memCapability.setMemory(executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
memCapability.setMemory(executorMemory + memoryOverhead)
rsrcRequest.setCapability(memCapability)

val pri = Records.newRecord(classOf[Priority])
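The resource request above is the core of the fix: each executor container now asks YARN for the JVM heap plus a configurable overhead rather than the hard-coded 384 MB constant. A short sketch of the resulting container size, with invented values:

```scala
// Illustrative values; in the real code they come from spark.executor.memory
// and spark.yarn.executor.memoryOverhead.
val executorMemory  = 4096                             // MB of executor JVM heap
val memoryOverhead  = 768                              // MB of off-heap headroom
val containerMemory = executorMemory + memoryOverhead  // 4864 MB requested from YARN
assert(containerMemory == 4864)
```

The intent is that, as long as the executor process's real footprint stays within this total, YARN no longer kills the container for exceeding its physical memory limit.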
@@ -65,17 +65,21 @@ trait ClientBase extends Logging {
val APP_FILE_PERMISSION: FsPermission =
FsPermission.createImmutable(Integer.parseInt("644", 8).toShort)

// Additional memory overhead - in mb.
protected def memoryOverhead: Int = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
YarnAllocationHandler.MEMORY_OVERHEAD)

// TODO(harvey): This could just go in ClientArguments.
def validateArgs() = {
Map(
((args.userJar == null && args.amClass == classOf[ApplicationMaster].getName) ->
"Error: You must specify a user jar when running in standalone mode!"),
(args.userClass == null) -> "Error: You must specify a user class!",
(args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!",
(args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be" +
"greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD),
(args.executorMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Executor memory size" +
"must be greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString)
(args.amMemory <= memoryOverhead) -> ("Error: AM memory size must be " +
"greater than: " + memoryOverhead),
(args.executorMemory <= memoryOverhead) -> ("Error: Executor memory size " +
"must be greater than: " + memoryOverhead.toString)
).foreach { case(cond, errStr) =>
if (cond) {
logError(errStr)
@@ -101,7 +105,7 @@ trait ClientBase extends Logging {
logError(errorMessage)
throw new IllegalArgumentException(errorMessage)
}
val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
val amMem = args.amMemory + memoryOverhead
if (amMem > maxMem) {

val errorMessage = "Required AM memory (%d) is above the max threshold (%d) of this cluster."
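The guard above checks that the AM heap plus its overhead still fits in the largest container the cluster will grant. A hedged sketch of the same check with made-up numbers; in the real code `maxMem` comes from `getMaximumResourceCapability().getMemory()`:

```scala
val maxMem         = 8192  // largest container the cluster will grant, in MB (illustrative)
val amMemory       = 1024  // requested AM memory, in MB (illustrative)
val memoryOverhead = 384   // spark.yarn.driver.memoryOverhead default

val amMem = amMemory + memoryOverhead
require(amMem <= maxMem,
  s"Required AM memory ($amMem) is above the max threshold ($maxMem) of this cluster.")
```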
@@ -84,7 +84,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa

// Memory for the ApplicationMaster.
val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
memoryResource.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
memoryResource.setMemory(args.amMemory + memoryOverhead)
appContext.setResource(memoryResource)

// Finally, submit and monitor the application.
@@ -117,7 +117,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
// val minResMemory: Int = newApp.getMinimumResourceCapability().getMemory()
// var amMemory = ((args.amMemory / minResMemory) * minResMemory) +
// ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) -
// YarnAllocationHandler.MEMORY_OVERHEAD)
// memoryOverhead)
args.amMemory
}

@@ -90,6 +90,10 @@ private[yarn] class YarnAllocationHandler(
// Containers to be released in next request to RM
private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]

// Additional memory overhead - in mb.
private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
YarnAllocationHandler.MEMORY_OVERHEAD)

// Number of container requests that have been sent to, but not yet allocated by the
// ApplicationMaster.
private val numPendingAllocate = new AtomicInteger()
@@ -106,7 +110,7 @@ private[yarn] class YarnAllocationHandler(
def getNumExecutorsFailed: Int = numExecutorsFailed.intValue

def isResourceConstraintSatisfied(container: Container): Boolean = {
container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
container.getResource.getMemory >= (executorMemory + memoryOverhead)
}

def releaseContainer(container: Container) {
@@ -248,7 +252,7 @@ private[yarn] class YarnAllocationHandler(
val executorHostname = container.getNodeId.getHost
val containerId = container.getId

val executorMemoryOverhead = (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
val executorMemoryOverhead = (executorMemory + memoryOverhead)
assert(container.getResource.getMemory >= executorMemoryOverhead)

if (numExecutorsRunningNow > maxExecutors) {
@@ -477,7 +481,7 @@ private[yarn] class YarnAllocationHandler(
numPendingAllocate.addAndGet(numExecutors)
logInfo("Will Allocate %d executor containers, each with %d memory".format(
numExecutors,
(executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)))
(executorMemory + memoryOverhead)))
} else {
logDebug("Empty allocation request ...")
}
@@ -537,7 +541,7 @@ private[yarn] class YarnAllocationHandler(
priority: Int
): ArrayBuffer[ContainerRequest] = {

val memoryRequest = executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD
val memoryRequest = executorMemory + memoryOverhead
val resource = Resource.newInstance(memoryRequest, executorCores)

val prioritySetting = Records.newRecord(classOf[Priority])
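Finally, a sketch of how the new settings resolve at runtime: both lookups fall back to the old hard-coded constant, so behaviour is unchanged unless the user opts in. `MEMORY_OVERHEAD` below stands in for `YarnAllocationHandler.MEMORY_OVERHEAD` (384):

```scala
import org.apache.spark.SparkConf

val MEMORY_OVERHEAD = 384  // stand-in for YarnAllocationHandler.MEMORY_OVERHEAD

// User overrides only the executor overhead; the driver overhead keeps the default.
val sparkConf = new SparkConf(false).set("spark.yarn.executor.memoryOverhead", "1024")

val executorOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead", MEMORY_OVERHEAD) // 1024
val driverOverhead   = sparkConf.getInt("spark.yarn.driver.memoryOverhead", MEMORY_OVERHEAD)   // 384
assert(executorOverhead == 1024 && driverOverhead == 384)
```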
