Skip to content

Commit

Permalink
[Serve] fix _to_object_ref memory leak (ray-project#43763)
Browse files Browse the repository at this point in the history
_PyObjScanner/ CloudPickler is designed to pin object refs into memory so they don't escape the scope of existing Ray session. This PR allows ObjectRef and ObjectRefGenerator to also escape the Ray memory pin so they won't cause memory leaks.

---------

Signed-off-by: Gene Der Su <[email protected]>
Co-authored-by: shrekris-anyscale <[email protected]>
  • Loading branch information
GeneDer and shrekris-anyscale authored Mar 28, 2024
1 parent 1f02250 commit c537c90
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 1 deletion.
11 changes: 10 additions & 1 deletion python/ray/serve/_private/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,9 @@ async def _resolve_deployment_responses(
_DeploymentResponseBase,
)

scanner = _PyObjScanner(source_type=_DeploymentResponseBase)
scanner = _PyObjScanner(
source_type=(_DeploymentResponseBase, ray.ObjectRef, ray.ObjectRefGenerator)
)

try:
responses = []
Expand All @@ -390,6 +392,13 @@ async def _resolve_deployment_responses(
elif isinstance(obj, DeploymentResponse):
responses.append(obj)

# This is no-op replacing the object with itself. The purpose is to make
# sure both object refs and object ref generator are not getting pinned
# to memory by the scanner and cause memory leak.
# See: https://github.com/ray-project/ray/issues/43248
elif isinstance(obj, (ray.ObjectRef, ray.ObjectRefGenerator)):
replacement_table[obj] = obj

# Gather `DeploymentResponse` object refs concurrently.
if len(responses) > 0:
obj_refs = await asyncio.gather(
Expand Down
48 changes: 48 additions & 0 deletions python/ray/serve/tests/test_advanced.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
import time

import pytest
import requests
from starlette.requests import Request

import ray
from ray import serve
from ray._private.test_utils import SignalActor
from ray.serve._private.constants import SERVE_DEFAULT_APP_NAME
from ray.serve.handle import DeploymentHandle


def test_serve_forceful_shutdown(serve_instance):
Expand Down Expand Up @@ -97,6 +100,51 @@ def __call__(self):
handle.remote().result(timeout_s=10)


def test_passing_object_ref_to_deployment_not_pinned_to_memory(serve_instance):
"""Passing object refs to deployments should not pin the refs in memory.
We had issue that passing object ref to a deployment will result in memory leak
due to _PyObjScanner/ cloudpickler pinning the object to memory. This test will
ensure the object ref is released after the request is done.
See: https://github.com/ray-project/ray/issues/43248
"""

@serve.deployment
class Dep1:
def multiple_by_two(self, length: int):
return length * 2

@serve.deployment
class Gateway:
def __init__(self, dep1: DeploymentHandle):
self.dep1: DeploymentHandle = dep1

async def __call__(self, http_request: Request) -> str:
_length = int(http_request.query_params.get("length"))
length_ref = ray.put(_length)
obj_ref_hex = length_ref.hex()

# Object ref should be in the memory for downstream deployment to access.
assert obj_ref_hex in ray._private.internal_api.memory_summary()
return {
"result": await self.dep1.multiple_by_two.remote(length_ref),
"length": _length,
"obj_ref_hex": obj_ref_hex,
}

app = Gateway.bind(Dep1.bind())
serve.run(target=app)

length = 10
response = requests.get(f"http://localhost:8000?length={length}").json()
assert response["result"] == length * 2
assert response["length"] == length

# Ensure the object ref is not in the memory anymore.
assert response["obj_ref_hex"] not in ray._private.internal_api.memory_summary()


if __name__ == "__main__":
import sys

Expand Down

0 comments on commit c537c90

Please sign in to comment.