Skip to content

Commit

Permalink
[Core] Streaming generator exprimental backpressure (ray-project#40285)
Browse files Browse the repository at this point in the history
This is the PR to support streaming generator backpressure API. The PR is a bit big, but most of them are tests and plumbing.

The API is private. I'd be open to change the name of the argument, but since we have limited time, we can probably do it in 2.9 instead with an official API proposal (this PR is needed by Ray data by 2.8). 

Note that the API is not supported for async actors. There's no immediate use case & supporting async actor is much more difficult because we should implement unblocking pause. 

```
   * Backpressure Impl
   * -----------------
   * Streaming generator optionally supports backpressure when
   * `streaming_generator_backpressure_size_bytes` is included in a task spec.
   *
   * Executor Side:
   * - When a new object is yielded, executor sends a gRPC request that
   *   contains an object size and records total_object_generated.
   * - If a total_object_generated - total_object_consumed > threshold,
   *   it blocks a thread and pauses execution. The consumer communicates
   *   `object_consumed` (via gRPC reply) when objects are consumed from it,
   *   and the execution resumes.
   * - If a gRPC request fails, the executor assumes all the objects are
   *   consumed and resume execution. (alternatively, we can fail execution).
   *
   * Client Side:
   * - If object_generated - object_consumed < threshold, it sends a reply that
   *   contains `object_consumed` to an executor immediately.
   * - If object_generated - object_consumed > threshold, it doesn't reply
   *   until objects are consumed via TryReadObjectRefStream.
   * - If objects are not going to be consumed (e.g., generator is deleted
   *   or objects are already consumed), it replies immediately.
```

See more details here; https://docs.google.com/document/d/1Ugxs7SgCDqUk44SMyO_l7Rj17drHY2Qn6GDd-BxV2Pk/edit
---------

Signed-off-by: Jiajun Yao <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Co-authored-by: SangBin Cho <[email protected]>
Co-authored-by: Jiajun Yao <[email protected]>
Co-authored-by: Hao Chen <[email protected]>
  • Loading branch information
4 people authored Oct 23, 2023
1 parent 2104d63 commit 5d46919
Show file tree
Hide file tree
Showing 41 changed files with 1,503 additions and 232 deletions.
14 changes: 14 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1028,6 +1028,20 @@ ray_cc_test(
],
)

ray_cc_test(
name = "generator_waiter_test",
size = "small",
srcs = ["src/ray/core_worker/test/generator_waiter_test.cc"],
tags = ["team:core"],
deps = [
":core_worker_lib",
":gcs_client_lib",
":ray_mock",
"//src/ray/common:test_util",
"@com_google_googletest//:gtest_main",
],
)

ray_cc_test(
name = "actor_manager_test",
size = "small",
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/ray/runtime/task/local_mode_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ ObjectID LocalModeTaskSubmitter::Submit(InvocationSpec &invocation,
1,
/*returns_dynamic=*/false,
/*is_streaming_generator*/ false,
/*generator_backpressure_num_objects*/ -1,
required_resources,
required_placement_resources,
"",
Expand Down Expand Up @@ -93,6 +94,7 @@ ObjectID LocalModeTaskSubmitter::Submit(InvocationSpec &invocation,
builder.AddArg(*invocation.args[i]);
}
auto task_specification = builder.Build();

ObjectID return_object_id = task_specification.ReturnId(0);

std::shared_ptr<msgpack::sbuffer> actor;
Expand Down
1 change: 1 addition & 0 deletions cpp/src/ray/runtime/task/native_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ ObjectID NativeTaskSubmitter::Submit(InvocationSpec &invocation,
options.name = call_options.name;
options.resources = call_options.resources;
options.serialized_runtime_env_info = call_options.serialized_runtime_env_info;
options.generator_backpressure_num_objects = -1;
std::vector<rpc::ObjectReference> return_refs;
if (invocation.task_type == TaskType::ACTOR_TASK) {
auto status = core_worker.SubmitActorTask(invocation.actor_id,
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/ray/runtime/task/task_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ Status TaskExecutor::ExecuteTask(
const std::string name_of_concurrency_group_to_execute,
bool is_reattempt,
bool is_streaming_generator,
bool retry_exception) {
bool retry_exception,
int64_t generator_backpressure_num_objects) {
RAY_LOG(DEBUG) << "Execute task type: " << TaskType_Name(task_type)
<< " name:" << task_name;
RAY_CHECK(ray_function.GetLanguage() == ray::Language::CPP);
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/ray/runtime/task/task_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ class TaskExecutor {
const std::string name_of_concurrency_group_to_execute,
bool is_reattempt,
bool is_streaming_generator,
bool retry_exception);
bool retry_exception,
int64_t generator_backpressure_num_objects);

virtual ~TaskExecutor(){};

Expand Down
11 changes: 11 additions & 0 deletions python/ray/_private/ray_option_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,17 @@ def issubclass_safe(obj: Any, cls_: type) -> bool:
else "retry_exceptions must be either a boolean or a list of exceptions",
default_value=False,
),
"_generator_backpressure_num_objects": Option(
(int, type(None)),
lambda x: None
if x != 0
else (
"_generator_backpressure_num_objects=0 is not allowed. "
"Use a value > 0. If the value is equal to 1, the behavior "
"is identical to Python generator (generator 1 object "
"whenever `next` is called). Use -1 to disable this feature. "
),
),
}

_actor_only_options = {
Expand Down
75 changes: 54 additions & 21 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ from ray.includes.libcoreworker cimport (
ResourceMappingType,
CFiberEvent,
CActorHandle,
CGeneratorBackpressureWaiter,
)

from ray.includes.ray_config cimport RayConfig
Expand Down Expand Up @@ -963,6 +964,9 @@ cdef class StreamingGeneratorExecutionContext:
raises an exception, and the error is retryable.
application_error(out): It is set if the generator raises an
application error.
generator_backpressure_num_objects: The backpressure threshold
for streaming generator. The stremaing generator pauses if
total number of unconsumed objects exceed this threshold.
"""

cdef:
Expand Down Expand Up @@ -993,6 +997,7 @@ cdef class StreamingGeneratorExecutionContext:
c_vector[c_pair[CObjectID, c_bool]] *streaming_generator_returns
c_bool *is_retryable_error
c_string *application_error
shared_ptr[CGeneratorBackpressureWaiter] waiter

def initialize(self, generator: Union[Generator, AsyncGenerator]):
# We couldn't make this a part of `make` method because
Expand Down Expand Up @@ -1024,6 +1029,7 @@ cdef class StreamingGeneratorExecutionContext:
c_vector[c_pair[CObjectID, c_bool]] *streaming_generator_returns,
c_bool *is_retryable_error,
c_string *application_error,
int64_t generator_backpressure_num_objects,
):
cdef StreamingGeneratorExecutionContext self = (
StreamingGeneratorExecutionContext())
Expand All @@ -1045,7 +1051,9 @@ cdef class StreamingGeneratorExecutionContext:
self.streaming_generator_returns = streaming_generator_returns
self.is_retryable_error = is_retryable_error
self.application_error = application_error
self.should_retry_exceptions, = should_retry_exceptions,
self.should_retry_exceptions = should_retry_exceptions
self.waiter = make_shared[CGeneratorBackpressureWaiter](
generator_backpressure_num_objects)
return self


Expand Down Expand Up @@ -1074,6 +1082,7 @@ cdef report_streaming_generator_output(
cdef:
# Ray Object created from an output.
c_pair[CObjectID, shared_ptr[CRayObject]] return_obj
int64_t generator_index = context.generator_index

if isinstance(output_or_exception, Exception):
create_generator_error_object(
Expand Down Expand Up @@ -1102,12 +1111,14 @@ cdef report_streaming_generator_output(
return_obj.first,
is_plasma_object(return_obj.second)))

CCoreWorkerProcess.GetCoreWorker().ReportGeneratorItemReturns(
return_obj,
context.generator_id,
context.caller_address,
context.generator_index,
context.attempt_number)
with nogil:
CCoreWorkerProcess.GetCoreWorker().ReportGeneratorItemReturns(
return_obj,
context.generator_id,
context.caller_address,
generator_index,
context.attempt_number,
context.waiter)
context.generator_index += 1
return True
else:
Expand All @@ -1134,12 +1145,14 @@ cdef report_streaming_generator_output(
logger.debug(
"Writes to a ObjectRefStream of an "
"index {}".format(context.generator_index))
CCoreWorkerProcess.GetCoreWorker().ReportGeneratorItemReturns(
return_obj,
context.generator_id,
context.caller_address,
context.generator_index,
context.attempt_number)
with nogil:
CCoreWorkerProcess.GetCoreWorker().ReportGeneratorItemReturns(
return_obj,
context.generator_id,
context.caller_address,
generator_index,
context.attempt_number,
context.waiter)
context.generator_index += 1
return False

Expand Down Expand Up @@ -1480,7 +1493,8 @@ cdef void execute_task(
title,
task_name,
c_bool is_streaming_generator,
c_bool should_retry_exceptions) except *:
c_bool should_retry_exceptions,
int64_t generator_backpressure_num_objects) except *:
worker = ray._private.worker.global_worker
manager = worker.function_actor_manager
actor = None
Expand Down Expand Up @@ -1661,12 +1675,18 @@ cdef void execute_task(
should_retry_exceptions,
streaming_generator_returns,
is_retryable_error,
application_error)
application_error,
generator_backpressure_num_objects)
# We cannot pass generator to cdef in Cython for some reasons.
# It is a workaround.
context.initialize(outputs)

if is_async_gen:
if generator_backpressure_num_objects != -1:
raise ValueError(
"_generator_backpressure_num_objects is "
"not supported for an async actor."
)
# Note that the report RPCs are called inside an
# event loop thread.
core_worker.run_async_func_or_coro_in_event_loop(
Expand Down Expand Up @@ -1835,7 +1855,8 @@ cdef execute_task_with_cancellation_handler(
const c_string c_name_of_concurrency_group_to_execute,
c_bool is_reattempt,
c_bool is_streaming_generator,
c_bool should_retry_exceptions):
c_bool should_retry_exceptions,
int64_t generator_backpressure_num_objects):

is_retryable_error[0] = False

Expand Down Expand Up @@ -1927,7 +1948,8 @@ cdef execute_task_with_cancellation_handler(
c_name_of_concurrency_group_to_execute,
is_reattempt, execution_info, title, task_name,
is_streaming_generator,
should_retry_exceptions)
should_retry_exceptions,
generator_backpressure_num_objects)

# Check for cancellation.
PyErr_CheckSignals()
Expand Down Expand Up @@ -2003,7 +2025,8 @@ cdef CRayStatus task_execution_handler(
const c_string name_of_concurrency_group_to_execute,
c_bool is_reattempt,
c_bool is_streaming_generator,
c_bool should_retry_exceptions) nogil:
c_bool should_retry_exceptions,
int64_t generator_backpressure_num_objects) nogil:
with gil, disable_client_hook():
# Initialize job_config if it hasn't already.
# Setup system paths configured in job_config.
Expand All @@ -2030,7 +2053,8 @@ cdef CRayStatus task_execution_handler(
name_of_concurrency_group_to_execute,
is_reattempt,
is_streaming_generator,
should_retry_exceptions)
should_retry_exceptions,
generator_backpressure_num_objects)
except Exception as e:
sys_exit = SystemExit()
if isinstance(e, RayActorError) and \
Expand Down Expand Up @@ -3510,6 +3534,7 @@ cdef class CoreWorker:
scheduling_strategy,
c_string debugger_breakpoint,
c_string serialized_runtime_env_info,
int64_t generator_backpressure_num_objects
):
cdef:
unordered_map[c_string, double] c_resources
Expand Down Expand Up @@ -3553,6 +3578,7 @@ cdef class CoreWorker:
task_options = CTaskOptions(
name, num_returns, c_resources,
b"",
generator_backpressure_num_objects,
serialized_runtime_env_info)

# We are in the async context. We have to obtain
Expand Down Expand Up @@ -3737,7 +3763,8 @@ cdef class CoreWorker:
c_string name,
int num_returns,
double num_method_cpus,
c_string concurrency_group_name):
c_string concurrency_group_name,
int64_t generator_backpressure_num_objects):

cdef:
CActorID c_actor_id = actor_id.native()
Expand Down Expand Up @@ -3775,7 +3802,11 @@ cdef class CoreWorker:
ray_function,
args_vector,
CTaskOptions(
name, num_returns, c_resources, concurrency_group_name),
name,
num_returns,
c_resources,
concurrency_group_name,
generator_backpressure_num_objects),
return_refs,
current_c_task_id)
# These arguments were serialized and put into the local object
Expand Down Expand Up @@ -3895,6 +3926,7 @@ cdef class CoreWorker:
method_meta.decorators,
method_meta.signatures,
method_meta.num_returns,
method_meta.generator_backpressure_num_objects, # noqa
actor_method_cpu,
actor_creation_function_descriptor,
worker.current_session_and_job)
Expand All @@ -3903,6 +3935,7 @@ cdef class CoreWorker:
{}, # method decorators
{}, # method signatures
{}, # method num_returns
{}, # generator_backpressure_num_objects
0, # actor method cpu
actor_creation_function_descriptor,
worker.current_session_and_job)
Expand Down
Loading

0 comments on commit 5d46919

Please sign in to comment.