Skip to content

Commit

Permalink
Merge branch 'fix/nptr_to_result' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
Solonets committed Feb 13, 2018
2 parents 8aeaadd + 09e7da8 commit 821f237
Show file tree
Hide file tree
Showing 12 changed files with 263 additions and 104 deletions.
115 changes: 66 additions & 49 deletions irohad/ametsuchi/impl/storage_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

#include "ametsuchi/impl/flat_file/flat_file.hpp" // for FlatFile
#include "ametsuchi/impl/mutable_storage_impl.hpp"
#include "ametsuchi/impl/postgres_wsv_query.hpp"
#include "ametsuchi/impl/postgres_block_query.hpp"
#include "ametsuchi/impl/postgres_wsv_query.hpp"
#include "ametsuchi/impl/temporary_wsv_impl.hpp"
#include "model/converters/json_common.hpp"
#include "model/execution/command_executor_factory.hpp" // for CommandExecutorFactory

#include <boost/format.hpp>

namespace iroha {
namespace ametsuchi {

Expand Down Expand Up @@ -61,44 +63,45 @@ namespace iroha {
"SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;");
}

std::unique_ptr<TemporaryWsv> StorageImpl::createTemporaryWsv() {
expected::Result<std::unique_ptr<TemporaryWsv>, std::string>
StorageImpl::createTemporaryWsv() {
auto command_executors = model::CommandExecutorFactory::create();
if (not command_executors.has_value()) {
log_->error(kCommandExecutorError);
return nullptr;
return expected::makeError(kCommandExecutorError);
}

auto postgres_connection =
std::make_unique<pqxx::lazyconnection>(postgres_options_);
try {
postgres_connection->activate();
} catch (const pqxx::broken_connection &e) {
log_->error(kPsqlBroken, e.what());
return nullptr;
return expected::makeError(
(boost::format(kPsqlBroken) % e.what()).str());
}
auto wsv_transaction =
std::make_unique<pqxx::nontransaction>(*postgres_connection, kTmpWsv);

return std::make_unique<TemporaryWsvImpl>(
std::move(postgres_connection),
std::move(wsv_transaction),
std::move(command_executors.value()));
return expected::makeValue<std::unique_ptr<TemporaryWsv>>(
std::make_unique<TemporaryWsvImpl>(
std::move(postgres_connection),
std::move(wsv_transaction),
std::move(command_executors.value())));
}

std::unique_ptr<MutableStorage> StorageImpl::createMutableStorage() {
expected::Result<std::unique_ptr<MutableStorage>, std::string>
StorageImpl::createMutableStorage() {
auto command_executors = model::CommandExecutorFactory::create();
if (not command_executors.has_value()) {
log_->error(kCommandExecutorError);
return nullptr;
return expected::makeError(kCommandExecutorError);
}

auto postgres_connection =
std::make_unique<pqxx::lazyconnection>(postgres_options_);
try {
postgres_connection->activate();
} catch (const pqxx::broken_connection &e) {
log_->error(kPsqlBroken, e.what());
return nullptr;
return expected::makeError(
(boost::format(kPsqlBroken) % e.what()).str());
}
auto wsv_transaction =
std::make_unique<pqxx::nontransaction>(*postgres_connection, kTmpWsv);
Expand All @@ -110,23 +113,33 @@ namespace iroha {
.as_blocking()
.subscribe([&top_hash](auto block) { top_hash = block.hash; });

return std::make_unique<MutableStorageImpl>(
top_hash.value_or(hash256_t{}),
std::move(postgres_connection),
std::move(wsv_transaction),
std::move(command_executors.value()));
return expected::makeValue<std::unique_ptr<MutableStorage>>(
std::make_unique<MutableStorageImpl>(
top_hash.value_or(hash256_t{}),
std::move(postgres_connection),
std::move(wsv_transaction),
std::move(command_executors.value())));
}

bool StorageImpl::insertBlock(model::Block block) {
log_->info("create mutable storage");
auto storage = createMutableStorage();
auto inserted = storage->apply(
block,
[](const auto &current_block, auto &query, const auto &top_hash) {
return true;
auto storageResult = createMutableStorage();
bool inserted = false;
storageResult.match(
[&](expected::Value<std::unique_ptr<ametsuchi::MutableStorage>>
&storage) {
inserted =
storage.value->apply(block,
[](const auto &current_block,
auto &query,
const auto &top_hash) { return true; });
log_->info("block inserted: {}", inserted);
commit(std::move(storage.value));
},
[&](expected::Error<std::string> &error) {
log_->error(error.error);
});
log_->info("block inserted: {}", inserted);
commit(std::move(storage));

return inserted;
}

Expand Down Expand Up @@ -166,16 +179,15 @@ DROP TABLE IF EXISTS index_by_id_height_asset;
block_store_->dropAll();
}

nonstd::optional<ConnectionContext> StorageImpl::initConnections(
std::string block_store_dir,
std::string postgres_options) {
expected::Result<ConnectionContext, std::string> StorageImpl::initConnections(
std::string block_store_dir, std::string postgres_options) {
auto log_ = logger::log("StorageImpl:initConnection");
log_->info("Start storage creation");

auto block_store = FlatFile::create(block_store_dir);
if (not block_store) {
log_->error("Cannot create block store in {}", block_store_dir);
return nonstd::nullopt;
return expected::makeError(
(boost::format("Cannot create block store in {}") % block_store_dir).str());
}
log_->info("block store created");

Expand All @@ -184,35 +196,40 @@ DROP TABLE IF EXISTS index_by_id_height_asset;
try {
postgres_connection->activate();
} catch (const pqxx::broken_connection &e) {
log_->error(kPsqlBroken, e.what());
return nonstd::nullopt;
return expected::makeError(
(boost::format(kPsqlBroken) % e.what()).str());
}
log_->info("connection to PostgreSQL completed");

auto wsv_transaction = std::make_unique<pqxx::nontransaction>(
*postgres_connection, "Storage");
log_->info("transaction to PostgreSQL initialized");

return nonstd::make_optional<ConnectionContext>(
return expected::makeValue(ConnectionContext(
std::move(*block_store),
std::move(postgres_connection),
std::move(wsv_transaction));
std::move(wsv_transaction)));
}

std::shared_ptr<StorageImpl> StorageImpl::create(
std::string block_store_dir,
std::string postgres_options) {
auto ctx = initConnections(block_store_dir, postgres_options);
if (not ctx.has_value()) {
return nullptr;
expected::Result<std::shared_ptr<StorageImpl>, std::string>
StorageImpl::create(std::string block_store_dir,
std::string postgres_options) {
auto ctx_result = initConnections(block_store_dir, postgres_options);
expected::Result<std::shared_ptr<StorageImpl>, std::string> storage;
ctx_result.match(
[&](expected::Value<ConnectionContext> &ctx){
storage = expected::makeValue(std::shared_ptr<StorageImpl>(
new StorageImpl(block_store_dir,
postgres_options,
std::move(ctx.value.block_store),
std::move(ctx.value.pg_lazy),
std::move(ctx.value.pg_nontx))));
},
[&](expected::Error<std::string> &error) {
storage = error;
}

return std::shared_ptr<StorageImpl>(
new StorageImpl(block_store_dir,
postgres_options,
std::move(ctx->block_store),
std::move(ctx->pg_lazy),
std::move(ctx->pg_nontx)));
);
return storage;
}

void StorageImpl::commit(std::unique_ptr<MutableStorage> mutableStorage) {
Expand Down
10 changes: 6 additions & 4 deletions irohad/ametsuchi/impl/storage_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,18 @@ namespace iroha {

class StorageImpl : public Storage {
protected:
static nonstd::optional<ConnectionContext> initConnections(
static expected::Result<ConnectionContext, std::string> initConnections(
std::string block_store_dir, std::string postgres_options);

public:
static std::shared_ptr<StorageImpl> create(
static expected::Result<std::shared_ptr<StorageImpl>, std::string> create(
std::string block_store_dir, std::string postgres_connection);

std::unique_ptr<TemporaryWsv> createTemporaryWsv() override;
expected::Result<std::unique_ptr<TemporaryWsv>, std::string>
createTemporaryWsv() override;

std::unique_ptr<MutableStorage> createMutableStorage() override;
expected::Result<std::unique_ptr<MutableStorage>, std::string>
createMutableStorage() override;

virtual bool insertBlock(model::Block block) override;

Expand Down
6 changes: 4 additions & 2 deletions irohad/ametsuchi/mutable_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#define IROHA_MUTABLE_FACTORY_HPP

#include <memory>
#include "common/result.hpp"

namespace iroha {
namespace ametsuchi {
Expand All @@ -30,9 +31,10 @@ namespace iroha {
/**
* Creates a mutable storage from the current state.
* Mutable storage is the only way to commit the block to the ledger.
* @return Created mutable storage
* @return Created Result with mutable storage or error string
*/
virtual std::unique_ptr<MutableStorage> createMutableStorage() = 0;
virtual expected::Result<std::unique_ptr<MutableStorage>, std::string>
createMutableStorage() = 0;

/**
* Commit mutable storage to Ametsuchi.
Expand Down
1 change: 1 addition & 0 deletions irohad/ametsuchi/storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "ametsuchi/mutable_factory.hpp"
#include "ametsuchi/temporary_factory.hpp"
#include "common/result.hpp"

namespace iroha {

Expand Down
6 changes: 4 additions & 2 deletions irohad/ametsuchi/temporary_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#define IROHA_TEMPORARY_FACTORY_HPP

#include <memory>
#include "common/result.hpp"

namespace iroha {
namespace ametsuchi {
Expand All @@ -32,9 +33,10 @@ namespace iroha {
* Temporary state will be not committed and will be erased on destructor
* call.
* Temporary state might be used for transaction validation.
* @return Created temporary wsv
* @return Created Result with temporary wsv or string error
*/
virtual std::unique_ptr<TemporaryWsv> createTemporaryWsv() = 0;
virtual expected::Result<std::unique_ptr<TemporaryWsv>, std::string>
createTemporaryWsv() = 0;

virtual ~TemporaryFactory() = default;
};
Expand Down
9 changes: 8 additions & 1 deletion irohad/main/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,14 @@ void Irohad::dropStorage() {
* Initializing iroha daemon storage
*/
void Irohad::initStorage() {
storage = StorageImpl::create(block_store_dir_, pg_conn_);
auto storageResult = StorageImpl::create(block_store_dir_, pg_conn_);
storageResult.match(
[&](expected::Value<std::shared_ptr<ametsuchi::StorageImpl>> &_storage) {
storage = _storage.value;
},
[](expected::Error<std::string> &error) {
throw std::runtime_error(error.error);
});

log_->info("[Init] => storage", logger::logBool(storage));
}
Expand Down
16 changes: 13 additions & 3 deletions irohad/simulator/impl/simulator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,19 @@ namespace iroha {
proposal.height);
return;
}
auto temporaryStorage = ametsuchi_factory_->createTemporaryWsv();
notifier_.get_subscriber().on_next(
validator_->validate(proposal, *temporaryStorage));
auto temporaryStorageResult = ametsuchi_factory_->createTemporaryWsv();
temporaryStorageResult.match(
[&](expected::Value<std::unique_ptr<ametsuchi::TemporaryWsv>>
&temporaryStorage) {
notifier_.get_subscriber().on_next(
validator_->validate(proposal, *(temporaryStorage.value)));
},
[&](expected::Error<std::string> &error) {
log_->error(error.error);
// TODO: 13/02/18 Solonets - Handle the case when TemporaryWsv was
// failed to produced - IR-966
throw std::runtime_error(error.error);
});
}

void Simulator::process_verified_proposal(model::Proposal proposal) {
Expand Down
22 changes: 18 additions & 4 deletions irohad/synchronizer/impl/synchronizer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,16 @@ namespace iroha {

void SynchronizerImpl::process_commit(iroha::model::Block commit_message) {
log_->info("processing commit");
auto storage = mutableFactory_->createMutableStorage();
auto storageResult = mutableFactory_->createMutableStorage();
std::unique_ptr<ametsuchi::MutableStorage> storage;
storageResult.match(
[&](expected::Value<std::unique_ptr<ametsuchi::MutableStorage>>
&_storage) { storage = std::move(_storage.value); },
[&](expected::Error<std::string> &error) {
storage = nullptr;
log_->error(error.error);
});
if (not storage) {
log_->error("Cannot create mutable storage");
return;
}
if (validator_->validateBlock(commit_message, *storage)) {
Expand All @@ -54,9 +61,16 @@ namespace iroha {
// Block can't be applied to current storage
// Download all missing blocks
for (auto signature : commit_message.sigs) {
storage = mutableFactory_->createMutableStorage();
auto storageResult = mutableFactory_->createMutableStorage();
std::unique_ptr<ametsuchi::MutableStorage> storage;
storageResult.match(
[&](expected::Value<std::unique_ptr<ametsuchi::MutableStorage>>
&_storage) { storage = std::move(_storage.value); },
[&](expected::Error<std::string> &error) {
storage = nullptr;
log_->error(error.error);
});
if (not storage) {
log_->error("cannot create storage");
return;
}
auto chain = blockLoader_->retrieveBlocks(signature.pubkey);
Expand Down
Loading

0 comments on commit 821f237

Please sign in to comment.