
A more general and accurate implementation of MetricsAggregator for Spark #144

Open
wants to merge 231 commits into master

Conversation

ericsahit

A more general and accurate implementation of MetricsAggregator for Spark, compatible with Spark on YARN as well as executor dynamic allocation mode.

What's new:

  1. Memory: Use the actual executor memory request, including memoryOverhead, instead of only spark.executor.memory.
  2. Time: Use the running time of every executor that ever existed, not the duration (sum of task execution time) of only those executors that are still alive when the application ends. (A sketch of this aggregation follows the list.)
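To make the two points above concrete, here is a minimal illustrative sketch of the aggregation (the class and field names are made up; this is not the PR's actual code):

/** Hypothetical sketch of the "resource used" aggregation described above. */
final class ResourceUsedSketch {
  static final class ExecutorInfo {
    final long memoryMb;    // spark.executor.memory
    final long overheadMb;  // memoryOverhead requested on top of the heap
    final long runTimeMs;   // executor wall-clock lifetime (finishTime - startTime)
    ExecutorInfo(long memoryMb, long overheadMb, long runTimeMs) {
      this.memoryMb = memoryMb;
      this.overheadMb = overheadMb;
      this.runTimeMs = runTimeMs;
    }
  }

  /** Sum over every executor that ever existed of (full container size) x (lifetime). */
  static long resourceUsedMbSeconds(Iterable<ExecutorInfo> executors) {
    long total = 0L;
    for (ExecutorInfo e : executors) {
      total += (e.memoryMb + e.overheadMb) * (e.runTimeMs / 1000L);
    }
    return total;
  }
}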

I tested this feature on three real Spark applications on our YARN cluster and compared the Aggregate Memory Resource Allocation metric (Resource used in dr-elephant) as reported by YARN (shown on the AM page when a Spark application finishes), by the original dr-elephant (dr-original), and by this patch (dr-optimized). Below is the result for one application as an example:

  • YARN: 410620516 MB-seconds
  • dr-original: 1394513510.4 MB-seconds
  • dr-optimized: 405405696 MB-seconds

Taking the value reported by YARN as the reference:

  • deviation of dr-original: |410620516 - 1394513510.4| / 410620516 = 239.61%
  • deviation of dr-optimized: |410620516 - 405405696| / 410620516 = 1.27%

Across the three test applications, the average deviation of this feature is 1.88%, while the average deviation of dr-original is 280.86%.

This is our first PR, so there is still some TODO work around code style, code structure, etc. Please help us review it, and feel free to give us advice.

Future work we plan:

  • Add CPU-time resource usage, since CPU can also be scheduled and is often the bottleneck resource in our company.
  • Capture the real physical memory usage and improve the Resource wasted metric.

Thanks!

Akshay Rai and others added 30 commits September 9, 2014 01:57
…eue limit.

Currently disabled. It will be enabled through pluggable heuristics later.
YannByron and others added 24 commits July 7, 2016 14:46
Conflicts:
	app/com/linkedin/drelephant/ElephantRunner.java
	app/com/linkedin/drelephant/util/Utils.java
	app/controllers/Application.java
	test/com/linkedin/drelephant/util/UtilsTest.java
…nables metrics by default. (linkedin#115)

Following endpoints are exposed
	1) /ping
	2) /healthcheck
	3) /metrics

Following stats are available at present
  1. Application status - /ping
  2. Queue Size
  3. Skipped Jobs Count
  4. Retry Queue Size
  5. Thread Deadlocks - /healthcheck
  6. GC Count
  7. GC Time
  8. GC Count to Uptime ratio
  9. Heap Memory Usage stats
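As a rough illustration of how these endpoints might be consumed (the host and port below are placeholders, not values from this commit):

import java.net.HttpURLConnection;
import java.net.URL;

// Illustrative only: poll the /ping endpoint listed above.
public class PingCheck {
  public static void main(String[] args) throws Exception {
    URL url = new URL("http://dr-elephant-host:8080/ping");  // placeholder host and port
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("GET");
    System.out.println("/ping responded with HTTP " + conn.getResponseCode());
    conn.disconnect();
  }
}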
Fixed it by using a static initializer which I think is a good choice because

1. We don't have to add a call to initializer code anywhere (either within or outside of InfoExtractor)
2. I was able to make the instance variable final
3. Automatic thread safety is provided at the language level
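For context, a minimal sketch of the static-initializer pattern that commit describes (the class and field names are illustrative, not InfoExtractor's actual code):

// Illustrative sketch: the JVM runs the static block exactly once, before first use of the
// class, so the field can be final and no explicit init call or locking is needed.
public final class SchedulerRegistry {
  private static final SchedulerRegistry INSTANCE;  // final is possible thanks to the static block

  static {
    INSTANCE = new SchedulerRegistry();  // runs once, thread-safely, at class initialization
  }

  private SchedulerRegistry() { }

  public static SchedulerRegistry getInstance() {
    return INSTANCE;
  }
}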
…linkedin#121)

This commit brings Dr. Elephant's main producer/consumer run loop in line with standard Java best practices by submitting tasks to a work queue as Runnables, making things much more robust.

Previously, if a worker thread's main while loop exited for any reason, that thread would live on but would stop processing data forever, since the Executor would sit waiting for additional work to be submitted. Essentially, the problem was that things were not being done in the standard way and there were two queues. We noticed that every once in a while a worker thread's while loop would exit, and over time, after they had all done this, progress stalled.

I tested this on CDH 5.3 with MapReduce 2.5.0 and Spark 1.5.2.
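A minimal sketch of the pattern that commit describes, assuming a plain ExecutorService (the class, method, and work-item names are illustrative, not Dr. Elephant's actual run loop):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch: each unit of work is submitted as a Runnable, so a failure
// affects only that task and the pool keeps draining the queue.
public class AnalysisDispatcher {
  private final ExecutorService workers = Executors.newFixedThreadPool(4);

  public void submitJob(final String appId) {  // appId stands in for the real work item
    workers.execute(new Runnable() {
      @Override
      public void run() {
        try {
          analyze(appId);  // placeholder for the real analysis work
        } catch (Exception e) {
          e.printStackTrace();  // only this task fails; the worker thread lives on
        }
      }
    });
  }

  private void analyze(String appId) { /* ... */ }
}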
This Fetcher fetches MapReduce data directly from logs in HDFS and doesn't depend on the job history server, which is the bottleneck of the analysis flow.
* adding more metrics for monitoring process jobs count and latency

* Fix metrics null ptr API usage issues

* Add jobs processed in last 24 hours metric
Includes resource usage, resource wastage and runtime metrics for Spark.
…e is to publish the metrics to other applications. The property is disabled by default, and users who wish to make use of this specify their own agent jar.
…or Spark which is compatible with Spark on YARN as well as executor dynamic allocation mode.
…or Spark which is compatible with Spark on YARN as well as executor dynamic allocation mode.
@akshayrai
Contributor

@ericsahit, thanks for the PR. I'll take a look at it and add my comments.

@ericsahit
Author

@akshayrai, OK. I have some login problems with Gitter, so please feel free to give me any suggestions here.

private static final Logger logger = LoggerFactory.getLogger(YarnSparkMetricsAggregator.class);

private AggregatorConfigurationData _aggregatorConfigurationData;
private double _storageMemWastageBuffer = 0.5;
Contributor

_storageMemWastageBuffer should be configurable.

Author

I think this variable will eventually be unnecessary in the resource aggregation for Spark, because, as with MapReduce, we should figure out a way to identify the actual physical memory usage of each executor and then use (1 - physicalMemoryUsage / executorMemory) as the resource wasted. In our company we added a feature for checking the actual physical memory usage, but it is embedded in our own Spark branch (different from the official Spark version).
That is just a suggestion about how the resource wasted metric could be calculated; for now I will make _storageMemWastageBuffer configurable.
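As an illustrative sketch of that idea, assuming peak physical memory usage per executor were available (the method and parameter names are made up):

final class MemoryWasteSketch {
  /** Illustrative only: 1 - physicalMemoryUsage / executorMemory, clamped to [0, 1]. */
  static double wastedMemoryFraction(long peakPhysicalMemoryMb, long requestedExecutorMemoryMb) {
    if (requestedExecutorMemoryMb <= 0) {
      return 0.0;
    }
    double used = Math.min(peakPhysicalMemoryMb, requestedExecutorMemoryMb);
    return 1.0 - (used / requestedExecutorMemoryMb);
  }
}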

resourceUsed += (executorDuration / Statistics.SECOND_IN_MS) * (perExecutorMem / FileUtils.ONE_MB);
// maxMem is the maximum available storage memory
// memUsed is how much storage memory is used.
// any difference is wasted after a buffer of 50% is wasted
Contributor

rewrite as "any difference after a buffer of 50% is wasted"
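One plausible reading of that comment, as an illustrative sketch only (the formula and the treatment of the 50% buffer are assumptions about intent, not the PR's verified logic):

final class StorageMemWasteSketch {
  /** Only storage memory left unused beyond the buffer counts as wasted. */
  static long wastedStorageMemBytes(long maxMem, long memUsed, double wastageBuffer) {
    long unused = maxMem - memUsed;                          // storage memory never used
    long allowedHeadroom = (long) (wastageBuffer * maxMem);  // e.g. 0.5 * maxMem tolerated as headroom
    return Math.max(0L, unused - allowedHeadroom);
  }
}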

val eid = info.executorId
taskEnd.reason match {
case Resubmitted =>
// Note: For resubmitted tasks, we continue to use the metrics that belong to the
Contributor

Since you return when the task was resubmitted, you are using the last attempt's metrics, not the first attempt's metrics, right?

executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
}

executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 1) - 1
Contributor

You are adding the task's duration to the executor's duration. I have a couple of questions here.

  1. If there are multiple tasks running in parallel inside an executor, wouldn't that count the same duration multiple times?
  2. In the reverse case, where the executor is idle, you are not counting that time. Shouldn't you be?

Author

@ericsahit ericsahit Oct 3, 2016


  1. Yes. In MapReduce, the basic resource allocation unit in the resource manager (e.g. YARN) is one map or reduce task, but in Spark the corresponding unit is the executor rather than the computing task, because tasks run as threads and share the memory and CPU resources of a single executor. (We can see this difference in the "Spark vs MapReduce" picture.)
  2. As described above, even when an executor is idle it still occupies logical CPU and memory resources in the resource manager (YARN). So I think we should calculate this metric according to the lifecycle of the executor instead of the task, as in the sketch below.
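To make that distinction concrete, a hypothetical sketch contrasting the two accounting approaches (the method names are illustrative):

final class DurationAccountingSketch {
  /** Summing task durations: parallel tasks on one executor each add their own time. */
  static long taskBasedSeconds(Iterable<Long> taskDurationsMs) {
    long total = 0L;
    for (long d : taskDurationsMs) {
      total += d / 1000L;
    }
    return total;
  }

  /** Executor-lifetime accounting: the container is charged for its whole lifetime, busy or idle. */
  static long executorLifetimeSeconds(long startTimeMs, long finishTimeMs) {
    return (finishTimeMs - startTimeMs) / 1000L;
  }
}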

if (isYARNMode(envData.getSparkProperty(SPARK_MASTER, SPARK_MASTER_DEFAULT))) {
val memoryOverheadFactor = envData.getSparkProperty(MEMORY_OVERHEAD_FACTOR, "0.20").toDouble
val memoryOverhead: Int = envData.getSparkProperty(MEMORY_OVERHEAD,
  math.max((memoryOverheadFactor * executorMemory).toInt, 384).toString).toInt // 384 MB matches Spark's minimum YARN memory overhead
Contributor

What's the magic number 384? Also, can you create constants for each of these?

info.totalTasks = info.activeTasks + info.failedTasks + info.completedTasks
info.duration = executorsListener.executorToDuration.getOrElse(info.execId, 0L)
info.inputBytes = executorsListener.executorToInputBytes.getOrElse(info.execId, 0L)
info.shuffleRead = executorsListener.executorToShuffleRead.getOrElse(info.execId, 0L)
Contributor

Where are you using shuffleRead, inputBytes, etc.? Only startTime and finishTime are used, right? Do you plan to use the rest later?

@cwsteinbach

Hi @ericsahit @shankar37, is this PR still relevant or has it been superseded by one of the other Spark PRs?

@fusonghe

/**
Controls the Compare Feature
*/
public static Result compare() {
DynamicForm form = Form.form().bindFromRequest(request());
String partialFlowExecId1 = form.get(COMPARE_FLOW_ID1);
partialFlowExecId1 = (partialFlowExecId1 != null) ? partialFlowExecId1.trim() : null;
String partialFlowExecId2 = form.get(COMPARE_FLOW_ID2);
partialFlowExecId2 = (partialFlowExecId2 != null) ? partialFlowExecId2.trim() : null;

List<AppResult> results1 = null;
List<AppResult> results2 = null;
if (partialFlowExecId1 != null && !partialFlowExecId1.isEmpty() && partialFlowExecId2 != null && !partialFlowExecId2.isEmpty()) {
  IdUrlPair flowExecIdPair1 = bestSchedulerInfoMatchGivenPartialId(partialFlowExecId1, AppResult.TABLE.FLOW_EXEC_ID);
  IdUrlPair flowExecIdPair2 = bestSchedulerInfoMatchGivenPartialId(partialFlowExecId2, AppResult.TABLE.FLOW_EXEC_ID);
  results1 = AppResult.find
      .select(AppResult.getSearchFields() + "," + AppResult.TABLE.JOB_DEF_ID + "," + AppResult.TABLE.JOB_DEF_URL
          + "," + AppResult.TABLE.FLOW_EXEC_ID + "," + AppResult.TABLE.FLOW_EXEC_URL)
      .where().eq(AppResult.TABLE.FLOW_EXEC_ID, flowExecIdPair1.getId()).setMaxRows(100)
      .fetch(AppResult.TABLE.APP_HEURISTIC_RESULTS, AppHeuristicResult.getSearchFields())
      .findList();
  results2 = AppResult.find
      .select(
          AppResult.getSearchFields() + "," + AppResult.TABLE.JOB_DEF_ID + "," + AppResult.TABLE.JOB_DEF_URL + ","
              + AppResult.TABLE.FLOW_EXEC_ID + "," + AppResult.TABLE.FLOW_EXEC_URL)
      .where().eq(AppResult.TABLE.FLOW_EXEC_ID, flowExecIdPair2.getId()).setMaxRows(100)
      .fetch(AppResult.TABLE.APP_HEURISTIC_RESULTS, AppHeuristicResult.getSearchFields())
      .findList();
}
return ok(comparePage.render(compareResults.render(compareFlows(results1, results2))));

}

How do I configure the Azkaban scheduler in scheduler.xml? @ericsahit
