Skip to content

Commit

Permalink
feat: route commands to correct persistence service (paradigmxyz#9435)
Browse files Browse the repository at this point in the history
Co-authored-by: Federico Gimenez <[email protected]>
Co-authored-by: Matthias Seitz <[email protected]>
  • Loading branch information
3 people authored Jul 12, 2024
1 parent a49e993 commit 39c5382
Show file tree
Hide file tree
Showing 6 changed files with 395 additions and 259 deletions.
255 changes: 255 additions & 0 deletions crates/engine/tree/src/database.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
#![allow(dead_code)]

use crate::{static_files::StaticFileServiceHandle, tree::ExecutedBlock};
use reth_db::database::Database;
use reth_errors::ProviderResult;
use reth_primitives::B256;
use reth_provider::{
bundle_state::HashedStateChanges, BlockWriter, HistoryWriter, OriginalValuesKnown,
ProviderFactory, StageCheckpointWriter, StateWriter,
};
use reth_prune::{PruneProgress, Pruner};
use reth_stages_types::{StageCheckpoint, StageId};
use std::sync::mpsc::{Receiver, SendError, Sender};
use tokio::sync::oneshot;
use tracing::debug;

/// Writes parts of reth's in memory tree state to the database.
///
/// This is meant to be a spawned service that listens for various incoming database operations,
/// performing those actions on disk, and returning the result in a channel.
///
/// There are two types of operations this service can perform:
/// - Writing executed blocks to disk, returning the hash of the latest block that was inserted.
/// - Removing blocks from disk, returning the hash of the lowest block removed.
///
/// This should be spawned in its own thread with [`std::thread::spawn`], since this performs
/// blocking database operations in an endless loop.
#[derive(Debug)]
pub struct DatabaseService<DB> {
/// The db / static file provider to use
provider: ProviderFactory<DB>,
/// Incoming requests to persist stuff
incoming: Receiver<DatabaseAction>,
/// Handle for the static file service.
static_file_handle: StaticFileServiceHandle,
/// The pruner
pruner: Pruner<DB, ProviderFactory<DB>>,
}

impl<DB: Database> DatabaseService<DB> {
/// Create a new database service
pub const fn new(
provider: ProviderFactory<DB>,
incoming: Receiver<DatabaseAction>,
static_file_handle: StaticFileServiceHandle,
pruner: Pruner<DB, ProviderFactory<DB>>,
) -> Self {
Self { provider, incoming, static_file_handle, pruner }
}

/// Writes the cloned tree state to the database
fn write(&self, blocks: Vec<ExecutedBlock>) -> ProviderResult<()> {
let provider_rw = self.provider.provider_rw()?;

if blocks.is_empty() {
debug!(target: "tree::persistence::db", "Attempted to write empty block range");
return Ok(())
}

let first_number = blocks.first().unwrap().block().number;

let last = blocks.last().unwrap().block();
let last_block_number = last.number;

// TODO: remove all the clones and do performant / batched writes for each type of object
// instead of a loop over all blocks,
// meaning:
// * blocks
// * state
// * hashed state
// * trie updates (cannot naively extend, need helper)
// * indices (already done basically)
// Insert the blocks
for block in blocks {
let sealed_block =
block.block().clone().try_with_senders_unchecked(block.senders().clone()).unwrap();
provider_rw.insert_block(sealed_block)?;

// Write state and changesets to the database.
// Must be written after blocks because of the receipt lookup.
let execution_outcome = block.execution_outcome().clone();
execution_outcome.write_to_storage(&provider_rw, None, OriginalValuesKnown::No)?;

// insert hashes and intermediate merkle nodes
{
let trie_updates = block.trie_updates().clone();
let hashed_state = block.hashed_state();
HashedStateChanges(hashed_state.clone()).write_to_db(&provider_rw)?;
trie_updates.write_to_database(provider_rw.tx_ref())?;
}

// update history indices
provider_rw.update_history_indices(first_number..=last_block_number)?;

// Update pipeline progress
provider_rw.update_pipeline_stages(last_block_number, false)?;
}

debug!(target: "tree::persistence::db", range = ?first_number..=last_block_number, "Appended blocks");

Ok(())
}

/// Removes block data above the given block number from the database.
/// This is exclusive, i.e., it only removes blocks above `block_number`, and does not remove
/// `block_number`.
///
/// Returns the block hash for the lowest block removed from the database, which should be
/// the hash for `block_number + 1`.
///
/// This will then send a command to the static file service, to remove the actual block data.
fn remove_blocks_above(&self, block_number: u64) -> ProviderResult<B256> {
todo!("depends on PR")
// let mut provider_rw = self.provider.provider_rw()?;
// provider_rw.get_or_take_block_and_execution_range(range);
}

/// Prunes block data before the given block hash according to the configured prune
/// configuration.
fn prune_before(&mut self, block_num: u64) -> PruneProgress {
// TODO: doing this properly depends on pruner segment changes
self.pruner.run(block_num).expect("todo: handle errors")
}

/// Updates checkpoints related to block headers and bodies. This should be called by the static
/// file service, after new transactions have been successfully written to disk.
fn update_transaction_meta(&self, block_num: u64) -> ProviderResult<()> {
let provider_rw = self.provider.provider_rw()?;
provider_rw.save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(block_num))?;
provider_rw.save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block_num))?;
provider_rw.commit()?;
Ok(())
}
}

impl<DB> DatabaseService<DB>
where
DB: Database,
{
/// This is the main loop, that will listen to database events and perform the requested
/// database actions
pub fn run(mut self) {
// If the receiver errors then senders have disconnected, so the loop should then end.
while let Ok(action) = self.incoming.recv() {
match action {
DatabaseAction::RemoveBlocksAbove((new_tip_num, sender)) => {
let output =
self.remove_blocks_above(new_tip_num).expect("todo: handle errors");

// we ignore the error because the caller may or may not care about the result
let _ = sender.send(output);
}
DatabaseAction::SaveBlocks((blocks, sender)) => {
if blocks.is_empty() {
todo!("return error or something");
}
let last_block_hash = blocks.last().unwrap().block().hash();
self.write(blocks).unwrap();

// we ignore the error because the caller may or may not care about the result
let _ = sender.send(last_block_hash);
}
DatabaseAction::PruneBefore((block_num, sender)) => {
let res = self.prune_before(block_num);

// we ignore the error because the caller may or may not care about the result
let _ = sender.send(res);
}
DatabaseAction::UpdateTransactionMeta((block_num, sender)) => {
self.update_transaction_meta(block_num).expect("todo: handle errors");

// we ignore the error because the caller may or may not care about the result
let _ = sender.send(());
}
}
}
}
}

/// A signal to the database service that part of the tree state can be persisted.
#[derive(Debug)]
pub enum DatabaseAction {
/// The section of tree state that should be persisted. These blocks are expected in order of
/// increasing block number.
///
/// This should just store the execution history-related data. Header, transaction, and
/// receipt-related data should already be written to static files.
SaveBlocks((Vec<ExecutedBlock>, oneshot::Sender<B256>)),

/// Updates checkpoints related to block headers and bodies. This should be called by the
/// static file service, after new transactions have been successfully written to disk.
UpdateTransactionMeta((u64, oneshot::Sender<()>)),

/// Removes block data above the given block number from the database.
///
/// This will then send a command to the static file service, to remove the actual block data.
///
/// Returns the block hash for the lowest block removed from the database.
RemoveBlocksAbove((u64, oneshot::Sender<B256>)),

/// Prune associated block data before the given block number, according to already-configured
/// prune modes.
PruneBefore((u64, oneshot::Sender<PruneProgress>)),
}

/// A handle to the database service
#[derive(Debug, Clone)]
pub struct DatabaseServiceHandle {
/// The channel used to communicate with the database service
sender: Sender<DatabaseAction>,
}

impl DatabaseServiceHandle {
/// Create a new [`DatabaseServiceHandle`] from a [`Sender<DatabaseAction>`].
pub const fn new(sender: Sender<DatabaseAction>) -> Self {
Self { sender }
}

/// Sends a specific [`DatabaseAction`] in the contained channel. The caller is responsible
/// for creating any channels for the given action.
pub fn send_action(&self, action: DatabaseAction) -> Result<(), SendError<DatabaseAction>> {
self.sender.send(action)
}

/// Tells the database service to save a certain list of finalized blocks. The blocks are
/// assumed to be ordered by block number.
///
/// This returns the latest hash that has been saved, allowing removal of that block and any
/// previous blocks from in-memory data structures.
pub async fn save_blocks(&self, blocks: Vec<ExecutedBlock>) -> B256 {
let (tx, rx) = oneshot::channel();
self.sender.send(DatabaseAction::SaveBlocks((blocks, tx))).expect("should be able to send");
rx.await.expect("todo: err handling")
}

/// Tells the database service to remove blocks above a certain block number. The removed
/// blocks are returned by the service.
pub async fn remove_blocks_above(&self, block_num: u64) -> B256 {
let (tx, rx) = oneshot::channel();
self.sender
.send(DatabaseAction::RemoveBlocksAbove((block_num, tx)))
.expect("should be able to send");
rx.await.expect("todo: err handling")
}

/// Tells the database service to remove block data before the given hash, according to the
/// configured prune config.
pub async fn prune_before(&self, block_num: u64) -> PruneProgress {
let (tx, rx) = oneshot::channel();
self.sender
.send(DatabaseAction::PruneBefore((block_num, tx)))
.expect("should be able to send");
rx.await.expect("todo: err handling")
}
}
6 changes: 4 additions & 2 deletions crates/engine/tree/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@ pub use reth_blockchain_tree_api::*;
pub mod backfill;
/// The type that drives the chain forward.
pub mod chain;
/// The background writer service for batch db writes.
pub mod database;
/// Support for downloading blocks on demand for live sync.
pub mod download;
/// Engine Api chain handler support.
pub mod engine;
/// Metrics support.
pub mod metrics;
/// The background writer task for batch db writes.
/// The background writer service, coordinating the static file and database services.
pub mod persistence;
/// The background writer task for static file writes.
/// The background writer service for static file writes.
pub mod static_files;
/// Support for interacting with the blockchain tree.
pub mod tree;
Expand Down
Loading

0 comments on commit 39c5382

Please sign in to comment.