[serve] Maintain replica queue length directly instead of using `_get_actor_call_stats` (ray-project#42485)

Removes the reliance on the sketchy `_get_actor_call_stats` API (and removes that API entirely, as it was not used elsewhere). Instead, we maintain the count of ongoing requests directly as a counter in the `ReplicaMetricsManager`.
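
For reference, the pattern this switches to is roughly the following minimal sketch (hypothetical names, not the actual `ReplicaMetricsManager` implementation; see the `replica.py` diff below for the real changes):

```python
from contextlib import contextmanager


class OngoingRequestCounter:
    """Hypothetical stand-in for the counter now kept in ReplicaMetricsManager."""

    def __init__(self) -> None:
        # Mutated only from the replica's main asyncio loop, so a plain int suffices.
        self._num_ongoing_requests = 0

    def inc_num_ongoing_requests(self) -> None:
        self._num_ongoing_requests += 1

    def dec_num_ongoing_requests(self) -> None:
        self._num_ongoing_requests -= 1

    def get_num_ongoing_requests(self) -> int:
        return self._num_ongoing_requests


@contextmanager
def wrap_user_method_call(counter: OngoingRequestCounter):
    # Bracket every request so the count is decremented even if user code raises.
    counter.inc_num_ongoing_requests()
    try:
        yield
    finally:
        counter.dec_num_ongoing_requests()
```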

This is possible after ray-project#42298 because all user code now runs on a separate thread, so the main actor asyncio loop will not be blocked by synchronous user code (unless it holds the GIL, which was not handled previously anyways).
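
To illustrate why the counter stays accurate, here is a minimal sketch of the threading model assumed after ray-project#42298 (illustrative only, not the replica's actual code): synchronous user code is offloaded to an executor thread, so the main asyncio loop stays free to update and report the counter.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

_user_code_executor = ThreadPoolExecutor(max_workers=1)


def _blocking_user_method() -> str:
    # Stands in for synchronous user code that would otherwise block the event loop.
    time.sleep(1.0)
    return "ok"


async def handle_request() -> str:
    loop = asyncio.get_running_loop()
    # User code runs on a separate thread; coroutines on the main loop
    # (e.g. the periodic metrics-pushing task) keep running while this awaits.
    return await loop.run_in_executor(_user_code_executor, _blocking_user_method)


if __name__ == "__main__":
    print(asyncio.run(handle_request()))
```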

Due to this change, we can no longer maintain and report the serve_replica_pending_queries metric. This metric was almost always 0 and effectively useless, so I have elected to completely remove it after offline discussion with the team.

---------

Signed-off-by: Edward Oakes <[email protected]>
Co-authored-by: shrekris-anyscale <[email protected]>
Co-authored-by: Cindy Zhang <[email protected]>
3 people authored Jan 22, 2024
1 parent 08ec593 commit 208c540
Showing 10 changed files with 27 additions and 242 deletions.
@@ -121,21 +121,6 @@
),
Panel(
id=8,
title="Pending requests per replica",
description="Pending requests for each replica.",
unit="requests",
targets=[
Target(
expr="sum(ray_serve_replica_pending_queries{{{global_filters}}}) by (application, deployment, replica)",
legend="{{replica}}",
),
],
fill=0,
stack=False,
grid_pos=GridPos(8, 2, 8, 8),
),
Panel(
id=9,
title="Running requests per replica",
description="Current running requests for each replica.",
unit="requests",
@@ -150,7 +135,7 @@
grid_pos=GridPos(16, 2, 8, 8),
),
Panel(
id=10,
id=9,
title="Multiplexed models per replica",
description="The number of multiplexed models for each replica.",
unit="models",
@@ -165,7 +150,7 @@
grid_pos=GridPos(0, 3, 8, 8),
),
Panel(
id=11,
id=10,
title="Multiplexed model loads per replica",
description="The number of times of multiplexed models loaded for each replica.",
unit="times",
@@ -180,7 +165,7 @@
grid_pos=GridPos(8, 3, 8, 8),
),
Panel(
id=12,
id=11,
title="Multiplexed model unloads per replica",
description="The number of times of multiplexed models unloaded for each replica.",
unit="times",
@@ -195,7 +180,7 @@
grid_pos=GridPos(16, 3, 8, 8),
),
Panel(
id=13,
id=12,
title="P99 latency of multiplexed model loads per replica",
description="P99 latency of mutliplexed model load per replica.",
unit="ms",
@@ -210,7 +195,7 @@
grid_pos=GridPos(0, 4, 8, 8),
),
Panel(
id=14,
id=13,
title="P99 latency of multiplexed model unloads per replica",
description="P99 latency of mutliplexed model unload per replica.",
unit="ms",
@@ -225,7 +210,7 @@
grid_pos=GridPos(8, 4, 8, 8),
),
Panel(
id=15,
id=14,
title="Multiplexed model ids per replica",
description="The ids of multiplexed models for each replica.",
unit="model",
@@ -239,7 +224,7 @@
stack=False,
),
Panel(
id=16,
id=15,
title="Multiplexed model cache hit rate",
description="The cache hit rate of multiplexed models for the deployment.",
unit="%",
5 changes: 0 additions & 5 deletions doc/source/serve/monitoring.md
@@ -470,11 +470,6 @@ The following metrics are exposed by Ray Serve:
* replica
* application
- The current number of queries being processed.
* - ``ray_serve_replica_pending_queries`` [**]
- * deployment
* replica
* application
- The current number of pending queries.
* - ``ray_serve_num_http_requests`` [*]
- * route
* method
20 changes: 0 additions & 20 deletions python/ray/_raylet.pyx
@@ -4722,26 +4722,6 @@ cdef class CoreWorker:

return ref_counts

def get_actor_call_stats(self):
cdef:
unordered_map[c_string, c_vector[int64_t]] c_tasks_count

c_tasks_count = (
CCoreWorkerProcess.GetCoreWorker().GetActorCallStats())
it = c_tasks_count.begin()

tasks_count = dict()
while it != c_tasks_count.end():
func_name = <unicode>dereference(it).first
counters = dereference(it).second
tasks_count[func_name] = {
"pending": counters[0],
"running": counters[1],
"finished": counters[2],
}
postincrement(it)
return tasks_count

def set_get_async_callback(self, ObjectRef object_ref, user_callback: Callable):
# NOTE: we need to manually increment the Python reference count to avoid the
# callback object being garbage collected before it's called by the core worker.
2 changes: 0 additions & 2 deletions python/ray/includes/libcoreworker.pxd
@@ -311,8 +311,6 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:

int64_t GetNumLeasesRequested() const

unordered_map[c_string, c_vector[int64_t]] GetActorCallStats() const

void RecordTaskLogStart(
const CTaskID &task_id,
int attempt_number,
12 changes: 0 additions & 12 deletions python/ray/runtime_context.py
@@ -400,18 +400,6 @@ def gcs_address(self):
self.worker.check_connected()
return self.worker.gcs_client.address

def _get_actor_call_stats(self):
"""Get the current worker's task counters.
Returns:
A dictionary keyed by the function name. The values are
dictionaries with form ``{"pending": 0, "running": 1,
"finished": 2}``.
"""
worker = self.worker
worker.check_connected()
return worker.core_worker.get_actor_call_stats()

@Deprecated(message="Use get_accelerator_ids() instead", warning=True)
def get_resource_ids(self) -> Dict[str, List[str]]:
return self.get_accelerator_ids()
57 changes: 19 additions & 38 deletions python/ray/serve/_private/replica.py
@@ -55,12 +55,7 @@
get_component_logger_file_path,
)
from ray.serve._private.router import RequestMetadata
from ray.serve._private.utils import (
MetricsPusher,
merge_dict,
parse_import_path,
wrap_to_ray_error,
)
from ray.serve._private.utils import MetricsPusher, parse_import_path, wrap_to_ray_error
from ray.serve._private.version import DeploymentVersion
from ray.serve.config import AutoscalingConfig
from ray.serve.deployment import Deployment
@@ -114,6 +109,7 @@ def __init__(
self._controller_handle = ray.get_actor(
SERVE_CONTROLLER_NAME, namespace=SERVE_NAMESPACE
)
self._num_ongoing_requests = 0

# Request counter (only set on replica startup).
self._restart_counter = metrics.Counter(
@@ -173,7 +169,7 @@ def __init__(
)
# Collect autoscaling metrics locally periodically.
self._metrics_pusher.register_task(
self.get_num_pending_and_running_requests,
self.get_num_ongoing_requests,
min(
RAY_SERVE_REPLICA_AUTOSCALING_METRIC_RECORD_PERIOD_S,
self._autoscaling_config.metrics_interval_s,
@@ -193,10 +189,17 @@ def set_autoscaling_config(self, autoscaling_config: AutoscalingConfig):
"""Dynamically update autoscaling config."""
self._autoscaling_config = autoscaling_config

def get_num_pending_and_running_requests(self) -> int:
def inc_num_ongoing_requests(self) -> int:
"""Increment the current total queue length of requests for this replica."""
self._num_ongoing_requests += 1

def dec_num_ongoing_requests(self) -> int:
"""Decrement the current total queue length of requests for this replica."""
self._num_ongoing_requests -= 1

def get_num_ongoing_requests(self) -> int:
"""Get current total queue length of requests for this replica."""
stats = self._get_handle_request_stats() or {}
return stats.get("pending", 0) + stats.get("running", 0)
return self._num_ongoing_requests

def record_request_metrics(
self, *, route: str, status_str: str, latency_ms: float, was_error: bool
@@ -221,30 +224,7 @@ def _add_autoscaling_metrics_point(self, data, send_timestamp: float):
)

def _set_replica_requests_metrics(self):
self._num_processing_items.set(self._get_num_running_requests())
self._num_pending_items.set(self._get_num_pending_requests())

def _get_num_running_requests(self) -> int:
stats = self._get_handle_request_stats() or {}
return stats.get("running", 0)

def _get_num_pending_requests(self) -> int:
stats = self._get_handle_request_stats() or {}
return stats.get("pending", 0)

def _get_handle_request_stats(self) -> Optional[Dict[str, int]]:
replica_actor_name = self._deployment_id.to_replica_actor_class_name()
actor_stats = ray.runtime_context.get_runtime_context()._get_actor_call_stats()
method_stats = actor_stats.get(f"{replica_actor_name}.handle_request")
streaming_method_stats = actor_stats.get(
f"{replica_actor_name}.handle_request_streaming"
)
method_stats_java = actor_stats.get(
f"{replica_actor_name}.handle_request_from_java"
)
return merge_dict(
merge_dict(method_stats, streaming_method_stats), method_stats_java
)
self._num_processing_items.set(self.get_num_ongoing_requests())


class ReplicaActor:
@@ -346,7 +326,7 @@ def get_num_ongoing_requests(self) -> int:
This runs on a separate thread (using a Ray concurrency group) so it will
not be blocked by user code.
"""
return self._metrics_manager.get_num_pending_and_running_requests()
return self._metrics_manager.get_num_ongoing_requests()

@contextmanager
def _wrap_user_method_call(self, request_metadata: RequestMetadata):
@@ -369,12 +349,15 @@ def _wrap_user_method_call(self, request_metadata: RequestMetadata):
start_time = time.time()
user_exception = None
try:
self._metrics_manager.inc_num_ongoing_requests()
yield
except Exception as e:
user_exception = e
logger.error(f"Request failed:\n{e}")
if ray.util.pdb._is_ray_debugger_enabled():
ray.util.pdb._post_mortem()
finally:
self._metrics_manager.dec_num_ongoing_requests()

latency_ms = (time.time() - start_time) * 1000
if user_exception is None:
@@ -645,9 +628,7 @@ async def _drain_ongoing_requests(self):
while True:
await asyncio.sleep(wait_loop_period_s)

num_ongoing_requests = (
self._metrics_manager.get_num_pending_and_running_requests()
)
num_ongoing_requests = self._metrics_manager.get_num_ongoing_requests()
if num_ongoing_requests > 0:
logger.info(
f"Waiting for an additional {wait_loop_period_s}s to shut down "
19 changes: 1 addition & 18 deletions python/ray/serve/tests/test_metrics.py
@@ -135,31 +135,14 @@ async def __call__(self):
assert processing_requests[0]["application"] == "app1"
print("serve_replica_processing_queries exists.")

pending_requests = get_metric_dictionaries(
"serve_replica_pending_queries", timeout=5
)
assert len(pending_requests) == 1
assert pending_requests[0]["deployment"] == "A"
assert pending_requests[0]["application"] == "app1"
print("serve_replica_pending_queries exists.")

def ensure_request_processing():
resp = requests.get("http://127.0.0.1:9999").text
resp = resp.split("\n")
expected_metrics = {
"serve_replica_processing_queries",
"serve_replica_pending_queries",
}
for metrics in resp:
if "# HELP" in metrics or "# TYPE" in metrics:
continue
if "serve_replica_processing_queries" in metrics:
assert "1.0" in metrics
expected_metrics.discard("serve_replica_processing_queries")
elif "serve_replica_pending_queries" in metrics:
assert "0.0" in metrics
expected_metrics.discard("serve_replica_pending_queries")
assert len(expected_metrics) == 0
return True

wait_for_condition(ensure_request_processing, timeout=5)
@@ -543,7 +526,7 @@ def h():
verify_metrics(err_requests[0], expected_output)

health_metrics = get_metric_dictionaries("serve_deployment_replica_healthy")
assert len(health_metrics) == 3
assert len(health_metrics) == 3, health_metrics
expected_outputs = [
{"deployment": "f", "application": "app1"},
{"deployment": "g", "application": "app2"},