Skip to content

Commit

Permalink
Wen_restart: check block full using blockstore (solana-labs#250)
Browse files Browse the repository at this point in the history
* Switch to blockstore.is_full() check because replay thread isn't active.

* Use make_chaining_slot_entries and add first_parent to the method.
Small style fixes.

* Switch to blockstore.is_full() check because replay thread isn't active.
  • Loading branch information
wen-coding authored and willhickey committed Mar 16, 2024
1 parent 5d72196 commit 9e394bd
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 71 deletions.
4 changes: 2 additions & 2 deletions core/src/repair/repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1326,7 +1326,7 @@ mod test {
let slots: Vec<u64> = vec![1, 3, 5, 7, 8];
let num_entries_per_slot = max_ticks_per_n_shreds(1, None) + 1;

let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot);
let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot, 0);
for (mut slot_shreds, _) in shreds.into_iter() {
slot_shreds.remove(0);
blockstore.insert_shreds(slot_shreds, None, false).unwrap();
Expand Down Expand Up @@ -1621,7 +1621,7 @@ mod test {
let slots: Vec<u64> = vec![2, 3, 5, 7];
let num_entries_per_slot = max_ticks_per_n_shreds(3, None) + 1;

let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot);
let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot, 0);
for (i, (mut slot_shreds, _)) in shreds.into_iter().enumerate() {
slot_shreds.remove(i);
blockstore.insert_shreds(slot_shreds, None, false).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion core/src/repair/repair_weight.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2377,7 +2377,7 @@ mod test {
assert_eq!(repairs[2].slot(), 5);

// Simulate repair on 6 and 5
for (shreds, _) in make_chaining_slot_entries(&[5, 6], 100) {
for (shreds, _) in make_chaining_slot_entries(&[5, 6], 100, 0) {
blockstore.insert_shreds(shreds, None, true).unwrap();
}

Expand Down
13 changes: 7 additions & 6 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4658,12 +4658,13 @@ pub fn make_many_slot_shreds(
pub fn make_chaining_slot_entries(
chain: &[u64],
entries_per_slot: u64,
first_parent: u64,
) -> Vec<(Vec<Shred>, Vec<Entry>)> {
let mut slots_shreds_and_entries = vec![];
for (i, slot) in chain.iter().enumerate() {
let parent_slot = {
if *slot == 0 || i == 0 {
0
first_parent
} else {
chain[i - 1]
}
Expand Down Expand Up @@ -5609,7 +5610,7 @@ pub mod tests {

let entries_per_slot = 10;
let slots = [2, 5, 10];
let mut all_shreds = make_chaining_slot_entries(&slots[..], entries_per_slot);
let mut all_shreds = make_chaining_slot_entries(&slots[..], entries_per_slot, 0);

// Get the shreds for slot 10, chaining to slot 5
let (mut orphan_child, _) = all_shreds.remove(2);
Expand Down Expand Up @@ -5654,7 +5655,7 @@ pub mod tests {

let entries_per_slot = 10;
let mut slots = vec![2, 5, 10];
let mut all_shreds = make_chaining_slot_entries(&slots[..], entries_per_slot);
let mut all_shreds = make_chaining_slot_entries(&slots[..], entries_per_slot, 0);
let disconnected_slot = 4;

let (shreds0, _) = all_shreds.remove(0);
Expand Down Expand Up @@ -7428,7 +7429,7 @@ pub mod tests {
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let shreds_per_slot = 10;
let slots = vec![2, 4, 8, 12];
let all_shreds = make_chaining_slot_entries(&slots, shreds_per_slot);
let all_shreds = make_chaining_slot_entries(&slots, shreds_per_slot, 0);
let slot_8_shreds = all_shreds[2].0.clone();
for (slot_shreds, _) in all_shreds {
blockstore.insert_shreds(slot_shreds, None, false).unwrap();
Expand Down Expand Up @@ -9963,7 +9964,7 @@ pub mod tests {
let slots = vec![2, unconfirmed_slot, unconfirmed_child_slot];

// Insert into slot 9, mark it as dead
let shreds: Vec<_> = make_chaining_slot_entries(&slots, 1)
let shreds: Vec<_> = make_chaining_slot_entries(&slots, 1, 0)
.into_iter()
.flat_map(|x| x.0)
.collect();
Expand Down Expand Up @@ -10005,7 +10006,7 @@ pub mod tests {
let unconfirmed_slot = 8;
let slots = vec![confirmed_slot, unconfirmed_slot];

let shreds: Vec<_> = make_chaining_slot_entries(&slots, 1)
let shreds: Vec<_> = make_chaining_slot_entries(&slots, 1, 0)
.into_iter()
.flat_map(|x| x.0)
.collect();
Expand Down
102 changes: 40 additions & 62 deletions wen-restart/src/wen_restart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ pub(crate) fn aggregate_restart_last_voted_fork_slots(
cluster_info: Arc<ClusterInfo>,
last_voted_fork_slots: &Vec<Slot>,
bank_forks: Arc<RwLock<BankForks>>,
blockstore: Arc<Blockstore>,
wen_restart_repair_slots: Arc<RwLock<Vec<Slot>>>,
exit: Arc<AtomicBool>,
progress: &mut WenRestartProgress,
Expand Down Expand Up @@ -160,20 +161,18 @@ pub(crate) fn aggregate_restart_last_voted_fork_slots(
let active_percent = last_voted_fork_slots_aggregate.active_percent();
let mut filtered_slots: Vec<Slot>;
{
let my_bank_forks = bank_forks.read().unwrap();
filtered_slots = last_voted_fork_slots_aggregate
.slots_to_repair_iter()
.filter(|slot| {
if *slot <= &root_slot || is_full_slots.contains(*slot) {
return false;
}
let is_full = my_bank_forks
.get(**slot)
.map_or(false, |bank| bank.is_frozen());
if is_full {
if blockstore.is_full(**slot) {
is_full_slots.insert(**slot);
false
} else {
true
}
!is_full
})
.cloned()
.collect();
Expand Down Expand Up @@ -234,6 +233,7 @@ pub fn wait_for_wen_restart(
cluster_info.clone(),
last_voted_fork_slots,
bank_forks.clone(),
blockstore.clone(),
wen_restart_repair_slots.clone().unwrap(),
exit.clone(),
&mut progress,
Expand Down Expand Up @@ -382,7 +382,6 @@ mod tests {
use {
crate::wen_restart::*,
assert_matches::assert_matches,
solana_entry::entry,
solana_gossip::{
cluster_info::ClusterInfo,
contact_info::ContactInfo,
Expand All @@ -391,7 +390,10 @@ mod tests {
legacy_contact_info::LegacyContactInfo,
restart_crds_values::RestartLastVotedForkSlots,
},
solana_ledger::{blockstore, get_tmp_ledger_path_auto_delete},
solana_ledger::{
blockstore::{make_chaining_slot_entries, Blockstore},
get_tmp_ledger_path_auto_delete,
},
solana_program::{
hash::Hash,
vote::state::{Vote, VoteStateUpdate},
Expand All @@ -403,7 +405,6 @@ mod tests {
},
},
solana_sdk::{
pubkey::Pubkey,
signature::{Keypair, Signer},
timing::timestamp,
},
Expand Down Expand Up @@ -454,6 +455,16 @@ mod tests {
pub wen_restart_proto_path: PathBuf,
}

fn insert_slots_into_blockstore(
blockstore: Arc<Blockstore>,
first_parent: Slot,
slots_to_insert: &[Slot],
) {
for (shreds, _) in make_chaining_slot_entries(slots_to_insert, 2, first_parent) {
blockstore.insert_shreds(shreds, None, false).unwrap();
}
}

fn wen_restart_test_init(ledger_path: &TempDir) -> WenRestartTestInitResult {
let validator_voting_keypairs: Vec<_> =
(0..10).map(|_| ValidatorVoteKeypairs::new_rand()).collect();
Expand All @@ -468,7 +479,7 @@ mod tests {
node_keypair.clone(),
SocketAddrSpace::Unspecified,
));
let blockstore = Arc::new(blockstore::Blockstore::open(ledger_path.path()).unwrap());
let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config_with_vote_accounts(
10_000,
&validator_voting_keypairs,
Expand All @@ -479,42 +490,16 @@ mod tests {
.try_into()
.unwrap();
let mut last_voted_fork_slots = Vec::new();
last_voted_fork_slots.extend([1, last_parent]);
for i in 0..EXPECTED_SLOTS {
let entries = entry::create_ticks(1, 0, Hash::default());
let parent_slot = if i > 0 {
(RestartLastVotedForkSlots::MAX_SLOTS.saturating_add(i))
.try_into()
.unwrap()
} else {
last_parent
};
let slot = (RestartLastVotedForkSlots::MAX_SLOTS
.saturating_add(i)
.saturating_add(1)) as Slot;
let shreds = blockstore::entries_to_test_shreds(
&entries,
slot,
parent_slot,
false,
0,
true, // merkle_variant
last_voted_fork_slots.push(
(RestartLastVotedForkSlots::MAX_SLOTS
.saturating_add(i)
.saturating_add(1)) as Slot,
);
blockstore.insert_shreds(shreds, None, false).unwrap();
last_voted_fork_slots.push(slot);
}
// link directly to slot 1 whose distance to last_vote > RestartLastVotedForkSlots::MAX_SLOTS so it will not be included.
let entries = entry::create_ticks(1, 0, Hash::default());
let shreds = blockstore::entries_to_test_shreds(
&entries,
last_parent,
1,
false,
0,
true, // merkle_variant
);
last_voted_fork_slots.extend([last_parent, 1]);
blockstore.insert_shreds(shreds, None, false).unwrap();
last_voted_fork_slots.sort();
insert_slots_into_blockstore(blockstore.clone(), 0, &last_voted_fork_slots);
last_voted_fork_slots.insert(0, 0);
last_voted_fork_slots.reverse();
let mut wen_restart_proto_path = ledger_path.path().to_path_buf();
wen_restart_proto_path.push("wen_restart_status.proto");
Expand Down Expand Up @@ -599,23 +584,6 @@ mod tests {
let _ = remove_file(&test_state.wen_restart_proto_path);
}

fn insert_and_freeze_slots(
bank_forks: Arc<RwLock<BankForks>>,
expected_slots_to_repair: Vec<Slot>,
) {
let mut parent_bank = bank_forks.read().unwrap().root_bank();
for slot in expected_slots_to_repair {
let mut bank_forks_rw = bank_forks.write().unwrap();
bank_forks_rw.insert(Bank::new_from_parent(
parent_bank.clone(),
&Pubkey::default(),
slot,
));
parent_bank = bank_forks_rw.get(slot).unwrap();
parent_bank.freeze();
}
}

#[test]
fn test_wen_restart_normal_flow() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
Expand Down Expand Up @@ -673,7 +641,11 @@ mod tests {
}

// Simulating successful repair of missing blocks.
insert_and_freeze_slots(test_state.bank_forks.clone(), expected_slots_to_repair);
insert_slots_into_blockstore(
test_state.blockstore.clone(),
last_vote_slot,
&expected_slots_to_repair,
);

let _ = wen_restart_thread_handle.join();
let progress = read_wen_restart_records(&test_state.wen_restart_proto_path).unwrap();
Expand Down Expand Up @@ -934,6 +906,7 @@ mod tests {
let wen_restart_proto_path_clone = test_state.wen_restart_proto_path.clone();
let cluster_info_clone = test_state.cluster_info.clone();
let bank_forks_clone = test_state.bank_forks.clone();
let blockstore_clone = test_state.blockstore.clone();
let exit = Arc::new(AtomicBool::new(false));
let exit_clone = exit.clone();
let mut progress_clone = progress.clone();
Expand All @@ -947,6 +920,7 @@ mod tests {
cluster_info_clone,
&last_voted_fork_slots,
bank_forks_clone,
blockstore_clone,
Arc::new(RwLock::new(Vec::new())),
exit_clone,
&mut progress_clone,
Expand Down Expand Up @@ -995,7 +969,11 @@ mod tests {
}

// Simulating successful repair of missing blocks.
insert_and_freeze_slots(test_state.bank_forks.clone(), expected_slots_to_repair);
insert_slots_into_blockstore(
test_state.blockstore.clone(),
last_vote_slot,
&expected_slots_to_repair,
);

let last_voted_fork_slots = test_state.last_voted_fork_slots.clone();
wen_restart_test_succeed_after_failure(
Expand Down

0 comments on commit 9e394bd

Please sign in to comment.