A more general and accurate implementation of MetricsAggregator for Spark #144
base: master
Conversation
…eue limit. Currently disabled. It will be enabled through pluggable heuristics later.
… Size' help pages
…job history server on hadoop2
Conflicts: app/com/linkedin/drelephant/ElephantRunner.java app/com/linkedin/drelephant/util/Utils.java app/controllers/Application.java test/com/linkedin/drelephant/util/UtilsTest.java
Aggregated Metrics Feature
…nables metrics by default. (linkedin#115) Following endpoints are exposed 1) /ping 2) /healthcheck 3) /metrics Following stats are available at present 1. Application status - /ping 2. Queue Size 3. Skipped Jobs Count 4. Retry Queue Size 5. Thread Deadlocks - /healthcheck 6. GC Count 7. GC Time 8. GC Count to Uptime ratio 9. Heap Memory Usage stats
Fixed it by using a static initializer, which I think is a good choice because:
1. We don't have to add a call to initializer code anywhere (either within or outside of InfoExtractor).
2. I was able to make the instance variable final.
3. Automatic thread safety is provided at the language level.
…linkedin#121) This commit brings Dr. Elephant's main producer/consumer run loop in line with standard Java best practices by submitting tasks to a work queue as Runnables, making things much more robust. Previously, if a worker thread's main while loop exited for any reason, that thread would live on but would stop processing data forever, since the Executor would sit waiting for additional work to be submitted. Essentially, the problem was that things were being done in a non-standard way and there were 2 queues. We noticed that every once in a while a worker thread's while loop would exit, leading to stalled progress over time once all the worker threads had done this. I tested this on CDH 5.3 with MapReduce 2.5.0 and Spark 1.5.2.
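As a hedged illustration of the pattern this commit describes (the class and method names below are made up for the sketch, not Dr. Elephant's actual code), each unit of work is submitted to the pool as its own Runnable, so a failure in one task cannot silently kill a long-lived worker loop:

```scala
import java.util.concurrent.{ExecutorService, Executors}

class AnalysisRunner(threads: Int) {
  private val pool: ExecutorService = Executors.newFixedThreadPool(threads)

  def submitJob(jobId: String): Unit = {
    // Each job is its own Runnable; an exception here only affects this task.
    pool.submit(new Runnable {
      override def run(): Unit = {
        println(s"analyzing $jobId")
      }
    })
  }

  def shutdown(): Unit = pool.shutdown()
}
```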
This Fetcher fetches MapReduce data from logs in HDFS directly and doesn't depend on the job history server, which is the bottleneck of the analyzing flow.
* Add more metrics for monitoring processed jobs count and latency
* Fix metrics null pointer and API usage issues
* Add a "jobs processed in last 24 hours" metric
Includes resource usage, resource wastage and runtime metrics for Spark.
…e is to publish the metrics to other applications. The property is disabled by default and users who wish to make use of this specify their own agent jar.
…or Spark which is compatible with Spark on YARN as well as executor dynamic allocation mode.
@ericsahit, thanks for the PR. I'll take a look at it and add my comments.
@akshayrai, OK. I have some login problems with Gitter, so anyone should feel free to give me suggestions here.
private static final Logger logger = LoggerFactory.getLogger(YarnSparkMetricsAggregator.class);

private AggregatorConfigurationData _aggregatorConfigurationData;
private double _storageMemWastageBuffer = 0.5;
_storageMemWastageBuffer should be configurable.
I think this variable may turn out to be unnecessary in Spark resource aggregation. As with MapReduce, we should figure out a way of identifying the actual physical memory usage of each executor, and then use (1 - physicalMemoryUsage / executorMemory) as the resource wasted. In our company we have added a feature that checks the actual physical memory usage, embedded in our Spark branch (different from the official Spark version). That is just a suggestion about how the resource wasted metric could be calculated; for now I will change _storageMemWastageBuffer as a temporary measure.
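A minimal sketch of the wasted-memory idea described above, assuming a per-executor peak physical memory measurement is available (which stock Spark does not expose; the comment mentions a custom Spark branch). The type and field names are illustrative, not part of this PR:

```scala
// Sketch only: peakPhysicalMemoryBytes is an assumed measurement from a custom branch.
case class ExecutorMemoryStats(executorMemoryBytes: Long, peakPhysicalMemoryBytes: Long)

// Fraction of the executor's requested memory that was never actually used,
// i.e. (1 - physicalMemoryUsage / executorMemory), clamped to [0, 1].
def wastedMemoryFraction(stats: ExecutorMemoryStats): Double = {
  val fraction = 1.0 - stats.peakPhysicalMemoryBytes.toDouble / stats.executorMemoryBytes
  math.min(1.0, math.max(0.0, fraction))
}
```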
resourceUsed += (executorDuration / Statistics.SECOND_IN_MS) * (perExecutorMem / FileUtils.ONE_MB);
// maxMem is the maximum available storage memory
// memUsed is how much storage memory is used.
// any difference is wasted after a buffer of 50% is wasted
rewrite as "any difference after a buffer of 50% is wasted"
val eid = info.executorId
taskEnd.reason match {
  case Resubmitted =>
    // Note: For resubmitted tasks, we continue to use the metrics that belong to the
Since you are returning when the task was resubmitted, you are using the last attempt's metrics, not the first attempt's metrics, right?
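For context, a simplified sketch of the early-return pattern being discussed: skipping the Resubmitted event means whatever was recorded when the earlier attempt ended stays as-is. The listener class and map below are illustrative, not the PR's exact code:

```scala
import org.apache.spark.Resubmitted
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

import scala.collection.mutable

class TaskCountListener extends SparkListener {
  private val executorToTasksComplete = mutable.HashMap[String, Int]()

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val eid = taskEnd.taskInfo.executorId
    taskEnd.reason match {
      case Resubmitted =>
        // Skip: keep the metrics already recorded for the earlier attempt.
        return
      case _ =>
        executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
    }
  }
}
```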
executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
}

executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 1) - 1
You are adding the task's duration to the executor's duration. I have a couple of questions here.
- If there are multiple tasks running in parallel inside an executor, wouldn't that count the same duration multiple times?
- In the reverse case, where the executor is idle, you are not counting that time. Shouldn't you be?
- Yes. In MapReduce, the basic resource allocation unit in the resource manager (e.g. YARN) is one map or reduce task. In Spark, the corresponding unit is the executor rather than the computing task, because tasks run as threads and share the memory and CPU resources of a single executor. The picture "Spark vs MapReduce" illustrates this difference.
- As described above, even when an executor is idle it still holds logical CPU and memory resources in the resource manager (YARN), so I think we should calculate this metric according to the lifecycle of the executor instead of the task.
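A rough sketch of the executor-lifecycle accounting described above, assuming we know when each executor was added and removed; the names and types are illustrative, not the PR's implementation:

```scala
// Charge resource usage for the whole executor lifetime rather than per task,
// since an idle executor still holds its container's memory and CPU in YARN.
case class ExecutorLifetime(addedTimeMs: Long, removedTimeMs: Long, containerMemoryMb: Long)

// Total MB-seconds held by all executors over their lifetimes.
def resourceUsedMbSeconds(executors: Seq[ExecutorLifetime]): Long =
  executors.map { e =>
    ((e.removedTimeMs - e.addedTimeMs) / 1000) * e.containerMemoryMb
  }.sum
```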
if (isYARNMode(envData.getSparkProperty(SPARK_MASTER, SPARK_MASTER_DEFAULT))) {
  val memoryOverheadFactor = envData.getSparkProperty(MEMORY_OVERHEAD_FACTOR, "0.20").toDouble
  val memoryOverhead: Int = envData.getSparkProperty(MEMORY_OVERHEAD,
    math.max((memoryOverheadFactor * executorMemory).toInt, 384).toString).toInt
What's the magic number 384? Also, can you create constants for each of these?
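On the 384: Spark's YARN allocation historically uses a minimum memory overhead of 384 MB when sizing executor containers. A sketch of the snippet above with named constants, as suggested; the constant names are placeholders and the values simply mirror the snippet (0.20 factor, 384 MB floor):

```scala
object MemoryOverheadDefaults {
  // Placeholder names; values mirror the reviewed snippet.
  val MemoryOverheadFactorDefault: Double = 0.20
  val MinimumMemoryOverheadMb: Int = 384 // Spark on YARN never reserves less overhead than this

  def defaultMemoryOverheadMb(executorMemoryMb: Int,
                              factor: Double = MemoryOverheadFactorDefault): Int =
    math.max((factor * executorMemoryMb).toInt, MinimumMemoryOverheadMb)
}
```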
info.totalTasks = info.activeTasks + info.failedTasks + info.completedTasks
info.duration = executorsListener.executorToDuration.getOrElse(info.execId, 0L)
info.inputBytes = executorsListener.executorToInputBytes.getOrElse(info.execId, 0L)
info.shuffleRead = executorsListener.executorToShuffleRead.getOrElse(info.execId, 0L)
Where are you using shuffleRead, inputBytes, etc.? Only startTime and finishTime are used, right? Do you plan to use the rest later?
Hi @ericsahit @shankar37, is this PR still relevant or has it been superseded by one of the other Spark PRs?
How do I configure the Azkaban scheduler.xml? @ericsahit
A more general and accurate implementation of MetricsAggregator for Spark which is compatible with Spark on YARN as well as executor dynamic allocation mode.
What's new:
- The aggregator takes memoryOverhead into account instead of only spark.executor.memory.
- I tested this feature on three real Spark applications on our YARN cluster and compared the Aggregate Memory Resource Allocation (Resource used in dr-elephant) metric as reported by YARN (shown on the AM page when the Spark application finishes), by the original dr-elephant, and by the optimized dr-elephant. Below is the result of one application as an example: taking the YARN value as the standard, the average deviation of this feature is 1.88%, while the average deviation of the original dr-elephant is 280.86%.
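As a hypothetical worked example of why counting memoryOverhead matters (the numbers are illustrative, not taken from this PR): with spark.executor.memory = 4096 MB and an overhead factor of 0.20, the memory actually reserved on YARN per executor is 4096 + max(0.20 × 4096, 384) ≈ 4096 + 819 = 4915 MB, so ignoring the overhead under-counts each executor's allocation by roughly 17%.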
This is our first PR, so there is still some TODO work on code style, code structure, and so on. Please help us review it and feel free to give us advice.
Future work we plan:
- Addressing the bottleneck resource in our company.
- Improving the Resource wasted metric.
Thanks!