Skip to content

Commit

Permalink
replace model in block_query
Browse files Browse the repository at this point in the history
Signed-off-by: Victor Drobny <[email protected]>
  • Loading branch information
vdrobnyi committed Feb 12, 2018
1 parent 83bc4fe commit e0aebaa
Show file tree
Hide file tree
Showing 18 changed files with 850 additions and 621 deletions.
1 change: 1 addition & 0 deletions irohad/ametsuchi/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ target_link_libraries(ametsuchi
libs_common
command_execution
boost
model_interfaces
)
45 changes: 27 additions & 18 deletions irohad/ametsuchi/block_query.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@
#include <rxcpp/rx-observable.hpp>

#include "common/types.hpp"
#include "interfaces/iroha_internal/block.hpp"
#include "interfaces/transaction.hpp"
#include "utils/polymorphic_wrapper.hpp"

namespace iroha {

namespace model {
struct Transaction;
struct Block;
}

namespace ametsuchi {
/**
* Public interface for queries on blocks and transactions
Expand All @@ -43,57 +41,68 @@ namespace iroha {
* @param account_id - account_id (accountName@domainName)
* @return observable of Model Transaction
*/
virtual rxcpp::observable<model::Transaction> getAccountTransactions(
const std::string &account_id) = 0;
virtual rxcpp::observable<::shared_model::detail::PolymorphicWrapper<
::shared_model::interface::Transaction>>
getAccountTransactions(const std::string &account_id) = 0;

/**
* Get asset transactions of an account.
* @param account_id - account_id (accountName@domainName)
* @param asset_id - asset_id (assetName#domainName)
* @return observable of Model Transaction
*/
virtual rxcpp::observable<model::Transaction> getAccountAssetTransactions(
const std::string &account_id, const std::string &asset_id) = 0;
virtual rxcpp::observable<::shared_model::detail::PolymorphicWrapper<
::shared_model::interface::Transaction>>
getAccountAssetTransactions(const std::string &account_id,
const std::string &asset_id) = 0;

/**
* Get transactions from transactions' hashes
* @param tx_hashes - transactions' hashes to retrieve
* @return observable of Model Transaction
*/
virtual rxcpp::observable<boost::optional<model::Transaction>>
getTransactions(const std::vector<iroha::hash256_t> &tx_hashes) = 0;
virtual rxcpp::observable<
boost::optional<::shared_model::detail::PolymorphicWrapper<
::shared_model::interface::Transaction>>>
getTransactions(
const std::vector<shared_model::crypto::Hash> &tx_hashes) = 0;

/**
* Get given number of blocks starting with given height.
* @param height - starting height
* @param count - number of blocks to retrieve
* @return observable of Model Block
*/
virtual rxcpp::observable<model::Block> getBlocks(uint32_t height,
uint32_t count) = 0;
virtual rxcpp::observable<::shared_model::detail::PolymorphicWrapper<
::shared_model::interface::Block>>
getBlocks(uint32_t height, uint32_t count) = 0;

/**
* Get all blocks starting from given height.
* @param from - starting height
* @return observable of Model Block
*/
virtual rxcpp::observable<model::Block> getBlocksFrom(
uint32_t height) = 0;
virtual rxcpp::observable<::shared_model::detail::PolymorphicWrapper<
::shared_model::interface::Block>>
getBlocksFrom(uint32_t height) = 0;

/**
* Get given number of blocks from top.
* @param count - number of blocks to retrieve
* @return observable of Model Block
*/
virtual rxcpp::observable<model::Block> getTopBlocks(uint32_t count) = 0;
virtual rxcpp::observable<::shared_model::detail::PolymorphicWrapper<
::shared_model::interface::Block>>
getTopBlocks(uint32_t count) = 0;

/**
* Synchronously gets transaction by its hash
* @param hash - hash to search
* @return transaction or boost::none
*/
virtual boost::optional<model::Transaction> getTxByHashSync(
const std::string &hash) = 0;
virtual boost::optional<::shared_model::detail::PolymorphicWrapper<
::shared_model::interface::Transaction>>
getTxByHashSync(const std::string &hash) = 0;
};
} // namespace ametsuchi
} // namespace iroha
Expand Down
132 changes: 86 additions & 46 deletions irohad/ametsuchi/impl/postgres_block_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
#include "ametsuchi/impl/postgres_block_query.hpp"
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/algorithm/for_each.hpp>
#include "model/sha3_hash.hpp"
#include "backend/protobuf/from_old_model.hpp"
#include "converters/protobuf/json_proto_converter.hpp"
#include "validators/default_validator.hpp"

namespace iroha {
namespace ametsuchi {
Expand All @@ -30,105 +32,124 @@ namespace iroha {
log_(logger::log("PostgresBlockIndex")),
execute_{makeExecute(transaction_, log_)} {}

rxcpp::observable<model::Block> PostgresBlockQuery::getBlocks(
uint32_t height, uint32_t count) {
rxcpp::observable<::shared_model::detail::PolymorphicWrapper<
::shared_model::interface::Block>>
PostgresBlockQuery::getBlocks(uint32_t height, uint32_t count) {
auto last_id = block_store_.last_id();
auto to = std::min(last_id, height + count - 1);
if (height > to or count == 0) {
return rxcpp::observable<>::empty<model::Block>();
return rxcpp::observable<>::empty<
::shared_model::detail::PolymorphicWrapper<
::shared_model::interface::Block>>();
}

return rxcpp::observable<>::range(height, to).flat_map([this](auto i) {
auto bytes = block_store_.get(i);
return rxcpp::observable<>::create<model::Block>([this, bytes](auto s) {
return rxcpp::observable<>::create<
::shared_model::detail::PolymorphicWrapper<
::shared_model::interface::Block>>([this, bytes](auto s) {
if (not bytes.has_value()) {
s.on_completed();
return;
}
// TODO victordrobny 12.02.2018 convert directly to
// shared_model::proto::Block after FlatFile will be reworked to new
// model
auto document =
model::converters::stringToJson(bytesToString(bytes.value()));
if (not document.has_value()) {
s.on_completed();
return;
}
auto block = serializer_.deserialize(document.value());
if (not block.has_value()) {
auto block_old = serializer_.deserialize(document.value());
if (not block_old.has_value()) {
s.on_completed();
return;
}
s.on_next(block.value());
auto block = ::shared_model::proto::from_old(block_old.value());
s.on_next(::shared_model::detail::makePolymorphic<
::shared_model::proto::Block>(block.getTransport()));
s.on_completed();
});
});
}

rxcpp::observable<model::Block> PostgresBlockQuery::getBlocksFrom(
uint32_t height) {
rxcpp::observable<::shared_model::detail::PolymorphicWrapper<
::shared_model::interface::Block>>
PostgresBlockQuery::getBlocksFrom(uint32_t height) {
return getBlocks(height, block_store_.last_id());
}

rxcpp::observable<model::Block> PostgresBlockQuery::getTopBlocks(
uint32_t count) {
rxcpp::observable<::shared_model::detail::PolymorphicWrapper<
::shared_model::interface::Block>>
PostgresBlockQuery::getTopBlocks(uint32_t count) {
auto last_id = block_store_.last_id();
count = std::min(count, last_id);
return getBlocks(last_id - count + 1, count);
}

std::vector<iroha::model::Block::BlockHeightType>
std::vector<::shared_model::interface::types::HeightType>
PostgresBlockQuery::getBlockIds(const std::string &account_id) {
return execute_(
"SELECT DISTINCT height FROM height_by_account_set WHERE "
"account_id = "
+ transaction_.quote(account_id) + ";")
| [&](const auto &result)
-> std::vector<iroha::model::Block::BlockHeightType> {
return transform<iroha::model::Block::BlockHeightType>(
-> std::vector<::shared_model::interface::types::HeightType> {
return transform<::shared_model::interface::types::HeightType>(
result, [&](const auto &row) {
return row.at("height")
.template as<iroha::model::Block::BlockHeightType>();
.template as<::shared_model::interface::types::HeightType>();
});
};
}

boost::optional<iroha::model::Block::BlockHeightType>
boost::optional<::shared_model::interface::types::HeightType>
PostgresBlockQuery::getBlockId(const std::string &hash) {
boost::optional<uint64_t> blockId;
return execute_("SELECT height FROM height_by_hash WHERE hash = "
+ transaction_.quote(
pqxx::binarystring(hash.data(), hash.size()))
+ ";")
| [&](const auto &result)
-> boost::optional<iroha::model::Block::BlockHeightType> {
-> boost::optional<
::shared_model::interface::types::HeightType> {
if (result.size() == 0) {
return boost::none;
}
return result[0]
.at("height")
.template as<iroha::model::Block::BlockHeightType>();
.template as<::shared_model::interface::types::HeightType>();
};
}

std::function<void(pqxx::result &result)> PostgresBlockQuery::callback(
const rxcpp::subscriber<model::Transaction> &subscriber,
const rxcpp::subscriber<shared_model::detail::PolymorphicWrapper<
shared_model::interface::Transaction>> &subscriber,
uint64_t block_id) {
return [this, &subscriber, block_id](pqxx::result &result) {
auto block = block_store_.get(block_id) | [](auto bytes) {
return model::converters::stringToJson(bytesToString(bytes));
} | [this](const auto &json) { return serializer_.deserialize(json); };
auto block = block_store_.get(block_id) | [this](auto bytes) {
return boost::optional<::shared_model::proto::Block>(
::shared_model::proto::from_old(*serializer_.deserialize(
*model::converters::stringToJson(bytesToString(bytes)))));
};
boost::for_each(
result | boost::adaptors::transformed([&block](const auto &x) {
return x.at("index").template as<size_t>();
}),
[&](auto x) {
const auto &tx = block->transactions.at(x);
subscriber.on_next(tx);
subscriber.on_next(::shared_model::detail::PolymorphicWrapper<
::shared_model::interface::Transaction>(
block->transactions().at(x)));
});
};
}

rxcpp::observable<model::Transaction>
rxcpp::observable<::shared_model::detail::PolymorphicWrapper<
::shared_model::interface::Transaction>>
PostgresBlockQuery::getAccountTransactions(const std::string &account_id) {
return rxcpp::observable<>::create<model::Transaction>(
return rxcpp::observable<>::create<
::shared_model::detail::PolymorphicWrapper<
::shared_model::interface::Transaction>>(
[this, account_id](auto subscriber) {
auto block_ids = this->getBlockIds(account_id);
if (block_ids.empty()) {
Expand All @@ -148,11 +169,16 @@ namespace iroha {
});
}

rxcpp::observable<model::Transaction>
rxcpp::observable<::shared_model::detail::PolymorphicWrapper<
::shared_model::interface::Transaction>>
PostgresBlockQuery::getAccountAssetTransactions(
const std::string &account_id, const std::string &asset_id) {
return rxcpp::observable<>::create<
model::Transaction>([this, account_id, asset_id](auto subscriber) {
::shared_model::detail::PolymorphicWrapper<
::shared_model::interface::Transaction>>([this,
account_id,
asset_id](
auto subscriber) {
auto block_ids = this->getBlockIds(account_id);
if (block_ids.empty()) {
subscriber.on_completed();
Expand All @@ -172,37 +198,51 @@ namespace iroha {
});
}

rxcpp::observable<boost::optional<model::Transaction>>
rxcpp::observable<
boost::optional<::shared_model::detail::PolymorphicWrapper<
::shared_model::interface::Transaction>>>
PostgresBlockQuery::getTransactions(
const std::vector<iroha::hash256_t> &tx_hashes) {
return rxcpp::observable<>::create<boost::optional<model::Transaction>>(
const std::vector<shared_model::crypto::Hash> &tx_hashes) {
return rxcpp::observable<>::create<
boost::optional<::shared_model::detail::PolymorphicWrapper<
::shared_model::interface::Transaction>>>(
[this, tx_hashes](auto subscriber) {
std::for_each(tx_hashes.begin(),
tx_hashes.end(),
[that = this, &subscriber](auto tx_hash) {
subscriber.on_next(
that->getTxByHashSync(tx_hash.to_string()));
auto b = tx_hash.blob();
subscriber.on_next(that->getTxByHashSync(
std::string{b.begin(), b.end()}));
});
subscriber.on_completed();
});
}

boost::optional<model::Transaction> PostgresBlockQuery::getTxByHashSync(
const std::string &hash) {
boost::optional<::shared_model::detail::PolymorphicWrapper<
::shared_model::interface::Transaction>>
PostgresBlockQuery::getTxByHashSync(const std::string &hash) {
return getBlockId(hash) |
[this](auto blockId) { return block_store_.get(blockId); } |
[](auto bytes) {
return model::converters::stringToJson(bytesToString(bytes));
[this](auto bytes) {
// TODO victordrobny 12.02.2018 convert directly to
// shared_model::proto::Block after FlatFile will be reworked to new
// model
return boost::optional<shared_model::proto::Block>(
shared_model::proto::from_old(*serializer_.deserialize(
*model::converters::stringToJson(bytesToString(bytes)))));
}
| [this](const auto &json) { return serializer_.deserialize(json); }
| [&](const auto &block) {
auto it = std::find_if(
block.transactions.begin(),
block.transactions.end(),
[&hash](auto tx) { return iroha::hash(tx).to_string() == hash; });
return (it == block.transactions.end())
auto it =
std::find_if(block.transactions().begin(),
block.transactions().end(),
[&hash](auto tx) {
auto b = tx->hash().blob();
return std::string{b.begin(), b.end()} == hash;
});
return (it == block.transactions().end())
? boost::none
: boost::optional<model::Transaction>(*it);
: boost::optional<::shared_model::detail::PolymorphicWrapper<
::shared_model::interface::Transaction>>(*it);
};
}

Expand Down
Loading

0 comments on commit e0aebaa

Please sign in to comment.