Skip to content

Commit

Permalink
[dag] chain health backoff backpressure (aptos-labs#10689)
Browse files Browse the repository at this point in the history
* [dag] chain health backoff backpressure
  • Loading branch information
ibalajiarun authored Nov 5, 2023
1 parent e826527 commit 2fc3353
Show file tree
Hide file tree
Showing 12 changed files with 156 additions and 32 deletions.
5 changes: 3 additions & 2 deletions aptos-move/aptos-vm/src/natives.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ use aptos_native_interface::SafeNativeBuilder;
use aptos_table_natives::{TableHandle, TableResolver};
use aptos_types::{
account_config::CORE_CODE_ADDRESS,
aggregator::PanicError,
on_chain_config::{Features, TimedFeatures, TimedFeaturesBuilder},
write_set::WriteOp,
};
#[cfg(feature = "testing")]
use aptos_types::{aggregator::PanicError, write_set::WriteOp};
#[cfg(feature = "testing")]
use aptos_types::{
chain_id::ChainId,
state_store::{state_key::StateKey, state_value::StateValue},
Expand All @@ -40,6 +40,7 @@ use move_core_types::language_storage::StructTag;
#[cfg(feature = "testing")]
use move_core_types::value::MoveTypeLayout;
use move_vm_runtime::native_functions::NativeFunctionTable;
#[cfg(feature = "testing")]
use std::{
collections::{BTreeMap, HashSet},
sync::Arc,
Expand Down
6 changes: 5 additions & 1 deletion config/src/config/dag_consensus_config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// Copyright © Aptos Foundation

use super::{config_sanitizer::ConfigSanitizer, node_config_loader::NodeType, Error, NodeConfig};
use super::{
config_sanitizer::ConfigSanitizer, node_config_loader::NodeType, ChainHealthBackoffValues,
Error, NodeConfig,
};
use aptos_types::chain_id::ChainId;
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -139,6 +142,7 @@ pub struct DagConsensusConfig {
pub rb_config: ReliableBroadcastConfig,
pub fetcher_config: DagFetcherConfig,
pub round_state_config: DagRoundStateConfig,
pub chain_backoff_config: Vec<ChainHealthBackoffValues>,
}

impl ConfigSanitizer for DagConsensusConfig {
Expand Down
46 changes: 45 additions & 1 deletion consensus/src/dag/anchor_election/leader_reputation_adapter.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use super::TChainHealthBackoff;
use crate::{
counters::CHAIN_HEALTH_BACKOFF_TRIGGERED,
dag::{anchor_election::AnchorElection, storage::CommitEvent},
liveness::{
leader_reputation::{LeaderReputation, MetadataBackend, ReputationHeuristic},
proposal_generator::ChainHealthBackoffConfig,
proposer_election::ProposerElection,
},
};
Expand All @@ -17,6 +20,7 @@ use move_core_types::account_address::AccountAddress;
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
time::Duration,
};

pub struct MetadataBackendAdapter {
Expand Down Expand Up @@ -96,6 +100,7 @@ impl MetadataBackend for MetadataBackendAdapter {
/// Adapter that backs DAG anchor election with `LeaderReputation` and also
/// exposes chain-health backoff values derived from the same reputation data.
pub struct LeaderReputationAdapter {
// Queried for the valid proposer of a round and for the voting power
// participation ratio of a round.
reputation: LeaderReputation,
// Receives commit events pushed through `update_reputation`.
data_source: Arc<MetadataBackendAdapter>,
// Looked up with the voting power ratio to decide whether a backoff applies.
chain_health_backoff_config: ChainHealthBackoffConfig,
}

impl LeaderReputationAdapter {
Expand All @@ -106,6 +111,7 @@ impl LeaderReputationAdapter {
backend: Arc<MetadataBackendAdapter>,
heuristic: Box<dyn ReputationHeuristic>,
window_for_chain_health: usize,
chain_health_backoff_config: ChainHealthBackoffConfig,
) -> Self {
Self {
reputation: LeaderReputation::new(
Expand All @@ -119,16 +125,54 @@ impl LeaderReputationAdapter {
window_for_chain_health,
),
data_source: backend,
chain_health_backoff_config,
}
}

/// Looks up the chain-health backoff entry that applies to `round`.
///
/// Returns the voting power participation ratio reported by the reputation
/// tracker for `round`, paired with whatever backoff values the configured
/// `ChainHealthBackoffConfig` yields for that ratio (`None` when it yields none).
fn get_chain_health_backoff(
    &self,
    round: u64,
) -> (f64, Option<&aptos_config::config::ChainHealthBackoffValues>) {
    let ratio = self.reputation.get_voting_power_participation_ratio(round);
    (ratio, self.chain_health_backoff_config.get_backoff(ratio))
}
}

/// Anchor election driven by leader reputation.
impl AnchorElection for LeaderReputationAdapter {
/// Returns the proposer that `LeaderReputation` considers valid for `round`.
fn get_anchor(&self, round: Round) -> Author {
self.reputation.get_valid_proposer(round)
}

// Forwards the commit event to the metadata backend by pushing it onto `data_source`.
// NOTE(review): two signatures are shown because this is a rendered diff; presumably
// the `&mut self` line is the removed one and the `&self` line its replacement — confirm.
fn update_reputation(&mut self, commit_event: CommitEvent) {
fn update_reputation(&self, commit_event: CommitEvent) {
self.data_source.push(commit_event)
}
}

/// Chain-health backpressure values derived from leader reputation data.
impl TChainHealthBackoff for LeaderReputationAdapter {
    /// Returns the voting power ratio for `round` and, when a backoff entry
    /// applies, its proposal delay as a `Duration`.
    ///
    /// Records `1.0` in `CHAIN_HEALTH_BACKOFF_TRIGGERED` when a backoff entry
    /// was found and `0.0` otherwise.
    fn get_round_backoff(&self, round: Round) -> (f64, Option<Duration>) {
        let (ratio, backoff) = self.get_chain_health_backoff(round);

        let triggered = if backoff.is_some() { 1.0 } else { 0.0 };
        CHAIN_HEALTH_BACKOFF_TRIGGERED.observe(triggered);

        let delay = backoff.map(|entry| Duration::from_millis(entry.backoff_proposal_delay_ms));
        (ratio, delay)
    }

    /// Returns the voting power ratio for `round` and, when a backoff entry
    /// applies, its `(max txns, max bytes)` sending overrides.
    fn get_round_payload_limits(&self, round: Round) -> (f64, Option<(u64, u64)>) {
        let (ratio, backoff) = self.get_chain_health_backoff(round);
        (
            ratio,
            backoff.map(|entry| {
                let txns = entry.max_sending_block_txns_override;
                let bytes = entry.max_sending_block_bytes_override;
                (txns, bytes)
            }),
        )
    }
}
9 changes: 8 additions & 1 deletion consensus/src/dag/anchor_election/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,18 @@

use crate::dag::storage::CommitEvent;
use aptos_consensus_types::common::{Author, Round};
use std::time::Duration;

/// Chooses the anchor author for a round and accepts commit events as
/// reputation input.
pub trait AnchorElection: Send + Sync {
/// Returns the author selected as anchor for `round`.
fn get_anchor(&self, round: Round) -> Author;

// Feeds a commit event to the election implementation.
// NOTE(review): two signatures are shown because this is a rendered diff; presumably
// the `&mut self` line is the removed one and the `&self` line its replacement — confirm.
fn update_reputation(&mut self, commit_event: CommitEvent);
fn update_reputation(&self, commit_event: CommitEvent);
}

/// Source of chain-health backpressure values for a given round.
///
/// Both methods return a pair whose first element is an `f64` ratio; in the
/// `LeaderReputationAdapter` implementation this is the voting power
/// participation ratio for the round.
pub trait TChainHealthBackoff: Send + Sync {
/// Returns the ratio for `round` and, when a backoff applies, the delay to use.
fn get_round_backoff(&self, round: Round) -> (f64, Option<Duration>);

/// Returns the ratio for `round` and, when a backoff applies, the payload
/// overrides as `(max txns, max bytes)`.
fn get_round_payload_limits(&self, round: Round) -> (f64, Option<(u64, u64)>);
}

mod leader_reputation_adapter;
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/anchor_election/round_robin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ impl AnchorElection for RoundRobinAnchorElection {
self.validators[(round / 2) as usize % self.validators.len()]
}

fn update_reputation(&mut self, _event: CommitEvent) {}
fn update_reputation(&self, _event: CommitEvent) {}
}
24 changes: 17 additions & 7 deletions consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use super::{
adapter::{OrderedNotifierAdapter, TLedgerInfoProvider},
anchor_election::TChainHealthBackoff,
dag_driver::DagDriver,
dag_fetcher::{DagFetcher, DagFetcherService, FetchRequestHandler},
dag_handler::NetworkHandler,
Expand All @@ -23,7 +24,10 @@ use crate::{
round_state::{AdaptiveResponsive, RoundState},
},
experimental::buffer_manager::OrderedBlocks,
liveness::leader_reputation::{ProposerAndVoterHeuristic, ReputationHeuristic},
liveness::{
leader_reputation::{ProposerAndVoterHeuristic, ReputationHeuristic},
proposal_generator::ChainHealthBackoffConfig,
},
network::IncomingDAGRequest,
payload_manager::PayloadManager,
state_replication::{PayloadClient, StateComputer},
Expand Down Expand Up @@ -104,7 +108,7 @@ impl DagBootstrapper {
parent_block_info: BlockInfo,
ordered_nodes_tx: UnboundedSender<OrderedBlocks>,
dag_window_size_config: u64,
) -> (Arc<RwLock<Dag>>, OrderRule) {
) -> (Arc<RwLock<Dag>>, OrderRule, Arc<dyn TChainHealthBackoff>) {
let initial_ledger_info = ledger_info_provider
.get_latest_ledger_info()
.ledger_info()
Expand Down Expand Up @@ -162,7 +166,7 @@ impl DagBootstrapper {
.get_ordered_account_addresses_iter()
.map(|p| self.epoch_state.verifier.get_voting_power(&p).unwrap())
.collect();
let anchor_election = Box::new(LeaderReputationAdapter::new(
let anchor_election = Arc::new(LeaderReputationAdapter::new(
self.epoch_state.epoch,
HashMap::from([(
self.epoch_state.epoch,
Expand All @@ -172,19 +176,20 @@ impl DagBootstrapper {
metadata_adapter,
heuristic,
100,
ChainHealthBackoffConfig::new(self.config.chain_backoff_config.clone()),
));

let order_rule = OrderRule::new(
self.epoch_state.clone(),
commit_round + 1,
dag.clone(),
anchor_election,
anchor_election.clone(),
notifier,
self.storage.clone(),
self.onchain_config.dag_ordering_causal_history_window as Round,
);

(dag, order_rule)
(dag, order_rule, anchor_election)
}

fn bootstrap_components(
Expand All @@ -193,6 +198,7 @@ impl DagBootstrapper {
order_rule: OrderRule,
state_sync_trigger: StateSyncTrigger,
ledger_info_provider: Arc<dyn TLedgerInfoProvider>,
chain_health_backoff: Arc<dyn TChainHealthBackoff>,
rb_config: ReliableBroadcastConfig,
round_state_config: DagRoundStateConfig,
) -> (NetworkHandler, DagFetcherService) {
Expand Down Expand Up @@ -227,6 +233,7 @@ impl DagBootstrapper {
new_round_tx,
self.epoch_state.clone(),
Duration::from_millis(round_state_config.adaptive_responsive_minimum_wait_time_ms),
chain_health_backoff.clone(),
)),
);

Expand All @@ -245,6 +252,7 @@ impl DagBootstrapper {
round_state,
self.onchain_config.dag_ordering_causal_history_window as Round,
self.config.node_payload_config.clone(),
chain_health_backoff,
);
let rb_handler = NodeBroadcastHandler::new(
dag.clone(),
Expand Down Expand Up @@ -298,7 +306,7 @@ impl DagBootstrapper {

let ledger_info_provider = Arc::new(RwLock::new(LedgerInfoProvider::new(ledger_info)));

let (dag_store, order_rule) = self.bootstrap_dag_store(
let (dag_store, order_rule, chain_health_backoff) = self.bootstrap_dag_store(
ledger_info_provider.clone(),
parent_block_info,
ordered_nodes_tx.clone(),
Expand All @@ -318,6 +326,7 @@ impl DagBootstrapper {
order_rule,
state_sync_trigger,
ledger_info_provider.clone(),
chain_health_backoff,
self.config.rb_config.clone(),
self.config.round_state_config.clone(),
);
Expand Down Expand Up @@ -430,7 +439,7 @@ pub(super) fn bootstrap_dag_for_test(
let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded();
let (dag_rpc_tx, dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None);

let (dag_store, order_rule) = bootstraper.bootstrap_dag_store(
let (dag_store, order_rule, chain_health_backoff) = bootstraper.bootstrap_dag_store(
ledger_info_provider.clone(),
parent_block_info,
ordered_nodes_tx,
Expand All @@ -454,6 +463,7 @@ pub(super) fn bootstrap_dag_for_test(
order_rule,
state_sync_trigger,
ledger_info_provider,
chain_health_backoff,
bootstraper.config.rb_config.clone(),
bootstraper.config.round_state_config.clone(),
);
Expand Down
56 changes: 44 additions & 12 deletions consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use super::anchor_election::TChainHealthBackoff;
use crate::{
dag::{
adapter::TLedgerInfoProvider,
Expand Down Expand Up @@ -57,9 +58,11 @@ pub(crate) struct DagDriver {
round_state: RoundState,
window_size_config: Round,
payload_config: DagPayloadConfig,
chain_backoff: Arc<dyn TChainHealthBackoff>,
}

impl DagDriver {
#[allow(clippy::too_many_arguments)]
pub fn new(
author: Author,
epoch_state: Arc<EpochState>,
Expand All @@ -75,6 +78,7 @@ impl DagDriver {
round_state: RoundState,
window_size_config: Round,
payload_config: DagPayloadConfig,
chain_backoff: Arc<dyn TChainHealthBackoff>,
) -> Self {
let pending_node = storage
.get_pending_node()
Expand All @@ -98,6 +102,7 @@ impl DagDriver {
round_state,
window_size_config,
payload_config,
chain_backoff,
};

// If we were broadcasting the node for the round already, resume it
Expand Down Expand Up @@ -188,23 +193,15 @@ impl DagDriver {
)
}
};
// TODO: warn/panic if division yields 0 txns
let max_txns = self
.payload_config
.max_sending_txns_per_round
.saturating_div(self.epoch_state.verifier.len() as u64)
.max(1);
let max_txn_size_bytes = self
.payload_config
.max_sending_size_per_round_bytes
.saturating_div(self.epoch_state.verifier.len() as u64)
.max(1024);

let (max_txns, max_size_bytes) = self.calculate_payload_limits(new_round);

let payload = match self
.payload_client
.pull_payload(
Duration::from_millis(self.payload_config.payload_pull_max_poll_time_ms),
max_txns,
max_txn_size_bytes,
max_size_bytes,
payload_filter,
Box::pin(async {}),
false,
Expand Down Expand Up @@ -290,6 +287,41 @@ impl DagDriver {
prev_handle.abort();
}
}

/// Computes the `(max_txns, max_size_bytes)` limits used when pulling payload
/// for `round`.
///
/// The per-round limits from `payload_config` are first capped by the
/// chain-health backoff overrides, when `chain_backoff` reports any. They are
/// then divided by the validator count scaled by the voting power ratio that
/// `chain_backoff` returns for the round. The result never drops below
/// 1 transaction and 1024 bytes.
fn calculate_payload_limits(&self, round: Round) -> (u64, u64) {
    let (voting_power_ratio, maybe_backoff_limits) =
        self.chain_backoff.get_round_payload_limits(round);

    let (max_txns_per_round, max_size_per_round_bytes) = match maybe_backoff_limits {
        // A backoff override can only tighten the configured limits.
        Some((backoff_max_txns, backoff_max_size_bytes)) => (
            self.payload_config
                .max_sending_txns_per_round
                .min(backoff_max_txns),
            self.payload_config
                .max_sending_size_per_round_bytes
                .min(backoff_max_size_bytes),
        ),
        None => (
            self.payload_config.max_sending_txns_per_round,
            self.payload_config.max_sending_size_per_round_bytes,
        ),
    };

    // `u64::saturating_div` panics on a zero divisor, and the float-to-int cast
    // yields 0 for a ratio of 0.0, a negative ratio, NaN, or an empty verifier.
    // Clamp to 1 so those cases fall back to the undivided per-round limit.
    let divisor =
        ((self.epoch_state.verifier.len() as f64 * voting_power_ratio).ceil() as u64).max(1);

    // TODO: warn/panic if division yields 0 txns
    let max_txns = max_txns_per_round.saturating_div(divisor).max(1);
    let max_txn_size_bytes = max_size_per_round_bytes.saturating_div(divisor).max(1024);

    (max_txns, max_txn_size_bytes)
}
}

#[async_trait]
Expand Down
Loading

0 comments on commit 2fc3353

Please sign in to comment.