diff --git a/python/ray/serve/_private/autoscaling_state.py b/python/ray/serve/_private/autoscaling_state.py new file mode 100644 index 0000000000000..588bd02bba118 --- /dev/null +++ b/python/ray/serve/_private/autoscaling_state.py @@ -0,0 +1,387 @@ +import logging +import time +from dataclasses import dataclass +from typing import Dict, List, Optional, Set + +from ray.serve._private.autoscaling_policy import AutoscalingPolicyManager +from ray.serve._private.common import ( + DeploymentHandleSource, + DeploymentID, + ReplicaID, + TargetCapacityDirection, +) +from ray.serve._private.constants import ( + RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE, + RAY_SERVE_MIN_HANDLE_METRICS_TIMEOUT_S, + SERVE_LOGGER_NAME, +) +from ray.serve._private.deployment_info import DeploymentInfo + +logger = logging.getLogger(SERVE_LOGGER_NAME) + + +@dataclass +class HandleMetricReport: + """Report from a deployment handle on queued and ongoing requests. + + Args: + actor_id: If the deployment handle (from which this metric was + sent) lives on an actor, the actor ID of that actor. + handle_source: Describes what kind of entity holds this + deployment handle: a Serve proxy, a Serve replica, or + unknown. + queued_requests: The current number of queued requests at the + handle, i.e., requests that haven't been assigned to any + replica yet. + running_requests: A map from replica ID to the average number of + requests, assigned through the handle, that are running at + that replica. + timestamp: The time at which this report was received. + """ + + actor_id: Optional[str] + handle_source: DeploymentHandleSource + queued_requests: float + running_requests: Dict[ReplicaID, float] + timestamp: float + + @property + def total_requests(self) -> float: + """Total number of queued and running requests.""" + return self.queued_requests + sum(self.running_requests.values()) + + @property + def is_serve_component_source(self) -> bool: + """Whether the handle source is a Serve actor. + + More specifically, this returns whether a Serve actor tracked + by the controller holds the deployment handle that sent this + report. If the deployment handle lives on a driver, a Ray task, + or an actor that's not a Serve replica, then this returns False. + """ + return self.handle_source in [ + DeploymentHandleSource.PROXY, + DeploymentHandleSource.REPLICA, + ] + + +@dataclass +class ReplicaMetricReport: + """Report from a replica on ongoing requests. + + Args: + running_requests: Average number of running requests at the + replica. + timestamp: The time at which this report was received.
+ """ + + running_requests: float + timestamp: float + + +class AutoscalingState: + """Manages autoscaling for a single deployment.""" + + def __init__(self, deployment_id: DeploymentID): + self._deployment_id = deployment_id + + # Map from handle ID to handle request metric report + self._handle_requests: Dict[str, HandleMetricReport] = dict() + self._requests_queued_at_handles: Dict[str, float] = dict() + # Map from replica ID to replica request metric report + self._replica_requests: Dict[ReplicaID, ReplicaMetricReport] = dict() + + self._deployment_info = None + self._autoscaling_policy_manager = None + self._running_replicas: List[ReplicaID] = [] + self._target_capacity: Optional[float] = None + self._target_capacity_direction: Optional[TargetCapacityDirection] = None + + @property + def autoscaling_policy_manager(self) -> AutoscalingPolicyManager: + return self._autoscaling_policy_manager + + def register( + self, info: DeploymentInfo, curr_target_num_replicas: Optional[int] = None + ): + """Registers an autoscaling deployment's info.""" + + config = info.deployment_config.autoscaling_config + if ( + self._deployment_info is None or self._deployment_info.config_changed(info) + ) and config.initial_replicas is not None: + target_num_replicas = config.initial_replicas + else: + target_num_replicas = curr_target_num_replicas + + self._deployment_info = info + self._autoscaling_policy_manager = AutoscalingPolicyManager(config) + self._target_capacity = info.target_capacity + self._target_capacity_direction = info.target_capacity_direction + + return self.apply_bounds(target_num_replicas) + + def update_running_replica_ids(self, running_replicas: List[ReplicaID]): + """Update cached set of running replica IDs for this deployment.""" + self._running_replicas = running_replicas + + def is_within_bounds(self, num_replicas_running_at_target_version: int): + """Whether or not this deployment is within the autoscaling bounds. + + Returns: True if the number of running replicas for the current + deployment version is within the autoscaling bounds. False + otherwise. + """ + + return ( + self.apply_bounds(num_replicas_running_at_target_version) + == num_replicas_running_at_target_version + ) + + def apply_bounds(self, num_replicas: int): + """Clips a replica count with current autoscaling bounds. + + This takes into account target capacity. + """ + + return self.autoscaling_policy_manager.apply_bounds( + num_replicas, + self._target_capacity, + self._target_capacity_direction, + ) + + def record_request_metrics_for_replica( + self, replica_id: ReplicaID, window_avg: Optional[float], send_timestamp: float + ) -> None: + """Records average number of ongoing requests at a replica.""" + + if window_avg is None: + return + + if ( + replica_id not in self._replica_requests + or send_timestamp > self._replica_requests[replica_id].timestamp + ): + self._replica_requests[replica_id] = ReplicaMetricReport( + running_requests=window_avg, + timestamp=send_timestamp, + ) + + def record_request_metrics_for_handle( + self, + *, + handle_id: str, + actor_id: Optional[str], + handle_source: DeploymentHandleSource, + queued_requests: float, + running_requests: Dict[ReplicaID, float], + send_timestamp: float, + ) -> None: + """Records average number of queued and running requests at a handle for this + deployment. 
+ """ + + if ( + handle_id not in self._handle_requests + or send_timestamp > self._handle_requests[handle_id].timestamp + ): + self._handle_requests[handle_id] = HandleMetricReport( + actor_id=actor_id, + handle_source=handle_source, + queued_requests=queued_requests, + running_requests=running_requests, + timestamp=send_timestamp, + ) + + def drop_stale_handle_metrics(self, alive_serve_actor_ids: Set[str]) -> None: + """Drops handle metrics that are no longer valid. + + This includes handles that live on Serve Proxy or replica actors + that have died AND handles from which the controller hasn't + received an update for too long. + """ + + timeout_s = max( + 2 * self.autoscaling_policy_manager.get_metrics_interval_s(), + RAY_SERVE_MIN_HANDLE_METRICS_TIMEOUT_S, + ) + for handle_id, handle_metric in list(self._handle_requests.items()): + # Drop metrics for handles that are on Serve proxy/replica + # actors that have died + if ( + handle_metric.is_serve_component_source + and handle_metric.actor_id is not None + and handle_metric.actor_id not in alive_serve_actor_ids + ): + del self._handle_requests[handle_id] + if handle_metric.total_requests > 0: + logger.debug( + f"Dropping metrics for handle '{handle_id}' because the Serve " + f"actor it was on ({handle_metric.actor_id}) is no longer " + f"alive. It had {handle_metric.total_requests} ongoing requests" + ) + # Drop metrics for handles that haven't sent an update in a while. + # This is expected behavior for handles that were on replicas or + # proxies that have been shut down. + elif time.time() - handle_metric.timestamp >= timeout_s: + del self._handle_requests[handle_id] + if handle_metric.total_requests > 0: + actor_id = handle_metric.actor_id + actor_info = f"on actor '{actor_id}' " if actor_id else "" + logger.info( + f"Dropping stale metrics for handle '{handle_id}' {actor_info}" + f"because no update was received for {timeout_s:.1f}s. " + f"Ongoing requests was: {handle_metric.total_requests}." + ) + + def get_decision_num_replicas(self, curr_target_num_replicas: int): + """Decide the target number of replicas to autoscale to. + + The decision is based off of the number of requests received + for this deployment. + """ + + return self.autoscaling_policy_manager.get_decision_num_replicas( + curr_target_num_replicas=curr_target_num_replicas, + total_num_requests=self.get_total_num_requests(), + num_running_replicas=len(self._running_replicas), + target_capacity=self._target_capacity, + target_capacity_direction=self._target_capacity_direction, + ) + + def get_total_num_requests(self) -> float: + """Get average total number of requests aggregated over the past + `look_back_period_s` number of seconds. + + If there are 0 running replicas, then returns the total number + of requests queued at handles + + If the flag RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE is + set to 1, the returned average includes both queued and ongoing + requests. Otherwise, the returned average includes only ongoing + requests. 
+ """ + + total_requests = 0 + + if ( + RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE + or len(self._running_replicas) == 0 + ): + for handle_metric in self._handle_requests.values(): + total_requests += handle_metric.queued_requests + for id in self._running_replicas: + if id in handle_metric.running_requests: + total_requests += handle_metric.running_requests[id] + else: + for id in self._running_replicas: + if id in self._replica_requests: + total_requests += self._replica_requests[id].running_requests + + return total_requests + + +class AutoscalingStateManager: + """Manages all things autoscaling related. + + Keeps track of request metrics for each deployment and decides on + the target number of replicas to autoscale to based on those metrics. + """ + + def __init__(self): + self._autoscaling_states: Dict[DeploymentID, AutoscalingState] = {} + + def register_deployment( + self, + deployment_id: DeploymentID, + info: DeploymentInfo, + curr_target_num_replicas: Optional[int] = None, + ): + """Register autoscaling deployment info.""" + assert info.deployment_config.autoscaling_config + if deployment_id not in self._autoscaling_states: + self._autoscaling_states[deployment_id] = AutoscalingState(deployment_id) + return self._autoscaling_states[deployment_id].register( + info, curr_target_num_replicas + ) + + def deregister_deployment(self, deployment_id: DeploymentID): + """Remove deployment from tracking.""" + self._autoscaling_states.pop(deployment_id, None) + + def update_running_replica_ids( + self, deployment_id: DeploymentID, running_replicas: List[ReplicaID] + ): + self._autoscaling_states[deployment_id].update_running_replica_ids( + running_replicas + ) + + def get_metrics(self) -> Dict[DeploymentID, float]: + return { + deployment_id: self.get_total_num_requests(deployment_id) + for deployment_id in self._autoscaling_states + } + + def get_target_num_replicas( + self, deployment_id: DeploymentID, curr_target_num_replicas: int + ) -> int: + return self._autoscaling_states[deployment_id].get_decision_num_replicas( + curr_target_num_replicas=curr_target_num_replicas, + ) + + def get_total_num_requests(self, deployment_id: DeploymentID) -> float: + return self._autoscaling_states[deployment_id].get_total_num_requests() + + def is_within_bounds( + self, deployment_id: DeploymentID, num_replicas_running_at_target_version: int + ) -> bool: + return self._autoscaling_states[deployment_id].is_within_bounds( + num_replicas_running_at_target_version + ) + + def record_request_metrics_for_replica( + self, replica_id: ReplicaID, window_avg: Optional[float], send_timestamp: float + ) -> None: + deployment_id = replica_id.deployment_id + # Defensively guard against delayed replica metrics arriving + # after the deployment's been deleted + if deployment_id in self._autoscaling_states: + self._autoscaling_states[deployment_id].record_request_metrics_for_replica( + replica_id=replica_id, + window_avg=window_avg, + send_timestamp=send_timestamp, + ) + + def record_request_metrics_for_handle( + self, + *, + deployment_id: str, + handle_id: str, + actor_id: Optional[str], + handle_source: DeploymentHandleSource, + queued_requests: float, + running_requests: Dict[ReplicaID, float], + send_timestamp: float, + ) -> None: + """Update request metric for a specific handle.""" + + if deployment_id in self._autoscaling_states: + self._autoscaling_states[deployment_id].record_request_metrics_for_handle( + handle_id=handle_id, + actor_id=actor_id, + handle_source=handle_source, + 
queued_requests=queued_requests, + running_requests=running_requests, + send_timestamp=send_timestamp, + ) + + def drop_stale_handle_metrics(self, alive_serve_actor_ids: Set[str]) -> None: + """Drops handle metrics that are no longer valid. + + This includes handles that live on Serve proxy or replica actors + that have died AND handles from which the controller hasn't + received an update for too long. + """ + + for autoscaling_state in self._autoscaling_states.values(): + autoscaling_state.drop_stale_handle_metrics(alive_serve_actor_ids) diff --git a/python/ray/serve/_private/controller.py b/python/ray/serve/_private/controller.py index 75f5cb95b3105..8ae95a39df9de 100644 --- a/python/ray/serve/_private/controller.py +++ b/python/ray/serve/_private/controller.py @@ -12,6 +12,7 @@ from ray._raylet import GcsClient from ray.actor import ActorHandle from ray.serve._private.application_state import ApplicationStateManager +from ray.serve._private.autoscaling_state import AutoscalingStateManager from ray.serve._private.common import ( DeploymentHandleSource, DeploymentID, @@ -172,12 +173,14 @@ async def __init__( if actor["namespace"] == SERVE_NAMESPACE ] + self.autoscaling_state_manager = AutoscalingStateManager() self.deployment_state_manager = DeploymentStateManager( self.kv_store, self.long_poll_host, all_serve_actor_names, get_all_live_placement_group_names(), self.cluster_node_info_cache, + self.autoscaling_state_manager, ) # Manage all applications' state @@ -248,12 +251,12 @@ def get_pid(self) -> int: return os.getpid() def record_autoscaling_metrics( - self, replica_id: str, window_avg: float, send_timestamp: float + self, replica_id: str, window_avg: Optional[float], send_timestamp: float ): logger.debug( f"Received metrics from replica {replica_id}: {window_avg} running requests" ) - self.deployment_state_manager.record_autoscaling_metrics( + self.autoscaling_state_manager.record_request_metrics_for_replica( replica_id, window_avg, send_timestamp ) @@ -271,7 +274,7 @@ def record_handle_metrics( f"Received metrics from handle {handle_id} for deployment {deployment_id}: " f"{queued_requests} queued requests and {running_requests} running requests" ) - self.deployment_state_manager.record_handle_metrics( + self.autoscaling_state_manager.record_request_metrics_for_handle( deployment_id=deployment_id, handle_id=handle_id, actor_id=actor_id, @@ -282,7 +285,7 @@ def record_handle_metrics( ) def _dump_autoscaling_metrics_for_testing(self): - return self.deployment_state_manager.get_autoscaling_metrics() + return self.autoscaling_state_manager.get_metrics() def _dump_replica_states_for_testing(self, deployment_id: DeploymentID): return self.deployment_state_manager._deployment_states[deployment_id]._replicas @@ -443,8 +446,9 @@ async def run_control_loop(self) -> None: # When the controller is done recovering, drop invalid handle metrics # that may be stale for autoscaling if not any_recovering: - self.deployment_state_manager.drop_stale_handle_metrics( - self.proxy_state_manager.get_alive_proxy_actor_ids() + self.autoscaling_state_manager.drop_stale_handle_metrics( + self.deployment_state_manager.get_alive_replica_actor_ids() + | self.proxy_state_manager.get_alive_proxy_actor_ids() ) loop_duration = time.time() - loop_start_time
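The controller changes above reroute both replica and handle reports into the `AutoscalingStateManager`, which keeps only the newest report per source by comparing send timestamps rather than arrival order. A minimal, self-contained sketch of that last-writer-wins guard (the `SimpleMetricStore` and `Report` names are illustrative, not part of this change):

```python
import time
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class Report:
    running_requests: float
    timestamp: float


class SimpleMetricStore:
    """Keeps only the newest report per replica, keyed by send timestamp."""

    def __init__(self) -> None:
        self._reports: Dict[str, Report] = {}

    def record(
        self, replica_id: str, window_avg: Optional[float], send_timestamp: float
    ) -> None:
        # Mirror record_request_metrics_for_replica: skip empty windows and
        # ignore reports older than the one already stored.
        if window_avg is None:
            return
        prev = self._reports.get(replica_id)
        if prev is None or send_timestamp > prev.timestamp:
            self._reports[replica_id] = Report(window_avg, send_timestamp)


store = SimpleMetricStore()
store.record("replica-1", 2.0, send_timestamp=time.time())
store.record("replica-1", 5.0, send_timestamp=0.0)  # late, out-of-order: dropped
assert store._reports["replica-1"].running_requests == 2.0
```

Guarding on the sender's timestamp means a delayed message from a restarted replica or proxy cannot overwrite fresher data.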
diff --git a/python/ray/serve/_private/deployment_info.py b/python/ray/serve/_private/deployment_info.py index 9c1d20e230680..28d16a0f9a67c 100644 --- a/python/ray/serve/_private/deployment_info.py +++ b/python/ray/serve/_private/deployment_info.py @@ -1,7 +1,6 @@ from typing import Any, Dict, Optional import ray -from ray.serve._private.autoscaling_policy import AutoscalingPolicyManager from ray.serve._private.common import TargetCapacityDirection from ray.serve._private.config import DeploymentConfig, ReplicaConfig from ray.serve.generated.serve_pb2 import DeploymentInfo as DeploymentInfoProto @@ -46,10 +45,6 @@ def __init__( self.target_capacity = target_capacity self.target_capacity_direction = target_capacity_direction - self.autoscaling_policy_manager = AutoscalingPolicyManager( - config=deployment_config.autoscaling_config - ) - def __getstate__(self) -> Dict[Any, Any]: clean_dict = self.__dict__.copy() del clean_dict["_cached_actor_def"] @@ -89,6 +84,15 @@ def set_target_capacity( self.target_capacity = new_target_capacity self.target_capacity_direction = new_target_capacity_direction + def config_changed(self, other: "DeploymentInfo") -> bool: + return ( + self.deployment_config != other.deployment_config + or self.replica_config.ray_actor_options + != other.replica_config.ray_actor_options + or other.version is None + or self.version != other.version + ) + @property def actor_def(self): if self._cached_actor_def is None: diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index 99e4d25262eda..f80609d597ef6 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -17,10 +17,9 @@ from ray.exceptions import RayActorError, RayError, RayTaskError, RuntimeEnvSetupError from ray.serve import metrics from ray.serve._private import default_impl -from ray.serve._private.autoscaling_policy import AutoscalingPolicyManager +from ray.serve._private.autoscaling_state import AutoscalingStateManager from ray.serve._private.cluster_node_info_cache import ClusterNodeInfoCache from ray.serve._private.common import ( - DeploymentHandleSource, DeploymentID, DeploymentStatus, DeploymentStatusInfo, @@ -35,11 +34,9 @@ from ray.serve._private.config import DeploymentConfig from ray.serve._private.constants import ( MAX_DEPLOYMENT_CONSTRUCTOR_RETRY_COUNT, - RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE, RAY_SERVE_EAGERLY_START_REPLACEMENT_REPLICAS, RAY_SERVE_ENABLE_TASK_EVENTS, RAY_SERVE_FORCE_STOP_UNHEALTHY_REPLICAS, - RAY_SERVE_MIN_HANDLE_METRICS_TIMEOUT_S, RAY_SERVE_USE_COMPACT_SCHEDULING_STRATEGY, REPLICA_HEALTH_CHECK_UNHEALTHY_THRESHOLD, SERVE_LOGGER_NAME, @@ -192,26 +189,6 @@ class DeploymentStateUpdateResult: _SCALING_LOG_ENABLED = os.environ.get("SERVE_ENABLE_SCALING_LOG", "0") != "0" -@dataclass -class HandleRequestMetric: - actor_id: str - handle_source: DeploymentHandleSource - queued_requests: float - running_requests: Dict[ReplicaID, float] - timestamp: float - - @property - def total_requests(self) -> float: - return self.queued_requests + sum(self.running_requests.values()) - - @property - def is_serve_component_source(self) -> bool: - return self.handle_source in [ - DeploymentHandleSource.PROXY, - DeploymentHandleSource.REPLICA, - ] - - def print_verbose_scaling_log(): assert _SCALING_LOG_ENABLED @@ -1257,12 +1234,14 @@ def __init__( long_poll_host: LongPollHost, deployment_scheduler: DeploymentScheduler, cluster_node_info_cache: ClusterNodeInfoCache, + autoscaling_state_manager: AutoscalingStateManager, _save_checkpoint_func: Callable, ): self._id = id self._long_poll_host: LongPollHost = long_poll_host self._deployment_scheduler = deployment_scheduler self._cluster_node_info_cache = cluster_node_info_cache + self._autoscaling_state_manager =
autoscaling_state_manager self._save_checkpoint_func = _save_checkpoint_func # Each time we set a new deployment goal, we're trying to save new @@ -1283,12 +1262,6 @@ def __init__( self.replica_average_ongoing_requests: Dict[str, float] = dict() - # Map from handle ID to (# requests recorded at handle, recording timestamp) - self.handle_requests: Dict[str, HandleRequestMetric] = dict() - self.requests_queued_at_handles: Dict[str, float] = dict() - # Number of ongoing requests reported by replicas - self.replica_average_ongoing_requests: Dict[str, float] = dict() - self.health_check_gauge = metrics.Gauge( "serve_deployment_replica_healthy", description=( @@ -1305,21 +1278,11 @@ def __init__( self._last_broadcasted_running_replica_infos: List[RunningReplicaInfo] = [] self._last_broadcasted_deployment_config = None - @property - def autoscaling_policy_manager(self) -> AutoscalingPolicyManager: - return self._target_state.info.autoscaling_policy_manager - def should_autoscale(self) -> bool: """ Check if the deployment is under autoscaling """ - return self.autoscaling_policy_manager.should_autoscale() - - def get_autoscale_metric_lookback_period(self) -> float: - """ - Return the autoscaling metrics look back period - """ - return self.autoscaling_policy_manager.config.look_back_period_s + return self._id in self._autoscaling_state_manager._autoscaling_states def get_checkpoint_data(self) -> DeploymentTargetState: """ @@ -1336,6 +1299,10 @@ def recover_target_state_from_checkpoint( self._deployment_scheduler.on_deployment_deployed( self._id, self._target_state.info.replica_config ) + if self._target_state.info.deployment_config.autoscaling_config: + self._autoscaling_state_manager.register_deployment( + self._id, self._target_state.info + ) def recover_current_state_from_replica_actor_names( self, replica_actor_names: List[str] @@ -1399,6 +1366,14 @@ def app_name(self) -> str: def get_alive_replica_actor_ids(self) -> Set[str]: return {replica.actor_id for replica in self._replicas.get()} + def get_running_replica_ids(self) -> List[ReplicaID]: + return [ + replica.replica_id + for replica in self._replicas.get( + [ReplicaState.RUNNING, ReplicaState.PENDING_MIGRATION] + ) + ] + def get_running_replica_infos(self) -> List[RunningReplicaInfo]: return [ replica.get_running_replica_info(self._cluster_node_info_cache) @@ -1580,23 +1555,12 @@ def deploy(self, deployment_info: DeploymentInfo) -> bool: if not deployment_settings_changed and not target_capacity_changed: return False - # Decide new target num_replicas. 
- autoscaling_policy_manager = deployment_info.autoscaling_policy_manager - if autoscaling_policy_manager.should_autoscale(): - initial_replicas = autoscaling_policy_manager.config.initial_replicas - if deployment_settings_changed and initial_replicas is not None: - target_num_replicas = get_capacity_adjusted_num_replicas( - initial_replicas, - deployment_info.target_capacity, - ) - else: - target_num_replicas = autoscaling_policy_manager.apply_bounds( - self._target_state.target_num_replicas, - deployment_info.target_capacity, - deployment_info.target_capacity_direction, - ) - + if deployment_info.deployment_config.autoscaling_config: + target_num_replicas = self._autoscaling_state_manager.register_deployment( + self._id, deployment_info, self._target_state.target_num_replicas + ) else: + self._autoscaling_state_manager.deregister_deployment(self._id) target_num_replicas = get_capacity_adjusted_num_replicas( deployment_info.deployment_config.num_replicas, deployment_info.target_capacity, @@ -1637,95 +1601,15 @@ def deploy(self, deployment_info: DeploymentInfo) -> bool: self._backoff_time_s = 1 return True - def drop_stale_handle_metrics(self, alive_serve_actor_ids: Set[str]) -> None: - """Drops handle metrics that are no longer valid. - - This includes handles that live on Serve Proxy or replica actors - that have died AND handles from which the controller hasn't - received an update for too long. - """ - - timeout_s = max( - 2 * self.autoscaling_policy_manager.get_metrics_interval_s(), - RAY_SERVE_MIN_HANDLE_METRICS_TIMEOUT_S, - ) - for handle_id, handle_metric in list(self.handle_requests.items()): - # Drop metrics for handles that are on Serve proxy/replica - # actors that have died - if ( - handle_metric.is_serve_component_source - and handle_metric.actor_id not in alive_serve_actor_ids - ): - del self.handle_requests[handle_id] - if handle_metric.total_requests > 0: - logger.debug( - f"Dropping metrics for handle '{handle_id}' because the Serve " - f"actor it was on ({handle_metric.actor_id}) is no longer " - f"alive. It had {handle_metric.total_requests} ongoing requests" - ) - # Drop metrics for handles that haven't sent an update in a while. - # This is expected behavior for handles that were on replicas or - # proxies that have been shut down. - elif time.time() - handle_metric.timestamp >= timeout_s: - del self.handle_requests[handle_id] - if handle_metric.total_requests > 0: - logger.info( - f"Dropping stale metrics for handle '{handle_id}' " - f"because no update was received for {timeout_s:.1f}s. " - f"Ongoing requests was: {handle_metric.total_requests}." - ) - - def get_total_num_requests(self) -> float: - """Get average total number of requests aggregated over the past - `look_back_period_s` number of seconds. - - If there are 0 running replicas, then returns the total number - of requests queued at handles - - If the flag RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE is - set to 1, the returned average includes both queued and ongoing - requests. Otherwise, the returned average includes only ongoing - requests. 
- """ - - total_requests = 0 - running_replicas = self._replicas.get( - [ReplicaState.RUNNING, ReplicaState.PENDING_MIGRATION] - ) - - if ( - RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE - or len(running_replicas) == 0 - ): - for handle_metric in self.handle_requests.values(): - total_requests += handle_metric.queued_requests - for replica in running_replicas: - id = replica.replica_id - if id in handle_metric.running_requests: - total_requests += handle_metric.running_requests[id] - else: - for replica in running_replicas: - id = replica.replica_id - if id in self.replica_average_ongoing_requests: - total_requests += self.replica_average_ongoing_requests[id][1] - - return total_requests - def autoscale(self) -> int: """Autoscale the deployment based on metrics.""" if self._target_state.deleting: return - total_num_requests = self.get_total_num_requests() - num_running_replicas = len(self.get_running_replica_infos()) - autoscaling_policy_manager = self.autoscaling_policy_manager - decision_num_replicas = autoscaling_policy_manager.get_decision_num_replicas( + decision_num_replicas = self._autoscaling_state_manager.get_target_num_replicas( + deployment_id=self._id, curr_target_num_replicas=self._target_state.target_num_replicas, - total_num_requests=total_num_requests, - num_running_replicas=num_running_replicas, - target_capacity=self._target_state.info.target_capacity, - target_capacity_direction=self._target_state.info.target_capacity_direction, ) if ( @@ -1742,12 +1626,19 @@ def autoscale(self) -> int: # The deployment should only transition to UPSCALING/DOWNSCALING # if it's within the autoscaling bounds - if not self._is_within_autoscaling_bounds(): + if not self._autoscaling_state_manager.is_within_bounds( + self._id, + self._replicas.count( + states=[ReplicaState.RUNNING], version=self._target_state.version + ), + ): return curr_stats_str = ( - f"Current ongoing requests: {total_num_requests:.2f}, " - f"current running replicas: {num_running_replicas}." + f"Current ongoing requests: " + f"{self._autoscaling_state_manager.get_total_num_requests(self._id):.2f}, " + f"current running replicas: " + f"{self._replicas.count(states=[ReplicaState.RUNNING])}." ) new_num = self._target_state.target_num_replicas if new_num > old_num: @@ -1769,33 +1660,6 @@ def autoscale(self) -> int: message=f"Downscaling from {old_num} to {new_num} replicas.", ) - def _is_within_autoscaling_bounds(self) -> bool: - """Whether or not this deployment is within the autoscaling bounds. - - This method should only be used for autoscaling deployments. It raises - an assertion error otherwise. - - Returns: True if the number of running replicas for the current - deployment version is within the autoscaling bounds. False - otherwise. 
- """ - - target_version = self._target_state.version - num_replicas_running_at_target_version = self._replicas.count( - states=[ReplicaState.RUNNING], version=target_version - ) - - assert self.autoscaling_policy_manager is not None - - return ( - self.autoscaling_policy_manager.apply_bounds( - num_replicas_running_at_target_version, - self._target_state.info.target_capacity, - self._target_state.info.target_capacity_direction, - ) - == num_replicas_running_at_target_version - ) - def delete(self) -> None: if not self._target_state.deleting: self._set_target_state_deleting() @@ -2417,43 +2281,6 @@ def migrate_replicas_on_draining_nodes(self, draining_nodes: Dict[str, int]): for replica in replicas_to_keep: self._replicas.add(ReplicaState.PENDING_MIGRATION, replica) - def record_autoscaling_metrics( - self, replica_id: ReplicaID, window_avg: float, send_timestamp: float - ) -> None: - """Records average ongoing requests at replicas.""" - - if ( - replica_id not in self.replica_average_ongoing_requests - or send_timestamp > self.replica_average_ongoing_requests[replica_id][0] - ): - self.replica_average_ongoing_requests[replica_id] = ( - send_timestamp, - window_avg, - ) - - def record_request_metrics_for_handle( - self, - handle_id: str, - actor_id: Optional[str], - handle_source: DeploymentHandleSource, - queued_requests: float, - running_requests: Dict[ReplicaID, float], - send_timestamp: float, - ) -> None: - """Update request metric for a specific handle.""" - - if ( - handle_id not in self.handle_requests - or send_timestamp > self.handle_requests[handle_id].timestamp - ): - self.handle_requests[handle_id] = HandleRequestMetric( - actor_id=actor_id, - handle_source=handle_source, - queued_requests=queued_requests, - running_requests=running_requests, - timestamp=send_timestamp, - ) - def record_multiplexed_model_ids( self, replica_id: ReplicaID, multiplexed_model_ids: List[str] ) -> None: @@ -2495,6 +2322,7 @@ def __init__( all_current_actor_names: List[str], all_current_placement_group_names: List[str], cluster_node_info_cache: ClusterNodeInfoCache, + autoscaling_state_manager: AutoscalingStateManager, head_node_id_override: Optional[str] = None, create_placement_group_fn_override: Optional[Callable] = None, ): @@ -2506,6 +2334,7 @@ def __init__( head_node_id_override, create_placement_group_fn_override, ) + self._autoscaling_state_manager = autoscaling_state_manager self._deployment_states: Dict[DeploymentID, DeploymentState] = dict() @@ -2523,49 +2352,10 @@ def _create_deployment_state(self, deployment_id): self._long_poll_host, self._deployment_scheduler, self._cluster_node_info_cache, + self._autoscaling_state_manager, self._save_checkpoint_func, ) - def record_autoscaling_metrics( - self, replica_id: ReplicaID, window_avg: float, send_timestamp: float - ): - if window_avg is not None: - self._deployment_states[ - replica_id.deployment_id - ].record_autoscaling_metrics(replica_id, window_avg, send_timestamp) - - def record_handle_metrics( - self, - deployment_id: str, - handle_id: str, - actor_id: Optional[str], - handle_source: DeploymentHandleSource, - queued_requests: float, - running_requests: Dict[ReplicaID, float], - send_timestamp: float, - ): - # NOTE(zcin): There can be handles to deleted deployments still - # sending metrics to the controller - if deployment_id in self._deployment_states: - self._deployment_states[deployment_id].record_request_metrics_for_handle( - handle_id=handle_id, - actor_id=actor_id, - handle_source=handle_source, - 
queued_requests=queued_requests, - running_requests=running_requests, - send_timestamp=send_timestamp, - ) - - def get_autoscaling_metrics(self): - """Return autoscaling metrics (used for dumping from controller)""" - - return { - deployment: deployment_state.get_total_num_requests() - if deployment_state.should_autoscale() - else None - for deployment, deployment_state in self._deployment_states.items() - } - def _map_actor_names_to_deployment( self, all_current_actor_names: List[str] ) -> Dict[str, List[str]]: @@ -2597,25 +2387,6 @@ def _map_actor_names_to_deployment( return deployment_to_current_replicas - def drop_stale_handle_metrics(self, alive_proxy_actor_ids: Set[str]): - """Drops handle metrics that are no longer valid. - - This includes handles that live on Serve Proxy or replica actors - that have died AND handles from which the controller hasn't - received an update for too long. - """ - - all_alive_serve_actor_ids = set.union( - alive_proxy_actor_ids, - *[ - ds.get_alive_replica_actor_ids() - for ds in self._deployment_states.values() - ], - ) - for deployment_state in self._deployment_states.values(): - if deployment_state.should_autoscale(): - deployment_state.drop_stale_handle_metrics(all_alive_serve_actor_ids) - def _detect_and_remove_leaked_placement_groups( self, all_current_actor_names: List[str], @@ -2803,6 +2574,13 @@ def get_deployment_statuses( statuses.append(state.curr_status_info) return statuses + def get_alive_replica_actor_ids(self) -> Set[str]: + alive_replica_actor_ids = set() + for ds in self._deployment_states.values(): + alive_replica_actor_ids |= ds.get_alive_replica_actor_ids() + + return alive_replica_actor_ids + def deploy( self, deployment_id: DeploymentID, @@ -2916,13 +2694,19 @@ def update(self) -> bool: self._handle_scheduling_request_failures(deployment_id, scheduling_requests) # STEP 7: Broadcast long poll information - for deployment_state in self._deployment_states.values(): + for deployment_id, deployment_state in self._deployment_states.items(): deployment_state.broadcast_running_replicas_if_changed() deployment_state.broadcast_deployment_config_if_changed() + if deployment_state.should_autoscale(): + self._autoscaling_state_manager.update_running_replica_ids( + deployment_id=deployment_id, + running_replicas=deployment_state.get_running_replica_ids(), + ) # STEP 8: Cleanup for deployment_id in deleted_ids: self._deployment_scheduler.on_deployment_deleted(deployment_id) + self._autoscaling_state_manager.deregister_deployment(deployment_id) del self._deployment_states[deployment_id] if len(deleted_ids):
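With `deploy()` now delegating to `register_deployment`, the target-replica decision on a (re)deploy reduces to: reset to `initial_replicas` when the deployment's config changed, otherwise carry the current target forward, then clip to the autoscaling bounds. A rough sketch of that decision under simplified assumptions (the helper name is hypothetical, and it ignores the `target_capacity` adjustment that the real `apply_bounds` also performs):

```python
from typing import Optional


def target_on_redeploy(
    config_changed: bool,
    initial_replicas: Optional[int],
    curr_target: int,
    min_replicas: int,
    max_replicas: int,
) -> int:
    # Mirrors AutoscalingState.register(): a config change snaps back to
    # initial_replicas (when set); otherwise the autoscaled target is kept.
    if config_changed and initial_replicas is not None:
        target = initial_replicas
    else:
        target = curr_target
    # Clip to the configured autoscaling bounds.
    return max(min_replicas, min(max_replicas, target))


# Changed config: reset to initial_replicas.
assert target_on_redeploy(True, 5, curr_target=20, min_replicas=1, max_replicas=10) == 5
# Unchanged config: keep the autoscaled target, clipped to bounds.
assert target_on_redeploy(False, 5, curr_target=20, min_replicas=1, max_replicas=10) == 10
```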
diff --git a/python/ray/serve/tests/unit/test_application_state.py b/python/ray/serve/tests/unit/test_application_state.py index 279bbb8332561..eba5e73697135 100644 --- a/python/ray/serve/tests/unit/test_application_state.py +++ b/python/ray/serve/tests/unit/test_application_state.py @@ -990,9 +990,9 @@ def test_override_autoscaling_config(self, info): updated_info = updated_infos["A"] assert updated_info.route_prefix == "/" assert updated_info.version == "123" - assert updated_info.autoscaling_policy_manager.config.min_replicas == 1 - assert updated_info.autoscaling_policy_manager.config.initial_replicas == 12 - assert updated_info.autoscaling_policy_manager.config.max_replicas == 79 + assert updated_info.deployment_config.autoscaling_config.min_replicas == 1 + assert updated_info.deployment_config.autoscaling_config.initial_replicas == 12 + assert updated_info.deployment_config.autoscaling_config.max_replicas == 79 def test_override_route_prefix_1(self, info): config = ServeApplicationSchema( diff --git a/python/ray/serve/tests/unit/test_deployment_state.py b/python/ray/serve/tests/unit/test_deployment_state.py index 758f6740b51a1..4d266ea390853 100644 --- a/python/ray/serve/tests/unit/test_deployment_state.py +++ b/python/ray/serve/tests/unit/test_deployment_state.py @@ -5,6 +5,7 @@ import pytest +from ray.serve._private.autoscaling_state import AutoscalingStateManager from ray.serve._private.common import ( DeploymentHandleSource, DeploymentID, @@ -323,6 +324,7 @@ def mock_deployment_state_manager( kv_store = MockKVStore() cluster_node_info_cache = MockClusterNodeInfoCache() cluster_node_info_cache.add_node("node-id") + autoscaling_state_manager = AutoscalingStateManager() def create_deployment_state_manager( actor_names=None, @@ -341,11 +343,17 @@ def create_deployment_state_manager( actor_names, placement_group_names, cluster_node_info_cache, + autoscaling_state_manager, head_node_id_override="fake-head-node-id", create_placement_group_fn_override=create_placement_group_fn_override, ) - yield create_deployment_state_manager, timer, cluster_node_info_cache + yield ( + create_deployment_state_manager, + timer, + cluster_node_info_cache, + autoscaling_state_manager, + ) dead_replicas_context.clear() @@ -591,7 +599,7 @@ def check_counts( def test_create_delete_single_replica(mock_deployment_state_manager): - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() info_1, v1 = deployment_info() @@ -637,7 +645,7 @@ def test_create_delete_single_replica(mock_deployment_state_manager): def test_force_kill(mock_deployment_state_manager): - create_dsm, timer, _ = mock_deployment_state_manager + create_dsm, timer, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() grace_period_s = 10 @@ -690,7 +698,7 @@ def test_force_kill(mock_deployment_state_manager): def test_redeploy_same_version(mock_deployment_state_manager): # Redeploying with the same version and code should do nothing. - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() info_1, v1 = deployment_info(version="1") @@ -750,7 +758,7 @@ def test_redeploy_no_version(mock_deployment_state_manager): redeploy the replicas. """ - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() b_info_1, v1 = deployment_info(version=None) @@ -857,7 +865,7 @@ def test_redeploy_no_version(mock_deployment_state_manager): def test_redeploy_new_version(mock_deployment_state_manager): """Redeploying with a new version should start a new replica.""" - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() b_info_1, v1 = deployment_info(version="1") @@ -953,7 +961,7 @@ def test_redeploy_different_num_replicas(mock_deployment_state_manager): 4. Makes deployment HEALTHY, and then redeploys with more replicas -> check that it becomes DOWNSCALING.
""" - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() version = "1" @@ -1062,7 +1070,7 @@ def test_deploy_new_config_same_code_version( ): """Deploying a new config with the same version should not deploy a new replica.""" - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() b_info_1, v1 = deployment_info(version="1") @@ -1120,7 +1128,7 @@ def test_deploy_new_config_same_code_version( def test_deploy_new_config_same_code_version_2(mock_deployment_state_manager): """Make sure we don't transition from STARTING to UPDATING directly.""" - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() b_info_1, v1 = deployment_info(version="1") @@ -1171,7 +1179,7 @@ def test_deploy_new_config_same_code_version_2(mock_deployment_state_manager): def test_deploy_new_config_new_version(mock_deployment_state_manager): # Deploying a new config with a new version should deploy a new replica. - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() b_info_1, v1 = deployment_info(version="1") @@ -1233,7 +1241,7 @@ def test_deploy_new_config_new_version(mock_deployment_state_manager): def test_initial_deploy_no_throttling(mock_deployment_state_manager): # All replicas should be started at once for a new deployment. - create_dsm, _, cluster_node_info_cache = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() b_info_1, v1 = deployment_info(num_replicas=10, version="1") @@ -1274,7 +1282,7 @@ def test_new_version_deploy_throttling_old(mock_deployment_state_manager): Testing old behavior, where replicas fully stop before starting new ones. """ - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() b_info_1, v1 = deployment_info(num_replicas=10, version="1", user_config="1") @@ -1417,7 +1425,7 @@ def test_new_version_deploy_throttling_new(mock_deployment_state_manager): should apply to both code version and user config updates. """ - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() b_info_1, v1 = deployment_info(num_replicas=10, version="1", user_config="1") @@ -1566,7 +1574,7 @@ def test_reconfigure_throttling(mock_deployment_state_manager): When the version is updated, it should be throttled. """ - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() b_info_1, v1 = deployment_info(num_replicas=2, version="1", user_config="1") @@ -1643,7 +1651,7 @@ def test_new_version_and_scale_down(mock_deployment_state_manager): # Test the case when we reduce the number of replicas and change the # version at the same time. First the number of replicas should be # turned down, then the rolling update should happen. 
- create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() b_info_1, v1 = deployment_info(num_replicas=10, version="1") @@ -1770,7 +1778,7 @@ def test_new_version_and_scale_up(mock_deployment_state_manager): # Test the case when we increase the number of replicas and change the # version at the same time. The new replicas should all immediately be # turned up. When they're up, rolling update should trigger. - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() b_info_1, v1 = deployment_info(num_replicas=2, version="1") @@ -1880,7 +1888,7 @@ def test_scale_num_replicas(mock_deployment_state_manager, target_capacity_direc version = get_random_string() # Create deployment state manager - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() # Deploy deployment with 3 replicas @@ -1969,8 +1977,9 @@ def test_basic_autoscaling(mock_deployment_state_manager, target_capacity_direct """ # Create deployment state manager - create_dsm, timer, _ = mock_deployment_state_manager + create_dsm, timer, _, asm = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() + asm: AutoscalingStateManager = asm # Deploy deployment with 3 replicas info, v1 = deployment_info( @@ -2012,7 +2021,7 @@ def test_basic_autoscaling(mock_deployment_state_manager, target_capacity_direct req_per_replica = 2 if target_capacity_direction == "up" else 0 replicas = ds._replicas.get() if RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE: - dsm.record_handle_metrics( + asm.record_request_metrics_for_handle( deployment_id=TEST_DEPLOYMENT_ID, handle_id="random", actor_id=None, @@ -2025,7 +2034,7 @@ def test_basic_autoscaling(mock_deployment_state_manager, target_capacity_direct ) else: for replica in replicas: - dsm.record_autoscaling_metrics( + asm.record_request_metrics_for_replica( replica_id=replica._actor.replica_id, window_avg=req_per_replica, send_timestamp=timer.time(), @@ -2081,6 +2090,15 @@ def test_basic_autoscaling(mock_deployment_state_manager, target_capacity_direct else DeploymentStatusTrigger.DOWNSCALE_COMPLETED ) + # Make sure autoscaling state is removed when deployment is deleted + dsm.delete_deployment(TEST_DEPLOYMENT_ID) + dsm.update() + for replica in ds._replicas.get(): + replica._actor.set_done_stopping() + dsm.update() + assert TEST_DEPLOYMENT_ID not in dsm._deployment_states + assert TEST_DEPLOYMENT_ID not in asm._autoscaling_states + @pytest.mark.parametrize( "target_startup_status", @@ -2101,8 +2119,9 @@ def test_downscaling_reclaiming_starting_replicas_first( """ # Create deployment state manager - create_dsm, timer, _ = mock_deployment_state_manager + create_dsm, timer, _, asm = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() + asm: AutoscalingStateManager = asm # Deploy deployment with 3 replicas info, _ = deployment_info( @@ -2147,7 +2166,7 @@ def test_downscaling_reclaiming_starting_replicas_first( running_replicas = ds._replicas.get(states=[ReplicaState.RUNNING]) replicas = ds._replicas.get() if RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE: - dsm.record_handle_metrics( + asm.record_request_metrics_for_handle( deployment_id=TEST_DEPLOYMENT_ID, handle_id="random", actor_id=None, @@ -2158,7 +2177,9 @@ def test_downscaling_reclaiming_starting_replicas_first( ) else: for replica in replicas: - 
dsm.record_autoscaling_metrics(replica._actor.replica_id, 2, timer.time()) + asm.record_request_metrics_for_replica( + replica._actor.replica_id, 2, timer.time() + ) # status=UPSCALING, status_trigger=AUTOSCALE dsm.update() @@ -2213,7 +2234,7 @@ def test_downscaling_reclaiming_starting_replicas_first( # Now, trigger downscaling attempting to reclaim half (3) of the replicas replicas = ds._replicas.get(states=[ReplicaState.RUNNING]) if RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE: - dsm.record_handle_metrics( + asm.record_request_metrics_for_handle( deployment_id=TEST_DEPLOYMENT_ID, handle_id="random", actor_id=None, @@ -2224,7 +2245,9 @@ def test_downscaling_reclaiming_starting_replicas_first( ) else: for replica in replicas: - dsm.record_autoscaling_metrics(replica._actor.replica_id, 1, timer.time()) + asm.record_request_metrics_for_replica( + replica._actor.replica_id, 1, timer.time() + ) # status=DOWNSCALING, status_trigger=AUTOSCALE dsm.update() @@ -2263,8 +2286,9 @@ def test_update_autoscaling_config(mock_deployment_state_manager): """ # Create deployment state manager - create_dsm, timer, _ = mock_deployment_state_manager + create_dsm, timer, _, asm = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() + asm: AutoscalingStateManager = asm # Deploy deployment with 3 replicas info1, _ = deployment_info( @@ -2298,7 +2322,7 @@ def test_update_autoscaling_config(mock_deployment_state_manager): # Num ongoing requests = 1, status should remain HEALTHY replicas = ds._replicas.get() if RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE: - dsm.record_handle_metrics( + asm.record_request_metrics_for_handle( deployment_id=TEST_DEPLOYMENT_ID, handle_id="random", actor_id=None, @@ -2309,7 +2333,9 @@ def test_update_autoscaling_config(mock_deployment_state_manager): ) else: for replica in replicas: - dsm.record_autoscaling_metrics(replica._actor.replica_id, 1, timer.time()) + asm.record_request_metrics_for_replica( + replica._actor.replica_id, 1, timer.time() + ) check_counts(ds, total=3, by_state=[(ReplicaState.RUNNING, 3, None)]) assert ds.curr_status_info.status == DeploymentStatus.HEALTHY @@ -2362,8 +2388,9 @@ def test_update_autoscaling_config(mock_deployment_state_manager): reason="Testing handle metrics behavior.", ) def test_handle_metrics_timeout(mock_deployment_state_manager): - create_dsm, timer, _ = mock_deployment_state_manager + create_dsm, timer, _, asm = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() + asm: AutoscalingStateManager = asm # Deploy, start with 1 replica info, _ = deployment_info( @@ -2380,12 +2407,12 @@ def test_handle_metrics_timeout(mock_deployment_state_manager): ds: DeploymentState = dsm._deployment_states[TEST_DEPLOYMENT_ID] dsm.update() ds._replicas.get()[0]._actor.set_ready() - dsm.drop_stale_handle_metrics(set()) + asm.drop_stale_handle_metrics(dsm.get_alive_replica_actor_ids()) dsm.update() check_counts(ds, total=1, by_state=[(ReplicaState.RUNNING, 1, None)]) # Record 2 requests/replica -> trigger upscale - dsm.record_handle_metrics( + asm.record_request_metrics_for_handle( deployment_id=TEST_DEPLOYMENT_ID, handle_id="random", actor_id=None, @@ -2394,36 +2421,36 @@ def test_handle_metrics_timeout(mock_deployment_state_manager): running_requests={ds._replicas.get()[0]._actor.replica_id: 2}, send_timestamp=timer.time(), ) - dsm.drop_stale_handle_metrics(set()) + asm.drop_stale_handle_metrics(dsm.get_alive_replica_actor_ids()) dsm.update() check_counts( ds, total=2, by_state=[(ReplicaState.RUNNING, 1, None), 
(ReplicaState.STARTING, 1, None)], ) - assert ds.get_total_num_requests() == 2 + assert asm.get_total_num_requests(TEST_DEPLOYMENT_ID) == 2 ds._replicas.get([ReplicaState.STARTING])[0]._actor.set_ready() - dsm.drop_stale_handle_metrics(set()) + asm.drop_stale_handle_metrics(dsm.get_alive_replica_actor_ids()) dsm.update() check_counts(ds, total=2, by_state=[(ReplicaState.RUNNING, 2, None)]) - assert ds.get_total_num_requests() == 2 + assert asm.get_total_num_requests(TEST_DEPLOYMENT_ID) == 2 # Simulate handle was on an actor that died. 10 seconds later # the handle fails to push metrics timer.advance(10) - dsm.drop_stale_handle_metrics(set()) + asm.drop_stale_handle_metrics(dsm.get_alive_replica_actor_ids()) dsm.update() check_counts(ds, total=2, by_state=[(ReplicaState.RUNNING, 2, None)]) - assert ds.get_total_num_requests() == 2 + assert asm.get_total_num_requests(TEST_DEPLOYMENT_ID) == 2 # Another 10 seconds later handle still fails to push metrics. At # this point the data from the handle should be invalidated. As a # result, the replicas should scale back down to 0. timer.advance(10) - dsm.drop_stale_handle_metrics(set()) + asm.drop_stale_handle_metrics(dsm.get_alive_replica_actor_ids()) dsm.update() check_counts(ds, total=2, by_state=[(ReplicaState.STOPPING, 2, None)]) - assert ds.get_total_num_requests() == 0 + assert asm.get_total_num_requests(TEST_DEPLOYMENT_ID) == 0 @pytest.mark.skipif( @@ -2433,8 +2460,9 @@ def test_handle_metrics_timeout(mock_deployment_state_manager): def test_handle_metrics_on_dead_serve_actor(mock_deployment_state_manager): """When there are handles on dead serve actors, their metrics should be dropped.""" - create_dsm, timer, _ = mock_deployment_state_manager + create_dsm, timer, _, asm = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() + asm: AutoscalingStateManager = asm d_id1 = DeploymentID("d1", "app") d_id2 = DeploymentID("d2", "app") @@ -2457,18 +2485,18 @@ def test_handle_metrics_on_dead_serve_actor(mock_deployment_state_manager): ds2: DeploymentState = dsm._deployment_states[d_id2] # One replica each - dsm.drop_stale_handle_metrics(set()) + asm.drop_stale_handle_metrics(dsm.get_alive_replica_actor_ids()) dsm.update() ds1._replicas.get()[0]._actor.set_ready() ds2._replicas.get()[0]._actor.set_ready() ds2._replicas.get()[0]._actor.set_actor_id("d2_replica_actor_id") - dsm.drop_stale_handle_metrics(set()) + asm.drop_stale_handle_metrics(dsm.get_alive_replica_actor_ids()) dsm.update() check_counts(ds1, total=1, by_state=[(ReplicaState.RUNNING, 1, None)]) check_counts(ds2, total=1, by_state=[(ReplicaState.RUNNING, 1, None)]) # Record 2 requests/replica (sent from d2 replica) -> trigger upscale - dsm.record_handle_metrics( + asm.record_request_metrics_for_handle( deployment_id=d_id1, handle_id="random", actor_id="d2_replica_actor_id", @@ -2477,23 +2505,23 @@ def test_handle_metrics_on_dead_serve_actor(mock_deployment_state_manager): running_requests={ds1._replicas.get()[0]._actor.replica_id: 2}, send_timestamp=timer.time(), ) - dsm.drop_stale_handle_metrics(set()) + asm.drop_stale_handle_metrics(dsm.get_alive_replica_actor_ids()) dsm.update() check_counts( ds1, total=2, by_state=[(ReplicaState.RUNNING, 1, None), (ReplicaState.STARTING, 1, None)], ) - assert ds1.get_total_num_requests() == 2 + assert asm.get_total_num_requests(d_id1) == 2 ds1._replicas.get([ReplicaState.STARTING])[0]._actor.set_ready() - dsm.drop_stale_handle_metrics(set()) + asm.drop_stale_handle_metrics(dsm.get_alive_replica_actor_ids()) dsm.update() 
check_counts(ds1, total=2, by_state=[(ReplicaState.RUNNING, 2, None)]) - assert ds1.get_total_num_requests() == 2 + assert asm.get_total_num_requests(d_id1) == 2 # d2 replica died ds2._replicas.get()[0]._actor.set_unhealthy() - dsm.drop_stale_handle_metrics(set()) + asm.drop_stale_handle_metrics(dsm.get_alive_replica_actor_ids()) dsm.update() if RAY_SERVE_EAGERLY_START_REPLACEMENT_REPLICAS: check_counts( @@ -2507,13 +2535,13 @@ def test_handle_metrics_on_dead_serve_actor(mock_deployment_state_manager): else: check_counts(ds2, total=1, by_state=[(ReplicaState.STOPPING, 1, None)]) ds2._replicas.get(states=[ReplicaState.STOPPING])[0]._actor.set_done_stopping() - dsm.drop_stale_handle_metrics(set()) + asm.drop_stale_handle_metrics(dsm.get_alive_replica_actor_ids()) dsm.update() check_counts(ds2, total=1, by_state=[(ReplicaState.STARTING, 1, None)]) # Now that the d2 replica is dead, its metrics should be dropped. # Consequently d1 should scale down to 0 replicas - dsm.drop_stale_handle_metrics(set()) + asm.drop_stale_handle_metrics(dsm.get_alive_replica_actor_ids()) dsm.update() check_counts(ds1, total=2, by_state=[(ReplicaState.STOPPING, 2, None)]) @@ -2522,7 +2550,7 @@ def test_handle_metrics_on_dead_serve_actor(mock_deployment_state_manager): def test_health_check( mock_deployment_state_manager, force_stop_unhealthy_replicas: bool ): - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() b_info_1, v1 = deployment_info(num_replicas=2, version="1") @@ -2627,7 +2655,7 @@ def test_health_check( def test_update_while_unhealthy(mock_deployment_state_manager): - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() b_info_1, v1 = deployment_info(num_replicas=2, version="1") @@ -2824,7 +2852,7 @@ def test_deploy_with_consistent_constructor_failure(mock_deployment_state_manage The deployment should get marked FAILED. """ - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() b_info_1, _ = deployment_info(num_replicas=2) @@ -2862,7 +2890,7 @@ def test_deploy_with_partial_constructor_failure(mock_deployment_state_manager): Same testing for same test case in test_deploy.py. """ - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() b_info_1, _ = deployment_info(num_replicas=2) @@ -3002,7 +3030,7 @@ def fake_create_placement_group_fn(placement_group_bundles, *args, **kwargs): validate_placement_group(bundles=placement_group_bundles) - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm( create_placement_group_fn_override=fake_create_placement_group_fn, ) @@ -3098,7 +3126,7 @@ def test_deploy_with_transient_constructor_failure(mock_deployment_state_manager Same testing for same test case in test_deploy.py. 
""" - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() b_info_1, _ = deployment_info(num_replicas=2) @@ -3148,7 +3176,7 @@ def test_deploy_with_transient_constructor_failure(mock_deployment_state_manager def test_exponential_backoff(mock_deployment_state_manager): """Test exponential backoff.""" - create_dsm, timer, _ = mock_deployment_state_manager + create_dsm, timer, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() b_info_1, _ = deployment_info(num_replicas=2) @@ -3202,7 +3230,7 @@ def test_exponential_backoff(mock_deployment_state_manager): def test_recover_state_from_replica_names(mock_deployment_state_manager): """Test recover deployment state.""" - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() # Deploy deployment with version "1" and one replica @@ -3251,7 +3279,7 @@ def test_recover_during_rolling_update(mock_deployment_state_manager): has an outdated version, it should be stopped and a new replica should be started with the target version. """ - create_dsm, _, cluster_node_info_cache = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm = create_dsm() # Step 1: Create some deployment info with actors in running state @@ -3332,7 +3360,7 @@ def test_actor_died_before_recover(mock_deployment_state_manager): new replica to match target state. """ - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm = create_dsm() # Create some deployment info with actors in running state @@ -3378,7 +3406,7 @@ def test_shutdown(mock_deployment_state_manager): Test that shutdown waits for all deployments to be deleted and they are force-killed without a grace period. """ - create_dsm, timer, cluster_node_info_cache = mock_deployment_state_manager + create_dsm, timer, _, _ = mock_deployment_state_manager dsm = create_dsm() grace_period_s = 10 @@ -3469,7 +3497,7 @@ def test_get_active_node_ids(mock_deployment_state_manager): """ node_ids = ("node1", "node2", "node2") - create_dsm, _, cluster_node_info_cache = mock_deployment_state_manager + create_dsm, _, cluster_node_info_cache, _ = mock_deployment_state_manager dsm = create_dsm() cluster_node_info_cache.add_node("node1") cluster_node_info_cache.add_node("node2") @@ -3523,7 +3551,7 @@ def test_get_active_node_ids_none(mock_deployment_state_manager): """ node_ids = ("node1", "node2", "node2") - create_dsm, _, cluster_node_info_cache = mock_deployment_state_manager + create_dsm, _, cluster_node_info_cache, _ = mock_deployment_state_manager dsm = create_dsm() cluster_node_info_cache.add_node("node1") cluster_node_info_cache.add_node("node2") @@ -3609,7 +3637,7 @@ def test_get_capacity_adjusted_num_replicas( def test_initial_deploy(self, mock_deployment_state_manager): """Deploy with target_capacity set, should apply immediately.""" - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() b_info_1, _ = deployment_info(num_replicas=2) @@ -3645,7 +3673,7 @@ def test_target_capacity_100_no_effect(self, mock_deployment_state_manager): Then go back to no target_capacity, should still have no effect. 
""" - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() code_version = "arbitrary_version" @@ -3696,7 +3724,7 @@ def test_target_capacity_100_no_effect(self, mock_deployment_state_manager): def test_target_capacity_0(self, mock_deployment_state_manager): """Deploy with target_capacity set to 0. Should have no replicas.""" - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() b_info_1, _ = deployment_info(num_replicas=100) @@ -3719,7 +3747,7 @@ def test_reduce_target_capacity(self, mock_deployment_state_manager): Deploy with target capacity set to 100, then reduce to 50, then reduce to 0. """ - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() code_version = "arbitrary_version" @@ -3837,7 +3865,7 @@ def test_increase_target_capacity(self, mock_deployment_state_manager): then increase to 100. """ - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() code_version = "arbitrary_version" @@ -3943,7 +3971,7 @@ def test_increase_target_capacity(self, mock_deployment_state_manager): def test_clear_target_capacity(self, mock_deployment_state_manager): """Deploy with target_capacity set, should apply immediately.""" - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() code_version = "arbitrary_version" @@ -4007,7 +4035,7 @@ def test_target_num_replicas_is_zero(self, mock_deployment_state_manager): target_capacity. """ - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() # Set num_replicas to 0. @@ -4114,7 +4142,7 @@ def test_target_capacity_with_changing_num_replicas( autoscaling). """ - create_dsm, _, _ = mock_deployment_state_manager + create_dsm, _, _, _ = mock_deployment_state_manager dsm: DeploymentStateManager = create_dsm() # Set num_replicas to 0. @@ -4309,7 +4337,7 @@ def test_draining_start_then_stop_replica(self, mock_deployment_state_manager): transitions to RUNNING. """ - create_dsm, timer, cluster_node_info_cache = mock_deployment_state_manager + create_dsm, timer, cluster_node_info_cache, _ = mock_deployment_state_manager cluster_node_info_cache.add_node("node-1") cluster_node_info_cache.add_node("node-2") dsm: DeploymentStateManager = create_dsm() @@ -4399,7 +4427,7 @@ def test_draining_stop_replica_before_deadline(self, mock_deployment_state_manag replica hasn't transitioned to RUNNING yet. """ - create_dsm, timer, cluster_node_info_cache = mock_deployment_state_manager + create_dsm, timer, cluster_node_info_cache, _ = mock_deployment_state_manager cluster_node_info_cache.add_node("node-1") cluster_node_info_cache.add_node("node-2") dsm: DeploymentStateManager = create_dsm() @@ -4479,7 +4507,7 @@ def test_draining_multiple_nodes(self, mock_deployment_state_manager): deadlines when new replicas are started. 
""" - create_dsm, timer, cluster_node_info_cache = mock_deployment_state_manager + create_dsm, timer, cluster_node_info_cache, _ = mock_deployment_state_manager cluster_node_info_cache.add_node("node-1") cluster_node_info_cache.add_node("node-2") cluster_node_info_cache.add_node("node-3") @@ -4592,7 +4620,7 @@ def test_draining_multiple_nodes(self, mock_deployment_state_manager): def test_replicas_unhealthy_on_draining_node(self, mock_deployment_state_manager): """Replicas pending migration should be stopped if unhealthy.""" - create_dsm, timer, cluster_node_info_cache = mock_deployment_state_manager + create_dsm, timer, cluster_node_info_cache, _ = mock_deployment_state_manager cluster_node_info_cache.add_node("node-1") cluster_node_info_cache.add_node("node-2") dsm: DeploymentStateManager = create_dsm() @@ -4664,7 +4692,7 @@ def test_replicas_unhealthy_on_draining_node(self, mock_deployment_state_manager def test_starting_replica_on_draining_node(self, mock_deployment_state_manager): """When a node gets drained, replicas in STARTING state should be stopped.""" - create_dsm, timer, cluster_node_info_cache = mock_deployment_state_manager + create_dsm, timer, cluster_node_info_cache, _ = mock_deployment_state_manager cluster_node_info_cache.add_node("node-1") cluster_node_info_cache.add_node("node-2") dsm: DeploymentStateManager = create_dsm() @@ -4733,7 +4761,7 @@ def test_starting_replica_on_draining_node(self, mock_deployment_state_manager): def test_in_place_update_during_draining(self, mock_deployment_state_manager): """Test that pending migration replicas of old versions are updated.""" - create_dsm, timer, cluster_node_info_cache = mock_deployment_state_manager + create_dsm, timer, cluster_node_info_cache, _ = mock_deployment_state_manager cluster_node_info_cache.add_node("node-1") cluster_node_info_cache.add_node("node-2") dsm: DeploymentStateManager = create_dsm()