From d5b03bc0458895db8da27d2c99e2675e2df491e0 Mon Sep 17 00:00:00 2001 From: motxx Date: Thu, 13 Jul 2017 10:12:40 +0900 Subject: [PATCH] Update torii async services --- irohad/network/grpc_async_service.hpp | 3 +- irohad/network/grpc_call.hpp | 28 +++---------- irohad/torii/command_service.cpp | 12 +++--- irohad/torii/command_service.hpp | 6 +-- irohad/torii/command_service_handler.cpp | 51 ++++++++++++++++-------- irohad/torii/command_service_handler.hpp | 16 +++++--- 6 files changed, 60 insertions(+), 56 deletions(-) diff --git a/irohad/network/grpc_async_service.hpp b/irohad/network/grpc_async_service.hpp index b1593f75c7..a93919c2d6 100644 --- a/irohad/network/grpc_async_service.hpp +++ b/irohad/network/grpc_async_service.hpp @@ -61,6 +61,7 @@ namespace network { * and creates a new Call instance to serve new clients. */ template - using RpcHandler = std::function&)>; + using RpcHandler = void (ServiceHandler::*)( + Call* call); } // namespace network diff --git a/irohad/network/grpc_call.hpp b/irohad/network/grpc_call.hpp index 53d551ac13..140265ccd9 100644 --- a/irohad/network/grpc_call.hpp +++ b/irohad/network/grpc_call.hpp @@ -22,27 +22,6 @@ limitations under the License. namespace network { - /* - * TODO(motxx): Use this and clarify ownership. - class ReferenceCountable { - public: - void ref() { - count_++; - } - - void unref() { - assert(count > 0); - count_--; - if (!count_) { - delete this; - } - } - - private: - size_t count_ = 0; - }; - */ - /** * to enable various Call instances to process in ServiceHandler::handleRpcs() by polymorphism. * @tparam ServiceHandler @@ -72,7 +51,7 @@ namespace network { * @param serviceHandler - an instance that has all rpc handlers. e.g. CommandService */ void onCompleted(ServiceHandler& serviceHandler) { - switch (callback_) { + switch (state_) { case State::RequestCreated: { call_->requestReceived(serviceHandler); break; @@ -120,6 +99,8 @@ namespace network { * @param serviceHandler - an instance that has all rpc handlers. e.g. CommandService */ void responseSent() override { + // response has been sent and delete the Call instance. + delete this; } /** @@ -131,7 +112,8 @@ namespace network { } /** - * + * creates a Call instance for one rpc and enqueues it + * to the completion queue. * @param serviceHandler * @param cq * @param requestMethod diff --git a/irohad/torii/command_service.cpp b/irohad/torii/command_service.cpp index 4256f29fa2..5c87133ee5 100644 --- a/irohad/torii/command_service.cpp +++ b/irohad/torii/command_service.cpp @@ -18,19 +18,17 @@ limitations under the License. namespace torii { - CommandService::CommandService() { - - } - /** * actual implementation of async Torii in CommandService * @param request - Transaction * @param response - ToriiResponse - * @return grpc::Status - Status::OK if succeeded (TODO(motxx): support Status::CANCELLED) + * @return grpc::Status - Status::OK if succeeded. TODO(motxx): grpc::CANCELLED is not supported. */ - grpc::Status CommandService::ToriiAsync( + static grpc::Status CommandService::ToriiAsync( iroha::protocol::Transaction const& request, iroha::protocol::ToriiResponse& response) { - + response.set_code(iroha::protocol::ResponseCode::OK); + response.set_message("Torii async response"); + return grpc::Status::OK; } } // namespace torii diff --git a/irohad/torii/command_service.hpp b/irohad/torii/command_service.hpp index 9c428f6780..c9b72ee8d3 100644 --- a/irohad/torii/command_service.hpp +++ b/irohad/torii/command_service.hpp @@ -29,15 +29,13 @@ namespace torii { */ class CommandService { public: - CommandService(); - /** * actual implementation of async Torii in CommandService * @param request - Transaction * @param response - ToriiResponse - * @return grpc::Status - Status::OK if succeeded (TODO(motxx): support Status::CANCELLED) + * @return grpc::Status - Status::OK if succeeded. TODO(motxx): grpc::CANCELLED is not supported. */ - grpc::Status ToriiAsync(iroha::protocol::Transaction const& request, iroha::protocol::ToriiResponse& response); + static grpc::Status ToriiAsync(iroha::protocol::Transaction const& request, iroha::protocol::ToriiResponse& response); }; } // namespace torii diff --git a/irohad/torii/command_service_handler.cpp b/irohad/torii/command_service_handler.cpp index a43328c1a9..cff5284b49 100644 --- a/irohad/torii/command_service_handler.cpp +++ b/irohad/torii/command_service_handler.cpp @@ -18,14 +18,12 @@ limitations under the License. #include #include #include +#include namespace prot = iroha::protocol; namespace torii { - /** - * to handle async rpcs of CommandService. - */ /** * requires builder to use same server. * @param builder @@ -36,22 +34,38 @@ namespace torii { } ~CommandServiceHandler::CommandServiceRpcsHandler() override { + delete shutdownAlarm_; + } + + /** + * shuts down service handler. + * specifically, enqueues a special event that causes the completion queue to be shut down. + */ + void CommandServiceHandler::shutdown() override { bool didShutdown = false; - { std::unique_lock } + { + std::unique_lock lock(mtx_); + if (!isShutdown_) { + isShutdown_ = true; + didShutdown = true; + } + } if (didShutdown) { - // Alarm + // enqueue a special event that causes the completion queue to be shut down. + // tag is nullptr in order to determine no Call instance allocated when static_cast. + shutdownAlarm_ = new ::grpc::Alarm(cq_.get(), gpr_now(GPR_CLOCK_MONOTONIC), nullptr); } } /** - * handles all rpc in CommandService. - * We use + * handles rpcs loop in CommandService. */ void CommandServiceHandler::handleRpcs() override { enqueueRequest( &prot::CommandService::AsyncService::RequestTorii, - &CommandServiceHandler::ToriiHandler); + &CommandServiceHandler::ToriiHandler + ); void* tag; bool ok; @@ -61,22 +75,27 @@ namespace torii { if (callbackTag) { callbackTag->onCompleted(*this); } else { + // callbackTag is nullptr (a special event that causes shut down cq_) cq_->Shutdown(); } } } - /** - * releases the completion queue of CommandService. - * @note Call this method after calling server->Shutdown() in ServerRunner - */ - void CommandServiceHandler::shutdown() override { cq_->Shutdown(); } - /** * extracts request and response from Call instance * and calls an actual CommandService::AsyncTorii() implementation. - * then, creates a new Call instance to serve an another client. + * then, spawns a new Call instance to serve an another client. */ - void CommandServiceHandler::ToriiHandler() {} + void CommandServiceHandler::ToriiHandler(CommandServiceCall< + prot::Transaction, prot::ToriiResponse>* call) { + auto stat = CommandService::ToriiAsync(call->request(), call->response()); + call->sendResponse(stat); + + // Spawn a new Call instance to serve an another client. + enqueueRequest( + &prot::CommandService::AsyncService::RequestTorii, + &CommandServiceHandler::ToriiHandler + ); + } } // namespace torii diff --git a/irohad/torii/command_service_handler.hpp b/irohad/torii/command_service_handler.hpp index 30b38c8f47..af2514c6e7 100644 --- a/irohad/torii/command_service_handler.hpp +++ b/irohad/torii/command_service_handler.hpp @@ -17,10 +17,14 @@ limitations under the License. #ifndef TORII_COMMAND_SERVICE_HANDLER_HPP #define TORII_COMMAND_SERVICE_HANDLER_HPP +#include #include #include namespace torii { + /** + * to handle rpcs loop of CommandService. + */ class CommandServiceHandler : public network::GrpcAsyncService { public: @@ -42,8 +46,7 @@ namespace torii { >; /** - * handles all rpc in CommandService. - * We use + * handles rpcs loop in CommandService. */ void handleRpcs() override; @@ -56,8 +59,9 @@ namespace torii { private: /** - * - * @param requester + * helper to call Call::enqueueRequest() + * @param requester - pointer to request method. e.g. &CommandService::AsyncService::RequestTorii + * @param rpcHandler - handler of rpc in ServiceHandler. */ template void enqueueRequest( @@ -80,7 +84,8 @@ namespace torii { * and calls an actual CommandService::AsyncTorii() implementation. * then, creates a new Call instance to serve an another client. */ - void ToriiHandler(); + void ToriiHandler(CommandServiceCall< + iroha::protocol::Transaction, iroha::protocol::ToriiResponse>*); private: iroha::protocol::CommandService::AsyncService asyncService_; @@ -88,6 +93,7 @@ namespace torii { std::unique_ptr cq_; std::mutex mtx_; // TODO(motxx): Write the reason of using mutex for ENQUEUE_REQUEST. bool isShutdown_ = false; + ::grpc::Alarm* shutdownAlarm_ = nullptr; }; } // namespace torii