Skip to content

Commit

Permalink
[serve] Broadcast full deployment config to routers instead of just autoscaling config (ray-project#43167)
Browse files Browse the repository at this point in the history

This will be needed to implement max_queued_requests as the handle will need to know the config option (and it may change).

Also updated some naming/comments to clarify questions I had while reading the code.

---------

Signed-off-by: Edward Oakes <[email protected]>
  • Loading branch information
edoakes authored Feb 14, 2024
1 parent c812e31 commit 7a56825
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 28 deletions.
42 changes: 27 additions & 15 deletions python/ray/serve/_private/deployment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -1274,8 +1274,8 @@ def __init__(
# time we checked.
self._multiplexed_model_ids_updated = False

self._last_notified_running_replica_infos: List[RunningReplicaInfo] = []
self._last_notified_autoscaling_config = None
self._last_broadcasted_running_replica_infos: List[RunningReplicaInfo] = []
self._last_broadcasted_deployment_config = None

@property
def autoscaling_policy_manager(self) -> AutoscalingPolicyManager:
Expand Down Expand Up @@ -1396,10 +1396,19 @@ def get_active_node_ids(self) -> Set[str]:
def list_replica_details(self) -> List[ReplicaDetails]:
    """Return the actor details for every replica tracked in `self._replicas`."""
    details = []
    for replica in self._replicas.get():
        details.append(replica.actor_details)
    return details

def notify_running_replicas_changed(self) -> None:
def broadcast_running_replicas_if_changed(self) -> None:
"""Broadcasts the set of running replicas over long poll if it has changed.
Keeps an in-memory record of the last set of running replicas that was broadcast
to determine if it has changed.
The set will also be broadcast if any replicas have an updated set of
multiplexed model IDs.
"""
running_replica_infos = self.get_running_replica_infos()
if (
set(self._last_notified_running_replica_infos) == set(running_replica_infos)
set(self._last_broadcasted_running_replica_infos)
== set(running_replica_infos)
and not self._multiplexed_model_ids_updated
):
return
Expand All @@ -1416,22 +1425,25 @@ def notify_running_replicas_changed(self) -> None:
(LongPollNamespace.RUNNING_REPLICAS, self._id.name),
running_replica_infos,
)
self._last_notified_running_replica_infos = running_replica_infos
self._last_broadcasted_running_replica_infos = running_replica_infos
self._multiplexed_model_ids_updated = False

def notify_autoscaling_config_changed(self) -> None:
current_autoscaling_config = (
self._target_state.info.deployment_config.autoscaling_config
)
if self._last_notified_autoscaling_config == current_autoscaling_config:
def broadcast_deployment_config_if_changed(self) -> None:
"""Broadcasts the deployment config over long poll if it has changed.
Keeps an in-memory record of the last config that was broadcast to determine
if it has changed.
"""
current_deployment_config = self._target_state.info.deployment_config
if self._last_broadcasted_deployment_config == current_deployment_config:
return

self._long_poll_host.notify_changed(
(LongPollNamespace.AUTOSCALING_CONFIG, self._id),
current_autoscaling_config,
(LongPollNamespace.DEPLOYMENT_CONFIG, self._id),
current_deployment_config,
)

self._last_notified_autoscaling_config = current_autoscaling_config
self._last_broadcasted_deployment_config = current_deployment_config

def _set_target_state_deleting(self) -> None:
"""Set the target state for the deployment to be deleted."""
Expand Down Expand Up @@ -2712,8 +2724,8 @@ def update(self) -> bool:
self._deployment_states[deployment_id].stop_replicas(replicas_to_stop)

for deployment_state in self._deployment_states.values():
deployment_state.notify_running_replicas_changed()
deployment_state.notify_autoscaling_config_changed()
deployment_state.broadcast_running_replicas_if_changed()
deployment_state.broadcast_deployment_config_if_changed()

for deployment_id in deleted_ids:
self._deployment_scheduler.on_deployment_deleted(deployment_id)
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/_private/long_poll.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def __repr__(self):
RUNNING_REPLICAS = auto()
ROUTE_TABLE = auto()
GLOBAL_LOGGING_CONFIG = auto()
AUTOSCALING_CONFIG = auto()
DEPLOYMENT_CONFIG = auto()


@dataclass
Expand Down
37 changes: 25 additions & 12 deletions python/ray/serve/_private/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from ray.actor import ActorHandle
from ray.dag.py_obj_scanner import _PyObjScanner
from ray.serve._private.common import DeploymentID, RequestMetadata, RunningReplicaInfo
from ray.serve._private.config import DeploymentConfig
from ray.serve._private.constants import (
HANDLE_METRIC_PUSH_INTERVAL_S,
RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE,
Expand Down Expand Up @@ -108,15 +109,17 @@ def __init__(
deployment_id,
): self.update_running_replicas,
(
LongPollNamespace.AUTOSCALING_CONFIG,
LongPollNamespace.DEPLOYMENT_CONFIG,
deployment_id,
): self.update_autoscaling_config,
): self.update_deployment_config,
},
call_in_event_loop=event_loop,
)

# For autoscaling deployments.
self.autoscaling_config = None
# The config for the deployment this router sends requests to will be broadcast
# by the controller. That means it is not available until we get the first
# update. This includes an optional autoscaling config.
self.deployment_config: Optional[DeploymentConfig] = None
# Track queries sent to replicas for the autoscaling algorithm.
self.num_requests_sent_to_replicas = defaultdict(int)
# We use Ray object ref callbacks to update state when tracking
Expand Down Expand Up @@ -148,11 +151,20 @@ def update_running_replicas(self, running_replicas: List[RunningReplicaInfo]):
},
)

def update_autoscaling_config(self, autoscaling_config: AutoscalingConfig):
self.autoscaling_config = autoscaling_config
@property
def curr_autoscaling_config(self) -> Optional[AutoscalingConfig]:
    """Autoscaling config from the latest broadcast deployment config.

    Returns None when no deployment config has been received yet.
    """
    config = self.deployment_config
    return None if config is None else config.autoscaling_config

def update_deployment_config(self, deployment_config: DeploymentConfig):
"""Update the config for the deployment this router sends requests to."""
self.deployment_config = deployment_config

# Start the metrics pusher if autoscaling is enabled.
if self.autoscaling_config:
autoscaling_config: AutoscalingConfig = self.curr_autoscaling_config
if autoscaling_config:
# Optimization for autoscaling cold start time. If there are
# currently 0 replicas for the deployment, and there is at
# least one queued request on this router, then immediately
Expand All @@ -171,13 +183,13 @@ def update_autoscaling_config(self, autoscaling_config: AutoscalingConfig):
self.metrics_pusher.register_or_update_task(
RECORD_METRICS_TASK_NAME,
self._add_autoscaling_metrics_point,
min(0.5, self.autoscaling_config.metrics_interval_s),
min(0.5, autoscaling_config.metrics_interval_s),
)
# Push metrics to the controller periodically.
self.metrics_pusher.register_or_update_task(
PUSH_METRICS_TO_CONTROLLER_TASK_NAME,
self._get_aggregated_requests,
self.autoscaling_config.metrics_interval_s,
autoscaling_config.metrics_interval_s,
self.push_metrics_to_controller,
)
else:
Expand Down Expand Up @@ -205,8 +217,9 @@ def _add_autoscaling_metrics_point(self):

def _get_aggregated_requests(self):
running_requests = dict()
if RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE:
look_back_period = self.autoscaling_config.look_back_period_s
autoscaling_config = self.curr_autoscaling_config
if RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE and autoscaling_config:
look_back_period = autoscaling_config.look_back_period_s
running_requests = {
replica_id: self.metrics_store.window_average(
replica_id, time.time() - look_back_period
Expand Down Expand Up @@ -329,7 +342,7 @@ async def assign_request(
# you need to yield the event loop above this conditional, you
# will need to remove the check "self.num_queued_queries == 1"
if (
self.autoscaling_config
self.curr_autoscaling_config
and len(self._replica_scheduler.curr_replicas) == 0
and self.num_queued_queries == 1
):
Expand Down

0 comments on commit 7a56825

Please sign in to comment.