[Core] Increase the dashboard agent gRPC payload to 100MB & batch the metrics to multiple gRPC requests (ray-project#40177)

When there are many metrics to report, the worker fails to report them because the default gRPC server payload limit is 4MB.

This PR fixes the issue in two ways:

1. Increase the dashboard agent's gRPC server payload limit to 20MB. Ray's C++ servers already allow up to 250 MiB (see GRPC_CPP_MAX_MESSAGE_SIZE below), so raising the agent's limit a little is not risky.
2. When there are many metrics, report them across multiple gRPC requests. Each metric is roughly 500 bytes to 1 KB, so the default batch size is 1000, which keeps each request under about 1 MB. Note that a default that is too small causes its own problems, because the agent does not handle a large number of gRPC requests well (see the sketch below).
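A minimal Python sketch of the batching idea (an illustration only — the real implementation is the C++ change in src/ray/stats/metric_exporter.cc below; send_request and the byte payloads are placeholders, not Ray APIs):

from typing import Callable, List

def report_in_batches(
    metrics: List[bytes],
    send_request: Callable[[List[bytes]], None],
    batch_size: int = 1000,  # ~500B-1KB per metric keeps a request under ~1MB
) -> None:
    """Send metrics in chunks so no single request exceeds the payload cap."""
    batch: List[bytes] = []
    for metric in metrics:
        batch.append(metric)
        if len(batch) >= batch_size:  # flush a full batch and start a new one
            send_request(batch)
            batch = []
    if batch:  # flush the final partial batch, like the trailing SendData call
        send_request(batch)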
rkooo567 authored Oct 11, 2023
1 parent 3fb4e19 commit efc2367
Showing 7 changed files with 191 additions and 86 deletions.
15 changes: 14 additions & 1 deletion dashboard/agent.py
@@ -25,6 +25,7 @@
    _initialize_internal_kv,
    _internal_kv_initialized,
)
from ray._private.ray_constants import AGENT_GRPC_MAX_MESSAGE_LENGTH

logger = logging.getLogger(__name__)

@@ -112,7 +113,19 @@ def _init_non_minimal(self):
        else:
            aiogrpc.init_grpc_aio()

        self.server = aiogrpc.server(options=(("grpc.so_reuseport", 0),))
        self.server = aiogrpc.server(
            options=(
                ("grpc.so_reuseport", 0),
                (
                    "grpc.max_send_message_length",
                    AGENT_GRPC_MAX_MESSAGE_LENGTH,
                ),  # noqa
                (
                    "grpc.max_receive_message_length",
                    AGENT_GRPC_MAX_MESSAGE_LENGTH,
                ),
            )  # noqa
        )
        grpc_ip = "127.0.0.1" if self.ip == "127.0.0.1" else "0.0.0.0"
        try:
            self.grpc_port = ray._private.tls_utils.add_port_to_grpc_server(
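The options tuple above is standard gRPC server configuration. A self-contained sketch of the same pattern with grpc.aio (the port and the 20MB cap here are illustrative assumptions, not values taken from this diff):

import asyncio
import grpc

MAX_MESSAGE_LENGTH = 20 * 1024 * 1024  # assumed 20MB cap, mirroring the new constant

async def serve() -> None:
    # Raise both send and receive limits; gRPC's default is 4MB.
    server = grpc.aio.server(
        options=(
            ("grpc.so_reuseport", 0),
            ("grpc.max_send_message_length", MAX_MESSAGE_LENGTH),
            ("grpc.max_receive_message_length", MAX_MESSAGE_LENGTH),
        )
    )
    server.add_insecure_port("127.0.0.1:50051")  # hypothetical port
    await server.start()
    await server.wait_for_termination()

if __name__ == "__main__":
    asyncio.run(serve())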
5 changes: 5 additions & 0 deletions python/ray/_private/ray_constants.py
@@ -371,6 +371,11 @@ def env_set_by_user(key):
# The default gRPC max message size is 4 MiB; we use a larger number of 250 MiB.
# NOTE: This is equal to the C++ limit (RAY_CONFIG::max_grpc_message_size).
GRPC_CPP_MAX_MESSAGE_SIZE = 250 * 1024 * 1024
# The gRPC send & receive max length for the "dashboard agent" server.
AGENT_GRPC_MAX_MESSAGE_LENGTH = env_integer(
    "AGENT_GRPC_MAX_MESSAGE_LENGTH", 20 * 1024 * 1024  # 20MB
)


# GRPC options
GRPC_ENABLE_HTTP_PROXY = (
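Because the new constant is read through env_integer, it can be overridden without a code change. A sketch, assuming the agent process inherits the driver's environment (the 100MB value is only an example):

import os

# Must be set before Ray starts so the dashboard agent picks it up.
os.environ["AGENT_GRPC_MAX_MESSAGE_LENGTH"] = str(100 * 1024 * 1024)

import ray

ray.init()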
58 changes: 58 additions & 0 deletions python/ray/tests/test_task_metrics.py
@@ -1,6 +1,7 @@
from collections import defaultdict
import sys
import os
import copy

import pytest

@@ -598,6 +599,63 @@ def g():
)


@pytest.mark.skipif(sys.platform == "win32", reason="Flaky on Windows. Timing out.")
def test_metrics_batch(shutdown_only):
    """Verify metrics_report_batch_size works correctly without data loss."""
    config_copy = copy.deepcopy(METRIC_CONFIG)
    config_copy["_system_config"].update({"metrics_report_batch_size": 1})
    info = ray.init(num_cpus=2, **config_copy)

    driver = """
import ray
import os
import time

ray.init("auto")

@ray.remote
class Phaser:
    def __init__(self):
        self.i = 0

    def inc(self):
        self.i += 1
        if self.i < 3:
            raise ValueError("First two tries will fail")

phaser = Phaser.remote()

@ray.remote(max_restarts=10, max_task_retries=10)
class F:
    def f(self):
        try:
            ray.get(phaser.inc.remote())
        except Exception:
            print("RESTART")
            os._exit(1)

f = F.remote()
ray.get(f.f.remote())
time.sleep(999)
"""

    proc = run_string_as_driver_nonblocking(driver)
    expected = {
        ("F.__init__", "FINISHED", "0"): 1.0,
        ("F.f", "FAILED", "0"): 1.0,
        ("F.f", "FAILED", "1"): 1.0,
        ("F.f", "FINISHED", "1"): 1.0,
        ("Phaser.__init__", "FINISHED", "0"): 1.0,
        ("Phaser.inc", "FINISHED", "0"): 1.0,
    }
    wait_for_condition(
        lambda: tasks_by_all(info) == expected,
        timeout=20,
        retry_interval_ms=500,
    )
    proc.kill()


if __name__ == "__main__":
    import sys

3 changes: 2 additions & 1 deletion src/ray/common/ray_config_def.h
@@ -479,7 +479,8 @@ RAY_CONFIG(int64_t, ray_syncer_polling_buffer, 5)
RAY_CONFIG(uint64_t, gcs_service_address_check_interval_milliseconds, 1000)

/// The batch size for metrics export.
RAY_CONFIG(int64_t, metrics_report_batch_size, 100)
/// Normally each metric is < 1KB. 1000 means a batch is around 1MB.
RAY_CONFIG(int64_t, metrics_report_batch_size, 1000)

/// The interval duration for which task state events will be reported to GCS.
/// The reported data should only be used for observability.
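Since metrics_report_batch_size is a Ray system config, it can also be tuned at startup, as the new test above does. A hedged sketch (the value 100 is arbitrary here):

import ray

# Smaller batches mean more, smaller gRPC requests to the agent.
ray.init(_system_config={"metrics_report_batch_size": 100})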
181 changes: 101 additions & 80 deletions src/ray/stats/metric_exporter.cc
@@ -129,8 +129,9 @@ void MetricPointExporter::addGlobalTagsToGrpcMetric(MetricPoint &metric) {
OpenCensusProtoExporter::OpenCensusProtoExporter(const int port,
                                                 instrumented_io_context &io_service,
                                                 const std::string address,
                                                 const WorkerID &worker_id)
    : client_call_manager_(io_service), worker_id_(worker_id) {
                                                 const WorkerID &worker_id,
                                                 size_t report_batch_size)
    : client_call_manager_(io_service),
      worker_id_(worker_id),
      report_batch_size_(report_batch_size) {
  absl::MutexLock l(&mu_);
  client_.reset(new rpc::MetricsAgentClient(address, port, client_call_manager_));
};
@@ -159,99 +160,119 @@ void OpenCensusProtoExporter::ExportViewData(
  // https://github.com/census-instrumentation/opencensus-proto/blob/master/src/opencensus/proto/metrics/v1/metrics.proto
  rpc::ReportOCMetricsRequest request_proto;
  request_proto.set_worker_id(worker_id_.Binary());
  size_t data_batched = 0;

  for (const auto &datum : data) {
    // Unpack the fields we need for in memory data structure.
    auto &view_descriptor = datum.first;
    auto &view_data = datum.second;
    auto &measure_descriptor = view_descriptor.measure_descriptor();
    UpdateMetricsData(datum, request_proto);
    data_batched += 1;

    // Create one metric `Point` in protobuf.
    auto request_point_proto = request_proto.add_metrics();
    /// If it exceeds the batch size, send data.
    if (data_batched >= report_batch_size_) {
      SendData(request_proto);
      request_proto = rpc::ReportOCMetricsRequest();
      request_proto.set_worker_id(worker_id_.Binary());
      data_batched = 0;
    }
  }

  // Write the `MetricDescriptor`.
  auto metric_descriptor_proto = request_point_proto->mutable_metric_descriptor();
  metric_descriptor_proto->set_name(measure_descriptor.name());
  metric_descriptor_proto->set_description(measure_descriptor.description());
  metric_descriptor_proto->set_unit(measure_descriptor.units());
  for (const auto &tag_key : view_descriptor.columns()) {
    metric_descriptor_proto->add_label_keys()->set_key(tag_key.name());
  };
  if (data_batched > 0) {
    SendData(request_proto);
  }
}

void OpenCensusProtoExporter::SendData(rpc::ReportOCMetricsRequest &request) {
  RAY_LOG(DEBUG) << "Exporting metrics. request_proto numbers: "
                 << request.metrics_size()
                 << ", request_proto size bytes: " << request.ByteSizeLong();
  absl::MutexLock l(&mu_);
  client_->ReportOCMetrics(
      request, [](const Status &status, const rpc::ReportOCMetricsReply &reply) {
        RAY_UNUSED(reply);
        if (!status.ok()) {
          RAY_LOG_EVERY_N(WARNING, 10000)
              << "Export metrics to agent failed: " << status
              << ". This won't affect Ray, but you can lose metrics from the cluster.";
        }
      });
}

void OpenCensusProtoExporter::UpdateMetricsData(
    const std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>
        &datum,
    rpc::ReportOCMetricsRequest &request_proto) {
  // Unpack the fields we need for in memory data structure.
  auto &view_descriptor = datum.first;
  auto &view_data = datum.second;
  auto &measure_descriptor = view_descriptor.measure_descriptor();

  // Create one metric `Point` in protobuf.
  auto request_point_proto = request_proto.add_metrics();

  // Helpers for writing the actual `TimeSeries`.
  auto start_time = absl::ToUnixSeconds(view_data.start_time());
  auto end_time = absl::ToUnixSeconds(view_data.end_time());
  auto make_new_data_point_proto = [&request_point_proto, start_time, end_time](
                                       const std::vector<std::string> &tag_values) {
    auto metric_timeseries_proto = request_point_proto->add_timeseries();
    metric_timeseries_proto->mutable_start_timestamp()->set_seconds(start_time);
  // Write the `MetricDescriptor`.
  auto metric_descriptor_proto = request_point_proto->mutable_metric_descriptor();
  metric_descriptor_proto->set_name(measure_descriptor.name());
  metric_descriptor_proto->set_description(measure_descriptor.description());
  metric_descriptor_proto->set_unit(measure_descriptor.units());
  for (const auto &tag_key : view_descriptor.columns()) {
    metric_descriptor_proto->add_label_keys()->set_key(tag_key.name());
  };

    for (const auto &value : tag_values) {
      metric_timeseries_proto->add_label_values()->set_value(value);
    };
  // Helpers for writing the actual `TimeSeries`.
  auto start_time = absl::ToUnixSeconds(view_data.start_time());
  auto end_time = absl::ToUnixSeconds(view_data.end_time());
  auto make_new_data_point_proto = [&request_point_proto, start_time, end_time](
                                       const std::vector<std::string> &tag_values) {
    auto metric_timeseries_proto = request_point_proto->add_timeseries();
    metric_timeseries_proto->mutable_start_timestamp()->set_seconds(start_time);

    auto point_proto = metric_timeseries_proto->add_points();
    point_proto->mutable_timestamp()->set_seconds(end_time);
    return point_proto;
    for (const auto &value : tag_values) {
      metric_timeseries_proto->add_label_values()->set_value(value);
    };

  // Write the `TimeSeries` for the given aggregated data type.
  switch (view_data.type()) {
  case opencensus::stats::ViewData::Type::kDouble:
    for (const auto &row : view_data.double_data()) {
      auto point_proto = make_new_data_point_proto(row.first /*tag_values*/);
      point_proto->set_double_value(row.second);
    }
    break;
  case opencensus::stats::ViewData::Type::kInt64:
    for (const auto &row : view_data.int_data()) {
      auto point_proto = make_new_data_point_proto(row.first /*tag_values*/);
      point_proto->set_int64_value(row.second);
    }
    break;
  case opencensus::stats::ViewData::Type::kDistribution:
    for (const auto &row : view_data.distribution_data()) {
      opencensus::stats::Distribution dist_value = row.second;
    auto point_proto = metric_timeseries_proto->add_points();
    point_proto->mutable_timestamp()->set_seconds(end_time);
    return point_proto;
  };

      auto point_proto = make_new_data_point_proto(row.first /*tag_values*/);
  // Write the `TimeSeries` for the given aggregated data type.
  switch (view_data.type()) {
  case opencensus::stats::ViewData::Type::kDouble:
    for (const auto &row : view_data.double_data()) {
      auto point_proto = make_new_data_point_proto(row.first /*tag_values*/);
      point_proto->set_double_value(row.second);
    }
    break;
  case opencensus::stats::ViewData::Type::kInt64:
    for (const auto &row : view_data.int_data()) {
      auto point_proto = make_new_data_point_proto(row.first /*tag_values*/);
      point_proto->set_int64_value(row.second);
    }
    break;
  case opencensus::stats::ViewData::Type::kDistribution:
    for (const auto &row : view_data.distribution_data()) {
      opencensus::stats::Distribution dist_value = row.second;

      // Copy in memory data into `DistributionValue` protobuf.
      auto distribution_proto = point_proto->mutable_distribution_value();
      distribution_proto->set_count(dist_value.count());
      distribution_proto->set_sum(dist_value.count() * dist_value.mean());
      distribution_proto->set_sum_of_squared_deviation(
          dist_value.sum_of_squared_deviation());
      auto point_proto = make_new_data_point_proto(row.first /*tag_values*/);

      // Write the `BucketOption` and `Bucket` data.
      auto bucket_opt_proto =
          distribution_proto->mutable_bucket_options()->mutable_explicit_();
      for (const auto &bound : dist_value.bucket_boundaries().lower_boundaries()) {
        bucket_opt_proto->add_bounds(bound);
      }
      for (const auto &count : dist_value.bucket_counts()) {
        distribution_proto->add_buckets()->set_count(count);
      }
      // Copy in memory data into `DistributionValue` protobuf.
      auto distribution_proto = point_proto->mutable_distribution_value();
      distribution_proto->set_count(dist_value.count());
      distribution_proto->set_sum(dist_value.count() * dist_value.mean());
      distribution_proto->set_sum_of_squared_deviation(
          dist_value.sum_of_squared_deviation());

      // Write the `BucketOption` and `Bucket` data.
      auto bucket_opt_proto =
          distribution_proto->mutable_bucket_options()->mutable_explicit_();
      for (const auto &bound : dist_value.bucket_boundaries().lower_boundaries()) {
        bucket_opt_proto->add_bounds(bound);
      }
      for (const auto &count : dist_value.bucket_counts()) {
        distribution_proto->add_buckets()->set_count(count);
      }
      break;
  default:
    RAY_LOG(FATAL) << "Unknown view data type.";
    break;
  }
  addGlobalTagsToGrpcMetric(*request_point_proto);
}

  {
    absl::MutexLock l(&mu_);
    client_->ReportOCMetrics(
        request_proto, [](const Status &status, const rpc::ReportOCMetricsReply &reply) {
          RAY_UNUSED(reply);
          if (!status.ok()) {
            RAY_LOG_EVERY_N(WARNING, 10000)
                << "Export metrics to agent failed: " << status
                << ". This won't affect Ray, but you can lose metrics from the cluster.";
          }
        });
    break;
  default:
    RAY_LOG(FATAL) << "Unknown view data type.";
    break;
  }
  addGlobalTagsToGrpcMetric(*request_point_proto);
}

} // namespace stats
13 changes: 10 additions & 3 deletions src/ray/stats/metric_exporter.h
@@ -99,22 +99,27 @@ class OpenCensusProtoExporter final : public opencensus::stats::StatsExporter::H
  OpenCensusProtoExporter(const int port,
                          instrumented_io_context &io_service,
                          const std::string address,
                          const WorkerID &worker_id);
                          const WorkerID &worker_id,
                          size_t report_batch_size);

  ~OpenCensusProtoExporter() = default;

  static void Register(const int port,
                       instrumented_io_context &io_service,
                       const std::string address,
                       const WorkerID &worker_id) {
                       const WorkerID &worker_id,
                       size_t report_batch_size) {
    opencensus::stats::StatsExporter::RegisterPushHandler(
        absl::make_unique<OpenCensusProtoExporter>(port, io_service, address, worker_id));
        absl::make_unique<OpenCensusProtoExporter>(
            port, io_service, address, worker_id, report_batch_size));
  }

  void ExportViewData(
      const std::vector<std::pair<opencensus::stats::ViewDescriptor,
                                  opencensus::stats::ViewData>> &data) override;
  void addGlobalTagsToGrpcMetric(opencensus::proto::metrics::v1::Metric &metric);
  void SendData(rpc::ReportOCMetricsRequest &request);
  void UpdateMetricsData(const std::pair<opencensus::stats::ViewDescriptor,
                                         opencensus::stats::ViewData> &datum,
                         rpc::ReportOCMetricsRequest &request_proto);

 private:
  /// Call Manager for gRPC client.
@@ -125,6 +130,8 @@ class OpenCensusProtoExporter final : public opencensus::stats::StatsExporter::H
  std::unique_ptr<rpc::MetricsAgentClient> client_ ABSL_GUARDED_BY(&mu_);
  /// The worker ID of the current component.
  WorkerID worker_id_;
  /// The maximum batch size to be included in a single gRPC metrics report request.
  size_t report_batch_size_;
};

} // namespace stats
2 changes: 1 addition & 1 deletion src/ray/stats/stats.h
@@ -105,7 +105,7 @@ static inline void Init(const TagsType &global_tags,

  MetricPointExporter::Register(exporter, metrics_report_batch_size);
  OpenCensusProtoExporter::Register(
      metrics_agent_port, (*metrics_io_service), "127.0.0.1", worker_id);
      metrics_agent_port,
      (*metrics_io_service),
      "127.0.0.1",
      worker_id,
      RayConfig::instance().metrics_report_batch_size());
  StatsConfig::instance().SetGlobalTags(global_tags);
  for (auto &f : StatsConfig::instance().PopInitializers()) {
    f();
