diff --git a/doc/source/serve/advanced.rst b/doc/source/serve/advanced.rst
index 195444aa41450..26003559dfa6f 100644
--- a/doc/source/serve/advanced.rst
+++ b/doc/source/serve/advanced.rst
@@ -16,7 +16,7 @@ the properties of a particular backend.
 Scaling Out
 ===========
 
-To scale out a backend to multiple workers, simply configure the number of replicas.
+To scale out a backend to many instances, simply configure the number of replicas.
 
 .. code-block:: python
 
@@ -27,14 +27,14 @@ To scale out a backend to multiple workers, simply configure the number of repli
   config = {"num_replicas": 2}
   client.update_backend_config("my_scaled_endpoint_backend", config)
 
-This will scale up or down the number of workers that can accept requests.
+This will scale up or down the number of replicas that can accept requests.
 
 Using Resources (CPUs, GPUs)
 ============================
 
-To assign hardware resources per worker, you can pass resource requirements to
+To assign hardware resources per replica, you can pass resource requirements to
 ``ray_actor_options``.
-By default, each worker requires one CPU.
+By default, each replica requires one CPU.
 To learn about options to pass in, take a look at :ref:`Resources with Actor` guide.
 
 For example, to create a backend where each replica uses a single GPU, you can do the
@@ -49,7 +49,7 @@ Fractional Resources
 --------------------
 
 The resources specified in ``ray_actor_options`` can also be *fractional*.
-This allows you to flexibly share resources between workers.
+This allows you to flexibly share resources between replicas.
 For example, if you have two models and each doesn't fully saturate a GPU, you might want to have them share a GPU by allocating 0.5 GPUs each.
 The same could be done to multiplex over CPUs.
 
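The scale-up and scale-down behaviour documented in the advanced.rst hunks above can be sketched in plain Python. This is a minimal illustration loosely modeled on `_scale_backend_replicas` from this change, not the controller's actual code: the function name `plan_scaling` and the six-letter tag suffix are hypothetical, and no Ray APIs are called.

```python
import random
import string
from typing import Dict, List, Tuple


def plan_scaling(backend_tag: str, current: Dict[str, object],
                 num_replicas: int) -> Tuple[List[str], List[str]]:
    """Return (tags_to_start, tags_to_stop) for the requested replica count.

    `current` maps replica tags to handles. It is mutated on scale-down,
    similar in spirit to the per-backend dict kept by the controller.
    """
    if num_replicas < 0:
        raise ValueError("num_replicas must be >= 0")

    delta = num_replicas - len(current)
    to_start: List[str] = []
    to_stop: List[str] = []

    if delta > 0:
        # Scale up: only record new tags; starting them is a separate step.
        for _ in range(delta):
            suffix = "".join(random.choices(string.ascii_letters, k=6))
            to_start.append("{}#{}".format(backend_tag, suffix))
    elif delta < 0:
        # Scale down: drop arbitrary replicas and record them for stopping.
        for _ in range(-delta):
            replica_tag, _handle = current.popitem()
            to_stop.append(replica_tag)

    return to_start, to_stop
```

Calling `plan_scaling("my_backend", replicas, 3)` with two existing replicas yields one tag to start and none to stop. Keeping the plan separate from the start and stop actions mirrors the checkpoint-first ordering described in the controller docstrings later in this diff.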
diff --git a/doc/source/serve/architecture.rst b/doc/source/serve/architecture.rst
index ac98d5c68b9bb..9cb036ad824a8 100644
--- a/doc/source/serve/architecture.rst
+++ b/doc/source/serve/architecture.rst
@@ -20,7 +20,7 @@ There are three kinds of actors that are created to make up a Serve instance:
   destroying other actors. Serve API calls like :mod:`client.create_backend `,
   :mod:`client.create_endpoint ` make remote calls to the Controller.
 - Router: There is one router per node. Each router is a `Uvicorn `_ HTTP
-  server that accepts incoming requests, forwards them to the worker replicas, and
+  server that accepts incoming requests, forwards them to replicas, and
   responds once they are completed.
 - Worker Replica: Worker replicas actually execute the code in response to a
   request. For example, they may contain an instantiation of an ML model. Each
@@ -36,16 +36,16 @@ When an HTTP request is sent to the router, the follow things happen:
 - One or more :ref:`backends ` is selected to handle the request given the
   :ref:`traffic splitting ` and :ref:`shadow testing ` rules. The requests for each
   backend are placed on a queue.
-- For each request in a backend queue, an available worker replica is looked up
-  and the request is sent to it. If there are no available worker replicas (there
+- For each request in a backend queue, an available replica is looked up
+  and the request is sent to it. If there are no available replicas (there
   are more than ``max_concurrent_queries`` requests outstanding), the request
   is left in the queue until an outstanding request is finished.
 
-Each worker maintains a queue of requests and processes one batch of requests at
+Each replica maintains a queue of requests and processes one batch of requests at
 a time. By default the batch size is 1, you can increase the batch size to
 increase throughput. If the handler (the function for the backend or
-``__call__``) is ``async``, worker will not wait for the handler to run;
-otherwise, worker will block until the handler returns.
+``__call__``) is ``async``, the replica will not wait for the handler to run;
+otherwise, the replica will block until the handler returns.
 
 FAQ
 ---
@@ -54,14 +54,14 @@ How does Serve handle fault tolerance?
 
 Application errors like exceptions in your model evaluation code are caught and
 wrapped. A 500 status code will be returned with the traceback information. The
-worker replica will be able to continue to handle requests.
+replica will be able to continue to handle requests.
 
 Machine errors and faults will be handled by Ray. Serve utilizes the :ref:`actor
 reconstruction ` capability. For example, when a machine hosting any of the
 actors crashes, those actors will be automatically restarted on another
 available machine. All data in the Controller (routing policies, backend
 configurations, etc) is checkpointed to the Ray. Transient data in the
-router and the worker replica (like network connections and internal request
+router and the replica (like network connections and internal request
 queues) will be lost upon failure.
 
 How does Serve ensure horizontal scalability and availability?
@@ -72,14 +72,14 @@ should be able to reach Serve and send requests to any models via any of the
 servers.
 
 This architecture ensures horizontal scalability for Serve. You can scale the
-router by adding more nodes and scale the model workers by increasing the number
+router by adding more nodes and scale the model by increasing the number
 of replicas.
 
 How do ServeHandles work?
 ^^^^^^^^^^^^^^^^^^^^^^^^^
 
 :mod:`ServeHandles ` wrap a handle to the router actor on the same node. When a
-request is sent from one via worker replica to another via the handle, the
+request is sent from one via replica to another via the handle, the
 requests go through the same data path as incoming HTTP requests. This enables
 the same backend selection and batching procedures to happen. ServeHandles are
 often used to implement :ref:`model composition `.
@@ -91,4 +91,4 @@ What happens to large requests?
 Serve utilizes Ray’s :ref:`shared memory object store ` and in process memory
 store. Small request objects are directly sent between actors via network
 call. Larger request objects (100KiB+) are written to a distributed shared
-memory store and the worker can read them via zero-copy read.
+memory store and the replica can read them via zero-copy read.
diff --git a/doc/source/serve/key-concepts.rst b/doc/source/serve/key-concepts.rst
index 0c5e9e9e3f13c..d15a142c85532 100644
--- a/doc/source/serve/key-concepts.rst
+++ b/doc/source/serve/key-concepts.rst
@@ -24,7 +24,7 @@ A backend is defined using :mod:`client.create_backend `, shown below.
 
-A backend consists of a number of *replicas*, which are individual copies of the function or class that are started in separate worker processes.
+A backend consists of a number of *replicas*, which are individual copies of the function or class that are started in separate Ray Workers (processes).
 
 .. code-block:: python
 
diff --git a/doc/source/serve/tutorials/batch.rst b/doc/source/serve/tutorials/batch.rst
index 1183a22c3779a..e9a74f94f0ea4 100644
--- a/doc/source/serve/tutorials/batch.rst
+++ b/doc/source/serve/tutorials/batch.rst
@@ -63,7 +63,7 @@ are specifying the maximum batch size via ``config={"max_batch_size": 4}``.
 This configuration option limits the maximum possible batch size sent to the backend.
 
 .. note::
-    Ray Serve performs *opportunistic batching*. When a worker is free to evaluate
+    Ray Serve performs *opportunistic batching*. When a replica is free to evaluate
     the next batch, Ray Serve will look at the pending queries and take
     ``max(number_of_pending_queries, max_batch_size)`` queries to form a batch.
     You can provide :mod:`batch_wait_timeout ` to override
diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py
index cc6acd0adaf44..8640398816575 100644
--- a/python/ray/serve/api.py
+++ b/python/ray/serve/api.py
@@ -166,7 +166,7 @@ def update_backend_config(
             config_options(dict, serve.BackendConfig): Backend config options to
                 update. Either a BackendConfig object or a dict mapping strings
                 to values for the following supported options:
-                - "num_replicas": number of worker processes to start up that
+                - "num_replicas": number of processes to start up that
                 will handle requests to this backend.
                 - "max_batch_size": the maximum number of requests that will
                 be processed in one batch by this backend.
@@ -221,7 +221,7 @@ def create_backend(
             config (dict, serve.BackendConfig, optional): configuration options
                 for this backend. Either a BackendConfig, or a dictionary
                 mapping strings to values for the following supported options:
-                - "num_replicas": number of worker processes to start up that
+                - "num_replicas": number of processes to start up that
                 will handle requests to this backend.
                 - "max_batch_size": the maximum number of requests that will
                 be processed in one batch by this backend.
diff --git a/python/ray/serve/backend_worker.py b/python/ray/serve/backend_worker.py
index 7caddb4d89f68..ea83800a50d93 100644
--- a/python/ray/serve/backend_worker.py
+++ b/python/ray/serve/backend_worker.py
@@ -87,8 +87,8 @@ async def wait_for_batch(self) -> List[Query]:
         return batch
 
 
-def create_backend_worker(func_or_class: Union[Callable, Type[Callable]]):
-    """Creates a worker class wrapping the provided function or class."""
+def create_backend_replica(func_or_class: Union[Callable, Type[Callable]]):
+    """Creates a replica class wrapping the provided function or class."""
 
     if inspect.isfunction(func_or_class):
         is_function = True
@@ -98,7 +98,7 @@ def create_backend_worker(func_or_class: Union[Callable, Type[Callable]]):
         assert False, "func_or_class must be function or class."
 
     # TODO(architkulkarni): Add type hints after upgrading cloudpickle
-    class RayServeWrappedWorker(object):
+    class RayServeWrappedReplica(object):
         def __init__(self, backend_tag, replica_tag, init_args,
                      backend_config: BackendConfig, controller_name: str):
             # Set the controller name so that serve.connect() will connect to
@@ -109,8 +109,8 @@ def __init__(self, backend_tag, replica_tag, init_args,
             else:
                 _callable = func_or_class(*init_args)
 
-            self.backend = RayServeWorker(backend_tag, replica_tag, _callable,
-                                          backend_config, is_function)
+            self.backend = RayServeReplica(backend_tag, replica_tag, _callable,
+                                           backend_config, is_function)
 
         async def handle_request(self, request):
             return await self.backend.handle_request(request)
@@ -121,8 +121,9 @@ def update_config(self, new_config: BackendConfig):
         def ready(self):
             pass
 
-    RayServeWrappedWorker.__name__ = "RayServeWorker_" + func_or_class.__name__
-    return RayServeWrappedWorker
+    RayServeWrappedReplica.__name__ = "RayServeReplica_{}".format(
+        func_or_class.__name__)
+    return RayServeWrappedReplica
 
 
 def wrap_to_ray_error(exception: Exception) -> RayTaskError:
@@ -140,7 +141,7 @@ def ensure_async(func: Callable) -> Callable:
     return sync_to_async(func)
 
 
-class RayServeWorker:
+class RayServeReplica:
     """Handles requests with the provided callable."""
 
     def __init__(self, backend_tag: str, replica_tag: str, _callable: Callable,
@@ -172,8 +173,8 @@ def __init__(self, backend_tag: str, replica_tag: str, _callable: Callable,
         self.error_counter.set_default_tags({"backend": self.backend_tag})
 
         self.restart_counter = metrics.Count(
-            "backend_worker_starts",
-            description=("The number of time this replica workers "
+            "backend_replica_starts",
+            description=("The number of time this replica "
                          "has been restarted due to failure."),
             tag_keys=("backend", "replica_tag"))
         self.restart_counter.set_default_tags({
@@ -288,7 +289,7 @@ async def invoke_batch(self, request_item_list: List[Query]) -> List[Any]:
             if not isinstance(result_list, Iterable) or isinstance(
                     result_list, (dict, set)):
                 error_message = ("RayServe expects an ordered iterable object "
-                                 "but the worker returned a {}".format(
+                                 "but the replica returned a {}".format(
                                      type(result_list)))
                 raise RayServeException(error_message)
 
diff --git a/python/ray/serve/config.py b/python/ray/serve/config.py
index 8007bd4b815d4..0a8070d9e24f3 100644
--- a/python/ray/serve/config.py
+++ b/python/ray/serve/config.py
@@ -30,7 +30,7 @@ class BackendMetadata:
 class BackendConfig(BaseModel):
     """Configuration options for a backend, to be set by the user.
 
-    :param num_replicas: The number of worker processes to start up that will
+    :param num_replicas: The number of processes to start up that will
         handle requests to this backend. Defaults to 0.
     :type num_replicas: int, optional
     :param max_batch_size: The maximum number of requests that will be
@@ -81,7 +81,7 @@ def _validate_complete(self):
 
     # Dynamic default for max_concurrent_queries
     @validator("max_concurrent_queries", always=True)
-    def set_max_queries_by_mode(cls, v, values):
+    def set_max_queries_by_mode(cls, v, values):  # noqa 805
         if v is None:
             # Model serving mode: if the servable is blocking and the wait
             # timeout is default zero seconds, then we keep the existing
@@ -95,8 +95,8 @@ def set_max_queries_by_mode(cls, v, values):
                     v = 8
 
             # Pipeline/async mode: if the servable is not blocking,
-            # router should just keep pushing queries to the worker
-            # replicas until a high limit.
+            # router should just keep pushing queries to the replicas
+            # until a high limit.
             if not values["internal_metadata"].is_blocking:
                 v = ASYNC_CONCURRENCY
 
diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py
index fa171868f63c3..b764b704eda45 100644
--- a/python/ray/serve/controller.py
+++ b/python/ray/serve/controller.py
@@ -11,7 +11,7 @@
 import ray
 import ray.cloudpickle as pickle
 from ray.serve.autoscaling_policy import BasicAutoscalingPolicy
-from ray.serve.backend_worker import create_backend_worker
+from ray.serve.backend_worker import create_backend_replica
 from ray.serve.constants import ASYNC_CONCURRENCY, SERVE_PROXY_NAME
 from ray.serve.http_proxy import HTTPProxyActor
 from ray.serve.kv_store import RayInternalKVStore
@@ -73,7 +73,7 @@ def set_shadow(self, backend: str, proportion: float):
 
 class BackendInfo(BaseModel):
     # TODO(architkulkarni): Add type hint for worker_class after upgrading
-    # cloudpickle and adding types to RayServeWrappedWorker
+    # cloudpickle and adding types to RayServeWrappedReplica
     worker_class: Any
     backend_config: BackendConfig
     replica_config: ReplicaConfig
@@ -112,94 +112,94 @@ class ActorStateReconciler:
     detached: bool = field(init=True)
 
     routers_cache: Dict[NodeId, ActorHandle] = field(default_factory=dict)
-    replicas: Dict[BackendTag, List[ReplicaTag]] = field(
-        default_factory=lambda: defaultdict(list))
-    replicas_to_start: Dict[BackendTag, List[ReplicaTag]] = field(
+    backend_replicas: Dict[BackendTag, Dict[ReplicaTag, ActorHandle]] = field(
+        default_factory=lambda: defaultdict(dict))
+    backend_replicas_to_start: Dict[BackendTag, List[ReplicaTag]] = field(
         default_factory=lambda: defaultdict(list))
-    replicas_to_stop: Dict[BackendTag, List[ReplicaTag]] = field(
+    backend_replicas_to_stop: Dict[BackendTag, List[ReplicaTag]] = field(
         default_factory=lambda: defaultdict(list))
     backends_to_remove: List[BackendTag] = field(default_factory=list)
     endpoints_to_remove: List[EndpointTag] = field(default_factory=list)
+
     # TODO(edoakes): consider removing this and just using the names.
-    workers: Dict[BackendTag, Dict[ReplicaConfig, ActorHandle]] = field(
-        default_factory=lambda: defaultdict(dict))
 
     def router_handles(self) -> List[ActorHandle]:
         return list(self.routers_cache.values())
 
-    def worker_handles(self) -> List[ActorHandle]:
+    def get_replica_handles(self) -> List[ActorHandle]:
         return list(
             chain.from_iterable([
                 replica_dict.values()
-                for replica_dict in self.workers.values()
+                for replica_dict in self.backend_replicas.values()
             ]))
 
-    def get_replica_actors(self, backend_tag: BackendTag) -> List[ActorHandle]:
-        return_list = []
-        for replica_tag in self.replicas.get(backend_tag, []):
-            try:
-                replica_name = format_actor_name(replica_tag,
-                                                 self.controller_name)
-                return_list.append(ray.get_actor(replica_name))
-            except ValueError:
-                pass
-        return return_list
+    def get_replica_tags(self) -> List[ReplicaTag]:
+        return list(
+            chain.from_iterable([
+                replica_dict.keys()
+                for replica_dict in self.backend_replicas.values()
+            ]))
+
+    def get_replica_handles_for_backend(
+            self, backend_tag: BackendTag) -> List[ActorHandle]:
+        return list(self.backend_replicas.get(backend_tag, {}).values())
 
-    async def _start_pending_replicas(
+    async def _start_pending_backend_replicas(
             self, config_store: ConfigurationStore) -> None:
-        """Starts the pending backend replicas in self.replicas_to_start.
+        """Starts the pending backend replicas in self.backend_replicas_to_start.
 
-        Starts the worker, then pushes an update to the router to add it to
-        the proper backend. If the worker has already been started, only
+        Starts the replica, then pushes an update to the router to add it to
+        the proper backend. If the replica has already been started, only
         updates the router.
 
-        Clears self.replicas_to_start.
+        Clears self.backend_replicas_to_start.
         """
         replica_started_futures = []
-        for backend_tag, replicas_to_create in self.replicas_to_start.items():
+        for backend_tag, replicas_to_create in self.backend_replicas_to_start.\
+                items():
             for replica_tag in replicas_to_create:
                 replica_started_futures.append(
-                    self._start_replica(config_store, backend_tag,
-                                        replica_tag))
+                    self._start_backend_replicas(config_store, backend_tag,
+                                                 replica_tag))
 
         # Wait on all creation task futures together.
         await asyncio.gather(*replica_started_futures)
 
-        self.replicas_to_start.clear()
+        self.backend_replicas_to_start.clear()
 
-    async def _start_replica(self, config_store: ConfigurationStore,
-                             backend_tag: BackendTag,
-                             replica_tag: ReplicaTag) -> None:
+    async def _start_backend_replicas(self, config_store: ConfigurationStore,
+                                      backend_tag: BackendTag,
+                                      replica_tag: ReplicaTag) -> None:
         # NOTE(edoakes): the replicas may already be created if we
         # failed after creating them but before writing a
         # checkpoint.
         replica_name = format_actor_name(replica_tag, self.controller_name)
         try:
-            worker_handle = ray.get_actor(replica_name)
+            replica_handle = ray.get_actor(replica_name)
         except ValueError:
-            worker_handle = await self._start_backend_worker(
+            replica_handle = await self._start_single_replica(
                 config_store, backend_tag, replica_tag, replica_name)
 
-        self.replicas[backend_tag].append(replica_tag)
-        self.workers[backend_tag][replica_tag] = worker_handle
+        self.backend_replicas[backend_tag][replica_tag] = replica_handle
 
-        # Register the worker with the router.
+        # Register the replica with the router.
         await asyncio.gather(*[
-            router.add_new_worker.remote(backend_tag, replica_tag,
-                                         worker_handle)
+            router.add_new_replica.remote(backend_tag, replica_tag,
+                                          replica_handle)
             for router in self.router_handles()
         ])
 
-    def _scale_replicas(self, backends: Dict[BackendTag, BackendInfo],
-                        backend_tag: BackendTag, num_replicas: int) -> None:
+    def _scale_backend_replicas(self, backends: Dict[BackendTag, BackendInfo],
+                                backend_tag: BackendTag,
+                                num_replicas: int) -> None:
         """Scale the given backend to the number of replicas.
 
         NOTE: this does not actually start or stop the replicas, but instead
-        adds the intention to start/stop them to self.workers_to_start and
-        self.workers_to_stop. The caller is responsible for then first writing
-        a checkpoint and then actually starting/stopping the intended replicas.
-        This avoids inconsistencies with starting/stopping a worker and then
-        crashing before writing a checkpoint.
+        adds the intention to start/stop them to self.backend_replicas_to_start
+        and self.backend_replicas_to_stop. The caller is responsible for then
+        first writing a checkpoint and then actually starting/stopping the
+        intended replicas. This avoids inconsistencies with starting/stopping a
+        replica and then crashing before writing a checkpoint.
         """
         logger.debug("Scaling backend '{}' to {} replicas".format(
             backend_tag, num_replicas))
@@ -208,7 +208,7 @@ def _scale_replicas(self, backends: Dict[BackendTag, BackendInfo],
         assert num_replicas >= 0, ("Number of replicas must be"
                                    " greater than or equal to 0.")
 
-        current_num_replicas = len(self.replicas[backend_tag])
+        current_num_replicas = len(self.backend_replicas[backend_tag])
         delta_num_replicas = num_replicas - current_num_replicas
 
         backend_info = backends[backend_tag]
@@ -233,30 +233,28 @@ def _scale_replicas(self, backends: Dict[BackendTag, BackendInfo],
                 delta_num_replicas, backend_tag))
             for _ in range(delta_num_replicas):
                 replica_tag = "{}#{}".format(backend_tag, get_random_letters())
-                self.replicas_to_start[backend_tag].append(replica_tag)
+                self.backend_replicas_to_start[backend_tag].append(replica_tag)
 
         elif delta_num_replicas < 0:
             logger.debug("Removing {} replicas from backend '{}'".format(
                 -delta_num_replicas, backend_tag))
-            assert len(self.replicas[backend_tag]) >= delta_num_replicas
+            assert len(
+                self.backend_replicas[backend_tag]) >= delta_num_replicas
             for _ in range(-delta_num_replicas):
-                replica_tag = self.replicas[backend_tag].pop()
-                if len(self.replicas[backend_tag]) == 0:
-                    del self.replicas[backend_tag]
-
-                del self.workers[backend_tag][replica_tag]
-                if len(self.workers[backend_tag]) == 0:
-                    del self.workers[backend_tag]
+                replica_tag, _ = self.backend_replicas[backend_tag].popitem()
+                if len(self.backend_replicas[backend_tag]) == 0:
+                    del self.backend_replicas[backend_tag]
 
-                self.replicas_to_stop[backend_tag].append(replica_tag)
+                self.backend_replicas_to_stop[backend_tag].append(replica_tag)
 
-    async def _stop_pending_replicas(self) -> None:
-        """Stops the pending backend replicas in self.replicas_to_stop.
+    async def _stop_pending_backend_replicas(self) -> None:
+        """Stops the pending backend replicas in self.backend_replicas_to_stop.
 
-        Removes workers from the router, kills them, and clears
-        self.replicas_to_stop.
+        Removes backend_replicas from the router, kills them, and clears
+        self.backend_replicas_to_stop.
         """
-        for backend_tag, replicas_list in self.replicas_to_stop.items():
+        for backend_tag, replicas_list in self.backend_replicas_to_stop.items(
+        ):
             for replica_tag in replicas_list:
                 # NOTE(edoakes): the replicas may already be stopped if we
                 # failed after stopping them but before writing a checkpoint.
@@ -269,7 +267,7 @@ async def _stop_pending_replicas(self) -> None:
 
                 # Remove the replica from router. This call is idempotent.
                 await asyncio.gather(*[
-                    router.remove_worker.remote(backend_tag, replica_tag)
+                    router.remove_replica.remote(backend_tag, replica_tag)
                     for router in self.router_handles()
                 ])
 
@@ -280,7 +278,7 @@ async def _stop_pending_replicas(self) -> None:
                 # successfully killed the worker or not.
                 ray.kill(replica, no_restart=True)
 
-        self.replicas_to_stop.clear()
+        self.backend_replicas_to_stop.clear()
 
     async def _remove_pending_backends(self) -> None:
         """Removes the pending backends in self.backends_to_remove.
@@ -294,19 +292,19 @@ async def _remove_pending_backends(self) -> None:
         ])
         self.backends_to_remove.clear()
 
-    async def _start_backend_worker(
+    async def _start_single_replica(
             self, config_store: ConfigurationStore, backend_tag: BackendTag,
             replica_tag: ReplicaTag, replica_name: str) -> ActorHandle:
-        """Creates a backend worker and waits for it to start up.
+        """Creates a backend replica and waits for it to start up.
 
         Assumes that the backend configuration has already been registered
         in the ConfigurationStore.
         """
-        logger.debug("Starting worker '{}' for backend '{}'.".format(
+        logger.debug("Starting replica '{}' for backend '{}'.".format(
             replica_tag, backend_tag))
         backend_info = config_store.get_backend(backend_tag)
 
-        worker_handle = ray.remote(backend_info.worker_class).options(
+        replica_handle = ray.remote(backend_info.worker_class).options(
             name=replica_name,
             lifetime="detached" if self.detached else None,
             max_restarts=-1,
@@ -316,8 +314,8 @@ async def _start_backend_worker(
             backend_info.replica_config.actor_init_args,
             backend_info.backend_config, self.controller_name)
         # TODO(edoakes): we should probably have a timeout here.
-        await worker_handle.ready.remote()
-        return worker_handle
+        await replica_handle.ready.remote()
+        return replica_handle
 
     def _start_routers_if_needed(self, http_host: str, http_port: str,
                                  http_middlewares: List[Any]) -> None:
@@ -394,15 +392,15 @@ def _recover_actor_handles(self) -> None:
             self.routers_cache[node_id] = ray.get_actor(router_name)
 
         # Fetch actor handles for all of the backend replicas in the system.
-        # All of these workers are guaranteed to already exist because they
-        # would not be written to a checkpoint in self.workers until they were
-        # created.
-        for backend_tag, replica_tags in self.replicas.items():
-            for replica_tag in replica_tags:
+        # All of these backend_replicas are guaranteed to already exist because
+        # they would not be written to a checkpoint in self.backend_replicas
+        # until they were created.
+        for backend_tag, replica_dict in self.backend_replicas.items():
+            for replica_tag in replica_dict.keys():
                 replica_name = format_actor_name(replica_tag,
                                                  self.controller_name)
-                self.workers[backend_tag][replica_tag] = ray.get_actor(
-                    replica_name)
+                self.backend_replicas[backend_tag][
+                    replica_tag] = ray.get_actor(replica_name)
 
     async def _recover_from_checkpoint(
             self, config_store: ConfigurationStore,
@@ -418,11 +416,11 @@ async def _recover_from_checkpoint(
                 for router in self.router_handles()
             ])
 
-        for backend_tag, replica_dict in self.workers.items():
-            for replica_tag, worker in replica_dict.items():
+        for backend_tag, replica_dict in self.backend_replicas.items():
+            for replica_tag, replica_handle in replica_dict.items():
                 await asyncio.gather(*[
-                    router.add_new_worker.remote(backend_tag, replica_tag,
-                                                 worker)
+                    router.add_new_replica.remote(backend_tag, replica_tag,
+                                                  replica_handle)
                     for router in self.router_handles()
                 ])
 
@@ -444,8 +442,8 @@ async def _recover_from_checkpoint(
             ])
 
         # Start/stop any pending backend replicas.
-        await self._start_pending_replicas(config_store)
-        await self._stop_pending_replicas()
+        await self._start_pending_backend_replicas(config_store)
+        await self._stop_pending_backend_replicas()
 
         # Remove any pending backends and endpoints.
         await self._remove_pending_backends()
@@ -572,7 +570,7 @@ async def _recover_from_checkpoint(self, checkpoint_bytes: bytes) -> None:
             1) Deserializes the internal state from the checkpoint.
             2) Pushes the latest configuration to the routers
                in case we crashed before updating them.
-            3) Starts/stops any worker replicas that are pending creation or
+            3) Starts/stops any replicas that are pending creation or
                deletion.
 
         NOTE: this requires that self.write_lock is already acquired and will
@@ -630,17 +628,17 @@ def get_traffic_policies(self) -> Dict[str, TrafficPolicy]:
         """Fetched by the router on startup."""
         return self.configuration_store.traffic_policies
 
-    def _list_replicas(self, backend_tag: BackendTag) -> List[str]:
+    def _list_replicas(self, backend_tag: BackendTag) -> List[ReplicaTag]:
         """Used only for testing."""
-        return self.actor_reconciler.replicas[backend_tag]
+        return list(self.actor_reconciler.backend_replicas[backend_tag].keys())
 
     def get_traffic_policy(self, endpoint: str) -> TrafficPolicy:
         """Fetched by serve handles."""
         return self.configuration_store.traffic_policies[endpoint]
 
-    def get_all_worker_handles(self) -> Dict[str, Dict[str, ActorHandle]]:
+    def get_all_replica_handles(self) -> Dict[str, Dict[str, ActorHandle]]:
         """Fetched by the router on startup."""
-        return self.actor_reconciler.workers
+        return self.actor_reconciler.backend_replicas
 
     def get_all_backends(self) -> Dict[str, BackendConfig]:
         """Returns a dictionary of backend tag to backend config."""
@@ -829,7 +827,7 @@ async def create_backend(self, backend_tag: BackendTag,
                     and backend_info.replica_config == replica_config):
                 return
 
-            backend_worker = create_backend_worker(
+            backend_replica = create_backend_replica(
                 replica_config.func_or_class)
 
             # Save creator that starts replicas, the arguments to be passed in,
@@ -837,7 +835,7 @@ async def create_backend(self, backend_tag: BackendTag,
             self.configuration_store.add_backend(
                 backend_tag,
                 BackendInfo(
-                    worker_class=backend_worker,
+                    worker_class=backend_replica,
                     backend_config=backend_config,
                     replica_config=replica_config))
             metadata = backend_config.internal_metadata
@@ -847,7 +845,7 @@ async def create_backend(self, backend_tag: BackendTag,
                     backend_tag, metadata.autoscaling_config)
 
             try:
-                self.actor_reconciler._scale_replicas(
+                self.actor_reconciler._scale_backend_replicas(
                     self.configuration_store.backends, backend_tag,
                     backend_config.num_replicas)
             except RayServeException as e:
@@ -858,7 +856,7 @@ async def create_backend(self, backend_tag: BackendTag,
             # or pushing the updated config to avoid inconsistent state if we
             # crash while making the change.
             self._checkpoint()
-            await self.actor_reconciler._start_pending_replicas(
+            await self.actor_reconciler._start_pending_backend_replicas(
                 self.configuration_store)
 
             # Set the backend config inside the router
@@ -888,8 +886,8 @@ async def delete_backend(self, backend_tag: BackendTag) -> None:
 
             # Scale its replicas down to 0. This will also remove the backend
             # from self.configuration_store.backends and
-            # self.actor_reconciler.replicas.
-            self.actor_reconciler._scale_replicas(
+            # self.actor_reconciler.backend_replicas.
+            self.actor_reconciler._scale_backend_replicas(
                 self.configuration_store.backends, backend_tag, 0)
 
             # Remove the backend's metadata.
@@ -904,7 +902,7 @@ async def delete_backend(self, backend_tag: BackendTag) -> None:
             # backend from the router to avoid inconsistent state if we crash
             # after pushing the update.
             self._checkpoint()
-            await self.actor_reconciler._stop_pending_replicas()
+            await self.actor_reconciler._stop_pending_backend_replicas()
             await self.actor_reconciler._remove_pending_backends()
 
     async def update_backend_config(
@@ -930,7 +928,7 @@ async def update_backend_config(
                 backend_tag).backend_config = backend_config
 
             # Scale the replicas with the new configuration.
-            self.actor_reconciler._scale_replicas(
+            self.actor_reconciler._scale_backend_replicas(
                 self.configuration_store.backends, backend_tag,
                 backend_config.num_replicas)
 
@@ -946,9 +944,9 @@ async def update_backend_config(
                 for router in self.actor_reconciler.router_handles()
             ])
 
-            await self.actor_reconciler._start_pending_replicas(
+            await self.actor_reconciler._start_pending_backend_replicas(
                 self.configuration_store)
-            await self.actor_reconciler._stop_pending_replicas()
+            await self.actor_reconciler._stop_pending_backend_replicas()
 
             await self.broadcast_backend_config(backend_tag)
 
@@ -956,8 +954,9 @@ async def broadcast_backend_config(self, backend_tag: BackendTag) -> None:
         backend_config = self.configuration_store.get_backend(
             backend_tag).backend_config
         broadcast_futures = [
-            replica.update_config.remote(backend_config).as_future() for
-            replica in self.actor_reconciler.get_replica_actors(backend_tag)
+            replica.update_config.remote(backend_config).as_future()
+            for replica in
+            self.actor_reconciler.get_replica_handles_for_backend(backend_tag)
         ]
         await asyncio.gather(*broadcast_futures)
 
@@ -972,7 +971,7 @@ async def shutdown(self) -> None:
         async with self.write_lock:
             for router in self.actor_reconciler.router_handles():
                 ray.kill(router, no_restart=True)
-            for replica in self.actor_reconciler.worker_handles():
+            for replica in self.actor_reconciler.get_replica_handles():
                 ray.kill(replica, no_restart=True)
             self.kv_store.delete(CHECKPOINT_KEY)
 
diff --git a/python/ray/serve/http_proxy.py b/python/ray/serve/http_proxy.py
index e513cb6752dde..c6c9d613b313b 100644
--- a/python/ray/serve/http_proxy.py
+++ b/python/ray/serve/http_proxy.py
@@ -186,9 +186,9 @@ async def set_route_table(self, route_table):
         self.app.set_route_table(route_table)
 
     # ------ Proxy router logic ------ #
-    async def add_new_worker(self, backend_tag, replica_tag, worker_handle):
-        return await self.app.router.add_new_worker(backend_tag, replica_tag,
-                                                    worker_handle)
+    async def add_new_replica(self, backend_tag, replica_tag, worker_handle):
+        return await self.app.router.add_new_replica(backend_tag, replica_tag,
+                                                     worker_handle)
 
     async def set_traffic(self, endpoint, traffic_policy):
         return await self.app.router.set_traffic(endpoint, traffic_policy)
@@ -202,8 +202,8 @@ async def remove_backend(self, backend):
     async def remove_endpoint(self, endpoint):
         return await self.app.router.remove_endpoint(endpoint)
 
-    async def remove_worker(self, backend_tag, replica_tag):
-        return await self.app.router.remove_worker(backend_tag, replica_tag)
+    async def remove_replica(self, backend_tag, replica_tag):
+        return await self.app.router.remove_replica(backend_tag, replica_tag)
 
     async def enqueue_request(self, request_meta, *request_args,
                               **request_kwargs):
diff --git a/python/ray/serve/router.py b/python/ray/serve/router.py
index d7d31bf50df97..4c3634dfaa8fe 100644
--- a/python/ray/serve/router.py
+++ b/python/ray/serve/router.py
@@ -53,10 +53,10 @@ def __reduce__(self):
 
     def ray_serialize(self):
         # NOTE: this method is needed because Query need to be serialized and
-        # sent to the replica worker. However, after we send the query to
-        # replica worker the async_future is still needed to retrieve the final
-        # result. Therefore we need a way to pass the information to replica
-        # worker without removing async_future.
+        # sent to the replica. However, after we send the query to the
+        # replica the async_future is still needed to retrieve the final
+        # result. Therefore we need a way to pass the information to replicas
+        # without removing async_future.
clone = copy.copy(self.__dict__) clone.pop("async_future") return pickle.dumps(clone) @@ -68,7 +68,7 @@ def ray_deserialize(value): class Router: - """A router that routes request to available workers.""" + """A router that routes request to available replicas.""" async def setup(self, name, controller_name): # Note: Several queues are used in the router @@ -117,7 +117,7 @@ async def setup(self, name, controller_name): self.flush_lock = asyncio.Lock() # -- State Restoration -- # - # Fetch the worker handles, traffic policies, and backend configs from + # Fetch the replica handles, traffic policies, and backend configs from # the controller. We use a "pull-based" approach instead of pushing # them from the controller so that the router can transparently recover # from failure. @@ -128,10 +128,12 @@ async def setup(self, name, controller_name): for endpoint, traffic_policy in traffic_policies.items(): await self.set_traffic(endpoint, traffic_policy) - backend_dict = ray.get(self.controller.get_all_worker_handles.remote()) + backend_dict = ray.get( + self.controller.get_all_replica_handles.remote()) for backend_tag, replica_dict in backend_dict.items(): - for replica_tag, worker in replica_dict.items(): - await self.add_new_worker(backend_tag, replica_tag, worker) + for replica_tag, replica_handle in replica_dict.items(): + await self.add_new_replica(backend_tag, replica_tag, + replica_handle) backend_configs = ray.get(self.controller.get_backend_configs.remote()) for backend, backend_config in backend_configs.items(): @@ -193,11 +195,11 @@ async def enqueue_request(self, request_meta, *request_args, request_meta.request_id, request_time_ms)) return result - async def add_new_worker(self, backend_tag, replica_tag, worker_handle): + async def add_new_replica(self, backend_tag, replica_tag, replica_handle): backend_replica_tag = backend_tag + ":" + replica_tag if backend_replica_tag in self.replicas: return - self.replicas[backend_replica_tag] = worker_handle + 
self.replicas[backend_replica_tag] = replica_handle logger.debug("New worker added for backend '{}'".format(backend_tag)) await self.mark_worker_idle(backend_tag, backend_replica_tag) @@ -214,7 +216,7 @@ async def mark_worker_idle(self, backend_tag, backend_replica_tag): self.worker_queues[backend_tag].appendleft(backend_replica_tag) self.flush_backend_queues([backend_tag]) - async def remove_worker(self, backend_tag, replica_tag): + async def remove_replica(self, backend_tag, replica_tag): backend_replica_tag = backend_tag + ":" + replica_tag if backend_replica_tag not in self.replicas: return diff --git a/python/ray/serve/tests/test_api.py b/python/ray/serve/tests/test_api.py index 178ee8fc48c76..503c64910c0ca 100644 --- a/python/ray/serve/tests/test_api.py +++ b/python/ray/serve/tests/test_api.py @@ -338,7 +338,7 @@ def __call__(self, request): controller._list_replicas.remote("bsimple:v1")) new_all_tag_list = [] for worker_dict in ray.get( - controller.get_all_worker_handles.remote()).values(): + controller.get_all_replica_handles.remote()).values(): new_all_tag_list.extend(list(worker_dict.keys())) # the old and new replica tag list should be identical diff --git a/python/ray/serve/tests/test_backend_worker.py b/python/ray/serve/tests/test_backend_worker.py index d53b8d915d052..cdd7e8fd32e20 100644 --- a/python/ray/serve/tests/test_backend_worker.py +++ b/python/ray/serve/tests/test_backend_worker.py @@ -6,7 +6,7 @@ import ray from ray import serve import ray.serve.context as context -from ray.serve.backend_worker import create_backend_worker, wrap_to_ray_error +from ray.serve.backend_worker import create_backend_replica, wrap_to_ray_error from ray.serve.controller import TrafficPolicy from ray.serve.router import Router, RequestMetadata from ray.serve.config import BackendConfig, BackendMetadata @@ -27,7 +27,7 @@ def setup_worker(name, @ray.remote class WorkerActor: def __init__(self): - self.worker = create_backend_worker(func_or_class)( + self.worker = 
create_backend_replica(func_or_class)( name, name + ":tag", init_args, backend_config, controller_name) @@ -47,7 +47,7 @@ def update_config(self, new_config): async def add_servable_to_router(servable, router, **kwargs): worker = setup_worker("backend", servable, **kwargs) - await router.add_new_worker.remote("backend", "replica", worker) + await router.add_new_replica.remote("backend", "replica", worker) await router.set_traffic.remote("endpoint", TrafficPolicy({ "backend": 1.0 })) diff --git a/python/ray/serve/tests/test_failure.py b/python/ray/serve/tests/test_failure.py index 92b89c7d5932a..99a05ca39fde0 100644 --- a/python/ray/serve/tests/test_failure.py +++ b/python/ray/serve/tests/test_failure.py @@ -113,7 +113,7 @@ def function(_): def _get_worker_handles(client, backend): controller = client._controller - backend_dict = ray.get(controller.get_all_worker_handles.remote()) + backend_dict = ray.get(controller.get_all_replica_handles.remote()) return list(backend_dict[backend].values()) diff --git a/python/ray/serve/tests/test_router.py b/python/ray/serve/tests/test_router.py index 4ac0c7b00c6ff..4f54b09311e2e 100644 --- a/python/ray/serve/tests/test_router.py +++ b/python/ray/serve/tests/test_router.py @@ -51,8 +51,8 @@ async def test_single_prod_cons_queue(serve_instance, task_runner_mock_actor): await q.setup.remote("", serve_instance._controller_name) q.set_traffic.remote("svc", TrafficPolicy({"backend-single-prod": 1.0})) - q.add_new_worker.remote("backend-single-prod", "replica-1", - task_runner_mock_actor) + q.add_new_replica.remote("backend-single-prod", "replica-1", + task_runner_mock_actor) # Make sure we get the request result back result = await q.enqueue_request.remote( @@ -70,16 +70,16 @@ async def test_alter_backend(serve_instance, task_runner_mock_actor): await q.setup.remote("", serve_instance._controller_name) await q.set_traffic.remote("svc", TrafficPolicy({"backend-alter": 1})) - await q.add_new_worker.remote("backend-alter", "replica-1", - 
task_runner_mock_actor) + await q.add_new_replica.remote("backend-alter", "replica-1", + task_runner_mock_actor) await q.enqueue_request.remote( RequestMetadata(get_random_letters(10), "svc", None), 1) got_work = await task_runner_mock_actor.get_recent_call.remote() assert got_work.args[0] == 1 await q.set_traffic.remote("svc", TrafficPolicy({"backend-alter-2": 1})) - await q.add_new_worker.remote("backend-alter-2", "replica-1", - task_runner_mock_actor) + await q.add_new_replica.remote("backend-alter-2", "replica-1", + task_runner_mock_actor) await q.enqueue_request.remote( RequestMetadata(get_random_letters(10), "svc", None), 2) got_work = await task_runner_mock_actor.get_recent_call.remote() @@ -96,8 +96,8 @@ async def test_split_traffic_random(serve_instance, task_runner_mock_actor): "backend-split-2": 0.5 })) runner_1, runner_2 = [mock_task_runner() for _ in range(2)] - await q.add_new_worker.remote("backend-split", "replica-1", runner_1) - await q.add_new_worker.remote("backend-split-2", "replica-1", runner_2) + await q.add_new_replica.remote("backend-split", "replica-1", runner_1) + await q.add_new_replica.remote("backend-split-2", "replica-1", runner_2) # assume 50% split, the probability of all 20 requests goes to a # single queue is 0.5^20 ~ 1-6 @@ -120,8 +120,8 @@ def worker_queue_size(self, backend): temp_actor = mock_task_runner() q = ray.remote(TestRouter).remote() await q.setup.remote("", serve_instance._controller_name) - await q.add_new_worker.remote("backend-remove", "replica-1", temp_actor) - await q.remove_worker.remote("backend-remove", "replica-1") + await q.add_new_replica.remote("backend-remove", "replica-1", temp_actor) + await q.remove_replica.remote("backend-remove", "replica-1") assert ray.get(q.worker_queue_size.remote("backend")) == 0 @@ -135,7 +135,7 @@ async def test_shard_key(serve_instance, task_runner_mock_actor): for i, runner in enumerate(runners): backend_name = "backend-split-" + str(i) traffic_dict[backend_name] = 1.0 / 
num_backends - await q.add_new_worker.remote(backend_name, "replica-1", runner) + await q.add_new_replica.remote(backend_name, "replica-1", runner) await q.set_traffic.remote("svc", TrafficPolicy(traffic_dict)) # Generate random shard keys and send one request for each. @@ -190,7 +190,7 @@ def get_queues(self): backend_name = "max-concurrent-test" config = BackendConfig(max_concurrent_queries=1) await q.set_traffic.remote("svc", TrafficPolicy({backend_name: 1.0})) - await q.add_new_worker.remote(backend_name, "replica-tag", worker) + await q.add_new_replica.remote(backend_name, "replica-tag", worker) await q.set_backend_config.remote(backend_name, config) # We send over two queries