Skip to content

Commit

Permalink
Set drop callback on first root bank (solana-labs#23999)
Browse files Browse the repository at this point in the history
  • Loading branch information
carllin authored Apr 5, 2022
1 parent 2282571 commit 4ea59d8
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 59 deletions.
24 changes: 5 additions & 19 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ use {
},
solana_runtime::{
accounts_background_service::{
AbsRequestHandler, AbsRequestSender, AccountsBackgroundService, SnapshotRequestHandler,
AbsRequestHandler, AbsRequestSender, AccountsBackgroundService, DroppedSlotsReceiver,
SnapshotRequestHandler,
},
accounts_db::AccountShrinkThreshold,
bank_forks::BankForks,
Expand All @@ -57,7 +58,6 @@ use {
},
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair},
std::{
boxed::Box,
collections::HashSet,
net::UdpSocket,
sync::{atomic::AtomicBool, Arc, Mutex, RwLock},
Expand Down Expand Up @@ -147,6 +147,7 @@ impl Tvu {
last_full_snapshot_slot: Option<Slot>,
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
wait_to_vote_slot: Option<Slot>,
pruned_banks_receiver: DroppedSlotsReceiver,
) -> Self {
let TvuSockets {
repair: repair_socket,
Expand Down Expand Up @@ -246,23 +247,6 @@ impl Tvu {
}
};

let (pruned_banks_sender, pruned_banks_receiver) = unbounded();

// Before replay starts, set the callbacks in each of the banks in BankForks
// Note after this callback is created, only the AccountsBackgroundService should be calling
// AccountsDb::purge_slot() to clean up dropped banks.
let callback = bank_forks
.read()
.unwrap()
.root_bank()
.rc
.accounts
.accounts_db
.create_drop_bank_callback(pruned_banks_sender);
for bank in bank_forks.read().unwrap().banks().values() {
bank.set_callback(Some(Box::new(callback.clone())));
}

let accounts_background_request_sender = AbsRequestSender::new(snapshot_request_sender);

let accounts_background_request_handler = AbsRequestHandler {
Expand Down Expand Up @@ -462,6 +446,7 @@ pub mod tests {
let tower = Tower::default();
let accounts_package_channel = unbounded();
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let (_pruned_banks_sender, pruned_banks_receiver) = unbounded();
let tvu = Tvu::new(
&vote_keypair.pubkey(),
Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])),
Expand Down Expand Up @@ -511,6 +496,7 @@ pub mod tests {
None,
None,
None,
pruned_banks_receiver,
);
exit.store(true, Ordering::Relaxed);
tvu.join().unwrap();
Expand Down
39 changes: 26 additions & 13 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ use {
transaction_status_service::TransactionStatusService,
},
solana_runtime::{
accounts_background_service::DroppedSlotsReceiver,
accounts_db::{AccountShrinkThreshold, AccountsDbConfig},
accounts_index::AccountSecondaryIndexes,
accounts_update_notifier_interface::AccountsUpdateNotifier,
Expand Down Expand Up @@ -508,6 +509,7 @@ impl Validator {
},
blockstore_process_options,
blockstore_root_scan,
pruned_banks_receiver,
) = load_blockstore(
config,
ledger_path,
Expand All @@ -528,6 +530,7 @@ impl Validator {
config.snapshot_config.as_ref(),
accounts_package_channel.0.clone(),
blockstore_root_scan,
pruned_banks_receiver.clone(),
);
let last_full_snapshot_slot =
last_full_snapshot_slot.or_else(|| starting_snapshot_hashes.map(|x| x.full.hash.0));
Expand Down Expand Up @@ -935,6 +938,7 @@ impl Validator {
last_full_snapshot_slot,
block_metadata_notifier,
config.wait_to_vote_slot,
pruned_banks_receiver,
);

let tpu = Tpu::new(
Expand Down Expand Up @@ -1279,6 +1283,7 @@ fn load_blockstore(
TransactionHistoryServices,
blockstore_processor::ProcessOptions,
BlockstoreRootScan,
DroppedSlotsReceiver,
) {
info!("loading ledger from {:?}...", ledger_path);
*start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;
Expand Down Expand Up @@ -1358,19 +1363,23 @@ fn load_blockstore(
TransactionHistoryServices::default()
};

let (mut bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) =
bank_forks_utils::load_bank_forks(
&genesis_config,
&blockstore,
config.account_paths.clone(),
config.account_shrink_paths.clone(),
config.snapshot_config.as_ref(),
&process_options,
transaction_history_services
.cache_block_meta_sender
.as_ref(),
accounts_update_notifier,
);
let (
mut bank_forks,
mut leader_schedule_cache,
starting_snapshot_hashes,
pruned_banks_receiver,
) = bank_forks_utils::load_bank_forks(
&genesis_config,
&blockstore,
config.account_paths.clone(),
config.account_shrink_paths.clone(),
config.snapshot_config.as_ref(),
&process_options,
transaction_history_services
.cache_block_meta_sender
.as_ref(),
accounts_update_notifier,
);

leader_schedule_cache.set_fixed_leader_schedule(config.fixed_leader_schedule.clone());
bank_forks.set_snapshot_config(config.snapshot_config.clone());
Expand All @@ -1392,9 +1401,11 @@ fn load_blockstore(
transaction_history_services,
process_options,
blockstore_root_scan,
pruned_banks_receiver,
)
}

#[allow(clippy::too_many_arguments)]
fn process_blockstore(
blockstore: &Blockstore,
bank_forks: &mut BankForks,
Expand All @@ -1405,6 +1416,7 @@ fn process_blockstore(
snapshot_config: Option<&SnapshotConfig>,
accounts_package_sender: AccountsPackageSender,
blockstore_root_scan: BlockstoreRootScan,
pruned_banks_receiver: DroppedSlotsReceiver,
) -> Option<Slot> {
let last_full_snapshot_slot = blockstore_processor::process_blockstore_from_root(
blockstore,
Expand All @@ -1415,6 +1427,7 @@ fn process_blockstore(
cache_block_meta_sender,
snapshot_config,
accounts_package_sender,
pruned_banks_receiver,
)
.unwrap_or_else(|err| {
error!("Failed to load ledger: {:?}", err);
Expand Down
54 changes: 41 additions & 13 deletions ledger/src/bank_forks_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ use {
},
leader_schedule_cache::LeaderScheduleCache,
},
crossbeam_channel::unbounded,
log::*,
solana_runtime::{
accounts_background_service::DroppedSlotsReceiver,
accounts_update_notifier_interface::AccountsUpdateNotifier,
bank_forks::BankForks,
snapshot_archive_info::SnapshotArchiveInfoGetter,
Expand Down Expand Up @@ -47,16 +49,17 @@ pub fn load(
accounts_package_sender: AccountsPackageSender,
accounts_update_notifier: Option<AccountsUpdateNotifier>,
) -> LoadResult {
let (mut bank_forks, leader_schedule_cache, starting_snapshot_hashes) = load_bank_forks(
genesis_config,
blockstore,
account_paths,
shrink_paths,
snapshot_config,
&process_options,
cache_block_meta_sender,
accounts_update_notifier,
);
let (mut bank_forks, leader_schedule_cache, starting_snapshot_hashes, pruned_banks_receiver) =
load_bank_forks(
genesis_config,
blockstore,
account_paths,
shrink_paths,
snapshot_config,
&process_options,
cache_block_meta_sender,
accounts_update_notifier,
);

blockstore_processor::process_blockstore_from_root(
blockstore,
Expand All @@ -67,6 +70,7 @@ pub fn load(
cache_block_meta_sender,
snapshot_config,
accounts_package_sender,
pruned_banks_receiver,
)
.map(|_| (bank_forks, leader_schedule_cache, starting_snapshot_hashes))
}
Expand All @@ -85,6 +89,7 @@ pub fn load_bank_forks(
BankForks,
LeaderScheduleCache,
Option<StartingSnapshotHashes>,
DroppedSlotsReceiver,
) {
let snapshot_present = if let Some(snapshot_config) = snapshot_config {
info!(
Expand Down Expand Up @@ -144,12 +149,30 @@ pub fn load_bank_forks(
)
};

let mut leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank_forks.root_bank());
// Before replay starts, set the callbacks in each of the banks in BankForks so that
// all dropped banks come through the `pruned_banks_receiver` channel. This way all bank
// drop behavior can be safely synchronized with any other ongoing accounts activity like
// cache flush, clean, shrink, as long as the same thread performing those activities also
// is processing the dropped banks from the `pruned_banks_receiver` channel.

// There should only be one bank, the root bank in BankForks. Thus all banks added to
// BankForks from now on will be descended from the root bank and thus will inherit
// the bank drop callback.
assert_eq!(bank_forks.banks().len(), 1);
let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
let root_bank = bank_forks.root_bank();
let callback = root_bank
.rc
.accounts
.accounts_db
.create_drop_bank_callback(pruned_banks_sender);
root_bank.set_callback(Some(Box::new(callback)));

let mut leader_schedule_cache = LeaderScheduleCache::new_from_bank(&root_bank);
if process_options.full_leader_cache {
leader_schedule_cache.set_max_schedules(std::usize::MAX);
}

assert_eq!(bank_forks.banks().len(), 1);
if let Some(ref new_hard_forks) = process_options.new_hard_forks {
let root_bank = bank_forks.root_bank();
let hard_forks = root_bank.hard_forks();
Expand All @@ -166,7 +189,12 @@ pub fn load_bank_forks(
}
}

(bank_forks, leader_schedule_cache, starting_snapshot_hashes)
(
bank_forks,
leader_schedule_cache,
starting_snapshot_hashes,
pruned_banks_receiver,
)
}

#[allow(clippy::too_many_arguments)]
Expand Down
41 changes: 31 additions & 10 deletions ledger/src/blockstore_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use {
solana_program_runtime::timings::{ExecuteTimingType, ExecuteTimings},
solana_rayon_threadlimit::get_thread_count,
solana_runtime::{
accounts_background_service::DroppedSlotsReceiver,
accounts_db::{AccountShrinkThreshold, AccountsDbConfig},
accounts_index::AccountSecondaryIndexes,
accounts_update_notifier_interface::AccountsUpdateNotifier,
Expand Down Expand Up @@ -567,16 +568,17 @@ pub fn test_process_blockstore(
blockstore: &Blockstore,
opts: ProcessOptions,
) -> (BankForks, LeaderScheduleCache) {
let (mut bank_forks, leader_schedule_cache, ..) = crate::bank_forks_utils::load_bank_forks(
genesis_config,
blockstore,
Vec::new(),
None,
None,
&opts,
None,
None,
);
let (mut bank_forks, leader_schedule_cache, .., pruned_banks_receiver) =
crate::bank_forks_utils::load_bank_forks(
genesis_config,
blockstore,
Vec::new(),
None,
None,
&opts,
None,
None,
);
let (accounts_package_sender, _) = unbounded();
process_blockstore_from_root(
blockstore,
Expand All @@ -587,6 +589,7 @@ pub fn test_process_blockstore(
None,
None,
accounts_package_sender,
pruned_banks_receiver,
)
.unwrap();
(bank_forks, leader_schedule_cache)
Expand Down Expand Up @@ -637,6 +640,7 @@ pub fn process_blockstore_from_root(
cache_block_meta_sender: Option<&CacheBlockMetaSender>,
snapshot_config: Option<&SnapshotConfig>,
accounts_package_sender: AccountsPackageSender,
pruned_banks_receiver: DroppedSlotsReceiver,
) -> result::Result<Option<Slot>, BlockstoreProcessorError> {
if let Some(num_threads) = opts.override_num_threads {
PAR_THREAD_POOL.with(|pool| {
Expand Down Expand Up @@ -696,6 +700,7 @@ pub fn process_blockstore_from_root(
accounts_package_sender,
&mut timing,
&mut last_full_snapshot_slot,
pruned_banks_receiver,
)?;
} else {
// If there's no meta for the input `start_slot`, then we started from a snapshot
Expand Down Expand Up @@ -1117,6 +1122,7 @@ fn load_frozen_forks(
accounts_package_sender: AccountsPackageSender,
timing: &mut ExecuteTimings,
last_full_snapshot_slot: &mut Option<Slot>,
pruned_banks_receiver: DroppedSlotsReceiver,
) -> result::Result<(), BlockstoreProcessorError> {
let recyclers = VerifyRecyclers::default();
let mut all_banks = HashMap::new();
Expand Down Expand Up @@ -1285,6 +1291,17 @@ fn load_frozen_forks(
}

if last_free.elapsed() > Duration::from_secs(10) {
// Purge account state for all dropped banks
for (pruned_slot, pruned_bank_id) in pruned_banks_receiver.try_iter() {
// Simulate this purge being from the AccountsBackgroundService
let is_from_abs = true;
new_root_bank.rc.accounts.purge_slot(
pruned_slot,
pruned_bank_id,
is_from_abs,
);
}

// Must be called after `squash()`, so that AccountsDb knows what
// the roots are for the cache flushing in exhaustively_free_unused_resource().
// This could take few secs; so update last_free later
Expand Down Expand Up @@ -3147,6 +3164,7 @@ pub mod tests {

// Test process_blockstore_from_root() from slot 1 onwards
let (accounts_package_sender, _) = unbounded();
let (_pruned_banks_sender, pruned_banks_receiver) = unbounded();
process_blockstore_from_root(
&blockstore,
&mut bank_forks,
Expand All @@ -3156,6 +3174,7 @@ pub mod tests {
None,
None,
accounts_package_sender,
pruned_banks_receiver,
)
.unwrap();

Expand Down Expand Up @@ -3256,6 +3275,7 @@ pub mod tests {
let (accounts_package_sender, accounts_package_receiver) = unbounded();
let leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank);

let (_pruned_banks_sender, pruned_banks_receiver) = unbounded();
process_blockstore_from_root(
&blockstore,
&mut bank_forks,
Expand All @@ -3265,6 +3285,7 @@ pub mod tests {
None,
Some(&snapshot_config),
accounts_package_sender.clone(),
pruned_banks_receiver,
)
.unwrap();

Expand Down
Loading

0 comments on commit 4ea59d8

Please sign in to comment.