Skip to content

Commit

Permalink
[Internal Observability] [Part 2] Share the same code for RecordMetri…
Browse files Browse the repository at this point in the history
…cs & DebugString for cluster task manager. (ray-project#20958)

Share the same code for RecordMetrics & DebugString for cluster task manager.

Both requires almost identical (and also expensive) operation. This PR makes them share the same `UpdateState` code which stores stats in the struct. 

Note that we don't update state when metrics are recorded because the debug string is anyway consistently called and states are updated.

Ideally, we should dynamically update the stats.
  • Loading branch information
rkooo567 authored Dec 9, 2021
1 parent 05a302b commit f4d4639
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 46 deletions.
93 changes: 54 additions & 39 deletions src/ray/raylet/scheduling/cluster_task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,7 @@ ClusterTaskManager::ClusterTaskManager(
get_time_ms_(get_time_ms),
sched_cls_cap_enabled_(RayConfig::instance().worker_cap_enabled()),
sched_cls_cap_interval_ms_(sched_cls_cap_interval_ms),
sched_cls_cap_max_ms_(RayConfig::instance().worker_cap_max_backoff_delay_ms()),
metric_tasks_queued_(0),
metric_tasks_dispatched_(0),
metric_tasks_spilled_(0) {}
sched_cls_cap_max_ms_(RayConfig::instance().worker_cap_max_backoff_delay_ms()) {}

bool ClusterTaskManager::SchedulePendingTasks() {
// Always try to schedule infeasible tasks in case they are now feasible.
Expand Down Expand Up @@ -941,7 +938,7 @@ bool ClusterTaskManager::AnyPendingTasksForResourceAcquisition(
return *any_pending;
}

std::string ClusterTaskManager::DebugStr() const {
void ClusterTaskManager::RecomputeDebugStats() const {
auto accumulator =
[](size_t state,
const std::pair<int, std::deque<std::shared_ptr<internal::Work>>> &pair) {
Expand All @@ -953,7 +950,7 @@ std::string ClusterTaskManager::DebugStr() const {
size_t num_worker_not_started_by_job_config_not_exist = 0;
size_t num_worker_not_started_by_registration_timeout = 0;
size_t num_worker_not_started_by_process_rate_limit = 0;
size_t num_worker_waiting_for_workers = 0;
size_t num_tasks_waiting_for_workers = 0;
size_t num_cancelled_tasks = 0;

size_t num_infeasible_tasks = std::accumulate(
Expand All @@ -967,7 +964,7 @@ std::string ClusterTaskManager::DebugStr() const {
&num_worker_not_started_by_job_config_not_exist,
&num_worker_not_started_by_registration_timeout,
&num_worker_not_started_by_process_rate_limit,
&num_worker_waiting_for_workers, &num_cancelled_tasks](
&num_tasks_waiting_for_workers, &num_cancelled_tasks](
size_t state,
const std::pair<
int, std::deque<std::shared_ptr<internal::Work>>>
Expand All @@ -976,7 +973,7 @@ std::string ClusterTaskManager::DebugStr() const {
for (auto work_it = work_queue.begin(); work_it != work_queue.end();) {
const auto &work = *work_it++;
if (work->GetState() == internal::WorkStatus::WAITING_FOR_WORKER) {
num_worker_waiting_for_workers += 1;
num_tasks_waiting_for_workers += 1;
} else if (work->GetState() == internal::WorkStatus::CANCELLED) {
num_cancelled_tasks += 1;
} else if (work->GetUnscheduledCause() ==
Expand Down Expand Up @@ -1008,28 +1005,64 @@ std::string ClusterTaskManager::DebugStr() const {
std::accumulate(tasks_to_dispatch_.begin(), tasks_to_dispatch_.end(), (size_t)0,
per_work_accumulator);

if (num_tasks_to_schedule + num_tasks_to_dispatch + num_infeasible_tasks > 1000) {
/// Update the internal states.
internal_stats_.num_waiting_for_resource = num_waiting_for_resource;
internal_stats_.num_waiting_for_plasma_memory = num_waiting_for_plasma_memory;
internal_stats_.num_waiting_for_remote_node_resources =
num_waiting_for_remote_node_resources;
internal_stats_.num_worker_not_started_by_job_config_not_exist =
num_worker_not_started_by_job_config_not_exist;
internal_stats_.num_worker_not_started_by_registration_timeout =
num_worker_not_started_by_registration_timeout;
internal_stats_.num_worker_not_started_by_process_rate_limit =
num_worker_not_started_by_process_rate_limit;
internal_stats_.num_tasks_waiting_for_workers = num_tasks_waiting_for_workers;
internal_stats_.num_cancelled_tasks = num_cancelled_tasks;
internal_stats_.num_infeasible_tasks = num_infeasible_tasks;
internal_stats_.num_tasks_to_schedule = num_tasks_to_schedule;
internal_stats_.num_tasks_to_dispatch = num_tasks_to_dispatch;
}

void ClusterTaskManager::RecordMetrics() const {
/// This method intentionally doesn't call RecomputeDebugStats() because
/// that function is expensive. RecomputeDebugStats is called by DebugStr method
/// and they are always periodically called by node manager.
stats::NumReceivedTasks.Record(internal_stats_.num_tasks_to_schedule);
stats::NumDispatchedTasks.Record(internal_stats_.num_tasks_to_dispatch);
stats::NumSpilledTasks.Record(internal_stats_.metric_tasks_spilled);
stats::NumInfeasibleSchedulingClasses.Record(infeasible_tasks_.size());
stats::NumInfeasibleTasks.Record(internal_stats_.num_infeasible_tasks);
}

std::string ClusterTaskManager::DebugStr() const {
RecomputeDebugStats();
if (internal_stats_.num_tasks_to_schedule + internal_stats_.num_tasks_to_dispatch +
internal_stats_.num_infeasible_tasks >
1000) {
RAY_LOG(WARNING)
<< "More than 1000 tasks are queued in this node. This can cause slow down.";
}

std::stringstream buffer;
buffer << "========== Node: " << self_node_id_ << " =================\n";
buffer << "Infeasible queue length: " << num_infeasible_tasks << "\n";
buffer << "Schedule queue length: " << num_tasks_to_schedule << "\n";
buffer << "Dispatch queue length: " << num_tasks_to_dispatch << "\n";
buffer << "num_waiting_for_resource: " << num_waiting_for_resource << "\n";
buffer << "num_waiting_for_plasma_memory: " << num_waiting_for_plasma_memory << "\n";
buffer << "Infeasible queue length: " << internal_stats_.num_infeasible_tasks << "\n";
buffer << "Schedule queue length: " << internal_stats_.num_tasks_to_schedule << "\n";
buffer << "Dispatch queue length: " << internal_stats_.num_tasks_to_dispatch << "\n";
buffer << "num_waiting_for_resource: " << internal_stats_.num_waiting_for_resource
<< "\n";
buffer << "num_waiting_for_plasma_memory: "
<< internal_stats_.num_waiting_for_plasma_memory << "\n";
buffer << "num_waiting_for_remote_node_resources: "
<< num_waiting_for_remote_node_resources << "\n";
<< internal_stats_.num_waiting_for_remote_node_resources << "\n";
buffer << "num_worker_not_started_by_job_config_not_exist: "
<< num_worker_not_started_by_job_config_not_exist << "\n";
<< internal_stats_.num_worker_not_started_by_job_config_not_exist << "\n";
buffer << "num_worker_not_started_by_registration_timeout: "
<< num_worker_not_started_by_registration_timeout << "\n";
<< internal_stats_.num_worker_not_started_by_registration_timeout << "\n";
buffer << "num_worker_not_started_by_process_rate_limit: "
<< num_worker_not_started_by_process_rate_limit << "\n";
buffer << "num_worker_waiting_for_workers: " << num_worker_waiting_for_workers << "\n";
buffer << "num_cancelled_tasks: " << num_cancelled_tasks << "\n";
<< internal_stats_.num_worker_not_started_by_process_rate_limit << "\n";
buffer << "num_tasks_waiting_for_workers: "
<< internal_stats_.num_tasks_waiting_for_workers << "\n";
buffer << "num_cancelled_tasks: " << internal_stats_.num_cancelled_tasks << "\n";
buffer << "Waiting tasks size: " << waiting_tasks_index_.size() << "\n";
buffer << "Number of executing tasks: " << executing_task_args_.size() << "\n";
buffer << "Number of pinned task arguments: " << pinned_task_arguments_.size() << "\n";
Expand Down Expand Up @@ -1085,23 +1118,6 @@ std::string ClusterTaskManager::DebugStr() const {
return buffer.str();
}

void ClusterTaskManager::RecordMetrics() {
stats::NumReceivedTasks.Record(metric_tasks_queued_);
stats::NumDispatchedTasks.Record(metric_tasks_dispatched_);
stats::NumSpilledTasks.Record(metric_tasks_spilled_);
stats::NumInfeasibleSchedulingClasses.Record(infeasible_tasks_.size());

metric_tasks_queued_ = 0;
metric_tasks_dispatched_ = 0;
metric_tasks_spilled_ = 0;

uint64_t num_infeasible_tasks = 0;
for (const auto &pair : infeasible_tasks_) {
num_infeasible_tasks += pair.second.size();
}
stats::NumInfeasibleTasks.Record(num_infeasible_tasks);
}

void ClusterTaskManager::TryLocalInfeasibleTaskScheduling() {
for (auto shapes_it = infeasible_tasks_.begin();
shapes_it != infeasible_tasks_.end();) {
Expand Down Expand Up @@ -1142,7 +1158,6 @@ void ClusterTaskManager::Dispatch(
const std::shared_ptr<TaskResourceInstances> &allocated_instances,
const RayTask &task, rpc::RequestWorkerLeaseReply *reply,
std::function<void(void)> send_reply_callback) {
metric_tasks_dispatched_++;
const auto &task_spec = task.GetTaskSpecification();

worker->SetBundleId(task_spec.PlacementGroupBundleId());
Expand Down Expand Up @@ -1225,7 +1240,7 @@ void ClusterTaskManager::Spillback(const NodeID &spillback_to,
return;
}

metric_tasks_spilled_++;
internal_stats_.metric_tasks_spilled++;
const auto &task = work->task;
const auto &task_spec = task.GetTaskSpecification();
RAY_LOG(DEBUG) << "Spilling task " << task_spec.TaskId() << " to node " << spillback_to;
Expand Down
45 changes: 40 additions & 5 deletions src/ray/raylet/scheduling/cluster_task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {
void ScheduleAndDispatchTasks() override;

/// Record the internal metrics.
void RecordMetrics() override;
void RecordMetrics() const override;

/// The helper to dump the debug state of the cluster task manater.
std::string DebugStr() const override;
Expand Down Expand Up @@ -329,6 +329,12 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {
bool requires_object_store_memory,
bool force_spillback, bool *is_infeasible);

/// Recompute the debug stats.
/// It is needed because updating the debug state is expensive for cluster_task_manager.
/// TODO(sang): Update the internal states value dynamically instead of iterating the
/// data structure.
void RecomputeDebugStats() const;

const NodeID &self_node_id_;
/// Responsible for resource tracking/view of the cluster.
std::shared_ptr<ClusterResourceScheduler> cluster_resource_scheduler_;
Expand Down Expand Up @@ -453,10 +459,39 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {

const int64_t sched_cls_cap_max_ms_;

/// Metrics collected since the last report.
uint64_t metric_tasks_queued_;
uint64_t metric_tasks_dispatched_;
uint64_t metric_tasks_spilled_;
struct InternalStats {
/// Number of tasks that are spilled to other
/// nodes because it cannot be scheduled locally.
int64_t metric_tasks_spilled = 0;
/// Number of tasks that are waiting for
/// resources to be available locally.
int64_t num_waiting_for_resource = 0;
/// Number of tasks that are waiting for available memory
/// from the plasma store.
int64_t num_waiting_for_plasma_memory = 0;
/// Number of tasks that are waiting for nodes with available resources.
int64_t num_waiting_for_remote_node_resources = 0;
/// Number of workers that couldn't be started because the job config wasn't local.
int64_t num_worker_not_started_by_job_config_not_exist = 0;
/// Number of workers that couldn't be started because the worker registration timed
/// out.
int64_t num_worker_not_started_by_registration_timeout = 0;
/// Number of workers that couldn't be started becasue it hits the worker startup rate
/// limit.
int64_t num_worker_not_started_by_process_rate_limit = 0;
/// Number of tasks that are waiting for worker processes to start.
int64_t num_tasks_waiting_for_workers = 0;
/// Number of cancelled tasks.
int64_t num_cancelled_tasks = 0;
/// Number of infeasible tasks.
int64_t num_infeasible_tasks = 0;
/// Number of tasks to schedule.
int64_t num_tasks_to_schedule = 0;
/// Number of tasks to dispatch.
int64_t num_tasks_to_dispatch = 0;
};

mutable InternalStats internal_stats_;

/// Determine whether a task should be immediately dispatched,
/// or placed on a wait queue.
Expand Down
4 changes: 2 additions & 2 deletions src/ray/raylet/scheduling/cluster_task_manager_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ class ClusterTaskManagerInterface {
/// The helper to dump the debug state of the cluster task manater.
virtual std::string DebugStr() const = 0;

/// Report high frequency scheduling metrics.
virtual void RecordMetrics() = 0;
/// Record the internal metrics.
virtual void RecordMetrics() const = 0;

/// Check if there are enough available resources for the given input.
virtual bool IsLocallySchedulable(const RayTask &task) const = 0;
Expand Down

0 comments on commit f4d4639

Please sign in to comment.