Skip to content

Commit

Permalink
Merge pull request hyperledger-iroha#931 from hyperledger/feature/rem…
Browse files Browse the repository at this point in the history
…ove-async-clients

Signed-off-by: dumitru <[email protected]>
  • Loading branch information
x3medima17 committed Feb 5, 2018
2 parents 8fba7e3 + 8091d4f commit fa82a07
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 413 deletions.
2 changes: 1 addition & 1 deletion iroha-cli/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include <string>

#include "torii/command_client.hpp"
#include "torii_utils/query_client.hpp"
#include "torii/query_client.hpp"

namespace iroha {
namespace model {
Expand Down
7 changes: 7 additions & 0 deletions irohad/torii/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,20 @@

add_subdirectory(processor)

add_library(query_client STATIC query_client.cpp)

add_library(command_client command_client.cpp)
target_link_libraries(command_client
torii_service
endpoint
model
)

target_link_libraries(query_client
torii_service
endpoint
)

add_library(torii_service
torii_service_handler.cpp
impl/query_service.cpp
Expand Down
193 changes: 28 additions & 165 deletions irohad/torii/command_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ limitations under the License.
#include <thread>

#include <grpc++/grpc++.h>
#include <network/grpc_call.hpp>

#include "block.pb.h"
#include "torii/command_client.hpp"
Expand All @@ -25,186 +24,50 @@ namespace torii {
using iroha::protocol::ToriiResponse;
using iroha::protocol::Transaction;

CommandSyncClient::CommandSyncClient(std::string ip, int port)
: stub_(iroha::protocol::CommandService::NewStub(
CommandSyncClient::CommandSyncClient(const std::string &ip, size_t port)
: ip_(ip),
port_(port),
stub_(iroha::protocol::CommandService::NewStub(
grpc::CreateChannel(ip + ":" + std::to_string(port),
grpc::InsecureChannelCredentials()))) {}

CommandSyncClient::~CommandSyncClient() {
completionQueue_.Shutdown();
}

const char *kQueueNextError = "CompletionQueue::Next() returns error";

/**
* requests tx to a torii server and returns response (blocking, sync)
* @param tx
* @param response - returns ToriiResponse if succeeded
* @return grpc::Status - returns connection is success or not.
*/
grpc::Status CommandSyncClient::Torii(const Transaction &tx) {
auto rpc = stub_->AsyncTorii(&context_, tx, &completionQueue_);

using State = network::UntypedCall<torii::ToriiServiceHandler>::State;

google::protobuf::Empty empty;
rpc->Finish(
&empty, &status_, (void *)static_cast<int>(State::ResponseSent));

void *got_tag;
bool ok = false;
CommandSyncClient::CommandSyncClient(const CommandSyncClient &rhs)
: CommandSyncClient(rhs.ip_, rhs.port_) {}

/**
* pulls a new rpc response. If no response, blocks this thread.
*/
if (!completionQueue_.Next(&got_tag, &ok)) {
throw std::runtime_error(kQueueNextError);
}

assert(got_tag == (void *)static_cast<int>(State::ResponseSent));
assert(ok);

return status_;
CommandSyncClient &CommandSyncClient::operator=(CommandSyncClient rhs) {
swap(*this, rhs);
return *this;
}

grpc::Status CommandSyncClient::Status(
const iroha::protocol::TxStatusRequest &request,
iroha::protocol::ToriiResponse &response) {
auto rpc = stub_->AsyncStatus(&context_, request, &completionQueue_);

using State = network::UntypedCall<torii::ToriiServiceHandler>::State;

rpc->Finish(
&response, &status_, (void *)static_cast<int>(State::ResponseSent));

void *got_tag;
bool ok = false;

/**
* pulls a new rpc response. If no response, blocks this thread.
*/
if (!completionQueue_.Next(&got_tag, &ok)) {
throw std::runtime_error(kQueueNextError);
}

assert(got_tag == (void *)static_cast<int>(State::ResponseSent));
assert(ok);
CommandSyncClient::CommandSyncClient(CommandSyncClient &&rhs) noexcept {
swap(*this, rhs);
}

return status_;
CommandSyncClient& CommandSyncClient::operator=(CommandSyncClient &&rhs) noexcept {
swap(*this, rhs);
return *this;
}

/**
* manages ClientContext and Status
*/
struct AsyncClientCall {
grpc::Status CommandSyncClient::Torii(const Transaction &tx) const {
google::protobuf::Empty a;
grpc::ClientContext context;
grpc::Status status;

virtual ~AsyncClientCall() {}
};
/**
* manages state of a Torii async client call.
*/
struct ToriiAsyncClientCall : public AsyncClientCall {
google::protobuf::Empty response;
std::unique_ptr<grpc::ClientAsyncResponseReader<google::protobuf::Empty>>
responseReader;
CommandAsyncClient::ToriiCallback callback;
};

/**
* manages state of a Status async client call.
*/
struct StatusAsyncClientCall : public AsyncClientCall {
iroha::protocol::ToriiResponse response;
std::unique_ptr<
grpc::ClientAsyncResponseReader<iroha::protocol::ToriiResponse>>
responseReader;
CommandAsyncClient::StatusCallback callback;
};

/**
* requests tx to a torii server and returns response (non-blocking)
* @param tx
* @param callback
* @return grpc::Status
*/
grpc::Status CommandAsyncClient::Torii(
const Transaction &tx,
const std::function<void(google::protobuf::Empty &response)> &callback) {
auto call = new ToriiAsyncClientCall;
call->callback = callback;
call->responseReader =
stub_->AsyncTorii(&call->context, tx, &completionQueue_);
call->responseReader->Finish(&call->response, &call->status, (void *)call);
return call->status;
return stub_->Torii(&context, tx, &a);
}

/**
* @param tx_request contains hash of requested tx
* @param callback callback to process obtained status
* @return grpc::Status determining if connection was successful
*/
grpc::Status CommandAsyncClient::Status(
const iroha::protocol::TxStatusRequest &tx_request,
const StatusCallback &callback) {
auto call = new StatusAsyncClientCall;
call->callback = callback;
call->responseReader =
stub_->AsyncStatus(&call->context, tx_request, &completionQueue_);
call->responseReader->Finish(&call->response, &call->status, (void *)call);
return call->status;
grpc::Status CommandSyncClient::Status(
const iroha::protocol::TxStatusRequest &request,
iroha::protocol::ToriiResponse &response) const {
grpc::ClientContext context;
return stub_->Status(&context, request, &response);
}

/**
* sets ip and port and calls listenToriiNonBlocking() in a new thread.
* @param ip
* @param port
*/
CommandAsyncClient::CommandAsyncClient(const std::string &ip, const int port)
: stub_(iroha::protocol::CommandService::NewStub(
grpc::CreateChannel(ip + ":" + std::to_string(port),
grpc::InsecureChannelCredentials()))) {
listener_ = std::thread(&CommandAsyncClient::listen, this);
}

CommandAsyncClient::~CommandAsyncClient() {
completionQueue_.Shutdown();
listener_.join();
}

/**
* starts response listener of a non-blocking torii client.
*/
void CommandAsyncClient::listen() {
/**
* got_tag - a state (ToriiAsyncClientCall) that is sent by a command
* server.
* ok - true if a regular event, otherwise false (e.g. grpc::Alarm)
*/
void *got_tag;
bool ok = false;

/**
* pulls a new client's response. If no response, blocks this thread.
* CompletionQueue::Next() returns false if cq_.Shutdown() is executed.
*/
while (completionQueue_.Next(&got_tag, &ok)) {
if (!got_tag || !ok) {
break;
}
auto call = static_cast<AsyncClientCall *>(got_tag);

if (ToriiAsyncClientCall *torii_call =
dynamic_cast<ToriiAsyncClientCall *>(call)) {
torii_call->callback(torii_call->response);
}
if (StatusAsyncClientCall *status_call =
dynamic_cast<StatusAsyncClientCall *>(call)) {
status_call->callback(status_call->response);
}
delete call;
}
void CommandSyncClient::swap(CommandSyncClient& lhs, CommandSyncClient& rhs) {
using std::swap;
swap(lhs.ip_, rhs.ip_);
swap(lhs.port_, rhs.port_);
swap(lhs.stub_, rhs.stub_);
}

} // namespace torii
68 changes: 12 additions & 56 deletions irohad/torii/command_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,80 +29,36 @@ namespace torii {
*/
class CommandSyncClient {
public:
CommandSyncClient(std::string ip, int port);
~CommandSyncClient();
CommandSyncClient(const std::string& ip, size_t port);

CommandSyncClient(const CommandSyncClient&);
CommandSyncClient& operator=(CommandSyncClient);

CommandSyncClient(CommandSyncClient&&) noexcept ;
CommandSyncClient&operator=(CommandSyncClient&&) noexcept ;

/**
* requests tx to a torii server and returns response (blocking, sync)
* @param tx
* @return grpc::Status - returns connection is success or not.
*/
grpc::Status Torii(const iroha::protocol::Transaction &tx);
grpc::Status Torii(const iroha::protocol::Transaction &tx) const;

/**
* @param tx
* @param response returns ToriiResponse if succeeded
* @return grpc::Status - returns connection is success or not.
*/
grpc::Status Status(const iroha::protocol::TxStatusRequest &tx,
iroha::protocol::ToriiResponse &response);
iroha::protocol::ToriiResponse &response) const;

private:
grpc::ClientContext context_;
void swap(CommandSyncClient& lhs, CommandSyncClient& rhs);
std::string ip_;
size_t port_;
std::unique_ptr<iroha::protocol::CommandService::Stub> stub_;
grpc::CompletionQueue completionQueue_;
grpc::Status status_;
};

/**
* CommandAsyncClient is used by peer service.
*/
class CommandAsyncClient {
public:
/**
* sets ip and port and calls listenToriiNonBlocking() in a new thread.
* @param ip
* @param port
*/
CommandAsyncClient(const std::string &ip, const int port);

~CommandAsyncClient();

using ToriiCallback =
std::function<void(google::protobuf::Empty &response)>;
using StatusCallback = std::function<void(iroha::protocol::ToriiResponse)>;

/**
* Async Torii rpc
* @param tx
* @param callback
* @return grpc::Status
*/
grpc::Status Torii(const iroha::protocol::Transaction &tx,
const ToriiCallback &callback);

/**
* @param tx_request contains hash of requested tx
* @param callback processes obtained response
* @return grpc::Status
*/
grpc::Status Status(const iroha::protocol::TxStatusRequest &tx_request,
const StatusCallback &callback);

private:
/**
* starts response listener of non-blocking rpcs.
*/
void listen();

private:
grpc::ClientContext context_;
std::unique_ptr<iroha::protocol::CommandService::Stub> stub_;
grpc::CompletionQueue completionQueue_;
grpc::Status status_;
std::thread listener_; // listens rpcs' responses and executes callbacks.
std::thread status_listener_;
};

} // namespace torii

Expand Down
Loading

0 comments on commit fa82a07

Please sign in to comment.