Skip to content

Commit

Permalink
Merge pull request hyperledger-iroha#640 from hyperledger/fix/peer-join
Browse files Browse the repository at this point in the history
Fix/peer join
  • Loading branch information
lebdron authored Oct 19, 2017
2 parents 0c4749d + b11d32c commit 1af59cf
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 41 deletions.
1 change: 1 addition & 0 deletions example/config.sample
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"block_store_path" : "/tmp/block_store/",
"torii_port" : 50051,
"internal_port" : 10001,
"pg_opt" : "host=localhost port=5432 user=postgres password=mysecretpassword",
"redis_host" : "localhost",
"redis_port" : 6379
Expand Down
5 changes: 5 additions & 0 deletions irohad/ametsuchi/impl/flat_file/flat_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ std::unique_ptr<FlatFile> FlatFile::create(const std::string &path) {
}

void FlatFile::add(Identifier id, const std::vector<uint8_t> &block) {
if (id != current_id_ + 1) {
log_->warn("Cannot append non-consecutive block");
return;
}

auto next_id = id;
auto file_name = dump_dir_ + SEPARATOR + id_to_name(id);

Expand Down
4 changes: 3 additions & 1 deletion irohad/ametsuchi/impl/mutable_storage_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ namespace iroha {
void index_block(uint64_t height, model::Block block);

hash256_t top_hash_;
std::unordered_map<uint32_t, model::Block> block_store_;
// ordered collection is used to enforce block insertion order in
// StorageImpl::commit
std::map<uint32_t, model::Block> block_store_;
std::unique_ptr<cpp_redis::redis_client> index_;

std::unique_ptr<pqxx::lazyconnection> connection_;
Expand Down
18 changes: 15 additions & 3 deletions irohad/consensus/yac/transport/impl/network_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ namespace iroha {
const std::vector<model::Peer> &peers)
: address_(address) {
for (const auto &peer : peers) {
peers_[peer] = proto::Yac::NewStub(grpc::CreateChannel(
peer.address, grpc::InsecureChannelCredentials()));
peers_addresses_[peer.address] = peer;
createPeerConnection(peer);
}
log_ = logger::log("YacNetwork");
}
Expand All @@ -44,6 +42,8 @@ namespace iroha {
}

void NetworkImpl::send_vote(model::Peer to, VoteMessage vote) {
createPeerConnection(to);

auto request = PbConverters::serializeVote(vote);

auto call = new AsyncClientCall;
Expand All @@ -59,6 +59,8 @@ namespace iroha {
}

void NetworkImpl::send_commit(model::Peer to, CommitMessage commit) {
createPeerConnection(to);

proto::Commit request;
for (const auto &vote : commit.votes) {
auto pb_vote = request.add_votes();
Expand All @@ -80,6 +82,8 @@ namespace iroha {
}

void NetworkImpl::send_reject(model::Peer to, RejectMessage reject) {
createPeerConnection(to);

proto::Reject request;
for (const auto &vote : reject.votes) {
auto pb_vote = request.add_votes();
Expand Down Expand Up @@ -173,6 +177,14 @@ namespace iroha {
return grpc::Status::OK;
}

void NetworkImpl::createPeerConnection(const model::Peer &peer) {
if (peers_.count(peer) == 0) {
peers_[peer] = proto::Yac::NewStub(grpc::CreateChannel(
peer.address, grpc::InsecureChannelCredentials()));
peers_addresses_[peer.address] = peer;
}
}

} // namespace yac
} // namespace consensus
} // namespace iroha
13 changes: 11 additions & 2 deletions irohad/consensus/yac/transport/impl/network_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
#include <atomic>
#include <thread>
#include <unordered_map>
#include "network/impl/async_grpc_client.hpp"

#include "ametsuchi/peer_query.hpp"
#include "consensus/yac/transport/yac_network_interface.hpp"
#include "yac.grpc.pb.h"
#include "logger/logger.hpp"
#include "network/impl/async_grpc_client.hpp"
#include "yac.grpc.pb.h"

namespace iroha {
namespace consensus {
Expand Down Expand Up @@ -82,6 +84,13 @@ namespace iroha {

private:

/**
* Create GRPC connection for given peer if it does not exist in
* peers map
* @param peer to instantiate connection with
*/
void createPeerConnection(const model::Peer &peer);

/**
* Address of current peer
*/
Expand Down
32 changes: 11 additions & 21 deletions irohad/main/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ Irohad::Irohad(const std::string &block_store_dir,
size_t redis_port,
const std::string &pg_conn,
size_t torii_port,
size_t internal_port,
const keypair_t &keypair)
: block_store_dir_(block_store_dir),
redis_host_(redis_host),
redis_port_(redis_port),
pg_conn_(pg_conn),
torii_port_(torii_port),
internal_port_(internal_port),
keypair(keypair) {
log_ = logger::log("IROHAD");
log_->info("created");
Expand All @@ -63,7 +65,6 @@ Irohad::~Irohad() {
void Irohad::init() {
initProtoFactories();
initPeerQuery();
initPeer();
initCryptoProvider();
initValidators();
initOrderingGate();
Expand Down Expand Up @@ -99,22 +100,6 @@ void Irohad::initPeerQuery() {
log_->info("[Init] => peer query");
}

void Irohad::initPeer() {
auto peers = wsv->getLedgerPeers().value();

auto it = std::find_if(peers.begin(), peers.end(), [this](auto peer) {
return peer.pubkey == keypair.pubkey;
});

if (it == peers.end()) {
log_->error("Cannot find peer with given public key");
}

peer = *it;

log_->info("[Init] => peer address is {}", peer.address);
}

void Irohad::initCryptoProvider() {
crypto_verifier = std::make_shared<ModelCryptoProviderImpl>(keypair);

Expand Down Expand Up @@ -162,8 +147,12 @@ void Irohad::initBlockLoader() {
}

void Irohad::initConsensusGate() {
consensus_gate = yac_init.initConsensusGate(
peer.address, wsv, simulator, block_loader, keypair);
consensus_gate =
yac_init.initConsensusGate("0.0.0.0:" + std::to_string(internal_port_),
wsv,
simulator,
block_loader,
keypair);

log_->info("[Init] => consensus gate");
}
Expand Down Expand Up @@ -217,8 +206,9 @@ void Irohad::run() {

grpc::ServerBuilder builder;
int port = 0;
builder.AddListeningPort(
peer.address, grpc::InsecureServerCredentials(), &port);
builder.AddListeningPort("0.0.0.0:" + std::to_string(internal_port_),
grpc::InsecureServerCredentials(),
&port);
builder.RegisterService(ordering_init.ordering_gate_transport.get());
builder.RegisterService(ordering_init.ordering_service_transport.get());
builder.RegisterService(yac_init.consensus_network.get());
Expand Down
9 changes: 4 additions & 5 deletions irohad/main/application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,16 @@ class Irohad {
* @param redis_port - port of redis connection
* @param pg_conn - initialization string for postgre
* @param torii_port - port for torii binding
* @param internal_port - port for internal communication - ordering service,
* consensus, and block loader
* @param keypair - public and private keys for crypto provider
*/
Irohad(const std::string &block_store_dir,
const std::string &redis_host,
size_t redis_port,
const std::string &pg_conn,
size_t torii_port,
size_t internal_port,
const iroha::keypair_t &keypair);

/**
Expand All @@ -85,8 +88,6 @@ class Irohad {

virtual void initPeerQuery();

virtual void initPeer();

virtual void initCryptoProvider();

virtual void initValidators();
Expand All @@ -113,6 +114,7 @@ class Irohad {
size_t redis_port_;
std::string pg_conn_;
size_t torii_port_;
size_t internal_port_;

// ------------------------| internal dependencies |-------------------------

Expand All @@ -133,9 +135,6 @@ class Irohad {
// peer query
std::shared_ptr<iroha::ametsuchi::PeerQuery> wsv;

// peer
iroha::model::Peer peer;

// ordering gate
std::shared_ptr<iroha::network::OrderingGate> ordering_gate;

Expand Down
5 changes: 5 additions & 0 deletions irohad/main/iroha_conf_loader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
namespace config_members {
const char* BlockStorePath = "block_store_path";
const char* ToriiPort = "torii_port";
const char* InternalPort = "internal_port";
const char* KeyPairPath = "key_pair_path";
const char* PgOpt = "pg_opt";
const char* RedisHost = "redis_host";
Expand Down Expand Up @@ -56,6 +57,10 @@ inline rapidjson::Document parse_iroha_config(std::string const& iroha_conf_path
assert_fatal(doc[mbr::ToriiPort].IsUint(),
type_error(mbr::ToriiPort, "uint"));

assert_fatal(doc.HasMember(mbr::InternalPort), no_member_error(mbr::InternalPort));
assert_fatal(doc[mbr::InternalPort].IsUint(),
type_error(mbr::InternalPort, "uint"));

assert_fatal(doc.HasMember(mbr::PgOpt), no_member_error(mbr::PgOpt));
assert_fatal(doc[mbr::PgOpt].IsString(), type_error(mbr::PgOpt, "string"));

Expand Down
13 changes: 9 additions & 4 deletions irohad/main/irohad.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ int main(int argc, char *argv[]) {
config[mbr::RedisPort].GetUint(),
config[mbr::PgOpt].GetString(),
config[mbr::ToriiPort].GetUint(),
config[mbr::InternalPort].GetUint(),
keypair);

if (not irohad.storage) {
Expand All @@ -84,11 +85,15 @@ int main(int argc, char *argv[]) {
auto block = inserter.parseBlock(file.value());
log->info("Block is parsed");

if (block.has_value()) {
inserter.applyToLedger({block.value()});
log->info("Genesis block inserted, number of transactions: {}",
block.value().transactions.size());
if (not block.has_value()) {
log->error("Failed to parse genesis block");
return EXIT_FAILURE;
}

inserter.applyToLedger({block.value()});
log->info("Genesis block inserted, number of transactions: {}",
block.value().transactions.size());

// init pipeline components
irohad.init();

Expand Down
10 changes: 8 additions & 2 deletions irohad/simulator/impl/simulator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,14 @@ namespace iroha {
.subscribe([this](auto block) {
last_block = block;
});
if (not last_block.has_value() or
last_block.value().height + 1 != proposal.height) {
if (not last_block.has_value()) {
log_->warn("Could not fetch last block");
return;
}
if (last_block.value().height + 1 != proposal.height) {
log_->warn("Last block height: {}, proposal height: {}",
last_block.value().height,
proposal.height);
return;
}
auto temporaryStorage = ametsuchi_factory_->createTemporaryWsv();
Expand Down
9 changes: 6 additions & 3 deletions test/integration/pipeline/tx_pipeline_integration_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ class TestIrohad : public Irohad {
size_t redis_port,
const std::string &pg_conn,
size_t torii_port,
size_t internal_port,
const iroha::keypair_t &keypair)
: Irohad(block_store_dir,
redis_host,
redis_port,
pg_conn,
torii_port,
internal_port,
keypair) {}

auto &getCommandService() { return command_service; }
Expand All @@ -53,8 +55,9 @@ class TestIrohad : public Irohad {
void run() override {
grpc::ServerBuilder builder;
int port = 0;
builder.AddListeningPort(
peer.address, grpc::InsecureServerCredentials(), &port);
builder.AddListeningPort("0.0.0.0:" + std::to_string(internal_port_),
grpc::InsecureServerCredentials(),
&port);
builder.RegisterService(ordering_init.ordering_gate_transport.get());
builder.RegisterService(ordering_init.ordering_service_transport.get());
builder.RegisterService(yac_init.consensus_network.get());
Expand All @@ -80,7 +83,7 @@ class TxPipelineIntegrationTest : public iroha::ametsuchi::AmetsuchiTest {
auto keypair = manager->loadKeys().value();

irohad = std::make_shared<TestIrohad>(
block_store_path, redishost_, redisport_, pgopt_, 0, keypair);
block_store_path, redishost_, redisport_, pgopt_, 0, 10001, keypair);

ASSERT_TRUE(irohad->storage);

Expand Down

0 comments on commit 1af59cf

Please sign in to comment.