Skip to content

Commit

Permalink
Replace libuv timer with rxcpp in Yac
Browse files Browse the repository at this point in the history
  • Loading branch information
lebdron committed Oct 4, 2017
1 parent 7e5a619 commit 59fc7d4
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 81 deletions.
2 changes: 2 additions & 0 deletions .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,7 @@ BasedOnStyle: Google
NamespaceIndentation: All
BreakBeforeBinaryOperators: NonAssignment
AlignOperands: false
DerivePointerAlignment: false
PointerAlignment: Right
BinPackArguments: false
BinPackParameters: false
19 changes: 5 additions & 14 deletions irohad/consensus/yac/impl/timer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,18 @@
namespace iroha {
namespace consensus {
namespace yac {

TimerImpl::TimerImpl(std::shared_ptr<uvw::Loop> loop)
: timer_(loop->resource<uvw::TimerHandle>()) {
timer_->on<uvw::TimerEvent>([this](const auto&, auto&) { handler_(); });
}

void TimerImpl::invokeAfterDelay(uint64_t millis,
std::function<void()> handler) {
deny();
handler_ = std::move(handler);
timer_->start(uvw::TimerHandle::Time(millis),
uvw::TimerHandle::Time(0));
timer = rxcpp::observable<>::timer(std::chrono::milliseconds(millis));
handle = timer.subscribe_on(rxcpp::observe_on_new_thread())
.subscribe([this](auto) { handler_(); });
}

void TimerImpl::deny() { timer_->stop(); }

TimerImpl::~TimerImpl() {
timer_->stop();
timer_->close();
}
void TimerImpl::deny() { handle.unsubscribe(); }

TimerImpl::~TimerImpl() { deny(); }
} // namespace yac
} // namespace consensus
} // namespace iroha
13 changes: 5 additions & 8 deletions irohad/consensus/yac/impl/timer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,15 @@
#ifndef IROHA_TIMER_IMPL_HPP
#define IROHA_TIMER_IMPL_HPP

#include <uvw/loop.hpp>
#include <uvw/timer.hpp>
#include "consensus/yac/timer.hpp"
#include <rxcpp/rx.hpp>

namespace iroha {
namespace consensus {
namespace yac {

class TimerImpl : public Timer {
public:
explicit TimerImpl(
std::shared_ptr<uvw::Loop> loop = uvw::Loop::getDefault());

TimerImpl() = default;
TimerImpl(const TimerImpl&) = delete;
TimerImpl& operator=(const TimerImpl&) = delete;

Expand All @@ -41,10 +37,11 @@ namespace iroha {
~TimerImpl() override;

private:
std::shared_ptr<uvw::TimerHandle> timer_;
std::function<void()> handler_;
};

rxcpp::observable<long> timer;
rxcpp::composite_subscription handle;
};
} // namespace yac
} // namespace consensus
} // namespace iroha
Expand Down
2 changes: 1 addition & 1 deletion irohad/main/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ void Irohad::initBlockLoader() {

void Irohad::initConsensusGate() {
consensus_gate = yac_init.initConsensusGate(
peer.address, loop, orderer, simulator, block_loader);
peer.address, orderer, simulator, block_loader);

log_->info("[Init] => consensus gate");
}
Expand Down
34 changes: 15 additions & 19 deletions irohad/main/impl/consensus_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ namespace iroha {
}

auto YacInit::createCryptoProvider(model::Peer::KeyType pubkey) {
std::shared_ptr<MockYacCryptoProvider>
crypto = std::make_shared<MockYacCryptoProvider>(pubkey);
std::shared_ptr<MockYacCryptoProvider> crypto =
std::make_shared<MockYacCryptoProvider>(pubkey);

EXPECT_CALL(*crypto, verify(testing::An<CommitMessage>()))
.WillRepeatedly(testing::Return(true));
Expand All @@ -71,40 +71,36 @@ namespace iroha {
return crypto;
}

auto YacInit::createTimer(std::shared_ptr<uvw::Loop> loop) {
return std::make_shared<TimerImpl>(loop);
}
auto YacInit::createTimer() { return std::make_shared<TimerImpl>(); }

auto YacInit::createHashProvider() {
return std::make_shared<YacHashProviderImpl>();
}

std::shared_ptr<consensus::yac::Yac> YacInit::createYac(
std::string network_address, std::shared_ptr<uvw::Loop> loop,
ClusterOrdering initial_order) {
std::string network_address, ClusterOrdering initial_order) {
auto &&order = initial_order.getPeers();
auto pubkey =
std::find_if(order.begin(), order.end(), [network_address](
auto peer) {
return peer.address == network_address;
})->pubkey;
auto pubkey = std::find_if(order.begin(),
order.end(),
[network_address](auto peer) {
return peer.address == network_address;
})
->pubkey;

return Yac::create(YacVoteStorage(),
createNetwork(std::move(network_address), order),
createCryptoProvider(pubkey),
createTimer(std::move(loop)),
createTimer(),
initial_order,
delay_seconds_ * 1000);

}

std::shared_ptr<YacGateImpl> YacInit::initConsensusGate(
std::string network_address, std::shared_ptr<uvw::Loop> loop,
std::string network_address,
std::shared_ptr<YacPeerOrderer> peer_orderer,
std::shared_ptr<simulator::BlockCreator> block_creator,
std::shared_ptr<network::BlockLoader> block_loader) {
auto yac = createYac(std::move(network_address),
std::move(loop),
peer_orderer->getInitialOrdering().value());
consensus_network->subscribe(yac);

Expand All @@ -117,6 +113,6 @@ namespace iroha {
delay_seconds_ * 1000);
}

} // namespace yac
} // namespace consensus
} // namespace iroha
} // namespace yac
} // namespace consensus
} // namespace iroha
36 changes: 17 additions & 19 deletions irohad/main/impl/consensus_init.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
#include <memory>
#include <string>
#include <vector>
#include "consensus/yac/yac.hpp"
#include "consensus/yac/messages.hpp"
#include "consensus/yac/impl/yac_gate_impl.hpp"
#include "consensus/yac/transport/impl/network_impl.hpp"
#include "consensus/yac/impl/timer_impl.hpp"
#include "consensus/yac/impl/peer_orderer_impl.hpp"
#include "consensus/yac/impl/timer_impl.hpp"
#include "consensus/yac/impl/yac_gate_impl.hpp"
#include "consensus/yac/impl/yac_hash_provider_impl.hpp"
#include "consensus/yac/messages.hpp"
#include "consensus/yac/transport/impl/network_impl.hpp"
#include "consensus/yac/yac.hpp"

namespace iroha {
namespace consensus {
Expand All @@ -42,28 +42,26 @@ namespace iroha {

auto createCryptoProvider(model::Peer::KeyType pubkey);

auto createTimer(std::shared_ptr<uvw::Loop> loop);
auto createTimer();

auto createHashProvider();

std::shared_ptr<consensus::yac::Yac> createYac(std::string network_address,
std::shared_ptr<uvw::Loop> loop,
ClusterOrdering initial_order);
std::shared_ptr<consensus::yac::Yac> createYac(
std::string network_address, ClusterOrdering initial_order);

uint64_t delay_seconds_ = 5;

public:
std::shared_ptr<YacGateImpl> initConsensusGate(std::string network_address,
std::shared_ptr<uvw::Loop> loop,
std::shared_ptr<
YacPeerOrderer> peer_orderer,
std::shared_ptr<simulator::BlockCreator> block_creator,
std::shared_ptr<network::BlockLoader> block_loader);
std::shared_ptr<YacGateImpl> initConsensusGate(
std::string network_address,
std::shared_ptr<YacPeerOrderer> peer_orderer,
std::shared_ptr<simulator::BlockCreator> block_creator,
std::shared_ptr<network::BlockLoader> block_loader);

std::shared_ptr<NetworkImpl> consensus_network;
};
} // namespace yac
} // namespace consensus
} // namespace iroha
} // namespace yac
} // namespace consensus
} // namespace iroha

#endif //IROHA_CONSENSUS_INIT_HPP
#endif // IROHA_CONSENSUS_INIT_HPP
11 changes: 1 addition & 10 deletions test/integration/consensus/consensus_sunny_day.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,8 @@ class FixedCryptoProvider : public MockYacCryptoProvider {

class ConsensusSunnyDayTest : public ::testing::Test {
public:
std::thread thread, loop_thread;
std::thread thread;
std::unique_ptr<grpc::Server> server;
std::shared_ptr<uvw::Loop> loop;
std::shared_ptr<NetworkImpl> network;
std::shared_ptr<MockYacCryptoProvider> crypto;
std::shared_ptr<TimerImpl> timer;
Expand Down Expand Up @@ -89,17 +88,9 @@ class ConsensusSunnyDayTest : public ::testing::Test {
// wait until server woke up
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock);

loop = uvw::Loop::create();
loop_thread = std::thread([this] { loop->run(); });
}

void TearDown() override {
loop->stop();
if (loop_thread.joinable()) {
loop_thread.join();
}

server->Shutdown();
if (thread.joinable()) {
thread.join();
Expand Down
11 changes: 1 addition & 10 deletions test/module/irohad/consensus/yac/timer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,15 @@ using namespace iroha::consensus::yac;
class TimerTest : public ::testing::Test {
protected:
void SetUp() override {
loop = uvw::Loop::create();
timer = std::make_shared<TimerImpl>(loop);
thread = std::thread([this] { ASSERT_TRUE(loop->run()); });
timer = std::make_shared<TimerImpl>();
}

void TearDown() override {
loop->stop();
if (thread.joinable()) {
thread.join();
}
timer.reset();
loop.reset();
}

public:
std::shared_ptr<Timer> timer;
std::shared_ptr<uvw::Loop> loop;
std::thread thread;
};

TEST_F(TimerTest, NothingInvokedWhenDenied) {
Expand Down

0 comments on commit 59fc7d4

Please sign in to comment.