Skip to content

Commit

Permalink
Feature/rework ordering gate with shared model (hyperledger-iroha#1092)
Browse files Browse the repository at this point in the history
Rework ordering gate with shared_model.

Signed-off-by: Alexey Chernyshov <[email protected]>
  • Loading branch information
Alexey-N-Chernyshov authored and x3medima17 committed Mar 30, 2018
1 parent f253407 commit 40d2655
Show file tree
Hide file tree
Showing 17 changed files with 178 additions and 127 deletions.
11 changes: 3 additions & 8 deletions irohad/network/impl/peer_communication_service_impl.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright Soramitsu Co., Ltd. 2016 All Rights Reserved.
Copyright Soramitsu Co., Ltd. 2018 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,17 +31,12 @@ namespace iroha {
std::shared_ptr<const shared_model::interface::Transaction>
transaction) {
log_->info("propagate tx");
ordering_gate_->propagateTransaction(
std::shared_ptr<model::Transaction>(transaction->makeOldModel()));
ordering_gate_->propagateTransaction(transaction);
}

rxcpp::observable<std::shared_ptr<shared_model::interface::Proposal>>
PeerCommunicationServiceImpl::on_proposal() const {
return ordering_gate_->on_proposal().map(
[](auto prop) -> std::shared_ptr<shared_model::interface::Proposal> {
return std::make_shared<shared_model::proto::Proposal>(
shared_model::proto::from_old(prop));
});
return ordering_gate_->on_proposal();
}

rxcpp::observable<Commit> PeerCommunicationServiceImpl::on_commit() const {
Expand Down
16 changes: 12 additions & 4 deletions irohad/network/ordering_gate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@
#define IROHA_ORDERING_SERVICE_HPP

#include <rxcpp/rx-observable.hpp>
#include "model/proposal.hpp"
#include "model/transaction.hpp"
#include "network/peer_communication_service.hpp"

namespace shared_model {
namespace interface {
class Transaction;
class Proposal;
} // namespace interface
} // namespace shared_model

namespace iroha {
namespace network {

Expand All @@ -36,13 +41,16 @@ namespace iroha {
* @param transaction
*/
virtual void propagateTransaction(
std::shared_ptr<const model::Transaction> transaction) = 0;
std::shared_ptr<const shared_model::interface::Transaction>
transaction) = 0;

/**
* Return observable of all proposals in the consensus
* @return observable with notifications
*/
virtual rxcpp::observable<model::Proposal> on_proposal() = 0;
virtual rxcpp::observable<
std::shared_ptr<shared_model::interface::Proposal>>
on_proposal() = 0;

/**
* Set peer communication service for commit notification
Expand Down
14 changes: 11 additions & 3 deletions irohad/network/ordering_gate_transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@
#define IROHA_ORDERING_GATE_TRANSPORT_H

#include <memory>
#include "model/proposal.hpp"

namespace shared_model {
namespace interface {
class Transaction;
class Proposal;
} // namespace interface
} // namespace shared_model

namespace iroha {
namespace network {
Expand All @@ -33,7 +39,8 @@ namespace iroha {
* Callback on receiving proposal
* @param proposal - proposal object itself
*/
virtual void onProposal(model::Proposal) = 0;
virtual void onProposal(
std::shared_ptr<shared_model::interface::Proposal>) = 0;

virtual ~OrderingGateNotification() = default;
};
Expand All @@ -58,7 +65,8 @@ namespace iroha {
* @param transaction : transaction to be propagated
*/
virtual void propagateTransaction(
std::shared_ptr<const model::Transaction> transaction) = 0;
std::shared_ptr<const shared_model::interface::Transaction>
transaction) = 0;

virtual ~OrderingGateTransport() = default;
};
Expand Down
26 changes: 14 additions & 12 deletions irohad/ordering/impl/ordering_gate_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <utility>

#include "interfaces/transaction.hpp"
#include "ordering/impl/ordering_gate_impl.hpp"

namespace iroha {
Expand All @@ -27,15 +28,17 @@ namespace iroha {
: transport_(std::move(transport)), log_(logger::log("OrderingGate")) {}

void OrderingGateImpl::propagateTransaction(
std::shared_ptr<const model::Transaction> transaction) {
log_->info("propagate tx, tx_counter: "
+ std::to_string(transaction->tx_counter)
+ " account_id: " + transaction->creator_account_id);
std::shared_ptr<const shared_model::interface::Transaction>
transaction) {
log_->info("propagate tx, tx_counter: {} account_id: {}",
std::to_string(transaction->transactionCounter()),
" account_id: " + transaction->creatorAccountId());

transport_->propagateTransaction(transaction);
}

rxcpp::observable<model::Proposal> OrderingGateImpl::on_proposal() {
rxcpp::observable<std::shared_ptr<shared_model::interface::Proposal>>
OrderingGateImpl::on_proposal() {
return proposals_.get_observable();
}

Expand All @@ -50,20 +53,19 @@ namespace iroha {
});
}

void OrderingGateImpl::onProposal(model::Proposal proposal) {
void OrderingGateImpl::onProposal(
std::shared_ptr<shared_model::interface::Proposal> proposal) {
log_->info("Received new proposal");
proposal_queue_.push(
std::make_shared<model::Proposal>(std::move(proposal)));
proposal_queue_.push(std::move(proposal));
tryNextRound();
}

void OrderingGateImpl::tryNextRound() {
if (not proposal_queue_.empty()
and unlock_next_.exchange(false)) {
std::shared_ptr<model::Proposal> next_proposal;
if (not proposal_queue_.empty() and unlock_next_.exchange(false)) {
std::shared_ptr<shared_model::interface::Proposal> next_proposal;
proposal_queue_.try_pop(next_proposal);
log_->info("Pass the proposal to pipeline");
proposals_.get_subscriber().on_next(*next_proposal);
proposals_.get_subscriber().on_next(next_proposal);
}
}

Expand Down
26 changes: 18 additions & 8 deletions irohad/ordering/impl/ordering_gate_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@
#include "network/ordering_gate.hpp"

#include <atomic>

#include <tbb/concurrent_queue.h>

#include "model/converters/pb_transaction_factory.hpp"
#include "logger/logger.hpp"
#include "network/impl/async_grpc_client.hpp"
#include "network/ordering_gate_transport.hpp"

#include "logger/logger.hpp"
namespace shared_model {
namespace interaface {
class Transaction;
class Proposal;
} // namespace interaface
} // namespace shared_model

namespace iroha {
namespace ordering {
Expand All @@ -46,13 +50,16 @@ namespace iroha {
std::shared_ptr<iroha::network::OrderingGateTransport> transport);

void propagateTransaction(
std::shared_ptr<const model::Transaction> transaction) override;
std::shared_ptr<const shared_model::interface::Transaction>
transaction) override;

rxcpp::observable<model::Proposal> on_proposal() override;
rxcpp::observable<std::shared_ptr<shared_model::interface::Proposal>>
on_proposal() override;

void setPcs(const iroha::network::PeerCommunicationService &pcs) override;

void onProposal(model::Proposal proposal) override;
void onProposal(
std::shared_ptr<shared_model::interface::Proposal> proposal) override;

~OrderingGateImpl() override;

Expand All @@ -62,14 +69,17 @@ namespace iroha {
*/
void tryNextRound();

rxcpp::subjects::subject<model::Proposal> proposals_;
rxcpp::subjects::subject<
std::shared_ptr<shared_model::interface::Proposal>>
proposals_;
std::shared_ptr<iroha::network::OrderingGateTransport> transport_;

/// invariant: true if proposal can be pushed to subscribers
std::atomic_bool unlock_next_{true};

/// queue with all proposals received from ordering service
tbb::concurrent_queue<std::shared_ptr<model::Proposal>> proposal_queue_;
tbb::concurrent_queue<std::shared_ptr<shared_model::interface::Proposal>>
proposal_queue_;

/// subscription of pcs::on_commit
rxcpp::composite_subscription pcs_subscriber_;
Expand Down
27 changes: 19 additions & 8 deletions irohad/ordering/impl/ordering_gate_transport_grpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
*/
#include "ordering_gate_transport_grpc.hpp"

#include "backend/protobuf/transaction.hpp"
#include "builders/protobuf/proposal.hpp"
#include "interfaces/common_objects/types.hpp"

using namespace iroha::ordering;

grpc::Status OrderingGateTransportGrpc::onProposal(
Expand All @@ -24,15 +28,19 @@ grpc::Status OrderingGateTransportGrpc::onProposal(
::google::protobuf::Empty *response) {
log_->info("receive proposal");

auto transactions = decltype(std::declval<model::Proposal>().transactions)();
std::vector<shared_model::proto::Transaction> transactions;
for (const auto &tx : request->transactions()) {
transactions.push_back(*factory_.deserialize(tx));
transactions.emplace_back(tx);
}
log_->info("transactions in proposal: {}", transactions.size());

model::Proposal proposal(transactions);
proposal.height = request->height();
proposal.created_time = request->created_time();
auto proposal = std::make_shared<shared_model::proto::Proposal>(
shared_model::proto::ProposalBuilder()
.transactions(transactions)
.height(request->height())
.createdTime(request->created_time())
.build());

if (not subscriber_.expired()) {
subscriber_.lock()->onProposal(std::move(proposal));
} else {
Expand All @@ -49,12 +57,15 @@ OrderingGateTransportGrpc::OrderingGateTransportGrpc(
log_(logger::log("OrderingGate")) {}

void OrderingGateTransportGrpc::propagateTransaction(
std::shared_ptr<const model::Transaction> transaction) {
std::shared_ptr<const shared_model::interface::Transaction> transaction) {
log_->info("Propagate tx (on transport)");
auto call = new AsyncClientCall;

call->response_reader = client_->AsynconTransaction(
&call->context, factory_.serialize(*transaction), &cq_);
auto transaction_transport =
static_cast<const shared_model::proto::Transaction &>(*transaction)
.getTransport();
call->response_reader =
client_->AsynconTransaction(&call->context, transaction_transport, &cq_);

call->response_reader->Finish(&call->reply, &call->status, call);
}
Expand Down
13 changes: 9 additions & 4 deletions irohad/ordering/impl/ordering_gate_transport_grpc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@
#define IROHA_ORDERING_GATE_TRANSPORT_GRPC_H

#include <google/protobuf/empty.pb.h>

#include "logger/logger.hpp"
#include "model/converters/pb_transaction_factory.hpp"
#include "network/impl/async_grpc_client.hpp"
#include "network/ordering_gate_transport.hpp"
#include "ordering.grpc.pb.h"
#include "proposal.pb.h"

namespace shared_model {
namespace interface {
class Transaction;
}
} // namespace shared_model

namespace iroha {
namespace ordering {
Expand All @@ -39,15 +44,15 @@ namespace iroha {
::google::protobuf::Empty *response) override;

void propagateTransaction(
std::shared_ptr<const model::Transaction> transaction) override;
std::shared_ptr<const shared_model::interface::Transaction>
transaction) override;

void subscribe(std::shared_ptr<iroha::network::OrderingGateNotification>
subscriber) override;

private:
std::weak_ptr<iroha::network::OrderingGateNotification> subscriber_;
std::unique_ptr<proto::OrderingServiceTransportGrpc::Stub> client_;
model::converters::PbTransactionFactory factory_;
logger::Logger log_;
};

Expand Down
10 changes: 6 additions & 4 deletions irohad/ordering/impl/ordering_service_impl.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright Soramitsu Co., Ltd. 2017 All Rights Reserved.
* Copyright Soramitsu Co., Ltd. 2018 All Rights Reserved.
* http://soramitsu.co.jp
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -16,11 +16,13 @@
*/

#include "ordering/impl/ordering_service_impl.hpp"

#include "ametsuchi/ordering_service_persistent_state.hpp"
#include "ametsuchi/peer_query.hpp"
#include "backend/protobuf/proposal.hpp"
#include "backend/protobuf/transaction.hpp"
#include "builders/protobuf/proposal.hpp"
#include "logger/logger.hpp"
#include "proposal.pb.h"
#include "datetime/time.hpp"
#include "network/ordering_service_transport.hpp"

namespace iroha {
namespace ordering {
Expand Down
11 changes: 5 additions & 6 deletions irohad/ordering/impl/ordering_service_impl.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright Soramitsu Co., Ltd. 2017 All Rights Reserved.
* Copyright Soramitsu Co., Ltd. 2018 All Rights Reserved.
* http://soramitsu.co.jp
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -21,20 +21,19 @@
#include <tbb/concurrent_queue.h>
#include <memory>
#include <rxcpp/rx.hpp>
#include <unordered_map>

#include "ametsuchi/peer_query.hpp"
#include "logger/logger.hpp"
#include "network/impl/async_grpc_client.hpp"
#include "network/ordering_service.hpp"
#include "network/ordering_service_transport.hpp"
#include "ordering.grpc.pb.h"

namespace iroha {

namespace ametsuchi {
class OrderingServicePersistentState;
}
class OrderingServiceTransport;
class PeerQuery;
} // namespace ametsuchi

namespace ordering {

/**
Expand Down
3 changes: 2 additions & 1 deletion irohad/ordering/impl/ordering_service_transport_grpc.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright Soramitsu Co., Ltd. 2017 All Rights Reserved.
* Copyright Soramitsu Co., Ltd. 2018 All Rights Reserved.
* http://soramitsu.co.jp
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -15,6 +15,7 @@
* limitations under the License.
*/
#include "ordering/impl/ordering_service_transport_grpc.hpp"

#include "backend/protobuf/transaction.hpp"
#include "builders/protobuf/proposal.hpp"

Expand Down
Loading

0 comments on commit 40d2655

Please sign in to comment.