[serve] Migrate v1 api release tests (ray-project#40372)
Migrate Serve release tests to use v2 api rather than v1 api.
zcin authored Oct 17, 2023
1 parent ff58667 commit 58b2614
Showing 10 changed files with 107 additions and 130 deletions.
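At a glance, the migration swaps the 1.x-style calls used by these tests (`Deployment.options(...)._deploy()`, `deployment._get_handle(sync=False)`, `_private_api.list_deployments()`) for the 2.x application API (`.bind()` plus `serve.run()`, `serve.get_app_handle()`, `serve.status()`). A minimal before/after sketch of the core pattern (the `Echo` deployment below is illustrative, not taken from the diff):

from ray import serve

@serve.deployment(num_replicas=2)
class Echo:
    async def __call__(self, request) -> str:
        return "hello"

# v1 style (removed by this commit's tests): Echo.options(name="echo")._deploy()
# v2 style: compose an application with .bind() and deploy it with serve.run().
serve.run(Echo.bind(), name="echo", route_prefix="/echo")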
1 change: 1 addition & 0 deletions python/ray/serve/api.py
@@ -473,6 +473,7 @@ def run(
     Returns:
         RayServeSyncHandle: A handle that can be used to call the application.
     """
+    # trigger tests
     if len(name) == 0:
         raise RayServeException("Application name must be a non-empty string.")
@@ -175,9 +175,11 @@ def setup_serve(model, use_gpu: bool = False):
     serve.start(
         http_options={"location": "EveryNode"}
     )  # Start on every node so `predict` can hit localhost.
-    MnistDeployment.options(
-        num_replicas=2, ray_actor_options={"num_gpus": bool(use_gpu)}
-    )._deploy(model)
+    serve.run(
+        MnistDeployment.options(
+            num_replicas=2, ray_actor_options={"num_gpus": bool(use_gpu)}
+        ).bind(model)
+    )
 
 
 @ray.remote
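One detail worth calling out: arguments that used to be passed to `_deploy(model)` now travel through `.bind(model)`, which forwards them to the deployment constructor when `serve.run` starts the application. A rough sketch under that assumption (the real `MnistDeployment` body is not shown in this diff):

from ray import serve

model = object()  # stand-in for the trained model the test loads

@serve.deployment
class MnistDeployment:  # hypothetical body; only the call pattern matters here
    def __init__(self, model):
        # Constructor arguments arrive via .bind(...), not ._deploy(...).
        self.model = model

serve.run(MnistDeployment.options(num_replicas=2).bind(model))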
2 changes: 1 addition & 1 deletion release/long_running_tests/workloads/serve.py
@@ -67,7 +67,7 @@ async def __call__(self, request):
         return await self.handle_batch(request)
 
 
-Echo._deploy()
+serve.run(Echo.bind(), route_prefix="/echo")
 
 print("Warming up")
 for _ in range(5):
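Since this `serve.run` call passes no `name`, the Echo app should land under Serve's default application name, with `route_prefix` alone deciding the URL; the warm-up loop above presumably hits that route. A small sketch of such a warm-up request (host and port are Serve's defaults):

import requests

# The app is reachable at its route prefix regardless of the application name.
resp = requests.get("http://127.0.0.1:8000/echo")
print(resp.status_code, resp.text)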
73 changes: 41 additions & 32 deletions release/long_running_tests/workloads/serve_failure.py
@@ -13,7 +13,6 @@
 from ray.serve.context import _get_global_client
 from ray.cluster_utils import Cluster
 from ray._private.test_utils import safe_write_to_results_json
-from ray.serve._private import api as _private_api
 
 # Global variables / constants appear only right after imports.
 # Ray serve deployment setup constants
@@ -83,54 +82,54 @@ async def run(self):
             ray.kill(chosen, no_restart=False)
             await asyncio.sleep(self.kill_period_s)
 
-    async def spare(self, deployment_name: str):
-        print(f'Sparing deployment "{deployment_name}" replicas.')
-        self.sanctuary.add(deployment_name)
+    async def spare(self, app_name: str):
+        print(f'Sparing application "{app_name}" replicas.')
+        self.sanctuary.add(app_name)
 
-    async def stop_spare(self, deployment_name: str):
-        print(f'No longer sparing deployment "{deployment_name}" replicas.')
-        self.sanctuary.discard(deployment_name)
+    async def stop_spare(self, app_name: str):
+        print(f'No longer sparing application "{app_name}" replicas.')
+        self.sanctuary.discard(app_name)
 
     def _get_serve_actors(self):
         controller = _get_global_client()._controller
         routers = list(ray.get(controller.get_proxies.remote()).values())
         all_handles = routers + [controller]
         replica_dict = ray.get(controller._all_running_replicas.remote())
         for deployment_id, replica_info_list in replica_dict.items():
-            if deployment_id.name not in self.sanctuary:
+            if deployment_id.app not in self.sanctuary:
                 for replica_info in replica_info_list:
                     all_handles.append(replica_info.actor_handle)
 
         return all_handles
 
 
 class RandomTest:
-    def __init__(self, random_killer_handle, max_deployments=1):
-        self.max_deployments = max_deployments
+    def __init__(self, random_killer_handle, max_applications=1):
+        self.max_applications = max_applications
         self.weighted_actions = [
-            (self.create_deployment, 1),
-            (self.verify_deployment, 4),
+            (self.create_application, 1),
+            (self.verify_application, 4),
         ]
-        self.deployments = []
+        self.applications = []
 
         self.random_killer = random_killer_handle
 
         # Deploy in parallel to avoid long test startup time.
-        self.wait_for_deployments_ready(
-            [self.create_deployment(blocking=False) for _ in range(max_deployments)]
+        self.wait_for_applications_running(
+            [self.create_application(blocking=False) for _ in range(max_applications)]
         )
 
         self.random_killer.run.remote()
 
-    def wait_for_deployments_ready(self, deployment_names: List[str]):
+    def wait_for_applications_running(self, application_names: List[str]):
         client = _get_global_client()
-        for deployment_name in deployment_names:
-            client._wait_for_deployment_healthy(deployment_name, timeout_s=60)
+        for name in application_names:
+            client._wait_for_application_running(name, timeout_s=60)
 
-    def create_deployment(self, blocking: bool = True) -> str:
-        if len(self.deployments) == self.max_deployments:
-            deployment_to_delete = self.deployments.pop()
-            _private_api.get_deployment(deployment_to_delete)._delete()
+    def create_application(self, blocking: bool = True) -> str:
+        if len(self.applications) == self.max_applications:
+            app_to_delete = self.applications.pop()
+            serve.delete(app_to_delete)
 
         new_name = "".join([random.choice(string.ascii_letters) for _ in range(10)])
 
@@ -141,23 +140,33 @@ def handler(self, *args):
 
         if blocking:
             ray.get(self.random_killer.spare.remote(new_name))
-            handler._deploy(_blocking=blocking)
-            self.deployments.append(new_name)
+            serve.run(
+                handler.bind(),
+                name=new_name,
+                route_prefix=f"/{new_name}",
+                _blocking=True,
+            )
+            self.applications.append(new_name)
             ray.get(self.random_killer.stop_spare.remote(new_name))
         else:
-            handler._deploy(_blocking=False)
-            self.deployments.append(new_name)
+            serve.run(
+                handler.bind(),
+                name=new_name,
+                route_prefix=f"/{new_name}",
+                _blocking=False,
+            )
+            self.applications.append(new_name)
 
         return new_name
 
-    def verify_deployment(self):
-        deployment = random.choice(self.deployments)
+    def verify_application(self):
+        app = random.choice(self.applications)
         for _ in range(100):
             try:
-                r = requests.get("http://127.0.0.1:8000/" + deployment)
-                assert r.text == deployment
+                r = requests.get("http://127.0.0.1:8000/" + app)
+                assert r.text == app
             except Exception:
-                print("Request to {} failed.".format(deployment))
+                print("Request to {} failed.".format(app))
                 time.sleep(0.1)
 
     def run(self):
@@ -192,5 +201,5 @@ def run(self):
 
 
 random_killer = RandomKiller.remote()
-tester = RandomTest(random_killer, max_deployments=NUM_NODES * CPUS_PER_NODE)
+tester = RandomTest(random_killer, max_applications=NUM_NODES * CPUS_PER_NODE)
 tester.run()
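The failure test now exercises a full multi-application lifecycle: every random name becomes its own application under its own route prefix, and eviction uses the public `serve.delete` rather than a private deployment handle. A condensed sketch of that lifecycle; `EchoApp` and the name are hypothetical stand-ins, and `_blocking` is a private `serve.run` flag this test relies on:

from ray import serve

@serve.deployment
class EchoApp:  # stand-in for the test's elided handler deployment
    def __call__(self, *args) -> str:
        return "AbCdEfGhIj"

app_name = "AbCdEfGhIj"  # stands in for the random 10-letter name
# Deploy without waiting for replicas, as the non-blocking branch does.
serve.run(EchoApp.bind(), name=app_name, route_prefix=f"/{app_name}", _blocking=False)
# ...later, evict the whole application and its replicas in one call.
serve.delete(app_name)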
36 changes: 20 additions & 16 deletions release/serve_tests/workloads/autoscaling_multi_deployment.py
@@ -47,7 +47,6 @@
     NUM_CONNECTIONS,
 )
 from typing import Optional
-from ray.serve._private import api as _private_api
 
 logger = logging.getLogger(__file__)
 
@@ -72,8 +71,10 @@
 
 
 def setup_multi_deployment_replicas(min_replicas, max_replicas, num_deployments):
+    """Returns: list of application route prefixes."""
+
     max_replicas_per_deployment = max_replicas // num_deployments
-    all_deployment_names = [f"Echo_{i+1}" for i in range(num_deployments)]
+    all_app_names = [f"Echo_{i+1}" for i in range(num_deployments)]
 
     @serve.deployment(
         autoscaling_config={
@@ -88,35 +89,37 @@ def setup_multi_deployment_replicas(min_replicas, max_replicas, num_deployments)
     )
     class Echo:
         def __init__(self):
-            self.all_deployment_async_handles = []
+            self.all_app_async_handles = []
 
-        def get_random_async_handle(self):
+        async def get_random_async_handle(self):
             # Expected to be called only a few times during warmup, so that
             # each replica ends up holding handles to all other applications
             # for the recursive inference calls below.
-            if len(self.all_deployment_async_handles) < len(all_deployment_names):
-                deployments = list(_private_api.list_deployments().values())
-                self.all_deployment_async_handles = [
-                    deployment._get_handle(sync=False) for deployment in deployments
+            if len(self.all_app_async_handles) < len(all_app_names):
+                applications = list(serve.status().applications.keys())
+                self.all_app_async_handles = [
+                    serve.get_app_handle(app) for app in applications
                 ]
 
-            return random.choice(self.all_deployment_async_handles)
+            return random.choice(self.all_app_async_handles)
 
         async def handle_request(self, request, depth: int):
             # Max recursive call depth reached
             if depth > 4:
                 return "hi"
 
-            next_async_handle = self.get_random_async_handle()
-            obj_ref = await next_async_handle.handle_request.remote(request, depth + 1)
+            next_async_handle = await self.get_random_async_handle()
+            fut = next_async_handle.handle_request.remote(request, depth + 1)
 
-            return await obj_ref
+            return await fut
 
         async def __call__(self, request):
             return await self.handle_request(request, 0)
 
-    for deployment in all_deployment_names:
-        Echo.options(name=deployment)._deploy()
+    for name in all_app_names:
+        serve.run(Echo.bind(), name=name, route_prefix=f"/{name}")
+
+    return all_app_names
 
 
 @click.command()
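The handle plumbing changes shape here: instead of collecting per-deployment handles through the private API, a replica now lists the running applications and requests an app-level handle for each. A sketch of the calling pattern, assuming the post-2.7 `DeploymentHandle` semantics that `serve.get_app_handle` returns:

import random
from ray import serve

async def call_random_app(request, depth: int):
    # Pick any running application and forward the request one level deeper.
    app_name = random.choice(list(serve.status().applications.keys()))
    handle = serve.get_app_handle(app_name)  # handle to that app's ingress deployment
    response = handle.handle_request.remote(request, depth + 1)  # returns immediately
    return await response  # resolves to the deployment's return value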
@@ -170,11 +173,12 @@ def main(
         f"Deploying with min {min_replicas} and max {max_replicas} "
         f"target replicas ....\n"
     )
-    setup_multi_deployment_replicas(min_replicas, max_replicas, num_deployments)
+    all_endpoints = setup_multi_deployment_replicas(
+        min_replicas, max_replicas, num_deployments
+    )
 
     logger.info("Warming up cluster ....\n")
     endpoint_refs = []
-    all_endpoints = list(_private_api.list_deployments().keys())
     for endpoint in all_endpoints:
         endpoint_refs.append(
             warm_up_one_cluster.options(num_cpus=0).remote(
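Where the old code enumerated deployments with `_private_api.list_deployments()`, the public replacement used throughout this commit is `serve.status()`, whose `applications` field maps app names to their current state. A small sketch (field names as I understand the 2.x status API):

from ray import serve

status = serve.status()
for name, app in status.applications.items():
    # app.status reports lifecycle state such as RUNNING or DEPLOYING.
    print(name, app.status)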
@@ -46,7 +46,6 @@
     NUM_CPU_PER_NODE,
     NUM_CONNECTIONS,
 )
-from ray.serve._private import api as _private_api
 from typing import Optional
 
 logger = logging.getLogger(__file__)
@@ -76,7 +75,6 @@ def deploy_replicas(min_replicas, max_replicas, max_batch_size):
             "downscale_delay_s": 0.2,
             "upscale_delay_s": 0.2,
         },
-        version="v1",
     )
     class Echo:
         @serve.batch(max_batch_size=max_batch_size)
@@ -86,7 +84,7 @@ async def handle_batch(self, requests):
         async def __call__(self, request):
             return await self.handle_batch(request)
 
-    Echo._deploy()
+    serve.run(Echo.bind(), name="echo", route_prefix="/echo")
 
 
 def deploy_proxy_replicas():
@@ -104,7 +102,7 @@ class Proxy:
         def __call__(self, request):
             return "Proxy"
 
-    Proxy._deploy()
+    serve.run(Proxy.bind(), name="proxy", route_prefix="/proxy")
 
 
 def save_results(final_result, default_name):
@@ -173,7 +171,7 @@ def main(
     logger.info(f"Starting wrk trial on all nodes for {trial_length} ....\n")
     # For detailed discussion, see https://github.com/wg/wrk/issues/205
     # TODO(jiaodong): What's the best number to use here?
-    all_endpoints = list(_private_api.list_deployments().keys() - {"proxy"})
+    all_endpoints = ["/echo"]
     all_metrics, all_wrk_stdout = run_wrk_on_all_nodes(
         trial_length, NUM_CONNECTIONS, http_host, http_port, all_endpoints=all_endpoints
     )
25 changes: 12 additions & 13 deletions release/serve_tests/workloads/multi_deployment_1k_noop_replica.py
@@ -37,7 +37,6 @@
     save_test_results,
     is_smoke_test,
 )
-from ray.serve._private import api as _private_api
 from serve_test_cluster_utils import (
     setup_local_single_node_cluster,
     setup_anyscale_cluster,
@@ -73,35 +72,35 @@ def setup_multi_deployment_replicas(num_replicas, num_deployments) -> List[str]:
     @serve.deployment(num_replicas=num_replica_per_deployment)
     class Echo:
         def __init__(self):
-            self.all_deployment_async_handles = []
+            self.all_app_async_handles = []
 
-        def get_random_async_handle(self):
+        async def get_random_async_handle(self):
             # Expected to be called only a few times during warmup, so that
             # each replica ends up holding handles to all other applications
             # for the recursive inference calls below.
-            if len(self.all_deployment_async_handles) < len(all_deployment_names):
-                deployments = list(_private_api.list_deployments().values())
-                self.all_deployment_async_handles = [
-                    deployment._get_handle(sync=False) for deployment in deployments
+            if len(self.all_app_async_handles) < len(all_deployment_names):
+                applications = list(serve.status().applications.keys())
+                self.all_app_async_handles = [
+                    serve.get_app_handle(app) for app in applications
                 ]
 
-            return random.choice(self.all_deployment_async_handles)
+            return random.choice(self.all_app_async_handles)
 
         async def handle_request(self, request, depth: int):
             # Max recursive call depth reached
             if depth > 4:
                 return "hi"
 
-            next_async_handle = self.get_random_async_handle()
-            obj_ref = await next_async_handle.handle_request.remote(request, depth + 1)
+            next_async_handle = await self.get_random_async_handle()
+            fut = next_async_handle.handle_request.remote(request, depth + 1)
 
-            return await obj_ref
+            return await fut
 
         async def __call__(self, request):
             return await self.handle_request(request, 0)
 
-    for deployment in all_deployment_names:
-        Echo.options(name=deployment)._deploy()
+    for name in all_deployment_names:
+        serve.run(Echo.bind(), name=name, route_prefix=f"/{name}")
 
     return all_deployment_names
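The `obj_ref` to `fut` rename in these hunks tracks a real semantic change: the old async handles had to be awaited twice (once for the `ObjectRef`, once for the value), while the new application handles return a response future that resolves in a single await. A sketch under the same assumed 2.7+ handle semantics as above:

async def forward(handle, req, depth: int):
    # Old async-handle pattern needed two awaits:
    #     obj_ref = await handle.handle_request.remote(req, depth)
    #     return await obj_ref
    # New application handles resolve with a single await:
    fut = handle.handle_request.remote(req, depth)
    return await fut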
(Diffs for the remaining changed files were not loaded on this page.)
