Skip to content

Commit

Permalink
Merge pull request hyperledger-iroha#966 from hyperledger/feature/sha…
Browse files Browse the repository at this point in the history
…red-model-block-query

Feature/shared model block query
  • Loading branch information
vdrobnyi authored Mar 1, 2018
2 parents e60574e + 237157d commit 9ae0f97
Show file tree
Hide file tree
Showing 19 changed files with 754 additions and 661 deletions.
1 change: 1 addition & 0 deletions irohad/ametsuchi/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ target_link_libraries(ametsuchi
libs_common
command_execution
boost
model_interfaces
)
38 changes: 20 additions & 18 deletions irohad/ametsuchi/block_query.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,77 +23,79 @@
#include <rxcpp/rx-observable.hpp>

#include "common/types.hpp"
#include "interfaces/iroha_internal/block.hpp"
#include "interfaces/transaction.hpp"

namespace iroha {

namespace model {
struct Transaction;
struct Block;
}

namespace ametsuchi {
/**
* Public interface for queries on blocks and transactions
*/
class BlockQuery {
protected:
using wTransaction =
std::shared_ptr<shared_model::interface::Transaction>;
using wBlock = std::shared_ptr<shared_model::interface::Block>;

public:
virtual ~BlockQuery() = default;
/**
* Get all transactions of an account.
* @param account_id - account_id (accountName@domainName)
* @return observable of Model Transaction
*/
virtual rxcpp::observable<model::Transaction> getAccountTransactions(
const std::string &account_id) = 0;
virtual rxcpp::observable<wTransaction> getAccountTransactions(
const shared_model::interface::types::AccountIdType &account_id) = 0;

/**
* Get asset transactions of an account.
* @param account_id - account_id (accountName@domainName)
* @param asset_id - asset_id (assetName#domainName)
* @return observable of Model Transaction
*/
virtual rxcpp::observable<model::Transaction> getAccountAssetTransactions(
const std::string &account_id, const std::string &asset_id) = 0;
virtual rxcpp::observable<wTransaction> getAccountAssetTransactions(
const shared_model::interface::types::AccountIdType &account_id,
const shared_model::interface::types::AssetIdType &asset_id) = 0;

/**
* Get transactions from transactions' hashes
* @param tx_hashes - transactions' hashes to retrieve
* @return observable of Model Transaction
*/
virtual rxcpp::observable<boost::optional<model::Transaction>>
getTransactions(const std::vector<iroha::hash256_t> &tx_hashes) = 0;
virtual rxcpp::observable<boost::optional<wTransaction>> getTransactions(
const std::vector<shared_model::crypto::Hash> &tx_hashes) = 0;

/**
* Get given number of blocks starting with given height.
* @param height - starting height
* @param count - number of blocks to retrieve
* @return observable of Model Block
*/
virtual rxcpp::observable<model::Block> getBlocks(uint32_t height,
uint32_t count) = 0;
virtual rxcpp::observable<wBlock> getBlocks(shared_model::interface::types::HeightType height,
uint32_t count) = 0;

/**
* Get all blocks starting from given height.
* @param from - starting height
* @return observable of Model Block
*/
virtual rxcpp::observable<model::Block> getBlocksFrom(
uint32_t height) = 0;
virtual rxcpp::observable<wBlock> getBlocksFrom(shared_model::interface::types::HeightType height) = 0;

/**
* Get given number of blocks from top.
* @param count - number of blocks to retrieve
* @return observable of Model Block
*/
virtual rxcpp::observable<model::Block> getTopBlocks(uint32_t count) = 0;
virtual rxcpp::observable<wBlock> getTopBlocks(uint32_t count) = 0;

/**
* Synchronously gets transaction by its hash
* @param hash - hash to search
* @return transaction or boost::none
*/
virtual boost::optional<model::Transaction> getTxByHashSync(
const std::string &hash) = 0;
virtual boost::optional<wTransaction> getTxByHashSync(
const shared_model::crypto::Hash &hash) = 0;
};
} // namespace ametsuchi
} // namespace iroha
Expand Down
159 changes: 88 additions & 71 deletions irohad/ametsuchi/impl/postgres_block_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
#include "ametsuchi/impl/postgres_block_query.hpp"
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/algorithm/for_each.hpp>
#include "model/sha3_hash.hpp"

#include "backend/protobuf/from_old_model.hpp"
namespace iroha {
namespace ametsuchi {

Expand All @@ -30,105 +29,110 @@ namespace iroha {
log_(logger::log("PostgresBlockIndex")),
execute_{makeExecuteOptional(transaction_, log_)} {}

rxcpp::observable<model::Block> PostgresBlockQuery::getBlocks(
uint32_t height, uint32_t count) {
auto last_id = block_store_.last_id();
rxcpp::observable<BlockQuery::wBlock> PostgresBlockQuery::getBlocks(
shared_model::interface::types::HeightType height, uint32_t count) {
shared_model::interface::types::HeightType last_id =
block_store_.last_id();
auto to = std::min(last_id, height + count - 1);
if (height > to or count == 0) {
return rxcpp::observable<>::empty<model::Block>();
return rxcpp::observable<>::empty<wBlock>();
}

return rxcpp::observable<>::range(height, to).flat_map([this](auto i) {
auto bytes = block_store_.get(i);
return rxcpp::observable<>::create<model::Block>([this, bytes](auto s) {
if (not bytes.has_value()) {
s.on_completed();
return;
}
auto document =
model::converters::stringToJson(bytesToString(bytes.value()));
if (not document.has_value()) {
s.on_completed();
return;
}
auto block = serializer_.deserialize(document.value());
if (not block.has_value()) {
s.on_completed();
return;
}
s.on_next(block.value());
s.on_completed();
});
// TODO IR-975 victordrobny 12.02.2018 convert directly to
// shared_model::proto::Block after FlatFile will be reworked to new
// model
auto block = block_store_.get(i) | [](const auto &bytes) {
return model::converters::stringToJson(bytesToString(bytes));
} | [this](const auto &d) {
return serializer_.deserialize(d);
} | [](const auto &block_old) {
return std::make_shared<shared_model::proto::Block>(
shared_model::proto::from_old(block_old));
};
return rxcpp::observable<>::create<PostgresBlockQuery::wBlock>(
[this, block{std::move(block)}](auto s) {
if (block) {
s.on_next(block);
}
s.on_completed();
});
});
}

rxcpp::observable<model::Block> PostgresBlockQuery::getBlocksFrom(
uint32_t height) {
rxcpp::observable<BlockQuery::wBlock> PostgresBlockQuery::getBlocksFrom(
shared_model::interface::types::HeightType height) {
return getBlocks(height, block_store_.last_id());
}

rxcpp::observable<model::Block> PostgresBlockQuery::getTopBlocks(
rxcpp::observable<BlockQuery::wBlock> PostgresBlockQuery::getTopBlocks(
uint32_t count) {
auto last_id = block_store_.last_id();
count = std::min(count, last_id);
return getBlocks(last_id - count + 1, count);
}

std::vector<iroha::model::Block::BlockHeightType>
PostgresBlockQuery::getBlockIds(const std::string &account_id) {
std::vector<shared_model::interface::types::HeightType>
PostgresBlockQuery::getBlockIds(
const shared_model::interface::types::AccountIdType &account_id) {
return execute_(
"SELECT DISTINCT height FROM height_by_account_set WHERE "
"account_id = "
+ transaction_.quote(account_id) + ";")
| [&](const auto &result)
-> std::vector<iroha::model::Block::BlockHeightType> {
return transform<iroha::model::Block::BlockHeightType>(
-> std::vector<shared_model::interface::types::HeightType> {
return transform<shared_model::interface::types::HeightType>(
result, [&](const auto &row) {
return row.at("height")
.template as<iroha::model::Block::BlockHeightType>();
.template as<shared_model::interface::types::HeightType>();
});
};
}

boost::optional<iroha::model::Block::BlockHeightType>
PostgresBlockQuery::getBlockId(const std::string &hash) {
boost::optional<shared_model::interface::types::HeightType>
PostgresBlockQuery::getBlockId(const shared_model::crypto::Hash &hash) {
boost::optional<uint64_t> blockId;
return execute_("SELECT height FROM height_by_hash WHERE hash = "
+ transaction_.quote(
pqxx::binarystring(hash.data(), hash.size()))
+ transaction_.quote(pqxx::binarystring(
hash.blob().data(), hash.blob().size()))
+ ";")
| [&](const auto &result)
-> boost::optional<iroha::model::Block::BlockHeightType> {
-> boost::optional<
shared_model::interface::types::HeightType> {
if (result.size() == 0) {
return boost::none;
}
return result[0]
.at("height")
.template as<iroha::model::Block::BlockHeightType>();
.template as<shared_model::interface::types::HeightType>();
};
}

std::function<void(pqxx::result &result)> PostgresBlockQuery::callback(
const rxcpp::subscriber<model::Transaction> &subscriber,
uint64_t block_id) {
const rxcpp::subscriber<wTransaction> &subscriber, uint64_t block_id) {
return [this, &subscriber, block_id](pqxx::result &result) {
auto block = block_store_.get(block_id) | [](auto bytes) {
return model::converters::stringToJson(bytesToString(bytes));
} | [this](const auto &json) { return serializer_.deserialize(json); };
auto block = block_store_.get(block_id) | [this](auto bytes) {
// TODO IR-975 victordrobny 12.02.2018 convert directly to
// shared_model::proto::Block after FlatFile will be reworked to new
// model
return boost::optional<shared_model::proto::Block>(
shared_model::proto::from_old(*serializer_.deserialize(
*model::converters::stringToJson(bytesToString(bytes)))));
};
boost::for_each(
result | boost::adaptors::transformed([&block](const auto &x) {
return x.at("index").template as<size_t>();
}),
[&](auto x) {
const auto &tx = block->transactions.at(x);
subscriber.on_next(tx);
subscriber.on_next(PostgresBlockQuery::wTransaction(
block->transactions().at(x)->copy()));
});
};
}

rxcpp::observable<model::Transaction>
PostgresBlockQuery::getAccountTransactions(const std::string &account_id) {
return rxcpp::observable<>::create<model::Transaction>(
rxcpp::observable<BlockQuery::wTransaction>
PostgresBlockQuery::getAccountTransactions(
const shared_model::interface::types::AccountIdType &account_id) {
return rxcpp::observable<>::create<wTransaction>(
[this, account_id](auto subscriber) {
auto block_ids = this->getBlockIds(account_id);
if (block_ids.empty()) {
Expand All @@ -148,11 +152,14 @@ namespace iroha {
});
}

rxcpp::observable<model::Transaction>
rxcpp::observable<BlockQuery::wTransaction>
PostgresBlockQuery::getAccountAssetTransactions(
const std::string &account_id, const std::string &asset_id) {
return rxcpp::observable<>::create<
model::Transaction>([this, account_id, asset_id](auto subscriber) {
const shared_model::interface::types::AccountIdType &account_id,
const shared_model::interface::types::AssetIdType &asset_id) {
return rxcpp::observable<>::create<wTransaction>([this,
account_id,
asset_id](
auto subscriber) {
auto block_ids = this->getBlockIds(account_id);
if (block_ids.empty()) {
subscriber.on_completed();
Expand All @@ -171,37 +178,47 @@ namespace iroha {
});
}

rxcpp::observable<boost::optional<model::Transaction>>
rxcpp::observable<boost::optional<BlockQuery::wTransaction>>
PostgresBlockQuery::getTransactions(
const std::vector<iroha::hash256_t> &tx_hashes) {
return rxcpp::observable<>::create<boost::optional<model::Transaction>>(
const std::vector<shared_model::crypto::Hash> &tx_hashes) {
return rxcpp::observable<>::create<boost::optional<wTransaction>>(
[this, tx_hashes](auto subscriber) {
std::for_each(tx_hashes.begin(),
tx_hashes.end(),
[that = this, &subscriber](auto tx_hash) {
subscriber.on_next(
that->getTxByHashSync(tx_hash.to_string()));
subscriber.on_next(that->getTxByHashSync(tx_hash));
});
subscriber.on_completed();
});
}

boost::optional<model::Transaction> PostgresBlockQuery::getTxByHashSync(
const std::string &hash) {
boost::optional<BlockQuery::wTransaction>
PostgresBlockQuery::getTxByHashSync(
const shared_model::crypto::Hash &hash) {
return getBlockId(hash) |
[this](auto blockId) { return block_store_.get(blockId); } |
[](auto bytes) {
[this](auto bytes) {
// TODO IR-975 victordrobny 12.02.2018 convert directly to
// shared_model::proto::Block after FlatFile will be reworked to new
// model
return model::converters::stringToJson(bytesToString(bytes));
}
| [this](const auto &json) { return serializer_.deserialize(json); }
| [&](const auto &json) { return serializer_.deserialize(json); } |
[](const auto &block) {
return boost::optional<shared_model::proto::Block>(
shared_model::proto::from_old(block));
}
| [&](const auto &block) {
auto it = std::find_if(
block.transactions.begin(),
block.transactions.end(),
[&hash](auto tx) { return iroha::hash(tx).to_string() == hash; });
return (it == block.transactions.end())
? boost::none
: boost::optional<model::Transaction>(*it);
boost::optional<PostgresBlockQuery::wTransaction> result;
auto it =
std::find_if(block.transactions().begin(),
block.transactions().end(),
[&hash](auto tx) { return tx->hash() == hash; });
if (it != block.transactions().end()) {
result = boost::optional<PostgresBlockQuery::wTransaction>(
PostgresBlockQuery::wTransaction((*it)->copy()));
}
return result;
};
}

Expand Down
Loading

0 comments on commit 9ae0f97

Please sign in to comment.