Remove sync from archive command (MystenLabs#16927)
## Description 

Remove the `sync-from-archive` command from `sui-tool`, as it is no longer used and may have issues.
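
For context, a rough sketch of how the removed subcommand was invoked. The `--genesis` and `--db-path` flags come from the clap definition deleted in this diff; the binary name, the paths, and the object-store flags (flattened in from `ObjectStoreConfig`) are placeholders, not taken from this change:

```sh
# Hypothetical invocation of the removed subcommand (sketch only).
# --genesis and --db-path come from the deleted clap definition; the
# object-store flags (from the flattened ObjectStoreConfig) are elided,
# and the download concurrency defaulted to 5.
sui-tool sync-from-archive \
    --genesis /path/to/genesis.blob \
    --db-path /path/to/node/db \
    <object-store flags>
```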
sadhansood authored Mar 29, 2024
1 parent 404b5aa commit 4ecea11
Showing 2 changed files with 2 additions and 149 deletions.
31 changes: 2 additions & 29 deletions crates/sui-tool/src/commands.rs
@@ -6,8 +6,8 @@ use crate::{
db_tool::{execute_db_tool_command, print_db_all_tables, DbToolCommand},
download_db_snapshot, download_formal_snapshot, dump_checkpoints_from_archive,
get_latest_available_epoch, get_object, get_transaction_block, make_clients, pkg_dump,
-restore_from_db_checkpoint, state_sync_from_archive, verify_archive,
-verify_archive_by_checksum, ConciseObjectOutput, GroupedObjectOutput, VerboseObjectOutput,
+restore_from_db_checkpoint, verify_archive, verify_archive_by_checksum, ConciseObjectOutput,
+GroupedObjectOutput, VerboseObjectOutput,
};
use anyhow::Result;
use std::env;
@@ -140,19 +140,6 @@ pub enum ToolCommand {
cmd: Option<DbToolCommand>,
},

/// Tool to sync the node from archive store
#[command(name = "sync-from-archive")]
SyncFromArchive {
#[arg(long = "genesis")]
genesis: PathBuf,
#[arg(long = "db-path")]
db_path: PathBuf,
#[command(flatten)]
object_store_config: ObjectStoreConfig,
#[arg(default_value_t = 5)]
download_concurrency: usize,
},

/// Tool to verify the archive store
#[command(name = "verify-archive")]
VerifyArchive {
@@ -942,20 +929,6 @@ impl ToolCommand {
execute_replay_command(rpc_url, safety_checks, use_authority, cfg_path, cmd)
.await?;
}
ToolCommand::SyncFromArchive {
genesis,
db_path,
object_store_config,
download_concurrency,
} => {
state_sync_from_archive(
&db_path,
&genesis,
object_store_config,
download_concurrency,
)
.await?;
}
ToolCommand::VerifyArchive {
genesis,
object_store_config,
120 changes: 0 additions & 120 deletions crates/sui-tool/src/lib.rs
@@ -1184,123 +1184,3 @@ pub async fn verify_archive_by_checksum(
) -> Result<()> {
verify_archive_with_checksums(remote_store_config, concurrency).await
}

pub async fn state_sync_from_archive(
path: &Path,
genesis: &Path,
remote_store_config: ObjectStoreConfig,
concurrency: usize,
) -> Result<()> {
let genesis = Genesis::load(genesis).unwrap();
let genesis_committee = genesis.committee()?;

let checkpoint_store = Arc::new(CheckpointStore::open_tables_read_write(
path.join("checkpoints"),
MetricConf::default(),
None,
None,
));
// Only insert the genesis checkpoint if the DB is empty and doesn't have it already
if checkpoint_store
.get_checkpoint_by_digest(genesis.checkpoint().digest())
.unwrap()
.is_none()
{
checkpoint_store.insert_checkpoint_contents(genesis.checkpoint_contents().clone())?;
checkpoint_store.insert_verified_checkpoint(&genesis.checkpoint())?;
checkpoint_store.update_highest_synced_checkpoint(&genesis.checkpoint())?;
}

let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&path.join("store"), None));

let committee_store = Arc::new(CommitteeStore::new(
path.join("epochs"),
&genesis_committee,
None,
));

let store = AuthorityStore::open(
perpetual_db,
&genesis,
usize::MAX,
false,
&Registry::default(),
)
.await?;

let latest_checkpoint = checkpoint_store
.get_highest_synced_checkpoint()?
.map(|c| c.sequence_number)
.unwrap_or(0);
let state_sync_store = RocksDbStore::new(
Arc::new(ExecutionCache::new_for_tests(store, &Registry::default())),
committee_store,
checkpoint_store.clone(),
);
let archive_reader_config = ArchiveReaderConfig {
remote_store_config,
download_concurrency: NonZeroUsize::new(concurrency).unwrap(),
use_for_pruning_watermark: false,
};
let metrics = ArchiveReaderMetrics::new(&Registry::default());
let archive_reader = ArchiveReader::new(archive_reader_config, &metrics)?;
archive_reader.sync_manifest_once().await?;
let latest_checkpoint_in_archive = archive_reader.latest_available_checkpoint().await?;
info!(
"Latest available checkpoint in archive store: {}",
latest_checkpoint_in_archive
);
info!("Highest synced checkpoint in db: {latest_checkpoint}");
if latest_checkpoint_in_archive <= latest_checkpoint {
return Ok(());
}
let progress_bar = ProgressBar::new(latest_checkpoint_in_archive).with_style(
ProgressStyle::with_template("[{elapsed_precise}] {wide_bar} {pos}/{len}({msg})").unwrap(),
);
let txn_counter = Arc::new(AtomicU64::new(0));
let checkpoint_counter = Arc::new(AtomicU64::new(0));
let cloned_progress_bar = progress_bar.clone();
let cloned_checkpoint_store = checkpoint_store.clone();
let cloned_counter = txn_counter.clone();
let instant = Instant::now();
tokio::spawn(async move {
loop {
let curr_latest_checkpoint = cloned_checkpoint_store
.get_highest_synced_checkpoint()
.unwrap()
.map(|c| c.sequence_number)
.unwrap_or(0);
let total_checkpoints_per_sec = (curr_latest_checkpoint - latest_checkpoint) as f64
/ instant.elapsed().as_secs_f64();
let total_txns_per_sec =
cloned_counter.load(Ordering::Relaxed) as f64 / instant.elapsed().as_secs_f64();
cloned_progress_bar.set_position(curr_latest_checkpoint);
cloned_progress_bar.set_message(format!(
"checkpoints/s: {}, txns/s: {}",
total_checkpoints_per_sec, total_txns_per_sec
));
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
let start = latest_checkpoint
.checked_add(1)
.context("Checkpoint overflow")
.map_err(|_| anyhow!("Failed to increment checkpoint"))?;
info!("Starting syncing checkpoints from checkpoint seq num: {start}");
archive_reader
.read(
state_sync_store,
start..u64::MAX,
txn_counter,
checkpoint_counter,
true,
)
.await?;
let end = checkpoint_store
.get_highest_synced_checkpoint()?
.map(|c| c.sequence_number)
.unwrap_or(0);
progress_bar.finish_and_clear();
info!("Highest synced checkpoint after sync: {end}");
Ok(())
}
