Skip to content

Commit

Permalink
[Internal Observability] [Part 3] Support debug state metrics on all …
Browse files Browse the repository at this point in the history
…components. (ray-project#20957)

This PR adds RecordMetrics and DebugString to all raylet components. 

Some of methods are probably empty now. They are going to be supported in the next PR
  • Loading branch information
rkooo567 authored Dec 9, 2021
1 parent d0e79a3 commit 05a302b
Show file tree
Hide file tree
Showing 16 changed files with 71 additions and 28 deletions.
9 changes: 7 additions & 2 deletions python/ray/tests/test_metrics_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,12 @@ def _setup_cluster_for_test(ray_start_cluster):
NUM_NODES = 2
cluster = ray_start_cluster
# Add a head node.
cluster.add_node(_system_config={"metrics_report_interval_ms": 1000})
cluster.add_node(
_system_config={
"metrics_report_interval_ms": 1000,
"event_stats_print_interval_ms": 500,
"event_stats": True
})
# Add worker nodes.
[cluster.add_node() for _ in range(NUM_NODES - 1)]
cluster.wait_for_nodes()
Expand Down Expand Up @@ -143,7 +148,7 @@ async def ping(self):
@pytest.mark.skipif(
prometheus_client is None, reason="Prometheus not installed")
def test_metrics_export_end_to_end(_setup_cluster_for_test):
TEST_TIMEOUT_S = 20
TEST_TIMEOUT_S = 30

prom_addresses, autoscaler_export_addr = _setup_cluster_for_test

Expand Down
4 changes: 3 additions & 1 deletion src/ray/object_manager/object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,9 @@ std::string ObjectManager::DebugString() const {
return result.str();
}

void ObjectManager::RecordMetrics() const {
void ObjectManager::RecordMetrics() {
pull_manager_->RecordMetrics();
push_manager_->RecordMetrics();
stats::ObjectStoreAvailableMemory().Record(config_.object_store_memory - used_memory_);
stats::ObjectStoreUsedMemory().Record(used_memory_);
stats::ObjectStoreFallbackMemory().Record(
Expand Down
4 changes: 2 additions & 2 deletions src/ray/object_manager/object_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,8 @@ class ObjectManager : public ObjectManagerInterface,
/// \return string.
std::string DebugString() const;

/// Record metrics.
void RecordMetrics() const;
/// Record the internal stats.
void RecordMetrics();

/// Populate object store stats.
///
Expand Down
2 changes: 2 additions & 0 deletions src/ray/object_manager/plasma/object_lifecycle_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@ int64_t ObjectLifecycleManager::GetNumObjectsUnsealed() const {
return stats_collector_.GetNumObjectsUnsealed();
}

void ObjectLifecycleManager::RecordMetrics() const { stats_collector_.RecordMetrics(); }

void ObjectLifecycleManager::GetDebugDump(std::stringstream &buffer) const {
return stats_collector_.GetDebugDump(buffer);
}
Expand Down
2 changes: 2 additions & 0 deletions src/ray/object_manager/plasma/object_lifecycle_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ class ObjectLifecycleManager : public IObjectLifecycleManager {

int64_t GetNumObjectsUnsealed() const;

void RecordMetrics() const;

void GetDebugDump(std::stringstream &buffer) const;

private:
Expand Down
4 changes: 4 additions & 0 deletions src/ray/object_manager/plasma/stats_collector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ void ObjectStatsCollector::OnObjectRefDecreased(const LocalObject &obj) {
}
}

void ObjectStatsCollector::RecordMetrics() const {
// TODO(sang): Add metrics.
}

void ObjectStatsCollector::GetDebugDump(std::stringstream &buffer) const {
buffer << "- objects spillable: " << num_objects_spillable_ << "\n";
buffer << "- bytes spillable: " << num_bytes_spillable_ << "\n";
Expand Down
4 changes: 4 additions & 0 deletions src/ray/object_manager/plasma/stats_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ class ObjectStatsCollector {
// Called after an object's ref count is decreased by 1.
void OnObjectRefDecreased(const LocalObject &object);

/// Record the internal metrics.
void RecordMetrics() const;

/// Debug dump the stats.
void GetDebugDump(std::stringstream &buffer) const;

int64_t GetNumBytesInUse() const;
Expand Down
12 changes: 9 additions & 3 deletions src/ray/object_manager/plasma/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ PlasmaStore::PlasmaStore(instrumented_io_context &main_service, IAllocator &allo
const auto event_stats_print_interval_ms =
RayConfig::instance().event_stats_print_interval_ms();
if (event_stats_print_interval_ms > 0 && RayConfig::instance().event_stats()) {
PrintDebugDump();
PrintAndRecordDebugDump();
}
}

Expand Down Expand Up @@ -528,14 +528,20 @@ bool PlasmaStore::IsObjectSpillable(const ObjectID &object_id) {
return entry->Sealed() && entry->GetRefCount() == 1;
}

void PlasmaStore::PrintDebugDump() const {
void PlasmaStore::PrintAndRecordDebugDump() const {
absl::MutexLock lock(&mutex_);
RecordMetrics();
RAY_LOG(INFO) << GetDebugDump();
stats_timer_ = execute_after(
io_context_, [this]() { PrintDebugDump(); },
io_context_, [this]() { PrintAndRecordDebugDump(); },
RayConfig::instance().event_stats_print_interval_ms());
}

void PlasmaStore::RecordMetrics() const {
// TODO(sang): Add metrics.
object_lifecycle_mgr_.RecordMetrics();
}

std::string PlasmaStore::GetDebugDump() const {
std::stringstream buffer;
buffer << "========== Plasma store: =================\n";
Expand Down
4 changes: 3 additions & 1 deletion src/ray/object_manager/plasma/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,9 @@ class PlasmaStore {
// Start listening for clients.
void DoAccept();

void PrintDebugDump() const LOCKS_EXCLUDED(mutex_);
void RecordMetrics() const EXCLUSIVE_LOCKS_REQUIRED(mutex_);

void PrintAndRecordDebugDump() const LOCKS_EXCLUDED(mutex_);

std::string GetDebugDump() const EXCLUSIVE_LOCKS_REQUIRED(mutex_);

Expand Down
4 changes: 4 additions & 0 deletions src/ray/object_manager/pull_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,10 @@ int64_t PullManager::NextRequestBundleSize(const Queue &bundles,
return bytes_needed_calculated;
}

void PullManager::RecordMetrics() const {
// TODO(sang): Add metrics.
}

std::string PullManager::DebugString() const {
absl::MutexLock lock(&active_objects_mu_);
std::stringstream result;
Expand Down
3 changes: 3 additions & 0 deletions src/ray/object_manager/pull_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ class PullManager {
/// there are object sizes missing.
bool HasPullsQueued() const;

/// Record the internal metrics.
void RecordMetrics() const;

std::string DebugString() const;

/// Returns the number of bytes of quota remaining. When this is less than zero,
Expand Down
16 changes: 16 additions & 0 deletions src/ray/object_manager/push_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ void PushManager::StartPush(const NodeID &dest_id, const ObjectID &obj_id,
return;
}
RAY_CHECK(num_chunks > 0);
chunks_remaining_ += num_chunks;
push_info_[push_id].reset(new PushState(num_chunks, send_chunk_fn));
ScheduleRemainingPushes();
}

void PushManager::OnChunkComplete(const NodeID &dest_id, const ObjectID &obj_id) {
auto push_id = std::make_pair(dest_id, obj_id);
chunks_in_flight_ -= 1;
chunks_remaining_ -= 1;
if (--push_info_[push_id]->chunks_remaining <= 0) {
push_info_.erase(push_id);
RAY_LOG(DEBUG) << "Push for " << push_id.first << ", " << push_id.second
Expand Down Expand Up @@ -73,4 +75,18 @@ void PushManager::ScheduleRemainingPushes() {
}
}

void PushManager::RecordMetrics() const {
// TODO(sang): Add metrics.
}

std::string PushManager::DebugString() const {
std::stringstream result;
result << "PushManager:";
result << "\n- num pushes in flight: " << NumPushesInFlight();
result << "\n- num chunks in flight: " << NumChunksInFlight();
result << "\n- num chunks remaining: " << NumChunksRemaining();
result << "\n- max chunks allowed: " << max_chunks_in_flight_;
return result.str();
}

} // namespace ray
24 changes: 8 additions & 16 deletions src/ray/object_manager/push_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,26 +58,15 @@ class PushManager {
int64_t NumChunksInFlight() const { return chunks_in_flight_; };

/// Return the number of chunks remaining. For testing only.
int64_t NumChunksRemaining() const {
int total = 0;
for (const auto &pair : push_info_) {
total += pair.second->chunks_remaining;
}
return total;
}
int64_t NumChunksRemaining() const { return chunks_remaining_; }

/// Return the number of pushes currently in flight. For testing only.
int64_t NumPushesInFlight() const { return push_info_.size(); };

std::string DebugString() const {
std::stringstream result;
result << "PushManager:";
result << "\n- num pushes in flight: " << NumPushesInFlight();
result << "\n- num chunks in flight: " << NumChunksInFlight();
result << "\n- num chunks remaining: " << NumChunksRemaining();
result << "\n- max chunks allowed: " << max_chunks_in_flight_;
return result.str();
}
/// Record the internal metrics.
void RecordMetrics() const;

std::string DebugString() const;

private:
/// Tracks the state of an active object push to another node.
Expand Down Expand Up @@ -111,6 +100,9 @@ class PushManager {
/// Running count of chunks in flight, used to limit progress of in_flight_pushes_.
int64_t chunks_in_flight_ = 0;

/// Remaining count of chunks to push to other nodes.
int64_t chunks_remaining_ = 0;

/// Tracks all pushes with chunk transfers in flight.
absl::flat_hash_map<PushID, std::unique_ptr<PushState>> push_info_;
};
Expand Down
3 changes: 2 additions & 1 deletion src/ray/raylet/local_object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,8 @@ void LocalObjectManager::FillObjectSpillingStats(rpc::GetNodeStatsReply *reply)
stats->set_object_store_bytes_primary_copy(pinned_objects_size_);
}

void LocalObjectManager::RecordObjectSpillingStats() const {
void LocalObjectManager::RecordMetrics() const {
/// Record Metrics.
if (spilled_bytes_total_ != 0 && spill_time_total_s_ != 0) {
stats::SpillingBandwidthMB.Record(spilled_bytes_total_ / 1024 / 1024 /
spill_time_total_s_);
Expand Down
2 changes: 1 addition & 1 deletion src/ray/raylet/local_object_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class LocalObjectManager {
void FillObjectSpillingStats(rpc::GetNodeStatsReply *reply) const;

/// Record object spilling stats to metrics.
void RecordObjectSpillingStats() const;
void RecordMetrics() const;

/// Return the spilled object URL if the object is spilled locally,
/// or the empty string otherwise.
Expand Down
2 changes: 1 addition & 1 deletion src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2467,7 +2467,7 @@ void NodeManager::RecordMetrics() {

cluster_task_manager_->RecordMetrics();
object_manager_.RecordMetrics();
local_object_manager_.RecordObjectSpillingStats();
local_object_manager_.RecordMetrics();

uint64_t current_time = current_time_ms();
uint64_t duration_ms = current_time - last_metrics_recorded_at_ms_;
Expand Down

0 comments on commit 05a302b

Please sign in to comment.