Skip to content

Commit

Permalink
Update torii async services
Browse files Browse the repository at this point in the history
  • Loading branch information
motxx committed Jul 13, 2017
1 parent 0741f37 commit d5b03bc
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 56 deletions.
3 changes: 2 additions & 1 deletion irohad/network/grpc_async_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ namespace network {
* and creates a new Call instance to serve new clients.
*/
template <typename ServiceHandler, typename AsyncService, typename RequestType, typename ResponseType>
using RpcHandler = std::function<void(Call<ServiceHandler, AsyncService, RequestType, ResponseType>&)>;
using RpcHandler = void (ServiceHandler::*)(
Call<ServiceHandler, AsyncService, RequestType, ResponseType>* call);

} // namespace network
28 changes: 5 additions & 23 deletions irohad/network/grpc_call.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,6 @@ limitations under the License.

namespace network {

/*
* TODO(motxx): Use this and clarify ownership.
class ReferenceCountable {
public:
void ref() {
count_++;
}
void unref() {
assert(count > 0);
count_--;
if (!count_) {
delete this;
}
}
private:
size_t count_ = 0;
};
*/

/**
* to enable various Call instances to process in ServiceHandler::handleRpcs() by polymorphism.
* @tparam ServiceHandler
Expand Down Expand Up @@ -72,7 +51,7 @@ namespace network {
* @param serviceHandler - an instance that has all rpc handlers. e.g. CommandService
*/
void onCompleted(ServiceHandler& serviceHandler) {
switch (callback_) {
switch (state_) {
case State::RequestCreated: {
call_->requestReceived(serviceHandler);
break;
Expand Down Expand Up @@ -120,6 +99,8 @@ namespace network {
* @param serviceHandler - an instance that has all rpc handlers. e.g. CommandService
*/
void responseSent() override {
// response has been sent and delete the Call instance.
delete this;
}

/**
Expand All @@ -131,7 +112,8 @@ namespace network {
}

/**
*
* creates a Call instance for one rpc and enqueues it
* to the completion queue.
* @param serviceHandler
* @param cq
* @param requestMethod
Expand Down
12 changes: 5 additions & 7 deletions irohad/torii/command_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,17 @@ limitations under the License.

namespace torii {

CommandService::CommandService() {

}

/**
* actual implementation of async Torii in CommandService
* @param request - Transaction
* @param response - ToriiResponse
* @return grpc::Status - Status::OK if succeeded (TODO(motxx): support Status::CANCELLED)
* @return grpc::Status - Status::OK if succeeded. TODO(motxx): grpc::CANCELLED is not supported.
*/
grpc::Status CommandService::ToriiAsync(
static grpc::Status CommandService::ToriiAsync(
iroha::protocol::Transaction const& request, iroha::protocol::ToriiResponse& response) {

response.set_code(iroha::protocol::ResponseCode::OK);
response.set_message("Torii async response");
return grpc::Status::OK;
}

} // namespace torii
6 changes: 2 additions & 4 deletions irohad/torii/command_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,13 @@ namespace torii {
*/
class CommandService {
public:
CommandService();

/**
* actual implementation of async Torii in CommandService
* @param request - Transaction
* @param response - ToriiResponse
* @return grpc::Status - Status::OK if succeeded (TODO(motxx): support Status::CANCELLED)
* @return grpc::Status - Status::OK if succeeded. TODO(motxx): grpc::CANCELLED is not supported.
*/
grpc::Status ToriiAsync(iroha::protocol::Transaction const& request, iroha::protocol::ToriiResponse& response);
static grpc::Status ToriiAsync(iroha::protocol::Transaction const& request, iroha::protocol::ToriiResponse& response);
};

} // namespace torii
Expand Down
51 changes: 35 additions & 16 deletions irohad/torii/command_service_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@ limitations under the License.
#include <network/grpc_async_service.hpp>
#include <network/grpc_call.hpp>
#include <torii/command_service_handler.hpp>
#include <torii/command_service.hpp>

namespace prot = iroha::protocol;

namespace torii {

/**
* to handle async rpcs of CommandService.
*/
/**
* requires builder to use same server.
* @param builder
Expand All @@ -36,22 +34,38 @@ namespace torii {
}

~CommandServiceHandler::CommandServiceRpcsHandler() override {
delete shutdownAlarm_;
}

/**
* shuts down service handler.
* specifically, enqueues a special event that causes the completion queue to be shut down.
*/
void CommandServiceHandler::shutdown() override {
bool didShutdown = false;
{ std::unique_lock }
{
std::unique_lock<std::mutex> lock(mtx_);
if (!isShutdown_) {
isShutdown_ = true;
didShutdown = true;
}
}

if (didShutdown) {
// Alarm
// enqueue a special event that causes the completion queue to be shut down.
// tag is nullptr in order to determine no Call instance allocated when static_cast.
shutdownAlarm_ = new ::grpc::Alarm(cq_.get(), gpr_now(GPR_CLOCK_MONOTONIC), nullptr);
}
}

/**
* handles all rpc in CommandService.
* We use
* handles rpcs loop in CommandService.
*/
void CommandServiceHandler::handleRpcs() override {
enqueueRequest<prot::Transaction, prot::ToriiResponse>(
&prot::CommandService::AsyncService::RequestTorii,
&CommandServiceHandler::ToriiHandler);
&CommandServiceHandler::ToriiHandler
);

void* tag;
bool ok;
Expand All @@ -61,22 +75,27 @@ namespace torii {
if (callbackTag) {
callbackTag->onCompleted(*this);
} else {
// callbackTag is nullptr (a special event that causes shut down cq_)
cq_->Shutdown();
}
}
}

/**
* releases the completion queue of CommandService.
* @note Call this method after calling server->Shutdown() in ServerRunner
*/
void CommandServiceHandler::shutdown() override { cq_->Shutdown(); }

/**
* extracts request and response from Call instance
* and calls an actual CommandService::AsyncTorii() implementation.
* then, creates a new Call instance to serve an another client.
* then, spawns a new Call instance to serve an another client.
*/
void CommandServiceHandler::ToriiHandler() {}
void CommandServiceHandler::ToriiHandler(CommandServiceCall<
prot::Transaction, prot::ToriiResponse>* call) {
auto stat = CommandService::ToriiAsync(call->request(), call->response());
call->sendResponse(stat);

// Spawn a new Call instance to serve an another client.
enqueueRequest<prot::Transaction, prot::ToriiResponse>(
&prot::CommandService::AsyncService::RequestTorii,
&CommandServiceHandler::ToriiHandler
);
}

} // namespace torii
16 changes: 11 additions & 5 deletions irohad/torii/command_service_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@ limitations under the License.
#ifndef TORII_COMMAND_SERVICE_HANDLER_HPP
#define TORII_COMMAND_SERVICE_HANDLER_HPP

#include <network/grpc_async_service.hpp>
#include <endpoint.grpc.pb.h>
#include <endpoint.pb.h>

namespace torii {
/**
* to handle rpcs loop of CommandService.
*/
class CommandServiceHandler : public network::GrpcAsyncService {
public:

Expand All @@ -42,8 +46,7 @@ namespace torii {
>;

/**
* handles all rpc in CommandService.
* We use
* handles rpcs loop in CommandService.
*/
void handleRpcs() override;

Expand All @@ -56,8 +59,9 @@ namespace torii {
private:

/**
*
* @param requester
* helper to call Call::enqueueRequest()
* @param requester - pointer to request method. e.g. &CommandService::AsyncService::RequestTorii
* @param rpcHandler - handler of rpc in ServiceHandler.
*/
template <typename RequestType, typename ResponseType>
void enqueueRequest(
Expand All @@ -80,14 +84,16 @@ namespace torii {
* and calls an actual CommandService::AsyncTorii() implementation.
* then, creates a new Call instance to serve an another client.
*/
void ToriiHandler();
void ToriiHandler(CommandServiceCall<
iroha::protocol::Transaction, iroha::protocol::ToriiResponse>*);

private:
iroha::protocol::CommandService::AsyncService asyncService_;
// TODO(motxx): Investigate a required number of completion queues if we use multiple services.
std::unique_ptr<grpc::ServerCompletionQueue> cq_;
std::mutex mtx_; // TODO(motxx): Write the reason of using mutex for ENQUEUE_REQUEST.
bool isShutdown_ = false;
::grpc::Alarm* shutdownAlarm_ = nullptr;
};
} // namespace torii

Expand Down

0 comments on commit d5b03bc

Please sign in to comment.