Skip to content
This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

OG batch resend strategy #2207

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions irohad/main/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "network/impl/peer_communication_service_impl.hpp"
#include "ordering/impl/on_demand_common.hpp"
#include "ordering/impl/on_demand_ordering_gate.hpp"
#include "ordering/impl/ordering_gate_cache/on_demand_resend_strategy.hpp"
#include "pending_txs_storage/impl/pending_txs_storage_impl.hpp"
#include "simulator/impl/simulator.hpp"
#include "synchronizer/impl/synchronizer_impl.hpp"
Expand Down Expand Up @@ -347,6 +348,9 @@ void Irohad::initOrderingGate() {
auto factory = std::make_unique<shared_model::proto::ProtoProposalFactory<
shared_model::validation::DefaultProposalValidator>>();

auto batch_resend_strategy =
std::make_shared<ordering::OnDemandResendStrategy>();

const uint64_t kCounter = 0, kMaxLocalCounter = 2;
// reject_delay and local_counter are local mutable variables of lambda
const auto kMaxDelay(max_rounds_delay_);
Expand Down Expand Up @@ -386,6 +390,7 @@ void Irohad::initOrderingGate() {
std::move(factory),
proposal_factory,
persistent_cache,
std::move(batch_resend_strategy),
delay,
log_manager_->getChild("Ordering"));
log_->info("[Init] => init ordering gate - [{}]",
Expand Down
10 changes: 10 additions & 0 deletions irohad/main/impl/on_demand_ordering_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ namespace iroha {
std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
async_call,
std::shared_ptr<TransportFactoryType> proposal_transport_factory,
std::shared_ptr<ordering::OrderingGateResendStrategy>
batch_resend_strategy,
std::chrono::milliseconds delay,
std::vector<shared_model::interface::types::HashType> initial_hashes,
const logger::LoggerManagerTreePtr &ordering_log_manager) {
Expand Down Expand Up @@ -196,6 +198,7 @@ namespace iroha {
delay,
ordering_log_manager),
peers,
std::move(batch_resend_strategy),
ordering_log_manager->getChild("ConnectionManager")->getLogger());
}

Expand All @@ -206,6 +209,8 @@ namespace iroha {
std::shared_ptr<shared_model::interface::UnsafeProposalFactory>
proposal_factory,
std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
std::shared_ptr<ordering::OrderingGateResendStrategy>
batch_resend_strategy,
std::function<std::chrono::milliseconds(
const synchronizer::SynchronizationEvent &)> delay_func,
size_t max_number_of_transactions,
Expand Down Expand Up @@ -252,6 +257,7 @@ namespace iroha {
std::move(cache),
std::move(proposal_factory),
std::move(tx_cache),
std::move(batch_resend_strategy),
max_number_of_transactions,
ordering_log_manager->getChild("Gate")->getLogger());
}
Expand Down Expand Up @@ -292,6 +298,8 @@ namespace iroha {
proposal_factory,
std::shared_ptr<TransportFactoryType> proposal_transport_factory,
std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
std::shared_ptr<ordering::OrderingGateResendStrategy>
batch_resend_strategy,
std::function<std::chrono::milliseconds(
const synchronizer::SynchronizationEvent &)> delay_func,
logger::LoggerManagerTreePtr ordering_log_manager) {
Expand All @@ -310,12 +318,14 @@ namespace iroha {
createConnectionManager(std::move(peer_query_factory),
std::move(async_call),
std::move(proposal_transport_factory),
batch_resend_strategy,
delay,
std::move(initial_hashes),
ordering_log_manager),
std::make_shared<ordering::cache::OnDemandCache>(),
std::move(proposal_factory),
std::move(tx_cache),
batch_resend_strategy,
std::move(delay_func),
max_number_of_transactions,
ordering_log_manager);
Expand Down
11 changes: 9 additions & 2 deletions irohad/main/impl/on_demand_ordering_init.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "ordering.grpc.pb.h"
#include "ordering/impl/on_demand_os_server_grpc.hpp"
#include "ordering/impl/ordering_gate_cache/ordering_gate_cache.hpp"
#include "ordering/impl/ordering_gate_cache/ordering_gate_resend_strategy.hpp"
#include "ordering/on_demand_ordering_service.hpp"
#include "ordering/on_demand_os_transport.hpp"

Expand Down Expand Up @@ -57,6 +58,8 @@ namespace iroha {
std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
async_call,
std::shared_ptr<TransportFactoryType> proposal_transport_factory,
std::shared_ptr<ordering::OrderingGateResendStrategy>
batch_resend_strategy,
std::chrono::milliseconds delay,
std::vector<shared_model::interface::types::HashType> initial_hashes,
const logger::LoggerManagerTreePtr &ordering_log_manager);
Expand All @@ -72,6 +75,8 @@ namespace iroha {
std::shared_ptr<shared_model::interface::UnsafeProposalFactory>
proposal_factory,
std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
std::shared_ptr<ordering::OrderingGateResendStrategy>
batch_resend_strategy,
std::function<std::chrono::milliseconds(
const synchronizer::SynchronizationEvent &)> delay_func,
size_t max_number_of_transactions,
Expand All @@ -94,11 +99,11 @@ namespace iroha {
OnDemandOrderingInit(logger::LoggerPtr log);

~OnDemandOrderingInit();

/**
* Initializes on-demand ordering gate and ordering sevice components
*
* @param max_number_of_transactions maximum number of transaction in a proposal
* @param max_number_of_transactions maximum number of transaction in a
* proposal
* @param delay timeout for ordering service response on proposal request
* @param initial_hashes seeds for peer list permutations for first k
* rounds they are required since hash of block i defines round i + k
Expand Down Expand Up @@ -134,6 +139,8 @@ namespace iroha {
proposal_factory,
std::shared_ptr<TransportFactoryType> proposal_transport_factory,
std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
std::shared_ptr<ordering::OrderingGateResendStrategy>
batch_resend_strategy,
std::function<std::chrono::milliseconds(
const synchronizer::SynchronizationEvent &)> delay_func,
logger::LoggerManagerTreePtr ordering_log_manager);
Expand Down
1 change: 1 addition & 0 deletions irohad/ordering/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ add_library(on_demand_ordering_gate
impl/on_demand_ordering_gate.cpp
impl/ordering_gate_cache/ordering_gate_cache.cpp
impl/ordering_gate_cache/on_demand_cache.cpp
impl/ordering_gate_cache/on_demand_resend_strategy.cpp
)
target_link_libraries(on_demand_ordering_gate
on_demand_common
Expand Down
48 changes: 43 additions & 5 deletions irohad/ordering/impl/on_demand_connection_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <boost/range/combine.hpp>
#include "interfaces/iroha_internal/proposal.hpp"
#include "interfaces/iroha_internal/transaction_batch_impl.hpp"
#include "logger/logger.hpp"
#include "ordering/impl/on_demand_common.hpp"

Expand All @@ -16,9 +17,11 @@ using namespace iroha::ordering;
OnDemandConnectionManager::OnDemandConnectionManager(
std::shared_ptr<transport::OdOsNotificationFactory> factory,
rxcpp::observable<CurrentPeers> peers,
std::shared_ptr<OrderingGateResendStrategy> batch_resend_strategy,
logger::LoggerPtr log)
: log_(std::move(log)),
factory_(std::move(factory)),
batch_resend_strategy_(std::move(batch_resend_strategy)),
subscription_(peers.subscribe([this](const auto &peers) {
// exclusive lock
std::lock_guard<std::shared_timed_mutex> lock(mutex_);
Expand All @@ -30,8 +33,12 @@ OnDemandConnectionManager::OnDemandConnectionManager(
std::shared_ptr<transport::OdOsNotificationFactory> factory,
rxcpp::observable<CurrentPeers> peers,
CurrentPeers initial_peers,
std::shared_ptr<OrderingGateResendStrategy> batch_resend_strategy,
logger::LoggerPtr log)
: OnDemandConnectionManager(std::move(factory), peers, std::move(log)) {
: OnDemandConnectionManager(std::move(factory),
peers,
std::move(batch_resend_strategy),
std::move(log)) {
// using start_with(initial_peers) results in deadlock
initializeConnections(initial_peers);
}
Expand All @@ -55,10 +62,41 @@ void OnDemandConnectionManager::onBatches(CollectionType batches) {
* RejectReject CommitReject RejectCommit CommitCommit
*/

connections_.peers[kRejectRejectConsumer]->onBatches(batches);
connections_.peers[kRejectCommitConsumer]->onBatches(batches);
connections_.peers[kCommitRejectConsumer]->onBatches(batches);
connections_.peers[kCommitCommitConsumer]->onBatches(batches);
CollectionType reject_reject_batches{};
CollectionType reject_commit_batches{};
CollectionType commit_reject_batches{};
CollectionType commit_commit_batches{};

for (const auto &batch : batches) {
auto rounds = batch_resend_strategy_->extract(batch);
auto current_round = batch_resend_strategy_->getCurrentRound();

if (rounds.find(nextRejectRound(nextRejectRound(current_round)))
!= rounds.end()) {
reject_reject_batches.push_back(batch);
}
if (rounds.find(nextCommitRound(nextRejectRound(current_round)))
!= rounds.end()) {
reject_commit_batches.push_back(batch);
}
if (rounds.find(nextRejectRound(nextCommitRound(current_round)))
!= rounds.end()) {
commit_reject_batches.push_back(batch);
}
if (rounds.find(nextCommitRound(nextCommitRound(current_round)))
!= rounds.end()) {
commit_commit_batches.push_back(batch);
}
}
luckychess marked this conversation as resolved.
Show resolved Hide resolved

connections_.peers[kRejectRejectConsumer]->onBatches(
std::move(reject_reject_batches));
connections_.peers[kRejectCommitConsumer]->onBatches(
std::move(reject_commit_batches));
connections_.peers[kCommitRejectConsumer]->onBatches(
std::move(commit_reject_batches));
connections_.peers[kCommitCommitConsumer]->onBatches(
std::move(commit_commit_batches));
}

boost::optional<std::shared_ptr<const OnDemandConnectionManager::ProposalType>>
Expand Down
4 changes: 4 additions & 0 deletions irohad/ordering/impl/on_demand_connection_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include <rxcpp/rx.hpp>
#include "logger/logger_fwd.hpp"
#include "ordering/impl/ordering_gate_cache/ordering_gate_resend_strategy.hpp"

namespace iroha {
namespace ordering {
Expand Down Expand Up @@ -53,12 +54,14 @@ namespace iroha {
OnDemandConnectionManager(
std::shared_ptr<transport::OdOsNotificationFactory> factory,
rxcpp::observable<CurrentPeers> peers,
std::shared_ptr<OrderingGateResendStrategy> batch_resend_strategy,
logger::LoggerPtr log);

OnDemandConnectionManager(
std::shared_ptr<transport::OdOsNotificationFactory> factory,
rxcpp::observable<CurrentPeers> peers,
CurrentPeers initial_peers,
std::shared_ptr<OrderingGateResendStrategy> batch_resend_strategy,
logger::LoggerPtr log);

~OnDemandConnectionManager() override;
Expand All @@ -85,6 +88,7 @@ namespace iroha {

logger::LoggerPtr log_;
std::shared_ptr<transport::OdOsNotificationFactory> factory_;
std::shared_ptr<OrderingGateResendStrategy> batch_resend_strategy_;
rxcpp::composite_subscription subscription_;

CurrentConnections connections_;
Expand Down
13 changes: 12 additions & 1 deletion irohad/ordering/impl/on_demand_ordering_gate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ OnDemandOrderingGate::OnDemandOrderingGate(
std::shared_ptr<cache::OrderingGateCache> cache,
std::shared_ptr<shared_model::interface::UnsafeProposalFactory> factory,
std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
std::shared_ptr<OrderingGateResendStrategy> batch_resend_strategy,
size_t transaction_limit,
logger::LoggerPtr log)
: log_(std::move(log)),
Expand All @@ -60,6 +61,8 @@ OnDemandOrderingGate::OnDemandOrderingGate(

this->sendCachedTransactions(event);

batch_resend_strategy_->setCurrentRound(current_round);

// request proposal for the current round
auto proposal = this->processProposalRequest(
network_client_->onRequestProposal(current_round));
Expand All @@ -69,7 +72,8 @@ OnDemandOrderingGate::OnDemandOrderingGate(
})),
cache_(std::move(cache)),
proposal_factory_(std::move(factory)),
tx_cache_(std::move(tx_cache)) {}
tx_cache_(std::move(tx_cache)),
batch_resend_strategy_(std::move(batch_resend_strategy)) {}

OnDemandOrderingGate::~OnDemandOrderingGate() {
events_subscription_.unsubscribe();
Expand All @@ -79,6 +83,8 @@ void OnDemandOrderingGate::propagateBatch(
std::shared_ptr<shared_model::interface::TransactionBatch> batch) {
cache_->addToBack({batch});

batch_resend_strategy_->feed(batch);

network_client_->onBatches(
transport::OdOsNotification::CollectionType{batch});
}
Expand Down Expand Up @@ -109,6 +115,7 @@ void OnDemandOrderingGate::sendCachedTransactions(
[this](const BlockEvent &block_event) {
// block committed, remove transactions from cache
cache_->remove(block_event.hashes);
batch_resend_strategy_->remove(block_event.hashes);
},
[](const EmptyEvent &) {
// no blocks committed, no transactions to remove
Expand All @@ -117,6 +124,10 @@ void OnDemandOrderingGate::sendCachedTransactions(
auto batches = cache_->pop();
cache_->addToBack(batches);

for (const auto &batch : batches) {
batch_resend_strategy_->readyToUse(batch);
}

// get only transactions which fit to next proposal
auto end_iterator = batches.begin();
auto current_number_of_transactions = 0u;
Expand Down
3 changes: 3 additions & 0 deletions irohad/ordering/impl/on_demand_ordering_gate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "interfaces/iroha_internal/unsafe_proposal_factory.hpp"
#include "logger/logger_fwd.hpp"
#include "ordering/impl/ordering_gate_cache/ordering_gate_cache.hpp"
#include "ordering/impl/ordering_gate_cache/ordering_gate_resend_strategy.hpp"
#include "ordering/on_demand_ordering_service.hpp"

namespace iroha {
Expand Down Expand Up @@ -66,6 +67,7 @@ namespace iroha {
std::shared_ptr<shared_model::interface::UnsafeProposalFactory>
factory,
std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
std::shared_ptr<OrderingGateResendStrategy> batch_resend_strategy,
size_t transaction_limit,
logger::LoggerPtr log);

Expand Down Expand Up @@ -109,6 +111,7 @@ namespace iroha {
std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache_;

rxcpp::subjects::subject<network::OrderingEvent> proposal_notifier_;
std::shared_ptr<OrderingGateResendStrategy> batch_resend_strategy_;
};

} // namespace ordering
Expand Down
Loading