Skip to content

Commit

Permalink
Clean obsolete BABE's weight data (paritytech#10748)
Browse files Browse the repository at this point in the history
* Clean obsolete BABE weight data
* Take out test assertion from check closure
* Optimize metadata access using `HeaderMetadata` trait
* Apply suggestions from code review
* Introduce finalize and import pre-commit synchronous actions
* Do not hold locks between internal methods calls
* Remove unused generic bound
* Apply suggestions from code review
* Register BABE's pre-commit actions on `block_import` instead of `start_babe`
* PreCommit actions should be `Fn` instead of `FnMut`
* More robust safenet in case of malformed finality notifications

Co-authored-by: Bastian Köcher <[email protected]>
Co-authored-by: André Silva <[email protected]>
  • Loading branch information
3 people authored Feb 23, 2022
1 parent f54e1a2 commit b4ba44d
Show file tree
Hide file tree
Showing 6 changed files with 330 additions and 95 deletions.
2 changes: 2 additions & 0 deletions client/api/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ pub struct ImportSummary<Block: BlockT> {
/// Contains information about the block that just got finalized,
/// including tree heads that became stale at the moment of finalization.
pub struct FinalizeSummary<Block: BlockT> {
/// Last finalized block header.
pub header: Block::Header,
/// Blocks that were finalized.
/// The last entry is the one that has been explicitly finalized.
pub finalized: Vec<Block::Hash>,
Expand Down
55 changes: 54 additions & 1 deletion client/api/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use sp_runtime::{
};
use std::{collections::HashSet, convert::TryFrom, fmt, sync::Arc};

use crate::{blockchain::Info, notifications::StorageEventStream};
use crate::{blockchain::Info, notifications::StorageEventStream, FinalizeSummary, ImportSummary};

use sc_transaction_pool_api::ChainEvent;
use sc_utils::mpsc::TracingUnboundedReceiver;
use sp_blockchain;
Expand Down Expand Up @@ -76,6 +77,34 @@ pub trait BlockchainEvents<Block: BlockT> {
) -> sp_blockchain::Result<StorageEventStream<Block::Hash>>;
}

/// List of operations to be performed on storage aux data.
/// First tuple element is the encoded data key.
/// Second tuple element is the encoded optional data to write.
/// If `None`, the key and the associated data are deleted from storage.
pub type AuxDataOperations = Vec<(Vec<u8>, Option<Vec<u8>>)>;

/// Callback invoked before committing the operations created during block import.
/// This gives the opportunity to perform auxiliary pre-commit actions and optionally
/// enqueue further storage write operations to be atomically performed on commit.
pub type OnImportAction<Block> =
Box<dyn (Fn(&BlockImportNotification<Block>) -> AuxDataOperations) + Send>;

/// Callback invoked before committing the operations created during block finalization.
/// This gives the opportunity to perform auxiliary pre-commit actions and optionally
/// enqueue further storage write operations to be atomically performed on commit.
pub type OnFinalityAction<Block> =
Box<dyn (Fn(&FinalityNotification<Block>) -> AuxDataOperations) + Send>;

/// Interface to perform auxiliary actions before committing a block import or
/// finality operation.
pub trait PreCommitActions<Block: BlockT> {
/// Actions to be performed on block import.
fn register_import_action(&self, op: OnImportAction<Block>);

/// Actions to be performed on block finalization.
fn register_finality_action(&self, op: OnFinalityAction<Block>);
}

/// Interface for fetching block data.
pub trait BlockBackend<Block: BlockT> {
/// Get block body by ID. Returns `None` if the body is not stored.
Expand Down Expand Up @@ -300,3 +329,27 @@ impl<B: BlockT> From<FinalityNotification<B>> for ChainEvent<B> {
Self::Finalized { hash: n.hash, tree_route: n.tree_route }
}
}

impl<B: BlockT> From<FinalizeSummary<B>> for FinalityNotification<B> {
fn from(mut summary: FinalizeSummary<B>) -> Self {
let hash = summary.finalized.pop().unwrap_or_default();
FinalityNotification {
hash,
header: summary.header,
tree_route: Arc::new(summary.finalized),
stale_heads: Arc::new(summary.stale_heads),
}
}
}

impl<B: BlockT> From<ImportSummary<B>> for BlockImportNotification<B> {
fn from(summary: ImportSummary<B>) -> Self {
BlockImportNotification {
hash: summary.hash,
origin: summary.origin,
header: summary.header,
is_new_best: summary.is_new_best,
tree_route: summary.tree_route.map(Arc::new),
}
}
}
105 changes: 91 additions & 14 deletions client/consensus/babe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,16 @@
#![forbid(unsafe_code)]
#![warn(missing_docs)]

use std::{borrow::Cow, collections::HashMap, convert::TryInto, pin::Pin, sync::Arc, u64};
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
convert::TryInto,
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};

use codec::{Decode, Encode};
use futures::{
Expand All @@ -82,7 +91,10 @@ use prometheus_endpoint::Registry;
use retain_mut::RetainMut;
use schnorrkel::SignatureError;

use sc_client_api::{backend::AuxStore, BlockchainEvents, ProvideUncles, UsageProvider};
use sc_client_api::{
backend::AuxStore, AuxDataOperations, BlockchainEvents, FinalityNotification, PreCommitActions,
ProvideUncles, UsageProvider,
};
use sc_consensus::{
block_import::{
BlockCheckParams, BlockImport, BlockImportParams, ForkChoiceStrategy, ImportResult,
Expand All @@ -98,7 +110,7 @@ use sc_consensus_slots::{
SlotInfo, StorageChanges,
};
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE};
use sp_api::{ApiExt, NumberFor, ProvideRuntimeApi};
use sp_api::{ApiExt, ProvideRuntimeApi};
use sp_application_crypto::AppKey;
use sp_block_builder::BlockBuilder as BlockBuilderApi;
use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata, Result as ClientResult};
Expand All @@ -113,7 +125,7 @@ use sp_inherents::{CreateInherentDataProviders, InherentData, InherentDataProvid
use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
use sp_runtime::{
generic::{BlockId, OpaqueDigestItemId},
traits::{Block as BlockT, Header, Zero},
traits::{Block as BlockT, Header, NumberFor, One, SaturatedConversion, Saturating, Zero},
DigestItem,
};

Expand Down Expand Up @@ -458,6 +470,7 @@ where
C: ProvideRuntimeApi<B>
+ ProvideUncles<B>
+ BlockchainEvents<B>
+ PreCommitActions<B>
+ HeaderBackend<B>
+ HeaderMetadata<B, Error = ClientError>
+ Send
Expand Down Expand Up @@ -501,7 +514,8 @@ where
};

info!(target: "babe", "👶 Starting BABE Authorship worker");
let inner = sc_consensus_slots::start_slot_worker(

let slot_worker = sc_consensus_slots::start_slot_worker(
babe_link.config.slot_duration(),
select_chain,
worker,
Expand All @@ -515,13 +529,69 @@ where
let answer_requests =
answer_requests(worker_rx, babe_link.config, client, babe_link.epoch_changes.clone());

let inner = future::select(Box::pin(slot_worker), Box::pin(answer_requests));
Ok(BabeWorker {
inner: Box::pin(future::join(inner, answer_requests).map(|_| ())),
inner: Box::pin(inner.map(|_| ())),
slot_notification_sinks,
handle: BabeWorkerHandle(worker_tx),
})
}

// Remove obsolete block's weight data by leveraging finality notifications.
// This includes data for all finalized blocks (excluding the most recent one)
// and all stale branches.
fn aux_storage_cleanup<C: HeaderMetadata<Block>, Block: BlockT>(
client: &C,
notification: &FinalityNotification<Block>,
) -> AuxDataOperations {
let mut aux_keys = HashSet::new();

// Cleans data for finalized block's ancestors down to, and including, the previously
// finalized one.

let first_new_finalized = notification.tree_route.get(0).unwrap_or(&notification.hash);
match client.header_metadata(*first_new_finalized) {
Ok(meta) => {
aux_keys.insert(aux_schema::block_weight_key(meta.parent));
},
Err(err) => {
warn!(target: "babe", "header lookup fail while cleaning data for block {}: {}", first_new_finalized.to_string(), err.to_string());
},
}

aux_keys.extend(notification.tree_route.iter().map(aux_schema::block_weight_key));

// Cleans data for stale branches.

// A safenet in case of malformed notification.
let height_limit = notification.header.number().saturating_sub(
notification.tree_route.len().saturated_into::<NumberFor<Block>>() + One::one(),
);
for head in notification.stale_heads.iter() {
let mut hash = *head;
// Insert stale blocks hashes until canonical chain is not reached.
// Soon or late we should hit an element already present within the `aux_keys` set.
while aux_keys.insert(aux_schema::block_weight_key(hash)) {
match client.header_metadata(hash) {
Ok(meta) => {
// This should never happen and must be considered a bug.
if meta.number <= height_limit {
warn!(target: "babe", "unexpected canonical chain state or malformed finality notification");
break
}
hash = meta.parent;
},
Err(err) => {
warn!(target: "babe", "header lookup fail while cleaning data for block {}: {}", head.to_string(), err.to_string());
break
},
}
}
}

aux_keys.into_iter().map(|val| (val, None)).collect()
}

async fn answer_requests<B: BlockT, C>(
mut request_rx: Receiver<BabeRequest<B>>,
config: Config,
Expand Down Expand Up @@ -604,7 +674,7 @@ impl<B: BlockT> BabeWorkerHandle<B> {
/// Worker for Babe which implements `Future<Output=()>`. This must be polled.
#[must_use]
pub struct BabeWorker<B: BlockT> {
inner: Pin<Box<dyn futures::Future<Output = ()> + Send + 'static>>,
inner: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
slot_notification_sinks: SlotNotificationSinks<B>,
handle: BabeWorkerHandle<B>,
}
Expand All @@ -628,13 +698,10 @@ impl<B: BlockT> BabeWorker<B> {
}
}

impl<B: BlockT> futures::Future for BabeWorker<B> {
impl<B: BlockT> Future for BabeWorker<B> {
type Output = ();

fn poll(
mut self: Pin<&mut Self>,
cx: &mut futures::task::Context,
) -> futures::task::Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
self.inner.as_mut().poll(cx)
}
}
Expand Down Expand Up @@ -857,7 +924,7 @@ where
self.telemetry.clone()
}

fn proposing_remaining_duration(&self, slot_info: &SlotInfo<B>) -> std::time::Duration {
fn proposing_remaining_duration(&self, slot_info: &SlotInfo<B>) -> Duration {
let parent_slot = find_pre_digest::<B>(&slot_info.chain_head).ok().map(|d| d.slot());

sc_consensus_slots::proposing_remaining_duration(
Expand Down Expand Up @@ -1683,7 +1750,11 @@ pub fn block_import<Client, Block: BlockT, I>(
client: Arc<Client>,
) -> ClientResult<(BabeBlockImport<Block, Client, I>, BabeLink<Block>)>
where
Client: AuxStore + HeaderBackend<Block> + HeaderMetadata<Block, Error = sp_blockchain::Error>,
Client: AuxStore
+ HeaderBackend<Block>
+ HeaderMetadata<Block, Error = sp_blockchain::Error>
+ PreCommitActions<Block>
+ 'static,
{
let epoch_changes =
aux_schema::load_epoch_changes::<Block, _>(&*client, &config.genesis_config)?;
Expand All @@ -1694,6 +1765,12 @@ where
// startup rather than waiting until importing the next epoch change block.
prune_finalized(client.clone(), &mut epoch_changes.shared_data())?;

let client_clone = client.clone();
let on_finality = move |summary: &FinalityNotification<Block>| {
aux_storage_cleanup(client_clone.as_ref(), summary)
};
client.register_finality_action(Box::new(on_finality));

let import = BabeBlockImport::new(client, epoch_changes, wrapped_block_import, config);

Ok((import, link))
Expand Down
Loading

0 comments on commit b4ba44d

Please sign in to comment.