Skip to content

Commit

Permalink
[call-v3] Add CallInitiator, CallHandler wrappers around CallSpine (g…
Browse files Browse the repository at this point in the history
…rpc#35223)

Closes grpc#35223

COPYBARA_INTEGRATE_REVIEW=grpc#35223 from ctiller:cg-initiator 78826d9
PiperOrigin-RevId: 588481455
  • Loading branch information
ctiller authored and copybara-github committed Dec 6, 2023
1 parent e497eed commit 295b665
Show file tree
Hide file tree
Showing 11 changed files with 172 additions and 26 deletions.
1 change: 1 addition & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -1604,6 +1604,7 @@ grpc_cc_library(
"//src/core:slice_refcount",
"//src/core:socket_mutator",
"//src/core:stats_data",
"//src/core:status_flag",
"//src/core:status_helper",
"//src/core:strerror",
"//src/core:thread_quota",
Expand Down
1 change: 1 addition & 0 deletions Package.swift

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions build_autogenerated.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions gRPC-C++.podspec

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions gRPC-Core.podspec

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions grpc.gemspec

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions src/core/lib/channel/connected_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -883,8 +883,8 @@ grpc_channel_filter MakeConnectedFilter() {
}

ArenaPromise<ServerMetadataHandle> MakeTransportCallPromise(
Transport* transport, CallArgs call_args, NextPromiseFactory) {
return transport->client_transport()->MakeCallPromise(std::move(call_args));
Transport*, CallArgs, NextPromiseFactory) {
Crash("unimplemented");
}

const grpc_channel_filter kPromiseBasedTransportFilter =
Expand Down
180 changes: 156 additions & 24 deletions src/core/lib/transport/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
#include "src/core/lib/promise/latch.h"
#include "src/core/lib/promise/party.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/race.h"
#include "src/core/lib/promise/status_flag.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/connectivity_state.h"
Expand Down Expand Up @@ -244,18 +246,23 @@ class CallSpineInterface {
virtual Pipe<MessageHandle>& client_to_server_messages() = 0;
virtual Pipe<MessageHandle>& server_to_client_messages() = 0;
virtual Pipe<ServerMetadataHandle>& server_trailing_metadata() = 0;
// Cancel the call with the given metadata.
// Regarding the `MUST_USE_RESULT absl::nullopt_t`:
// Most cancellation calls right now happen in pipe interceptors;
// there `nullopt` indicates terminate processing of this pipe and close with
// error.
// It's convenient then to have the Cancel operation (setting the latch to
// terminate the call) be the last thing that occurs in a pipe interceptor,
// and this construction supports that (and has helped the author not write
// some bugs).
GRPC_MUST_USE_RESULT virtual absl::nullopt_t Cancel(
ServerMetadataHandle metadata) = 0;
virtual Latch<ServerMetadataHandle>& cancel_latch() = 0;
virtual Party& party() = 0;
virtual void IncrementRefCount() = 0;
virtual void Unref() = 0;

GRPC_MUST_USE_RESULT absl::nullopt_t Cancel(ServerMetadataHandle metadata) {
GPR_DEBUG_ASSERT(Activity::current() == &party());
auto& c = cancel_latch();
if (c.is_set()) return absl::nullopt;
c.Set(std::move(metadata));
return absl::nullopt;
}

auto WaitForCancel() {
GPR_DEBUG_ASSERT(Activity::current() == &party());
return cancel_latch().Wait();
}

// Wrap a promise so that if it returns failure it automatically cancels
// the rest of the call.
Expand Down Expand Up @@ -317,13 +324,10 @@ class CallSpine final : public CallSpineInterface {
Pipe<ServerMetadataHandle>& server_trailing_metadata() override {
return server_trailing_metadata_;
}
absl::nullopt_t Cancel(ServerMetadataHandle metadata) override {
GPR_DEBUG_ASSERT(Activity::current() == &party());
if (cancel_latch_.is_set()) return absl::nullopt;
cancel_latch_.Set(std::move(metadata));
return absl::nullopt;
}
Latch<ServerMetadataHandle>& cancel_latch() override { return cancel_latch_; }
Party& party() override { Crash("unimplemented"); }
void IncrementRefCount() override { Crash("unimplemented"); }
void Unref() override { Crash("unimplemented"); }

private:
// Initial metadata from client to server
Expand All @@ -340,6 +344,132 @@ class CallSpine final : public CallSpineInterface {
Latch<ServerMetadataHandle> cancel_latch_;
};

class CallInitiator {
public:
explicit CallInitiator(RefCountedPtr<CallSpine> spine)
: spine_(std::move(spine)) {}

auto PushClientInitialMetadata(ClientMetadataHandle md) {
GPR_DEBUG_ASSERT(Activity::current() == &spine_->party());
return Map(spine_->client_initial_metadata().sender.Push(std::move(md)),
[](bool ok) { return StatusFlag(ok); });
}

auto PullServerInitialMetadata() {
GPR_DEBUG_ASSERT(Activity::current() == &spine_->party());
return Map(spine_->server_initial_metadata().receiver.Next(),
[](NextResult<ClientMetadataHandle> md)
-> ValueOrFailure<ClientMetadataHandle> {
if (!md.has_value()) return Failure{};
return std::move(*md);
});
}

auto PullServerTrailingMetadata() {
GPR_DEBUG_ASSERT(Activity::current() == &spine_->party());
return Race(spine_->WaitForCancel(),
Map(spine_->server_trailing_metadata().receiver.Next(),
[spine = spine_](NextResult<ServerMetadataHandle> md)
-> ServerMetadataHandle {
GPR_ASSERT(md.has_value());
return std::move(*md);
}));
}

auto PullMessage() {
GPR_DEBUG_ASSERT(Activity::current() == &spine_->party());
return spine_->server_to_client_messages().receiver.Next();
}

auto PushMessage(MessageHandle message) {
GPR_DEBUG_ASSERT(Activity::current() == &spine_->party());
return spine_->client_to_server_messages().sender.Push(std::move(message));
}

template <typename Promise>
auto CancelIfFails(Promise promise) {
return spine_->CancelIfFails(std::move(promise));
}

template <typename PromiseFactory>
void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory) {
spine_->SpawnGuarded(name, std::move(promise_factory));
}

template <typename PromiseFactory>
void SpawnInfallible(absl::string_view name, PromiseFactory promise_factory) {
spine_->SpawnInfallible(name, std::move(promise_factory));
}

private:
const RefCountedPtr<CallSpine> spine_;
};

class CallHandler {
public:
explicit CallHandler(RefCountedPtr<CallSpine> spine)
: spine_(std::move(spine)) {}

auto PullClientInitialMetadata() {
GPR_DEBUG_ASSERT(Activity::current() == &spine_->party());
return Map(spine_->client_initial_metadata().receiver.Next(),
[](NextResult<ClientMetadataHandle> md)
-> ValueOrFailure<ClientMetadataHandle> {
if (!md.has_value()) return Failure{};
return std::move(*md);
});
}

auto PushServerInitialMetadata(ClientMetadataHandle md) {
GPR_DEBUG_ASSERT(Activity::current() == &spine_->party());
return Map(spine_->server_initial_metadata().sender.Push(std::move(md)),
[](bool ok) { return StatusFlag(ok); });
}

auto PushServerTrailingMetadata(ClientMetadataHandle md) {
GPR_DEBUG_ASSERT(Activity::current() == &spine_->party());
return Map(spine_->server_initial_metadata().sender.Push(std::move(md)),
[](bool ok) { return StatusFlag(ok); });
}

auto PullMessage() {
GPR_DEBUG_ASSERT(Activity::current() == &spine_->party());
return spine_->client_to_server_messages().receiver.Next();
}

auto PushMessage(MessageHandle message) {
GPR_DEBUG_ASSERT(Activity::current() == &spine_->party());
return spine_->server_to_client_messages().sender.Push(std::move(message));
}

template <typename Promise>
auto CancelIfFails(Promise promise) {
return spine_->CancelIfFails(std::move(promise));
}

template <typename PromiseFactory>
void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory) {
spine_->SpawnGuarded(name, std::move(promise_factory));
}

template <typename PromiseFactory>
void SpawnInfallible(absl::string_view name, PromiseFactory promise_factory) {
spine_->SpawnInfallible(name, std::move(promise_factory));
}

private:
const RefCountedPtr<CallSpine> spine_;
};

template <typename CallHalf>
auto OutgoingMessages(CallHalf& h) {
struct Wrapper {
CallHalf& h;
auto Next() { return h.PullMessage(); }
};
return Wrapper{h};
}

} // namespace grpc_core

// forward declarations
Expand Down Expand Up @@ -745,20 +875,22 @@ class FilterStackTransport {

class ClientTransport {
public:
// Create a promise to execute one client call.
virtual ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args) = 0;
virtual void StartCall(CallHandler call_handler) = 0;

protected:
~ClientTransport() = default;
};

class ServerTransport {
public:
// Register the factory function for the filter stack part of a call
// promise.
void SetCallPromiseFactory(
absl::AnyInvocable<ArenaPromise<ServerMetadataHandle>(CallArgs) const>);
// AcceptFunction takes initial metadata for a new call and returns a
// CallInitiator object for it, for the transport to use to communicate with
// the CallHandler object passed to the application.
using AcceptFunction =
absl::AnyInvocable<absl::StatusOr<CallInitiator>(ClientMetadata&) const>;

// Called once slightly after transport setup to register the accept function.
virtual void SetAcceptFunction(AcceptFunction accept_function) = 0;

protected:
~ServerTransport() = default;
Expand Down
1 change: 1 addition & 0 deletions tools/doxygen/Doxyfile.c++.internal

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions tools/doxygen/Doxyfile.core.internal

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 295b665

Please sign in to comment.