diff --git a/irohad/ametsuchi/block_query.hpp b/irohad/ametsuchi/block_query.hpp index 87ca9329d3..70e8fe0ba0 100644 --- a/irohad/ametsuchi/block_query.hpp +++ b/irohad/ametsuchi/block_query.hpp @@ -46,7 +46,7 @@ namespace iroha { * @param account_id - account_id (accountName@domainName) * @return observable of Model Transaction */ - virtual rxcpp::observable getAccountTransactions( + virtual std::vector getAccountTransactions( const shared_model::interface::types::AccountIdType &account_id) = 0; /** @@ -55,7 +55,7 @@ namespace iroha { * @param asset_id - asset_id (assetName#domainName) * @return observable of Model Transaction */ - virtual rxcpp::observable getAccountAssetTransactions( + virtual std::vector getAccountAssetTransactions( const shared_model::interface::types::AccountIdType &account_id, const shared_model::interface::types::AssetIdType &asset_id) = 0; @@ -64,7 +64,7 @@ namespace iroha { * @param tx_hashes - transactions' hashes to retrieve * @return observable of Model Transaction */ - virtual rxcpp::observable> getTransactions( + virtual std::vector> getTransactions( const std::vector &tx_hashes) = 0; /** @@ -73,7 +73,7 @@ namespace iroha { * @param count - number of blocks to retrieve * @return observable of Model Block */ - virtual rxcpp::observable getBlocks( + virtual std::vector getBlocks( shared_model::interface::types::HeightType height, uint32_t count) = 0; @@ -82,7 +82,7 @@ namespace iroha { * @param from - starting height * @return observable of Model Block */ - virtual rxcpp::observable getBlocksFrom( + virtual std::vector getBlocksFrom( shared_model::interface::types::HeightType height) = 0; /** @@ -90,7 +90,7 @@ namespace iroha { * @param count - number of blocks to retrieve * @return observable of Model Block */ - virtual rxcpp::observable getTopBlocks(uint32_t count) = 0; + virtual std::vector getTopBlocks(uint32_t count) = 0; /** * Get height of the top block. diff --git a/irohad/ametsuchi/impl/postgres_block_query.cpp b/irohad/ametsuchi/impl/postgres_block_query.cpp index 50569d631e..fb6682f096 100644 --- a/irohad/ametsuchi/impl/postgres_block_query.cpp +++ b/irohad/ametsuchi/impl/postgres_block_query.cpp @@ -19,37 +19,33 @@ namespace iroha { block_store_(file_store), log_(logger::log("PostgresBlockIndex")) {} - rxcpp::observable PostgresBlockQuery::getBlocks( + std::vector PostgresBlockQuery::getBlocks( shared_model::interface::types::HeightType height, uint32_t count) { shared_model::interface::types::HeightType last_id = block_store_.last_id(); auto to = std::min(last_id, height + count - 1); + std::vector result; if (height > to or count == 0) { - return rxcpp::observable<>::empty(); + return result; } - return rxcpp::observable<>::range(height, to) - .flat_map([this](const auto &i) { - return rxcpp::observable<>::create( - [i, this](const auto &block_subscriber) { - block_store_.get(i) | [](const auto &bytes) { - return shared_model::converters::protobuf::jsonToModel< - shared_model::proto::Block>(bytesToString(bytes)); - } | [&block_subscriber](auto &&block) { - block_subscriber.on_next( - std::make_shared( - std::move(block))); - }; - block_subscriber.on_completed(); - }); - }); + for (auto i = height; i <= to; i++) { + block_store_.get(i) | [](const auto &bytes) { + return shared_model::converters::protobuf::jsonToModel< + shared_model::proto::Block>(bytesToString(bytes)); + } | [&result](auto &&block) { + result.push_back( + std::make_shared(std::move(block))); + }; + } + return result; } - rxcpp::observable PostgresBlockQuery::getBlocksFrom( + std::vector PostgresBlockQuery::getBlocksFrom( shared_model::interface::types::HeightType height) { return getBlocks(height, block_store_.last_id()); } - rxcpp::observable PostgresBlockQuery::getTopBlocks( + std::vector PostgresBlockQuery::getTopBlocks( uint32_t count) { auto last_id = block_store_.last_id(); count = std::min(count, last_id); @@ -92,9 +88,9 @@ namespace iroha { } std::function &result)> - PostgresBlockQuery::callback( - const rxcpp::subscriber &subscriber, uint64_t block_id) { - return [this, &subscriber, block_id](std::vector &result) { + PostgresBlockQuery::callback(std::vector &blocks, + uint64_t block_id) { + return [this, &blocks, block_id](std::vector &result) { auto block = block_store_.get(block_id) | [](const auto &bytes) { return shared_model::converters::protobuf::jsonToModel< shared_model::proto::Block>(bytesToString(bytes)); @@ -112,93 +108,82 @@ namespace iroha { return size; }), [&](const auto &x) { - subscriber.on_next(PostgresBlockQuery::wTransaction( + blocks.push_back(PostgresBlockQuery::wTransaction( clone(block->transactions()[x]))); }); }; } - rxcpp::observable + std::vector PostgresBlockQuery::getAccountTransactions( const shared_model::interface::types::AccountIdType &account_id) { - return rxcpp::observable<>::create( - [this, account_id](const auto &subscriber) { - auto block_ids = this->getBlockIds(account_id); - if (block_ids.empty()) { - subscriber.on_completed(); - return; - } - - for (const auto &block_id : block_ids) { - std::vector index; - soci::indicator ind; - std::string row; - soci::statement st = - (sql_.prepare - << "SELECT DISTINCT index FROM index_by_creator_height " - "WHERE creator_id = :id AND height = :height", - soci::into(row, ind), - soci::use(account_id), - soci::use(block_id)); - st.execute(); - - processSoci(st, ind, row, [&index](std::string &r) { - index.push_back(r); - }); - this->callback(subscriber, block_id)(index); - } - subscriber.on_completed(); - }); + std::vector result; + auto block_ids = this->getBlockIds(account_id); + if (block_ids.empty()) { + return result; + } + for (const auto &block_id : block_ids) { + std::vector index; + soci::indicator ind; + std::string row; + soci::statement st = + (sql_.prepare + << "SELECT DISTINCT index FROM index_by_creator_height " + "WHERE creator_id = :id AND height = :height", + soci::into(row, ind), + soci::use(account_id), + soci::use(block_id)); + st.execute(); + + processSoci( + st, ind, row, [&index](std::string &r) { index.push_back(r); }); + this->callback(result, block_id)(index); + } + return result; } - rxcpp::observable + std::vector PostgresBlockQuery::getAccountAssetTransactions( const shared_model::interface::types::AccountIdType &account_id, const shared_model::interface::types::AssetIdType &asset_id) { - return rxcpp::observable<>::create( - [this, account_id, asset_id](auto subscriber) { - auto block_ids = this->getBlockIds(account_id); - if (block_ids.empty()) { - subscriber.on_completed(); - return; - } - - for (const auto &block_id : block_ids) { - std::vector index; - soci::indicator ind; - std::string row; - soci::statement st = - (sql_.prepare - << "SELECT DISTINCT index FROM index_by_id_height_asset " - "WHERE id = :id AND height = :height AND asset_id = " - ":asset_id", - soci::into(row, ind), - soci::use(account_id), - soci::use(block_id), - soci::use(asset_id)); - st.execute(); - - processSoci(st, ind, row, [&index](std::string &r) { - index.push_back(r); - }); - this->callback(subscriber, block_id)(index); - } - subscriber.on_completed(); - }); + std::vector result; + auto block_ids = this->getBlockIds(account_id); + if (block_ids.empty()) { + return result; + } + + for (const auto &block_id : block_ids) { + std::vector index; + soci::indicator ind; + std::string row; + soci::statement st = + (sql_.prepare + << "SELECT DISTINCT index FROM index_by_id_height_asset " + "WHERE id = :id AND height = :height AND asset_id = " + ":asset_id", + soci::into(row, ind), + soci::use(account_id), + soci::use(block_id), + soci::use(asset_id)); + st.execute(); + + processSoci( + st, ind, row, [&index](std::string &r) { index.push_back(r); }); + this->callback(result, block_id)(index); + } + return result; } - rxcpp::observable> + std::vector> PostgresBlockQuery::getTransactions( const std::vector &tx_hashes) { - return rxcpp::observable<>::create>( - [this, tx_hashes](const auto &subscriber) { - std::for_each(tx_hashes.begin(), - tx_hashes.end(), - [that = this, &subscriber](const auto &tx_hash) { - subscriber.on_next(that->getTxByHashSync(tx_hash)); - }); - subscriber.on_completed(); - }); + std::vector> result; + std::for_each(tx_hashes.begin(), + tx_hashes.end(), + [this, &result](const auto &tx_hash) { + result.push_back(this->getTxByHashSync(tx_hash)); + }); + return result; } boost::optional diff --git a/irohad/ametsuchi/impl/postgres_block_query.hpp b/irohad/ametsuchi/impl/postgres_block_query.hpp index 1f37435dd9..f6efb53a50 100644 --- a/irohad/ametsuchi/impl/postgres_block_query.hpp +++ b/irohad/ametsuchi/impl/postgres_block_query.hpp @@ -26,28 +26,28 @@ namespace iroha { explicit PostgresBlockQuery(soci::session &sql, KeyValueStorage &file_store); - rxcpp::observable getAccountTransactions( + std::vector getAccountTransactions( const shared_model::interface::types::AccountIdType &account_id) override; - rxcpp::observable getAccountAssetTransactions( + std::vector getAccountAssetTransactions( const shared_model::interface::types::AccountIdType &account_id, const shared_model::interface::types::AssetIdType &asset_id) override; - rxcpp::observable> getTransactions( + std::vector> getTransactions( const std::vector &tx_hashes) override; boost::optional getTxByHashSync( const shared_model::crypto::Hash &hash) override; - rxcpp::observable getBlocks( + std::vector getBlocks( shared_model::interface::types::HeightType height, uint32_t count) override; - rxcpp::observable getBlocksFrom( + std::vector getBlocksFrom( shared_model::interface::types::HeightType height) override; - rxcpp::observable getTopBlocks(uint32_t count) override; + std::vector getTopBlocks(uint32_t count) override; uint32_t getTopBlockHeight() override; @@ -80,7 +80,7 @@ namespace iroha { * @return */ std::function &result)> callback( - const rxcpp::subscriber &s, uint64_t block_id); + std::vector &s, uint64_t block_id); soci::session &sql_; diff --git a/irohad/ametsuchi/impl/wsv_restorer_impl.cpp b/irohad/ametsuchi/impl/wsv_restorer_impl.cpp index 1f7ebe6b8a..3b30d9fac8 100644 --- a/irohad/ametsuchi/impl/wsv_restorer_impl.cpp +++ b/irohad/ametsuchi/impl/wsv_restorer_impl.cpp @@ -28,9 +28,8 @@ namespace iroha { expected::Result WsvRestorerImpl::restoreWsv( Storage &storage) { // get all blocks starting from the genesis - std::vector> blocks; - storage.getBlockQuery()->getBlocksFrom(1).as_blocking().subscribe( - [&blocks](auto block) { blocks.push_back(std::move(block)); }); + std::vector> blocks= + storage.getBlockQuery()->getBlocksFrom(1); storage.reset(); diff --git a/irohad/execution/impl/query_execution_impl.cpp b/irohad/execution/impl/query_execution_impl.cpp index ef0991e81e..54eea0b3b3 100644 --- a/irohad/execution/impl/query_execution_impl.cpp +++ b/irohad/execution/impl/query_execution_impl.cpp @@ -290,10 +290,13 @@ QueryExecutionImpl::executeGetAccountAssetTransactions( bq.getAccountAssetTransactions(query.accountId(), query.assetId()); std::vector txs; - acc_asset_tx.subscribe([&](const auto &tx) { - txs.push_back( - *std::static_pointer_cast(tx)); - }); + std::transform( + acc_asset_tx.begin(), + acc_asset_tx.end(), + std::back_inserter(txs), + [](const auto &tx) { + return *std::static_pointer_cast(tx); + }); auto response = QueryResponseBuilder().transactionsResponse(txs); return response; @@ -307,10 +310,13 @@ QueryExecutionImpl::executeGetAccountTransactions( auto acc_tx = bq.getAccountTransactions(query.accountId()); std::vector txs; - acc_tx.subscribe([&](const auto &tx) { - txs.push_back( - *std::static_pointer_cast(tx)); - }); + std::transform( + acc_tx.begin(), + acc_tx.end(), + std::back_inserter(txs), + [](const auto &tx) { + return *std::static_pointer_cast(tx); + }); auto response = QueryResponseBuilder().transactionsResponse(txs); return response; @@ -329,7 +335,7 @@ QueryExecutionImpl::executeGetTransactions( std::vector txs; bool can_get_all = checkAccountRolePermission(accountId, wq, Role::kGetAllTxs); - transactions.subscribe([&](const auto &tx) { + std::for_each(transactions.begin(), transactions.end(), [&](const auto &tx) { if (tx) { auto proto_tx = *std::static_pointer_cast(*tx); diff --git a/irohad/network/impl/block_loader_service.cpp b/irohad/network/impl/block_loader_service.cpp index a0afac026d..2f93a64130 100644 --- a/irohad/network/impl/block_loader_service.cpp +++ b/irohad/network/impl/block_loader_service.cpp @@ -31,13 +31,11 @@ grpc::Status BlockLoaderService::retrieveBlocks( ::grpc::ServerContext *context, const proto::BlocksRequest *request, ::grpc::ServerWriter<::iroha::protocol::Block> *writer) { - storage_->getBlocksFrom(request->height()) - .map([](auto block) { - return std::dynamic_pointer_cast(block) - ->getTransport(); - }) - .as_blocking() - .subscribe([writer](auto block) { writer->Write(block); }); + auto blocks = storage_->getBlocksFrom(request->height()); + std::for_each(blocks.begin(), blocks.end(), [&writer](const auto &block) { + writer->Write(std::dynamic_pointer_cast(block) + ->getTransport()); + }); return grpc::Status::OK; } @@ -53,14 +51,14 @@ grpc::Status BlockLoaderService::retrieveBlock( } boost::optional result; - storage_->getBlocksFrom(1) - .filter([&hash](auto block) { return block->hash() == hash; }) - .map([](auto block) { - return std::dynamic_pointer_cast(block) - ->getTransport(); - }) - .as_blocking() - .subscribe([&result](auto block) { result = block; }); + auto blocks = storage_->getBlocksFrom(1); + std::for_each( + blocks.begin(), blocks.end(), [&result, &hash](const auto &block) { + if (block->hash() == hash) { + result = std::dynamic_pointer_cast(block) + ->getTransport(); + } + }); if (not result) { log_->info("Cannot find block with requested hash"); return grpc::Status(grpc::StatusCode::NOT_FOUND, "Block not found"); diff --git a/test/module/irohad/ametsuchi/ametsuchi_mocks.hpp b/test/module/irohad/ametsuchi/ametsuchi_mocks.hpp index b614a36d3a..49456315cc 100644 --- a/test/module/irohad/ametsuchi/ametsuchi_mocks.hpp +++ b/test/module/irohad/ametsuchi/ametsuchi_mocks.hpp @@ -156,27 +156,27 @@ namespace iroha { public: MOCK_METHOD1( getAccountTransactions, - rxcpp::observable( + std::vector( const shared_model::interface::types::AccountIdType &account_id)); MOCK_METHOD1(getTxByHashSync, boost::optional( const shared_model::crypto::Hash &hash)); MOCK_METHOD2( getAccountAssetTransactions, - rxcpp::observable( + std::vector( const shared_model::interface::types::AccountIdType &account_id, const shared_model::interface::types::AssetIdType &asset_id)); MOCK_METHOD1( getTransactions, - rxcpp::observable>( + std::vector>( const std::vector &tx_hashes)); MOCK_METHOD2(getBlocks, - rxcpp::observable( + std::vector( shared_model::interface::types::HeightType, uint32_t)); MOCK_METHOD1(getBlocksFrom, - rxcpp::observable( + std::vector( shared_model::interface::types::HeightType)); - MOCK_METHOD1(getTopBlocks, rxcpp::observable(uint32_t)); + MOCK_METHOD1(getTopBlocks, std::vector(uint32_t)); MOCK_METHOD0(getTopBlock, expected::Result(void)); MOCK_METHOD1(hasTxWithHash, bool(const shared_model::crypto::Hash &hash)); MOCK_METHOD0(getTopBlockHeight, uint32_t(void)); diff --git a/test/module/irohad/ametsuchi/ametsuchi_test.cpp b/test/module/irohad/ametsuchi/ametsuchi_test.cpp index 5b3186c238..ca32c6ee06 100644 --- a/test/module/irohad/ametsuchi/ametsuchi_test.cpp +++ b/test/module/irohad/ametsuchi/ametsuchi_test.cpp @@ -26,25 +26,6 @@ auto zero_string = std::string(32, '0'); auto fake_hash = shared_model::crypto::Hash(zero_string); auto fake_pubkey = shared_model::crypto::PublicKey(zero_string); -/** - * Shortcut to create CallExact observable wrapper, subscribe with given lambda, - * and validate the number of calls with optional custom output - * @tparam O observable type - * @tparam F on_next function type - * @param o observable object - * @param f function object - * @param call_count number of expected calls - * @param msg custom validation failure message - */ -template -void validateCalls(O &&o, - F &&f, - uint64_t call_count, - const std::string &msg = {}) { - auto wrap = make_test_subscriber(std::forward(o), call_count); - wrap.subscribe(std::forward(f)); - ASSERT_TRUE(wrap.validate()) << "Expected " << call_count << " calls" << msg; -} /** * Validate getAccountTransaction with given parameters @@ -59,11 +40,11 @@ void validateAccountTransactions(B &&blocks, const std::string &account, int call_count, int command_count) { - validateCalls( - blocks->getAccountTransactions(account), - [&](const auto &tx) { EXPECT_EQ(tx->commands().size(), command_count); }, - call_count, - " for " + account); + auto txs = blocks->getAccountTransactions(account); + ASSERT_EQ(txs.size(), call_count); + std::for_each(txs.begin(), txs.end(), [&](const auto &tx) { + EXPECT_EQ(tx->commands().size(), command_count); + }); } /** @@ -81,11 +62,11 @@ void validateAccountAssetTransactions(B &&blocks, const std::string &asset, int call_count, int command_count) { - validateCalls( - blocks->getAccountAssetTransactions(account, asset), - [&](const auto &tx) { EXPECT_EQ(tx->commands().size(), command_count); }, - call_count, - " for " + account + " " + asset); + auto txs = blocks->getAccountAssetTransactions(account, asset); + ASSERT_EQ(txs.size(), call_count); + std::for_each(txs.begin(), txs.end(), [&](const auto &tx) { + EXPECT_EQ(tx->commands().size(), command_count); + }); } /** @@ -155,10 +136,7 @@ TEST_F(AmetsuchiTest, GetBlocksCompletedWhenCalled) { apply(storage, block); - auto completed_wrapper = - make_test_subscriber(blocks->getBlocks(1, 1)); - completed_wrapper.subscribe(); - ASSERT_TRUE(completed_wrapper.validate()); + ASSERT_EQ(*blocks->getBlocks(1, 1)[0], block); } TEST_F(AmetsuchiTest, SampleTest) { @@ -214,12 +192,12 @@ TEST_F(AmetsuchiTest, SampleTest) { // Block store tests auto hashes = {block1.hash(), block2.hash()}; - validateCalls(blocks->getBlocks(1, 2), - [i = 0, &hashes](auto eachBlock) mutable { - EXPECT_EQ(*(hashes.begin() + i), eachBlock->hash()); - ++i; - }, - 2); + + auto stored_blocks = blocks->getBlocks(1, 2); + ASSERT_EQ(2, stored_blocks.size()); + for (size_t i = 0; i < stored_blocks.size(); i++) { + EXPECT_EQ(*(hashes.begin() + i), stored_blocks[i]->hash()); + } validateAccountTransactions(blocks, "admin1", 1, 3); validateAccountTransactions(blocks, user1id, 1, 4); @@ -357,12 +335,12 @@ TEST_F(AmetsuchiTest, queryGetAccountAssetTransactionsTest) { // Block store test auto hashes = {block1.hash(), block2.hash(), block3.hash()}; - validateCalls(blocks->getBlocks(1, 3), - [i = 0, &hashes](auto eachBlock) mutable { - EXPECT_EQ(*(hashes.begin() + i), eachBlock->hash()); - ++i; - }, - 3); + + auto stored_blocks = blocks->getBlocks(1, 3); + ASSERT_EQ(3, stored_blocks.size()); + for (size_t i = 0; i < stored_blocks.size(); i++) { + EXPECT_EQ(*(hashes.begin() + i), stored_blocks[i]->hash()); + } validateAccountTransactions(blocks, admin, 1, 7); validateAccountTransactions(blocks, user3id, 0, 0); diff --git a/test/module/irohad/ametsuchi/block_query_test.cpp b/test/module/irohad/ametsuchi/block_query_test.cpp index 12464cf971..45e0feccd1 100644 --- a/test/module/irohad/ametsuchi/block_query_test.cpp +++ b/test/module/irohad/ametsuchi/block_query_test.cpp @@ -20,7 +20,6 @@ #include "ametsuchi/impl/postgres_block_index.hpp" #include "ametsuchi/impl/postgres_block_query.hpp" #include "converters/protobuf/json_proto_converter.hpp" -#include "framework/test_subscriber.hpp" #include "framework/result_fixture.hpp" #include "module/irohad/ametsuchi/ametsuchi_fixture.hpp" #include "module/irohad/ametsuchi/ametsuchi_mocks.hpp" @@ -28,7 +27,6 @@ #include "module/shared_model/builders/protobuf/test_transaction_builder.hpp" using namespace iroha::ametsuchi; -using namespace framework::test_subscriber; using testing::Return; @@ -45,8 +43,7 @@ class BlockQueryTest : public AmetsuchiTest { index = std::make_shared(*sql); blocks = std::make_shared(*sql, *file); - empty_blocks = - std::make_shared(*sql, *mock_file); + empty_blocks = std::make_shared(*sql, *mock_file); *sql << init_; @@ -113,11 +110,11 @@ class BlockQueryTest : public AmetsuchiTest { */ TEST_F(BlockQueryTest, GetAccountTransactionsFromSeveralBlocks) { // Check that creator1 has created 3 transactions - auto getCreator1TxWrapper = make_test_subscriber( - blocks->getAccountTransactions(creator1), 3); - getCreator1TxWrapper.subscribe( - [this](auto val) { EXPECT_EQ(val->creatorAccountId(), creator1); }); - ASSERT_TRUE(getCreator1TxWrapper.validate()); + auto txs = blocks->getAccountTransactions(creator1); + ASSERT_EQ(txs.size(), 3); + std::for_each(txs.begin(), txs.end(), [&](const auto &tx) { + EXPECT_EQ(tx->creatorAccountId(), creator1); + }); } /** @@ -129,11 +126,11 @@ TEST_F(BlockQueryTest, GetAccountTransactionsFromSeveralBlocks) { */ TEST_F(BlockQueryTest, GetAccountTransactionsFromSingleBlock) { // Check that creator1 has created 1 transaction - auto getCreator2TxWrapper = make_test_subscriber( - blocks->getAccountTransactions(creator2), 1); - getCreator2TxWrapper.subscribe( - [this](auto val) { EXPECT_EQ(val->creatorAccountId(), creator2); }); - ASSERT_TRUE(getCreator2TxWrapper.validate()); + auto txs = blocks->getAccountTransactions(creator2); + ASSERT_EQ(txs.size(), 1); + std::for_each(txs.begin(), txs.end(), [&](const auto &tx) { + EXPECT_EQ(tx->creatorAccountId(), creator2); + }); } /** @@ -144,10 +141,8 @@ TEST_F(BlockQueryTest, GetAccountTransactionsFromSingleBlock) { */ TEST_F(BlockQueryTest, GetAccountTransactionsNonExistingUser) { // Check that "nonexisting" user has no transaction - auto getNonexistingTxWrapper = make_test_subscriber( - blocks->getAccountTransactions("nonexisting user"), 0); - getNonexistingTxWrapper.subscribe(); - ASSERT_TRUE(getNonexistingTxWrapper.validate()); + auto txs = blocks->getAccountTransactions("nonexisting user"); + ASSERT_EQ(txs.size(), 0); } /** @@ -158,20 +153,12 @@ TEST_F(BlockQueryTest, GetAccountTransactionsNonExistingUser) { * @then queried transactions */ TEST_F(BlockQueryTest, GetTransactionsExistingTxHashes) { - auto wrapper = make_test_subscriber( - blocks->getTransactions({tx_hashes[1], tx_hashes[3]}), 2); - wrapper.subscribe([this](auto tx) { - static auto subs_cnt = 0; - subs_cnt++; - if (subs_cnt == 1) { - ASSERT_TRUE(tx); - EXPECT_EQ(tx_hashes[1], (*tx)->hash()); - } else { - ASSERT_TRUE(tx); - EXPECT_EQ(tx_hashes[3], (*tx)->hash()); - } - }); - ASSERT_TRUE(wrapper.validate()); + auto txs = blocks->getTransactions({tx_hashes[1], tx_hashes[3]}); + ASSERT_EQ(txs.size(), 2); + ASSERT_TRUE(txs[0]); + ASSERT_TRUE(txs[1]); + ASSERT_EQ(txs[0].get()->hash(), tx_hashes[1]); + ASSERT_EQ(txs[1].get()->hash(), tx_hashes[3]); } /** @@ -185,11 +172,11 @@ TEST_F(BlockQueryTest, GetTransactionsIncludesNonExistingTxHashes) { shared_model::crypto::Hash invalid_tx_hash_1(zero_string), invalid_tx_hash_2(std::string( shared_model::crypto::DefaultCryptoAlgorithmType::kHashLength, '9')); - auto wrapper = make_test_subscriber( - blocks->getTransactions({invalid_tx_hash_1, invalid_tx_hash_2}), 2); - wrapper.subscribe( - [](auto transaction) { EXPECT_EQ(boost::none, transaction); }); - ASSERT_TRUE(wrapper.validate()); + + auto txs = blocks->getTransactions({invalid_tx_hash_1, invalid_tx_hash_2}); + ASSERT_EQ(txs.size(), 2); + ASSERT_FALSE(txs[0]); + ASSERT_FALSE(txs[1]); } /** @@ -201,10 +188,8 @@ TEST_F(BlockQueryTest, GetTransactionsIncludesNonExistingTxHashes) { */ TEST_F(BlockQueryTest, GetTransactionsWithEmpty) { // transactions' hashes are empty. - auto wrapper = - make_test_subscriber(blocks->getTransactions({}), 0); - wrapper.subscribe(); - ASSERT_TRUE(wrapper.validate()); + auto txs = blocks->getTransactions({}); + ASSERT_EQ(txs.size(), 0); } /** @@ -217,19 +202,11 @@ TEST_F(BlockQueryTest, GetTransactionsWithEmpty) { TEST_F(BlockQueryTest, GetTransactionsWithInvalidTxAndValidTx) { // TODO 15/11/17 motxx - Use EqualList VerificationStrategy shared_model::crypto::Hash invalid_tx_hash_1(zero_string); - auto wrapper = make_test_subscriber( - blocks->getTransactions({invalid_tx_hash_1, tx_hashes[0]}), 2); - wrapper.subscribe([this](auto tx) { - static auto subs_cnt = 0; - subs_cnt++; - if (subs_cnt == 1) { - EXPECT_EQ(boost::none, tx); - } else { - EXPECT_TRUE(tx); - EXPECT_EQ(tx_hashes[0], (*tx)->hash()); - } - }); - ASSERT_TRUE(wrapper.validate()); + auto txs = blocks->getTransactions({invalid_tx_hash_1, tx_hashes[0]}); + ASSERT_EQ(txs.size(), 2); + ASSERT_FALSE(txs[0]); + ASSERT_TRUE(txs[1]); + ASSERT_EQ(txs[1].get()->hash(), tx_hashes[0]); } /** @@ -239,9 +216,8 @@ TEST_F(BlockQueryTest, GetTransactionsWithInvalidTxAndValidTx) { * @then nothing is returned */ TEST_F(BlockQueryTest, GetNonExistentBlock) { - auto wrapper = make_test_subscriber(blocks->getBlocks(1000, 1), 0); - wrapper.subscribe(); - ASSERT_TRUE(wrapper.validate()); + auto stored_blocks = blocks->getBlocks(1000, 1); + ASSERT_TRUE(stored_blocks.empty()); } /** @@ -251,9 +227,8 @@ TEST_F(BlockQueryTest, GetNonExistentBlock) { * @then returned exactly 1 block */ TEST_F(BlockQueryTest, GetExactlyOneBlock) { - auto wrapper = make_test_subscriber(blocks->getBlocks(1, 1), 1); - wrapper.subscribe(); - ASSERT_TRUE(wrapper.validate()); + auto stored_blocks = blocks->getBlocks(1, 1); + ASSERT_EQ(stored_blocks.size(), 1); } /** @@ -263,9 +238,8 @@ TEST_F(BlockQueryTest, GetExactlyOneBlock) { * @then no blocks returned */ TEST_F(BlockQueryTest, GetBlocks_Count0) { - auto wrapper = make_test_subscriber(blocks->getBlocks(1, 0), 0); - wrapper.subscribe(); - ASSERT_TRUE(wrapper.validate()); + auto stored_blocks = blocks->getBlocks(1, 0); + ASSERT_TRUE(stored_blocks.empty()); } /** @@ -275,9 +249,8 @@ TEST_F(BlockQueryTest, GetBlocks_Count0) { * @then no blocks returned */ TEST_F(BlockQueryTest, GetZeroBlock) { - auto wrapper = make_test_subscriber(blocks->getBlocks(0, 1), 0); - wrapper.subscribe(); - ASSERT_TRUE(wrapper.validate()); + auto stored_blocks = blocks->getBlocks(0, 1); + ASSERT_TRUE(stored_blocks.empty()); } /** @@ -287,15 +260,13 @@ TEST_F(BlockQueryTest, GetZeroBlock) { * @then returned all blocks (2) */ TEST_F(BlockQueryTest, GetBlocksFrom1) { - auto wrapper = - make_test_subscriber(blocks->getBlocksFrom(1), blocks_total); - size_t counter = 1; - wrapper.subscribe([&counter](const auto &b) { - // wrapper returns blocks 1 and 2 - ASSERT_EQ(b->height(), counter++) - << "block height: " << b->height() << "counter: " << counter; - }); - ASSERT_TRUE(wrapper.validate()); + auto stored_blocks = blocks->getBlocksFrom(1); + ASSERT_EQ(stored_blocks.size(), blocks_total); + for (size_t i = 0; i < stored_blocks.size(); i++) { + auto b = stored_blocks[i]; + ASSERT_EQ(b->height(), i + 1) + << "block height: " << b->height() << "counter: " << i; + } } /** @@ -316,11 +287,8 @@ TEST_F(BlockQueryTest, GetBlockButItIsNotJSON) { block_file << content; block_file.close(); - auto wrapper = - make_test_subscriber(blocks->getBlocks(block_n, 1), 0); - wrapper.subscribe(); - - ASSERT_TRUE(wrapper.validate()); + auto stored_blocks = blocks->getBlocks(block_n, 1); + ASSERT_TRUE(stored_blocks.empty()); } /** @@ -344,11 +312,8 @@ TEST_F(BlockQueryTest, GetBlockButItIsInvalidBlock) { block_file << content; block_file.close(); - auto wrapper = - make_test_subscriber(blocks->getBlocks(block_n, 1), 0); - wrapper.subscribe(); - - ASSERT_TRUE(wrapper.validate()); + auto stored_blocks = blocks->getBlocks(block_n, 1); + ASSERT_TRUE(stored_blocks.empty()); } /** @@ -359,14 +324,14 @@ TEST_F(BlockQueryTest, GetBlockButItIsInvalidBlock) { */ TEST_F(BlockQueryTest, GetTop2Blocks) { size_t blocks_n = 2; // top 2 blocks - auto wrapper = - make_test_subscriber(blocks->getTopBlocks(blocks_n), blocks_n); - size_t counter = blocks_total - blocks_n + 1; - wrapper.subscribe( - [&counter](const auto &b) { ASSERT_EQ(b->height(), counter++); }); + auto stored_blocks = blocks->getTopBlocks(blocks_n); + ASSERT_EQ(stored_blocks.size(), blocks_n); - ASSERT_TRUE(wrapper.validate()); + for (size_t i = 0; i < blocks_n; i++) { + auto b = stored_blocks[i]; + ASSERT_EQ(b->height(), i + 1); + } } /** @@ -414,5 +379,6 @@ TEST_F(BlockQueryTest, GetTopBlockFail) { auto top_block_error = framework::expected::err(empty_blocks->getTopBlock()); ASSERT_TRUE(top_block_error); - ASSERT_EQ(top_block_error.value().error, "error while fetching the last block"); + ASSERT_EQ(top_block_error.value().error, + "error while fetching the last block"); } diff --git a/test/module/irohad/ametsuchi/block_query_transfer_test.cpp b/test/module/irohad/ametsuchi/block_query_transfer_test.cpp index 906acddc50..caa7c764aa 100644 --- a/test/module/irohad/ametsuchi/block_query_transfer_test.cpp +++ b/test/module/irohad/ametsuchi/block_query_transfer_test.cpp @@ -127,11 +127,9 @@ namespace iroha { tx_hashes.push_back(block.transactions().back().hash()); insert(block); - auto wrapper = make_test_subscriber( - blocks->getAccountAssetTransactions(creator1, asset), 1); - wrapper.subscribe( - [this](auto val) { ASSERT_EQ(tx_hashes.at(0), val->hash()); }); - ASSERT_TRUE(wrapper.validate()); + auto txs = blocks->getAccountAssetTransactions(creator1, asset); + ASSERT_EQ(txs.size(), 1); + ASSERT_EQ(txs[0]->hash(), tx_hashes[0]); } /** @@ -145,11 +143,9 @@ namespace iroha { tx_hashes.push_back(block.transactions().back().hash()); insert(block); - auto wrapper = make_test_subscriber( - blocks->getAccountAssetTransactions(creator2, asset), 1); - wrapper.subscribe( - [this](auto val) { ASSERT_EQ(tx_hashes.at(0), val->hash()); }); - ASSERT_TRUE(wrapper.validate()); + auto txs = blocks->getAccountAssetTransactions(creator2, asset); + ASSERT_EQ(txs.size(), 1); + ASSERT_EQ(txs[0]->hash(), tx_hashes[0]); } /** @@ -163,11 +159,9 @@ namespace iroha { tx_hashes.push_back(block.transactions().back().hash()); insert(block); - auto wrapper = make_test_subscriber( - blocks->getAccountAssetTransactions(creator3, asset), 1); - wrapper.subscribe( - [this](auto val) { ASSERT_EQ(tx_hashes.at(0), val->hash()); }); - ASSERT_TRUE(wrapper.validate()); + auto txs = blocks->getAccountAssetTransactions(creator3, asset); + ASSERT_EQ(txs.size(), 1); + ASSERT_EQ(txs[0]->hash(), tx_hashes[0]); } /** @@ -187,13 +181,11 @@ namespace iroha { tx_hashes.push_back(block.transactions().back().hash()); insert(block2); - auto wrapper = make_test_subscriber( - blocks->getAccountAssetTransactions(creator1, asset), 2); - wrapper.subscribe([ i = 0, this ](auto val) mutable { - ASSERT_EQ(tx_hashes.at(i), val->hash()); - ++i; - }); - ASSERT_TRUE(wrapper.validate()); + auto txs = blocks->getAccountAssetTransactions(creator1, asset); + ASSERT_EQ(txs.size(), 2); + for (size_t i = 0; i < txs.size(); i++) { + ASSERT_EQ(txs[i]->hash(), tx_hashes[i]); + } } } // namespace ametsuchi } // namespace iroha diff --git a/test/module/irohad/execution/query_execution_test.cpp b/test/module/irohad/execution/query_execution_test.cpp index f7c9fc19bc..d96ee3b70c 100644 --- a/test/module/irohad/execution/query_execution_test.cpp +++ b/test/module/irohad/execution/query_execution_test.cpp @@ -77,16 +77,14 @@ class QueryValidateExecuteTest : public ::testing::Test { * @param N * @return observable with transactions */ - rxcpp::observable getDefaultTransactions( - const std::string &creator, size_t N) { - return rxcpp::observable<>::iterate([&creator, &N, this] { - std::vector result; - for (size_t i = 0; i < N; ++i) { - auto current = makeTransaction(creator); - result.push_back(current); - } - return result; - }()); + std::vector getDefaultTransactions( + const std::string &creator, size_t N) { + std::vector result; + for (size_t i = 0; i < N; ++i) { + auto current = makeTransaction(creator); + result.push_back(current); + } + return result; } std::string admin_id = "admin@test", account_id = "test@test", @@ -645,10 +643,10 @@ class GetAccountTransactionsTest : public QueryValidateExecuteTest { void SetUp() override { QueryValidateExecuteTest::SetUp(); role_permissions = {Role::kGetMyAccTxs}; - txs_observable = getDefaultTransactions(account_id, N); + txs = getDefaultTransactions(account_id, N); } - rxcpp::observable txs_observable; + std::vector txs; size_t N = 3; }; @@ -668,10 +666,10 @@ TEST_F(GetAccountTransactionsTest, MyAccountValidCase) { EXPECT_CALL(*wsv_query, getRolePermissions(admin_role)) .WillOnce(Return(role_permissions)); - txs_observable = getDefaultTransactions(admin_id, N); + txs = getDefaultTransactions(admin_id, N); EXPECT_CALL(*block_query, getAccountTransactions(admin_id)) - .WillOnce(Return(txs_observable)); + .WillOnce(Return(txs)); auto response = validateAndExecute(query); ASSERT_NO_THROW({ @@ -706,7 +704,7 @@ TEST_F(GetAccountTransactionsTest, AllAccountValidCase) { .WillOnce(Return(role_permissions)); EXPECT_CALL(*block_query, getAccountTransactions(account_id)) - .WillOnce(Return(txs_observable)); + .WillOnce(Return(txs)); auto response = validateAndExecute(query); ASSERT_NO_THROW({ @@ -741,7 +739,7 @@ TEST_F(GetAccountTransactionsTest, DomainAccountValidCase) { .WillOnce(Return(role_permissions)); EXPECT_CALL(*block_query, getAccountTransactions(account_id)) - .WillOnce(Return(txs_observable)); + .WillOnce(Return(txs)); auto response = validateAndExecute(query); ASSERT_NO_THROW({ @@ -802,7 +800,7 @@ TEST_F(GetAccountTransactionsTest, NoAccountExist) { .WillOnce(Return(role_permissions)); EXPECT_CALL(*block_query, getAccountTransactions("none")) - .WillOnce(Return(rxcpp::observable<>::empty())); + .WillOnce(Return(std::vector())); auto response = validateAndExecute(query); ASSERT_NO_THROW( @@ -817,10 +815,10 @@ class GetAccountAssetsTransactionsTest : public QueryValidateExecuteTest { void SetUp() override { QueryValidateExecuteTest::SetUp(); role_permissions = {Role::kGetMyAccAstTxs}; - txs_observable = getDefaultTransactions(account_id, N); + txs = getDefaultTransactions(account_id, N); } - rxcpp::observable txs_observable; + std::vector txs; size_t N = 3; }; @@ -840,10 +838,10 @@ TEST_F(GetAccountAssetsTransactionsTest, MyAccountValidCase) { EXPECT_CALL(*wsv_query, getRolePermissions(admin_role)) .WillOnce(Return(role_permissions)); - txs_observable = getDefaultTransactions(admin_id, N); + txs = getDefaultTransactions(admin_id, N); EXPECT_CALL(*block_query, getAccountAssetTransactions(admin_id, asset_id)) - .WillOnce(Return(txs_observable)); + .WillOnce(Return(txs)); auto response = validateAndExecute(query); ASSERT_NO_THROW({ @@ -878,7 +876,7 @@ TEST_F(GetAccountAssetsTransactionsTest, AllAccountValidCase) { .WillOnce(Return(role_permissions)); EXPECT_CALL(*block_query, getAccountAssetTransactions(account_id, asset_id)) - .WillOnce(Return(txs_observable)); + .WillOnce(Return(txs)); auto response = validateAndExecute(query); ASSERT_NO_THROW({ @@ -913,7 +911,7 @@ TEST_F(GetAccountAssetsTransactionsTest, DomainAccountValidCase) { .WillOnce(Return(role_permissions)); EXPECT_CALL(*block_query, getAccountAssetTransactions(account_id, asset_id)) - .WillOnce(Return(txs_observable)); + .WillOnce(Return(txs)); auto response = validateAndExecute(query); ASSERT_NO_THROW({ @@ -974,7 +972,7 @@ TEST_F(GetAccountAssetsTransactionsTest, NoAccountExist) { .WillOnce(Return(role_permissions)); EXPECT_CALL(*block_query, getAccountAssetTransactions("none", asset_id)) - .WillOnce(Return(rxcpp::observable<>::empty())); + .WillOnce(Return(std::vector())); auto response = validateAndExecute(query); ASSERT_NO_THROW( @@ -1002,7 +1000,7 @@ TEST_F(GetAccountAssetsTransactionsTest, NoAssetExist) { .WillOnce(Return(role_permissions)); EXPECT_CALL(*block_query, getAccountAssetTransactions(account_id, "none")) - .WillOnce(Return(rxcpp::observable<>::empty())); + .WillOnce(Return(std::vector())); auto response = validateAndExecute(query); ASSERT_NO_THROW( diff --git a/test/module/irohad/network/block_loader_test.cpp b/test/module/irohad/network/block_loader_test.cpp index 44af03020a..101fddc2bf 100644 --- a/test/module/irohad/network/block_loader_test.cpp +++ b/test/module/irohad/network/block_loader_test.cpp @@ -114,7 +114,7 @@ TEST_F(BlockLoaderTest, ValidWhenSameTopBlock) { EXPECT_CALL(*storage, getTopBlock()) .WillOnce(Return(iroha::expected::makeValue(wBlock(clone(block))))); EXPECT_CALL(*storage, getBlocksFrom(block.height() + 1)) - .WillOnce(Return(rxcpp::observable<>::empty())); + .WillOnce(Return(std::vector())); auto wrapper = make_test_subscriber( loader->retrieveBlocks(peer->pubkey()), 0); wrapper.subscribe(); @@ -149,7 +149,7 @@ TEST_F(BlockLoaderTest, ValidWhenOneBlock) { EXPECT_CALL(*storage, getTopBlock()) .WillOnce(Return(iroha::expected::makeValue(wBlock(clone(block))))); EXPECT_CALL(*storage, getBlocksFrom(block.height() + 1)) - .WillOnce(Return(rxcpp::observable<>::just(wBlock(clone(top_block))))); + .WillOnce(Return(std::vector{clone(top_block)})); auto wrapper = make_test_subscriber(loader->retrieveBlocks(peer_key), 1); wrapper.subscribe( @@ -191,7 +191,7 @@ TEST_F(BlockLoaderTest, ValidWhenMultipleBlocks) { EXPECT_CALL(*storage, getTopBlock()) .WillOnce(Return(iroha::expected::makeValue(wBlock(clone(block))))); EXPECT_CALL(*storage, getBlocksFrom(next_height)) - .WillOnce(Return(rxcpp::observable<>::iterate(blocks))); + .WillOnce(Return(blocks)); auto wrapper = make_test_subscriber( loader->retrieveBlocks(peer_key), num_blocks); auto height = next_height; @@ -214,7 +214,7 @@ TEST_F(BlockLoaderTest, ValidWhenBlockPresent) { EXPECT_CALL(*peer_query, getLedgerPeers()) .WillOnce(Return(std::vector{peer})); EXPECT_CALL(*storage, getBlocksFrom(1)) - .WillOnce(Return(rxcpp::observable<>::just(wBlock(clone(requested))))); + .WillOnce(Return(std::vector{clone(requested)})); auto block = loader->retrieveBlock(peer_key, requested.hash()); ASSERT_TRUE(block); @@ -234,7 +234,7 @@ TEST_F(BlockLoaderTest, ValidWhenBlockMissing) { EXPECT_CALL(*peer_query, getLedgerPeers()) .WillOnce(Return(std::vector{peer})); EXPECT_CALL(*storage, getBlocksFrom(1)) - .WillOnce(Return(rxcpp::observable<>::just(wBlock(clone(present))))); + .WillOnce(Return(std::vector{clone(present)})); auto block = loader->retrieveBlock(peer_key, kPrevHash); ASSERT_FALSE(block); diff --git a/test/module/irohad/torii/torii_queries_test.cpp b/test/module/irohad/torii/torii_queries_test.cpp index fc0b845d77..13ee2673e3 100644 --- a/test/module/irohad/torii/torii_queries_test.cpp +++ b/test/module/irohad/torii/torii_queries_test.cpp @@ -476,17 +476,14 @@ TEST_F(ToriiQueriesTest, FindTransactionsWhenValid) { auto account = shared_model::proto::AccountBuilder().accountId("accountA").build(); auto creator = "a@domain"; - auto txs_observable = rxcpp::observable<>::iterate([&account] { - std::vector result; - for (size_t i = 0; i < 3; ++i) { - std::shared_ptr current = - clone(TestTransactionBuilder() - .creatorAccountId(account.accountId()) - .build()); - result.push_back(current); - } - return result; - }()); + std::vector txs; + for (size_t i = 0; i < 3; ++i) { + std::shared_ptr current = + clone(TestTransactionBuilder() + .creatorAccountId(account.accountId()) + .build()); + txs.push_back(current); + } EXPECT_CALL(*wsv_query, getSignatories(creator)) .WillRepeatedly(Return(signatories)); @@ -496,7 +493,7 @@ TEST_F(ToriiQueriesTest, FindTransactionsWhenValid) { perm.set(Role::kGetMyAccTxs); EXPECT_CALL(*wsv_query, getRolePermissions("test")).WillOnce(Return(perm)); EXPECT_CALL(*block_query, getAccountTransactions(creator)) - .WillOnce(Return(txs_observable)); + .WillOnce(Return(txs)); iroha::protocol::QueryResponse response;