From cb1826a9a94110ae473377fe915e7dd05c2d2c8e Mon Sep 17 00:00:00 2001 From: motxx Date: Thu, 13 Jul 2017 04:56:34 +0900 Subject: [PATCH 01/21] Add grpc_async_service.hpp --- irohad/network/grpc_async_service.hpp | 66 +++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 irohad/network/grpc_async_service.hpp diff --git a/irohad/network/grpc_async_service.hpp b/irohad/network/grpc_async_service.hpp new file mode 100644 index 0000000000..b1593f75c7 --- /dev/null +++ b/irohad/network/grpc_async_service.hpp @@ -0,0 +1,66 @@ +/* +Copyright 2016 Soramitsu Co., Ltd. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + */ + +namespace network { + + template + class Call; + + /** + * interface of handling rpcs in a service. + */ + class GrpcAsyncService { + public: + virtual ~GrpcAsyncService() {} + + /** + * handles incoming RPCs. + * 1. Create a Call instance that handles a rpc. + * 2. Execute CompletionQueue::Next() and gets current status with tag. + * 3. Handle a rpc associated with the tag. For polymorphism, cast the tag with UntypedCall. + * 4. Back to 2 + */ + virtual void handleRpcs() = 0; + + /** + * stops spawning new Call instances and enqueues a special event + * that causes the completion queue to be shut down. + */ + virtual void shutdown() = 0; + }; + + /** + * to refer a method that requests one rpc. + * e.g. iroha::protocol::AsyncService::RequestTorii + * + * AsyncService - e.g. iroha::protocol::CommandService::AsyncService + * RequestType - e.g. Transaction in rpc Torii (Transaction) returns (ToriiResponse) + * ResponseType - e.g. ToriiResponse in rpc Torii (Transaction) returns (ToriiResponse) + */ + template + using RequestMethod = void (AsyncService::*)( + ::grpc::ServerContext*, RequestType*, + ::grpc::ServerAsyncResponseWriter*, + ::grpc::CompletionQueue*, ::grpc::ServerCompletionQueue*, void*); + + /** + * to refer a method that extracts request and response from Call instance + * and creates a new Call instance to serve new clients. + */ + template + using RpcHandler = std::function&)>; + +} // namespace network From 0eaebcab8e3a80ceba8ee8fab73e3713a6311ca3 Mon Sep 17 00:00:00 2001 From: motxx Date: Thu, 13 Jul 2017 08:05:15 +0900 Subject: [PATCH 02/21] Add grpc_call.hpp --- irohad/network/grpc_call.hpp | 163 +++++++++++++++++++++++++++++++++++ 1 file changed, 163 insertions(+) create mode 100644 irohad/network/grpc_call.hpp diff --git a/irohad/network/grpc_call.hpp b/irohad/network/grpc_call.hpp new file mode 100644 index 0000000000..ca5dd92d94 --- /dev/null +++ b/irohad/network/grpc_call.hpp @@ -0,0 +1,163 @@ +/* +Copyright 2016 Soramitsu Co., Ltd. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + */ + +#ifndef NETWORK_GRPC_CALL_HPP +#define NETWORK_GRPC_CALL_HPP + +#include +#include + +namespace network { + + /* + * TODO(motxx): Use this and clarify ownership. + class ReferenceCountable { + public: + void ref() { + count_++; + } + + void unref() { + assert(count > 0); + count_--; + if (!count_) { + delete this; + } + } + + private: + size_t count_ = 0; + }; + */ + + /** + * to enable various types to process in ServiceHandler::handleRpcs() + * + * @tparam ServiceHandler + */ + template + class UntypedCall { + public: + UntypedCall(State const& state) + : state_(state) {} + + virtual ~UntypedCall() {} + + /** + * selects a procedure by state and invokes it by using polymorphism. + * @param serviceHandler + */ + virtual void requestReceived(ServiceHandler& serviceHandler) = 0; + + /** + * + */ + virtual void responseSent() = 0; + + void onCompleted(ServiceHandler& serviceHandler) { + switch (callback_) { + case State::RequestCreated: { + call_->requestReceived(serviceHandler); + break; + } + case State::ResponseSent: { + call_->responseSent(); + break; + } + } + } + + enum class State { RequestCreated, ResponseSent }; + + private: + const State state_; + }; + + /** + * to manage the state of one rpc. + * @tparam ServiceHandler + * @tparam AsyncService + * @tparam RequestType + * @tparam ResponseType + */ + template + class Call : public UntypedCall { + public: + Call(RpcHandler const& rpcHandler) + : rpcHandler_(rpcHandler) {} + + virtual ~Call() {} + + /** + * called by onCompleted() in super class. + * + * @param serviceHandler - an instance that has all rpc handlers. e.g. CommandService + */ + void requestReceived(ServiceHandler* serviceHandler) override { + rpcHandler(*this); + } + + /** + * + */ + void responseSent() override { + } + + /** + * + * @param status + */ + void sendResponse(const ::grpc::Status& status) { + responder_.Finish(response, status, &response_sent_tag_); + } + + /** + * + * @param serviceHandler + * @param cq + * @param requestMethod + * @param rpcHandler + */ + static void enqueueRequest(AsyncService* asyncService, + ::grpc::ServerCompletionQueue* cq, + RequestMethod requestMethod, + RpcHandler rpcHandler) { + auto call = new Call(rpcHandler); + + (asyncService->*requestMethod)(&call->ctx_, &call->request(), + &call->responder_, cq, cq, + &call->RequestReceivedTag); + } + + public: + auto& request() { return request_; } + auto& response() { return response_; } + + private: + const UntypedCall RequestReceivedTag { State::RequestCreated }; + const UntypedCall ResponseSentTag { State::ResponseSent }; + + private: + const RpcHandler rpcHandler_; + RequestType request_; + ResponseType response_; + ::grpc::ServerContext ctx_; + ::grpc::ServerAsyncResponseWriter responder_; + }; + +} // namespace network + +#endif // NETWORK_GRPC_CALL_HPP From ca2067523b0ab4838bb2b491d84a1e3de0a3c7c8 Mon Sep 17 00:00:00 2001 From: motxx Date: Thu, 13 Jul 2017 08:10:27 +0900 Subject: [PATCH 03/21] Add command_service_handler --- irohad/torii/command_service.cpp | 54 -------------- irohad/torii/command_service.hpp | 37 ---------- irohad/torii/command_service_handler.cpp | 82 +++++++++++++++++++++ irohad/torii/command_service_handler.hpp | 94 ++++++++++++++++++++++++ 4 files changed, 176 insertions(+), 91 deletions(-) delete mode 100644 irohad/torii/command_service.cpp delete mode 100644 irohad/torii/command_service.hpp create mode 100644 irohad/torii/command_service_handler.cpp create mode 100644 irohad/torii/command_service_handler.hpp diff --git a/irohad/torii/command_service.cpp b/irohad/torii/command_service.cpp deleted file mode 100644 index 962036504c..0000000000 --- a/irohad/torii/command_service.cpp +++ /dev/null @@ -1,54 +0,0 @@ -/* -Copyright 2017 Soramitsu Co., Ltd. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -#include "command_service.hpp" -#include -#include -#include - -namespace api { - - using namespace iroha::protocol; - - std::function dispatchToOrdering; - - void receive( - std::function const& func) { - dispatchToOrdering = func; - } - - grpc::Status CommandService::Torii(grpc::ServerContext* context, - const Transaction* request, - ToriiResponse* response) { - // TODO: Use this to get client's ip and port. - (void)context; - - /*if (validation::stateless::validate(*request)) { - dispatchToOrdering(*request); - // TODO: Return tracking log number (hash) - *response = ToriiResponse(); - response->set_code(ResponseCode::OK); - response->set_message("successfully dispatching to ordering."); - } else { - // TODO: Return validation failed message - *response = ToriiResponse(); - response->set_code(ResponseCode::FAIL); - response->set_message("failed stateless validation."); - }*/ - return grpc::Status::OK; - } - -} // namespace api diff --git a/irohad/torii/command_service.hpp b/irohad/torii/command_service.hpp deleted file mode 100644 index 4e0285a20e..0000000000 --- a/irohad/torii/command_service.hpp +++ /dev/null @@ -1,37 +0,0 @@ -/* -Copyright 2017 Soramitsu Co., Ltd. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -#ifndef API_COMMAND_SERVICE_HPP -#define API_COMMAND_SERVICE_HPP - -#include -#include - -namespace api { - - void receive(std::function); - - class CommandService final - : public iroha::protocol::CommandService::Service { - public: - grpc::Status Torii(grpc::ServerContext* context, - const iroha::protocol::Transaction* request, - iroha::protocol::ToriiResponse* response); - }; - -} // namespace api - -#endif // API_COMMAND_SERVICE_HPP \ No newline at end of file diff --git a/irohad/torii/command_service_handler.cpp b/irohad/torii/command_service_handler.cpp new file mode 100644 index 0000000000..a43328c1a9 --- /dev/null +++ b/irohad/torii/command_service_handler.cpp @@ -0,0 +1,82 @@ +/* +Copyright 2016 Soramitsu Co., Ltd. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + */ + +#include +#include +#include +#include + +namespace prot = iroha::protocol; + +namespace torii { + + /** + * to handle async rpcs of CommandService. + */ + /** + * requires builder to use same server. + * @param builder + */ + CommandServiceHandler::CommandServiceHandler(::grpc::ServerBuilder& builder) { + builder.RegisterService(&asyncService_); + cq_ = builder.AddCompletionQueue(); + } + + ~CommandServiceHandler::CommandServiceRpcsHandler() override { + bool didShutdown = false; + { std::unique_lock } + + if (didShutdown) { + // Alarm + } + } + + /** + * handles all rpc in CommandService. + * We use + */ + void CommandServiceHandler::handleRpcs() override { + enqueueRequest( + &prot::CommandService::AsyncService::RequestTorii, + &CommandServiceHandler::ToriiHandler); + + void* tag; + bool ok; + while (cq_->Next(&tag, &ok)) { + auto callbackTag = + static_cast*>(tag); + if (callbackTag) { + callbackTag->onCompleted(*this); + } else { + cq_->Shutdown(); + } + } + } + + /** + * releases the completion queue of CommandService. + * @note Call this method after calling server->Shutdown() in ServerRunner + */ + void CommandServiceHandler::shutdown() override { cq_->Shutdown(); } + + /** + * extracts request and response from Call instance + * and calls an actual CommandService::AsyncTorii() implementation. + * then, creates a new Call instance to serve an another client. + */ + void CommandServiceHandler::ToriiHandler() {} + +} // namespace torii diff --git a/irohad/torii/command_service_handler.hpp b/irohad/torii/command_service_handler.hpp new file mode 100644 index 0000000000..30b38c8f47 --- /dev/null +++ b/irohad/torii/command_service_handler.hpp @@ -0,0 +1,94 @@ +/* +Copyright 2017 Soramitsu Co., Ltd. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#ifndef TORII_COMMAND_SERVICE_HANDLER_HPP +#define TORII_COMMAND_SERVICE_HANDLER_HPP + +#include +#include + +namespace torii { + class CommandServiceHandler : public network::GrpcAsyncService { + public: + + /** + * requires builder to use same server. + * @param builder + */ + CommandServiceHandler(::grpc::ServerBuilder &builder); + + ~CommandServiceRpcsHandler() override; + + template + using CommandServiceCall = + network::Call< + CommandServiceHandler, + prot::CommandService::AsyncService, + RequestType, + ResponseType + >; + + /** + * handles all rpc in CommandService. + * We use + */ + void handleRpcs() override; + + /** + * releases the completion queue of CommandService. + * @note Call this method after calling server->Shutdown() in ServerRunner + */ + void shutdown() override; + + private: + + /** + * + * @param requester + */ + template + void enqueueRequest( + network::RequestMethod requester, + network::RpcHandler< + CommandServiceHandler, prot::CommandService::AsyncService, + RequestType, ResponseType> rpcHandler + ) { + std::unique_lock lock(mtx_); + if (!isShutdown_) { + CommandServiceCall::enqueueRequest( + &asyncService_, cq_.get(), requester, rpcHandler + ); + } + } + + /** + * extracts request and response from Call instance + * and calls an actual CommandService::AsyncTorii() implementation. + * then, creates a new Call instance to serve an another client. + */ + void ToriiHandler(); + + private: + iroha::protocol::CommandService::AsyncService asyncService_; + // TODO(motxx): Investigate a required number of completion queues if we use multiple services. + std::unique_ptr cq_; + std::mutex mtx_; // TODO(motxx): Write the reason of using mutex for ENQUEUE_REQUEST. + bool isShutdown_ = false; + }; +} // namespace torii + +#endif // TORII_COMMAND_SERVICE_HANDLER_HPP \ No newline at end of file From e13ed1b65a246c7697c8e716d2ebc17080aeb55a Mon Sep 17 00:00:00 2001 From: motxx Date: Thu, 13 Jul 2017 08:19:48 +0900 Subject: [PATCH 04/21] Add base implementation of command_service --- irohad/torii/command_service.cpp | 36 +++++++++++++++++++++++++ irohad/torii/command_service.hpp | 45 ++++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+) create mode 100644 irohad/torii/command_service.cpp create mode 100644 irohad/torii/command_service.hpp diff --git a/irohad/torii/command_service.cpp b/irohad/torii/command_service.cpp new file mode 100644 index 0000000000..4256f29fa2 --- /dev/null +++ b/irohad/torii/command_service.cpp @@ -0,0 +1,36 @@ +/* +Copyright 2017 Soramitsu Co., Ltd. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include + +namespace torii { + + CommandService::CommandService() { + + } + + /** + * actual implementation of async Torii in CommandService + * @param request - Transaction + * @param response - ToriiResponse + * @return grpc::Status - Status::OK if succeeded (TODO(motxx): support Status::CANCELLED) + */ + grpc::Status CommandService::ToriiAsync( + iroha::protocol::Transaction const& request, iroha::protocol::ToriiResponse& response) { + + } + +} // namespace torii diff --git a/irohad/torii/command_service.hpp b/irohad/torii/command_service.hpp new file mode 100644 index 0000000000..9c428f6780 --- /dev/null +++ b/irohad/torii/command_service.hpp @@ -0,0 +1,45 @@ +/* +Copyright 2017 Soramitsu Co., Ltd. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#ifndef TORII_COMMAND_SERVICE_HPP +#define TORII_COMMAND_SERVICE_HPP + +#include +#include +#include + +namespace torii { + + /** + * Actual implementation of async CommandService. + * CommandServiceHandler::(SomeMethod)Handler calls a corresponding method in this class. + */ + class CommandService { + public: + CommandService(); + + /** + * actual implementation of async Torii in CommandService + * @param request - Transaction + * @param response - ToriiResponse + * @return grpc::Status - Status::OK if succeeded (TODO(motxx): support Status::CANCELLED) + */ + grpc::Status ToriiAsync(iroha::protocol::Transaction const& request, iroha::protocol::ToriiResponse& response); + }; + +} // namespace torii + +#endif // TORII_COMMAND_SERVICE_HPP From c92217ceae32117e7e1b5eac50d0a5d7c67aed07 Mon Sep 17 00:00:00 2001 From: motxx Date: Thu, 13 Jul 2017 08:41:04 +0900 Subject: [PATCH 05/21] Add comments and fix --- irohad/network/grpc_call.hpp | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/irohad/network/grpc_call.hpp b/irohad/network/grpc_call.hpp index ca5dd92d94..53d551ac13 100644 --- a/irohad/network/grpc_call.hpp +++ b/irohad/network/grpc_call.hpp @@ -44,8 +44,7 @@ namespace network { */ /** - * to enable various types to process in ServiceHandler::handleRpcs() - * + * to enable various Call instances to process in ServiceHandler::handleRpcs() by polymorphism. * @tparam ServiceHandler */ template @@ -57,16 +56,21 @@ namespace network { virtual ~UntypedCall() {} /** - * selects a procedure by state and invokes it by using polymorphism. - * @param serviceHandler + * invokes when state is RequestReceivedTag. + * @param serviceHandler - an instance that has all rpc handlers. e.g. CommandService */ virtual void requestReceived(ServiceHandler& serviceHandler) = 0; /** - * + * invokes when state is ResponseSentTag. */ virtual void responseSent() = 0; + /** + * selects a procedure by state and invokes it by using polymorphism. + * this is called from ServiceHandler::handleRpcs() + * @param serviceHandler - an instance that has all rpc handlers. e.g. CommandService + */ void onCompleted(ServiceHandler& serviceHandler) { switch (callback_) { case State::RequestCreated: { @@ -102,8 +106,8 @@ namespace network { virtual ~Call() {} /** - * called by onCompleted() in super class. - * + * invokes when state is RequestReceivedTag. + * this method is called by onCompleted() in super class (UntypedCall). * @param serviceHandler - an instance that has all rpc handlers. e.g. CommandService */ void requestReceived(ServiceHandler* serviceHandler) override { @@ -111,17 +115,19 @@ namespace network { } /** - * + * invokes when state is ResponseSentTag. + * this method is called by onCompleted() in super class (UntypedCall). + * @param serviceHandler - an instance that has all rpc handlers. e.g. CommandService */ void responseSent() override { } /** - * + * notifies response and grpc::Status when finishing handling rpc. * @param status */ void sendResponse(const ::grpc::Status& status) { - responder_.Finish(response, status, &response_sent_tag_); + responder_.Finish(response, status, &ResponseSentTag); } /** From d5b03bc0458895db8da27d2c99e2675e2df491e0 Mon Sep 17 00:00:00 2001 From: motxx Date: Thu, 13 Jul 2017 10:12:40 +0900 Subject: [PATCH 06/21] Update torii async services --- irohad/network/grpc_async_service.hpp | 3 +- irohad/network/grpc_call.hpp | 28 +++---------- irohad/torii/command_service.cpp | 12 +++--- irohad/torii/command_service.hpp | 6 +-- irohad/torii/command_service_handler.cpp | 51 ++++++++++++++++-------- irohad/torii/command_service_handler.hpp | 16 +++++--- 6 files changed, 60 insertions(+), 56 deletions(-) diff --git a/irohad/network/grpc_async_service.hpp b/irohad/network/grpc_async_service.hpp index b1593f75c7..a93919c2d6 100644 --- a/irohad/network/grpc_async_service.hpp +++ b/irohad/network/grpc_async_service.hpp @@ -61,6 +61,7 @@ namespace network { * and creates a new Call instance to serve new clients. */ template - using RpcHandler = std::function&)>; + using RpcHandler = void (ServiceHandler::*)( + Call* call); } // namespace network diff --git a/irohad/network/grpc_call.hpp b/irohad/network/grpc_call.hpp index 53d551ac13..140265ccd9 100644 --- a/irohad/network/grpc_call.hpp +++ b/irohad/network/grpc_call.hpp @@ -22,27 +22,6 @@ limitations under the License. namespace network { - /* - * TODO(motxx): Use this and clarify ownership. - class ReferenceCountable { - public: - void ref() { - count_++; - } - - void unref() { - assert(count > 0); - count_--; - if (!count_) { - delete this; - } - } - - private: - size_t count_ = 0; - }; - */ - /** * to enable various Call instances to process in ServiceHandler::handleRpcs() by polymorphism. * @tparam ServiceHandler @@ -72,7 +51,7 @@ namespace network { * @param serviceHandler - an instance that has all rpc handlers. e.g. CommandService */ void onCompleted(ServiceHandler& serviceHandler) { - switch (callback_) { + switch (state_) { case State::RequestCreated: { call_->requestReceived(serviceHandler); break; @@ -120,6 +99,8 @@ namespace network { * @param serviceHandler - an instance that has all rpc handlers. e.g. CommandService */ void responseSent() override { + // response has been sent and delete the Call instance. + delete this; } /** @@ -131,7 +112,8 @@ namespace network { } /** - * + * creates a Call instance for one rpc and enqueues it + * to the completion queue. * @param serviceHandler * @param cq * @param requestMethod diff --git a/irohad/torii/command_service.cpp b/irohad/torii/command_service.cpp index 4256f29fa2..5c87133ee5 100644 --- a/irohad/torii/command_service.cpp +++ b/irohad/torii/command_service.cpp @@ -18,19 +18,17 @@ limitations under the License. namespace torii { - CommandService::CommandService() { - - } - /** * actual implementation of async Torii in CommandService * @param request - Transaction * @param response - ToriiResponse - * @return grpc::Status - Status::OK if succeeded (TODO(motxx): support Status::CANCELLED) + * @return grpc::Status - Status::OK if succeeded. TODO(motxx): grpc::CANCELLED is not supported. */ - grpc::Status CommandService::ToriiAsync( + static grpc::Status CommandService::ToriiAsync( iroha::protocol::Transaction const& request, iroha::protocol::ToriiResponse& response) { - + response.set_code(iroha::protocol::ResponseCode::OK); + response.set_message("Torii async response"); + return grpc::Status::OK; } } // namespace torii diff --git a/irohad/torii/command_service.hpp b/irohad/torii/command_service.hpp index 9c428f6780..c9b72ee8d3 100644 --- a/irohad/torii/command_service.hpp +++ b/irohad/torii/command_service.hpp @@ -29,15 +29,13 @@ namespace torii { */ class CommandService { public: - CommandService(); - /** * actual implementation of async Torii in CommandService * @param request - Transaction * @param response - ToriiResponse - * @return grpc::Status - Status::OK if succeeded (TODO(motxx): support Status::CANCELLED) + * @return grpc::Status - Status::OK if succeeded. TODO(motxx): grpc::CANCELLED is not supported. */ - grpc::Status ToriiAsync(iroha::protocol::Transaction const& request, iroha::protocol::ToriiResponse& response); + static grpc::Status ToriiAsync(iroha::protocol::Transaction const& request, iroha::protocol::ToriiResponse& response); }; } // namespace torii diff --git a/irohad/torii/command_service_handler.cpp b/irohad/torii/command_service_handler.cpp index a43328c1a9..cff5284b49 100644 --- a/irohad/torii/command_service_handler.cpp +++ b/irohad/torii/command_service_handler.cpp @@ -18,14 +18,12 @@ limitations under the License. #include #include #include +#include namespace prot = iroha::protocol; namespace torii { - /** - * to handle async rpcs of CommandService. - */ /** * requires builder to use same server. * @param builder @@ -36,22 +34,38 @@ namespace torii { } ~CommandServiceHandler::CommandServiceRpcsHandler() override { + delete shutdownAlarm_; + } + + /** + * shuts down service handler. + * specifically, enqueues a special event that causes the completion queue to be shut down. + */ + void CommandServiceHandler::shutdown() override { bool didShutdown = false; - { std::unique_lock } + { + std::unique_lock lock(mtx_); + if (!isShutdown_) { + isShutdown_ = true; + didShutdown = true; + } + } if (didShutdown) { - // Alarm + // enqueue a special event that causes the completion queue to be shut down. + // tag is nullptr in order to determine no Call instance allocated when static_cast. + shutdownAlarm_ = new ::grpc::Alarm(cq_.get(), gpr_now(GPR_CLOCK_MONOTONIC), nullptr); } } /** - * handles all rpc in CommandService. - * We use + * handles rpcs loop in CommandService. */ void CommandServiceHandler::handleRpcs() override { enqueueRequest( &prot::CommandService::AsyncService::RequestTorii, - &CommandServiceHandler::ToriiHandler); + &CommandServiceHandler::ToriiHandler + ); void* tag; bool ok; @@ -61,22 +75,27 @@ namespace torii { if (callbackTag) { callbackTag->onCompleted(*this); } else { + // callbackTag is nullptr (a special event that causes shut down cq_) cq_->Shutdown(); } } } - /** - * releases the completion queue of CommandService. - * @note Call this method after calling server->Shutdown() in ServerRunner - */ - void CommandServiceHandler::shutdown() override { cq_->Shutdown(); } - /** * extracts request and response from Call instance * and calls an actual CommandService::AsyncTorii() implementation. - * then, creates a new Call instance to serve an another client. + * then, spawns a new Call instance to serve an another client. */ - void CommandServiceHandler::ToriiHandler() {} + void CommandServiceHandler::ToriiHandler(CommandServiceCall< + prot::Transaction, prot::ToriiResponse>* call) { + auto stat = CommandService::ToriiAsync(call->request(), call->response()); + call->sendResponse(stat); + + // Spawn a new Call instance to serve an another client. + enqueueRequest( + &prot::CommandService::AsyncService::RequestTorii, + &CommandServiceHandler::ToriiHandler + ); + } } // namespace torii diff --git a/irohad/torii/command_service_handler.hpp b/irohad/torii/command_service_handler.hpp index 30b38c8f47..af2514c6e7 100644 --- a/irohad/torii/command_service_handler.hpp +++ b/irohad/torii/command_service_handler.hpp @@ -17,10 +17,14 @@ limitations under the License. #ifndef TORII_COMMAND_SERVICE_HANDLER_HPP #define TORII_COMMAND_SERVICE_HANDLER_HPP +#include #include #include namespace torii { + /** + * to handle rpcs loop of CommandService. + */ class CommandServiceHandler : public network::GrpcAsyncService { public: @@ -42,8 +46,7 @@ namespace torii { >; /** - * handles all rpc in CommandService. - * We use + * handles rpcs loop in CommandService. */ void handleRpcs() override; @@ -56,8 +59,9 @@ namespace torii { private: /** - * - * @param requester + * helper to call Call::enqueueRequest() + * @param requester - pointer to request method. e.g. &CommandService::AsyncService::RequestTorii + * @param rpcHandler - handler of rpc in ServiceHandler. */ template void enqueueRequest( @@ -80,7 +84,8 @@ namespace torii { * and calls an actual CommandService::AsyncTorii() implementation. * then, creates a new Call instance to serve an another client. */ - void ToriiHandler(); + void ToriiHandler(CommandServiceCall< + iroha::protocol::Transaction, iroha::protocol::ToriiResponse>*); private: iroha::protocol::CommandService::AsyncService asyncService_; @@ -88,6 +93,7 @@ namespace torii { std::unique_ptr cq_; std::mutex mtx_; // TODO(motxx): Write the reason of using mutex for ENQUEUE_REQUEST. bool isShutdown_ = false; + ::grpc::Alarm* shutdownAlarm_ = nullptr; }; } // namespace torii From 16d1d7ddee447cb091b04405e4f3d309f6dd0090 Mon Sep 17 00:00:00 2001 From: motxx Date: Thu, 13 Jul 2017 13:51:47 +0900 Subject: [PATCH 07/21] Update ServerRunner --- irohad/main/server_runner.cpp | 15 ++-- irohad/main/server_runner.hpp | 25 ++----- .../consensus/connection/CMakeLists.txt | 29 -------- .../connection/consensus_connection_test.cpp | 71 ------------------- test/module/irohad/torii/torii_async_test.cpp | 0 5 files changed, 13 insertions(+), 127 deletions(-) delete mode 100644 test/module/irohad/consensus/connection/CMakeLists.txt delete mode 100644 test/module/irohad/consensus/connection/consensus_connection_test.cpp create mode 100644 test/module/irohad/torii/torii_async_test.cpp diff --git a/irohad/main/server_runner.cpp b/irohad/main/server_runner.cpp index 538481cb2c..74f2427c0a 100644 --- a/irohad/main/server_runner.cpp +++ b/irohad/main/server_runner.cpp @@ -18,23 +18,20 @@ limitations under the License. #include #include #include - -#include "server_runner.hpp" +#include
+#include logger::Logger console("ServerRunner"); -ServerRunner::ServerRunner(const std::string &ip, int port, - const std::vector &srvs) - : serverAddress_(ip + ":" + std::to_string(port)), services_(srvs) {} +ServerRunner::ServerRunner(const std::string &ip, int port) + : serverAddress_(ip + ":" + std::to_string(port)) {} void ServerRunner::run() { grpc::ServerBuilder builder; - // TODO(motxx): Is it ok to open same port for all services? builder.AddListeningPort(serverAddress_, grpc::InsecureServerCredentials()); - for (auto srv : services_) { - builder.RegisterService(srv); - } + + commandServiceHandler_ = std::make_unique(builder); waitForServer_.lock(); serverInstance_ = builder.BuildAndStart(); diff --git a/irohad/main/server_runner.hpp b/irohad/main/server_runner.hpp index 0e1454acd1..aa833c5933 100644 --- a/irohad/main/server_runner.hpp +++ b/irohad/main/server_runner.hpp @@ -16,25 +16,14 @@ limitations under the License. #include #include +#include -#ifndef CONNECTION_SERVER_RUNNER_HPP -#define CONNECTION_SERVER_RUNNER_HPP +#ifndef MAIN_SERVER_RUNNER_HPP +#define MAIN_SERVER_RUNNER_HPP -/** - * For easy replacing with mock server, we use the interface. - */ -class IServerRunner { +class ServerRunner { public: - virtual ~IServerRunner() = default; - virtual void run() = 0; - virtual void shutdown() = 0; - virtual bool waitForServersReady() = 0; -}; - -class ServerRunner final : public IServerRunner { -public: - ServerRunner(const std::string &ip, int port, - const std::vector &services); + ServerRunner(const std::string &ip, int port); void run(); void shutdown(); bool waitForServersReady(); @@ -45,7 +34,7 @@ class ServerRunner final : public IServerRunner { std::condition_variable serverInstanceCV_; std::string serverAddress_; - std::vector services_; + std::unique_ptr commandServiceHandler_; }; -#endif +#endif // MAIN_SERVER_RUNNER_HPP diff --git a/test/module/irohad/consensus/connection/CMakeLists.txt b/test/module/irohad/consensus/connection/CMakeLists.txt deleted file mode 100644 index d983087817..0000000000 --- a/test/module/irohad/consensus/connection/CMakeLists.txt +++ /dev/null @@ -1,29 +0,0 @@ -# Copyright 2017 Soramitsu Co., Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Consensus Connection Test -add_executable(consensus_connection_test - consensus_connection_test.cpp -) -target_link_libraries(consensus_connection_test - server_runner - consensus_connection - schema - endpoint - gtest -) -add_test( - NAME consensus_connection_test - COMMAND $ -) diff --git a/test/module/irohad/consensus/connection/consensus_connection_test.cpp b/test/module/irohad/consensus/connection/consensus_connection_test.cpp deleted file mode 100644 index 2d8e3b6c30..0000000000 --- a/test/module/irohad/consensus/connection/consensus_connection_test.cpp +++ /dev/null @@ -1,71 +0,0 @@ -/* -Copyright Soramitsu Co., Ltd. 2016 All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -#include -#include -#include -#include
-#include -#include - -namespace conn = consensus::connection; -using iroha::protocol::Block; - -class ConsensusConnectionTest : public ::testing::Test { - protected: - virtual void SetUp() { - serverRunner_.reset(new ServerRunner("0.0.0.0", 50051, { - &service_ - })); - running_ = false; - } - - virtual void TearDown() { - if (running_) { - serverRunner_->shutdown(); - serverThread_.join(); - } - } - - void RunServer() { - serverThread_ = std::thread(&IServerRunner::run, std::ref(*serverRunner_)); - serverRunner_->waitForServersReady(); - running_ = true; - } - - private: - bool running_; - conn::SumeragiService service_; - std::unique_ptr serverRunner_; - std::thread serverThread_; -}; - -/** - * Note: Async connection is WIP. - * Temporarily, we tests sync connection. - */ -TEST_F(ConsensusConnectionTest, FailConnectionWhenNotStandingServer) { - Block block; - auto response = conn::sendBlock(block, "0.0.0.0"); - ASSERT_EQ(response.code(), iroha::protocol::ResponseCode::FAIL); -} - -TEST_F(ConsensusConnectionTest, SuccessConnectionWhenStandingServer) { - RunServer(); - Block block; - auto response = conn::sendBlock(block, "0.0.0.0"); - ASSERT_EQ(response.code(), iroha::protocol::ResponseCode::OK); -} diff --git a/test/module/irohad/torii/torii_async_test.cpp b/test/module/irohad/torii/torii_async_test.cpp new file mode 100644 index 0000000000..e69de29bb2 From 86a7e599393df1f813a6d7f308a12d1c533abfe9 Mon Sep 17 00:00:00 2001 From: motxx Date: Thu, 13 Jul 2017 13:52:40 +0900 Subject: [PATCH 08/21] Fix compilation --- irohad/network/grpc_async_service.hpp | 7 ++- irohad/network/grpc_call.hpp | 54 ++++++++++++------- irohad/torii/CMakeLists.txt | 5 +- irohad/torii/command_service.cpp | 13 ----- irohad/torii/command_service.hpp | 7 ++- irohad/torii/command_service_handler.cpp | 14 ++--- irohad/torii/command_service_handler.hpp | 16 +++--- test/module/irohad/consensus/CMakeLists.txt | 2 - test/module/irohad/torii/CMakeLists.txt | 11 ++++ test/module/irohad/torii/torii_async_test.cpp | 22 ++++++++ 10 files changed, 99 insertions(+), 52 deletions(-) create mode 100644 test/module/irohad/torii/CMakeLists.txt diff --git a/irohad/network/grpc_async_service.hpp b/irohad/network/grpc_async_service.hpp index a93919c2d6..a97e40a0c5 100644 --- a/irohad/network/grpc_async_service.hpp +++ b/irohad/network/grpc_async_service.hpp @@ -14,6 +14,9 @@ See the License for the specific language governing permissions and limitations under the License. */ +#ifndef NETWORK_GRPC_ASYNC_SERVICE_HPP +#define NETWORK_GRPC_ASYNC_SERVICE_HPP + namespace network { template @@ -62,6 +65,8 @@ namespace network { */ template using RpcHandler = void (ServiceHandler::*)( - Call* call); + Call*); } // namespace network + +#endif // NETWORK_GRPC_ASYNC_SERVICE_HPP diff --git a/irohad/network/grpc_call.hpp b/irohad/network/grpc_call.hpp index 140265ccd9..cf651c2a10 100644 --- a/irohad/network/grpc_call.hpp +++ b/irohad/network/grpc_call.hpp @@ -19,6 +19,7 @@ limitations under the License. #include #include +#include namespace network { @@ -29,8 +30,12 @@ namespace network { template class UntypedCall { public: - UntypedCall(State const& state) - : state_(state) {} + + enum class State { RequestCreated, ResponseSent }; + + UntypedCall(State state) + : state_(state) { + } virtual ~UntypedCall() {} @@ -38,33 +43,35 @@ namespace network { * invokes when state is RequestReceivedTag. * @param serviceHandler - an instance that has all rpc handlers. e.g. CommandService */ - virtual void requestReceived(ServiceHandler& serviceHandler) = 0; + virtual void requestReceived(ServiceHandler* serviceHandler) { + assert(false && "Concrete class is not allocated."); + } /** * invokes when state is ResponseSentTag. */ - virtual void responseSent() = 0; + virtual void responseSent() { + assert(false && "Concrete class is not allocated."); + } /** * selects a procedure by state and invokes it by using polymorphism. * this is called from ServiceHandler::handleRpcs() * @param serviceHandler - an instance that has all rpc handlers. e.g. CommandService */ - void onCompleted(ServiceHandler& serviceHandler) { + void onCompleted(ServiceHandler* serviceHandler) { switch (state_) { case State::RequestCreated: { - call_->requestReceived(serviceHandler); + requestReceived(serviceHandler); break; } case State::ResponseSent: { - call_->responseSent(); + responseSent(); break; } } } - enum class State { RequestCreated, ResponseSent }; - private: const State state_; }; @@ -79,8 +86,15 @@ namespace network { template class Call : public UntypedCall { public: - Call(RpcHandler const& rpcHandler) - : rpcHandler_(rpcHandler) {} + + using RpcHandlerType = network::RpcHandler; + using RequestMethodType = network::RequestMethod; + using CallType = Call; + using UntypedCallType = UntypedCall; + + Call(RpcHandlerType rpcHandler) + : UntypedCall(UntypedCallType::State::RequestCreated), + rpcHandler_(rpcHandler), responder_(&ctx_) {} virtual ~Call() {} @@ -90,7 +104,7 @@ namespace network { * @param serviceHandler - an instance that has all rpc handlers. e.g. CommandService */ void requestReceived(ServiceHandler* serviceHandler) override { - rpcHandler(*this); + (serviceHandler->*rpcHandler_)(this); } /** @@ -107,8 +121,8 @@ namespace network { * notifies response and grpc::Status when finishing handling rpc. * @param status */ - void sendResponse(const ::grpc::Status& status) { - responder_.Finish(response, status, &ResponseSentTag); + void sendResponse(::grpc::Status status) { + responder_.Finish(response_, status, &ResponseSentTag); } /** @@ -121,9 +135,9 @@ namespace network { */ static void enqueueRequest(AsyncService* asyncService, ::grpc::ServerCompletionQueue* cq, - RequestMethod requestMethod, - RpcHandler rpcHandler) { - auto call = new Call(rpcHandler); + RequestMethodType requestMethod, + RpcHandlerType rpcHandler) { + auto call = new CallType(rpcHandler); (asyncService->*requestMethod)(&call->ctx_, &call->request(), &call->responder_, cq, cq, @@ -135,11 +149,11 @@ namespace network { auto& response() { return response_; } private: - const UntypedCall RequestReceivedTag { State::RequestCreated }; - const UntypedCall ResponseSentTag { State::ResponseSent }; + UntypedCallType RequestReceivedTag { UntypedCallType::State::RequestCreated }; + UntypedCallType ResponseSentTag { UntypedCallType::State::ResponseSent }; private: - const RpcHandler rpcHandler_; + RpcHandlerType rpcHandler_; RequestType request_; ResponseType response_; ::grpc::ServerContext ctx_; diff --git a/irohad/torii/CMakeLists.txt b/irohad/torii/CMakeLists.txt index 43a374888d..59184c2c1a 100644 --- a/irohad/torii/CMakeLists.txt +++ b/irohad/torii/CMakeLists.txt @@ -18,7 +18,10 @@ target_link_libraries(command_client endpoint ) -add_library(command_service command_service.cpp) +add_library(command_service + command_service.cpp + command_service_handler.cpp + ) target_link_libraries(command_service endpoint stateless_validator diff --git a/irohad/torii/command_service.cpp b/irohad/torii/command_service.cpp index 5c87133ee5..07e49f1cc1 100644 --- a/irohad/torii/command_service.cpp +++ b/irohad/torii/command_service.cpp @@ -18,17 +18,4 @@ limitations under the License. namespace torii { - /** - * actual implementation of async Torii in CommandService - * @param request - Transaction - * @param response - ToriiResponse - * @return grpc::Status - Status::OK if succeeded. TODO(motxx): grpc::CANCELLED is not supported. - */ - static grpc::Status CommandService::ToriiAsync( - iroha::protocol::Transaction const& request, iroha::protocol::ToriiResponse& response) { - response.set_code(iroha::protocol::ResponseCode::OK); - response.set_message("Torii async response"); - return grpc::Status::OK; - } - } // namespace torii diff --git a/irohad/torii/command_service.hpp b/irohad/torii/command_service.hpp index c9b72ee8d3..3120427f30 100644 --- a/irohad/torii/command_service.hpp +++ b/irohad/torii/command_service.hpp @@ -35,7 +35,12 @@ namespace torii { * @param response - ToriiResponse * @return grpc::Status - Status::OK if succeeded. TODO(motxx): grpc::CANCELLED is not supported. */ - static grpc::Status ToriiAsync(iroha::protocol::Transaction const& request, iroha::protocol::ToriiResponse& response); + static grpc::Status ToriiAsync( + iroha::protocol::Transaction const& request, iroha::protocol::ToriiResponse& response) { + response.set_code(iroha::protocol::ResponseCode::OK); + response.set_message("Torii async response"); + return grpc::Status::OK; + } }; } // namespace torii diff --git a/irohad/torii/command_service_handler.cpp b/irohad/torii/command_service_handler.cpp index cff5284b49..a1e6b35b23 100644 --- a/irohad/torii/command_service_handler.cpp +++ b/irohad/torii/command_service_handler.cpp @@ -25,7 +25,7 @@ namespace prot = iroha::protocol; namespace torii { /** - * requires builder to use same server. + * registers async command service * @param builder */ CommandServiceHandler::CommandServiceHandler(::grpc::ServerBuilder& builder) { @@ -33,7 +33,7 @@ namespace torii { cq_ = builder.AddCompletionQueue(); } - ~CommandServiceHandler::CommandServiceRpcsHandler() override { + CommandServiceHandler::~CommandServiceHandler() { delete shutdownAlarm_; } @@ -41,7 +41,7 @@ namespace torii { * shuts down service handler. * specifically, enqueues a special event that causes the completion queue to be shut down. */ - void CommandServiceHandler::shutdown() override { + void CommandServiceHandler::shutdown() { bool didShutdown = false; { std::unique_lock lock(mtx_); @@ -61,7 +61,7 @@ namespace torii { /** * handles rpcs loop in CommandService. */ - void CommandServiceHandler::handleRpcs() override { + void CommandServiceHandler::handleRpcs() { enqueueRequest( &prot::CommandService::AsyncService::RequestTorii, &CommandServiceHandler::ToriiHandler @@ -73,7 +73,7 @@ namespace torii { auto callbackTag = static_cast*>(tag); if (callbackTag) { - callbackTag->onCompleted(*this); + callbackTag->onCompleted(this); } else { // callbackTag is nullptr (a special event that causes shut down cq_) cq_->Shutdown(); @@ -86,8 +86,8 @@ namespace torii { * and calls an actual CommandService::AsyncTorii() implementation. * then, spawns a new Call instance to serve an another client. */ - void CommandServiceHandler::ToriiHandler(CommandServiceCall< - prot::Transaction, prot::ToriiResponse>* call) { + void CommandServiceHandler::ToriiHandler( + CommandServiceCall* call) { auto stat = CommandService::ToriiAsync(call->request(), call->response()); call->sendResponse(stat); diff --git a/irohad/torii/command_service_handler.hpp b/irohad/torii/command_service_handler.hpp index af2514c6e7..b524b8869c 100644 --- a/irohad/torii/command_service_handler.hpp +++ b/irohad/torii/command_service_handler.hpp @@ -18,8 +18,10 @@ limitations under the License. #define TORII_COMMAND_SERVICE_HANDLER_HPP #include +#include #include #include +#include namespace torii { /** @@ -34,13 +36,13 @@ namespace torii { */ CommandServiceHandler(::grpc::ServerBuilder &builder); - ~CommandServiceRpcsHandler() override; + virtual ~CommandServiceHandler() override; template using CommandServiceCall = network::Call< CommandServiceHandler, - prot::CommandService::AsyncService, + iroha::protocol::CommandService::AsyncService, RequestType, ResponseType >; @@ -48,13 +50,13 @@ namespace torii { /** * handles rpcs loop in CommandService. */ - void handleRpcs() override; + virtual void handleRpcs() override; /** * releases the completion queue of CommandService. * @note Call this method after calling server->Shutdown() in ServerRunner */ - void shutdown() override; + virtual void shutdown() override; private: @@ -65,15 +67,15 @@ namespace torii { */ template void enqueueRequest( - network::RequestMethod requester, network::RpcHandler< - CommandServiceHandler, prot::CommandService::AsyncService, + CommandServiceHandler, iroha::protocol::CommandService::AsyncService, RequestType, ResponseType> rpcHandler ) { std::unique_lock lock(mtx_); if (!isShutdown_) { - CommandServiceCall::enqueueRequest( + CommandServiceCall::enqueueRequest( &asyncService_, cq_.get(), requester, rpcHandler ); } diff --git a/test/module/irohad/consensus/CMakeLists.txt b/test/module/irohad/consensus/CMakeLists.txt index 835b37326b..50150bc167 100644 --- a/test/module/irohad/consensus/CMakeLists.txt +++ b/test/module/irohad/consensus/CMakeLists.txt @@ -11,5 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -add_subdirectory(connection) diff --git a/test/module/irohad/torii/CMakeLists.txt b/test/module/irohad/torii/CMakeLists.txt new file mode 100644 index 0000000000..831046af58 --- /dev/null +++ b/test/module/irohad/torii/CMakeLists.txt @@ -0,0 +1,11 @@ +# Torii Test +add_executable(torii_async_test + torii_async_test.cpp + ) +target_link_libraries(torii_async_test + gtest + ) +add_test( + NAME torii_async_test + COMMAND $ +) diff --git a/test/module/irohad/torii/torii_async_test.cpp b/test/module/irohad/torii/torii_async_test.cpp index e69de29bb2..55eb5d79f2 100644 --- a/test/module/irohad/torii/torii_async_test.cpp +++ b/test/module/irohad/torii/torii_async_test.cpp @@ -0,0 +1,22 @@ +/* +Copyright Soramitsu Co., Ltd. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include +#include
+ +TEST(ToriiAsyncTest, GetToriiResponseWhenSendingTx) { + ServerRunner runner("0.0.0.0", 50051); +} From 7cb4b99ee29f424b3113cb96493879d91389f626 Mon Sep 17 00:00:00 2001 From: motxx Date: Thu, 13 Jul 2017 20:04:01 +0900 Subject: [PATCH 09/21] [WIP] todo fix segfault when server shuts down --- irohad/main/CMakeLists.txt | 41 ++++++------ irohad/main/server_runner.cpp | 8 ++- irohad/network/grpc_call.hpp | 63 ++++++++--------- irohad/torii/CMakeLists.txt | 6 +- irohad/torii/command_client.cpp | 67 ++++++++++++------- irohad/torii/command_client.hpp | 29 +++----- irohad/torii/command_service_handler.cpp | 5 +- irohad/torii/command_service_handler.hpp | 3 +- test/module/irohad/CMakeLists.txt | 3 +- test/module/irohad/torii/CMakeLists.txt | 3 + test/module/irohad/torii/torii_async_test.cpp | 26 ++++++- 11 files changed, 152 insertions(+), 102 deletions(-) diff --git a/irohad/main/CMakeLists.txt b/irohad/main/CMakeLists.txt index 89021ebad1..54d00a02cd 100644 --- a/irohad/main/CMakeLists.txt +++ b/irohad/main/CMakeLists.txt @@ -16,29 +16,30 @@ set(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/bin) add_library(server_runner server_runner.cpp) target_link_libraries(server_runner - logger - endpoint - schema - ) + command_service + logger + endpoint + schema +) add_library(irohad application.cpp) target_link_libraries(irohad - server_runner - model - ametsuchi - networking - ordering_service - consensus_service - chain_validator - stateful_validator - hash - stateless_validator - client_processor - torii - crypto - ) + server_runner + model + ametsuchi + networking + ordering_service + consensus_service + chain_validator + stateful_validator + hash + stateless_validator + client_processor + torii + crypto +) add_executable(iroha-main iroha-main.cpp) target_link_libraries(iroha-main - irohad - ) \ No newline at end of file + irohad +) \ No newline at end of file diff --git a/irohad/main/server_runner.cpp b/irohad/main/server_runner.cpp index 74f2427c0a..b499765b37 100644 --- a/irohad/main/server_runner.cpp +++ b/irohad/main/server_runner.cpp @@ -40,10 +40,14 @@ void ServerRunner::run() { console.info("Server listening on {}", serverAddress_); - serverInstance_->Wait(); + // proceed to server's main loop + commandServiceHandler_->handleRpcs(); } -void ServerRunner::shutdown() { serverInstance_->Shutdown(); } +void ServerRunner::shutdown() { + commandServiceHandler_->shutdown(); + serverInstance_->Shutdown(); +} bool ServerRunner::waitForServersReady() { std::unique_lock lock(waitForServer_); diff --git a/irohad/network/grpc_call.hpp b/irohad/network/grpc_call.hpp index cf651c2a10..22f3863d66 100644 --- a/irohad/network/grpc_call.hpp +++ b/irohad/network/grpc_call.hpp @@ -30,50 +30,51 @@ namespace network { template class UntypedCall { public: + virtual ~UntypedCall() {} enum class State { RequestCreated, ResponseSent }; - UntypedCall(State state) - : state_(state) { - } - - virtual ~UntypedCall() {} - /** * invokes when state is RequestReceivedTag. * @param serviceHandler - an instance that has all rpc handlers. e.g. CommandService */ - virtual void requestReceived(ServiceHandler* serviceHandler) { - assert(false && "Concrete class is not allocated."); - } + virtual void requestReceived(ServiceHandler* serviceHandler) = 0; /** * invokes when state is ResponseSentTag. */ - virtual void responseSent() { - assert(false && "Concrete class is not allocated."); - } + virtual void responseSent() = 0; /** - * selects a procedure by state and invokes it by using polymorphism. - * this is called from ServiceHandler::handleRpcs() - * @param serviceHandler - an instance that has all rpc handlers. e.g. CommandService + * owns concrete Call type and able to execute derived functions. */ - void onCompleted(ServiceHandler* serviceHandler) { - switch (state_) { - case State::RequestCreated: { - requestReceived(serviceHandler); - break; - } - case State::ResponseSent: { - responseSent(); - break; + class CallOwner { + public: + CallOwner(UntypedCall* call, UntypedCall::State state) + : call_(call), state_(state) {} + + /** + * selects a procedure by state and invokes it by using polymorphism. + * this is called from ServiceHandler::handleRpcs() + * @param serviceHandler - an instance that has all rpc handlers. e.g. CommandService + */ + void onCompleted(ServiceHandler *serviceHandler) { + switch (state_) { + case UntypedCall::State::RequestCreated: { + call_->requestReceived(serviceHandler); + break; + } + case UntypedCall::State::ResponseSent: { + call_->responseSent(); + break; + } } } - } - private: - const State state_; + private: + UntypedCall* call_; // owns concrete Call type. + const UntypedCall::State state_; + }; }; /** @@ -91,10 +92,10 @@ namespace network { using RequestMethodType = network::RequestMethod; using CallType = Call; using UntypedCallType = UntypedCall; + using CallOwnerType = typename UntypedCallType::CallOwner; Call(RpcHandlerType rpcHandler) - : UntypedCall(UntypedCallType::State::RequestCreated), - rpcHandler_(rpcHandler), responder_(&ctx_) {} + : rpcHandler_(rpcHandler), responder_(&ctx_) {} virtual ~Call() {} @@ -149,8 +150,8 @@ namespace network { auto& response() { return response_; } private: - UntypedCallType RequestReceivedTag { UntypedCallType::State::RequestCreated }; - UntypedCallType ResponseSentTag { UntypedCallType::State::ResponseSent }; + CallOwnerType RequestReceivedTag { this, UntypedCallType::State::RequestCreated }; + CallOwnerType ResponseSentTag { this, UntypedCallType::State::ResponseSent }; private: RpcHandlerType rpcHandler_; diff --git a/irohad/torii/CMakeLists.txt b/irohad/torii/CMakeLists.txt index 59184c2c1a..794df3d263 100644 --- a/irohad/torii/CMakeLists.txt +++ b/irohad/torii/CMakeLists.txt @@ -13,7 +13,9 @@ # limitations under the License. add_subdirectory(processor) -add_library(command_client command_client.cpp) +add_library(command_client + command_client.cpp +) target_link_libraries(command_client endpoint ) @@ -21,7 +23,7 @@ target_link_libraries(command_client add_library(command_service command_service.cpp command_service_handler.cpp - ) +) target_link_libraries(command_service endpoint stateless_validator diff --git a/irohad/torii/command_client.cpp b/irohad/torii/command_client.cpp index c76d473cf6..2bfe399499 100644 --- a/irohad/torii/command_client.cpp +++ b/irohad/torii/command_client.cpp @@ -11,43 +11,64 @@ See the License for the specific language governing permissions and limitations under the License. */ -#include "command_client.hpp" +#include +#include +#include #include #include /* * This client is used by peer service sending tx to change state peers. */ -namespace api { +namespace torii { using iroha::protocol::Transaction; using iroha::protocol::ToriiResponse; - ToriiResponse sendTransaction(const Transaction& tx, - const std::string& targetPeerIp) { - CommandClient client(targetPeerIp, 50051); // TODO: Get port from config - return client.Torii(tx); - } + class CommandClient { + public: + CommandClient(const std::string& ip, int port) + : stub_(iroha::protocol::CommandService::NewStub( + grpc::CreateChannel(ip + ":" + std::to_string(port), grpc::InsecureChannelCredentials()))) + {} - CommandClient::CommandClient(const std::string& ip, int port) { - // TODO(motxx): call validation of ip format and port. - auto channel = grpc::CreateChannel(ip + ":" + std::to_string(port), - grpc::InsecureChannelCredentials()); - stub_ = iroha::protocol::CommandService::NewStub(channel); - } + ToriiResponse Torii(const Transaction& tx) { + ToriiResponse response; - ToriiResponse CommandClient::Torii(const Transaction& tx) { - ToriiResponse response; - auto status = stub_->Torii(&context_, tx, &response); + std::unique_ptr> rpc( + stub_->AsyncTorii(&context_, tx, &cq_) + ); - if (status.ok()) { - return response; - } else { - response.Clear(); - response.set_code(iroha::protocol::FAIL); - response.set_message("connection failed. cannot send transaction."); + using State = network::UntypedCall::State; + + rpc->Finish(&response, &status_, (void *)static_cast(State::ResponseSent)); + + void* got_tag; + bool ok = false; + assert(cq_.Next(&got_tag, &ok)); + assert(got_tag == (void *)static_cast(State::ResponseSent)); + assert(ok); + + if (status_.ok()) { + return response; + } + response.set_code(iroha::protocol::ResponseCode::FAIL); + response.set_message("RPC failed"); return response; } + + private: + grpc::ClientContext context_; + std::unique_ptr stub_; + grpc::CompletionQueue cq_; + grpc::Status status_; + }; + + ToriiResponse sendTransaction(const Transaction& tx, + const std::string& targetPeerIp, + int targetPeerPort) { + CommandClient client(targetPeerIp, targetPeerPort); + return client.Torii(tx); } -} // namespace api +} // namespace torii diff --git a/irohad/torii/command_client.hpp b/irohad/torii/command_client.hpp index ef4cc5259d..5ed9c69ad6 100644 --- a/irohad/torii/command_client.hpp +++ b/irohad/torii/command_client.hpp @@ -14,30 +14,21 @@ See the License for the specific language governing permissions and limitations under the License. */ -#ifndef API_COMMAND_CLIENT_HPP -#define API_COMMAND_CLIENT_HPP +#ifndef TORII_COMMAND_CLIENT_HPP +#define TORII_COMMAND_CLIENT_HPP #include #include +#include +#include -namespace api { +namespace torii { iroha::protocol::ToriiResponse sendTransaction( - const iroha::protocol::Transaction& block, - const std::string& targetPeerIp); + const iroha::protocol::Transaction& tx, + const std::string& targetPeerIp, + int targetPeerPort); - class CommandClient { - public: - CommandClient(const std::string& ip, int port); +} // namespace torii - iroha::protocol::ToriiResponse Torii( - const iroha::protocol::Transaction& request); - - private: - grpc::ClientContext context_; - std::unique_ptr stub_; - }; - -} // namespace api - -#endif // API_COMMAND_CLIENT_HPP \ No newline at end of file +#endif // TORII_COMMAND_CLIENT_HPP diff --git a/irohad/torii/command_service_handler.cpp b/irohad/torii/command_service_handler.cpp index a1e6b35b23..e32fad675b 100644 --- a/irohad/torii/command_service_handler.cpp +++ b/irohad/torii/command_service_handler.cpp @@ -52,6 +52,7 @@ namespace torii { } if (didShutdown) { + // std::cout << "throw alarm\n"; // enqueue a special event that causes the completion queue to be shut down. // tag is nullptr in order to determine no Call instance allocated when static_cast. shutdownAlarm_ = new ::grpc::Alarm(cq_.get(), gpr_now(GPR_CLOCK_MONOTONIC), nullptr); @@ -71,10 +72,12 @@ namespace torii { bool ok; while (cq_->Next(&tag, &ok)) { auto callbackTag = - static_cast*>(tag); + static_cast::CallOwner*>(tag); if (callbackTag) { + // std::cout << "Serve\n"; callbackTag->onCompleted(this); } else { + // std::cout << "Shutdown\n"; // callbackTag is nullptr (a special event that causes shut down cq_) cq_->Shutdown(); } diff --git a/irohad/torii/command_service_handler.hpp b/irohad/torii/command_service_handler.hpp index b524b8869c..f736db9ac6 100644 --- a/irohad/torii/command_service_handler.hpp +++ b/irohad/torii/command_service_handler.hpp @@ -91,9 +91,8 @@ namespace torii { private: iroha::protocol::CommandService::AsyncService asyncService_; - // TODO(motxx): Investigate a required number of completion queues if we use multiple services. std::unique_ptr cq_; - std::mutex mtx_; // TODO(motxx): Write the reason of using mutex for ENQUEUE_REQUEST. + std::mutex mtx_; bool isShutdown_ = false; ::grpc::Alarm* shutdownAlarm_ = nullptr; }; diff --git a/test/module/irohad/CMakeLists.txt b/test/module/irohad/CMakeLists.txt index 8a63d291e5..c7fec40b26 100644 --- a/test/module/irohad/CMakeLists.txt +++ b/test/module/irohad/CMakeLists.txt @@ -22,4 +22,5 @@ add_subdirectory(ametsuchi) add_subdirectory(consensus) add_subdirectory(logger) add_subdirectory(peer_service) -add_subdirectory(validation) \ No newline at end of file +add_subdirectory(validation) +add_subdirectory(torii) diff --git a/test/module/irohad/torii/CMakeLists.txt b/test/module/irohad/torii/CMakeLists.txt index 831046af58..654c5c4b97 100644 --- a/test/module/irohad/torii/CMakeLists.txt +++ b/test/module/irohad/torii/CMakeLists.txt @@ -3,6 +3,9 @@ add_executable(torii_async_test torii_async_test.cpp ) target_link_libraries(torii_async_test + command_service + command_client + server_runner gtest ) add_test( diff --git a/test/module/irohad/torii/torii_async_test.cpp b/test/module/irohad/torii/torii_async_test.cpp index 55eb5d79f2..477e1f8d56 100644 --- a/test/module/irohad/torii/torii_async_test.cpp +++ b/test/module/irohad/torii/torii_async_test.cpp @@ -16,7 +16,31 @@ limitations under the License. #include #include
+#include +#include +#include +#include +#include + +const std::string Ip = "0.0.0.0"; +const int Port = 50051; TEST(ToriiAsyncTest, GetToriiResponseWhenSendingTx) { - ServerRunner runner("0.0.0.0", 50051); + ServerRunner runner(Ip, Port); + std::thread th([&runner]{ + runner.run(); + }); + + runner.waitForServersReady(); + + for (int i = 0; i < 100; i++) { + std::cout << i << std::endl; + auto response = torii::sendTransaction(iroha::protocol::Transaction {}, Ip, Port); + ASSERT_EQ(response.code(), iroha::protocol::ResponseCode::OK); + ASSERT_STREQ(response.message().c_str(), "Torii async response"); + } + + // TODO(motxx): Segmentation fault occurs because an event doesn't executed that causes completion queue to be shut down. + runner.shutdown(); + th.join(); } From 5e57e34c21f567c90511cc3947616c567849ba01 Mon Sep 17 00:00:00 2001 From: motxx Date: Thu, 13 Jul 2017 21:13:49 +0900 Subject: [PATCH 10/21] Minor fix --- irohad/torii/command_client.cpp | 38 +++++++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/irohad/torii/command_client.cpp b/irohad/torii/command_client.cpp index 2bfe399499..638311ff56 100644 --- a/irohad/torii/command_client.cpp +++ b/irohad/torii/command_client.cpp @@ -17,14 +17,14 @@ limitations under the License. #include #include -/* - * This client is used by peer service sending tx to change state peers. - */ namespace torii { using iroha::protocol::Transaction; using iroha::protocol::ToriiResponse; + /** + * CommandClient is used by peer service. + */ class CommandClient { public: CommandClient(const std::string& ip, int port) @@ -32,7 +32,12 @@ namespace torii { grpc::CreateChannel(ip + ":" + std::to_string(port), grpc::InsecureChannelCredentials()))) {} - ToriiResponse Torii(const Transaction& tx) { + /** + * requests tx to a torii server and returns response (blocking, sync) + * @param tx + * @return ToriiResponse + */ + ToriiResponse ToriiBlocking(const Transaction& tx) { ToriiResponse response; std::unique_ptr> rpc( @@ -45,7 +50,11 @@ namespace torii { void* got_tag; bool ok = false; - assert(cq_.Next(&got_tag, &ok)); + + if (!cq_.Next(&got_tag, &ok)) { // CompletionQueue::Next() is blocking. + throw std::runtime_error("CompletionQueue::Next() returns error"); + } + assert(got_tag == (void *)static_cast(State::ResponseSent)); assert(ok); @@ -57,6 +66,13 @@ namespace torii { return response; } + // TODO(motxx): ToriiNonBlocking + void ToriiNonBlocking(const Transaction& tx) { + std::unique_ptr> rpc( + stub_->AsyncTorii(&context_, tx, &cq_) + ); + } + private: grpc::ClientContext context_; std::unique_ptr stub_; @@ -64,11 +80,15 @@ namespace torii { grpc::Status status_; }; - ToriiResponse sendTransaction(const Transaction& tx, - const std::string& targetPeerIp, - int targetPeerPort) { + ToriiResponse sendTransactionBlocking(const Transaction& tx, + const std::string& targetPeerIp, + int targetPeerPort) { CommandClient client(targetPeerIp, targetPeerPort); - return client.Torii(tx); + return client.ToriiBlocking(tx); } + /* + TODO(motxx): sendTransactionNonBlocking() + */ + } // namespace torii From 9238b74586d029007e00076d8671a7c6e8193059 Mon Sep 17 00:00:00 2001 From: motxx Date: Thu, 13 Jul 2017 22:48:39 +0900 Subject: [PATCH 11/21] Add waiting shutting down completion queue --- irohad/main/server_runner.cpp | 2 ++ irohad/torii/command_service_handler.cpp | 10 +++++----- irohad/torii/command_service_handler.hpp | 10 ++++++++-- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/irohad/main/server_runner.cpp b/irohad/main/server_runner.cpp index b499765b37..04403949d8 100644 --- a/irohad/main/server_runner.cpp +++ b/irohad/main/server_runner.cpp @@ -46,6 +46,8 @@ void ServerRunner::run() { void ServerRunner::shutdown() { commandServiceHandler_->shutdown(); + while (!commandServiceHandler_->isShutdownCompletionQueue()) + usleep(1); // wait for shutting down completion queue serverInstance_->Shutdown(); } diff --git a/irohad/torii/command_service_handler.cpp b/irohad/torii/command_service_handler.cpp index e32fad675b..b87881d093 100644 --- a/irohad/torii/command_service_handler.cpp +++ b/irohad/torii/command_service_handler.cpp @@ -19,6 +19,7 @@ limitations under the License. #include #include #include +#include namespace prot = iroha::protocol; @@ -52,7 +53,6 @@ namespace torii { } if (didShutdown) { - // std::cout << "throw alarm\n"; // enqueue a special event that causes the completion queue to be shut down. // tag is nullptr in order to determine no Call instance allocated when static_cast. shutdownAlarm_ = new ::grpc::Alarm(cq_.get(), gpr_now(GPR_CLOCK_MONOTONIC), nullptr); @@ -73,13 +73,13 @@ namespace torii { while (cq_->Next(&tag, &ok)) { auto callbackTag = static_cast::CallOwner*>(tag); - if (callbackTag) { - // std::cout << "Serve\n"; + if (ok && callbackTag) { callbackTag->onCompleted(this); } else { - // std::cout << "Shutdown\n"; - // callbackTag is nullptr (a special event that causes shut down cq_) + // callbackTag is nullptr (and) ok is false + // if the queue is shutting down. cq_->Shutdown(); + isShutdownCompletionQueue_ = true; } } } diff --git a/irohad/torii/command_service_handler.hpp b/irohad/torii/command_service_handler.hpp index f736db9ac6..fe262bdbec 100644 --- a/irohad/torii/command_service_handler.hpp +++ b/irohad/torii/command_service_handler.hpp @@ -58,6 +58,11 @@ namespace torii { */ virtual void shutdown() override; + /** + * @return true if completion queue has been shut down. + */ + bool isShutdownCompletionQueue() const { return isShutdownCompletionQueue_; } + private: /** @@ -93,9 +98,10 @@ namespace torii { iroha::protocol::CommandService::AsyncService asyncService_; std::unique_ptr cq_; std::mutex mtx_; - bool isShutdown_ = false; + bool isShutdown_ = false; // called shutdown() + bool isShutdownCompletionQueue_ = false; // called cq_->Shutdown() ::grpc::Alarm* shutdownAlarm_ = nullptr; }; } // namespace torii -#endif // TORII_COMMAND_SERVICE_HANDLER_HPP \ No newline at end of file +#endif // TORII_COMMAND_SERVICE_HANDLER_HPP From 68a69bfe41792c0d7b6207054e2bdd7973c63c39 Mon Sep 17 00:00:00 2001 From: motxx Date: Thu, 13 Jul 2017 22:49:06 +0900 Subject: [PATCH 12/21] Minor fix --- irohad/network/grpc_call.hpp | 2 +- irohad/torii/command_client.hpp | 2 +- test/module/irohad/torii/torii_async_test.cpp | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/irohad/network/grpc_call.hpp b/irohad/network/grpc_call.hpp index 22f3863d66..6354610a03 100644 --- a/irohad/network/grpc_call.hpp +++ b/irohad/network/grpc_call.hpp @@ -46,7 +46,7 @@ namespace network { virtual void responseSent() = 0; /** - * owns concrete Call type and able to execute derived functions. + * owns concrete Call type and executes derived functions. */ class CallOwner { public: diff --git a/irohad/torii/command_client.hpp b/irohad/torii/command_client.hpp index 5ed9c69ad6..eeccc12b53 100644 --- a/irohad/torii/command_client.hpp +++ b/irohad/torii/command_client.hpp @@ -24,7 +24,7 @@ limitations under the License. namespace torii { - iroha::protocol::ToriiResponse sendTransaction( + iroha::protocol::ToriiResponse sendTransactionBlocking( const iroha::protocol::Transaction& tx, const std::string& targetPeerIp, int targetPeerPort); diff --git a/test/module/irohad/torii/torii_async_test.cpp b/test/module/irohad/torii/torii_async_test.cpp index 477e1f8d56..96d551a38e 100644 --- a/test/module/irohad/torii/torii_async_test.cpp +++ b/test/module/irohad/torii/torii_async_test.cpp @@ -35,9 +35,9 @@ TEST(ToriiAsyncTest, GetToriiResponseWhenSendingTx) { for (int i = 0; i < 100; i++) { std::cout << i << std::endl; - auto response = torii::sendTransaction(iroha::protocol::Transaction {}, Ip, Port); + auto response = torii::sendTransactionBlocking(iroha::protocol::Transaction {}, Ip, Port); ASSERT_EQ(response.code(), iroha::protocol::ResponseCode::OK); - ASSERT_STREQ(response.message().c_str(), "Torii async response"); +// ASSERT_STREQ(response.message().c_str(), "Torii async response"); } // TODO(motxx): Segmentation fault occurs because an event doesn't executed that causes completion queue to be shut down. From 411e1ab173b7e5f1a8682c286129c68a9e284478 Mon Sep 17 00:00:00 2001 From: motxx Date: Fri, 14 Jul 2017 13:37:46 +0900 Subject: [PATCH 13/21] Change comment --- irohad/network/grpc_call.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/irohad/network/grpc_call.hpp b/irohad/network/grpc_call.hpp index 6354610a03..0c9b7520d5 100644 --- a/irohad/network/grpc_call.hpp +++ b/irohad/network/grpc_call.hpp @@ -24,7 +24,7 @@ limitations under the License. namespace network { /** - * to enable various Call instances to process in ServiceHandler::handleRpcs() by polymorphism. + * to use polymorphism in ServiceHandler::handleRpcs() * @tparam ServiceHandler */ template From 2983db2c8db5dd355854a27f2223adef1b63f29f Mon Sep 17 00:00:00 2001 From: motxx Date: Fri, 14 Jul 2017 13:38:43 +0900 Subject: [PATCH 14/21] [WIP] Implement ToriiNonBlocking() --- irohad/torii/command_client.cpp | 149 ++++++++++++++++++++------------ irohad/torii/command_client.hpp | 48 +++++++++- 2 files changed, 137 insertions(+), 60 deletions(-) diff --git a/irohad/torii/command_client.cpp b/irohad/torii/command_client.cpp index 638311ff56..eaef6b30c9 100644 --- a/irohad/torii/command_client.cpp +++ b/irohad/torii/command_client.cpp @@ -16,79 +16,116 @@ limitations under the License. #include #include #include +#include +#include namespace torii { using iroha::protocol::Transaction; using iroha::protocol::ToriiResponse; + /* + * avoids from multiple-definition of ThreadPool + * tp::ThradPool is type alias, not class. So we can't use struct ThreadPool; + * We shouldn't know about ThreadPoolImpl, that is template class. + */ + struct ThreadContainer { + tp::ThreadPool pool; + }; + + struct ToriiAsyncClientCall { + iroha::protocol::ToriiResponse response; + grpc::ClientContext context; + grpc::Status status; + std::unique_ptr> response_reader; + }; + + CommandClient::CommandClient(const std::string& ip, int port) + : stub_(iroha::protocol::CommandService::NewStub( + grpc::CreateChannel(ip + ":" + std::to_string(port), grpc::InsecureChannelCredentials()))), + listenerPool_(new ThreadContainer) + {} + + CommandClient::~CommandClient() { + delete listenerPool_; + } + /** - * CommandClient is used by peer service. + * requests tx to a torii server and returns response (blocking, sync) + * @param tx + * @return ToriiResponse */ - class CommandClient { - public: - CommandClient(const std::string& ip, int port) - : stub_(iroha::protocol::CommandService::NewStub( - grpc::CreateChannel(ip + ":" + std::to_string(port), grpc::InsecureChannelCredentials()))) - {} - - /** - * requests tx to a torii server and returns response (blocking, sync) - * @param tx - * @return ToriiResponse - */ - ToriiResponse ToriiBlocking(const Transaction& tx) { - ToriiResponse response; - - std::unique_ptr> rpc( - stub_->AsyncTorii(&context_, tx, &cq_) - ); - - using State = network::UntypedCall::State; - - rpc->Finish(&response, &status_, (void *)static_cast(State::ResponseSent)); - - void* got_tag; - bool ok = false; - - if (!cq_.Next(&got_tag, &ok)) { // CompletionQueue::Next() is blocking. - throw std::runtime_error("CompletionQueue::Next() returns error"); - } + ToriiResponse CommandClient::ToriiBlocking(const Transaction& tx) { + ToriiResponse response; - assert(got_tag == (void *)static_cast(State::ResponseSent)); - assert(ok); + std::unique_ptr> rpc( + stub_->AsyncTorii(&context_, tx, &cq_) + ); - if (status_.ok()) { - return response; - } - response.set_code(iroha::protocol::ResponseCode::FAIL); - response.set_message("RPC failed"); - return response; - } + using State = network::UntypedCall::State; + + rpc->Finish(&response, &status_, (void *)static_cast(State::ResponseSent)); - // TODO(motxx): ToriiNonBlocking - void ToriiNonBlocking(const Transaction& tx) { - std::unique_ptr> rpc( - stub_->AsyncTorii(&context_, tx, &cq_) - ); + void* got_tag; + bool ok = false; + + if (!cq_.Next(&got_tag, &ok)) { // CompletionQueue::Next() is blocking. + throw std::runtime_error("CompletionQueue::Next() returns error"); } - private: - grpc::ClientContext context_; - std::unique_ptr stub_; - grpc::CompletionQueue cq_; - grpc::Status status_; - }; + assert(got_tag == (void *)static_cast(State::ResponseSent)); + assert(ok); - ToriiResponse sendTransactionBlocking(const Transaction& tx, - const std::string& targetPeerIp, - int targetPeerPort) { - CommandClient client(targetPeerIp, targetPeerPort); - return client.ToriiBlocking(tx); + if (status_.ok()) { + return response; + } + + response.set_code(iroha::protocol::ResponseCode::FAIL); + response.set_message("RPC failed"); + return response; } + /* - TODO(motxx): sendTransactionNonBlocking() + * TODO(motxx): We can't use CommandClient::ToriiNonBlocking() for now. gRPC causes the error + * E0714 04:24:40.045388600 4346 sync_posix.c:60] assertion failed: pthread_mutex_lock(mu) == 0 */ + /* + void CommandClient::ToriiNonBlocking( + const Transaction& tx, + const std::function& callback) + { + ToriiAsyncClientCall* call = new ToriiAsyncClientCall; + call->response_reader = stub_->AsyncTorii(&call->context, tx, &cq_); + call->response_reader->Finish(&call->response, &call->status, (void*)call); + + listenerPool_->pool.post(std::bind(ToriiNonBlockingListener, cq_, callback)); + } + */ + + void CommandClient::ToriiNonBlockingListener( + grpc::CompletionQueue& cq, + const std::function& callback) + { + void* got_tag; + bool ok = false; + + while (cq.Next(&got_tag, &ok)) { + ToriiAsyncClientCall* call = static_cast(got_tag); + assert(ok); // guarantees the request for updates by Finish() + + if (call->status.ok()) { + callback(call->response); + } else { + ToriiResponse responseFailure; + responseFailure.set_code(iroha::protocol::ResponseCode::FAIL); + responseFailure.set_message("RPC failed"); + callback(responseFailure); + } + + delete call; + } + } + } // namespace torii diff --git a/irohad/torii/command_client.hpp b/irohad/torii/command_client.hpp index eeccc12b53..d2b0caf2e4 100644 --- a/irohad/torii/command_client.hpp +++ b/irohad/torii/command_client.hpp @@ -21,13 +21,53 @@ limitations under the License. #include #include #include +#include namespace torii { - iroha::protocol::ToriiResponse sendTransactionBlocking( - const iroha::protocol::Transaction& tx, - const std::string& targetPeerIp, - int targetPeerPort); + /* + * avoids from multiple-definition of ThreadPool + * tp::ThradPool is type alias, not class. So we can't use struct ThreadPool; + * We shouldn't know about ThreadPoolImpl, that is template class. + */ + struct ThreadContainer; + + /** + * CommandClient is used by peer service. + */ + class CommandClient { + public: + CommandClient(const std::string& ip, int port); + ~CommandClient(); + + /** + * requests tx to a torii server and returns response (blocking, sync) + * @param tx + * @return ToriiResponse + */ + iroha::protocol::ToriiResponse ToriiBlocking(const iroha::protocol::Transaction& tx); + + /* + * TODO(motxx): We can't use CommandClient::ToriiNonBlocking() for now. gRPC causes the error + * E0714 04:24:40.045388600 4346 sync_posix.c:60] assertion failed: pthread_mutex_lock(mu) == 0 + */ + /* + void ToriiNonBlocking(const iroha::protocol::Transaction& tx, + const std::function& callback); + */ + + private: + static void ToriiNonBlockingListener( + grpc::CompletionQueue& cq, + const std::function& callback); + + private: + grpc::ClientContext context_; + std::unique_ptr stub_; + grpc::CompletionQueue cq_; + grpc::Status status_; + ThreadContainer* listenerPool_; // cannot use smart pointer because of avoiding redefinition. + }; } // namespace torii From f781c1a95ec96f952743a8bda199edf7b13a49c8 Mon Sep 17 00:00:00 2001 From: motxx Date: Fri, 14 Jul 2017 13:39:11 +0900 Subject: [PATCH 15/21] Update torii_async_test --- test/module/irohad/torii/torii_async_test.cpp | 54 ++++++++++++++----- 1 file changed, 41 insertions(+), 13 deletions(-) diff --git a/test/module/irohad/torii/torii_async_test.cpp b/test/module/irohad/torii/torii_async_test.cpp index 96d551a38e..026eabb6cc 100644 --- a/test/module/irohad/torii/torii_async_test.cpp +++ b/test/module/irohad/torii/torii_async_test.cpp @@ -22,25 +22,53 @@ limitations under the License. #include #include -const std::string Ip = "0.0.0.0"; -const int Port = 50051; +constexpr const char* Ip = "0.0.0.0"; +constexpr int Port = 50051; -TEST(ToriiAsyncTest, GetToriiResponseWhenSendingTx) { - ServerRunner runner(Ip, Port); - std::thread th([&runner]{ - runner.run(); - }); +ServerRunner runner {Ip, Port}; +std::thread th; - runner.waitForServersReady(); +class ToriiAsyncTest : public testing::Test { +public: + virtual void SetUp() { + th = std::thread([]{ + runner.run(); + }); + runner.waitForServersReady(); + } + + virtual void TearDown() { + runner.shutdown(); + th.join(); + } +}; + +TEST_F(ToriiAsyncTest, ToriiBlocking) { + EXPECT_GT(static_cast(iroha::protocol::ResponseCode::OK), 0); // to guarantee ASSERT_EQ works TODO(motxx): More reasonable way. for (int i = 0; i < 100; i++) { std::cout << i << std::endl; - auto response = torii::sendTransactionBlocking(iroha::protocol::Transaction {}, Ip, Port); + auto response = torii::CommandClient(Ip, Port) + .ToriiBlocking(iroha::protocol::Transaction {}); ASSERT_EQ(response.code(), iroha::protocol::ResponseCode::OK); -// ASSERT_STREQ(response.message().c_str(), "Torii async response"); } +} - // TODO(motxx): Segmentation fault occurs because an event doesn't executed that causes completion queue to be shut down. - runner.shutdown(); - th.join(); +/* + * TODO(motxx): We can't use CommandClient::ToriiNonBlocking() for now. gRPC causes the error + * E0714 04:24:40.045388600 4346 sync_posix.c:60] assertion failed: pthread_mutex_lock(mu) == 0 + */ +/* +TEST_F(ToriiAsyncTest, ToriiNonBlocking) { + EXPECT_GT(static_cast(iroha::protocol::ResponseCode::OK), 0); + for (int i = 0; i < 100; i++) { + std::cout << i << std::endl; + torii::CommandClient(Ip, Port) + .ToriiNonBlocking(iroha::protocol::Transaction {}, + [](iroha::protocol::ToriiResponse response){ + ASSERT_EQ(response.code(), iroha::protocol::ResponseCode::OK); + std::cout << "Response validated\n"; // for checking really asynced. + }); + } } +*/ \ No newline at end of file From 48489a36b926d253166991178a09ac5b5b748530 Mon Sep 17 00:00:00 2001 From: motxx Date: Fri, 14 Jul 2017 13:44:46 +0900 Subject: [PATCH 16/21] Add thread_pool link in torii --- irohad/torii/CMakeLists.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/irohad/torii/CMakeLists.txt b/irohad/torii/CMakeLists.txt index 794df3d263..d90f6effa2 100644 --- a/irohad/torii/CMakeLists.txt +++ b/irohad/torii/CMakeLists.txt @@ -17,6 +17,7 @@ add_library(command_client command_client.cpp ) target_link_libraries(command_client + thread_pool endpoint ) From cfd16082e18fcf303baa02c5bb17653953e31663 Mon Sep 17 00:00:00 2001 From: motxx Date: Fri, 14 Jul 2017 13:49:21 +0900 Subject: [PATCH 17/21] Remove unused link --- irohad/torii/CMakeLists.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/irohad/torii/CMakeLists.txt b/irohad/torii/CMakeLists.txt index d90f6effa2..e07cf45db6 100644 --- a/irohad/torii/CMakeLists.txt +++ b/irohad/torii/CMakeLists.txt @@ -22,7 +22,6 @@ target_link_libraries(command_client ) add_library(command_service - command_service.cpp command_service_handler.cpp ) target_link_libraries(command_service From 06669277287ee03f7c38c1964dd8a2d644fba9ad Mon Sep 17 00:00:00 2001 From: motxx Date: Fri, 14 Jul 2017 13:50:01 +0900 Subject: [PATCH 18/21] Rename console -> Log --- irohad/main/server_runner.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/irohad/main/server_runner.cpp b/irohad/main/server_runner.cpp index 04403949d8..f92f71b784 100644 --- a/irohad/main/server_runner.cpp +++ b/irohad/main/server_runner.cpp @@ -21,7 +21,7 @@ limitations under the License. #include
#include -logger::Logger console("ServerRunner"); +logger::Logger Log("ServerRunner"); ServerRunner::ServerRunner(const std::string &ip, int port) : serverAddress_(ip + ":" + std::to_string(port)) {} @@ -38,7 +38,7 @@ void ServerRunner::run() { waitForServer_.unlock(); serverInstanceCV_.notify_one(); - console.info("Server listening on {}", serverAddress_); + Log.info("Server listening on {}", serverAddress_); // proceed to server's main loop commandServiceHandler_->handleRpcs(); From 6cc6d5fc49a152b6887f4f0143be4acbcb29cd49 Mon Sep 17 00:00:00 2001 From: motxx Date: Fri, 14 Jul 2017 13:51:49 +0900 Subject: [PATCH 19/21] Resolve conflict again --- irohad/main/CMakeLists.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/irohad/main/CMakeLists.txt b/irohad/main/CMakeLists.txt index 54d00a02cd..9de26c7dc8 100644 --- a/irohad/main/CMakeLists.txt +++ b/irohad/main/CMakeLists.txt @@ -31,7 +31,6 @@ target_link_libraries(irohad ordering_service consensus_service chain_validator - stateful_validator hash stateless_validator client_processor From c946005da72bcc8ad66c6b1ec0c3702cfd17ccc9 Mon Sep 17 00:00:00 2001 From: motxx Date: Fri, 14 Jul 2017 15:48:05 +0900 Subject: [PATCH 20/21] Add const --- irohad/torii/command_client.cpp | 2 +- irohad/torii/command_client.hpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/irohad/torii/command_client.cpp b/irohad/torii/command_client.cpp index eaef6b30c9..b44235065b 100644 --- a/irohad/torii/command_client.cpp +++ b/irohad/torii/command_client.cpp @@ -40,7 +40,7 @@ namespace torii { std::unique_ptr> response_reader; }; - CommandClient::CommandClient(const std::string& ip, int port) + CommandClient::CommandClient(const std::string& ip, const int port) : stub_(iroha::protocol::CommandService::NewStub( grpc::CreateChannel(ip + ":" + std::to_string(port), grpc::InsecureChannelCredentials()))), listenerPool_(new ThreadContainer) diff --git a/irohad/torii/command_client.hpp b/irohad/torii/command_client.hpp index d2b0caf2e4..71b2ff96b1 100644 --- a/irohad/torii/command_client.hpp +++ b/irohad/torii/command_client.hpp @@ -37,7 +37,7 @@ namespace torii { */ class CommandClient { public: - CommandClient(const std::string& ip, int port); + CommandClient(const std::string& ip, const int port); ~CommandClient(); /** From cc6b72127354ded75e659ecadd7e5ea0de4adbd5 Mon Sep 17 00:00:00 2001 From: motxx Date: Fri, 14 Jul 2017 20:29:01 +0900 Subject: [PATCH 21/21] Remove useless lock/unlock --- irohad/main/server_runner.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/irohad/main/server_runner.cpp b/irohad/main/server_runner.cpp index f92f71b784..f33ebea441 100644 --- a/irohad/main/server_runner.cpp +++ b/irohad/main/server_runner.cpp @@ -33,9 +33,7 @@ void ServerRunner::run() { commandServiceHandler_ = std::make_unique(builder); - waitForServer_.lock(); serverInstance_ = builder.BuildAndStart(); - waitForServer_.unlock(); serverInstanceCV_.notify_one(); Log.info("Server listening on {}", serverAddress_);