From 8979292935de6c27600630c0ae6907d9fc3ab10e Mon Sep 17 00:00:00 2001 From: clarng Date: Mon, 27 Feb 2023 11:48:10 -0800 Subject: [PATCH] [core] additional telemetry for worker pool (#32686) Why are these changes needed? Add more telemetry to understand the effectiveness of the worker pool how often is it re-using a worker or creating new one why it is not re-using workers Signed-off-by: Clarence Ng --- src/ray/raylet/worker_pool.cc | 29 ++++++++++++++++++++++++++++- src/ray/stats/metric_defs.h | 20 ++++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 8e3c20c686ba..aeadc4180cad 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -103,6 +103,10 @@ WorkerPool::WorkerPool(instrumented_io_context &io_service, // processes have started before a task runs on the node (as opposed to the // metric not existing at all). stats::NumWorkersStarted.Record(0); + stats::NumWorkersStartedFromCache.Record(0); + stats::NumCachedWorkersSkippedJobMismatch.Record(0); + stats::NumCachedWorkersSkippedDynamicOptionsMismatch.Record(0); + stats::NumCachedWorkersSkippedRuntimeEnvironmentMismatch.Record(0); #ifndef _WIN32 // Ignore SIGCHLD signals. If we don't do this, then worker processes will // become zombies instead of dying gracefully. @@ -1222,6 +1226,10 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec, dynamic_options = task_spec.DynamicWorkerOptions(); } + int64_t skip_cached_worker_job_mismatch = 0; + int64_t skip_cached_worker_dynamic_options_mismatch = 0; + int64_t skip_cached_worker_runtime_env_mismatch = 0; + const int runtime_env_hash = task_spec.GetRuntimeEnvHash(); for (auto it = idle_of_all_languages_.rbegin(); it != idle_of_all_languages_.rend(); it++) { @@ -1233,11 +1241,15 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec, // Don't allow worker reuse across jobs. Reuse worker with unassigned job_id is OK. if (!it->first->GetAssignedJobId().IsNil() && it->first->GetAssignedJobId() != task_spec.JobId()) { + skip_cached_worker_job_mismatch++; + stats::NumCachedWorkersSkippedJobMismatch.Record(1); continue; } // Skip if the dynamic_options doesn't match. if (LookupWorkerDynamicOptions(it->first->GetStartupToken()) != dynamic_options) { + skip_cached_worker_dynamic_options_mismatch++; + stats::NumCachedWorkersSkippedDynamicOptionsMismatch.Record(1); continue; } @@ -1245,8 +1257,13 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec, if (pending_exit_idle_workers_.count(it->first->WorkerId())) { continue; } + // Skip if the runtime env doesn't match. + // TODO(clarng): consider re-using worker that has runtime envionrment + // if the task doesn't require one. if (runtime_env_hash != it->first->GetRuntimeEnvHash()) { + skip_cached_worker_runtime_env_mismatch++; + stats::NumCachedWorkersSkippedRuntimeEnvironmentMismatch.Record(1); continue; } @@ -1263,9 +1280,15 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec, if (worker == nullptr) { // There are no more cached workers available to execute this task. // Start a new worker process. + RAY_LOG(DEBUG) << "No cached worker, cached workers skipped due to mismatch job " + << skip_cached_worker_job_mismatch + << " due to mismatch dynamic options " + << skip_cached_worker_dynamic_options_mismatch + << " due to mismatch runtime environment " + << skip_cached_worker_runtime_env_mismatch; if (task_spec.HasRuntimeEnv()) { // create runtime env. - RAY_LOG(DEBUG) << "Creating runtime env for task " << task_spec.TaskId(); + RAY_LOG(DEBUG) << "GetOrCreateRuntimeEnv for task " << task_spec.TaskId(); GetOrCreateRuntimeEnv( task_spec.SerializedRuntimeEnv(), task_spec.RuntimeEnvConfig(), @@ -1306,6 +1329,9 @@ void WorkerPool::PopWorker(const TaskSpecification &task_spec, if (worker) { RAY_CHECK(worker->GetAssignedJobId().IsNil() || worker->GetAssignedJobId() == task_spec.JobId()); + RAY_LOG(DEBUG) << "Re-using worker " << worker->WorkerId() << " for task " + << task_spec.DebugString(); + stats::NumWorkersStartedFromCache.Record(1); PopWorkerCallbackAsync(callback, worker); } } @@ -1349,6 +1375,7 @@ void WorkerPool::PrestartDefaultCpuWorkers(ray::Language language, int64_t num_n {{"CPU", 1}}, /*is_actor*/ false, /*is_gpu*/ false}; + RAY_LOG(DEBUG) << "PrestartDefaultCpuWorkers " << num_needed; for (int i = 0; i < num_needed; i++) { PopWorkerStatus status; StartWorkerProcess(language, diff --git a/src/ray/stats/metric_defs.h b/src/ray/stats/metric_defs.h index 051a21c37169..dd5350ba6068 100644 --- a/src/ray/stats/metric_defs.h +++ b/src/ray/stats/metric_defs.h @@ -213,6 +213,26 @@ static Sum NumWorkersStarted( "The total number of worker processes the worker pool has created.", "processes"); +static Sum NumCachedWorkersSkippedJobMismatch( + "internal_num_processes_skipped_job_mismatch", + "The total number of cached workers skipped due to job mismatch.", + "workers"); + +static Sum NumCachedWorkersSkippedRuntimeEnvironmentMismatch( + "internal_num_processes_skipped_runtime_enviornment_mismatch", + "The total number of cached workers skipped due to runtime environment mismatch.", + "workers"); + +static Sum NumCachedWorkersSkippedDynamicOptionsMismatch( + "internal_num_processes_skipped_job_mismatch", + "The total number of cached workers skipped due to dynamic options mismatch.", + "workers"); + +static Sum NumWorkersStartedFromCache( + "internal_num_processes_started_from_cache", + "The total number of workers started from a cached worker process.", + "workers"); + static Gauge NumSpilledTasks("internal_num_spilled_tasks", "The cumulative number of lease requeusts that this raylet " "has spilled to other raylets.",