Skip to content

Commit

Permalink
Feature/remove observables block query (hyperledger-iroha#1591)
Browse files Browse the repository at this point in the history
* remove observables in getBlocks and getTransaction methods

Signed-off-by: Victor Drobny <[email protected]>
  • Loading branch information
vdrobnyi authored Jul 30, 2018
1 parent 6b4cacc commit 433fc8a
Show file tree
Hide file tree
Showing 13 changed files with 259 additions and 340 deletions.
12 changes: 6 additions & 6 deletions irohad/ametsuchi/block_query.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ namespace iroha {
* @param account_id - account_id (accountName@domainName)
* @return observable of Model Transaction
*/
virtual rxcpp::observable<wTransaction> getAccountTransactions(
virtual std::vector<wTransaction> getAccountTransactions(
const shared_model::interface::types::AccountIdType &account_id) = 0;

/**
Expand All @@ -55,7 +55,7 @@ namespace iroha {
* @param asset_id - asset_id (assetName#domainName)
* @return observable of Model Transaction
*/
virtual rxcpp::observable<wTransaction> getAccountAssetTransactions(
virtual std::vector<wTransaction> getAccountAssetTransactions(
const shared_model::interface::types::AccountIdType &account_id,
const shared_model::interface::types::AssetIdType &asset_id) = 0;

Expand All @@ -64,7 +64,7 @@ namespace iroha {
* @param tx_hashes - transactions' hashes to retrieve
* @return observable of Model Transaction
*/
virtual rxcpp::observable<boost::optional<wTransaction>> getTransactions(
virtual std::vector<boost::optional<wTransaction>> getTransactions(
const std::vector<shared_model::crypto::Hash> &tx_hashes) = 0;

/**
Expand All @@ -73,7 +73,7 @@ namespace iroha {
* @param count - number of blocks to retrieve
* @return observable of Model Block
*/
virtual rxcpp::observable<wBlock> getBlocks(
virtual std::vector<wBlock> getBlocks(
shared_model::interface::types::HeightType height,
uint32_t count) = 0;

Expand All @@ -82,15 +82,15 @@ namespace iroha {
* @param from - starting height
* @return observable of Model Block
*/
virtual rxcpp::observable<wBlock> getBlocksFrom(
virtual std::vector<wBlock> getBlocksFrom(
shared_model::interface::types::HeightType height) = 0;

/**
* Get given number of blocks from top.
* @param count - number of blocks to retrieve
* @return observable of Model Block
*/
virtual rxcpp::observable<wBlock> getTopBlocks(uint32_t count) = 0;
virtual std::vector<wBlock> getTopBlocks(uint32_t count) = 0;

/**
* Get height of the top block.
Expand Down
171 changes: 78 additions & 93 deletions irohad/ametsuchi/impl/postgres_block_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,37 +19,33 @@ namespace iroha {
block_store_(file_store),
log_(logger::log("PostgresBlockIndex")) {}

rxcpp::observable<BlockQuery::wBlock> PostgresBlockQuery::getBlocks(
std::vector<BlockQuery::wBlock> PostgresBlockQuery::getBlocks(
shared_model::interface::types::HeightType height, uint32_t count) {
shared_model::interface::types::HeightType last_id =
block_store_.last_id();
auto to = std::min(last_id, height + count - 1);
std::vector<BlockQuery::wBlock> result;
if (height > to or count == 0) {
return rxcpp::observable<>::empty<wBlock>();
return result;
}
return rxcpp::observable<>::range(height, to)
.flat_map([this](const auto &i) {
return rxcpp::observable<>::create<PostgresBlockQuery::wBlock>(
[i, this](const auto &block_subscriber) {
block_store_.get(i) | [](const auto &bytes) {
return shared_model::converters::protobuf::jsonToModel<
shared_model::proto::Block>(bytesToString(bytes));
} | [&block_subscriber](auto &&block) {
block_subscriber.on_next(
std::make_shared<shared_model::proto::Block>(
std::move(block)));
};
block_subscriber.on_completed();
});
});
for (auto i = height; i <= to; i++) {
block_store_.get(i) | [](const auto &bytes) {
return shared_model::converters::protobuf::jsonToModel<
shared_model::proto::Block>(bytesToString(bytes));
} | [&result](auto &&block) {
result.push_back(
std::make_shared<shared_model::proto::Block>(std::move(block)));
};
}
return result;
}

rxcpp::observable<BlockQuery::wBlock> PostgresBlockQuery::getBlocksFrom(
std::vector<BlockQuery::wBlock> PostgresBlockQuery::getBlocksFrom(
shared_model::interface::types::HeightType height) {
return getBlocks(height, block_store_.last_id());
}

rxcpp::observable<BlockQuery::wBlock> PostgresBlockQuery::getTopBlocks(
std::vector<BlockQuery::wBlock> PostgresBlockQuery::getTopBlocks(
uint32_t count) {
auto last_id = block_store_.last_id();
count = std::min(count, last_id);
Expand Down Expand Up @@ -92,9 +88,9 @@ namespace iroha {
}

std::function<void(std::vector<std::string> &result)>
PostgresBlockQuery::callback(
const rxcpp::subscriber<wTransaction> &subscriber, uint64_t block_id) {
return [this, &subscriber, block_id](std::vector<std::string> &result) {
PostgresBlockQuery::callback(std::vector<wTransaction> &blocks,
uint64_t block_id) {
return [this, &blocks, block_id](std::vector<std::string> &result) {
auto block = block_store_.get(block_id) | [](const auto &bytes) {
return shared_model::converters::protobuf::jsonToModel<
shared_model::proto::Block>(bytesToString(bytes));
Expand All @@ -112,93 +108,82 @@ namespace iroha {
return size;
}),
[&](const auto &x) {
subscriber.on_next(PostgresBlockQuery::wTransaction(
blocks.push_back(PostgresBlockQuery::wTransaction(
clone(block->transactions()[x])));
});
};
}

rxcpp::observable<BlockQuery::wTransaction>
std::vector<BlockQuery::wTransaction>
PostgresBlockQuery::getAccountTransactions(
const shared_model::interface::types::AccountIdType &account_id) {
return rxcpp::observable<>::create<wTransaction>(
[this, account_id](const auto &subscriber) {
auto block_ids = this->getBlockIds(account_id);
if (block_ids.empty()) {
subscriber.on_completed();
return;
}

for (const auto &block_id : block_ids) {
std::vector<std::string> index;
soci::indicator ind;
std::string row;
soci::statement st =
(sql_.prepare
<< "SELECT DISTINCT index FROM index_by_creator_height "
"WHERE creator_id = :id AND height = :height",
soci::into(row, ind),
soci::use(account_id),
soci::use(block_id));
st.execute();

processSoci(st, ind, row, [&index](std::string &r) {
index.push_back(r);
});
this->callback(subscriber, block_id)(index);
}
subscriber.on_completed();
});
std::vector<BlockQuery::wTransaction> result;
auto block_ids = this->getBlockIds(account_id);
if (block_ids.empty()) {
return result;
}
for (const auto &block_id : block_ids) {
std::vector<std::string> index;
soci::indicator ind;
std::string row;
soci::statement st =
(sql_.prepare
<< "SELECT DISTINCT index FROM index_by_creator_height "
"WHERE creator_id = :id AND height = :height",
soci::into(row, ind),
soci::use(account_id),
soci::use(block_id));
st.execute();

processSoci(
st, ind, row, [&index](std::string &r) { index.push_back(r); });
this->callback(result, block_id)(index);
}
return result;
}

rxcpp::observable<BlockQuery::wTransaction>
std::vector<BlockQuery::wTransaction>
PostgresBlockQuery::getAccountAssetTransactions(
const shared_model::interface::types::AccountIdType &account_id,
const shared_model::interface::types::AssetIdType &asset_id) {
return rxcpp::observable<>::create<wTransaction>(
[this, account_id, asset_id](auto subscriber) {
auto block_ids = this->getBlockIds(account_id);
if (block_ids.empty()) {
subscriber.on_completed();
return;
}

for (const auto &block_id : block_ids) {
std::vector<std::string> index;
soci::indicator ind;
std::string row;
soci::statement st =
(sql_.prepare
<< "SELECT DISTINCT index FROM index_by_id_height_asset "
"WHERE id = :id AND height = :height AND asset_id = "
":asset_id",
soci::into(row, ind),
soci::use(account_id),
soci::use(block_id),
soci::use(asset_id));
st.execute();

processSoci(st, ind, row, [&index](std::string &r) {
index.push_back(r);
});
this->callback(subscriber, block_id)(index);
}
subscriber.on_completed();
});
std::vector<BlockQuery::wTransaction> result;
auto block_ids = this->getBlockIds(account_id);
if (block_ids.empty()) {
return result;
}

for (const auto &block_id : block_ids) {
std::vector<std::string> index;
soci::indicator ind;
std::string row;
soci::statement st =
(sql_.prepare
<< "SELECT DISTINCT index FROM index_by_id_height_asset "
"WHERE id = :id AND height = :height AND asset_id = "
":asset_id",
soci::into(row, ind),
soci::use(account_id),
soci::use(block_id),
soci::use(asset_id));
st.execute();

processSoci(
st, ind, row, [&index](std::string &r) { index.push_back(r); });
this->callback(result, block_id)(index);
}
return result;
}

rxcpp::observable<boost::optional<BlockQuery::wTransaction>>
std::vector<boost::optional<BlockQuery::wTransaction>>
PostgresBlockQuery::getTransactions(
const std::vector<shared_model::crypto::Hash> &tx_hashes) {
return rxcpp::observable<>::create<boost::optional<wTransaction>>(
[this, tx_hashes](const auto &subscriber) {
std::for_each(tx_hashes.begin(),
tx_hashes.end(),
[that = this, &subscriber](const auto &tx_hash) {
subscriber.on_next(that->getTxByHashSync(tx_hash));
});
subscriber.on_completed();
});
std::vector<boost::optional<BlockQuery::wTransaction>> result;
std::for_each(tx_hashes.begin(),
tx_hashes.end(),
[this, &result](const auto &tx_hash) {
result.push_back(this->getTxByHashSync(tx_hash));
});
return result;
}

boost::optional<BlockQuery::wTransaction>
Expand Down
14 changes: 7 additions & 7 deletions irohad/ametsuchi/impl/postgres_block_query.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,28 @@ namespace iroha {
explicit PostgresBlockQuery(soci::session &sql,
KeyValueStorage &file_store);

rxcpp::observable<wTransaction> getAccountTransactions(
std::vector<wTransaction> getAccountTransactions(
const shared_model::interface::types::AccountIdType &account_id)
override;

rxcpp::observable<wTransaction> getAccountAssetTransactions(
std::vector<wTransaction> getAccountAssetTransactions(
const shared_model::interface::types::AccountIdType &account_id,
const shared_model::interface::types::AssetIdType &asset_id) override;

rxcpp::observable<boost::optional<wTransaction>> getTransactions(
std::vector<boost::optional<wTransaction>> getTransactions(
const std::vector<shared_model::crypto::Hash> &tx_hashes) override;

boost::optional<wTransaction> getTxByHashSync(
const shared_model::crypto::Hash &hash) override;

rxcpp::observable<wBlock> getBlocks(
std::vector<wBlock> getBlocks(
shared_model::interface::types::HeightType height,
uint32_t count) override;

rxcpp::observable<wBlock> getBlocksFrom(
std::vector<wBlock> getBlocksFrom(
shared_model::interface::types::HeightType height) override;

rxcpp::observable<wBlock> getTopBlocks(uint32_t count) override;
std::vector<wBlock> getTopBlocks(uint32_t count) override;

uint32_t getTopBlockHeight() override;

Expand Down Expand Up @@ -80,7 +80,7 @@ namespace iroha {
* @return
*/
std::function<void(std::vector<std::string> &result)> callback(
const rxcpp::subscriber<wTransaction> &s, uint64_t block_id);
std::vector<wTransaction> &s, uint64_t block_id);

soci::session &sql_;

Expand Down
5 changes: 2 additions & 3 deletions irohad/ametsuchi/impl/wsv_restorer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ namespace iroha {
expected::Result<void, std::string> WsvRestorerImpl::restoreWsv(
Storage &storage) {
// get all blocks starting from the genesis
std::vector<std::shared_ptr<shared_model::interface::Block>> blocks;
storage.getBlockQuery()->getBlocksFrom(1).as_blocking().subscribe(
[&blocks](auto block) { blocks.push_back(std::move(block)); });
std::vector<std::shared_ptr<shared_model::interface::Block>> blocks=
storage.getBlockQuery()->getBlocksFrom(1);

storage.reset();

Expand Down
24 changes: 15 additions & 9 deletions irohad/execution/impl/query_execution_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,10 +290,13 @@ QueryExecutionImpl::executeGetAccountAssetTransactions(
bq.getAccountAssetTransactions(query.accountId(), query.assetId());

std::vector<shared_model::proto::Transaction> txs;
acc_asset_tx.subscribe([&](const auto &tx) {
txs.push_back(
*std::static_pointer_cast<shared_model::proto::Transaction>(tx));
});
std::transform(
acc_asset_tx.begin(),
acc_asset_tx.end(),
std::back_inserter(txs),
[](const auto &tx) {
return *std::static_pointer_cast<shared_model::proto::Transaction>(tx);
});

auto response = QueryResponseBuilder().transactionsResponse(txs);
return response;
Expand All @@ -307,10 +310,13 @@ QueryExecutionImpl::executeGetAccountTransactions(
auto acc_tx = bq.getAccountTransactions(query.accountId());

std::vector<shared_model::proto::Transaction> txs;
acc_tx.subscribe([&](const auto &tx) {
txs.push_back(
*std::static_pointer_cast<shared_model::proto::Transaction>(tx));
});
std::transform(
acc_tx.begin(),
acc_tx.end(),
std::back_inserter(txs),
[](const auto &tx) {
return *std::static_pointer_cast<shared_model::proto::Transaction>(tx);
});

auto response = QueryResponseBuilder().transactionsResponse(txs);
return response;
Expand All @@ -329,7 +335,7 @@ QueryExecutionImpl::executeGetTransactions(
std::vector<shared_model::proto::Transaction> txs;
bool can_get_all =
checkAccountRolePermission(accountId, wq, Role::kGetAllTxs);
transactions.subscribe([&](const auto &tx) {
std::for_each(transactions.begin(), transactions.end(), [&](const auto &tx) {
if (tx) {
auto proto_tx =
*std::static_pointer_cast<shared_model::proto::Transaction>(*tx);
Expand Down
Loading

0 comments on commit 433fc8a

Please sign in to comment.