Skip to content

Commit

Permalink
[core] additional telemetry for worker pool (ray-project#32686)
Browse files Browse the repository at this point in the history
Why are these changes needed?
Add more telemetry to understand the effectiveness of the worker pool

how often it re-uses a cached worker versus creating a new one
why it is not re-using workers

Signed-off-by: Clarence Ng <[email protected]>
  • Loading branch information
clarng authored Feb 27, 2023
1 parent dc8c368 commit 8979292
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 1 deletion.
29 changes: 28 additions & 1 deletion src/ray/raylet/worker_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ WorkerPool::WorkerPool(instrumented_io_context &io_service,
// processes have started before a task runs on the node (as opposed to the
// metric not existing at all).
stats::NumWorkersStarted.Record(0);
stats::NumWorkersStartedFromCache.Record(0);
stats::NumCachedWorkersSkippedJobMismatch.Record(0);
stats::NumCachedWorkersSkippedDynamicOptionsMismatch.Record(0);
stats::NumCachedWorkersSkippedRuntimeEnvironmentMismatch.Record(0);
#ifndef _WIN32
// Ignore SIGCHLD signals. If we don't do this, then worker processes will
// become zombies instead of dying gracefully.
Expand Down Expand Up @@ -1222,6 +1226,10 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec,
dynamic_options = task_spec.DynamicWorkerOptions();
}

int64_t skip_cached_worker_job_mismatch = 0;
int64_t skip_cached_worker_dynamic_options_mismatch = 0;
int64_t skip_cached_worker_runtime_env_mismatch = 0;

const int runtime_env_hash = task_spec.GetRuntimeEnvHash();
for (auto it = idle_of_all_languages_.rbegin(); it != idle_of_all_languages_.rend();
it++) {
Expand All @@ -1233,20 +1241,29 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec,
// Don't allow worker reuse across jobs. Reuse worker with unassigned job_id is OK.
if (!it->first->GetAssignedJobId().IsNil() &&
it->first->GetAssignedJobId() != task_spec.JobId()) {
skip_cached_worker_job_mismatch++;
stats::NumCachedWorkersSkippedJobMismatch.Record(1);
continue;
}

// Skip if the dynamic_options doesn't match.
if (LookupWorkerDynamicOptions(it->first->GetStartupToken()) != dynamic_options) {
skip_cached_worker_dynamic_options_mismatch++;
stats::NumCachedWorkersSkippedDynamicOptionsMismatch.Record(1);
continue;
}

// These workers are exiting. So skip them.
if (pending_exit_idle_workers_.count(it->first->WorkerId())) {
continue;
}

// Skip if the runtime env doesn't match.
// TODO(clarng): consider re-using worker that has runtime environment
// if the task doesn't require one.
if (runtime_env_hash != it->first->GetRuntimeEnvHash()) {
skip_cached_worker_runtime_env_mismatch++;
stats::NumCachedWorkersSkippedRuntimeEnvironmentMismatch.Record(1);
continue;
}

Expand All @@ -1263,9 +1280,15 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec,
if (worker == nullptr) {
// There are no more cached workers available to execute this task.
// Start a new worker process.
RAY_LOG(DEBUG) << "No cached worker, cached workers skipped due to mismatch job "
<< skip_cached_worker_job_mismatch
<< " due to mismatch dynamic options "
<< skip_cached_worker_dynamic_options_mismatch
<< " due to mismatch runtime environment "
<< skip_cached_worker_runtime_env_mismatch;
if (task_spec.HasRuntimeEnv()) {
// create runtime env.
RAY_LOG(DEBUG) << "Creating runtime env for task " << task_spec.TaskId();
RAY_LOG(DEBUG) << "GetOrCreateRuntimeEnv for task " << task_spec.TaskId();
GetOrCreateRuntimeEnv(
task_spec.SerializedRuntimeEnv(),
task_spec.RuntimeEnvConfig(),
Expand Down Expand Up @@ -1306,6 +1329,9 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec,
if (worker) {
RAY_CHECK(worker->GetAssignedJobId().IsNil() ||
worker->GetAssignedJobId() == task_spec.JobId());
RAY_LOG(DEBUG) << "Re-using worker " << worker->WorkerId() << " for task "
<< task_spec.DebugString();
stats::NumWorkersStartedFromCache.Record(1);
PopWorkerCallbackAsync(callback, worker);
}
}
Expand Down Expand Up @@ -1349,6 +1375,7 @@ void WorkerPool::PrestartDefaultCpuWorkers(ray::Language language, int64_t num_n
{{"CPU", 1}},
/*is_actor*/ false,
/*is_gpu*/ false};
RAY_LOG(DEBUG) << "PrestartDefaultCpuWorkers " << num_needed;
for (int i = 0; i < num_needed; i++) {
PopWorkerStatus status;
StartWorkerProcess(language,
Expand Down
20 changes: 20 additions & 0 deletions src/ray/stats/metric_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,26 @@ static Sum NumWorkersStarted(
"The total number of worker processes the worker pool has created.",
"processes");

// Sum metric incremented by 1 each time WorkerPool::PopWorker passes over an
// idle (cached) worker because that worker already has an assigned job id that
// differs from the job id of the task being scheduled. The worker pool also
// records 0 at construction.
static Sum NumCachedWorkersSkippedJobMismatch(
"internal_num_processes_skipped_job_mismatch",
"The total number of cached workers skipped due to job mismatch.",
"workers");

// Sum metric incremented by 1 each time WorkerPool::PopWorker passes over an
// idle (cached) worker because the worker's runtime env hash differs from the
// runtime env hash of the task being scheduled. The worker pool also records 0
// at construction.
//
// The exported metric name previously misspelled "environment" as
// "enviornment"; the name below uses the correct spelling so that it can be
// found by dashboards and queries.
static Sum NumCachedWorkersSkippedRuntimeEnvironmentMismatch(
"internal_num_processes_skipped_runtime_environment_mismatch",
"The total number of cached workers skipped due to runtime environment mismatch.",
"workers");

// Sum metric incremented by 1 each time WorkerPool::PopWorker passes over an
// idle (cached) worker because the dynamic options the worker was started with
// differ from the dynamic options required by the task being scheduled. The
// worker pool also records 0 at construction.
//
// The metric name must be unique: it previously duplicated
// "internal_num_processes_skipped_job_mismatch", which made this counter
// collide with the job-mismatch counter.
static Sum NumCachedWorkersSkippedDynamicOptionsMismatch(
"internal_num_processes_skipped_dynamic_options_mismatch",
"The total number of cached workers skipped due to dynamic options mismatch.",
"workers");

// Sum metric incremented by 1 each time WorkerPool::PopWorker finds a suitable
// idle (cached) worker and hands it to the callback, rather than starting a
// new worker process. The worker pool also records 0 at construction.
static Sum NumWorkersStartedFromCache(
"internal_num_processes_started_from_cache",
"The total number of workers started from a cached worker process.",
"workers");

static Gauge NumSpilledTasks("internal_num_spilled_tasks",
"The cumulative number of lease requeusts that this raylet "
"has spilled to other raylets.",
Expand Down

0 comments on commit 8979292

Please sign in to comment.