diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index 671ab6f897db..2a2b9f579975 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -9,10 +9,10 @@ import ray from ray.serve.constants import (DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT, SERVE_MASTER_NAME) -from ray.serve.global_state import GlobalState, start_initial_state +from ray.serve.global_state import GlobalState, ServeMaster from ray.serve.kv_store_service import SQLiteKVStore from ray.serve.task_runner import RayServeMixin, TaskRunnerActor -from ray.serve.utils import block_until_http_ready, get_random_letters, expand +from ray.serve.utils import block_until_http_ready, expand from ray.serve.exceptions import RayServeException, batch_annotation_not_found from ray.serve.backend_config import BackendConfig from ray.serve.policy import RoutePolicy @@ -138,16 +138,15 @@ def init( def kv_store_connector(namespace): return SQLiteKVStore(namespace, db_path=kv_store_path) - master = start_initial_state(kv_store_connector) + master = ServeMaster.options( + detached=True, name=SERVE_MASTER_NAME).remote(kv_store_connector) + + ray.get(master.start_router.remote(queueing_policy.value, policy_kwargs)) global_state = GlobalState(master) - router = global_state.init_or_get_router( - queueing_policy=queueing_policy, policy_kwargs=policy_kwargs) - global_state.init_or_get_metric_monitor( - gc_window_seconds=gc_window_seconds) + ray.get(master.start_metric_monitor.remote(gc_window_seconds)) if start_server: - global_state.init_or_get_http_proxy( - host=http_host, port=http_port).set_router_handle.remote(router) + ray.get(master.start_http_proxy.remote(http_host, http_port)) if start_server and blocking: block_until_http_ready("http://{}:{}/-/routes".format( @@ -169,9 +168,11 @@ def create_endpoint(endpoint_name, route=None, methods=["GET"]): methods = [m.upper() for m in methods] global_state.route_table.register_service( route, endpoint_name, methods=methods) - ray.get(global_state.init_or_get_http_proxy().set_route_table.remote( - global_state.route_table.list_service( - include_methods=True, include_headless=False))) + http_proxy = global_state.get_http_proxy() + ray.get( + http_proxy.set_route_table.remote( + global_state.route_table.list_service( + include_methods=True, include_headless=False))) @_ensure_connected @@ -198,8 +199,8 @@ def set_backend_config(backend_tag, backend_config): # inform the router about change in configuration # particularly for setting max_batch_size - ray.get(global_state.init_or_get_router().set_backend_config.remote( - backend_tag, backend_config_dict)) + router = global_state.get_router() + ray.get(router.set_backend_config.remote(backend_tag, backend_config_dict)) # checking if replicas need to be restarted # Replicas are restarted if there is any change in the backend config @@ -281,7 +282,6 @@ def create_backend(func_or_class, class CustomActor(RayServeMixin, func_or_class): @wraps(func_or_class.__init__) def __init__(self, *args, **kwargs): - init() # serve init super().__init__(*args, **kwargs) arg_list = actor_init_args @@ -305,68 +305,11 @@ def __init__(self, *args, **kwargs): # set the backend config inside the router # particularly for max-batch-size - ray.get(global_state.init_or_get_router().set_backend_config.remote( - backend_tag, backend_config_dict)) + router = global_state.get_router() + ray.get(router.set_backend_config.remote(backend_tag, backend_config_dict)) _scale(backend_tag, backend_config_dict["num_replicas"]) -def _start_replica(backend_tag): - assert (backend_tag in global_state.backend_table.list_backends() - ), "Backend {} is not registered.".format(backend_tag) - - replica_tag = "{}#{}".format(backend_tag, get_random_letters(length=6)) - - # get the info which starts the replicas - creator = global_state.backend_table.get_backend_creator(backend_tag) - backend_config_dict = global_state.backend_table.get_info(backend_tag) - backend_config = BackendConfig(**backend_config_dict) - init_args = global_state.backend_table.get_init_args(backend_tag) - - # get actor creation kwargs - actor_kwargs = backend_config.get_actor_creation_args(init_args) - - # Create the runner in the master actor - [runner_handle] = ray.get( - global_state.master_actor_handle.start_actor_with_creator.remote( - creator, actor_kwargs, replica_tag)) - - # Setup the worker - ray.get( - runner_handle._ray_serve_setup.remote( - backend_tag, global_state.init_or_get_router(), runner_handle)) - runner_handle._ray_serve_fetch.remote() - - # Register the worker in config tables as well as metric monitor - global_state.backend_table.add_replica(backend_tag, replica_tag) - global_state.init_or_get_metric_monitor().add_target.remote(runner_handle) - - -def _remove_replica(backend_tag): - assert (backend_tag in global_state.backend_table.list_backends() - ), "Backend {} is not registered.".format(backend_tag) - assert ( - len(global_state.backend_table.list_replicas(backend_tag)) > - 0), "Backend {} does not have enough replicas to be removed.".format( - backend_tag) - - replica_tag = global_state.backend_table.remove_replica(backend_tag) - [replica_handle] = ray.get( - global_state.master_actor_handle.get_handle.remote(replica_tag)) - - # Remove the replica from metric monitor. - ray.get(global_state.init_or_get_metric_monitor().remove_target.remote( - replica_handle)) - - # Remove the replica from master actor. - ray.get(global_state.master_actor_handle.remove_handle.remote(replica_tag)) - - # Remove the replica from router. - # This will also destory the actor handle. - ray.get( - global_state.init_or_get_router().remove_and_destory_replica.remote( - backend_tag, replica_handle)) - - @_ensure_connected def _scale(backend_tag, num_replicas): """Set the number of replicas for backend_tag. @@ -386,10 +329,14 @@ def _scale(backend_tag, num_replicas): if delta_num_replicas > 0: for _ in range(delta_num_replicas): - _start_replica(backend_tag) + ray.get( + global_state.master_actor.start_backend_replica.remote( + backend_tag)) elif delta_num_replicas < 0: for _ in range(-delta_num_replicas): - _remove_replica(backend_tag) + ray.get( + global_state.master_actor.remove_backend_replica.remote( + backend_tag)) @_ensure_connected @@ -441,8 +388,9 @@ def split(endpoint_name, traffic_policy_dictionary): global_state.policy_table.register_traffic_policy( endpoint_name, traffic_policy_dictionary) - ray.get(global_state.init_or_get_router().set_traffic.remote( - endpoint_name, traffic_policy_dictionary)) + router = global_state.get_router() + ray.get( + router.set_traffic.remote(endpoint_name, traffic_policy_dictionary)) @_ensure_connected @@ -473,7 +421,7 @@ def get_handle(endpoint_name, from ray.serve.handle import RayServeHandle return RayServeHandle( - global_state.init_or_get_router(), + global_state.get_router(), endpoint_name, relative_slo_ms, absolute_slo_ms, @@ -492,8 +440,8 @@ def stat(percentiles=[50, 90, 95], The longest aggregation window must be shorter or equal to the gc_window_seconds. """ - return ray.get(global_state.init_or_get_metric_monitor().collect.remote( - percentiles, agg_windows_seconds)) + monitor = global_state.get_metric_monitor() + return ray.get(monitor.collect.remote(percentiles, agg_windows_seconds)) class route: diff --git a/python/ray/serve/constants.py b/python/ray/serve/constants.py index 73cbd77b94a2..bc1faf2adc83 100644 --- a/python/ray/serve/constants.py +++ b/python/ray/serve/constants.py @@ -1,9 +1,6 @@ #: Actor name used to register master actor SERVE_MASTER_NAME = "SERVE_MASTER_ACTOR" -#: KVStore connector key in bootstrap config -BOOTSTRAP_KV_STORE_CONN_KEY = "kv_store_connector" - #: HTTP Address DEFAULT_HTTP_ADDRESS = "http://127.0.0.1:8000" diff --git a/python/ray/serve/global_state.py b/python/ray/serve/global_state.py index ca4da11d2bb8..d5085ca837f1 100644 --- a/python/ray/serve/global_state.py +++ b/python/ray/serve/global_state.py @@ -1,23 +1,11 @@ import ray -from ray.serve.constants import (BOOTSTRAP_KV_STORE_CONN_KEY, - DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT, - SERVE_MASTER_NAME, ASYNC_CONCURRENCY) +from ray.serve.backend_config import BackendConfig +from ray.serve.constants import (SERVE_MASTER_NAME, ASYNC_CONCURRENCY) +from ray.serve.http_proxy import HTTPProxyActor from ray.serve.kv_store_service import (BackendTable, RoutingTable, TrafficPolicyTable) from ray.serve.metric import (MetricMonitor, start_metric_monitor_loop) - -from ray.serve.policy import RoutePolicy -from ray.serve.http_proxy import HTTPProxyActor - - -def start_initial_state(kv_store_connector): - master_handle = ServeMaster.remote() - ray.util.register_actor(SERVE_MASTER_NAME, master_handle) - - ray.get( - master_handle.store_bootstrap_state.remote(BOOTSTRAP_KV_STORE_CONN_KEY, - kv_store_connector)) - return master_handle +from ray.serve.utils import get_random_letters @ray.remote @@ -30,55 +18,107 @@ class ServeMaster: we need to initialize and store actor handles in a seperate actor. """ - def __init__(self): + def __init__(self, kv_store_connector): + self.kv_store_connector = kv_store_connector + self.route_table = RoutingTable(kv_store_connector) + self.backend_table = BackendTable(kv_store_connector) + self.policy_table = TrafficPolicyTable(kv_store_connector) self.tag_to_actor_handles = dict() - self.bootstrap_state = dict() - - def start_actor(self, - actor_cls, - tag, - init_args=(), - init_kwargs={}, - is_asyncio=False): - """Start an actor and add it to the nursery""" - # Avoid double initialization - if tag in self.tag_to_actor_handles.keys(): - return [self.tag_to_actor_handles[tag]] - - max_concurrency = ASYNC_CONCURRENCY if is_asyncio else None - handle = (actor_cls.options(max_concurrency=max_concurrency).remote( - *init_args, **init_kwargs)) - self.tag_to_actor_handles[tag] = handle - return [handle] - - def start_actor_with_creator(self, creator, kwargs, tag): - """ - Args: - creator (Callable[Dict]): a closure that should return - a newly created actor handle when called with kwargs. - The kwargs input is passed to `ActorCls_remote` method. - """ - handle = creator(kwargs) - self.tag_to_actor_handles[tag] = handle - return [handle] + self.router = None + self.http_proxy = None + self.metric_monitor = None + + def get_kv_store_connector(self): + return self.kv_store_connector + + def start_router(self, router_class, init_kwargs): + assert self.router is None, "Router already started." + self.router = router_class.options( + max_concurrency=ASYNC_CONCURRENCY).remote(**init_kwargs) + + def get_router(self): + assert self.router is not None, "Router not started yet." + return [self.router] + + def start_http_proxy(self, host, port): + assert self.http_proxy is None, "HTTP proxy already started." + assert self.router is not None, ( + "Router must be started before HTTP proxy.") + self.http_proxy = HTTPProxyActor.options( + max_concurrency=ASYNC_CONCURRENCY).remote() + self.http_proxy.run.remote(host, port) + ray.get(self.http_proxy.set_router_handle.remote(self.router)) + + def get_http_proxy(self): + assert self.http_proxy is not None, "HTTP proxy not started yet." + return [self.http_proxy] + + def start_metric_monitor(self, gc_window_seconds): + assert self.metric_monitor is None, "Metric monitor already started." + self.metric_monitor = MetricMonitor.remote(gc_window_seconds) + # TODO(edoakes): this should be an actor method, not a separate task. + start_metric_monitor_loop.remote(self.metric_monitor) + self.metric_monitor.add_target.remote(self.router) + + def get_metric_monitor(self): + assert self.metric_monitor is not None, ( + "Metric monitor not started yet.") + return [self.metric_monitor] + + def start_backend_replica(self, backend_tag): + assert (backend_tag in self.backend_table.list_backends() + ), "Backend {} is not registered.".format(backend_tag) + + replica_tag = "{}#{}".format(backend_tag, get_random_letters(length=6)) + + # Fetch the info to start the replica from the backend table. + creator = self.backend_table.get_backend_creator(backend_tag) + backend_config_dict = self.backend_table.get_info(backend_tag) + backend_config = BackendConfig(**backend_config_dict) + init_args = self.backend_table.get_init_args(backend_tag) + kwargs = backend_config.get_actor_creation_args(init_args) + + runner_handle = creator(kwargs) + self.tag_to_actor_handles[replica_tag] = runner_handle + + # Set up the worker. + ray.get( + runner_handle._ray_serve_setup.remote(backend_tag, + self.get_router()[0], + runner_handle)) + ray.get(runner_handle._ray_serve_fetch.remote()) + + # Register the worker in config tables and metric monitor. + self.backend_table.add_replica(backend_tag, replica_tag) + self.get_metric_monitor()[0].add_target.remote(runner_handle) + + def remove_backend_replica(self, backend_tag): + assert (backend_tag in self.backend_table.list_backends() + ), "Backend {} is not registered.".format(backend_tag) + assert ( + len(self.backend_table.list_replicas(backend_tag)) > 0 + ), "Backend {} does not have enough replicas to be removed.".format( + backend_tag) + + replica_tag = self.backend_table.remove_replica(backend_tag) + assert replica_tag in self.tag_to_actor_handles + replica_handle = self.tag_to_actor_handles.pop(replica_tag) + + # Remove the replica from metric monitor. + [monitor] = self.get_metric_monitor() + ray.get(monitor.remove_target.remote(replica_handle)) + + # Remove the replica from router. + # This will also destroy the actor handle. + [router] = self.get_router() + ray.get( + router.remove_and_destory_replica.remote(backend_tag, + replica_handle)) def get_all_handles(self): return self.tag_to_actor_handles - def get_handle(self, actor_tag): - return [self.tag_to_actor_handles[actor_tag]] - - def remove_handle(self, actor_tag): - if actor_tag in self.tag_to_actor_handles.keys(): - self.tag_to_actor_handles.pop(actor_tag) - - def store_bootstrap_state(self, key, value): - self.bootstrap_state[key] = value - - def get_bootstrap_state_dict(self): - return self.bootstrap_state - class GlobalState: """Encapsulate all global state in the serving system. @@ -88,83 +128,24 @@ class GlobalState: 2. A actor supervisor service """ - def __init__(self, master_actor_handle=None): - # Get actor nursery handle - if master_actor_handle is None: - master_actor_handle = ray.util.get_actor(SERVE_MASTER_NAME) - self.master_actor_handle = master_actor_handle + def __init__(self, master_actor=None): + # Get actor nursery handle. + if master_actor is None: + master_actor = ray.util.get_actor(SERVE_MASTER_NAME) + self.master_actor = master_actor - # Connect to all the table - bootstrap_config = ray.get( - self.master_actor_handle.get_bootstrap_state_dict.remote()) - kv_store_connector = bootstrap_config[BOOTSTRAP_KV_STORE_CONN_KEY] + # Connect to all the tables. + kv_store_connector = ray.get( + self.master_actor.get_kv_store_connector.remote()) self.route_table = RoutingTable(kv_store_connector) self.backend_table = BackendTable(kv_store_connector) self.policy_table = TrafficPolicyTable(kv_store_connector) - self.refresh_actor_handle_cache() - - def refresh_actor_handle_cache(self): - self.actor_handle_cache = ray.get( - self.master_actor_handle.get_all_handles.remote()) - - def init_or_get_http_proxy(self, - host=DEFAULT_HTTP_HOST, - port=DEFAULT_HTTP_PORT): - if "http_proxy" not in self.actor_handle_cache: - [handle] = ray.get( - self.master_actor_handle.start_actor.remote( - HTTPProxyActor, tag="http_proxy")) - - handle.run.remote(host=host, port=port) - self.refresh_actor_handle_cache() - return self.actor_handle_cache["http_proxy"] - - def _get_queueing_policy(self, default_policy): - return_policy = default_policy - # check if there is already a queue_actor running - # with policy as p.name for the case where - # serve nursery exists: ray.util.get_actor(SERVE_MASTER_NAME) - for p in RoutePolicy: - queue_actor_tag = "queue_actor::" + p.name - if queue_actor_tag in self.actor_handle_cache: - return_policy = p - break - return return_policy - - def init_or_get_router(self, - queueing_policy=RoutePolicy.Random, - policy_kwargs={}): - # get queueing policy - self.queueing_policy = self._get_queueing_policy( - default_policy=queueing_policy) - queue_actor_tag = "queue_actor::" + self.queueing_policy.name - if queue_actor_tag not in self.actor_handle_cache: - [handle] = ray.get( - self.master_actor_handle.start_actor.remote( - self.queueing_policy.value, - init_kwargs=policy_kwargs, - tag=queue_actor_tag, - is_asyncio=True)) - # handle.register_self_handle.remote(handle) - self.refresh_actor_handle_cache() - - return self.actor_handle_cache[queue_actor_tag] - - def init_or_get_metric_monitor(self, gc_window_seconds=3600): - if "metric_monitor" not in self.actor_handle_cache: - [handle] = ray.get( - self.master_actor_handle.start_actor.remote( - MetricMonitor, - init_args=(gc_window_seconds, ), - tag="metric_monitor")) - - start_metric_monitor_loop.remote(handle) - - if "queue_actor" in self.actor_handle_cache: - handle.add_target.remote( - self.actor_handle_cache["queue_actor"]) - - self.refresh_actor_handle_cache() - - return self.actor_handle_cache["metric_monitor"] + def get_http_proxy(self): + return ray.get(self.master_actor.get_http_proxy.remote())[0] + + def get_router(self): + return ray.get(self.master_actor.get_router.remote())[0] + + def get_metric_monitor(self): + return ray.get(self.master_actor.get_metric_monitor.remote())[0] diff --git a/python/ray/serve/http_proxy.py b/python/ray/serve/http_proxy.py index 4e3ce6d85692..465d8816300b 100644 --- a/python/ray/serve/http_proxy.py +++ b/python/ray/serve/http_proxy.py @@ -155,7 +155,7 @@ class HTTPProxyActor: def __init__(self): self.app = HTTPProxy() - async def run(self, host="0.0.0.0", port=8000): + async def run(self, host, port): sock = socket.socket() sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((host, port)) diff --git a/python/ray/serve/tests/test_api.py b/python/ray/serve/tests/test_api.py index e63d806b28a4..2cb475bb3647 100644 --- a/python/ray/serve/tests/test_api.py +++ b/python/ray/serve/tests/test_api.py @@ -202,8 +202,8 @@ def __call__(self, flask_request, temp=None): serve.set_backend_config("simple:v1", bnew_config) new_replica_tag_list = global_state.backend_table.list_replicas( "simple:v1") - global_state.refresh_actor_handle_cache() - new_all_tag_list = list(global_state.actor_handle_cache.keys()) + new_all_tag_list = list( + ray.get(global_state.master_actor.get_all_handles.remote()).keys()) # the new_replica_tag_list must be subset of all_tag_list assert set(new_replica_tag_list) <= set(new_all_tag_list) @@ -236,8 +236,8 @@ def __call__(self, flask_request, temp=None): serve.set_backend_config("bsimple:v1", bnew_config) new_replica_tag_list = global_state.backend_table.list_replicas( "bsimple:v1") - global_state.refresh_actor_handle_cache() - new_all_tag_list = list(global_state.actor_handle_cache.keys()) + new_all_tag_list = list( + ray.get(global_state.master_actor.get_all_handles.remote()).keys()) # the old and new replica tag list should be identical # and should be subset of all_tag_list