Skip to content

Commit

Permalink
[serve] Fix worker replica leak (ray-project#8506)
Browse files: browse the repository at this point in the history
  • Loading branch information
edoakes authored May 20, 2020
1 parent c9c84c8 commit 85cb721
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 16 deletions.
28 changes: 17 additions & 11 deletions python/ray/serve/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,24 +354,30 @@ async def _start_pending_replicas(self):
async def _stop_pending_replicas(self):
"""Stops the pending backend replicas in self.replicas_to_stop.
Stops workers by telling the router to remove them.
Clears self.replicas_to_stop.
Removes workers from the router, kills them, and clears
self.replicas_to_stop.
"""
for backend_tag, replicas_to_stop in self.replicas_to_stop.items():
for replica_tag in replicas_to_stop:
# NOTE(edoakes): the replicas may already be stopped if we
# failed after stopping them but before writing a checkpoint.
try:
# Remove the replica from router.
# This will also submit __ray_terminate__ on the worker.
# NOTE(edoakes): we currently need to kill the worker from
# the router to guarantee that the router won't submit any
# more requests to it.
await self.router.remove_worker.remote(
backend_tag, replica_tag)
replica = ray.util.get_actor(replica_tag)
except ValueError:
pass
continue

# Remove the replica from router. This call is idempotent.
await self.router.remove_worker.remote(backend_tag,
replica_tag)

# TODO(edoakes): this logic isn't ideal because there may be
# pending tasks still executing on the replica. However, if we
# use replica.__ray_terminate__, we may send it while the
# replica is being restarted and there's no way to tell if it
# successfully killed the worker or not.
worker = ray.worker.global_worker
# Kill the actor with no_restart=True.
worker.core_worker.kill_actor(replica._ray_actor_id, True)

self.replicas_to_stop.clear()

Expand Down
6 changes: 1 addition & 5 deletions python/ray/serve/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ async def remove_worker(self, backend_tag, replica_tag):
backend_replica_tag = backend_tag + ":" + replica_tag
if backend_replica_tag not in self.replicas:
return
worker_handle = self.replicas.pop(backend_replica_tag)
del self.replicas[backend_replica_tag]

# We need this lock because we modify worker_queue here.
async with self.flush_lock:
Expand All @@ -262,10 +262,6 @@ async def remove_worker(self, backend_tag, replica_tag):
await new_queue.put(curr_tag)

self.worker_queues[backend_tag] = new_queue
# We need to terminate the worker here instead of from the master
# so we can guarantee that the router won't submit any more tasks
# on it.
worker_handle.__ray_terminate__.remote()

async def set_traffic(self, endpoint, traffic_dict):
logger.debug("Setting traffic for endpoint %s to %s", endpoint,
Expand Down

0 comments on commit 85cb721

Please sign in to comment.