Skip to content
This repository has been archived by the owner on Nov 6, 2023. It is now read-only.

Commit

Permalink
[event_engine] Move combiner executor usage to event engine (grpc#31713)
Browse files Browse the repository at this point in the history
* [event_engine] Move combiner executor usage to event engine

* fix

* review feedback

* fix

* x

* fix

* fix
  • Loading branch information
ctiller authored Nov 22, 2022
1 parent c34d99f commit b04aa1c
Show file tree
Hide file tree
Showing 35 changed files with 159 additions and 62 deletions.
4 changes: 3 additions & 1 deletion BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,7 @@ grpc_cc_library(
"//src/core:arena",
"//src/core:channel_args_preconditioning",
"//src/core:channel_stack_type",
"//src/core:default_event_engine",
"//src/core:iomgr_fwd",
"//src/core:iomgr_port",
"//src/core:slice",
Expand Down Expand Up @@ -3166,11 +3167,12 @@ grpc_cc_library(
"//src/core:channel_init",
"//src/core:channel_stack_type",
"//src/core:context",
"//src/core:for_each",
"//src/core:grpc_message_size_filter",
"//src/core:latch",
"//src/core:map_pipe",
"//src/core:percent_encoding",
"//src/core:pipe",
"//src/core:promise_like",
"//src/core:seq",
"//src/core:slice",
"//src/core:slice_buffer",
Expand Down
5 changes: 5 additions & 0 deletions bazel/grpc_deps.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ def grpc_deps():
actual = "@com_google_googleapis//google/logging/v2:logging_cc_grpc",
)

native.bind(
name = "googleapis_logging_cc_proto",
actual = "@com_google_googleapis//google/logging/v2:logging_cc_proto",
)

if "boringssl" not in native.existing_rules():
http_archive(
name = "boringssl",
Expand Down
4 changes: 1 addition & 3 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -3896,7 +3896,6 @@ grpc_cc_library(
"lb_policy_factory",
"lb_policy_registry",
"pollset_set",
"ref_counted",
"subchannel_interface",
"time",
"validation_errors",
Expand Down Expand Up @@ -4180,7 +4179,6 @@ grpc_cc_library(
"lb_policy_factory",
"lb_policy_registry",
"pollset_set",
"ref_counted",
"subchannel_interface",
"time",
"validation_errors",
Expand Down Expand Up @@ -4221,7 +4219,6 @@ grpc_cc_library(
"lb_policy_factory",
"lb_policy_registry",
"pollset_set",
"ref_counted",
"subchannel_interface",
"time",
"validation_errors",
Expand Down Expand Up @@ -4776,6 +4773,7 @@ grpc_cc_library(
"//:gpr",
"//:gpr_platform",
"//:grpc_base",
"//:grpc_public_hdrs",
"//:hpack_encoder",
"//:hpack_parser",
],
Expand Down
1 change: 0 additions & 1 deletion src/core/ext/filters/client_channel/config_selector.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
#include <grpc/slice.h>
#include <grpc/support/log.h>

#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/ref_counted.h"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/gprpp/validation_errors.h"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/gprpp/validation_errors.h"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/gprpp/validation_errors.h"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

#include <inttypes.h>

#include <cstdint>
#include <functional>
#include <memory>
#include <utility>
Expand All @@ -43,9 +42,10 @@
#include "src/core/lib/compression/message_compress.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/for_each.h"
#include "src/core/lib/promise/detail/promise_like.h"
#include "src/core/lib/promise/latch.h"
#include "src/core/lib/promise/map_pipe.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/promise/try_concurrently.h"
Expand Down
4 changes: 3 additions & 1 deletion src/core/ext/transport/binder/transport/binder_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "src/core/ext/transport/binder/wire_format/wire_reader.h"
#include "src/core/ext/transport/binder/wire_format/wire_reader_impl.h"
#include "src/core/ext/transport/binder/wire_format/wire_writer.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/transport/error_utils.h"
Expand Down Expand Up @@ -709,7 +710,8 @@ grpc_binder_transport::grpc_binder_transport(
std::unique_ptr<grpc_binder::Binder> binder, bool is_client,
std::shared_ptr<grpc::experimental::binder::SecurityPolicy> security_policy)
: is_client(is_client),
combiner(grpc_combiner_create()),
combiner(grpc_combiner_create(
grpc_event_engine::experimental::GetDefaultEventEngine())),
state_tracker(
is_client ? "binder_transport_client" : "binder_transport_server",
GRPC_CHANNEL_READY),
Expand Down
6 changes: 5 additions & 1 deletion src/core/ext/transport/binder/wire_format/wire_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

#include <grpc/support/log.h>

#include "src/core/lib/event_engine/default_event_engine.h"

#define RETURN_IF_ERROR(expr) \
do { \
const absl::Status status = (expr); \
Expand Down Expand Up @@ -81,7 +83,9 @@ absl::Status WriteTrailingMetadata(const Transaction& tx,
}

WireWriterImpl::WireWriterImpl(std::unique_ptr<Binder> binder)
: binder_(std::move(binder)), combiner_(grpc_combiner_create()) {}
: binder_(std::move(binder)),
combiner_(grpc_combiner_create(
grpc_event_engine::experimental::GetDefaultEventEngine())) {}

WireWriterImpl::~WireWriterImpl() {
GRPC_COMBINER_UNREF(combiner_, "wire_writer_impl");
Expand Down
1 change: 1 addition & 0 deletions src/core/ext/transport/chaotic_good/frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "absl/status/statusor.h"

#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/log.h>

#include "src/core/lib/gprpp/bitset.h"
Expand Down
5 changes: 4 additions & 1 deletion src/core/ext/transport/chttp2/transport/chttp2_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "absl/types/optional.h"
#include "absl/types/variant.h"

#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/connectivity_state.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/slice_buffer.h>
Expand Down Expand Up @@ -475,7 +476,9 @@ grpc_chttp2_transport::grpc_chttp2_transport(
grpc_endpoint_get_peer(ep), ":client_transport"))),
self_reservation(
memory_owner.MakeReservation(sizeof(grpc_chttp2_transport))),
combiner(grpc_combiner_create()),
combiner(grpc_combiner_create(
channel_args
.GetObjectRef<grpc_event_engine::experimental::EventEngine>())),
state_tracker(is_client ? "client_transport" : "server_transport",
GRPC_CHANNEL_READY),
is_client(is_client),
Expand Down
1 change: 1 addition & 0 deletions src/core/ext/transport/chttp2/transport/flow_control.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <cmath>
#include <ostream>
#include <string>
#include <tuple>
#include <vector>

#include "absl/strings/str_cat.h"
Expand Down
1 change: 1 addition & 0 deletions src/core/ext/xds/xds_route_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/env.h"
#include "src/core/lib/gprpp/match.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
Expand Down
26 changes: 12 additions & 14 deletions src/core/lib/iomgr/combiner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
#include <grpc/support/log.h>

#include "src/core/lib/gprpp/mpscq.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_internal.h"

grpc_core::DebugOnlyTraceFlag grpc_combiner_trace(false, "combiner");
Expand All @@ -49,14 +49,14 @@ static void combiner_finally_exec(grpc_core::Combiner* lock,
grpc_closure* closure,
grpc_error_handle error);

static void offload(void* arg, grpc_error_handle error);

grpc_core::Combiner* grpc_combiner_create(void) {
grpc_core::Combiner* grpc_combiner_create(
std::shared_ptr<grpc_event_engine::experimental::EventEngine>
event_engine) {
grpc_core::Combiner* lock = new grpc_core::Combiner();
lock->event_engine = std::move(event_engine);
gpr_ref_init(&lock->refs, 1);
gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
grpc_closure_list_init(&lock->final_list);
GRPC_CLOSURE_INIT(&lock->offload, offload, lock, nullptr);
GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p create", lock));
return lock;
}
Expand Down Expand Up @@ -163,15 +163,15 @@ static void move_next() {
}
}

static void offload(void* arg, grpc_error_handle /*error*/) {
grpc_core::Combiner* lock = static_cast<grpc_core::Combiner*>(arg);
push_last_on_exec_ctx(lock);
}

static void queue_offload(grpc_core::Combiner* lock) {
move_next();
GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p queue_offload", lock));
grpc_core::Executor::Run(&lock->offload, absl::OkStatus());
lock->event_engine->Run([lock] {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx(0);
push_last_on_exec_ctx(lock);
exec_ctx.Flush();
});
}

bool grpc_combiner_continue_exec_ctx() {
Expand All @@ -198,9 +198,7 @@ bool grpc_combiner_continue_exec_ctx() {
// 2. the current execution context needs to finish as soon as possible
// 3. the current thread is not a worker for any background poller
// 4. the DEFAULT executor is threaded
if (contended && grpc_core::ExecCtx::Get()->IsReadyToFinish() &&
!grpc_iomgr_platform_is_any_background_poller_thread() &&
grpc_core::Executor::IsThreadedDefault()) {
if (contended && grpc_core::ExecCtx::Get()->IsReadyToFinish()) {
// this execution context wants to move on: schedule remaining work to be
// picked up on the executor
queue_offload(lock);
Expand Down
5 changes: 3 additions & 2 deletions src/core/lib/iomgr/combiner.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ class Combiner {
gpr_atm state;
bool time_to_execute_final_list = false;
grpc_closure_list final_list;
grpc_closure offload;
gpr_refcount refs;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine;
};
} // namespace grpc_core

Expand All @@ -61,7 +61,8 @@ class Combiner {

// Initialize the lock, with an optional workqueue to shift load to when
// necessary
grpc_core::Combiner* grpc_combiner_create(void);
grpc_core::Combiner* grpc_combiner_create(
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine);

#ifndef NDEBUG
#define GRPC_COMBINER_DEBUG_ARGS \
Expand Down
6 changes: 6 additions & 0 deletions src/cpp/ext/gcp/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,22 @@ grpc_cc_library(
"absl/strings",
"absl/strings:str_format",
"absl/types:optional",
"googleapis_logging_cc_proto",
"googleapis_logging_grpc_service",
"protobuf_headers",
],
language = "c++",
visibility = ["//test:__subpackages__"],
deps = [
"observability_config",
"//:gpr",
"//:gpr_platform",
"//:grpc++",
"//:grpc_base",
"//:grpc_opencensus_plugin",
"//src/core:env",
"//src/core:json",
"//src/core:time",
"//src/cpp/ext/filters/logging:logging_sink",
],
)
1 change: 1 addition & 0 deletions src/cpp/ext/gcp/observability_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <stdint.h>

#include <map>
#include <string>
#include <vector>

Expand Down
13 changes: 13 additions & 0 deletions src/cpp/ext/gcp/observability_logging_sink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,28 @@
#include <stddef.h>

#include <algorithm>
#include <map>
#include <utility>

#include <google/protobuf/timestamp.pb.h>

#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/types/optional.h"
#include "google/logging/v2/log_entry.pb.h"
#include "google/logging/v2/logging.grpc.pb.h"
#include "google/logging/v2/logging.pb.h"

#include <grpc/impl/codegen/gpr_types.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/security/credentials.h>
#include <grpcpp/support/channel_arguments.h>
#include <grpcpp/support/status.h>

#include "src/core/lib/gprpp/env.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/json/json.h"
#include "src/cpp/ext/filters/census/open_census_call_tracer.h"

namespace grpc {
Expand Down
2 changes: 2 additions & 0 deletions src/cpp/ext/gcp/observability_logging_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
#include <string>
#include <vector>

#include <google/protobuf/struct.pb.h>

#include "absl/strings/string_view.h"
#include "google/logging/v2/logging.grpc.pb.h"

Expand Down
2 changes: 2 additions & 0 deletions test/core/client_channel/lb_policy/lb_policy_test_lib.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <map>
#include <memory>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

Expand All @@ -36,6 +37,7 @@
#include "absl/strings/string_view.h"
#include "absl/synchronization/notification.h"
#include "absl/types/optional.h"
#include "absl/types/span.h"
#include "absl/types/variant.h"
#include "gtest/gtest.h"

Expand Down
7 changes: 0 additions & 7 deletions test/core/client_channel/lb_policy/outlier_detection_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,20 @@

#include <algorithm>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "gtest/gtest.h"

#include <grpc/grpc.h>

#include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/resolver/server_address.h"
#include "test/core/client_channel/lb_policy/lb_policy_test_lib.h"
#include "test/core/util/test_config.h"

Expand Down
Loading

0 comments on commit b04aa1c

Please sign in to comment.