Skip to content

Commit

Permalink
Async end2end test passes
Browse files Browse the repository at this point in the history
  • Loading branch information
ctiller committed Mar 31, 2017
1 parent dd36b15 commit 66051c6
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 13 deletions.
13 changes: 8 additions & 5 deletions include/grpc++/impl/codegen/async_unary_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,9 @@ class ClientAsyncResponseReader final
ClientContext* context,
const W& request) {
Call call = channel->CreateCall(method, context, cq);
ClientAsyncResponseReader* reader = static_cast<ClientAsyncResponseReader*>(
grpc_call_arena_alloc(call.call(), sizeof(*reader)));
new (&reader->call_) Call(std::move(call));
reader->context_ = context;
ClientAsyncResponseReader* reader =
new (grpc_call_arena_alloc(call.call(), sizeof(*reader)))
ClientAsyncResponseReader(call, context);

reader->init_buf_.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
Expand Down Expand Up @@ -107,11 +106,15 @@ class ClientAsyncResponseReader final
}

private:
ClientContext* context_;
ClientContext* const context_;
Call call_;

ClientAsyncResponseReader(Call call, ClientContext* context)
: context_(context), call_(call) {}

// disable operator new
static void* operator new(std::size_t size);
static void* operator new(std::size_t size, void* p) { return p; };

SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpClientSendClose>
Expand Down
10 changes: 6 additions & 4 deletions include/grpc++/impl/codegen/call.h
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ class CallOpSetInterface : public CompletionQueueTag {
public:
/// Fills in grpc_op, starting from ops[*nops] and moving
/// upwards.
virtual void FillOps(grpc_op* ops, size_t* nops) = 0;
virtual void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) = 0;
};

/// Primary implementaiton of CallOpSetInterface.
Expand Down Expand Up @@ -598,25 +598,27 @@ class CallOpSet : public CallOpSetInterface,
this->Op4::AddOp(ops, nops);
this->Op5::AddOp(ops, nops);
this->Op6::AddOp(ops, nops);
grpc_call_ref(call);
g_core_codegen_interface->grpc_call_ref(call);
call_ = call;
}

bool FinalizeResult(grpc_call* call, void** tag, bool* status) override {
bool FinalizeResult(void** tag, bool* status) override {
this->Op1::FinishOp(status);
this->Op2::FinishOp(status);
this->Op3::FinishOp(status);
this->Op4::FinishOp(status);
this->Op5::FinishOp(status);
this->Op6::FinishOp(status);
*tag = return_tag_;
grpc_call_unref(call);
g_core_codegen_interface->grpc_call_unref(call_);
return true;
}

void set_output_tag(void* return_tag) { return_tag_ = return_tag; }

private:
void* return_tag_;
grpc_call* call_;
};

/// A CallOpSet that does not post completions to the completion queue.
Expand Down
3 changes: 3 additions & 0 deletions include/grpc++/impl/codegen/core_codegen.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ class CoreCodegen : public CoreCodegenInterface {
void gpr_cv_signal(gpr_cv* cv) override;
void gpr_cv_broadcast(gpr_cv* cv) override;

void grpc_call_ref(grpc_call* call) override;
void grpc_call_unref(grpc_call* call) override;

void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) override;

int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
Expand Down
3 changes: 3 additions & 0 deletions include/grpc++/impl/codegen/core_codegen_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ class CoreCodegenInterface {
virtual grpc_byte_buffer* grpc_raw_byte_buffer_create(grpc_slice* slice,
size_t nslices) = 0;

virtual void grpc_call_ref(grpc_call* call) = 0;
virtual void grpc_call_unref(grpc_call* call) = 0;

virtual grpc_slice grpc_slice_malloc(size_t length) = 0;
virtual void grpc_slice_unref(grpc_slice slice) = 0;
virtual grpc_slice grpc_slice_split_tail(grpc_slice* s, size_t split) = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/client/channel_cc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
static const size_t MAX_OPS = 8;
size_t nops = 0;
grpc_op cops[MAX_OPS];
ops->FillOps(cops, &nops);
ops->FillOps(call->call(), cops, &nops);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_batch(call->call(), cops, nops, ops, nullptr));
}
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/common/core_codegen.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ void CoreCodegen::grpc_byte_buffer_destroy(grpc_byte_buffer* bb) {
::grpc_byte_buffer_destroy(bb);
}

void CoreCodegen::grpc_call_ref(grpc_call* call) { ::grpc_call_ref(call); }
void CoreCodegen::grpc_call_unref(grpc_call* call) { ::grpc_call_unref(call); }

int CoreCodegen::grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
grpc_byte_buffer* buffer) {
return ::grpc_byte_buffer_reader_init(reader, buffer);
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/server/server_cc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
static const size_t MAX_OPS = 8;
size_t nops = 0;
grpc_op cops[MAX_OPS];
ops->FillOps(cops, &nops);
ops->FillOps(call->call(), cops, &nops);
auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr);
GPR_ASSERT(GRPC_CALL_OK == result);
}
Expand Down
5 changes: 3 additions & 2 deletions src/cpp/server/server_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class ServerContext::CompletionOp final : public CallOpSetInterface {
finalized_(false),
cancelled_(0) {}

void FillOps(grpc_op* ops, size_t* nops) override;
void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) override;
bool FinalizeResult(void** tag, bool* status) override;

bool CheckCancelled(CompletionQueue* cq) {
Expand Down Expand Up @@ -100,7 +100,8 @@ void ServerContext::CompletionOp::Unref() {
}
}

void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) {
void ServerContext::CompletionOp::FillOps(grpc_call* call, grpc_op* ops,
size_t* nops) {
ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
ops->data.recv_close_on_server.cancelled = &cancelled_;
ops->flags = 0;
Expand Down

0 comments on commit 66051c6

Please sign in to comment.