Skip to content

Commit

Permalink
Queue in Ordering Gate
Browse files Browse the repository at this point in the history
Add unsubscribe in the dtor of ordering gate

Signed-off-by: Fedor Muratov <[email protected]>

Fix gcc related stuff in ordering_gate_impl
Add test for ordering gate with queue semantic.

Signed-off-by: Fedor Muratov <[email protected]>

Fix test for ordering service with respect to queue in ordering gate;
A little bit fix of documentation in orderings_gate_service_test;

Signed-off-by: Fedor Muratov <[email protected]>

* Clang format application
* Add comment for pcs purposes

Signed-off-by: Fedor Muratov <[email protected]>

* Fix PCS const behaviour
* Fix review issues:
  - typos
  - weak ptr
  - todo for behaviour

Signed-off-by: Fedor Muratov <[email protected]>

Fix more review issues:
 - typos with pcs
 - race condition for queue

Signed-off-by: Fedor Muratov <[email protected]>

Fix order of fetching from queue and atomic exchange.

Signed-off-by: Fedor Muratov <[email protected]>

Fix order of initialization in the service test
Rework ordering gate with empty queue checking.

Signed-off-by: Fedor Muratov <[email protected]>

Remove return value from setPcs

Signed-off-by: Fedor Muratov <[email protected]>

Fix merge issues:
* add missed set pcs
* rework test with shared_model

Signed-off-by: Fedor Muratov <[email protected]>
  • Loading branch information
muratovv committed Mar 6, 2018
1 parent b983214 commit 753a438
Show file tree
Hide file tree
Showing 11 changed files with 184 additions and 21 deletions.
3 changes: 3 additions & 0 deletions irohad/main/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,9 @@ void Irohad::initPeerCommunicationService() {
pcs->on_commit().subscribe(
[this](auto) { log_->info("~~~~~~~~~| COMMIT =^._.^= |~~~~~~~~~ "); });

// complete initialization of ordering gate
ordering_gate->setPcs(*pcs);

log_->info("[Init] => pcs");
}

Expand Down
4 changes: 2 additions & 2 deletions irohad/network/impl/peer_communication_service_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ namespace iroha {
}

rxcpp::observable<std::shared_ptr<shared_model::interface::Proposal>>
PeerCommunicationServiceImpl::on_proposal() {
PeerCommunicationServiceImpl::on_proposal() const {
return ordering_gate_->on_proposal().map(
[](auto prop) -> std::shared_ptr<shared_model::interface::Proposal> {
return std::make_shared<shared_model::proto::Proposal>(
shared_model::proto::from_old(prop));
});
}

rxcpp::observable<Commit> PeerCommunicationServiceImpl::on_commit() {
rxcpp::observable<Commit> PeerCommunicationServiceImpl::on_commit() const {
return synchronizer_->on_commit_chain().map([](auto commit) -> Commit {
return commit.map(
[](auto block) -> std::shared_ptr<shared_model::interface::Block> {
Expand Down
4 changes: 2 additions & 2 deletions irohad/network/impl/peer_communication_service_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ namespace iroha {
transaction) override;

rxcpp::observable<std::shared_ptr<shared_model::interface::Proposal>>
on_proposal() override;
on_proposal() const override;

rxcpp::observable<Commit> on_commit() override;
rxcpp::observable<Commit> on_commit() const override;

private:
std::shared_ptr<OrderingGate> ordering_gate_;
Expand Down
10 changes: 10 additions & 0 deletions irohad/network/ordering_gate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <rxcpp/rx-observable.hpp>
#include "model/proposal.hpp"
#include "model/transaction.hpp"
#include "network/peer_communication_service.hpp"

namespace iroha {
namespace network {
Expand All @@ -43,6 +44,15 @@ namespace iroha {
*/
virtual rxcpp::observable<model::Proposal> on_proposal() = 0;

/**
* Set peer communication service for commit notification
* @param pcs - const reference for PeerCommunicationService
* design notes: pcs passed by const reference because of cyclic linking
* between OG and PCS in the implementation. Same reasons to move the pcs
* dependency not in ctor but make the setter method.
*/
virtual void setPcs(const PeerCommunicationService &pcs) = 0;

virtual ~OrderingGate() = default;
};
} // namespace network
Expand Down
4 changes: 2 additions & 2 deletions irohad/network/peer_communication_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ namespace iroha {
*/
virtual rxcpp::observable<
std::shared_ptr<shared_model::interface::Proposal>>
on_proposal() = 0;
on_proposal() const = 0;

/**
* Event is triggered when commit block arrives.
Expand All @@ -64,7 +64,7 @@ namespace iroha {
* But there are scenarios when consensus provide many blocks, e.g.
* on peer startup - peer will get all actual blocks.
*/
virtual rxcpp::observable<Commit> on_commit() = 0;
virtual rxcpp::observable<Commit> on_commit() const = 0;

virtual ~PeerCommunicationService() = default;
};
Expand Down
33 changes: 31 additions & 2 deletions irohad/ordering/impl/ordering_gate_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
* limitations under the License.
*/

#include <utility>

#include "ordering/impl/ordering_gate_impl.hpp"

namespace iroha {
namespace ordering {

OrderingGateImpl::OrderingGateImpl(
std::shared_ptr<iroha::network::OrderingGateTransport> transport)
: transport_(transport), log_(logger::log("OrderingGate")) {}
: transport_(std::move(transport)), log_(logger::log("OrderingGate")) {}

void OrderingGateImpl::propagateTransaction(
std::shared_ptr<const model::Transaction> transaction) {
Expand All @@ -37,9 +39,36 @@ namespace iroha {
return proposals_.get_observable();
}

void OrderingGateImpl::setPcs(
const iroha::network::PeerCommunicationService &pcs) {
pcs_subscriber_ = pcs.on_commit().subscribe([this](auto) {
// TODO: 05/03/2018 @muratovv rework behavior of queue with respect to
// block height IR-1042
unlock_next_.store(true);
this->tryNextRound();

});
}

void OrderingGateImpl::onProposal(model::Proposal proposal) {
log_->info("Received new proposal");
proposals_.get_subscriber().on_next(proposal);
proposal_queue_.push(
std::make_shared<model::Proposal>(std::move(proposal)));
tryNextRound();
}

void OrderingGateImpl::tryNextRound() {
if (not proposal_queue_.empty()
and unlock_next_.exchange(false)) {
std::shared_ptr<model::Proposal> next_proposal;
proposal_queue_.try_pop(next_proposal);
log_->info("Pass the proposal to pipeline");
proposals_.get_subscriber().on_next(*next_proposal);
}
}

OrderingGateImpl::~OrderingGateImpl() {
pcs_subscriber_.unsubscribe();
}

} // namespace ordering
Expand Down
26 changes: 25 additions & 1 deletion irohad/ordering/impl/ordering_gate_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@
#ifndef IROHA_ORDERING_GATE_IMPL_HPP
#define IROHA_ORDERING_GATE_IMPL_HPP

#include "network/ordering_gate.hpp"

#include <atomic>

#include <tbb/concurrent_queue.h>

#include "model/converters/pb_transaction_factory.hpp"
#include "network/impl/async_grpc_client.hpp"
#include "network/ordering_gate.hpp"
#include "network/ordering_gate_transport.hpp"

#include "logger/logger.hpp"
Expand All @@ -45,11 +50,30 @@ namespace iroha {

rxcpp::observable<model::Proposal> on_proposal() override;

void setPcs(const iroha::network::PeerCommunicationService &pcs) override;

void onProposal(model::Proposal proposal) override;

~OrderingGateImpl() override;

private:
/**
* Try to push proposal for next consensus round
*/
void tryNextRound();

rxcpp::subjects::subject<model::Proposal> proposals_;
std::shared_ptr<iroha::network::OrderingGateTransport> transport_;

/// invariant: true if proposal can be pushed to subscribers
std::atomic_bool unlock_next_{true};

/// queue with all proposals received from ordering service
tbb::concurrent_queue<std::shared_ptr<model::Proposal>> proposal_queue_;

/// subscription of pcs::on_commit
rxcpp::composite_subscription pcs_subscriber_;

logger::Logger log_;
};
} // namespace ordering
Expand Down
14 changes: 10 additions & 4 deletions test/module/irohad/network/network_mocks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,16 @@ namespace iroha {
namespace network {
class MockPeerCommunicationService : public PeerCommunicationService {
public:
MOCK_METHOD1(propagate_transaction,
void(std::shared_ptr<const shared_model::interface::Transaction>));
MOCK_METHOD1(
propagate_transaction,
void(std::shared_ptr<const shared_model::interface::Transaction>));

MOCK_METHOD0(on_proposal, rxcpp::observable<std::shared_ptr<shared_model::interface::Proposal>>());
MOCK_CONST_METHOD0(
on_proposal,
rxcpp::observable<
std::shared_ptr<shared_model::interface::Proposal>>());

MOCK_METHOD0(on_commit, rxcpp::observable<Commit>());
MOCK_CONST_METHOD0(on_commit, rxcpp::observable<Commit>());
};

class MockBlockLoader : public BlockLoader {
Expand All @@ -53,6 +57,8 @@ namespace iroha {
void(std::shared_ptr<const model::Transaction> transaction));

MOCK_METHOD0(on_proposal, rxcpp::observable<model::Proposal>());

MOCK_METHOD1(setPcs, void(const PeerCommunicationService &));
};

class MockConsensusGate : public ConsensusGate {
Expand Down
41 changes: 37 additions & 4 deletions test/module/irohad/ordering/ordering_gate_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,32 @@
* limitations under the License.
*/

#include <gtest/gtest.h>

#include "backend/protobuf/common_objects/peer.hpp"
#include "builders/protobuf/common_objects/proto_peer_builder.hpp"
#include "builders/common_objects/peer_builder.hpp"
#include "framework/test_subscriber.hpp"
#include "mock_ordering_service_persistent_state.hpp"
#include "model/asset.hpp"
#include "module/irohad/ametsuchi/ametsuchi_mocks.hpp"
#include "module/irohad/network/network_mocks.hpp"
#include "ordering/impl/ordering_gate_impl.hpp"
#include "ordering/impl/ordering_gate_transport_grpc.hpp"
#include "ordering/impl/ordering_service_impl.hpp"
#include "ordering/impl/ordering_service_transport_grpc.hpp"
#include "validators/field_validator.hpp"

#include "module/shared_model/builders/protobuf/test_block_builder.hpp"

using namespace iroha;
using namespace iroha::ordering;
using namespace iroha::model;
using namespace iroha::network;
using namespace framework::test_subscriber;
using namespace iroha::ametsuchi;
using namespace std::chrono_literals;

using ::testing::Return;

using wPeer = std::shared_ptr<shared_model::interface::Peer>;
Expand All @@ -46,8 +53,12 @@ class OrderingGateServiceTest : public ::testing::Test {
.address(address)
.pubkey(shared_model::interface::types::PubkeyType(std::string(32, '0')))
.build().copy());
pcs_ = std::make_shared<MockPeerCommunicationService>();
EXPECT_CALL(*pcs_, on_commit())
.WillRepeatedly(Return(commit_subject_.get_observable()));
gate_transport = std::make_shared<OrderingGateTransportGrpc>(address);
gate = std::make_shared<OrderingGateImpl>(gate_transport);
gate->setPcs(*pcs_);
gate_transport->subscribe(gate);

service_transport = std::make_shared<OrderingServiceTransportGrpc>();
Expand Down Expand Up @@ -93,14 +104,29 @@ class OrderingGateServiceTest : public ::testing::Test {

TestSubscriber<iroha::model::Proposal> init(size_t times) {
auto wrapper = make_test_subscriber<CallExact>(gate->on_proposal(), times);
wrapper.subscribe([this](auto proposal) { proposals.push_back(proposal); });
gate->on_proposal().subscribe([this](auto) {
counter--;
cv.notify_one();
});
gate->on_proposal().subscribe([this](auto proposal) {
proposals.push_back(proposal);

// emulate commit event after receiving the proposal to perform next
// round inside the peer.
std::shared_ptr<shared_model::interface::Block> block =
std::make_shared<shared_model::proto::Block>(
TestBlockBuilder().build());
commit_subject_.get_subscriber().on_next(
rxcpp::observable<>::just(block));
});
wrapper.subscribe();
return wrapper;
}

/**
* Send a stub transaction to OS
* @param i - number of transaction
*/
void send_transaction(size_t i) {
auto tx = std::make_shared<Transaction>();
tx->tx_counter = i;
Expand All @@ -113,6 +139,11 @@ class OrderingGateServiceTest : public ::testing::Test {
std::shared_ptr<OrderingGateImpl> gate;
std::shared_ptr<OrderingServiceImpl> service;

/// Peer Communication Service and commit subject are required to emulate
/// commits for Ordering Service
std::shared_ptr<MockPeerCommunicationService> pcs_;
rxcpp::subjects::subject<Commit> commit_subject_;

std::vector<Proposal> proposals;
std::atomic<size_t> counter;
std::condition_variable cv;
Expand All @@ -127,9 +158,11 @@ class OrderingGateServiceTest : public ::testing::Test {
};

/**
* @given ordering service
* @when a bunch of transaction has arrived
* @then proposal is sent
* @given Ordering service
* @when Send 8 transactions
* AND 2 transactions to OS
* @then Received proposal with 8 transactions
* AND proposal with 2 transactions
*/
TEST_F(OrderingGateServiceTest, SplittingBunchTransactions) {
// 8 transaction -> proposal -> 2 transaction -> proposal
Expand Down
Loading

0 comments on commit 753a438

Please sign in to comment.