Skip to content

Commit

Permalink
Merge pull request hyperledger-iroha#985 from hyperledger/feature/que…
Browse files Browse the repository at this point in the history
…ry_processor_shared_model

Feature/query processor shared model
  • Loading branch information
kamilsa authored Feb 22, 2018
2 parents dfc70a1 + 6798778 commit 3491035
Show file tree
Hide file tree
Showing 23 changed files with 472 additions and 523 deletions.
26 changes: 5 additions & 21 deletions irohad/main/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ Irohad::~Irohad() {
* Initializing iroha daemon
*/
void Irohad::init() {
initProtoFactories();
initPeerQuery();
initCryptoProvider();
initValidators();
Expand Down Expand Up @@ -120,17 +119,6 @@ void Irohad::initStorage() {
log_->info("[Init] => storage", logger::logBool(storage));
}

/**
* Creating transaction, query and query response factories
*/
void Irohad::initProtoFactories() {
pb_tx_factory = std::make_shared<PbTransactionFactory>();
pb_query_factory = std::make_shared<PbQueryFactory>();
pb_query_response_factory = std::make_shared<PbQueryResponseFactory>();

log_->info("[Init] => converters");
}

/**
* Initializing peer query interface
*/
Expand All @@ -153,8 +141,6 @@ void Irohad::initCryptoProvider() {
* Initializing validators
*/
void Irohad::initValidators() {
stateless_validator =
std::make_shared<StatelessValidatorImpl>(crypto_verifier);
stateful_validator = std::make_shared<StatefulValidatorImpl>();
chain_validator = std::make_shared<ChainValidatorImpl>();

Expand Down Expand Up @@ -234,11 +220,10 @@ void Irohad::initPeerCommunicationService() {
* Initializing transaction command service
*/
void Irohad::initTransactionCommandService() {
auto tx_processor =
std::make_shared<TransactionProcessorImpl>(pcs, stateless_validator);
auto tx_processor = std::make_shared<TransactionProcessorImpl>(pcs);

command_service = std::make_unique<::torii::CommandService>(
pb_tx_factory, tx_processor, storage, proposal_delay_);
tx_processor, storage, proposal_delay_);

log_->info("[Init] => command service");
}
Expand All @@ -250,11 +235,10 @@ void Irohad::initQueryService() {
auto query_processing_factory = std::make_unique<QueryProcessingFactory>(
storage->getWsvQuery(), storage->getBlockQuery());

auto query_processor = std::make_shared<QueryProcessorImpl>(
std::move(query_processing_factory), stateless_validator);
auto query_processor =
std::make_shared<QueryProcessorImpl>(std::move(query_processing_factory));

query_service = std::make_unique<::torii::QueryService>(
pb_query_factory, pb_query_response_factory, query_processor);
query_service = std::make_unique<::torii::QueryService>(query_processor);

log_->info("[Init] => query service");
}
Expand Down
9 changes: 0 additions & 9 deletions irohad/main/application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,6 @@ class Irohad {

virtual void initStorage();

virtual void initProtoFactories();

virtual void initPeerQuery();

virtual void initCryptoProvider();
Expand Down Expand Up @@ -132,17 +130,10 @@ class Irohad {

// ------------------------| internal dependencies |-------------------------

// converter factories
std::shared_ptr<iroha::model::converters::PbTransactionFactory> pb_tx_factory;
std::shared_ptr<iroha::model::converters::PbQueryFactory> pb_query_factory;
std::shared_ptr<iroha::model::converters::PbQueryResponseFactory>
pb_query_response_factory;

// crypto provider
std::shared_ptr<iroha::model::ModelCryptoProvider> crypto_verifier;

// validators
std::shared_ptr<iroha::validation::StatelessValidator> stateless_validator;
std::shared_ptr<iroha::validation::StatefulValidator> stateful_validator;
std::shared_ptr<iroha::validation::ChainValidator> chain_validator;

Expand Down
3 changes: 3 additions & 0 deletions irohad/model/converters/impl/pb_query_response_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ namespace iroha {
*query_response)));
}

if (response.has_value()) {
response->set_query_hash(query_response->query_hash.to_string());
}
return response;
}

Expand Down
2 changes: 0 additions & 2 deletions irohad/torii/command_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ namespace torii {
* @param proposal_delay - time of a one proposal propagation.
*/
CommandService(
std::shared_ptr<iroha::model::converters::PbTransactionFactory>
pb_factory,
std::shared_ptr<iroha::torii::TransactionProcessor> tx_processor,
std::shared_ptr<iroha::ametsuchi::Storage> storage,
std::chrono::milliseconds proposal_delay);
Expand Down
5 changes: 1 addition & 4 deletions irohad/torii/impl/command_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,10 @@ using namespace std::chrono_literals;
namespace torii {

CommandService::CommandService(
std::shared_ptr<iroha::model::converters::PbTransactionFactory>
pb_factory,
std::shared_ptr<iroha::torii::TransactionProcessor> tx_processor,
std::shared_ptr<iroha::ametsuchi::Storage> storage,
std::chrono::milliseconds proposal_delay)
: pb_factory_(pb_factory),
tx_processor_(tx_processor),
: tx_processor_(tx_processor),
storage_(storage),
proposal_delay_(proposal_delay),
start_tx_processing_duration_(1s),
Expand Down
92 changes: 56 additions & 36 deletions irohad/torii/impl/query_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,56 +16,76 @@
*/

#include "torii/query_service.hpp"
#include "backend/protobuf/from_old_model.hpp"
#include "backend/protobuf/transaction_responses/proto_tx_response.hpp"
#include "common/types.hpp"
#include "model/sha3_hash.hpp"

namespace torii {

QueryService::QueryService(
std::shared_ptr<iroha::model::converters::PbQueryFactory>
pb_query_factory,
std::shared_ptr<iroha::model::converters::PbQueryResponseFactory>
pb_query_response_factory,
std::shared_ptr<iroha::torii::QueryProcessor> query_processor)
: pb_query_factory_(pb_query_factory),
pb_query_response_factory_(pb_query_response_factory),
query_processor_(query_processor) {
// Subscribe on result from iroha
query_processor_->queryNotifier().subscribe([this](auto iroha_response) {
// Find client to respond
auto res = handler_map_.find(iroha_response->query_hash.to_string());
// Serialize to proto an return to response
res->second =
pb_query_response_factory_->serialize(iroha_response).value();
: query_processor_(query_processor) {
// Subscribe on result from iroha
query_processor_->queryNotifier().subscribe(
[this](const std::shared_ptr<shared_model::interface::QueryResponse>
&iroha_response) {
// Find client to respond
auto hash = iroha_response->queryHash();
auto res = cache_.findItem(hash);

});
if (res) {
cache_.addItem(hash, iroha_response);
}

});
}

void QueryService::Find(iroha::protocol::Query const &request,
iroha::protocol::QueryResponse &response) {
using iroha::operator|;
auto deserializedRequest = pb_query_factory_->deserialize(request);
deserializedRequest | [&](const auto &query) {
auto hash = iroha::hash(*query).to_string();
if (handler_map_.count(hash) > 0) {
// Query was already processed
response.mutable_error_response()->set_reason(
iroha::protocol::ErrorResponse::STATELESS_INVALID);
}
// shared_model::proto::QueryResponse model_response(response);
shared_model::crypto::Hash hash;
shared_model::proto::TransportBuilder<
shared_model::proto::Query,
shared_model::validation::DefaultQueryValidator>()
.build(request)
.match(
[this, &hash, &response](
const iroha::expected::Value<shared_model::proto::Query>
&query) {
hash = query.value.hash();
if (cache_.findItem(hash)) {
// Query was already processed

else {
// Query - response relationship
handler_map_.emplace(hash, response);
// Send query to iroha
query_processor_->queryHandle(query);
}
response.set_query_hash(hash);
};
response.mutable_error_response()->set_reason(
iroha::protocol::ErrorResponse::STATELESS_INVALID);
} else {
// Query - response relationship
cache_.addItem(
hash,
std::make_shared<shared_model::proto::QueryResponse>(
response));
// Send query to iroha
query_processor_->queryHandle(
std::make_shared<shared_model::proto::Query>(query.value));
}

if (not deserializedRequest) {
response.mutable_error_response()->set_reason(
iroha::protocol::ErrorResponse::NOT_SUPPORTED);
}
auto result_response = cache_.findItem(hash).value();
response = static_cast<shared_model::proto::QueryResponse &>(
*result_response)
.getTransport();
},
[&hash, &request, &response](
const iroha::expected::Error<std::string> &error) {
auto blobPayload =
shared_model::proto::makeBlob(request.payload());
hash = shared_model::proto::Query::HashProviderType::makeHash(
blobPayload);
response.set_query_hash(
shared_model::crypto::toBinaryString(hash));
response.mutable_error_response()->set_reason(
iroha::protocol::ErrorResponse::STATELESS_INVALID);
});
}

grpc::Status QueryService::Find(grpc::ServerContext *context,
Expand Down
38 changes: 18 additions & 20 deletions irohad/torii/processor/impl/query_processor_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,33 @@
*/

#include "torii/processor/query_processor_impl.hpp"
#include "cryptography/ed25519_sha3_impl/internal/sha3_hash.hpp"
#include "model/queries/responses/error_response.hpp"
#include "model/sha3_hash.hpp"
#include "backend/protobuf/query_responses/proto_query_response.hpp"
#include "backend/protobuf/from_old_model.hpp"

namespace iroha {
namespace torii {

QueryProcessorImpl::QueryProcessorImpl(
std::unique_ptr<model::QueryProcessingFactory> qpf,
std::shared_ptr<validation::StatelessValidator> stateless_validator)
: qpf_(std::move(qpf)), validator_(stateless_validator) {}
std::unique_ptr<model::QueryProcessingFactory> qpf)
: qpf_(std::move(qpf)) {}

void QueryProcessorImpl::queryHandle(std::shared_ptr<model::Query> query) {
// if not valid send wrong response
if (!validator_->validate(*query)) {
model::ErrorResponse response;
response.query_hash = iroha::hash(*query);
response.reason = model::ErrorResponse::STATELESS_INVALID;
subject_.get_subscriber().on_next(
std::make_shared<model::ErrorResponse>(response));
} else { // else execute query
auto qpf_response = qpf_->execute(query);
subject_.get_subscriber().on_next(qpf_response);
}
}
void QueryProcessorImpl::queryHandle(
std::shared_ptr<shared_model::interface::Query> qry) {
std::shared_ptr<iroha::model::Query> query(qry->makeOldModel());
// TODO: 12.02.2018 grimadas Remove when query_executor has new model, as
// query is already stateless valid when passing to query processor

rxcpp::observable<std::shared_ptr<model::QueryResponse>>
auto qpf_response =
qpf_->execute(std::shared_ptr<const model::Query>(query));
auto qry_resp = shared_model::proto::from_old(qpf_response);
subject_.get_subscriber().on_next(
std::make_shared<shared_model::proto::QueryResponse>(
qry_resp.getTransport()));
}
rxcpp::observable<std::shared_ptr<shared_model::interface::QueryResponse>>
QueryProcessorImpl::queryNotifier() {
return subject_.get_observable();
}

} // namespace torii
} // namespace iroha
19 changes: 6 additions & 13 deletions irohad/torii/processor/impl/transaction_processor_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ namespace iroha {
using validation::StatelessValidator;

TransactionProcessorImpl::TransactionProcessorImpl(
std::shared_ptr<PeerCommunicationService> pcs,
std::shared_ptr<StatelessValidator> validator)
: pcs_(std::move(pcs)), validator_(std::move(validator)) {
std::shared_ptr<PeerCommunicationService> pcs)
: pcs_(std::move(pcs)) {
log_ = logger::log("TxProcessor");

// insert all txs from proposal to proposal set
Expand Down Expand Up @@ -97,17 +96,11 @@ namespace iroha {
model::TransactionResponse response;
response.tx_hash = hash(*transaction).to_string();
response.current_status =
model::TransactionResponse::Status::STATELESS_VALIDATION_FAILED;
model::TransactionResponse::Status::STATELESS_VALIDATION_SUCCESS;

if (validator_->validate(*transaction)) {
response.current_status =
TransactionResponse::Status::STATELESS_VALIDATION_SUCCESS;
pcs_->propagate_transaction(transaction);
}
log_->info(
"stateless validation status: {}",
response.current_status
== TransactionResponse::Status::STATELESS_VALIDATION_SUCCESS);
pcs_->propagate_transaction(transaction);

log_->info("stateless validated");
notifier_.get_subscriber().on_next(
std::make_shared<model::TransactionResponse>(response));
}
Expand Down
9 changes: 7 additions & 2 deletions irohad/torii/processor/query_processor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
#include "model/query.hpp"
#include "model/query_response.hpp"

#include "interfaces/queries/query.hpp"
#include "interfaces/query_responses/query_response.hpp"

namespace iroha {
namespace torii {

Expand All @@ -37,13 +40,15 @@ namespace iroha {
* @param client - query emitter
* @param query - client intent
*/
virtual void queryHandle(std::shared_ptr<model::Query> query) = 0;
virtual void queryHandle(std::shared_ptr<
shared_model::interface::Query> qry) = 0;

/**
* Subscribe for query responses
* @return observable with query responses
*/
virtual rxcpp::observable<std::shared_ptr<model::QueryResponse>>
virtual rxcpp::observable<
std::shared_ptr<shared_model::interface::QueryResponse>>
queryNotifier() = 0;

virtual ~QueryProcessor(){};
Expand Down
16 changes: 9 additions & 7 deletions irohad/torii/processor/query_processor_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,28 @@ namespace iroha {
class QueryProcessorImpl : public QueryProcessor {
public:
explicit QueryProcessorImpl(
std::unique_ptr<model::QueryProcessingFactory> qpf,
std::shared_ptr<validation::StatelessValidator> stateless_validator);
std::unique_ptr<model::QueryProcessingFactory> qpf);

/**
* Register client query
* @param query - client intent
*/
void queryHandle(std::shared_ptr<model::Query> query) override;
void queryHandle(std::shared_ptr<
shared_model::interface::Query> qry) override;

/**
* Subscribe for query responses
* @return observable with query responses
*/
rxcpp::observable<std::shared_ptr<model::QueryResponse>> queryNotifier()
override;
rxcpp::observable<std::shared_ptr<
shared_model::interface::QueryResponse>>
queryNotifier() override;

private:
rxcpp::subjects::subject<std::shared_ptr<model::QueryResponse>> subject_;
rxcpp::subjects::subject<
std::shared_ptr<shared_model::interface::QueryResponse>>
subject_;
std::unique_ptr<model::QueryProcessingFactory> qpf_;
std::shared_ptr<validation::StatelessValidator> validator_;
};
} // namespace torii
} // namespace iroha
Expand Down
Loading

0 comments on commit 3491035

Please sign in to comment.