replay: send duplicate proofs from blockstore to state machine (solana-labs#32962)

* replay: send duplicate proofs from blockstore to state machine

* pr feedback: bank.slot() -> slot

* pr feedback
AshwinSekar authored Sep 6, 2023
1 parent d077b13 commit a8e83c8
Showing 4 changed files with 245 additions and 23 deletions.
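
At a high level, the first half of this change teaches replay to read any duplicate-slot proofs already persisted in blockstore at startup and to mark the matching (slot, bank hash) pairs as invalid candidates in fork choice. The sketch below is only an illustration of that flow using hypothetical stand-in types; the real code, in core/src/replay_stage.rs further down, uses Blockstore::duplicate_slots_iterator, BankForks, and HeaviestSubtreeForkChoice::mark_fork_invalid_candidate.

```rust
use std::collections::{BTreeMap, HashSet};

type Slot = u64;
type Hash = [u8; 32];

/// Hypothetical stand-in for the blockstore: just the set of slots for which a
/// duplicate proof has been persisted.
struct Blockstore {
    duplicate_slots: HashSet<Slot>,
}

impl Blockstore {
    /// Mirrors `Blockstore::duplicate_slots_iterator(root)`: every recorded
    /// duplicate slot at or above the current root.
    fn duplicate_slots_from(&self, root: Slot) -> impl Iterator<Item = Slot> + '_ {
        self.duplicate_slots.iter().copied().filter(move |s| *s >= root)
    }
}

/// Hypothetical stand-in for `HeaviestSubtreeForkChoice`.
#[derive(Default)]
struct ForkChoice {
    invalid_candidates: HashSet<(Slot, Hash)>,
}

impl ForkChoice {
    fn mark_fork_invalid_candidate(&mut self, slot_hash: &(Slot, Hash)) {
        self.invalid_candidates.insert(*slot_hash);
    }
}

/// On startup, re-feed persisted duplicate proofs into fork choice so that
/// previously detected duplicate forks begin life as invalid candidates.
fn mark_persisted_duplicates(
    blockstore: &Blockstore,
    bank_hashes: &BTreeMap<Slot, Hash>, // stand-in for BankForks::bank_hash lookups
    root: Slot,
    fork_choice: &mut ForkChoice,
) {
    let duplicate_slot_hashes: Vec<(Slot, Hash)> = blockstore
        .duplicate_slots_from(root)
        .filter_map(|slot| bank_hashes.get(&slot).map(|hash| (slot, *hash)))
        .collect();
    for slot_hash in duplicate_slot_hashes {
        fork_choice.mark_fork_invalid_candidate(&slot_hash);
    }
}
```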
1 change: 1 addition & 0 deletions core/src/consensus.rs
@@ -293,6 +293,7 @@ impl Tower {
bank_forks.frozen_banks().values().cloned().collect(),
node_pubkey,
vote_account,
vec![],
);
let root = root_bank.slot();

73 changes: 70 additions & 3 deletions core/src/replay_stage.rs
@@ -549,6 +549,7 @@ impl ReplayStage {
&bank_forks,
&my_pubkey,
&vote_account,
&blockstore,
);
let mut current_leader = None;
let mut last_reset = Hash::default();
@@ -1230,23 +1231,37 @@ impl ReplayStage {
bank_forks: &RwLock<BankForks>,
my_pubkey: &Pubkey,
vote_account: &Pubkey,
blockstore: &Blockstore,
) -> (ProgressMap, HeaviestSubtreeForkChoice) {
let (root_bank, frozen_banks) = {
let (root_bank, frozen_banks, duplicate_slot_hashes) = {
let bank_forks = bank_forks.read().unwrap();
let duplicate_slots = blockstore
.duplicate_slots_iterator(bank_forks.root_bank().slot())
.unwrap();
let duplicate_slot_hashes = duplicate_slots
.filter_map(|slot| bank_forks.bank_hash(slot).map(|hash| (slot, hash)));
(
bank_forks.root_bank(),
bank_forks.frozen_banks().values().cloned().collect(),
duplicate_slot_hashes.collect::<Vec<(Slot, Hash)>>(),
)
};

Self::initialize_progress_and_fork_choice(&root_bank, frozen_banks, my_pubkey, vote_account)
Self::initialize_progress_and_fork_choice(
&root_bank,
frozen_banks,
my_pubkey,
vote_account,
duplicate_slot_hashes,
)
}

pub fn initialize_progress_and_fork_choice(
root_bank: &Bank,
mut frozen_banks: Vec<Arc<Bank>>,
my_pubkey: &Pubkey,
vote_account: &Pubkey,
duplicate_slot_hashes: Vec<(Slot, Hash)>,
) -> (ProgressMap, HeaviestSubtreeForkChoice) {
let mut progress = ProgressMap::default();

@@ -1261,11 +1276,15 @@ impl ReplayStage {
);
}
let root = root_bank.slot();
let heaviest_subtree_fork_choice = HeaviestSubtreeForkChoice::new_from_frozen_banks(
let mut heaviest_subtree_fork_choice = HeaviestSubtreeForkChoice::new_from_frozen_banks(
(root, root_bank.hash()),
&frozen_banks,
);

for slot_hash in duplicate_slot_hashes {
heaviest_subtree_fork_choice.mark_fork_invalid_candidate(&slot_hash);
}

(progress, heaviest_subtree_fork_choice)
}

@@ -2086,6 +2105,30 @@ impl ReplayStage {
purge_repair_slot_counter,
SlotStateUpdate::Dead(dead_state),
);

// If we previously marked this slot as duplicate in blockstore, let the state machine know
if !duplicate_slots_tracker.contains(&slot) && blockstore.get_duplicate_slot(slot).is_some()
{
let duplicate_state = DuplicateState::new_from_state(
slot,
gossip_duplicate_confirmed_slots,
heaviest_subtree_fork_choice,
|| true,
|| None,
);
check_slot_agrees_with_cluster(
slot,
root,
blockstore,
duplicate_slots_tracker,
epoch_slots_frozen_slots,
heaviest_subtree_fork_choice,
duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
purge_repair_slot_counter,
SlotStateUpdate::Duplicate(duplicate_state),
);
}
}

#[allow(clippy::too_many_arguments)]
@@ -2827,6 +2870,30 @@ impl ReplayStage {
purge_repair_slot_counter,
SlotStateUpdate::BankFrozen(bank_frozen_state),
);
// If we previously marked this slot as duplicate in blockstore, let the state machine know
if !duplicate_slots_tracker.contains(&bank.slot())
&& blockstore.get_duplicate_slot(bank.slot()).is_some()
{
let duplicate_state = DuplicateState::new_from_state(
bank.slot(),
gossip_duplicate_confirmed_slots,
heaviest_subtree_fork_choice,
|| false,
|| Some(bank.hash()),
);
check_slot_agrees_with_cluster(
bank.slot(),
bank_forks.read().unwrap().root(),
blockstore,
duplicate_slots_tracker,
epoch_slots_frozen_slots,
heaviest_subtree_fork_choice,
duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
purge_repair_slot_counter,
SlotStateUpdate::Duplicate(duplicate_state),
);
}
if let Some(sender) = bank_notification_sender {
sender
.sender
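
The other half of the change, visible in the last two replay_stage.rs hunks above, re-checks blockstore whenever a slot is marked dead or its bank is frozen: if a duplicate proof is already on disk but the in-memory duplicate_slots_tracker has not recorded it (as happens right after a restart), the slot is pushed back through check_slot_agrees_with_cluster as SlotStateUpdate::Duplicate. Below is a minimal sketch of that guard, again with hypothetical stand-in types rather than the real DuplicateState machinery.

```rust
use std::collections::HashSet;

type Slot = u64;
type Hash = [u8; 32];

/// Hypothetical stand-in for the blockstore's persisted duplicate proofs.
struct Blockstore {
    duplicate_proofs: HashSet<Slot>,
}

impl Blockstore {
    /// Stand-in for `Blockstore::get_duplicate_slot(slot).is_some()`.
    fn has_duplicate_proof(&self, slot: Slot) -> bool {
        self.duplicate_proofs.contains(&slot)
    }
}

/// Simplified form of the update that the real code builds with
/// `DuplicateState::new_from_state` and hands to `check_slot_agrees_with_cluster`.
enum SlotStateUpdate {
    Duplicate {
        slot: Slot,
        bank_hash: Option<Hash>, // Some(hash) on the frozen path, None on the dead path
    },
}

/// If a duplicate proof is already persisted for `slot` but the in-memory
/// tracker has not seen it (e.g. right after a restart), re-feed the slot to
/// the cluster-consistency state machine.
fn maybe_resubmit_duplicate(
    slot: Slot,
    bank_hash: Option<Hash>,
    blockstore: &Blockstore,
    duplicate_slots_tracker: &HashSet<Slot>,
) -> Option<SlotStateUpdate> {
    if !duplicate_slots_tracker.contains(&slot) && blockstore.has_duplicate_proof(slot) {
        Some(SlotStateUpdate::Duplicate { slot, bank_hash })
    } else {
        None
    }
}
```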
18 changes: 18 additions & 0 deletions local-cluster/src/integration_tests.rs
@@ -26,6 +26,7 @@ use {
solana_ledger::{
ancestor_iterator::AncestorIterator,
blockstore::{Blockstore, PurgeType},
blockstore_meta::DuplicateSlotProof,
blockstore_options::{AccessType, BlockstoreOptions},
leader_schedule::{FixedSchedule, LeaderSchedule},
},
@@ -153,6 +154,23 @@ pub fn wait_for_last_vote_in_tower_to_land_in_ledger(
})
}

/// Waits roughly 10 seconds for duplicate proof to appear in blockstore at `dup_slot`. Returns proof if found.
pub fn wait_for_duplicate_proof(ledger_path: &Path, dup_slot: Slot) -> Option<DuplicateSlotProof> {
for _ in 0..10 {
let duplicate_fork_validator_blockstore = open_blockstore(ledger_path);
if let Some((found_dup_slot, found_duplicate_proof)) =
duplicate_fork_validator_blockstore.get_first_duplicate_proof()
{
if found_dup_slot == dup_slot {
return Some(found_duplicate_proof);
};
}

sleep(Duration::from_millis(1000));
}
None
}

pub fn copy_blocks(end_slot: Slot, source: &Blockstore, dest: &Blockstore) {
for slot in std::iter::once(end_slot).chain(AncestorIterator::new(end_slot, source)) {
let source_meta = source.meta(slot).unwrap().unwrap();
176 changes: 156 additions & 20 deletions local-cluster/tests/local_cluster.rs
@@ -24,10 +24,9 @@ use {
ancestor_iterator::AncestorIterator,
bank_forks_utils,
blockstore::{entries_to_test_shreds, Blockstore},
blockstore_meta::DuplicateSlotProof,
blockstore_processor::ProcessOptions,
leader_schedule::FixedSchedule,
shred::Shred,
shred::{ProcessShredsStats, ReedSolomonCache, Shred, Shredder},
use_snapshot_archives_at_startup::UseSnapshotArchivesAtStartup,
},
solana_local_cluster::{
@@ -39,7 +38,7 @@
last_root_in_tower, last_vote_in_tower, ms_for_n_slots, open_blockstore,
purge_slots_with_count, remove_tower, remove_tower_if_exists, restore_tower,
run_cluster_partition, run_kill_partition_switch_threshold, save_tower,
setup_snapshot_validator_config, test_faulty_node,
setup_snapshot_validator_config, test_faulty_node, wait_for_duplicate_proof,
wait_for_last_vote_in_tower_to_land_in_ledger, SnapshotValidatorConfig,
ValidatorTestConfig, DEFAULT_CLUSTER_LAMPORTS, DEFAULT_NODE_STAKE, RUST_LOG_FILTER,
},
@@ -69,7 +68,7 @@ use {
client::{AsyncClient, SyncClient},
clock::{self, Slot, DEFAULT_TICKS_PER_SLOT, MAX_PROCESSING_AGE},
commitment_config::CommitmentConfig,
epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
epoch_schedule::{DEFAULT_SLOTS_PER_EPOCH, MINIMUM_SLOTS_PER_EPOCH},
genesis_config::ClusterType,
hard_forks::HardForks,
hash::Hash,
@@ -5145,22 +5144,6 @@ fn test_duplicate_shreds_switch_failure() {
}
}

fn wait_for_duplicate_proof(ledger_path: &Path, dup_slot: Slot) -> Option<DuplicateSlotProof> {
for _ in 0..10 {
let duplicate_fork_validator_blockstore = open_blockstore(ledger_path);
if let Some((found_dup_slot, found_duplicate_proof)) =
duplicate_fork_validator_blockstore.get_first_duplicate_proof()
{
if found_dup_slot == dup_slot {
return Some(found_duplicate_proof);
};
}

sleep(Duration::from_millis(1000));
}
None
}

solana_logger::setup_with_default(RUST_LOG_FILTER);
let validator_keypairs = [
"28bN3xyvrP4E8LwEgtLjhnkb7cY4amQb6DrYAbAYjgRV4GAGgkVM2K7wnxnAS7WDneuavza7x21MiafLu1HkwQt4",
@@ -5506,3 +5489,156 @@ fn test_duplicate_shreds_switch_failure() {
SocketAddrSpace::Unspecified,
);
}

/// Forks previously marked invalid should be marked as such in fork choice on restart
#[test]
#[serial]
fn test_invalid_forks_persisted_on_restart() {
solana_logger::setup_with("info,solana_metrics=off,solana_ledger=off");

let dup_slot = 10;
let validator_keypairs = [
"28bN3xyvrP4E8LwEgtLjhnkb7cY4amQb6DrYAbAYjgRV4GAGgkVM2K7wnxnAS7WDneuavza7x21MiafLu1HkwQt4",
"2saHBBoTkLMmttmPQP8KfBkcCw45S5cwtV3wTdGCscRC8uxdgvHxpHiWXKx4LvJjNJtnNcbSv5NdheokFFqnNDt8",
]
.iter()
.map(|s| (Arc::new(Keypair::from_base58_string(s)), true))
.collect::<Vec<_>>();
let majority_keypair = validator_keypairs[1].0.clone();

let validators = validator_keypairs
.iter()
.map(|(kp, _)| kp.pubkey())
.collect::<Vec<_>>();

let node_stakes = vec![DEFAULT_NODE_STAKE, 100 * DEFAULT_NODE_STAKE];
let (target_pubkey, majority_pubkey) = (validators[0], validators[1]);
// Need majority validator to make the dup_slot
let validator_to_slots = vec![
(majority_pubkey, dup_slot as usize + 5),
(target_pubkey, DEFAULT_SLOTS_PER_EPOCH as usize),
];
let leader_schedule = create_custom_leader_schedule(validator_to_slots.into_iter());
let mut default_config = ValidatorConfig::default_for_test();
default_config.fixed_leader_schedule = Some(FixedSchedule {
leader_schedule: Arc::new(leader_schedule),
});
let mut validator_configs = make_identical_validator_configs(&default_config, 2);
// Majority shouldn't duplicate confirm anything
validator_configs[1].voting_disabled = true;

let mut cluster = LocalCluster::new(
&mut ClusterConfig {
cluster_lamports: DEFAULT_CLUSTER_LAMPORTS + node_stakes.iter().sum::<u64>(),
validator_configs,
node_stakes,
validator_keys: Some(validator_keypairs),
skip_warmup_slots: true,
..ClusterConfig::default()
},
SocketAddrSpace::Unspecified,
);

let target_ledger_path = cluster.ledger_path(&target_pubkey);

// Wait for us to vote past duplicate slot
let timer = Instant::now();
loop {
if let Some(slot) =
wait_for_last_vote_in_tower_to_land_in_ledger(&target_ledger_path, &target_pubkey)
{
if slot > dup_slot {
break;
}
}

assert!(
timer.elapsed() < Duration::from_secs(30),
"Did not make more than 10 blocks in 30 seconds"
);
sleep(Duration::from_millis(100));
}

// Send duplicate
let parent = {
let blockstore = open_blockstore(&target_ledger_path);
let parent = blockstore
.meta(dup_slot)
.unwrap()
.unwrap()
.parent_slot
.unwrap();

let entries = create_ticks(
64 * (std::cmp::max(1, dup_slot - parent)),
0,
cluster.genesis_config.hash(),
);
let last_hash = entries.last().unwrap().hash;
let version = solana_sdk::shred_version::version_from_hash(&last_hash);
let dup_shreds = Shredder::new(dup_slot, parent, 0, version)
.unwrap()
.entries_to_shreds(
&majority_keypair,
&entries,
true, // is_full_slot
0, // next_shred_index,
0, // next_code_index
false, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
)
.0;

info!("Sending duplicate shreds for {dup_slot}");
cluster.send_shreds_to_validator(dup_shreds.iter().collect(), &target_pubkey);
wait_for_duplicate_proof(&target_ledger_path, dup_slot)
.expect("Duplicate proof for {dup_slot} not found");
parent
};

info!("Duplicate proof for {dup_slot} has landed, restarting node");
let info = cluster.exit_node(&target_pubkey);

{
let blockstore = open_blockstore(&target_ledger_path);
purge_slots_with_count(&blockstore, dup_slot + 5, 100);
}

// Restart, should create an entirely new fork
cluster.restart_node(&target_pubkey, info, SocketAddrSpace::Unspecified);

info!("Waiting for fork built off {parent}");
let timer = Instant::now();
let mut checked_children: HashSet<Slot> = HashSet::default();
let mut done = false;
while !done {
let blockstore = open_blockstore(&target_ledger_path);
let parent_meta = blockstore.meta(parent).unwrap().expect("Meta must exist");
for child in parent_meta.next_slots {
if checked_children.contains(&child) {
continue;
}

if blockstore.is_full(child) {
let shreds = blockstore
.get_data_shreds_for_slot(child, 0)
.expect("Child is full");
let mut is_our_block = true;
for shred in shreds {
is_our_block &= shred.verify(&target_pubkey);
}
if is_our_block {
done = true;
}
checked_children.insert(child);
}
}

assert!(
timer.elapsed() < Duration::from_secs(30),
"Did not create a new fork off parent {parent} in 30 seconds after restart"
);
sleep(Duration::from_millis(100));
}
}
