Skip to content

Commit

Permalink
Rename server handler class. Add torii service test
Browse files Browse the repository at this point in the history
  • Loading branch information
motxx committed Jul 20, 2017
1 parent a704a09 commit 8cffdf2
Show file tree
Hide file tree
Showing 19 changed files with 282 additions and 187 deletions.
3 changes: 1 addition & 2 deletions irohad/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ set(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/bin)

add_library(server_runner server_runner.cpp)
target_link_libraries(server_runner
command_service
query_service
torii_service
logger
endpoint
schema
Expand Down
15 changes: 7 additions & 8 deletions irohad/main/server_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@ limitations under the License.
#include <grpc++/server_context.h>
#include <logger/logger.hpp>
#include <main/server_runner.hpp>
#include <torii/command_service_handler.hpp>
#include <torii/query_service.hpp>
#include <torii/torii_service_handler.hpp>

logger::Logger Log("ServerRunner");

ServerRunner::ServerRunner(const std::string &ip, int port)
: serverAddress_(ip + ":" + std::to_string(port)) {}

ServerRunner::~ServerRunner() {
commandServiceHandler_->shutdown();
toriiServiceHandler_->shutdown();
}

void ServerRunner::run() {
Expand All @@ -37,27 +36,27 @@ void ServerRunner::run() {
builder.AddListeningPort(serverAddress_, grpc::InsecureServerCredentials());

// Register services.
commandServiceHandler_ = std::make_unique<torii::CommandServiceHandler>(builder);
toriiServiceHandler_ = std::make_unique<torii::ToriiServiceHandler>(builder);

serverInstance_ = builder.BuildAndStart();
serverInstanceCV_.notify_one();

Log.info("Server listening on {}", serverAddress_);

// proceed to server's main loop
commandServiceHandler_->handleRpcs();
toriiServiceHandler_->handleRpcs();
}

void ServerRunner::shutdown() {
serverInstance_->Shutdown();

while (!commandServiceHandler_->isShutdownCompletionQueue()) {
while (!toriiServiceHandler_->isShutdownCompletionQueue()) {
usleep(1); // wait for shutting down completion queue
}
commandServiceHandler_->shutdown();
toriiServiceHandler_->shutdown();
}

bool ServerRunner::waitForServersReady() {
void ServerRunner::waitForServersReady() {
std::unique_lock<std::mutex> lock(waitForServer_);
while (!serverInstance_) serverInstanceCV_.wait(lock);
}
8 changes: 4 additions & 4 deletions irohad/main/server_runner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,27 @@ limitations under the License.

#include <grpc++/grpc++.h>
#include <grpc++/server_builder.h>
#include <torii/command_service_handler.hpp>
#include <torii/query_service.hpp>

#ifndef MAIN_SERVER_RUNNER_HPP
#define MAIN_SERVER_RUNNER_HPP

namespace torii { class ToriiServiceHandler; }

class ServerRunner {
public:
ServerRunner(const std::string &ip, int port);
~ServerRunner();
void run();
void shutdown();
bool waitForServersReady();
void waitForServersReady();

private:
std::unique_ptr<grpc::Server> serverInstance_;
std::mutex waitForServer_;
std::condition_variable serverInstanceCV_;

std::string serverAddress_;
std::unique_ptr<torii::CommandServiceHandler> commandServiceHandler_;
std::unique_ptr<torii::ToriiServiceHandler> toriiServiceHandler_;
};

#endif // MAIN_SERVER_RUNNER_HPP
16 changes: 4 additions & 12 deletions irohad/torii/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,18 @@
# limitations under the License.
add_subdirectory(processor)

add_library(command_client
command_client.cpp
)
add_library(command_client command_client.cpp)
target_link_libraries(command_client
torii_service
endpoint
)

add_library(command_service
command_service_handler.cpp
)
target_link_libraries(command_service
add_library(torii_service torii_service_handler.cpp)
target_link_libraries(torii_service
endpoint
stateless_validator
)

add_library(query_service query_service.cpp)
target_link_libraries(query_service
endpoint
)

add_library(torii impl/torii_stub.cpp)
target_link_libraries(torii
model
Expand Down
4 changes: 2 additions & 2 deletions irohad/torii/command_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ limitations under the License.
*/

#include <torii/command_client.hpp>
#include <torii/command_service_handler.hpp>
#include <torii/torii_service_handler.hpp>
#include <network/grpc_call.hpp>
#include <block.pb.h>
#include <grpc++/grpc++.h>
Expand Down Expand Up @@ -44,7 +44,7 @@ namespace torii {
stub_->AsyncTorii(&context_, tx, &completionQueue_)
);

using State = network::UntypedCall<torii::CommandServiceHandler>::State;
using State = network::UntypedCall<torii::ToriiServiceHandler>::State;

rpc->Finish(&response, &status_, (void *)static_cast<int>(State::ResponseSent));

Expand Down
3 changes: 1 addition & 2 deletions irohad/torii/command_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@ limitations under the License.

#include <endpoint.grpc.pb.h>
#include <endpoint.pb.h>
#include <torii/command_service_handler.hpp>

namespace torii {

/**
* Actual implementation of async CommandService.
* CommandServiceHandler::(SomeMethod)Handler calls a corresponding method in this class.
* ToriiServiceHandler::(SomeMethod)Handler calls a corresponding method in this class.
*/
class CommandService {
public:
Expand Down
27 changes: 0 additions & 27 deletions irohad/torii/query_service.cpp

This file was deleted.

27 changes: 20 additions & 7 deletions irohad/torii/query_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,34 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

#ifndef CONNECTION_TORII_QUERY_SERVICE_HPP
#define CONNECTION_TORII_QUERY_SERVICE_HPP
#ifndef TORII_QUERY_SERVICE_HPP
#define TORII_QUERY_SERVICE_HPP

#include <endpoint.grpc.pb.h>
#include <endpoint.pb.h>

namespace torii {

class QueryService final : public iroha::protocol::QueryService::Service {
/**
* Actual implementation of async QueryService.
* ToriiServiceHandler::(SomeMethod)Handler calls a corresponding method in this class.
*/
class QueryService {
public:
virtual ::grpc::Status Find(::grpc::ServerContext* context,
const ::iroha::protocol::Query* request,
::iroha::protocol::QueryResponse* response) override;
/**
* actual implementation of async Find in QueryService
* @param request - Query
* @param response - QueryResponse
* @return grpc::Status - Status::OK if succeeded. TODO(motxx): grpc::CANCELLED is not supported.
*/
static grpc::Status FindAsync(
iroha::protocol::Query const& request, iroha::protocol::QueryResponse& response) {
response.set_code(iroha::protocol::ResponseCode::OK);
response.set_message("Find async response");
return grpc::Status::OK;
}
};

} // namespace torii

#endif
#endif // TORII_QUERY_SERVICE_HPP
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ limitations under the License.
#include <endpoint.grpc.pb.h>
#include <network/grpc_async_service.hpp>
#include <network/grpc_call.hpp>
#include <torii/command_service_handler.hpp>
#include <torii/torii_service_handler.hpp>
#include <torii/command_service.hpp>
#include <torii/query_service.hpp>
#include <unistd.h>
#include <grpc/support/time.h>

Expand All @@ -30,36 +31,36 @@ namespace torii {
* registers async command service
* @param builder
*/
CommandServiceHandler::CommandServiceHandler(::grpc::ServerBuilder& builder) {
ToriiServiceHandler::ToriiServiceHandler(::grpc::ServerBuilder& builder) {
builder.RegisterService(&commandAsyncService_);
builder.RegisterService(&queryAsyncService_);
completionQueue_ = builder.AddCompletionQueue();
}

CommandServiceHandler::~CommandServiceHandler() {}
ToriiServiceHandler::~ToriiServiceHandler() {}

/**
* shuts down service handler. (actually, shuts down completion queue only)
*/
void CommandServiceHandler::shutdown() {
void ToriiServiceHandler::shutdown() {
completionQueue_->Shutdown();
}

/**
* handles rpcs loop in CommandService.
*/
void CommandServiceHandler::handleRpcs() {
void ToriiServiceHandler::handleRpcs() {
// CommandService::Torii()
enqueueRequest<prot::CommandService::AsyncService, prot::Transaction, prot::ToriiResponse>(
&prot::CommandService::AsyncService::RequestTorii,
&CommandServiceHandler::ToriiHandler,
&ToriiServiceHandler::ToriiHandler,
commandAsyncService_
);

// QueryService::Find()
enqueueRequest<prot::QueryService::AsyncService, prot::Query, prot::QueryResponse>(
&prot::QueryService::AsyncService::RequestFind,
&CommandServiceHandler::QueryFindHandler,
&ToriiServiceHandler::QueryFindHandler,
queryAsyncService_
);

Expand All @@ -77,7 +78,7 @@ namespace torii {
*/
while (completionQueue_->Next(&tag, &ok)) {
auto callbackTag =
static_cast<network::UntypedCall<CommandServiceHandler>::CallOwner*>(tag);
static_cast<network::UntypedCall<ToriiServiceHandler>::CallOwner*>(tag);
if (ok && callbackTag) {
/*assert(callbackTag);*/
callbackTag->onCompleted(this);
Expand All @@ -93,33 +94,33 @@ namespace torii {
* and calls an actual CommandService::AsyncTorii() implementation.
* then, spawns a new Call instance to serve an another client.
*/
void CommandServiceHandler::ToriiHandler(
void ToriiServiceHandler::ToriiHandler(
CommandServiceCall<prot::Transaction, prot::ToriiResponse>* call) {
auto stat = CommandService::ToriiAsync(call->request(), call->response());
call->sendResponse(stat);

// Spawn a new Call instance to serve an another client.
enqueueRequest<prot::CommandService::AsyncService, prot::Transaction, prot::ToriiResponse>(
&prot::CommandService::AsyncService::RequestTorii,
&CommandServiceHandler::ToriiHandler,
&ToriiServiceHandler::ToriiHandler,
commandAsyncService_
);
}

void CommandServiceHandler::QueryFindHandler(
void ToriiServiceHandler::QueryFindHandler(
QueryServiceCall<
iroha::protocol::Query, iroha::protocol::QueryResponse>* call) {

std::cout << "QueryFindHandler called!\n";

auto stat = grpc::Status::OK;//CommandService::ToriiAsync(call->request(), call->response());
std::cout << "HANDLE FIND!\n";
auto stat = QueryService::FindAsync(call->request(), call->response());
call->sendResponse(stat);

std::cout << "OKAY!\n";

// Spawn a new Call instance to serve an another client.
enqueueRequest<prot::CommandService::AsyncService, prot::Transaction, prot::ToriiResponse>(
&prot::CommandService::AsyncService::RequestTorii,
&CommandServiceHandler::ToriiHandler,
commandAsyncService_
enqueueRequest<prot::QueryService::AsyncService, prot::Query, prot::QueryResponse>(
&prot::QueryService::AsyncService::RequestFind,
&ToriiServiceHandler::QueryFindHandler,
queryAsyncService_
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,23 @@ limitations under the License.

namespace torii {
/**
* to handle rpcs loop of CommandService.
* to handle rpcs loop of CommandService and QueryService.
*/
class CommandServiceHandler : public network::GrpcAsyncService {
class ToriiServiceHandler : public network::GrpcAsyncService {
public:

/**
* requires builder to use same server.
* @param builder
*/
CommandServiceHandler(::grpc::ServerBuilder &builder);
ToriiServiceHandler(::grpc::ServerBuilder &builder);

virtual ~CommandServiceHandler() override;
virtual ~ToriiServiceHandler() override;

template <typename RequestType, typename ResponseType>
using CommandServiceCall =
network::Call<
CommandServiceHandler,
ToriiServiceHandler,
iroha::protocol::CommandService::AsyncService,
RequestType,
ResponseType
Expand All @@ -49,7 +49,7 @@ namespace torii {
template <typename RequestType, typename ResponseType>
using QueryServiceCall =
network::Call<
CommandServiceHandler,
ToriiServiceHandler,
iroha::protocol::QueryService::AsyncService,
RequestType,
ResponseType
Expand Down Expand Up @@ -82,12 +82,12 @@ namespace torii {
template <typename AsyncService, typename RequestType, typename ResponseType>
void enqueueRequest(
network::RequestMethod<AsyncService, RequestType, ResponseType> requester,
network::RpcHandler<CommandServiceHandler, AsyncService, RequestType, ResponseType> rpcHandler,
network::RpcHandler<ToriiServiceHandler, AsyncService, RequestType, ResponseType> rpcHandler,
AsyncService& asyncService
) {
std::unique_lock<std::mutex> lock(mtx_);
if (!isShutdown_) {
network::Call<CommandServiceHandler, AsyncService, RequestType, ResponseType>::enqueueRequest(
network::Call<ToriiServiceHandler, AsyncService, RequestType, ResponseType>::enqueueRequest(
&asyncService, completionQueue_.get(), requester, rpcHandler
);
}
Expand Down
1 change: 1 addition & 0 deletions libs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ add_subdirectory(crypto)
add_subdirectory(datetime)
add_subdirectory(timer)
add_subdirectory(logger)
add_subdirectory(torii_utils)
Loading

0 comments on commit 8cffdf2

Please sign in to comment.