diff --git a/cmake/Modules/Findrxcpp.cmake b/cmake/Modules/Findrxcpp.cmake index 36402d5f1d..c17c8d136d 100644 --- a/cmake/Modules/Findrxcpp.cmake +++ b/cmake/Modules/Findrxcpp.cmake @@ -9,8 +9,7 @@ find_package_handle_standard_args(rxcpp DEFAULT_MSG set(URL https://github.com/Reactive-Extensions/rxcpp.git) -# this version is chosen, because it fixes 100% node overload [IR-1736] bug -set(VERSION a7d5856385f126e874db6010d9dbfd37290c61de) +set(VERSION 795587fa311fa41050111a830d91b183d8e53ff9) set_target_description(rxcpp "Library for reactive programming" ${URL} ${VERSION}) @@ -21,7 +20,6 @@ if (NOT rxcpp_FOUND) CONFIGURE_COMMAND "" BUILD_COMMAND "" INSTALL_COMMAND "" # remove install step - UPDATE_COMMAND "" # remove update step TEST_COMMAND "" # remove test step ) externalproject_get_property(reactive_extensions_rxcpp source_dir) diff --git a/docker/dependencies/Dockerfile b/docker/dependencies/Dockerfile index 62be571199..81be6652a9 100644 --- a/docker/dependencies/Dockerfile +++ b/docker/dependencies/Dockerfile @@ -147,8 +147,7 @@ RUN set -e; \ # install rxcpp RUN set -e; \ git clone https://github.com/Reactive-Extensions/RxCpp /tmp/RxCpp; \ - # this version is chosen, because it fixes 100% node overload [IR-1736] bug - (cd /tmp/RxCpp ; git checkout a7d5856385f126e874db6010d9dbfd37290c61de); \ + (cd /tmp/RxCpp ; git checkout 795587fa311fa41050111a830d91b183d8e53ff9); \ cmake \ -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} \ -H/tmp/RxCpp \ diff --git a/docker/develop/Dockerfile b/docker/develop/Dockerfile index 3701b48817..d2132dd925 100644 --- a/docker/develop/Dockerfile +++ b/docker/develop/Dockerfile @@ -148,8 +148,7 @@ RUN set -e; \ # install rxcpp RUN set -e; \ git clone https://github.com/Reactive-Extensions/RxCpp /tmp/RxCpp; \ - # this version is chosen, because it fixes 100% node overload [IR-1736] bug - (cd /tmp/RxCpp ; git checkout a7d5856385f126e874db6010d9dbfd37290c61de); \ + (cd /tmp/RxCpp ; git checkout 795587fa311fa41050111a830d91b183d8e53ff9); \ cmake \ -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} \ -H/tmp/RxCpp \ diff --git a/irohad/ametsuchi/impl/storage_impl.cpp b/irohad/ametsuchi/impl/storage_impl.cpp index 0d004243f7..5a386dc245 100644 --- a/irohad/ametsuchi/impl/storage_impl.cpp +++ b/irohad/ametsuchi/impl/storage_impl.cpp @@ -87,6 +87,7 @@ namespace iroha { block_store_(std::move(block_store)), connection_(std::move(connection)), factory_(std::move(factory)), + notifier_(notifier_lifetime_), converter_(std::move(converter)), perm_converter_(std::move(perm_converter)), block_storage_factory_(std::move(block_storage_factory)), @@ -206,7 +207,8 @@ namespace iroha { response_factory) const { std::shared_lock lock(drop_mutex); if (not connection_) { - log_->info("connection to database is not initialised"); + log_->info( + "createQueryExecutor: connection to database is not initialised"); return boost::none; } return boost::make_optional>( @@ -494,7 +496,8 @@ namespace iroha { try { std::shared_lock lock(drop_mutex); if (not connection_) { - log_->info("connection to database is not initialised"); + log_->info( + "commitPrepared: connection to database is not initialised"); return boost::none; } soci::session sql(*connection_); @@ -533,7 +536,7 @@ namespace iroha { std::shared_ptr StorageImpl::getWsvQuery() const { std::shared_lock lock(drop_mutex); if (not connection_) { - log_->info("connection to database is not initialised"); + log_->info("getWsvQuery: connection to database is not initialised"); return nullptr; } return std::make_shared( @@ -545,7 +548,7 @@ namespace iroha { std::shared_ptr StorageImpl::getBlockQuery() const { std::shared_lock lock(drop_mutex); if (not connection_) { - log_->info("connection to database is not initialised"); + log_->info("getBlockQuery: connection to database is not initialised"); return nullptr; } return std::make_shared( @@ -584,6 +587,7 @@ namespace iroha { } StorageImpl::~StorageImpl() { + notifier_lifetime_.unsubscribe(); freeConnections(); } diff --git a/irohad/ametsuchi/impl/storage_impl.hpp b/irohad/ametsuchi/impl/storage_impl.hpp index 12b9d0170d..2c9ef9cbb6 100644 --- a/irohad/ametsuchi/impl/storage_impl.hpp +++ b/irohad/ametsuchi/impl/storage_impl.hpp @@ -160,6 +160,7 @@ namespace iroha { std::shared_ptr factory_; + rxcpp::composite_subscription notifier_lifetime_; rxcpp::subjects::subject< std::shared_ptr> notifier_; diff --git a/irohad/consensus/yac/CMakeLists.txt b/irohad/consensus/yac/CMakeLists.txt index d835783f54..5491e21bf8 100644 --- a/irohad/consensus/yac/CMakeLists.txt +++ b/irohad/consensus/yac/CMakeLists.txt @@ -36,6 +36,10 @@ target_link_libraries(yac consensus_round gate_object ) +# avoid compilation error due to missing operator<< in Answer variant types +target_compile_definitions(yac + PUBLIC BOOST_NO_IOSTREAM + ) add_library(yac_transport transport/impl/network_impl.cpp diff --git a/irohad/consensus/yac/impl/yac.cpp b/irohad/consensus/yac/impl/yac.cpp index 6c88a90d4c..261667c969 100644 --- a/irohad/consensus/yac/impl/yac.cpp +++ b/irohad/consensus/yac/impl/yac.cpp @@ -50,9 +50,17 @@ namespace iroha { std::shared_ptr crypto, std::shared_ptr timer, ClusterOrdering order, + Round round, + rxcpp::observe_on_one_worker worker, logger::LoggerPtr log) { - return std::make_shared( - vote_storage, network, crypto, timer, order, std::move(log)); + return std::make_shared(vote_storage, + network, + crypto, + timer, + order, + round, + worker, + std::move(log)); } Yac::Yac(YacVoteStorage vote_storage, @@ -60,14 +68,23 @@ namespace iroha { std::shared_ptr crypto, std::shared_ptr timer, ClusterOrdering order, + Round round, + rxcpp::observe_on_one_worker worker, logger::LoggerPtr log) : log_(std::move(log)), cluster_order_(order), + round_(round), + worker_(worker), + notifier_(worker_, notifier_lifetime_), vote_storage_(std::move(vote_storage)), network_(std::move(network)), crypto_(std::move(crypto)), timer_(std::move(timer)) {} + Yac::~Yac() { + notifier_lifetime_.unsubscribe(); + } + // ------|Hash gate|------ void Yac::vote(YacHash hash, ClusterOrdering order) { @@ -75,7 +92,10 @@ namespace iroha { logger::to_string(order.getPeers(), [](auto val) { return val->address(); })); + std::unique_lock lock(mutex_); cluster_order_ = order; + round_ = hash.vote_round; + lock.unlock(); auto vote = crypto_->getVote(hash); // TODO 10.06.2018 andrei: IR-1407 move YAC propagation strategy to a // separate entity @@ -117,7 +137,7 @@ namespace iroha { } void Yac::onState(std::vector state) { - std::lock_guard guard(mutex_); + std::unique_lock guard(mutex_); removeUnknownPeersVotes(state); if (state.empty()) { @@ -126,7 +146,7 @@ namespace iroha { } if (crypto_->verify(state)) { - applyState(state); + applyState(state, guard); } else { log_->warn("{}", cryptoError(state)); } @@ -135,6 +155,8 @@ namespace iroha { // ------|Private interface|------ void Yac::votingStep(VoteMessage vote) { + std::unique_lock lock(mutex_); + auto committed = vote_storage_.isCommitted(vote.hash.vote_round); if (committed) { return; @@ -150,7 +172,9 @@ namespace iroha { network_->sendState(current_leader, {vote}); cluster_order_.switchToNext(); - if (cluster_order_.hasNext()) { + auto has_next = cluster_order_.hasNext(); + lock.unlock(); + if (has_next) { timer_->invokeAfterDelay([this, vote] { this->votingStep(vote); }); } } @@ -172,7 +196,9 @@ namespace iroha { // ------|Apply data|------ - void Yac::applyState(const std::vector &state) { + void Yac::applyState(const std::vector &state, + std::unique_lock &lock) { + assert(lock.owns_lock()); auto answer = vote_storage_.store(state, cluster_order_.getNumberOfPeers()); @@ -208,6 +234,7 @@ namespace iroha { auto votes = [](const auto &state) { return state.votes; }; + auto current_round = round_; switch (processing_state) { case ProposalState::kNotSentNotProcessed: vote_storage_.nextProcessingState(proposal_round); @@ -218,7 +245,10 @@ namespace iroha { case ProposalState::kSentNotProcessed: vote_storage_.nextProcessingState(proposal_round); log_->info("Pass outcome for {} to pipeline", proposal_round); - this->closeRound(); + lock.unlock(); + if (proposal_round >= current_round) { + this->closeRound(); + } notifier_.get_subscriber().on_next(answer); break; case ProposalState::kSentProcessed: @@ -228,6 +258,9 @@ namespace iroha { }, // sent a state which didn't match with current one [&]() { this->tryPropagateBack(state); }); + if (lock.owns_lock()) { + lock.unlock(); + } } void Yac::tryPropagateBack(const std::vector &state) { diff --git a/irohad/consensus/yac/impl/yac_gate_impl.cpp b/irohad/consensus/yac/impl/yac_gate_impl.cpp index bc2a7f0e06..ee03b4bae7 100644 --- a/irohad/consensus/yac/impl/yac_gate_impl.cpp +++ b/irohad/consensus/yac/impl/yac_gate_impl.cpp @@ -32,6 +32,19 @@ namespace iroha { logger::LoggerPtr log) : log_(std::move(log)), current_hash_(), + published_events_(hash_gate->onOutcome() + .flat_map([this](auto message) { + return visit_in_place( + message, + [this](const CommitMessage &msg) { + return this->handleCommit(msg); + }, + [this](const RejectMessage &msg) { + return this->handleReject(msg); + }); + }) + .publish() + .ref_count()), orderer_(std::move(orderer)), hash_provider_(std::move(hash_provider)), block_creator_(std::move(block_creator)), @@ -83,15 +96,7 @@ namespace iroha { } rxcpp::observable YacGateImpl::onOutcome() { - return hash_gate_->onOutcome().flat_map([this](auto message) { - return visit_in_place(message, - [this](const CommitMessage &msg) { - return this->handleCommit(msg); - }, - [this](const RejectMessage &msg) { - return this->handleReject(msg); - }); - }); + return published_events_; } void YacGateImpl::copySignatures(const CommitMessage &commit) { diff --git a/irohad/consensus/yac/impl/yac_gate_impl.hpp b/irohad/consensus/yac/impl/yac_gate_impl.hpp index 893af4aa93..26f2fca07d 100644 --- a/irohad/consensus/yac/impl/yac_gate_impl.hpp +++ b/irohad/consensus/yac/impl/yac_gate_impl.hpp @@ -60,6 +60,7 @@ namespace iroha { YacHash current_hash_; std::shared_ptr current_ledger_state_; + rxcpp::observable published_events_; std::shared_ptr orderer_; std::shared_ptr hash_provider_; std::shared_ptr block_creator_; diff --git a/irohad/consensus/yac/yac.hpp b/irohad/consensus/yac/yac.hpp index c36694ecc5..e73bbd4c35 100644 --- a/irohad/consensus/yac/yac.hpp +++ b/irohad/consensus/yac/yac.hpp @@ -6,16 +6,17 @@ #ifndef IROHA_YAC_HPP #define IROHA_YAC_HPP -#include +#include "consensus/yac/transport/yac_network_interface.hpp" // for YacNetworkNotifications +#include "consensus/yac/yac_gate.hpp" // for HashGate + #include #include -#include +#include +#include #include "consensus/yac/cluster_order.hpp" // for ClusterOrdering #include "consensus/yac/outcome_messages.hpp" // because messages passed by value #include "consensus/yac/storage/yac_vote_storage.hpp" // for VoteStorage -#include "consensus/yac/transport/yac_network_interface.hpp" // for YacNetworkNotifications -#include "consensus/yac/yac_gate.hpp" // for HashGate #include "logger/logger_fwd.hpp" namespace iroha { @@ -37,6 +38,8 @@ namespace iroha { std::shared_ptr crypto, std::shared_ptr timer, ClusterOrdering order, + Round round, + rxcpp::observe_on_one_worker worker, logger::LoggerPtr log); Yac(YacVoteStorage vote_storage, @@ -44,8 +47,12 @@ namespace iroha { std::shared_ptr crypto, std::shared_ptr timer, ClusterOrdering order, + Round round, + rxcpp::observe_on_one_worker worker, logger::LoggerPtr log); + ~Yac() override; + // ------|Hash gate|------ void vote(YacHash hash, ClusterOrdering order) override; @@ -82,7 +89,12 @@ namespace iroha { void removeUnknownPeersVotes(std::vector &votes); // ------|Apply data|------ - void applyState(const std::vector &state); + /** + * @pre lock is locked + * @post lock is unlocked + */ + void applyState(const std::vector &state, + std::unique_lock &lock); // ------|Propagation|------ void propagateState(const std::vector &msg); @@ -97,9 +109,12 @@ namespace iroha { // ------|One round|------ ClusterOrdering cluster_order_; + Round round_; // ------|Fields|------ - rxcpp::subjects::subject notifier_; + rxcpp::observe_on_one_worker worker_; + rxcpp::composite_subscription notifier_lifetime_; + rxcpp::subjects::synchronize notifier_; YacVoteStorage vote_storage_; std::shared_ptr network_; std::shared_ptr crypto_; diff --git a/irohad/main/CMakeLists.txt b/irohad/main/CMakeLists.txt index 62e5b2dbba..12ce6b1e53 100644 --- a/irohad/main/CMakeLists.txt +++ b/irohad/main/CMakeLists.txt @@ -30,10 +30,12 @@ add_library(application impl/block_loader_init.cpp ) target_link_libraries(application - logger - logger_manager + PRIVATE yac yac_transport + PUBLIC + logger + logger_manager server_runner ametsuchi networking diff --git a/irohad/main/application.cpp b/irohad/main/application.cpp index 05fc05e1eb..9d06253e04 100644 --- a/irohad/main/application.cpp +++ b/irohad/main/application.cpp @@ -25,6 +25,7 @@ #include "interfaces/iroha_internal/transaction_batch_parser_impl.hpp" #include "logger/logger.hpp" #include "logger/logger_manager.hpp" +#include "main/impl/consensus_init.hpp" #include "main/server_runner.hpp" #include "multi_sig_transactions/gossip_propagation_strategy.hpp" #include "multi_sig_transactions/mst_processor_impl.hpp" @@ -103,6 +104,8 @@ Irohad::Irohad(const std::string &block_store_dir, opt_mst_gossip_params_(opt_mst_gossip_params), keypair(keypair), ordering_init(logger_manager->getLogger()), + yac_init(std::make_unique()), + consensus_gate_objects(consensus_gate_objects_lifetime), log_manager_(std::move(logger_manager)), log_(log_manager_->getLogger()) { log_->info("created"); @@ -115,6 +118,7 @@ Irohad::Irohad(const std::string &block_store_dir, } Irohad::~Irohad() { + consensus_gate_objects_lifetime.unsubscribe(); consensus_gate_events_subscription.unsubscribe(); } @@ -455,17 +459,34 @@ void Irohad::initBlockLoader() { * Initializing consensus gate */ void Irohad::initConsensusGate() { - consensus_gate = - yac_init.initConsensusGate(storage, - simulator, - block_loader, - keypair, - consensus_result_cache_, - vote_delay_, - async_call_, - common_objects_factory_, - kConsensusConsistencyModel, - log_manager_->getChild("Consensus")); + auto block_query = storage->createBlockQuery(); + if (not block_query) { + log_->error("Failed to create block query"); + return; + } + auto block_var = (*block_query)->getTopBlock(); + if (auto e = boost::get>(&block_var)) { + log_->error("Failed to get the top block: {}", e->error); + return; + } + + auto block = + boost::get< + expected::Value>>( + &block_var) + ->value; + consensus_gate = yac_init->initConsensusGate( + {block->height(), ordering::kFirstRejectRound}, + storage, + simulator, + block_loader, + keypair, + consensus_result_cache_, + vote_delay_, + async_call_, + common_objects_factory_, + kConsensusConsistencyModel, + log_manager_->getChild("Consensus")); consensus_gate->onOutcome().subscribe( consensus_gate_events_subscription, consensus_gate_objects.get_subscriber()); @@ -671,7 +692,7 @@ Irohad::RunResult Irohad::run() { } // Run internal server return internal_server->append(ordering_init.service) - .append(yac_init.getConsensusNetwork()) + .append(yac_init->getConsensusNetwork()) .append(loader_init.service) .run(); }) diff --git a/irohad/main/application.hpp b/irohad/main/application.hpp index e4b536290d..ff04104a97 100644 --- a/irohad/main/application.hpp +++ b/irohad/main/application.hpp @@ -7,13 +7,14 @@ #define IROHA_APPLICATION_HPP #include "consensus/consensus_block_cache.hpp" +#include "consensus/gate_object.hpp" #include "cryptography/crypto_provider/abstract_crypto_model_signer.hpp" +#include "cryptography/keypair.hpp" #include "interfaces/queries/blocks_query.hpp" #include "interfaces/queries/query.hpp" #include "logger/logger_fwd.hpp" #include "logger/logger_manager_fwd.hpp" #include "main/impl/block_loader_init.hpp" -#include "main/impl/consensus_init.hpp" #include "main/impl/on_demand_ordering_init.hpp" #include "multi_sig_transactions/gossip_propagation_strategy_params.hpp" @@ -25,6 +26,11 @@ namespace iroha { class TxPresenceCache; class Storage; } // namespace ametsuchi + namespace consensus { + namespace yac { + class YacInit; + } // namespace yac + } // namespace consensus namespace network { class BlockLoader; class ConsensusGate; @@ -202,7 +208,7 @@ class Irohad { protected: // initialization objects iroha::network::OnDemandOrderingInit ordering_init; - iroha::consensus::yac::YacInit yac_init; + std::unique_ptr yac_init; iroha::network::BlockLoaderInit loader_init; // common objects factory @@ -280,6 +286,7 @@ class Irohad { // consensus gate std::shared_ptr consensus_gate; + rxcpp::composite_subscription consensus_gate_objects_lifetime; rxcpp::subjects::subject consensus_gate_objects; rxcpp::composite_subscription consensus_gate_events_subscription; diff --git a/irohad/main/impl/consensus_init.cpp b/irohad/main/impl/consensus_init.cpp index 11da7be05b..26135a7d5e 100644 --- a/irohad/main/impl/consensus_init.cpp +++ b/irohad/main/impl/consensus_init.cpp @@ -18,6 +18,7 @@ #include "logger/logger_manager.hpp" #include "network/impl/grpc_channel_builder.hpp" +using namespace iroha::consensus; using namespace iroha::consensus::yac; namespace { @@ -42,12 +43,14 @@ namespace { std::shared_ptr createYac( ClusterOrdering initial_order, + Round initial_round, const shared_model::crypto::Keypair &keypair, std::shared_ptr timer, std::shared_ptr network, std::shared_ptr common_objects_factory, ConsistencyModel consistency_model, + rxcpp::observe_on_one_worker coordination, const logger::LoggerManagerTreePtr &consensus_log_manager) { std::shared_ptr cleanup_strategy = std::make_shared(); @@ -59,6 +62,8 @@ namespace { createCryptoProvider(keypair, std::move(common_objects_factory)), std::move(timer), initial_order, + initial_round, + coordination, consensus_log_manager->getChild("HashGate")->getLogger()); } } // namespace @@ -82,6 +87,7 @@ namespace iroha { } std::shared_ptr YacInit::initConsensusGate( + Round initial_round, std::shared_ptr peer_query_factory, std::shared_ptr block_creator, @@ -109,11 +115,13 @@ namespace iroha { consensus_log_manager->getChild("Network")->getLogger()); auto yac = createYac(*ClusterOrdering::create(peers.value()), + initial_round, keypair, createTimer(vote_delay_milliseconds), consensus_network_, std::move(common_objects_factory), consistency_model, + rxcpp::observe_on_new_thread(), consensus_log_manager); consensus_network_->subscribe(yac); diff --git a/irohad/main/impl/consensus_init.hpp b/irohad/main/impl/consensus_init.hpp index 43b3611226..3d1e75076f 100644 --- a/irohad/main/impl/consensus_init.hpp +++ b/irohad/main/impl/consensus_init.hpp @@ -31,6 +31,7 @@ namespace iroha { class YacInit { public: std::shared_ptr initConsensusGate( + Round initial_round, // TODO 30.01.2019 lebdron: IR-262 Remove PeerQueryFactory std::shared_ptr peer_query_factory, std::shared_ptr block_creator, diff --git a/irohad/main/impl/on_demand_ordering_init.cpp b/irohad/main/impl/on_demand_ordering_init.cpp index 47e33301ad..22fe384ffb 100644 --- a/irohad/main/impl/on_demand_ordering_init.cpp +++ b/irohad/main/impl/on_demand_ordering_init.cpp @@ -44,7 +44,9 @@ namespace iroha { namespace network { OnDemandOrderingInit::OnDemandOrderingInit(logger::LoggerPtr log) - : log_(std::move(log)) {} + : sync_event_notifier(sync_event_notifier_lifetime_), + commit_notifier(commit_notifier_lifetime_), + log_(std::move(log)) {} auto OnDemandOrderingInit::createNotificationFactory( std::shared_ptr> @@ -280,8 +282,8 @@ namespace iroha { } OnDemandOrderingInit::~OnDemandOrderingInit() { - sync_event_notifier.get_subscriber().unsubscribe(); - commit_notifier.get_subscriber().unsubscribe(); + sync_event_notifier_lifetime_.unsubscribe(); + commit_notifier_lifetime_.unsubscribe(); } std::shared_ptr diff --git a/irohad/main/impl/on_demand_ordering_init.hpp b/irohad/main/impl/on_demand_ordering_init.hpp index 614c094fe4..b566ac46a0 100644 --- a/irohad/main/impl/on_demand_ordering_init.hpp +++ b/irohad/main/impl/on_demand_ordering_init.hpp @@ -89,6 +89,9 @@ namespace iroha { std::shared_ptr tx_cache, const logger::LoggerManagerTreePtr &ordering_log_manager); + rxcpp::composite_subscription sync_event_notifier_lifetime_; + rxcpp::composite_subscription commit_notifier_lifetime_; + public: /// Constructor. /// @param log - the logger to use for internal messages. @@ -99,7 +102,8 @@ namespace iroha { /** * Initializes on-demand ordering gate and ordering sevice components * - * @param max_number_of_transactions maximum number of transaction in a proposal + * @param max_number_of_transactions maximum number of transactions in a + * proposal * @param delay timeout for ordering service response on proposal request * @param initial_hashes seeds for peer list permutations for first k * rounds they are required since hash of block i defines round i + k diff --git a/irohad/ordering/impl/on_demand_ordering_gate.cpp b/irohad/ordering/impl/on_demand_ordering_gate.cpp index 20c17b2f97..715bad47be 100644 --- a/irohad/ordering/impl/on_demand_ordering_gate.cpp +++ b/irohad/ordering/impl/on_demand_ordering_gate.cpp @@ -64,9 +64,11 @@ OnDemandOrderingGate::OnDemandOrderingGate( })), cache_(std::move(cache)), proposal_factory_(std::move(factory)), - tx_cache_(std::move(tx_cache)) {} + tx_cache_(std::move(tx_cache)), + proposal_notifier_(proposal_notifier_lifetime_) {} OnDemandOrderingGate::~OnDemandOrderingGate() { + proposal_notifier_lifetime_.unsubscribe(); processed_tx_hashes_subscription_.unsubscribe(); round_switch_subscription_.unsubscribe(); } diff --git a/irohad/ordering/impl/on_demand_ordering_gate.hpp b/irohad/ordering/impl/on_demand_ordering_gate.hpp index 561f561e49..3b10343240 100644 --- a/irohad/ordering/impl/on_demand_ordering_gate.hpp +++ b/irohad/ordering/impl/on_demand_ordering_gate.hpp @@ -98,6 +98,7 @@ namespace iroha { proposal_factory_; std::shared_ptr tx_cache_; + rxcpp::composite_subscription proposal_notifier_lifetime_; rxcpp::subjects::subject proposal_notifier_; }; diff --git a/irohad/simulator/impl/simulator.cpp b/irohad/simulator/impl/simulator.cpp index 2359f8a8cb..af2cbcc1f7 100644 --- a/irohad/simulator/impl/simulator.cpp +++ b/irohad/simulator/impl/simulator.cpp @@ -23,7 +23,9 @@ namespace iroha { std::unique_ptr block_factory, logger::LoggerPtr log) - : validator_(std::move(statefulValidator)), + : notifier_(notifier_lifetime_), + block_notifier_(block_notifier_lifetime_), + validator_(std::move(statefulValidator)), ametsuchi_factory_(std::move(factory)), block_query_factory_(block_query_factory), crypto_signer_(std::move(crypto_signer)), @@ -67,6 +69,8 @@ namespace iroha { } Simulator::~Simulator() { + notifier_lifetime_.unsubscribe(); + block_notifier_lifetime_.unsubscribe(); proposal_subscription_.unsubscribe(); verified_proposal_subscription_.unsubscribe(); } diff --git a/irohad/simulator/impl/simulator.hpp b/irohad/simulator/impl/simulator.hpp index 9e392f0dba..069a0558ca 100644 --- a/irohad/simulator/impl/simulator.hpp +++ b/irohad/simulator/impl/simulator.hpp @@ -54,7 +54,9 @@ namespace iroha { private: // internal + rxcpp::composite_subscription notifier_lifetime_; rxcpp::subjects::subject notifier_; + rxcpp::composite_subscription block_notifier_lifetime_; rxcpp::subjects::subject block_notifier_; rxcpp::composite_subscription proposal_subscription_; diff --git a/irohad/synchronizer/impl/synchronizer_impl.cpp b/irohad/synchronizer/impl/synchronizer_impl.cpp index 2ce4e361aa..fa27d23507 100644 --- a/irohad/synchronizer/impl/synchronizer_impl.cpp +++ b/irohad/synchronizer/impl/synchronizer_impl.cpp @@ -27,6 +27,7 @@ namespace iroha { mutable_factory_(std::move(mutable_factory)), block_query_factory_(std::move(block_query_factory)), block_loader_(std::move(block_loader)), + notifier_(notifier_lifetime_), log_(std::move(log)) { consensus_gate->onOutcome().subscribe( subscription_, [this](consensus::GateObject object) { @@ -204,6 +205,7 @@ namespace iroha { } SynchronizerImpl::~SynchronizerImpl() { + notifier_lifetime_.unsubscribe(); subscription_.unsubscribe(); } diff --git a/irohad/synchronizer/impl/synchronizer_impl.hpp b/irohad/synchronizer/impl/synchronizer_impl.hpp index 60f5b6a2a7..30eb04b77a 100644 --- a/irohad/synchronizer/impl/synchronizer_impl.hpp +++ b/irohad/synchronizer/impl/synchronizer_impl.hpp @@ -77,6 +77,7 @@ namespace iroha { std::shared_ptr block_loader_; // internal + rxcpp::composite_subscription notifier_lifetime_; rxcpp::subjects::subject notifier_; rxcpp::composite_subscription subscription_; diff --git a/libs/logger/logger_spdlog.cpp b/libs/logger/logger_spdlog.cpp index 9c1b0ee906..24ce9e39ba 100644 --- a/libs/logger/logger_spdlog.cpp +++ b/libs/logger/logger_spdlog.cpp @@ -17,19 +17,23 @@ namespace { spdlog::level::level_enum getSpdlogLogLevel(logger::LogLevel level) { - static const std::map - kSpdLogLevels = { - {logger::LogLevel::kTrace, spdlog::level::trace}, - {logger::LogLevel::kDebug, spdlog::level::debug}, - {logger::LogLevel::kInfo, spdlog::level::info}, - {logger::LogLevel::kWarn, spdlog::level::warn}, - {logger::LogLevel::kError, spdlog::level::err}, - {logger::LogLevel::kCritical, spdlog::level::critical}}; - const auto it = kSpdLogLevels.find(level); - BOOST_ASSERT_MSG(it != kSpdLogLevels.end(), "Unknown log level!"); - return it == kSpdLogLevels.end() - ? kSpdLogLevels.at(logger::kDefaultLogLevel) - : it->second; + switch (level) { + case logger::LogLevel::kTrace: + return spdlog::level::trace; + case logger::LogLevel::kDebug: + return spdlog::level::debug; + case logger::LogLevel::kInfo: + return spdlog::level::info; + case logger::LogLevel::kWarn: + return spdlog::level::warn; + case logger::LogLevel::kError: + return spdlog::level::err; + case logger::LogLevel::kCritical: + return spdlog::level::critical; + default: + BOOST_ASSERT_MSG(false, "Unknown log level!"); + return spdlog::level::info; + } } std::shared_ptr getOrCreateLogger(const std::string tag) { diff --git a/snap/snapcraft.yaml b/snap/snapcraft.yaml index 156eab2367..0a72282658 100644 --- a/snap/snapcraft.yaml +++ b/snap/snapcraft.yaml @@ -97,8 +97,7 @@ parts: after: [cmake] rxcpp: source: https://github.com/Reactive-Extensions/RxCpp.git - # this version is chosen, because it fixes 100% node overload [IR-1736] bug - source-commit: a7d5856385f126e874db6010d9dbfd37290c61de + source-commit: 795587fa311fa41050111a830d91b183d8e53ff9 plugin: cmake after: [cmake] rapidjson: diff --git a/test/framework/CMakeLists.txt b/test/framework/CMakeLists.txt index 5bbf1221b2..63429c7196 100644 --- a/test/framework/CMakeLists.txt +++ b/test/framework/CMakeLists.txt @@ -32,13 +32,15 @@ add_library(integration_framework integration_framework/port_guard.cpp ) target_link_libraries(integration_framework + PRIVATE + yac_transport + PUBLIC application integration_framework_config_helper command_client common_test_constants query_client ordering_gate_common - yac_transport shared_model_cryptography_model server_runner mst_transport diff --git a/test/framework/integration_framework/integration_test_framework.cpp b/test/framework/integration_framework/integration_test_framework.cpp index 1913f9dfb2..b6aa6ffbaa 100644 --- a/test/framework/integration_framework/integration_test_framework.cpp +++ b/test/framework/integration_framework/integration_test_framework.cpp @@ -45,6 +45,7 @@ #include "module/shared_model/validators/always_valid_validators.hpp" #include "multi_sig_transactions/mst_processor.hpp" #include "multi_sig_transactions/transport/mst_transport_grpc.hpp" +#include "network/consensus_gate.hpp" #include "network/impl/async_grpc_client.hpp" #include "network/impl/grpc_channel_builder.hpp" #include "ordering/impl/on_demand_os_client_grpc.hpp" @@ -403,7 +404,7 @@ namespace integration_framework { ->onExpiredBatches(); } - rxcpp::observable + rxcpp::observable IntegrationTestFramework::getYacOnCommitObservable() { return iroha_instance_->getIrohaInstance()->getConsensusGate()->onOutcome(); } diff --git a/test/framework/integration_framework/integration_test_framework.hpp b/test/framework/integration_framework/integration_test_framework.hpp index fe857b0a92..85b2a0c6b6 100644 --- a/test/framework/integration_framework/integration_test_framework.hpp +++ b/test/framework/integration_framework/integration_test_framework.hpp @@ -383,8 +383,7 @@ namespace integration_framework { rxcpp::observable getMstExpiredBatchesObservable(); - rxcpp::observable - getYacOnCommitObservable(); + rxcpp::observable getYacOnCommitObservable(); rxcpp::observable getPcsOnCommitObservable(); diff --git a/test/framework/test_subscriber.hpp b/test/framework/test_subscriber.hpp index e1f40d419c..86a2a0188b 100644 --- a/test/framework/test_subscriber.hpp +++ b/test/framework/test_subscriber.hpp @@ -27,7 +27,7 @@ namespace framework { */ template class VerificationStrategy { - template + template friend class TestSubscriber; public: @@ -75,8 +75,9 @@ namespace framework { /** * TestSubscriber class provide wrapper for observable * @tparam T type of data in wrapped observable + * @tparam Observable type of observable */ - template + template class TestSubscriber { public: /** @@ -84,7 +85,7 @@ namespace framework { * @param unwrapped_observable - object for wrapping * @param strategy - invariant for validation */ - TestSubscriber(rxcpp::observable unwrapped_observable, + TestSubscriber(Observable unwrapped_observable, std::unique_ptr> strategy) : unwrapped_(unwrapped_observable), strategy_(std::move(strategy)) {} @@ -93,34 +94,34 @@ namespace framework { * for wrapped observable with checking invariant. * @param subscriber - business logic subscriber */ - TestSubscriber &subscribe( + TestSubscriber &subscribe( std::function subscriber = [](T) {}, std::function error = [](std::exception_ptr) {}, std::function completed = []() {}) { - subscription_ = unwrapped_.subscribe( - [this, subscriber](T val) { - // verify before invariant - this->strategy_->on_next_before(val); - - // invoke subscriber - subscriber(val); - - // verify after invariant - this->strategy_->on_next_after(val); - }, - [this, error](std::exception_ptr ep) { - // invoke subscriber - error(ep); - - this->strategy_->on_error(ep); - }, - [this, completed]() { - // invoke subscriber - completed(); - - this->strategy_->on_completed(); - }); + unwrapped_.subscribe(subscription_, + [this, subscriber](T val) { + // verify before invariant + this->strategy_->on_next_before(val); + + // invoke subscriber + subscriber(val); + + // verify after invariant + this->strategy_->on_next_after(val); + }, + [this, error](std::exception_ptr ep) { + // invoke subscriber + error(ep); + + this->strategy_->on_error(ep); + }, + [this, completed]() { + // invoke subscriber + completed(); + + this->strategy_->on_completed(); + }); return *this; } @@ -138,7 +139,7 @@ namespace framework { } private: - rxcpp::observable unwrapped_; + Observable unwrapped_; std::unique_ptr> strategy_; rxcpp::composite_subscription subscription_; }; @@ -165,9 +166,9 @@ namespace framework { typename T, typename SourceOperator, typename... Args> - TestSubscriber make_test_subscriber( + TestSubscriber> make_test_subscriber( O unwrapped_observable, Args &&... args) { - return TestSubscriber( + return TestSubscriber>( unwrapped_observable, std::make_unique>(std::forward(args)...)); } diff --git a/test/integration/acceptance/fake_peer_example_test.cpp b/test/integration/acceptance/fake_peer_example_test.cpp index a4f4b036ea..a33a22c256 100644 --- a/test/integration/acceptance/fake_peer_example_test.cpp +++ b/test/integration/acceptance/fake_peer_example_test.cpp @@ -5,6 +5,7 @@ #include "ametsuchi/impl/storage_impl.hpp" #include "backend/protobuf/proto_proposal_factory.hpp" +#include "consensus/yac/vote_message.hpp" #include "datetime/time.hpp" #include "framework/integration_framework/fake_peer/behaviour/honest.hpp" #include "framework/integration_framework/fake_peer/block_storage.hpp" @@ -42,7 +43,6 @@ class FakePeerExampleFixture : public AcceptanceFixture { fake_peers_ = itf_->addInitialPeers(num_fake_peers); } - /** * Prepare state of ledger: * - create account of target user @@ -300,7 +300,6 @@ TEST_F(FakePeerExampleFixture, SynchronizeTheRightVersionOfForkedLedger) { */ TEST_F(FakePeerExampleFixture, OnDemandOrderingProposalAfterValidCommandReceived) { - // Create the tx: const auto tx = complete( baseTx(kAdminId).transferAsset(kAdminId, kUserId, kAssetId, "tx1", "1.0"), diff --git a/test/integration/consensus/consensus_sunny_day.cpp b/test/integration/consensus/consensus_sunny_day.cpp index 3c522bbe44..6f802a63a8 100644 --- a/test/integration/consensus/consensus_sunny_day.cpp +++ b/test/integration/consensus/consensus_sunny_day.cpp @@ -120,6 +120,8 @@ class ConsensusSunnyDayTest : public ::testing::Test { crypto, timer, order.value(), + initial_round, + rxcpp::observe_on_new_thread(), getTestLogger("Yac")); network->subscribe(yac); @@ -141,6 +143,7 @@ class ConsensusSunnyDayTest : public ::testing::Test { std::shared_ptr my_peer; const std::string my_pub_key; std::vector> default_peers; + iroha::consensus::Round initial_round{1, 1}; }; /** @@ -149,27 +152,34 @@ class ConsensusSunnyDayTest : public ::testing::Test { * @then commit is achieved */ TEST_F(ConsensusSunnyDayTest, SunnyDayTest) { - std::condition_variable cv; - auto wrapper = make_test_subscriber(yac->onOutcome(), 1); - wrapper.subscribe([&cv](auto hash) { - std::cout << "^_^ COMMITTED!!!" << std::endl; - cv.notify_one(); - }); + auto wrapper = make_test_subscriber( + yac->onOutcome() + .timeout(std::chrono::milliseconds(delay_after), + rxcpp::observe_on_new_thread()) + .take(1) + .as_blocking(), + 1); EXPECT_CALL(*crypto, verify(_)).WillRepeatedly(Return(true)); // Wait for other peers to start std::this_thread::sleep_for(std::chrono::milliseconds(delay_before)); - YacHash my_hash(iroha::consensus::Round{1, 1}, "proposal_hash", "block_hash"); + YacHash my_hash(initial_round, "proposal_hash", "block_hash"); my_hash.block_signature = createSig(my_pub_key); auto order = ClusterOrdering::create(default_peers); ASSERT_TRUE(order); yac->vote(my_hash, *order); - std::mutex m; - std::unique_lock lk(m); - cv.wait_for(lk, std::chrono::milliseconds(delay_after)); + + wrapper.subscribe([](auto) { std::cout << "^_^ COMMITTED!!!" << std::endl; }, + [](std::exception_ptr ep) { + try { + std::rethrow_exception(ep); + } catch (const std::exception &e) { + FAIL() << "Error waiting for outcome: " << e.what(); + } + }); ASSERT_TRUE(wrapper.validate()); } diff --git a/test/module/irohad/consensus/yac/yac_fixture.hpp b/test/module/irohad/consensus/yac/yac_fixture.hpp index e7004fd366..36f499b16c 100644 --- a/test/module/irohad/consensus/yac/yac_fixture.hpp +++ b/test/module/irohad/consensus/yac/yac_fixture.hpp @@ -45,6 +45,7 @@ namespace iroha { } return result; }(); + Round initial_round{1, 1}; void SetUp() override { network = std::make_shared(); @@ -70,6 +71,9 @@ namespace iroha { crypto, timer, ordering, + initial_round, + rxcpp::observe_on_one_worker( + rxcpp::schedulers::make_current_thread()), getTestLogger("Yac")); network->subscribe(yac); } diff --git a/test/module/irohad/consensus/yac/yac_simple_cold_case_test.cpp b/test/module/irohad/consensus/yac/yac_simple_cold_case_test.cpp index 3064976df9..75a142b4b1 100644 --- a/test/module/irohad/consensus/yac/yac_simple_cold_case_test.cpp +++ b/test/module/irohad/consensus/yac/yac_simple_cold_case_test.cpp @@ -33,8 +33,7 @@ TEST_F(YacTest, YacWhenVoting) { EXPECT_CALL(*network, sendState(_, _)).Times(default_peers.size()); - YacHash my_hash( - iroha::consensus::Round{1, 1}, "my_proposal_hash", "my_block_hash"); + YacHash my_hash(initial_round, "my_proposal_hash", "my_block_hash"); auto order = ClusterOrdering::create(default_peers); ASSERT_TRUE(order); @@ -56,8 +55,7 @@ TEST_F(YacTest, YacWhenColdStartAndAchieveOneVote) { EXPECT_CALL(*crypto, verify(_)).Times(1).WillRepeatedly(Return(true)); - YacHash received_hash( - iroha::consensus::Round{1, 1}, "my_proposal", "my_block"); + YacHash received_hash(initial_round, "my_proposal", "my_block"); // assume that our peer receive message network->notification->onState({crypto->getVote(received_hash, "0")}); @@ -89,8 +87,7 @@ TEST_F(YacTest, DISABLED_YacWhenColdStartAndAchieveSupermajorityOfVotes) { .Times(default_peers.size()) .WillRepeatedly(Return(true)); - YacHash received_hash( - iroha::consensus::Round{1, 1}, "my_proposal", "my_block"); + YacHash received_hash(initial_round, "my_proposal", "my_block"); for (size_t i = 0; i < default_peers.size(); ++i) { network->notification->onState( {crypto->getVote(received_hash, std::to_string(i))}); @@ -106,8 +103,7 @@ TEST_F(YacTest, DISABLED_YacWhenColdStartAndAchieveSupermajorityOfVotes) { * AND commit is emitted to observable */ TEST_F(YacTest, YacWhenColdStartAndAchieveCommitMessage) { - YacHash propagated_hash( - iroha::consensus::Round{1, 1}, "my_proposal", "my_block"); + YacHash propagated_hash(initial_round, "my_proposal", "my_block"); // verify that commit emitted auto wrapper = make_test_subscriber(yac->onOutcome(), 1); @@ -160,9 +156,9 @@ TEST_F(YacTest, DISABLED_PropagateCommitBeforeNotifyingSubscribersApplyVote) { }); for (size_t i = 0; i < default_peers.size(); ++i) { - yac->onState({createVote( - YacHash(iroha::consensus::Round(1, 0), "proposal_hash", "block_hash"), - std::to_string(i))}); + yac->onState( + {createVote(YacHash(initial_round, "proposal_hash", "block_hash"), + std::to_string(i))}); } // verify that on_commit subscribers are notified @@ -188,8 +184,7 @@ TEST_F(YacTest, PropagateCommitBeforeNotifyingSubscribersApplyReject) { std::vector commit; - auto yac_hash = - YacHash(iroha::consensus::Round(1, 0), "proposal_hash", "block_hash"); + auto yac_hash = YacHash(initial_round, "proposal_hash", "block_hash"); auto f = (default_peers.size() - 1) / iroha::consensus::yac::detail::kSupermajorityCheckerKfPlus1Bft; @@ -202,8 +197,8 @@ TEST_F(YacTest, PropagateCommitBeforeNotifyingSubscribersApplyReject) { auto vote = createVote(yac_hash, std::to_string(default_peers.size() - f)); RejectMessage reject( {vote, - createVote(YacHash(iroha::consensus::Round{1, 1}, "", "my_block"), - std::to_string(default_peers.size() - f + 1))}); + createVote(YacHash(initial_round, "", "my_block"), + std::to_string(default_peers.size() - f + 1))}); commit.push_back(vote); yac->onState(reject.votes);