Skip to content

Commit

Permalink
[storage] handle genesis in fast sync mode
Browse files Browse the repository at this point in the history
  • Loading branch information
areshand authored and msmouse committed Sep 18, 2023
1 parent 878e0b9 commit a52efba
Show file tree
Hide file tree
Showing 8 changed files with 314 additions and 54 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions aptos-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ aptos-types = { workspace = true }
aptos-vm = { workspace = true }
bcs = { workspace = true }
clap = { workspace = true }
either = { workspace = true }
fail = { workspace = true }
futures = { workspace = true }
hex = { workspace = true }
Expand Down
108 changes: 64 additions & 44 deletions aptos-node/src/storage.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,80 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use anyhow::anyhow;
use anyhow::{anyhow, Result};
use aptos_backup_service::start_backup_service;
use aptos_config::{config::NodeConfig, utils::get_genesis_txn};
use aptos_db::AptosDB;
use aptos_db::{fast_sync_aptos_db::FastSyncStorageWrapper, AptosDB};
use aptos_executor::db_bootstrapper::maybe_bootstrap;
use aptos_logger::{debug, info};
use aptos_storage_interface::{DbReader, DbReaderWriter};
use aptos_types::waypoint::Waypoint;
use aptos_vm::AptosVM;
use std::{fs, net::SocketAddr, path::Path, sync::Arc, time::Instant};
use either::Either;
use std::{fs, path::Path, sync::Arc, time::Instant};
use tokio::runtime::Runtime;

pub(crate) fn maybe_apply_genesis(db_rw: &DbReaderWriter, node_config: &NodeConfig) -> Result<()> {
let genesis_waypoint = node_config.base.waypoint.genesis_waypoint();
if let Some(genesis) = get_genesis_txn(node_config) {
maybe_bootstrap::<AptosVM>(db_rw, genesis, genesis_waypoint)
.map_err(|err| anyhow!("DB failed to bootstrap {}", err))?;
} else {
info ! ("Genesis txn not provided! This is fine only if you don't expect to apply it. Otherwise, the config is incorrect!");
}
Ok(())
}

#[cfg(not(feature = "consensus-only-perf-test"))]
pub(crate) fn bootstrap_db(
aptos_db: AptosDB,
backup_service_address: SocketAddr,
) -> (Arc<AptosDB>, DbReaderWriter, Option<Runtime>) {
use aptos_backup_service::start_backup_service;

let (aptos_db, db_rw) = DbReaderWriter::wrap(aptos_db);
let db_backup_service = start_backup_service(backup_service_address, aptos_db.clone());
(aptos_db, db_rw, Some(db_backup_service))
node_config: &NodeConfig,
) -> Result<(Arc<dyn DbReader>, DbReaderWriter, Option<Runtime>)> {
let (aptos_db_reader, db_rw, backup_service) =
match FastSyncStorageWrapper::initialize_dbs(node_config)? {
Either::Left(db) => {
let (db_arc, db_rw) = DbReaderWriter::wrap(db);
let db_backup_service = start_backup_service(
node_config.storage.backup_service_address,
db_arc.clone(),
);
maybe_apply_genesis(&db_rw, node_config)?;
(db_arc as Arc<dyn DbReader>, db_rw, Some(db_backup_service))
},
Either::Right(fast_sync_db_wrapper) => {
let temp_db = fast_sync_db_wrapper.get_temporary_db_with_genesis();
maybe_apply_genesis(&DbReaderWriter::from_arc(temp_db), node_config)?;

let (db_arc, db_rw) = DbReaderWriter::wrap(fast_sync_db_wrapper);
let fast_sync_db = db_arc.get_fast_sync_db();
let db_backup_service =
start_backup_service(node_config.storage.backup_service_address, fast_sync_db);

(db_arc as Arc<dyn DbReader>, db_rw, Some(db_backup_service))
},
};

Ok((aptos_db_reader, db_rw, backup_service))
}

/// In consensus-only mode, return a in-memory based [FakeAptosDB] and
/// do not run the backup service.
#[cfg(feature = "consensus-only-perf-test")]
pub(crate) fn bootstrap_db(
aptos_db: AptosDB,
_backup_service_address: SocketAddr,
) -> (
Arc<aptos_db::fake_aptosdb::FakeAptosDB>,
DbReaderWriter,
Option<Runtime>,
) {
node_config: &NodeConfig,
) -> Result<(Arc<dyn DbReader>, DbReaderWriter, Option<Runtime>)> {
use aptos_db::fake_aptosdb::FakeAptosDB;

let (aptos_db, db_rw) = DbReaderWriter::wrap(FakeAptosDB::new(aptos_db));
(aptos_db, db_rw, None)
let (aptos_db, db_rw, _) = match FastSyncStorageWrapper::initialize_dbs(node_config)? {
Either::Left(db) => DbReaderWriter::wrap(FakeAptosDB::new(db)),
Either::Right(fast_sync_db_wrapper) => DbReaderWriter::wrap(FakeAptosDB::new(
fast_sync_db_wrapper.expect("Fast sync db wrapper should not be None"),
)),
_ => unreachable!(),
};

maybe_apply_genesis(&db_rw, node_config)?;

Ok((aptos_db, db_rw, None))
}

/// Creates a RocksDb checkpoint for the consensus_db, state_sync_db,
Expand Down Expand Up @@ -87,7 +123,7 @@ fn create_rocksdb_checkpoint_and_change_working_dir(
/// the various handles.
pub fn initialize_database_and_checkpoints(
node_config: &mut NodeConfig,
) -> anyhow::Result<(Arc<dyn DbReader>, DbReaderWriter, Option<Runtime>, Waypoint)> {
) -> Result<(Arc<dyn DbReader>, DbReaderWriter, Option<Runtime>, Waypoint)> {
// If required, create RocksDB checkpoints and change the working directory.
// This is test-only.
if let Some(working_dir) = node_config.base.working_dir.clone() {
Expand All @@ -96,34 +132,18 @@ pub fn initialize_database_and_checkpoints(

// Open the database
let instant = Instant::now();
let aptos_db = AptosDB::open(
&node_config.storage.dir(),
false, /* readonly */
node_config.storage.storage_pruner_config,
node_config.storage.rocksdb_configs,
node_config.storage.enable_indexer,
node_config.storage.buffered_state_target_items,
node_config.storage.max_num_nodes_per_lru_cache_shard,
)
.map_err(|err| anyhow!("DB failed to open {}", err))?;
let (aptos_db, db_rw, backup_service) =
bootstrap_db(aptos_db, node_config.storage.backup_service_address);

// TODO: handle non-genesis waypoints for state sync!
// If there's a genesis txn and waypoint, commit it if the result matches.
let genesis_waypoint = node_config.base.waypoint.genesis_waypoint();
if let Some(genesis) = get_genesis_txn(node_config) {
maybe_bootstrap::<AptosVM>(&db_rw, genesis, genesis_waypoint)
.map_err(|err| anyhow!("DB failed to bootstrap {}", err))?;
} else {
info!("Genesis txn not provided! This is fine only if you don't expect to apply it. Otherwise, the config is incorrect!");
}
let (aptos_db, db_rw, backup_service) = bootstrap_db(node_config)?;

// Log the duration to open storage
debug!(
"Storage service started in {} ms",
instant.elapsed().as_millis()
);

Ok((aptos_db, db_rw, backup_service, genesis_waypoint))
Ok((
aptos_db,
db_rw,
backup_service,
node_config.base.waypoint.genesis_waypoint(),
))
}
1 change: 1 addition & 0 deletions storage/aptosdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ byteorder = { workspace = true }
claims = { workspace = true }
clap = { workspace = true, optional = true }
dashmap = { workspace = true }
either = { workspace = true }
itertools = { workspace = true }
lru = { workspace = true }
move-core-types = { workspace = true }
Expand Down
10 changes: 6 additions & 4 deletions storage/aptosdb/src/fake_aptosdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

use crate::{
errors::AptosDbError,
fast_sync_aptos_db::FastSyncStorageWrapper,
gauged_api,
metrics::{LATEST_CHECKPOINT_VERSION, LEDGER_VERSION, NEXT_BLOCK_EPOCH},
AptosDB,
};
use anyhow::{ensure, format_err, Result};
use aptos_accumulator::{HashReader, MerkleAccumulator};
Expand Down Expand Up @@ -142,7 +142,7 @@ impl FakeBufferedState {
/// features of [AptosDB] while passing through remaining features to the wrapped inner
/// [AptosDB].
pub struct FakeAptosDB {
inner: AptosDB,
inner: FastSyncStorageWrapper,
// A map of transaction hash to transaction version
txn_version_by_hash: Arc<DashMap<HashValue, Version>>,
// A map of transaction version to Transaction
Expand All @@ -160,7 +160,7 @@ pub struct FakeAptosDB {
}

impl FakeAptosDB {
pub fn new(db: AptosDB) -> Self {
pub fn new<D: DbReader + DbWriter>(db: D) -> Self {
Self {
inner: db,
txn_by_version: Arc::new(DashMap::new()),
Expand Down Expand Up @@ -349,7 +349,9 @@ impl FakeAptosDB {

// Once everything is successfully stored, update the latest in-memory ledger info.
if let Some(x) = ledger_info_with_sigs {
self.inner.ledger_store.set_latest_ledger_info(x.clone());
self.inner
.get_ledger_store()?
.set_latest_ledger_info(x.clone());

LEDGER_VERSION.set(x.ledger_info().version() as i64);
NEXT_BLOCK_EPOCH.set(x.ledger_info().next_block_epoch() as i64);
Expand Down
Loading

0 comments on commit a52efba

Please sign in to comment.