Skip to content

Commit

Permalink
[SPARK-27189][CORE] Add Executor metrics and memory usage instrumenta…
Browse files Browse the repository at this point in the history
…tion to the metrics system

## What changes were proposed in this pull request?

This PR proposes to add instrumentation of memory usage via the Spark Dropwizard/Codahale metrics system. Memory usage metrics are available via the Executor metrics, recently implemented as detailed in https://issues.apache.org/jira/browse/SPARK-23206.
Additional notes: This takes advantage of the metrics poller introduced in apache#23767.

## Why are the changes needed?
Executor metrics bring have many useful insights on memory usage, in particular on the usage of storage memory and executor memory. This is useful for troubleshooting. Having the information in the metrics systems allows to add those metrics to Spark performance dashboards and study memory usage as a function of time, as in the example graph https://issues.apache.org/jira/secure/attachment/12962810/Example_dashboard_Spark_Memory_Metrics.PNG

## Does this PR introduce any user-facing change?
Adds `ExecutorMetrics` source to publish executor metrics via the Dropwizard metrics system. Details of the available metrics in docs/monitoring.md
Adds configuration parameter `spark.metrics.executormetrics.source.enabled`

## How was this patch tested?

Tested on YARN cluster and with an existing setup for a Spark dashboard based on InfluxDB and Grafana.

Closes apache#24132 from LucaCanali/memoryMetricsSource.

Authored-by: Luca Canali <[email protected]>
Signed-off-by: Imran Rashid <[email protected]>
  • Loading branch information
LucaCanali authored and squito committed Dec 9, 2019
1 parent a717d21 commit 729f43f
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 6 deletions.
16 changes: 13 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.deploy.StandaloneResourceUtils._
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.executor.{ExecutorMetrics, ExecutorMetricsSource}
import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
Expand Down Expand Up @@ -551,9 +551,16 @@ class SparkContext(config: SparkConf) extends Logging {
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

val _executorMetricsSource =
if (_conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) {
Some(new ExecutorMetricsSource)
} else {
None
}

// create and start the heartbeater for collecting memory metrics
_heartbeater = new Heartbeater(
() => SparkContext.this.reportHeartBeat(),
() => SparkContext.this.reportHeartBeat(_executorMetricsSource),
"driver-heartbeater",
conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
_heartbeater.start()
Expand Down Expand Up @@ -622,6 +629,7 @@ class SparkContext(config: SparkConf) extends Logging {
_env.metricsSystem.registerSource(_dagScheduler.metricsSource)
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
_env.metricsSystem.registerSource(new JVMCPUSource())
_executorMetricsSource.foreach(_.register(_env.metricsSystem))
_executorAllocationManager.foreach { e =>
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)
}
Expand Down Expand Up @@ -2473,8 +2481,10 @@ class SparkContext(config: SparkConf) extends Logging {
}

/** Reports heartbeat metrics for the driver. */
private def reportHeartBeat(): Unit = {
private def reportHeartBeat(executorMetricsSource: Option[ExecutorMetricsSource]): Unit = {
val currentMetrics = ExecutorMetrics.getCurrentMetrics(env.memoryManager)
executorMetricsSource.foreach(_.updateMetricsSnapshot(currentMetrics))

val driverUpdates = new HashMap[(Int, Int), ExecutorMetrics]
// In the driver, we do not track per-stage metrics, so use a dummy stage for the key
driverUpdates.put(EventLoggingListener.DRIVER_STAGE_KEY, new ExecutorMetrics(currentMetrics))
Expand Down
11 changes: 10 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,18 @@ private[spark] class Executor(
// create. The map key is a task id.
private val taskReaperForTask: HashMap[Long, TaskReaper] = HashMap[Long, TaskReaper]()

val executorMetricsSource =
if (conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) {
Some(new ExecutorMetricsSource)
} else {
None
}

if (!isLocal) {
env.blockManager.initialize(conf.getAppId)
env.metricsSystem.registerSource(executorSource)
env.metricsSystem.registerSource(new JVMCPUSource())
executorMetricsSource.foreach(_.register(env.metricsSystem))
env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
}

Expand Down Expand Up @@ -181,7 +189,8 @@ private[spark] class Executor(
// Poller for the memory metrics. Visible for testing.
private[executor] val metricsPoller = new ExecutorMetricsPoller(
env.memoryManager,
METRICS_POLLING_INTERVAL_MS)
METRICS_POLLING_INTERVAL_MS,
executorMetricsSource)

// Executor for the heartbeat task.
private val heartbeater = new Heartbeater(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ import org.apache.spark.util.{ThreadUtils, Utils}
*/
private[spark] class ExecutorMetricsPoller(
memoryManager: MemoryManager,
pollingInterval: Long) extends Logging {
pollingInterval: Long,
executorMetricsSource: Option[ExecutorMetricsSource]) extends Logging {

type StageKey = (Int, Int)
// Task Count and Metric Peaks
Expand Down Expand Up @@ -79,6 +80,7 @@ private[spark] class ExecutorMetricsPoller(

// get the latest values for the metrics
val latestMetrics = ExecutorMetrics.getCurrentMetrics(memoryManager)
executorMetricsSource.foreach(_.updateMetricsSnapshot(latestMetrics))

def updatePeaks(metrics: AtomicLongArray): Unit = {
(0 until metrics.length).foreach { i =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.executor

import com.codahale.metrics.{Gauge, MetricRegistry}

import org.apache.spark.metrics.{ExecutorMetricType, MetricsSystem}
import org.apache.spark.metrics.source.Source

/**
* Expose executor metrics from [[ExecutorMetricsType]] using the Dropwizard metrics system.
*
* Metrics related to the memory system can be expensive to gather, therefore
* we implement some optimizations:
* (1) Metrics values are cached, updated at each heartbeat (default period is 10 seconds).
* An alternative faster polling mechanism is used, only if activated, by setting
* spark.executor.metrics.pollingInterval=<interval in ms>.
* (2) Procfs metrics are gathered all in one-go and only conditionally:
* if the /proc filesystem exists
* and spark.eventLog.logStageExecutorProcessTreeMetrics.enabled=true
* and spark.eventLog.logStageExecutorMetrics.enabled=true.
*/
private[spark] class ExecutorMetricsSource extends Source {

override val metricRegistry = new MetricRegistry()
override val sourceName = "ExecutorMetrics"
@volatile var metricsSnapshot: Array[Long] = Array.fill(ExecutorMetricType.numMetrics)(0L)

// called by ExecutorMetricsPoller
def updateMetricsSnapshot(metricsUpdates: Array[Long]): Unit = {
metricsSnapshot = metricsUpdates
}

private class ExecutorMetricGauge(idx: Int) extends Gauge[Long] {
def getValue: Long = metricsSnapshot(idx)
}

def register(metricsSystem: MetricsSystem): Unit = {
val gauges: IndexedSeq[ExecutorMetricGauge] = (0 until ExecutorMetricType.numMetrics).map {
idx => new ExecutorMetricGauge(idx)
}.toIndexedSeq

ExecutorMetricType.metricToOffset.foreach {
case (name, idx) =>
metricRegistry.register(MetricRegistry.name(name), gauges(idx))
}

metricsSystem.registerSource(this)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,12 @@ package object config {
.stringConf
.createOptional

private[spark] val METRICS_EXECUTORMETRICS_SOURCE_ENABLED =
ConfigBuilder("spark.metrics.executorMetricsSource.enabled")
.doc("Whether to register the ExecutorMetrics source with the metrics system.")
.booleanConf
.createWithDefault(true)

private[spark] val METRICS_STATIC_SOURCES_ENABLED =
ConfigBuilder("spark.metrics.staticSources.enabled")
.doc("Whether to register static sources with the metrics system.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.metrics.source

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.internal.config.METRICS_STATIC_SOURCES_ENABLED
import org.apache.spark.internal.config.{METRICS_EXECUTORMETRICS_SOURCE_ENABLED, METRICS_STATIC_SOURCES_ENABLED}

class SourceConfigSuite extends SparkFunSuite with LocalSparkContext {

Expand Down Expand Up @@ -52,4 +52,32 @@ class SourceConfigSuite extends SparkFunSuite with LocalSparkContext {
}
}

test("Test configuration for adding ExecutorMetrics source registration") {
val conf = new SparkConf()
conf.set(METRICS_EXECUTORMETRICS_SOURCE_ENABLED, true)
val sc = new SparkContext("local", "test", conf)
try {
val metricsSystem = sc.env.metricsSystem

// ExecutorMetrics source should be registered
assert (metricsSystem.getSourcesByName("ExecutorMetrics").nonEmpty)
} finally {
sc.stop()
}
}

test("Test configuration for skipping ExecutorMetrics source registration") {
val conf = new SparkConf()
conf.set(METRICS_EXECUTORMETRICS_SOURCE_ENABLED, false)
val sc = new SparkContext("local", "test", conf)
try {
val metricsSystem = sc.env.metricsSystem

// ExecutorMetrics source should not be registered
assert (metricsSystem.getSourcesByName("ExecutorMetrics").isEmpty)
} finally {
sc.stop()
}
}

}
41 changes: 41 additions & 0 deletions docs/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,12 @@ This is the component with the largest amount of instrumented metrics
- namespace=JVMCPU
- jvmCpuTime

- namespace=ExecutorMetrics
- **note:** these metrics are conditional to a configuration parameter:
`spark.metrics.executorMetricsSource.enabled` (default is true)
- This source contains memory-related metrics. A full list of available metrics in this
namespace can be found in the corresponding entry for the Executor component instance.

- namespace=plugin.\<Plugin Class Name>
- Optional namespace(s). Metrics in this namespace are defined by user-supplied code, and
configured using the Spark plugin API. See "Advanced Instrumentation" below for how to load
Expand Down Expand Up @@ -1046,6 +1052,41 @@ when running in local mode.
- threadpool.maxPool_size
- threadpool.startedTasks

- namespace=ExecutorMetrics
- **notes:**
- These metrics are conditional to a configuration parameter:
`spark.metrics.executorMetricsSource.enabled` (default value is true)
- ExecutorMetrics are updated as part of heartbeat processes scheduled
for the executors and for the driver at regular intervals: `spark.executor.heartbeatInterval` (default value is 10 seconds)
- An optional faster polling mechanism is available for executor memory metrics,
it can be activated by setting a polling interval (in milliseconds) using the configuration parameter `spark.executor.metrics.pollingInterval`
- JVMHeapMemory
- JVMOffHeapMemory
- OnHeapExecutionMemory
- OnHeapStorageMemory
- OnHeapUnifiedMemory
- OffHeapExecutionMemory
- OffHeapStorageMemory
- OffHeapUnifiedMemory
- DirectPoolMemory
- MappedPoolMemory
- MinorGCCount
- MinorGCTime
- MajorGCCount
- MajorGCTime
- "ProcessTree*" metric counters:
- ProcessTreeJVMVMemory
- ProcessTreeJVMRSSMemory
- ProcessTreePythonVMemory
- ProcessTreePythonRSSMemory
- ProcessTreeOtherVMemory
- ProcessTreeOtherRSSMemory
- **note:** "ProcessTree*" metrics are collected only under certain conditions.
The conditions are the logical AND of the following: `/proc` filesystem exists,
`spark.eventLog.logStageExecutorProcessTreeMetrics.enabled=true`,
`spark.eventLog.logStageExecutorMetrics.enabled=true`.
"ProcessTree*" metrics report 0 when those conditions are not met.

- namespace=JVMCPU
- jvmCpuTime

Expand Down

0 comments on commit 729f43f

Please sign in to comment.