forked from linkedin/dr-elephant
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Spark Executor GC Heuristic (linkedin#311)
- Loading branch information
Showing
9 changed files
with
290 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
119 changes: 119 additions & 0 deletions
119
app/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristic.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
/* | ||
* Copyright 2016 LinkedIn Corp. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package com.linkedin.drelephant.spark.heuristics | ||
|
||
import com.linkedin.drelephant.analysis.Severity | ||
import com.linkedin.drelephant.spark.fetchers.statusapiv1._ | ||
import com.linkedin.drelephant.analysis._ | ||
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData | ||
import com.linkedin.drelephant.spark.data.SparkApplicationData | ||
|
||
import scala.collection.JavaConverters | ||
|
||
/** | ||
* A heuristic based on GC time and CPU run time. It calculates the ratio of the total time a job spends in GC to the total run time of a job and warns if too much time is spent in GC. | ||
*/ | ||
class ExecutorGcHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData) | ||
extends Heuristic[SparkApplicationData] { | ||
|
||
import ExecutorGcHeuristic._ | ||
import JavaConverters._ | ||
|
||
val gcSeverityAThresholds: SeverityThresholds = | ||
SeverityThresholds.parse(heuristicConfigurationData.getParamMap.get(GC_SEVERITY_A_THRESHOLDS_KEY), ascending = true) | ||
.getOrElse(DEFAULT_GC_SEVERITY_A_THRESHOLDS) | ||
|
||
val gcSeverityDThresholds: SeverityThresholds = | ||
SeverityThresholds.parse(heuristicConfigurationData.getParamMap.get(GC_SEVERITY_D_THRESHOLDS_KEY), ascending = true) | ||
.getOrElse(DEFAULT_GC_SEVERITY_D_THRESHOLDS) | ||
|
||
override def getHeuristicConfData(): HeuristicConfigurationData = heuristicConfigurationData | ||
|
||
override def apply(data: SparkApplicationData): HeuristicResult = { | ||
val evaluator = new Evaluator(this, data) | ||
var resultDetails = Seq( | ||
new HeuristicResultDetails("GC time to Executor Run time ratio", evaluator.ratio.toString), | ||
new HeuristicResultDetails("Total GC time", evaluator.jvmTime.toString), | ||
new HeuristicResultDetails("Total Executor Runtime", evaluator.executorRunTimeTotal.toString) | ||
) | ||
|
||
//adding recommendations to the result, severityTimeA corresponds to the ascending severity calculation | ||
if (evaluator.severityTimeA.getValue > Severity.LOW.getValue) { | ||
resultDetails = resultDetails :+ new HeuristicResultDetails("Gc ratio high", "The job is spending too much time on GC. We recommend increasing the executor memory.") | ||
} | ||
//severityTimeD corresponds to the descending severity calculation | ||
if (evaluator.severityTimeD.getValue > Severity.LOW.getValue) { | ||
resultDetails = resultDetails :+ new HeuristicResultDetails("Gc ratio low", "The job is spending too less time in GC. Please check if you have asked for more executor memory than required.") | ||
} | ||
|
||
val result = new HeuristicResult( | ||
heuristicConfigurationData.getClassName, | ||
heuristicConfigurationData.getHeuristicName, | ||
evaluator.severityTimeA, | ||
0, | ||
resultDetails.asJava | ||
) | ||
result | ||
} | ||
} | ||
|
||
object ExecutorGcHeuristic { | ||
val SPARK_EXECUTOR_MEMORY = "spark.executor.memory" | ||
val SPARK_EXECUTOR_CORES = "spark.executor.cores" | ||
|
||
/** The ascending severity thresholds for the ratio of JVM GC Time and executor Run Time (checking whether ratio is above normal) | ||
* These thresholds are experimental and are likely to change */ | ||
val DEFAULT_GC_SEVERITY_A_THRESHOLDS = | ||
SeverityThresholds(low = 0.08D, moderate = 0.1D, severe = 0.15D, critical = 0.2D, ascending = true) | ||
|
||
/** The descending severity thresholds for the ratio of JVM GC Time and executor Run Time (checking whether ratio is below normal) | ||
* These thresholds are experimental and are likely to change */ | ||
val DEFAULT_GC_SEVERITY_D_THRESHOLDS = | ||
SeverityThresholds(low = 0.05D, moderate = 0.04D, severe = 0.03D, critical = 0.01D, ascending = false) | ||
|
||
val GC_SEVERITY_A_THRESHOLDS_KEY: String = "gc_severity_A_threshold" | ||
val GC_SEVERITY_D_THRESHOLDS_KEY: String = "gc_severity_D_threshold" | ||
|
||
class Evaluator(executorGcHeuristic: ExecutorGcHeuristic, data: SparkApplicationData) { | ||
lazy val executorAndDriverSummaries: Seq[ExecutorSummary] = data.executorSummaries | ||
lazy val executorSummaries: Seq[ExecutorSummary] = executorAndDriverSummaries.filterNot(_.id.equals("driver")) | ||
lazy val appConfigurationProperties: Map[String, String] = | ||
data.appConfigurationProperties | ||
var (jvmTime, executorRunTimeTotal) = getTimeValues(executorSummaries) | ||
|
||
var ratio: Double = jvmTime.toDouble / executorRunTimeTotal.toDouble | ||
|
||
lazy val severityTimeA: Severity = executorGcHeuristic.gcSeverityAThresholds.severityOf(ratio) | ||
lazy val severityTimeD: Severity = executorGcHeuristic.gcSeverityDThresholds.severityOf(ratio) | ||
|
||
/** | ||
* returns the total JVM GC Time and total executor Run Time across all stages | ||
* @param executorSummaries | ||
* @return | ||
*/ | ||
private def getTimeValues(executorSummaries: Seq[ExecutorSummary]): (Long, Long) = { | ||
var jvmGcTimeTotal: Long = 0 | ||
var executorRunTimeTotal: Long = 0 | ||
executorSummaries.foreach(executorSummary => { | ||
jvmGcTimeTotal+=executorSummary.totalGCTime | ||
executorRunTimeTotal+=executorSummary.totalDuration | ||
}) | ||
(jvmGcTimeTotal, executorRunTimeTotal) | ||
} | ||
} | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
@* | ||
* Copyright 2016 LinkedIn Corp. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*@ | ||
|
||
<p>This analysis shows how much time a job is spending in GC. To normalise the results across all jobs, the ratio of the time a job spends in Gc to the total run time of the job is calculated. </p> | ||
<p>A job is flagged if the ratio is too high, meaning the job spends too much time in GC.</p> | ||
<h3>Suggestions</h3> | ||
<p>We recommend increasing the executor memory.</p> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
138 changes: 138 additions & 0 deletions
138
test/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristicTest.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
/* | ||
* Copyright 2016 LinkedIn Corp. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package com.linkedin.drelephant.spark.heuristics | ||
|
||
import scala.collection.JavaConverters | ||
import com.linkedin.drelephant.analysis.{ApplicationType, Severity, SeverityThresholds} | ||
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData | ||
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData} | ||
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, ExecutorSummaryImpl, StageDataImpl} | ||
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate | ||
import org.scalatest.{FunSpec, Matchers} | ||
|
||
import scala.concurrent.duration.Duration | ||
|
||
|
||
class ExecutorGcHeuristicTest extends FunSpec with Matchers { | ||
import ExecutorGcHeuristicTest._ | ||
|
||
describe("ExecutorGcHeuristic") { | ||
val heuristicConfigurationData = newFakeHeuristicConfigurationData( | ||
Map( | ||
"max_to_median_ratio_severity_thresholds" -> "1.414,2,4,16", | ||
"ignore_max_bytes_less_than_threshold" -> "4000000", | ||
"ignore_max_millis_less_than_threshold" -> "4000001" | ||
) | ||
) | ||
val executorGcHeuristic = new ExecutorGcHeuristic(heuristicConfigurationData) | ||
|
||
val executorSummaries = Seq( | ||
newFakeExecutorSummary( | ||
id = "1", | ||
totalGCTime = Duration("2min").toMillis, | ||
totalDuration = Duration("15min").toMillis | ||
), | ||
newFakeExecutorSummary( | ||
id = "2", | ||
totalGCTime = Duration("6min").toMillis, | ||
totalDuration = Duration("14min").toMillis | ||
), | ||
newFakeExecutorSummary( | ||
id = "3", | ||
totalGCTime = Duration("4min").toMillis, | ||
totalDuration = Duration("20min").toMillis | ||
), | ||
newFakeExecutorSummary( | ||
id = "4", | ||
totalGCTime = Duration("8min").toMillis, | ||
totalDuration = Duration("30min").toMillis | ||
) | ||
) | ||
|
||
describe(".apply") { | ||
val data1 = newFakeSparkApplicationData(executorSummaries) | ||
val heuristicResult = executorGcHeuristic.apply(data1) | ||
val heuristicResultDetails = heuristicResult.getHeuristicResultDetails | ||
|
||
it("returns the severity") { | ||
heuristicResult.getSeverity should be(Severity.CRITICAL) | ||
} | ||
|
||
it("returns the JVM GC time to Executor Run time duration") { | ||
val details = heuristicResultDetails.get(0) | ||
details.getName should include("GC time to Executor Run time ratio") | ||
details.getValue should include("0.2531") | ||
} | ||
|
||
it("returns the total GC time") { | ||
val details = heuristicResultDetails.get(1) | ||
details.getName should include("Total GC time") | ||
details.getValue should be("1200000") | ||
} | ||
|
||
it("returns the executor's run time") { | ||
val details = heuristicResultDetails.get(2) | ||
details.getName should include("Total Executor Runtime") | ||
details.getValue should be("4740000") | ||
} | ||
} | ||
} | ||
} | ||
|
||
object ExecutorGcHeuristicTest { | ||
import JavaConverters._ | ||
|
||
def newFakeHeuristicConfigurationData(params: Map[String, String] = Map.empty): HeuristicConfigurationData = | ||
new HeuristicConfigurationData("heuristic", "class", "view", new ApplicationType("type"), params.asJava) | ||
|
||
def newFakeExecutorSummary( | ||
id: String, | ||
totalGCTime: Long, | ||
totalDuration: Long | ||
): ExecutorSummaryImpl = new ExecutorSummaryImpl( | ||
id, | ||
hostPort = "", | ||
rddBlocks = 0, | ||
memoryUsed=0, | ||
diskUsed = 0, | ||
activeTasks = 0, | ||
failedTasks = 0, | ||
completedTasks = 0, | ||
totalTasks = 0, | ||
totalDuration, | ||
totalInputBytes=0, | ||
totalShuffleRead=0, | ||
totalShuffleWrite= 0, | ||
maxMemory= 0, | ||
totalGCTime, | ||
executorLogs = Map.empty | ||
) | ||
|
||
def newFakeSparkApplicationData( | ||
executorSummaries: Seq[ExecutorSummaryImpl] | ||
): SparkApplicationData = { | ||
val appId = "application_1" | ||
|
||
val restDerivedData = SparkRestDerivedData( | ||
new ApplicationInfoImpl(appId, name = "app", Seq.empty), | ||
jobDatas = Seq.empty, | ||
stageDatas = Seq.empty, | ||
executorSummaries = executorSummaries | ||
) | ||
SparkApplicationData(appId, restDerivedData, None) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters