Skip to content

Commit

Permalink
Merge pull request hyperledger-iroha#437 from hyperledger/feature/tor…
Browse files Browse the repository at this point in the history
…ii-async-connection

Feature/torii async connection
  • Loading branch information
motxx authored Jul 14, 2017
2 parents 92c24ae + cc6b721 commit 6a1ce0b
Show file tree
Hide file tree
Showing 18 changed files with 759 additions and 261 deletions.
39 changes: 20 additions & 19 deletions irohad/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,29 @@ set(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/bin)

add_library(server_runner server_runner.cpp)
target_link_libraries(server_runner
logger
endpoint
schema
)
command_service
logger
endpoint
schema
)

add_library(irohad application.cpp)
target_link_libraries(irohad
server_runner
model
ametsuchi
networking
ordering_service
consensus_service
chain_validator
hash
stateless_validator
client_processor
torii
crypto
)
server_runner
model
ametsuchi
networking
ordering_service
consensus_service
chain_validator
hash
stateless_validator
client_processor
torii
crypto
)

add_executable(iroha-main iroha-main.cpp)
target_link_libraries(iroha-main
irohad
)
irohad
)
29 changes: 15 additions & 14 deletions irohad/main/server_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,36 @@ limitations under the License.
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <logger/logger.hpp>

#include <main/server_runner.hpp>
#include <torii/command_service_handler.hpp>

logger::Logger console("ServerRunner");
logger::Logger Log("ServerRunner");

ServerRunner::ServerRunner(const std::string &ip, int port,
const std::vector<grpc::Service *> &srvs)
: serverAddress_(ip + ":" + std::to_string(port)), services_(srvs) {}
ServerRunner::ServerRunner(const std::string &ip, int port)
: serverAddress_(ip + ":" + std::to_string(port)) {}

void ServerRunner::run() {
grpc::ServerBuilder builder;

// TODO(motxx): Is it ok to open same port for all services?
builder.AddListeningPort(serverAddress_, grpc::InsecureServerCredentials());
for (auto srv : services_) {
builder.RegisterService(srv);
}

waitForServer_.lock();
commandServiceHandler_ = std::make_unique<torii::CommandServiceHandler>(builder);

serverInstance_ = builder.BuildAndStart();
waitForServer_.unlock();
serverInstanceCV_.notify_one();

console.info("Server listening on {}", serverAddress_);
Log.info("Server listening on {}", serverAddress_);

serverInstance_->Wait();
// proceed to server's main loop
commandServiceHandler_->handleRpcs();
}

void ServerRunner::shutdown() { serverInstance_->Shutdown(); }
void ServerRunner::shutdown() {
commandServiceHandler_->shutdown();
while (!commandServiceHandler_->isShutdownCompletionQueue())
usleep(1); // wait for shutting down completion queue
serverInstance_->Shutdown();
}

bool ServerRunner::waitForServersReady() {
std::unique_lock<std::mutex> lock(waitForServer_);
Expand Down
25 changes: 7 additions & 18 deletions irohad/main/server_runner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,14 @@ limitations under the License.

#include <grpc++/grpc++.h>
#include <grpc++/server_builder.h>
#include <torii/command_service_handler.hpp>

#ifndef CONNECTION_SERVER_RUNNER_HPP
#define CONNECTION_SERVER_RUNNER_HPP
#ifndef MAIN_SERVER_RUNNER_HPP
#define MAIN_SERVER_RUNNER_HPP

/**
* For easy replacing with mock server, we use the interface.
*/
class IServerRunner {
class ServerRunner {
public:
virtual ~IServerRunner() = default;
virtual void run() = 0;
virtual void shutdown() = 0;
virtual bool waitForServersReady() = 0;
};

class ServerRunner final : public IServerRunner {
public:
ServerRunner(const std::string &ip, int port,
const std::vector<grpc::Service *> &services);
ServerRunner(const std::string &ip, int port);
void run();
void shutdown();
bool waitForServersReady();
Expand All @@ -45,7 +34,7 @@ class ServerRunner final : public IServerRunner {
std::condition_variable serverInstanceCV_;

std::string serverAddress_;
std::vector<grpc::Service *> services_;
std::unique_ptr<torii::CommandServiceHandler> commandServiceHandler_;
};

#endif
#endif // MAIN_SERVER_RUNNER_HPP
72 changes: 72 additions & 0 deletions irohad/network/grpc_async_service.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
Copyright 2016 Soramitsu Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#ifndef NETWORK_GRPC_ASYNC_SERVICE_HPP
#define NETWORK_GRPC_ASYNC_SERVICE_HPP

namespace network {

template <class ServiceHandler, class AsyncService, class RequestType, class ResponseType>
class Call;

/**
* interface of handling rpcs in a service.
*/
class GrpcAsyncService {
public:
virtual ~GrpcAsyncService() {}

/**
* handles incoming RPCs.
* 1. Create a Call instance that handles a rpc.
* 2. Execute CompletionQueue::Next() and gets current status with tag.
* 3. Handle a rpc associated with the tag. For polymorphism, cast the tag with UntypedCall.
* 4. Back to 2
*/
virtual void handleRpcs() = 0;

/**
* stops spawning new Call instances and enqueues a special event
* that causes the completion queue to be shut down.
*/
virtual void shutdown() = 0;
};

/**
* to refer a method that requests one rpc.
* e.g. iroha::protocol::AsyncService::RequestTorii
*
* AsyncService - e.g. iroha::protocol::CommandService::AsyncService
* RequestType - e.g. Transaction in rpc Torii (Transaction) returns (ToriiResponse)
* ResponseType - e.g. ToriiResponse in rpc Torii (Transaction) returns (ToriiResponse)
*/
template <typename AsyncService, typename RequestType, typename ResponseType>
using RequestMethod = void (AsyncService::*)(
::grpc::ServerContext*, RequestType*,
::grpc::ServerAsyncResponseWriter<ResponseType>*,
::grpc::CompletionQueue*, ::grpc::ServerCompletionQueue*, void*);

/**
* to refer a method that extracts request and response from Call instance
* and creates a new Call instance to serve new clients.
*/
template <typename ServiceHandler, typename AsyncService, typename RequestType, typename ResponseType>
using RpcHandler = void (ServiceHandler::*)(
Call<ServiceHandler, AsyncService, RequestType, ResponseType>*);

} // namespace network

#endif // NETWORK_GRPC_ASYNC_SERVICE_HPP
166 changes: 166 additions & 0 deletions irohad/network/grpc_call.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
Copyright 2016 Soramitsu Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#ifndef NETWORK_GRPC_CALL_HPP
#define NETWORK_GRPC_CALL_HPP

#include <grpc++/grpc++.h>
#include <assert.h>
#include <network/grpc_async_service.hpp>

namespace network {

/**
* to use polymorphism in ServiceHandler::handleRpcs()
* @tparam ServiceHandler
*/
template <typename ServiceHandler>
class UntypedCall {
public:
virtual ~UntypedCall() {}

enum class State { RequestCreated, ResponseSent };

/**
* invokes when state is RequestReceivedTag.
* @param serviceHandler - an instance that has all rpc handlers. e.g. CommandService
*/
virtual void requestReceived(ServiceHandler* serviceHandler) = 0;

/**
* invokes when state is ResponseSentTag.
*/
virtual void responseSent() = 0;

/**
* owns concrete Call type and executes derived functions.
*/
class CallOwner {
public:
CallOwner(UntypedCall* call, UntypedCall::State state)
: call_(call), state_(state) {}

/**
* selects a procedure by state and invokes it by using polymorphism.
* this is called from ServiceHandler::handleRpcs()
* @param serviceHandler - an instance that has all rpc handlers. e.g. CommandService
*/
void onCompleted(ServiceHandler *serviceHandler) {
switch (state_) {
case UntypedCall::State::RequestCreated: {
call_->requestReceived(serviceHandler);
break;
}
case UntypedCall::State::ResponseSent: {
call_->responseSent();
break;
}
}
}

private:
UntypedCall* call_; // owns concrete Call type.
const UntypedCall::State state_;
};
};

/**
* to manage the state of one rpc.
* @tparam ServiceHandler
* @tparam AsyncService
* @tparam RequestType
* @tparam ResponseType
*/
template <typename ServiceHandler, typename AsyncService, typename RequestType, typename ResponseType>
class Call : public UntypedCall<ServiceHandler> {
public:

using RpcHandlerType = network::RpcHandler<ServiceHandler, AsyncService, RequestType, ResponseType>;
using RequestMethodType = network::RequestMethod<AsyncService, RequestType, ResponseType>;
using CallType = Call<ServiceHandler, AsyncService, RequestType, ResponseType>;
using UntypedCallType = UntypedCall<ServiceHandler>;
using CallOwnerType = typename UntypedCallType::CallOwner;

Call(RpcHandlerType rpcHandler)
: rpcHandler_(rpcHandler), responder_(&ctx_) {}

virtual ~Call() {}

/**
* invokes when state is RequestReceivedTag.
* this method is called by onCompleted() in super class (UntypedCall).
* @param serviceHandler - an instance that has all rpc handlers. e.g. CommandService
*/
void requestReceived(ServiceHandler* serviceHandler) override {
(serviceHandler->*rpcHandler_)(this);
}

/**
* invokes when state is ResponseSentTag.
* this method is called by onCompleted() in super class (UntypedCall).
* @param serviceHandler - an instance that has all rpc handlers. e.g. CommandService
*/
void responseSent() override {
// response has been sent and delete the Call instance.
delete this;
}

/**
* notifies response and grpc::Status when finishing handling rpc.
* @param status
*/
void sendResponse(::grpc::Status status) {
responder_.Finish(response_, status, &ResponseSentTag);
}

/**
* creates a Call instance for one rpc and enqueues it
* to the completion queue.
* @param serviceHandler
* @param cq
* @param requestMethod
* @param rpcHandler
*/
static void enqueueRequest(AsyncService* asyncService,
::grpc::ServerCompletionQueue* cq,
RequestMethodType requestMethod,
RpcHandlerType rpcHandler) {
auto call = new CallType(rpcHandler);

(asyncService->*requestMethod)(&call->ctx_, &call->request(),
&call->responder_, cq, cq,
&call->RequestReceivedTag);
}

public:
auto& request() { return request_; }
auto& response() { return response_; }

private:
CallOwnerType RequestReceivedTag { this, UntypedCallType::State::RequestCreated };
CallOwnerType ResponseSentTag { this, UntypedCallType::State::ResponseSent };

private:
RpcHandlerType rpcHandler_;
RequestType request_;
ResponseType response_;
::grpc::ServerContext ctx_;
::grpc::ServerAsyncResponseWriter<ResponseType> responder_;
};

} // namespace network

#endif // NETWORK_GRPC_CALL_HPP
Loading

0 comments on commit 6a1ce0b

Please sign in to comment.