-
Notifications
You must be signed in to change notification settings - Fork 855
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Initial changes for adding Spark stage analysis #438
base: customSHSWork
Are you sure you want to change the base?
Conversation
(stageData.stageId, stageData.tasks.map(tasks => tasks.values)) | ||
}.toMap | ||
|
||
val failedTasksStageMap = data.stagesWithFailedTasks.map { stageData => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can use flatMap here instead of map and then flatten.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, changed to flatMap.
stageData.shuffleWriteBytes, stageData.outputBytes).max | ||
val rawSpillSeverity = executionMemorySpillThresholds.severityOf( | ||
stageData.memoryBytesSpilled / maxData.toDouble) | ||
val tmp = DEFAULT_MAX_DATA_PROCESSED_THRESHOLD |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not used anywhere
taskSkewThresholds.severityOf(max / median) | ||
case _ => Severity.NONE | ||
} | ||
val median = medianTime.getOrElse(0.0D) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not used below
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed.
// add more information about what might be causing skew if skew is being flagged | ||
// (reported severity is significant), or there is execution memory spill, since skew | ||
// can also cause execution memory spill. | ||
val median = Utils.getDuration(medianTime.map(_.toLong).getOrElse(0L)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can use above declared variables?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The naming was confusing -- these are the string representation of median and max -- I've renamed to medianStr and maxStr.
*/ | ||
def getStageAnalysis(curNumPartitions: Int): Seq[StageAnalysis] = { | ||
data.stagesWithFailedTasks.map { stageData => | ||
(stageData.stageId, stageData.tasks.map(tasks => tasks.values)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this block has no usage below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed on Slack, leaving this in -- it will be called, and the results saved in SparkApplicationData, for the different heuristics (long task, task skew, execution memory spill, and configuration parameter recommendations). The code for configuration parameter recommendations isn't quite ready yet, so I will merge that separately.
* (value of spark.sql.shuffle.partitions). | ||
* @return list of analysis results of stages. | ||
*/ | ||
def getStageAnalysis(curNumPartitions: Int): Seq[StageAnalysis] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we calculate the cuNumPartition in the method itself?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it can parse out the value for "spark.sql.shuffle.partitions" -- I'll change.
val MAX_RECOMMENDED_PARTITIONS_KEY = "max_recommended_partitions" | ||
|
||
|
||
// Severity hresholds for task duration in minutes, when checking to see if the median task |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit : hresholds => thresholds
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
// The ascending severity thresholds for the ratio of JVM GC time and task run time, | ||
// checking if too much time is being spent in GC. | ||
val DEFAULT_GC_SEVERITY_A_THRESHOLDS = | ||
SeverityThresholds(low = 0.08D, moderate = 0.09D, severe = 0.1D, critical = 0.15D, ascending = true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be more lenient for GC threshold , since as per the feedback , increasing memory for it cause other heuristics to get failed. Moreover UDF can also cause GC .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For a while, the GC stats were being double counted (LIHADOOP-40386), and GC was also being flagged for very short runtimes (LIHADOOP-38532) -- let's wait on changing the threshold values, and check the results for more recent runs (if there are still a lot of conflicting GC warnings). We do still want to flag (and give recommendations if the issue due to UDFs (users can try to fix/optimize the UDFs if they are aware of the problem), although it's not something that can be fixed by changing configuration parameters. We can also recommend changing to ParallelGC or G1GC.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let see behaviour after LIHADOOP-40386, LIHADOOP-38532 and see for conflicting GC warnings .
// The default threshold (3TB) for checking for maximum amount of data processed, for which to | ||
// alert for execution memory spill. Tasks processing more data would be expected to have some | ||
// amount of spill, due to the large amount of data processed. | ||
val DEFAULT_MAX_DATA_PROCESSED_THRESHOLD = "3TB" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the rational behind 3TB as threshold ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there is a lot of data being processed, it may not be possible (or at least desirable) to avoid execution memory spill. We don't want to recommend increasing executor memory too high (resource consumption), or too many partitions (shuffle overhead), or reduce cores too low (decreased parallelism). For 3TB number, using some reasonable numbers for configuration parameters, I'm estimating based on:
spark.executor.memory / spark.executor.cores * spark.memory.fraction * spark.memory.storageFraction * spark.sql.shuffle.partitions
5GB / 2 * 0.6 * 0.5 * 4000
Looking at this again though, it would make more sense to calculate the estimate based on some of the other constants and thresholds as well, so that this number would be adjusted automatically with the other values. @pralabhkumar , let me know if this sounds good. Another option is to add more comments about calculating a value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think for now just adding more comments about calculating a value should be good and monitories the behaviour.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added some comments.
details += s"Stage $stageId has $inputBytes input, $shuffleReadBytes shuffle read, " + | ||
"$shuffleWriteBytes shuffle write. Please try to reduce the amount of data being processed." | ||
} else { | ||
details += s"Stage $stageId: please optimize the code to improve performance." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where are we planning to use this details . How are we planning to show this to user ? . Is it going to be used in Heuristics ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the current plan is that the details would be shown in Heuristics. This could be pretty messy if there are a lot of lines, however. Is there a better way to present the information?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes showing details on heuristics , would make UI pretty messy . I think we can add , detail button on each heuristics , clicking that can show this detail (again I am not sure other ways to represent details)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be a bit more hidden, but a button that would show the details would be cleaner.
else { | ||
if (stageData.inputBytes > 0) { | ||
// The stage is reading input data, try to increase the number of readers | ||
details += s"Stage $stageId: please set DaliSpark.SPLIT_SIZE to a smaller " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is Dali specific suggestion . What if user is not using Dali or for open source , should we give more general suggestion regarding split size ? .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it is Dali and LinkedIn specific, so not appropriate for open source. I can modify the suggestion to make it more general, but it would be good to have the more specific suggestion for our users. What would be the best way to customize? Perhaps there could be a heuristic configuration, with a map of issue type to recommendation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think , we can enhance the details and be specific to LinkedIn user and other outside users . Something like "please set DaliSpark.SPLIT_SIZE to a smaller : Linkedin User ,
please set mapreduce.input.fileinputformat.split.maxsize : Outside user" .
I think , it make sense to have map of issue type to recommendation
* from analyzing the stage for errors causing tasks to fail will be appended. | ||
* @return | ||
*/ | ||
private def checkForSpecificTaskError( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whats the use of this check ? . If the task failed because of some reason other than OOM (which is handled for checkForTaskFailure) and the job passes (because of task retry) , why user should bother about the reason of task failed .?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's also used to identify container killed by YARN for exeeding memory limits.
@@ -211,7 +214,7 @@ class SparkRestClient(sparkConf: SparkConf) { | |||
} | |||
|
|||
private def getStageDatas(attemptTarget: WebTarget): Seq[StageDataImpl] = { | |||
val target = attemptTarget.path("stages") | |||
val target = attemptTarget.path("stages/withSummaries") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couldn't find this api in spark documentation. Is it LinkedIn specific enhancements?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is LinkedIn specific enhancements. Should there be a flag/configuration parameter for adding "withSummaries"? It's needed for some of the heuristics (task skew and long tasks).
val SPARK_SQL_SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions" | ||
val SPARK_MEMORY_FRACTION = "spark.memory.fraction" | ||
|
||
// Spark default configuration values |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these default parameters should picked from spark default configuration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If not explicitly specified by the user or spark-defaults.conf, then it may not be listed as part of the Spark environment REST API return value. spark.sql.shuffle.partitions isn't specified in spark-default.conf, and many users don't specify the value explicitly. Just double-checked, and it set to default Spark values.
details += s"Stage $stageId has $inputBytes input, $shuffleReadBytes shuffle read, " + | ||
"$shuffleWriteBytes shuffle write. Please try to reduce the amount of data being processed." | ||
} else { | ||
details += s"Stage $stageId: please optimize the code to improve performance." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious, how would user come to know about which code to optimize. From Spark history UI it is very difficult to find out stage to code mapping.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is really difficult unfortunately. This is mostly alerting them that there is some issue. They can view the DAG for the stage, which may give some idea about the operations, and help narrow down the code. This (mapping stage to code) is something that would be good for us to add better support for. Adding more information about datasets read/processed could help. Adding some support for getting the DAG from the REST API would make it possible to list the operations, and possibly add more information to Spark.
Some of the thresholds are meant to be set with heuristic configuration parameters. The stage analysis can be used for multiple heuristics (long task, task skew, execution memory spill, configuration parameter recommendations). Does it make sense to set these thresholds for each heuristic (and call StageAnalysis each time), or would it be better to consolidate? With the independent configuration parameters, users can decide which ones to use/include. However, keeping the values in sync across multiple heuristics seems awkward. Perhaps this could be multi-level, with a general Spark (or Pig) configuration parameter list, which would kick in if there isn't a heuristic-level setting. This could still be confusing if misconfigured though. |
fc7de94
to
ffe4d17
Compare
- Add StageAnalyzer for analyzing the stages for a Spark application for execution memory spill, long tasks, task skew, and failures. - Call REST API for getting failed tasks. - Modify call to stages REST API to get task and executor summaries.
b81ad27
to
135bb05
Compare
Dr-elephant Compare does not display information How to configure @edwinalu @chriseppstein |
long tasks, task skew, and failures.