[Metrics] Fix flaky test_task_metrics + fix slow report issue from unit tests (ray-project#32342)

Every X seconds, when we record metrics, we check all pending updates from the counter_map. If there are pending updates, we invoke the registered callback for the relevant updates, which records the metrics.

Currently, we have three counter_maps: the regular one (containing all data) plus the get and wait counter_maps. For the get and wait counter_maps, although there are updates, we don't register callbacks (they are only used to calculate the correct RUNNING / GET / WAIT counts).

So normally, this is what happens:

1. A task enters the RUNNING state. The regular counter_map is updated and a callback is registered.
2. Get is called, and the get counter_map is updated. No callback is registered (by design).
3. If metrics are recorded after step 2, the callback from the regular counter_map is invoked and we record correct metrics.

If metrics are recorded after step 1 instead, the RUNNING state is recorded, but since we don't register callbacks for the get counter_map, the relevant updates are not picked up when metrics are recorded next.

The flakiness comes from the latter case.

This PR fixes the issue by making a "no-op update" to the regular counter_map (e.g., Increment(0)). This triggers the counter_map to invoke its callback again, which correctly updates the get & wait status.

I could also refactor the code to not use the get & wait counter_maps, but this approach is much simpler, so I decided to go with it.
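
To make the mechanism concrete, here is a minimal standalone sketch of the idea (this is not the actual ray CounterMap; MiniCounterMap, FlushOnChangeCallbacks, and the key format are simplified stand-ins): a zero-valued update leaves the count unchanged but still marks the key as pending, so the registered callback fires again on the next flush and can refresh the get/wait status.

#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>
#include <unordered_set>

// Minimal sketch of the pending-change behavior, not the real ray CounterMap.
class MiniCounterMap {
 public:
  void SetOnChangeCallback(std::function<void(const std::string &)> callback) {
    on_change_ = std::move(callback);
  }

  // A zero-valued update is a no-op for the count, but it still marks the key
  // as pending so the callback is invoked on the next flush.
  void Increment(const std::string &key, int64_t val = 1) {
    if (val != 0) {
      counters_[key] += val;
    }
    if (on_change_ != nullptr) {
      pending_changes_.insert(key);
    }
  }

  void FlushOnChangeCallbacks() {
    for (const auto &key : pending_changes_) {
      on_change_(key);
    }
    pending_changes_.clear();
  }

  int64_t Get(const std::string &key) const {
    auto it = counters_.find(key);
    return it == counters_.end() ? 0 : it->second;
  }

 private:
  std::unordered_map<std::string, int64_t> counters_;
  std::unordered_set<std::string> pending_changes_;
  std::function<void(const std::string &)> on_change_;
};

int main() {
  MiniCounterMap counter;
  counter.SetOnChangeCallback([&counter](const std::string &key) {
    std::cout << "record metrics for " << key << " = " << counter.Get(key) << std::endl;
  });

  counter.Increment("f:RUNNING");     // task enters RUNNING; callback pending
  counter.FlushOnChangeCallbacks();   // metrics flush #1 records RUNNING

  counter.Increment("f:RUNNING", 0);  // no-op update when the task enters ray.get
  counter.FlushOnChangeCallbacks();   // flush #2 still fires, so GET/WAIT status is refreshed
  return 0;
}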

This PR also fixes the slow stats report issue: the stats.h change now configures the report and harvest intervals before the exporters are registered.
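
On the slow-report side, the sketch below is purely hypothetical (it does not use the OpenCensus API) and only illustrates why registration order can matter, under the assumption that an exporter captures its reporting interval at registration time: registering before the interval is configured leaves it on a slow default.

#include <chrono>
#include <iostream>

// Hypothetical config/exporter pair, not the OpenCensus API.
struct ExporterConfig {
  std::chrono::milliseconds report_interval{std::chrono::minutes(1)};  // slow default
};

class PeriodicExporter {
 public:
  // Assumption for this sketch: the interval is captured once, at registration time.
  explicit PeriodicExporter(const ExporterConfig &config)
      : report_interval_(config.report_interval) {}
  std::chrono::milliseconds report_interval() const { return report_interval_; }

 private:
  std::chrono::milliseconds report_interval_;
};

int main() {
  ExporterConfig config;

  // Register first, configure later: the exporter keeps the 60000 ms default.
  PeriodicExporter registered_too_early(config);
  config.report_interval = std::chrono::milliseconds(500);
  std::cout << "registered before configuring: "
            << registered_too_early.report_interval().count() << " ms" << std::endl;

  // Configure first, then register (the order stats.h now uses): 500 ms.
  PeriodicExporter registered_after(config);
  std::cout << "registered after configuring: "
            << registered_after.report_interval().count() << " ms" << std::endl;
  return 0;
}
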
rkooo567 authored Feb 10, 2023
1 parent d8639ab commit b7e671d
Showing 5 changed files with 37 additions and 45 deletions.
37 changes: 0 additions & 37 deletions python/ray/tests/test_object_store_metrics.py
@@ -220,43 +220,6 @@ def test_spilling(object_spilling_config, shutdown_only):
)


@pytest.mark.skipif(
sys.platform == "darwin", reason="Timing out on macos. Not enough time to run."
)
@pytest.mark.parametrize("metric_report_interval_ms", [500, 1000, 3000])
def test_object_metric_report_interval(shutdown_only, metric_report_interval_ms):
"""Test object store metric on raylet controlled by `metric_report_interval_ms`"""
import time

info = ray.init(
object_store_memory=100 * MiB,
_system_config={"metrics_report_interval_ms": metric_report_interval_ms},
)

# Put object to make sure metric shows up
obj = ray.get(ray.put(np.zeros(20 * MiB, dtype=np.uint8)))

expected = {
"MMAP_SHM": 20 * MiB,
"MMAP_DISK": 0,
"SPILLED": 0,
"WORKER_HEAP": 0,
}
start = time.time()
wait_for_condition(
# 1KiB for metadata difference
lambda: approx_eq_dict_in(objects_by_loc(info), expected, 1 * KiB),
timeout=10,
retry_interval_ms=100,
)

end = time.time()
# Also shouldn't have metrics reported too quickly
assert (end - start) * 1000 > metric_report_interval_ms, "Reporting too quickly"

del obj


@pytest.mark.skipif(
sys.platform == "darwin", reason="Timing out on macos. Not enough time to run."
)
16 changes: 12 additions & 4 deletions src/ray/core_worker/core_worker.cc
@@ -1250,8 +1250,12 @@ Status CoreWorker::SealExisting(const ObjectID &object_id,
Status CoreWorker::Get(const std::vector<ObjectID> &ids,
const int64_t timeout_ms,
std::vector<std::shared_ptr<RayObject>> *results) {
ScopedTaskMetricSetter state(
worker_context_, task_counter_, rpc::TaskStatus::RUNNING_IN_RAY_GET);
std::unique_ptr<ScopedTaskMetricSetter> state = nullptr;
if (options_.worker_type == WorkerType::WORKER) {
// We track the state change only from workers.
state = std::make_unique<ScopedTaskMetricSetter>(
worker_context_, task_counter_, rpc::TaskStatus::RUNNING_IN_RAY_GET);
}
results->resize(ids.size(), nullptr);

absl::flat_hash_set<ObjectID> plasma_object_ids;
@@ -1412,8 +1416,12 @@ Status CoreWorker::Wait(const std::vector<ObjectID> &ids,
int64_t timeout_ms,
std::vector<bool> *results,
bool fetch_local) {
ScopedTaskMetricSetter state(
worker_context_, task_counter_, rpc::TaskStatus::RUNNING_IN_RAY_WAIT);
std::unique_ptr<ScopedTaskMetricSetter> state = nullptr;
if (options_.worker_type == WorkerType::WORKER) {
// We track the state change only from workers.
state = std::make_unique<ScopedTaskMetricSetter>(
worker_context_, task_counter_, rpc::TaskStatus::RUNNING_IN_RAY_WAIT);
}

results->resize(ids.size(), false);

6 changes: 6 additions & 0 deletions src/ray/core_worker/core_worker.h
@@ -188,6 +188,9 @@ class TaskCounter {
rpc::TaskStatus status,
bool is_retry) {
absl::MutexLock l(&mu_);
// Add a no-op increment to counter_ so that
// it will invoke a callback upon RecordMetrics.
counter_.Increment({func_name, TaskStatusType::kRunning, is_retry}, 0);
if (status == rpc::TaskStatus::RUNNING_IN_RAY_GET) {
running_in_get_counter_.Increment({func_name, is_retry});
} else if (status == rpc::TaskStatus::RUNNING_IN_RAY_WAIT) {
@@ -201,6 +204,9 @@
rpc::TaskStatus status,
bool is_retry) {
absl::MutexLock l(&mu_);
// Add a no-op decrement to counter_ so that
// it will invoke a callback upon RecordMetrics.
counter_.Decrement({func_name, TaskStatusType::kRunning, is_retry}, 0);
if (status == rpc::TaskStatus::RUNNING_IN_RAY_GET) {
running_in_get_counter_.Decrement({func_name, is_retry});
} else if (status == rpc::TaskStatus::RUNNING_IN_RAY_WAIT) {
8 changes: 4 additions & 4 deletions src/ray/stats/stats.h
@@ -96,14 +96,14 @@ static inline void Init(const TagsType &global_tags,
StatsConfig::instance().SetHarvestInterval(
absl::Milliseconds(std::max(RayConfig::instance().metrics_report_interval_ms() / 2,
static_cast<uint64_t>(500))));

MetricPointExporter::Register(exporter, metrics_report_batch_size);
OpenCensusProtoExporter::Register(
metrics_agent_port, (*metrics_io_service), "127.0.0.1", worker_id);
opencensus::stats::StatsExporter::SetInterval(
StatsConfig::instance().GetReportInterval());
opencensus::stats::DeltaProducer::Get()->SetHarvestInterval(
StatsConfig::instance().GetHarvestInterval());

MetricPointExporter::Register(exporter, metrics_report_batch_size);
OpenCensusProtoExporter::Register(
metrics_agent_port, (*metrics_io_service), "127.0.0.1", worker_id);
StatsConfig::instance().SetGlobalTags(global_tags);
for (auto &f : StatsConfig::instance().PopInitializers()) {
f();
15 changes: 15 additions & 0 deletions src/ray/util/counter_map.h
@@ -60,6 +60,14 @@ class CounterMap {

/// Increment the specified key by `val`, default to 1.
void Increment(const K &key, int64_t val = 1) {
// If value is 0, it is no-op and only registers the callback.
if (val == 0) {
if (on_change_ != nullptr) {
pending_changes_.insert(key);
}
return;
}

counters_[key] += val;
total_ += val;
if (on_change_ != nullptr) {
@@ -71,6 +79,13 @@
/// to zero, the entry for the key is erased from the counter. It is not allowed for the
/// count to be decremented below zero.
void Decrement(const K &key, int64_t val = 1) {
// If value is 0, it is no-op and only registers the callback.
if (val == 0) {
if (on_change_ != nullptr) {
pending_changes_.insert(key);
}
return;
}
auto it = counters_.find(key);
RAY_CHECK(it != counters_.end());
it->second -= val;
