Skip to content

Commit

Permalink
[Core] Fix a bug where SIGTERM is ignored to worker processes (ray-pr…
Browse files Browse the repository at this point in the history
…oject#40210)


---------

Co-authored-by: SangBin Cho <[email protected]>
Co-authored-by: sangcho <[email protected]>
  • Loading branch information
3 people authored Oct 25, 2023
1 parent 728f2f4 commit 6603dcb
Show file tree
Hide file tree
Showing 20 changed files with 402 additions and 135 deletions.
14 changes: 10 additions & 4 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@
import ray.job_config
import ray.remote_function
from ray import ActorID, JobID, Language, ObjectRef
from ray._raylet import StreamingObjectRefGenerator
from ray._raylet import (
StreamingObjectRefGenerator,
raise_sys_exit_with_custom_error_message,
)
from ray.runtime_env.runtime_env import _merge_runtime_env
from ray._private import ray_option_utils
from ray._private.client_mode_hook import client_mode_hook
Expand Down Expand Up @@ -785,8 +788,10 @@ def main_loop(self):
"""The main loop a worker runs to receive and execute tasks."""

def sigterm_handler(signum, frame):
shutdown(True)
sys.exit(1)
raise_sys_exit_with_custom_error_message(
"The process receives a SIGTERM.", exit_code=1
)
# Note: shutdown() function is called from atexit handler.

ray._private.utils.set_sigterm_handler(sigterm_handler)
self.core_worker.run_task_loop()
Expand Down Expand Up @@ -1742,7 +1747,8 @@ def shutdown(_exiting_interpreter: bool = False):
# we will tear down any processes spawned by ray.init() and the background
# IO thread in the core worker doesn't currently handle that gracefully.
if hasattr(global_worker, "core_worker"):
global_worker.core_worker.shutdown()
if global_worker.mode == SCRIPT_MODE or global_worker.mode == LOCAL_MODE:
global_worker.core_worker.shutdown_driver()
del global_worker.core_worker
# We need to reset function actor manager to clear the context
global_worker.function_actor_manager = FunctionActorManager(global_worker)
Expand Down
2 changes: 1 addition & 1 deletion python/ray/_private/workers/default_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@
if worker_process_setup_hook_key:
error = load_and_execute_setup_hook(worker_process_setup_hook_key)
if error is not None:
worker.core_worker.exit_worker("system", error)
worker.core_worker.drain_and_exit_worker("system", error)

if mode == ray.WORKER_MODE:
worker.main_loop()
Expand Down
111 changes: 82 additions & 29 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ from ray.includes.common cimport (
PythonGetLogBatchLines,
WORKER_EXIT_TYPE_USER_ERROR,
WORKER_EXIT_TYPE_SYSTEM_ERROR,
WORKER_EXIT_TYPE_INTENTIONAL_SYSTEM_ERROR,
kResourceUnitScaling,
kImplicitResourcePrefix,
kWorkerSetupHookKeyName,
Expand Down Expand Up @@ -456,6 +457,13 @@ cdef int check_status(const CRayStatus& status) nogil except -1:
raise ValueError(message)
elif status.IsRpcError():
raise RpcError(message, rpc_code=status.rpc_code())
elif status.IsIntentionalSystemExit():
with gil:
raise_sys_exit_with_custom_error_message(message)
elif status.IsUnexpectedSystemExit():
with gil:
raise_sys_exit_with_custom_error_message(
message, exit_code=1)
else:
raise RaySystemError(message)

Expand Down Expand Up @@ -692,6 +700,31 @@ cdef int prepare_actor_concurrency_groups(
concurrency_groups.push_back(cg)
return 1


def raise_sys_exit_with_custom_error_message(
ray_terminate_msg: str,
exit_code: int = 0) -> None:
"""It is equivalent to sys.exit, but it can contain
a custom message. Custom message is reported to
raylet and is accessible via GCS (from `ray get workers`).

Note that sys.exit == raise SystemExit. I.e., this API
simply raises SystemExit with a custom error message accessible
via `e.ray_terminate_msg`.

Args:
ray_terminate_msg: The error message to propagate to GCS.
exit_code: The exit code. If it is not 0, it is considered
as a system error.
"""
# Raising SystemExit(0) is equivalent to
# sys.exit(0).
# https://docs.python.org/3/library/exceptions.html#SystemExit
e = SystemExit(exit_code)
e.ray_terminate_msg = ray_terminate_msg
raise e


cdef prepare_args_and_increment_put_refs(
CoreWorker core_worker,
Language language, args,
Expand Down Expand Up @@ -1112,13 +1145,13 @@ cdef report_streaming_generator_output(
is_plasma_object(return_obj.second)))

with nogil:
CCoreWorkerProcess.GetCoreWorker().ReportGeneratorItemReturns(
check_status(CCoreWorkerProcess.GetCoreWorker().ReportGeneratorItemReturns(
return_obj,
context.generator_id,
context.caller_address,
generator_index,
context.attempt_number,
context.waiter)
context.waiter))
context.generator_index += 1
return True
else:
Expand Down Expand Up @@ -1146,13 +1179,13 @@ cdef report_streaming_generator_output(
"Writes to a ObjectRefStream of an "
"index {}".format(context.generator_index))
with nogil:
CCoreWorkerProcess.GetCoreWorker().ReportGeneratorItemReturns(
check_status(CCoreWorkerProcess.GetCoreWorker().ReportGeneratorItemReturns(
return_obj,
context.generator_id,
context.caller_address,
generator_index,
context.attempt_number,
context.waiter)
context.waiter))
context.generator_index += 1
return False

Expand Down Expand Up @@ -1515,10 +1548,7 @@ cdef void execute_task(
# actor, Ray will exit the actor.
def exit_current_actor_if_asyncio():
if core_worker.current_actor_is_asyncio():
error = SystemExit(0)
error.is_ray_terminate = True
error.ray_terminate_msg = "exit_actor() is called."
raise error
raise_sys_exit_with_custom_error_message("exit_actor() is called.")

function_descriptor = CFunctionDescriptorToPython(
ray_function.GetFunctionDescriptor())
Expand Down Expand Up @@ -1963,7 +1993,7 @@ cdef execute_task_with_cancellation_handler(
actor = None
actor_id = core_worker.get_actor_id()
if not actor_id.is_nil():
actor = core_worker.actors[actor_id]
actor = worker.actors[actor_id]

store_task_errors(
worker, e,
Expand Down Expand Up @@ -1993,12 +2023,9 @@ cdef execute_task_with_cancellation_handler(
# If we've reached the max number of executions for this worker, exit.
task_counter = manager.get_task_counter(function_descriptor)
if task_counter == execution_info.max_calls:
exit = SystemExit(0)
exit.is_ray_terminate = True
exit.ray_terminate_msg = (
raise_sys_exit_with_custom_error_message(
"max_call has reached, "
f"max_calls: {execution_info.max_calls}")
raise exit

cdef shared_ptr[LocalMemoryBuffer] ray_error_to_memory_buf(ray_error):
cdef bytes py_bytes = ray_error.to_bytes()
Expand Down Expand Up @@ -2086,26 +2113,27 @@ cdef CRayStatus task_execution_handler(
except SystemExit as e:
# Tell the core worker to exit as soon as the result objects
# are processed.
if hasattr(e, "is_ray_terminate"):
return CRayStatus.IntentionalSystemExit(e.ray_terminate_msg)
elif hasattr(e, "is_creation_task_error"):
if hasattr(e, "is_creation_task_error"):
return CRayStatus.CreationTaskError(e.init_error_message)
elif e.code is not None and e.code == 0:
# This means the system exit was
# normal based on the python convention.
# https://docs.python.org/3/library/sys.html#sys.exit
return CRayStatus.IntentionalSystemExit(
f"Worker exits with an exit code {e.code}.")
msg = f"Worker exits with an exit code {e.code}."
if hasattr(e, "ray_terminate_msg"):
msg += (f" {e.ray_terminate_msg}")
return CRayStatus.IntentionalSystemExit(msg)
else:
msg = f"Worker exits with an exit code {e.code}."
# In K8s, SIGTERM likely means we hit memory limits, so print
# a more informative message there.
if "KUBERNETES_SERVICE_HOST" in os.environ:
msg += (
" The worker may have exceeded K8s pod memory limits.")
if hasattr(e, "ray_terminate_msg"):
msg += (f" {e.ray_terminate_msg}")
if hasattr(e, "unexpected_error_traceback"):
msg += (f"\n {e.unexpected_error_traceback}")
logger.exception(msg)
msg += (f" {e.unexpected_error_traceback}")
return CRayStatus.UnexpectedSystemExit(msg)

return CRayStatus.OK()
Expand All @@ -2122,10 +2150,30 @@ cdef c_bool kill_main_task(const CTaskID &task_id) nogil:

cdef CRayStatus check_signals() nogil:
with gil:
# The Python exceptions are not handled if it is raised from cdef,
# so we have to handle it here.
try:
PyErr_CheckSignals()
except KeyboardInterrupt:
return CRayStatus.Interrupted(b"")
except SystemExit as e:
error_msg = (
"SystemExit is raised (sys.exit is called).")
if e.code is not None:
error_msg += f" Exit code: {e.code}."
else:
error_msg += " Exit code was not specified."

if hasattr(e, "ray_terminate_msg"):
error_msg += f" {e.ray_terminate_msg}"

if e.code and e.code == 0:
return CRayStatus.IntentionalSystemExit(error_msg.encode("utf-8"))
else:
return CRayStatus.UnexpectedSystemExit(error_msg.encode("utf-8"))
# By default, if signals raise an exception, Python just prints them.
# To keep the same behavior, we don't handle any other exceptions.

return CRayStatus.OK()


Expand Down Expand Up @@ -3059,15 +3107,15 @@ cdef class CoreWorker:
self._task_id_to_future = {}
self.thread_pool_for_async_event_loop = None

def shutdown(self):
def shutdown_driver(self):
# If it's a worker, the core worker process should have been
# shutdown. So we can't call
# `CCoreWorkerProcess.GetCoreWorker().GetWorkerType()` here.
# Instead, we use the cached `is_driver` flag to test if it's a
# driver.
if self.is_driver:
with nogil:
CCoreWorkerProcess.Shutdown()
assert self.is_driver
with nogil:
CCoreWorkerProcess.Shutdown()

def notify_raylet(self):
with nogil:
Expand All @@ -3077,13 +3125,16 @@ cdef class CoreWorker:
with nogil:
CCoreWorkerProcess.RunTaskExecutionLoop()

def exit_worker(self, exit_type: str, c_string detail):
def drain_and_exit_worker(self, exit_type: str, c_string detail):
"""
Exit the current worker process. This API should only be used by
a worker. If this API is called, the worker will finish currently
executing task, initiate the shutdown, and stop itself gracefully.
The given exit_type and detail will be reported to GCS, and any
worker failure error will contain them.
a worker. If this API is called, the worker will wait to finish
currently executing task, initiate the shutdown, and stop
itself gracefully. The given exit_type and detail will be
reported to GCS, and any worker failure error will contain them.
The behavior of this API while a task is running is undefined.
Avoid using the API when a task is still running.
"""
cdef:
CWorkerExitType c_exit_type
Expand All @@ -3093,6 +3144,8 @@ cdef class CoreWorker:
c_exit_type = WORKER_EXIT_TYPE_USER_ERROR
if exit_type == "system":
c_exit_type = WORKER_EXIT_TYPE_SYSTEM_ERROR
elif exit_type == "intentional_system_exit":
c_exit_type = WORKER_EXIT_TYPE_INTENTIONAL_SYSTEM_ERROR
else:
raise ValueError(f"Invalid exit type: {exit_type}")
assert not self.is_driver
Expand Down
6 changes: 2 additions & 4 deletions python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
STREAMING_GENERATOR_RETURN,
PythonFunctionDescriptor,
StreamingObjectRefGenerator,
raise_sys_exit_with_custom_error_message,
)
from ray.exceptions import AsyncioActorExit
from ray.util.annotations import DeveloperAPI, PublicAPI
Expand Down Expand Up @@ -1462,10 +1463,7 @@ def exit_actor():

# Set a flag to indicate this is an intentional actor exit. This
# reduces log verbosity.
exit = SystemExit(0)
exit.is_ray_terminate = True
exit.ray_terminate_msg = "exit_actor() is called."
raise exit
raise_sys_exit_with_custom_error_message("exit_actor() is called.")
else:
raise TypeError(
"exit_actor API is called on a non-actor worker, "
Expand Down
Loading

0 comments on commit 6603dcb

Please sign in to comment.