Skip to content

Commit

Permalink
[State Sync] Refactor common backup functionality and expose via DbWr…
Browse files Browse the repository at this point in the history
…iter

Closes: aptos-labs#318
  • Loading branch information
JoshLind authored and aptos-bot committed Mar 29, 2022
1 parent db17f80 commit 57d8f54
Show file tree
Hide file tree
Showing 12 changed files with 384 additions and 166 deletions.
110 changes: 62 additions & 48 deletions state-sync/state-sync-v2/state-sync-driver/src/bootstrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ use aptos_types::{
epoch_state::EpochState,
ledger_info::LedgerInfoWithSignatures,
state_store::state_value::StateValueChunkWithProof,
transaction::{
TransactionInfo, TransactionListWithProof, TransactionOutputListWithProof, Version,
},
transaction::{TransactionListWithProof, TransactionOutputListWithProof, Version},
waypoint::Waypoint,
};
use data_streaming_service::{
Expand Down Expand Up @@ -175,6 +173,14 @@ impl VerifiedEpochStates {
}
}

/// Return all epoch ending ledger infos
pub fn all_epoch_ending_ledger_infos(&self) -> Vec<LedgerInfoWithSignatures> {
self.new_epoch_ending_ledger_infos
.values()
.cloned()
.collect()
}

/// Returns any epoch ending ledger info associated with the given version
pub fn get_epoch_ending_ledger_info(
&self,
Expand Down Expand Up @@ -222,7 +228,7 @@ struct AccountStateSyncer {
// Whether or not all states have been synced
is_sync_complete: bool,

// The ledger info we're currently syncing
// The epoch ending ledger info for the version we're syncing
ledger_info_to_sync: Option<LedgerInfoWithSignatures>,

// The next account index to commit (all accounts before this have been
Expand All @@ -233,8 +239,8 @@ struct AccountStateSyncer {
// processed -- i.e., sent to the storage synchronizer).
next_account_index_to_process: u64,

// The transaction info for the version we're trying to sync to
transaction_info_for_version: Option<TransactionInfo>,
// The transaction output (inc. info and proof) for the version we're syncing
transaction_output_to_sync: Option<TransactionOutputListWithProof>,
}

impl AccountStateSyncer {
Expand All @@ -245,7 +251,7 @@ impl AccountStateSyncer {
ledger_info_to_sync: None,
next_account_index_to_commit: 0,
next_account_index_to_process: 0,
transaction_info_for_version: None,
transaction_output_to_sync: None,
}
}

Expand Down Expand Up @@ -406,22 +412,29 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSy
let highest_known_ledger_info = self.get_highest_known_ledger_info()?;
let highest_known_ledger_version = highest_known_ledger_info.ledger_info().version();

// Check if we've already fetched the required data for bootstrapping
if highest_synced_version >= highest_known_ledger_version
|| self.account_state_syncer.is_sync_complete
{
return self.bootstrapping_complete();
}

// Bootstrap according to the mode
if self.driver_configuration.config.bootstrapping_mode
== BootstrappingMode::DownloadLatestAccountStates
{
self.fetch_all_account_states(highest_known_ledger_info)
.await
} else {
self.fetch_missing_transaction_data(highest_synced_version, highest_known_ledger_info)
// Check if we've already fetched the required data for bootstrapping.
// If not, bootstrap according to the mode.
match self.driver_configuration.config.bootstrapping_mode {
BootstrappingMode::DownloadLatestAccountStates => {
if (self.account_state_syncer.ledger_info_to_sync.is_none()
&& highest_synced_version >= highest_known_ledger_version)
|| self.account_state_syncer.is_sync_complete
{
return self.bootstrapping_complete();
}
self.fetch_all_account_states(highest_known_ledger_info)
.await
}
_ => {
if highest_synced_version >= highest_known_ledger_version {
return self.bootstrapping_complete();
}
self.fetch_missing_transaction_data(
highest_synced_version,
highest_known_ledger_info,
)
.await
}
}
}

Expand Down Expand Up @@ -485,8 +498,6 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSy
&mut self,
highest_known_ledger_info: LedgerInfoWithSignatures,
) -> Result<(), Error> {
let highest_known_ledger_version = highest_known_ledger_info.ledger_info().version();

// Verify we're trying to sync to an unchanging ledger info
if let Some(ledger_info_to_sync) = &self.account_state_syncer.ledger_info_to_sync {
if ledger_info_to_sync != &highest_known_ledger_info {
Expand All @@ -496,13 +507,14 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSy
);
}
} else {
self.account_state_syncer.ledger_info_to_sync = Some(highest_known_ledger_info);
self.account_state_syncer.ledger_info_to_sync = Some(highest_known_ledger_info.clone());
}

// Fetch the transaction info first, before the account states
let highest_known_ledger_version = highest_known_ledger_info.ledger_info().version();
let data_stream = if self
.account_state_syncer
.transaction_info_for_version
.transaction_output_to_sync
.is_none()
{
self.streaming_service_client
Expand Down Expand Up @@ -615,7 +627,7 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSy
);
return Err(Error::AdvertisedDataError(error_message));
} else if highest_local_epoch_end < highest_advertised_epoch_end {
debug!("Found higher epoch ending ledger infos in the network! Local: {:?}, advertised: {:?}",
info!("Found higher epoch ending ledger infos in the network! Local: {:?}, advertised: {:?}",
highest_local_epoch_end, highest_advertised_epoch_end);
let next_epoch_end = highest_local_epoch_end.checked_add(1).ok_or_else(|| {
Error::IntegerOverflow("The next epoch end has overflown!".into())
Expand All @@ -626,7 +638,7 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSy
.await?;
self.active_data_stream = Some(epoch_ending_stream);
} else if self.verified_epoch_states.verified_waypoint() {
debug!("No new epoch ending ledger infos to fetch! All peers are in the same epoch!");
info!("No new epoch ending ledger infos to fetch! All peers are in the same epoch!");
self.verified_epoch_states
.set_fetched_epoch_ending_ledger_infos();
} else {
Expand Down Expand Up @@ -756,18 +768,26 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSy
.account_state_syncer
.initialized_state_snapshot_receiver
{
let version = self
// Fetch all verified epoch change proofs
let epoch_change_proofs = self.verified_epoch_states.all_epoch_ending_ledger_infos();

// Fetch the target ledger info and transaction info for bootstrapping
let ledger_info_to_sync = self
.account_state_syncer
.ledger_info_to_sync
.as_ref()
.expect("Account state syncer version not initialized!")
.ledger_info()
.version();
let expected_root_hash = account_state_chunk_with_proof.root_hash;
.clone()
.expect("Ledger info to sync is missing!");
let transaction_output_to_sync = self
.account_state_syncer
.transaction_output_to_sync
.clone()
.expect("Transaction output to sync is missing!");

// Initialize the account state synchronizer
self.storage_synchronizer.initialize_account_synchronizer(
expected_root_hash,
version,
None, // TODO(joshlind): support spawning on a given runtime!
epoch_change_proofs,
ledger_info_to_sync,
transaction_output_to_sync,
)?;
self.account_state_syncer
.initialized_state_snapshot_receiver = true;
Expand Down Expand Up @@ -864,7 +884,7 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSy
BootstrappingMode::DownloadLatestAccountStates
) && self
.account_state_syncer
.transaction_info_for_version
.transaction_output_to_sync
.is_some())
{
self.terminate_active_stream(notification_id, NotificationFeedback::InvalidPayloadData)
Expand Down Expand Up @@ -997,19 +1017,13 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> Bootstrapper<StorageSy
.await?;

// Verify the payload proof (the ledger info has already been verified)
// and save the transaction info.
// and save the transaction output with proof.
if let Some(transaction_outputs_with_proof) = transaction_outputs_with_proof {
match &transaction_outputs_with_proof.proof.transaction_infos[..] {
[transaction_info] => {
[_transaction_info] => {
// TODO(joshlind): don't save the transaction info until after verification!
self.account_state_syncer.transaction_info_for_version =
Some(transaction_info.clone());
self.storage_synchronizer.apply_transaction_outputs(
notification_id,
transaction_outputs_with_proof,
ledger_info_to_sync,
None,
)?;
self.account_state_syncer.transaction_output_to_sync =
Some(transaction_outputs_with_proof);
}
_ => {
self.terminate_active_stream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,14 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> ContinuousSyncer<Stora
&mut self,
consensus_sync_request: Arc<Mutex<Option<ConsensusSyncRequest>>>,
) -> Result<(), Error> {
// Fetch transactions or outputs starting at highest_synced_version + 1
// Fetch the highest synced version and epoch (in storage)
let (highest_synced_version, highest_synced_epoch) =
self.get_highest_synced_version_and_epoch()?;

// Fetch the highest epoch state (in storage)
let highest_epoch_state = utils::fetch_latest_epoch_state(self.storage.clone())?;

// Start fetching data at highest_synced_version + 1
let next_version = highest_synced_version
.checked_add(1)
.ok_or_else(|| Error::IntegerOverflow("The next version has overflown!".into()))?;
Expand Down Expand Up @@ -115,7 +120,7 @@ impl<StorageSyncer: StorageSynchronizerInterface + Clone> ContinuousSyncer<Stora
}
};
self.speculative_stream_state = Some(SpeculativeStreamState::new(
utils::fetch_latest_epoch_state(self.storage.clone())?,
highest_epoch_state,
None,
highest_synced_version,
));
Expand Down
3 changes: 3 additions & 0 deletions state-sync/state-sync-v2/state-sync-driver/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,9 +518,12 @@ impl<

// Drive progress depending on if we're bootstrapping or continuously syncing
if self.bootstrapper.is_bootstrapped() {
// Fetch any consensus sync requests
let consensus_sync_request = self
.consensus_notification_handler
.get_consensus_sync_request();

// Attempt to continuously sync
if let Err(error) = self
.continuous_syncer
.drive_progress(consensus_sync_request)
Expand Down
Loading

0 comments on commit 57d8f54

Please sign in to comment.