diff --git a/app-conf/HeuristicConf.xml b/app-conf/HeuristicConf.xml index 833fa2bc3..3e264e853 100644 --- a/app-conf/HeuristicConf.xml +++ b/app-conf/HeuristicConf.xml @@ -193,5 +193,11 @@ com.linkedin.drelephant.spark.heuristics.StagesHeuristic views.html.help.spark.helpStagesHeuristic + + spark + Executor GC + com.linkedin.drelephant.spark.heuristics.ExecutorGcHeuristic + views.html.help.spark.helpExecutorGcHeuristic + diff --git a/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala b/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala index 1b013c0f3..9c4b534a0 100644 --- a/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala +++ b/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala @@ -87,6 +87,7 @@ trait ExecutorSummary{ def totalShuffleRead: Long def totalShuffleWrite: Long def maxMemory: Long + def totalGCTime: Long def executorLogs: Map[String, String]} trait JobData{ @@ -292,6 +293,7 @@ class ExecutorSummaryImpl( var totalShuffleRead: Long, var totalShuffleWrite: Long, var maxMemory: Long, + var totalGCTime: Long, var executorLogs: Map[String, String]) extends ExecutorSummary class JobDataImpl( diff --git a/app/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristic.scala b/app/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristic.scala new file mode 100644 index 000000000..23da7db28 --- /dev/null +++ b/app/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristic.scala @@ -0,0 +1,119 @@ +/* + * Copyright 2016 LinkedIn Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.linkedin.drelephant.spark.heuristics + +import com.linkedin.drelephant.analysis.Severity +import com.linkedin.drelephant.spark.fetchers.statusapiv1._ +import com.linkedin.drelephant.analysis._ +import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData +import com.linkedin.drelephant.spark.data.SparkApplicationData + +import scala.collection.JavaConverters + +/** + * A heuristic based on GC time and CPU run time. It calculates the ratio of the total time a job spends in GC to the total run time of a job and warns if too much time is spent in GC. 
+ */ +class ExecutorGcHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData) + extends Heuristic[SparkApplicationData] { + + import ExecutorGcHeuristic._ + import JavaConverters._ + + val gcSeverityAThresholds: SeverityThresholds = + SeverityThresholds.parse(heuristicConfigurationData.getParamMap.get(GC_SEVERITY_A_THRESHOLDS_KEY), ascending = true) + .getOrElse(DEFAULT_GC_SEVERITY_A_THRESHOLDS) + + val gcSeverityDThresholds: SeverityThresholds = + SeverityThresholds.parse(heuristicConfigurationData.getParamMap.get(GC_SEVERITY_D_THRESHOLDS_KEY), ascending = true) + .getOrElse(DEFAULT_GC_SEVERITY_D_THRESHOLDS) + + override def getHeuristicConfData(): HeuristicConfigurationData = heuristicConfigurationData + + override def apply(data: SparkApplicationData): HeuristicResult = { + val evaluator = new Evaluator(this, data) + var resultDetails = Seq( + new HeuristicResultDetails("GC time to Executor Run time ratio", evaluator.ratio.toString), + new HeuristicResultDetails("Total GC time", evaluator.jvmTime.toString), + new HeuristicResultDetails("Total Executor Runtime", evaluator.executorRunTimeTotal.toString) + ) + + //adding recommendations to the result, severityTimeA corresponds to the ascending severity calculation + if (evaluator.severityTimeA.getValue > Severity.LOW.getValue) { + resultDetails = resultDetails :+ new HeuristicResultDetails("Gc ratio high", "The job is spending too much time on GC. We recommend increasing the executor memory.") + } + //severityTimeD corresponds to the descending severity calculation + if (evaluator.severityTimeD.getValue > Severity.LOW.getValue) { + resultDetails = resultDetails :+ new HeuristicResultDetails("Gc ratio low", "The job is spending too less time in GC. 
Please check if you have asked for more executor memory than required.") + } + + val result = new HeuristicResult( + heuristicConfigurationData.getClassName, + heuristicConfigurationData.getHeuristicName, + evaluator.severityTimeA, + 0, + resultDetails.asJava + ) + result + } +} + +object ExecutorGcHeuristic { + val SPARK_EXECUTOR_MEMORY = "spark.executor.memory" + val SPARK_EXECUTOR_CORES = "spark.executor.cores" + + /** The ascending severity thresholds for the ratio of JVM GC Time and executor Run Time (checking whether ratio is above normal) + * These thresholds are experimental and are likely to change */ + val DEFAULT_GC_SEVERITY_A_THRESHOLDS = + SeverityThresholds(low = 0.08D, moderate = 0.1D, severe = 0.15D, critical = 0.2D, ascending = true) + + /** The descending severity thresholds for the ratio of JVM GC Time and executor Run Time (checking whether ratio is below normal) + * These thresholds are experimental and are likely to change */ + val DEFAULT_GC_SEVERITY_D_THRESHOLDS = + SeverityThresholds(low = 0.05D, moderate = 0.04D, severe = 0.03D, critical = 0.01D, ascending = false) + + val GC_SEVERITY_A_THRESHOLDS_KEY: String = "gc_severity_A_threshold" + val GC_SEVERITY_D_THRESHOLDS_KEY: String = "gc_severity_D_threshold" + + class Evaluator(executorGcHeuristic: ExecutorGcHeuristic, data: SparkApplicationData) { + lazy val executorAndDriverSummaries: Seq[ExecutorSummary] = data.executorSummaries + lazy val executorSummaries: Seq[ExecutorSummary] = executorAndDriverSummaries.filterNot(_.id.equals("driver")) + lazy val appConfigurationProperties: Map[String, String] = + data.appConfigurationProperties + var (jvmTime, executorRunTimeTotal) = getTimeValues(executorSummaries) + + var ratio: Double = jvmTime.toDouble / executorRunTimeTotal.toDouble + + lazy val severityTimeA: Severity = executorGcHeuristic.gcSeverityAThresholds.severityOf(ratio) + lazy val severityTimeD: Severity = executorGcHeuristic.gcSeverityDThresholds.severityOf(ratio) + + /** + * returns 
the total JVM GC Time and total executor Run Time across all stages + * @param executorSummaries + * @return + */ + private def getTimeValues(executorSummaries: Seq[ExecutorSummary]): (Long, Long) = { + var jvmGcTimeTotal: Long = 0 + var executorRunTimeTotal: Long = 0 + executorSummaries.foreach(executorSummary => { + jvmGcTimeTotal+=executorSummary.totalGCTime + executorRunTimeTotal+=executorSummary.totalDuration + }) + (jvmGcTimeTotal, executorRunTimeTotal) + } + } +} + diff --git a/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala b/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala index 0c7412fe0..4abce291d 100644 --- a/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala +++ b/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala @@ -173,6 +173,7 @@ object LegacyDataConverters { executorInfo.shuffleRead, executorInfo.shuffleWrite, executorInfo.maxMem, + executorInfo.totalGCTime, executorLogs = Map.empty ) } diff --git a/app/com/linkedin/drelephant/spark/legacydata/SparkExecutorData.java b/app/com/linkedin/drelephant/spark/legacydata/SparkExecutorData.java index 7b0fcb5c2..4e2ad4de3 100644 --- a/app/com/linkedin/drelephant/spark/legacydata/SparkExecutorData.java +++ b/app/com/linkedin/drelephant/spark/legacydata/SparkExecutorData.java @@ -43,6 +43,7 @@ public static class ExecutorInfo { public long inputBytes = 0L; public long outputBytes = 0L; public long shuffleRead = 0L; + public long totalGCTime = 0L; public long shuffleWrite = 0L; public String toString() { @@ -50,7 +51,7 @@ public String toString() { + ", maxMem: " + maxMem + ", diskUsed: " + diskUsed + ", totalTasks" + totalTasks + ", tasksActive: " + activeTasks + ", tasksComplete: " + completedTasks + ", tasksFailed: " + failedTasks + ", duration: " + duration + ", inputBytes: " + inputBytes + ", outputBytes:" + outputBytes + ", shuffleRead: " + shuffleRead - + ", shuffleWrite: " + shuffleWrite + "}"; + + ", shuffleWrite: 
" + shuffleWrite + ", totalGCTime: " + totalGCTime + "}"; } } diff --git a/app/views/help/spark/helpExecutorGcHeuristic.scala.html b/app/views/help/spark/helpExecutorGcHeuristic.scala.html new file mode 100644 index 000000000..02ca91ac7 --- /dev/null +++ b/app/views/help/spark/helpExecutorGcHeuristic.scala.html @@ -0,0 +1,20 @@ +@* +* Copyright 2016 LinkedIn Corp. +* +* Licensed under the Apache License, Version 2.0 (the "License"); you may not +* use this file except in compliance with the License. You may obtain a copy of +* the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +* License for the specific language governing permissions and limitations under +* the License. +*@ + +

This analysis shows how much time a job is spending in GC. To normalise the results across all jobs, the ratio of the time a job spends in GC to the total run time of the job is calculated.

+

A job is flagged if the ratio is too high, meaning the job spends too much time in GC.

+

Suggestions

+

We recommend increasing the executor memory.

\ No newline at end of file diff --git a/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala b/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala index 3947fdf3f..77e3e1d29 100644 --- a/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala +++ b/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala @@ -194,6 +194,7 @@ object SparkMetricsAggregatorTest { totalShuffleRead = 0, totalShuffleWrite = 0, maxMemory = 0, + totalGCTime = 0, executorLogs = Map.empty ) } diff --git a/test/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristicTest.scala new file mode 100644 index 000000000..869b9cb67 --- /dev/null +++ b/test/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristicTest.scala @@ -0,0 +1,138 @@ +/* + * Copyright 2016 LinkedIn Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package com.linkedin.drelephant.spark.heuristics + +import scala.collection.JavaConverters +import com.linkedin.drelephant.analysis.{ApplicationType, Severity, SeverityThresholds} +import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData +import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData} +import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, ExecutorSummaryImpl, StageDataImpl} +import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate +import org.scalatest.{FunSpec, Matchers} + +import scala.concurrent.duration.Duration + + +class ExecutorGcHeuristicTest extends FunSpec with Matchers { + import ExecutorGcHeuristicTest._ + + describe("ExecutorGcHeuristic") { + val heuristicConfigurationData = newFakeHeuristicConfigurationData( + Map( + "max_to_median_ratio_severity_thresholds" -> "1.414,2,4,16", + "ignore_max_bytes_less_than_threshold" -> "4000000", + "ignore_max_millis_less_than_threshold" -> "4000001" + ) + ) + val executorGcHeuristic = new ExecutorGcHeuristic(heuristicConfigurationData) + + val executorSummaries = Seq( + newFakeExecutorSummary( + id = "1", + totalGCTime = Duration("2min").toMillis, + totalDuration = Duration("15min").toMillis + ), + newFakeExecutorSummary( + id = "2", + totalGCTime = Duration("6min").toMillis, + totalDuration = Duration("14min").toMillis + ), + newFakeExecutorSummary( + id = "3", + totalGCTime = Duration("4min").toMillis, + totalDuration = Duration("20min").toMillis + ), + newFakeExecutorSummary( + id = "4", + totalGCTime = Duration("8min").toMillis, + totalDuration = Duration("30min").toMillis + ) + ) + + describe(".apply") { + val data1 = newFakeSparkApplicationData(executorSummaries) + val heuristicResult = executorGcHeuristic.apply(data1) + val heuristicResultDetails = heuristicResult.getHeuristicResultDetails + + it("returns the severity") { + heuristicResult.getSeverity should be(Severity.CRITICAL) + 
} + + it("returns the JVM GC time to Executor Run time duration") { + val details = heuristicResultDetails.get(0) + details.getName should include("GC time to Executor Run time ratio") + details.getValue should include("0.2531") + } + + it("returns the total GC time") { + val details = heuristicResultDetails.get(1) + details.getName should include("Total GC time") + details.getValue should be("1200000") + } + + it("returns the executor's run time") { + val details = heuristicResultDetails.get(2) + details.getName should include("Total Executor Runtime") + details.getValue should be("4740000") + } + } + } +} + +object ExecutorGcHeuristicTest { + import JavaConverters._ + + def newFakeHeuristicConfigurationData(params: Map[String, String] = Map.empty): HeuristicConfigurationData = + new HeuristicConfigurationData("heuristic", "class", "view", new ApplicationType("type"), params.asJava) + + def newFakeExecutorSummary( + id: String, + totalGCTime: Long, + totalDuration: Long + ): ExecutorSummaryImpl = new ExecutorSummaryImpl( + id, + hostPort = "", + rddBlocks = 0, + memoryUsed=0, + diskUsed = 0, + activeTasks = 0, + failedTasks = 0, + completedTasks = 0, + totalTasks = 0, + totalDuration, + totalInputBytes=0, + totalShuffleRead=0, + totalShuffleWrite= 0, + maxMemory= 0, + totalGCTime, + executorLogs = Map.empty + ) + + def newFakeSparkApplicationData( + executorSummaries: Seq[ExecutorSummaryImpl] + ): SparkApplicationData = { + val appId = "application_1" + + val restDerivedData = SparkRestDerivedData( + new ApplicationInfoImpl(appId, name = "app", Seq.empty), + jobDatas = Seq.empty, + stageDatas = Seq.empty, + executorSummaries = executorSummaries + ) + SparkApplicationData(appId, restDerivedData, None) + } +} diff --git a/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala index dfdcf4a15..7dbeea921 100644 --- 
a/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala +++ b/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala @@ -249,6 +249,7 @@ object ExecutorsHeuristicTest { totalShuffleRead, totalShuffleWrite, maxMemory, + totalGCTime = 0, executorLogs = Map.empty )