add support for a repair protocol whitelist (solana-labs#29161)
jbiseda authored Dec 16, 2022
1 parent acb7eb2 commit a44ea77
Showing 11 changed files with 331 additions and 52 deletions.
17 changes: 13 additions & 4 deletions core/src/ancestor_hashes_service.rs
@@ -523,6 +523,7 @@ impl AncestorHashesService {
let serve_repair = ServeRepair::new(
repair_info.cluster_info.clone(),
repair_info.bank_forks.clone(),
repair_info.repair_whitelist.clone(),
);
let mut repair_stats = AncestorRepairRequestsStats::default();

@@ -969,8 +970,11 @@ mod test {
Arc::new(keypair),
SocketAddrSpace::Unspecified,
);
let responder_serve_repair =
ServeRepair::new(Arc::new(cluster_info), vote_simulator.bank_forks);
let responder_serve_repair = ServeRepair::new(
Arc::new(cluster_info),
vote_simulator.bank_forks,
Arc::<RwLock<HashSet<_>>>::default(), // repair whitelist
);

// Set up thread to give us responses
let ledger_path = get_tmp_ledger_path!();
@@ -1054,8 +1058,12 @@ mod test {
Arc::new(keypair),
SocketAddrSpace::Unspecified,
));
let requester_serve_repair =
ServeRepair::new(requester_cluster_info.clone(), bank_forks.clone());
let repair_whitelist = Arc::new(RwLock::new(HashSet::default()));
let requester_serve_repair = ServeRepair::new(
requester_cluster_info.clone(),
bank_forks.clone(),
repair_whitelist.clone(),
);
let (duplicate_slots_reset_sender, _duplicate_slots_reset_receiver) = unbounded();
let repair_info = RepairInfo {
bank_forks,
@@ -1064,6 +1072,7 @@
epoch_schedule,
duplicate_slots_reset_sender,
repair_validators: None,
repair_whitelist,
};

let (ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_receiver) =
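The tests above spell the new third argument in two ways: Arc::<RwLock<HashSet<_>>>::default() in ancestor_hashes_service.rs, and Arc::new(RwLock::new(HashSet::default())) at the other call sites in this commit. A minimal, std-only sketch (with a placeholder Pubkey alias standing in for solana_sdk::pubkey::Pubkey) confirming that both spellings build the same empty, shareable whitelist:

    use std::{
        collections::HashSet,
        sync::{Arc, RwLock},
    };

    // Placeholder for solana_sdk::pubkey::Pubkey; illustration only.
    type Pubkey = [u8; 32];

    fn main() {
        // Spelling used in the ancestor_hashes_service test above.
        let a: Arc<RwLock<HashSet<Pubkey>>> = Arc::<RwLock<HashSet<_>>>::default();
        // Spelling used at the other call sites in this commit.
        let b: Arc<RwLock<HashSet<Pubkey>>> = Arc::new(RwLock::new(HashSet::default()));

        // Both start out as empty whitelists; only the syntax differs.
        assert!(a.read().unwrap().is_empty());
        assert_eq!(*a.read().unwrap(), *b.read().unwrap());
    }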
16 changes: 14 additions & 2 deletions core/src/repair_service.rs
@@ -173,7 +173,10 @@ pub struct RepairInfo {
pub cluster_slots: Arc<ClusterSlots>,
pub epoch_schedule: EpochSchedule,
pub duplicate_slots_reset_sender: DuplicateSlotsResetSender,
// Validators from which repairs are requested
pub repair_validators: Option<HashSet<Pubkey>>,
// Validators which should be given priority when serving repairs
pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
}

pub struct RepairSlotRange {
@@ -251,6 +254,7 @@ impl RepairService {
let serve_repair = ServeRepair::new(
repair_info.cluster_info.clone(),
repair_info.bank_forks.clone(),
repair_info.repair_whitelist.clone(),
);
let id = repair_info.cluster_info.id();
let mut repair_stats = RepairStats::default();
@@ -1084,7 +1088,11 @@ mod test {
let cluster_slots = ClusterSlots::default();
let cluster_info = Arc::new(new_test_cluster_info(Node::new_localhost().info));
let identity_keypair = cluster_info.keypair().clone();
let serve_repair = ServeRepair::new(cluster_info, bank_forks);
let serve_repair = ServeRepair::new(
cluster_info,
bank_forks,
Arc::new(RwLock::new(HashSet::default())),
);
let mut duplicate_slot_repair_statuses = HashMap::new();
let dead_slot = 9;
let receive_socket = &UdpSocket::bind("0.0.0.0:0").unwrap();
@@ -1179,7 +1187,11 @@ mod test {
UdpSocket::bind("0.0.0.0:0").unwrap().local_addr().unwrap(),
));
let cluster_info = Arc::new(new_test_cluster_info(Node::new_localhost().info));
let serve_repair = ServeRepair::new(cluster_info.clone(), bank_forks);
let serve_repair = ServeRepair::new(
cluster_info.clone(),
bank_forks,
Arc::new(RwLock::new(HashSet::default())),
);
let valid_repair_peer = Node::new_localhost().info;

// Signal that this peer has confirmed the dead slot, and is thus
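RepairInfo now carries two related but different fields: repair_validators restricts which peers this node sends its own repair requests to, while repair_whitelist marks peers whose incoming requests get priority when this node serves repairs. The whitelist lives behind Arc<RwLock<...>> so that RepairService, AncestorHashesService, and ServeRepair can all hold clones of one shared set. A std-only sketch of that sharing pattern (placeholder Pubkey type; the Service struct is hypothetical, not the real services):

    use std::{
        collections::HashSet,
        sync::{Arc, RwLock},
    };

    // Placeholder for a validator identity; illustration only.
    type Pubkey = u64;

    // Hypothetical stand-in for the services that clone the whitelist handle
    // (RepairService, AncestorHashesService, ServeRepair in this commit).
    struct Service {
        repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
    }

    impl Service {
        fn is_whitelisted(&self, sender: &Pubkey) -> bool {
            self.repair_whitelist.read().unwrap().contains(sender)
        }
    }

    fn main() {
        let repair_whitelist = Arc::new(RwLock::new(HashSet::default()));

        // Each consumer clones the Arc, so every clone sees one underlying set.
        let service = Service {
            repair_whitelist: repair_whitelist.clone(),
        };
        assert!(!service.is_whitelisted(&42));

        // An update through any handle is visible to all clones, which is what
        // would let the whitelist change without rebuilding the services.
        repair_whitelist.write().unwrap().insert(42);
        assert!(service.is_whitelisted(&42));
    }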
152 changes: 108 additions & 44 deletions core/src/serve_repair.rs
@@ -159,6 +159,7 @@ struct ServeRepairStats {
dropped_requests_outbound_bandwidth: usize,
dropped_requests_load_shed: usize,
dropped_requests_low_stake: usize,
whitelisted_requests: usize,
total_dropped_response_packets: usize,
total_response_packets: usize,
total_response_bytes_staked: usize,
@@ -281,6 +282,7 @@ impl RepairProtocol {
pub struct ServeRepair {
cluster_info: Arc<ClusterInfo>,
bank_forks: Arc<RwLock<BankForks>>,
repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
}

// Cache entry for repair peers for a slot.
@@ -316,11 +318,23 @@ impl RepairPeers {
}
}

struct RepairRequestWithMeta {
request: RepairProtocol,
from_addr: SocketAddr,
stake: u64,
whitelisted: bool,
}

impl ServeRepair {
pub fn new(cluster_info: Arc<ClusterInfo>, bank_forks: Arc<RwLock<BankForks>>) -> Self {
pub fn new(
cluster_info: Arc<ClusterInfo>,
bank_forks: Arc<RwLock<BankForks>>,
repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
) -> Self {
Self {
cluster_info,
bank_forks,
repair_whitelist,
}
}

@@ -456,7 +470,11 @@ impl ServeRepair {
let my_id = identity_keypair.pubkey();

let max_buffered_packets = if root_bank.cluster_type() != ClusterType::MainnetBeta {
2 * MAX_REQUESTS_PER_ITERATION
if self.repair_whitelist.read().unwrap().len() > 0 {
4 * MAX_REQUESTS_PER_ITERATION
} else {
2 * MAX_REQUESTS_PER_ITERATION
}
} else {
MAX_REQUESTS_PER_ITERATION
};
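On non-MainnetBeta clusters the request buffer above is doubled, and quadrupled once a whitelist is configured; MainnetBeta stays at the base MAX_REQUESTS_PER_ITERATION. A small, std-only restatement of that selection (the function name and the constant value are hypothetical) that makes the three cases explicit:

    // Hypothetical constant value; the real one lives in serve_repair.rs.
    const MAX_REQUESTS_PER_ITERATION: usize = 1024;

    // Mirrors the buffer sizing above; the function exists only for illustration.
    fn max_buffered_packets(is_mainnet_beta: bool, whitelist_len: usize) -> usize {
        if !is_mainnet_beta {
            if whitelist_len > 0 {
                4 * MAX_REQUESTS_PER_ITERATION
            } else {
                2 * MAX_REQUESTS_PER_ITERATION
            }
        } else {
            MAX_REQUESTS_PER_ITERATION
        }
    }

    fn main() {
        assert_eq!(max_buffered_packets(true, 5), MAX_REQUESTS_PER_ITERATION);
        assert_eq!(max_buffered_packets(false, 0), 2 * MAX_REQUESTS_PER_ITERATION);
        assert_eq!(max_buffered_packets(false, 5), 4 * MAX_REQUESTS_PER_ITERATION);
    }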
@@ -475,58 +493,74 @@
stats.total_requests += total_requests;

let decode_start = Instant::now();
let mut decoded_reqs = Vec::default();
for packet in reqs_v.iter().flatten() {
let request: RepairProtocol = match packet.deserialize_slice(..) {
Ok(request) => request,
Err(_) => {
let mut decoded_requests = Vec::default();
let mut whitelisted_request_count: usize = 0;
{
let whitelist = self.repair_whitelist.read().unwrap();
for packet in reqs_v.iter().flatten() {
let request: RepairProtocol = match packet.deserialize_slice(..) {
Ok(request) => request,
Err(_) => {
stats.err_malformed += 1;
continue;
}
};

let from_addr = packet.meta().socket_addr();
if !ContactInfo::is_valid_address(&from_addr, &socket_addr_space) {
stats.err_malformed += 1;
continue;
}
};

let from_addr = packet.meta().socket_addr();
if !ContactInfo::is_valid_address(&from_addr, &socket_addr_space) {
stats.err_malformed += 1;
continue;
}
if request.supports_signature() {
// collect stats for signature verification
Self::verify_signed_packet(&my_id, packet, &request, stats);
} else {
stats.unsigned_requests += 1;
}

if request.supports_signature() {
// collect stats for signature verification
Self::verify_signed_packet(&my_id, packet, &request, stats);
} else {
stats.unsigned_requests += 1;
}
if request.sender() == &my_id {
stats.self_repair += 1;
continue;
}

if request.sender() == &my_id {
stats.self_repair += 1;
continue;
}
let stake = epoch_staked_nodes
.as_ref()
.and_then(|stakes| stakes.get(request.sender()))
.unwrap_or(&0);
if *stake == 0 {
stats.handle_requests_unstaked += 1;
} else {
stats.handle_requests_staked += 1;
}

let stake = epoch_staked_nodes
.as_ref()
.and_then(|stakes| stakes.get(request.sender()))
.unwrap_or(&0);
if *stake == 0 {
stats.handle_requests_unstaked += 1;
} else {
stats.handle_requests_staked += 1;
let whitelisted = whitelist.contains(request.sender());
if whitelisted {
whitelisted_request_count += 1;
}

decoded_requests.push(RepairRequestWithMeta {
request,
from_addr,
stake: *stake,
whitelisted,
});
}
decoded_reqs.push((request, from_addr, *stake));
}
stats.decode_time_us += decode_start.elapsed().as_micros() as u64;
stats.whitelisted_requests += whitelisted_request_count.min(MAX_REQUESTS_PER_ITERATION);

if decoded_reqs.len() > MAX_REQUESTS_PER_ITERATION {
stats.dropped_requests_low_stake += decoded_reqs.len() - MAX_REQUESTS_PER_ITERATION;
decoded_reqs.sort_unstable_by_key(|(_, _, stake)| Reverse(*stake));
decoded_reqs.truncate(MAX_REQUESTS_PER_ITERATION);
if decoded_requests.len() > MAX_REQUESTS_PER_ITERATION {
stats.dropped_requests_low_stake += decoded_requests.len() - MAX_REQUESTS_PER_ITERATION;
decoded_requests.sort_unstable_by_key(|r| Reverse((r.whitelisted, r.stake)));
decoded_requests.truncate(MAX_REQUESTS_PER_ITERATION);
}

self.handle_packets(
ping_cache,
recycler,
blockstore,
decoded_reqs,
decoded_requests,
response_sender,
stats,
data_budget,
@@ -564,6 +598,7 @@ impl ServeRepair {
stats.dropped_requests_low_stake,
i64
),
("whitelisted_requests", stats.whitelisted_requests, i64),
(
"total_dropped_response_packets",
stats.total_dropped_response_packets,
@@ -778,7 +813,7 @@ impl ServeRepair {
ping_cache: &mut PingCache,
recycler: &PacketBatchRecycler,
blockstore: &Blockstore,
requests: Vec<(RepairProtocol, SocketAddr, /*stake*/ u64)>,
requests: Vec<RepairRequestWithMeta>,
response_sender: &PacketBatchSender,
stats: &mut ServeRepairStats,
data_budget: &DataBudget,
@@ -787,7 +822,16 @@
let mut pending_pings = Vec::default();

let requests_len = requests.len();
for (i, (request, from_addr, stake)) in requests.into_iter().enumerate() {
for (
i,
RepairRequestWithMeta {
request,
from_addr,
stake,
..
},
) in requests.into_iter().enumerate()
{
if !matches!(&request, RepairProtocol::Pong(_)) {
let (check, ping_pkt) =
Self::check_ping_cache(ping_cache, &request, &from_addr, &identity_keypair);
@@ -1246,7 +1290,11 @@ mod tests {
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let me = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp());
let cluster_info = Arc::new(new_test_cluster_info(me));
let serve_repair = ServeRepair::new(cluster_info.clone(), bank_forks);
let serve_repair = ServeRepair::new(
cluster_info.clone(),
bank_forks,
Arc::new(RwLock::new(HashSet::default())),
);
let keypair = cluster_info.keypair().clone();
let repair_peer_id = solana_sdk::pubkey::new_rand();
let repair_request = ShredRepairType::Orphan(123);
@@ -1292,7 +1340,11 @@ mod tests {
let mut bank = Bank::new_for_tests(&genesis_config);
bank.feature_set = Arc::new(FeatureSet::all_enabled());
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let serve_repair = ServeRepair::new(cluster_info, bank_forks);
let serve_repair = ServeRepair::new(
cluster_info,
bank_forks,
Arc::new(RwLock::new(HashSet::default())),
);

let request_bytes = serve_repair
.ancestor_repair_request_bytes(&keypair, &repair_peer_id, slot, nonce)
@@ -1326,7 +1378,11 @@ mod tests {
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let me = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp());
let cluster_info = Arc::new(new_test_cluster_info(me));
let serve_repair = ServeRepair::new(cluster_info.clone(), bank_forks);
let serve_repair = ServeRepair::new(
cluster_info.clone(),
bank_forks,
Arc::new(RwLock::new(HashSet::default())),
);
let keypair = cluster_info.keypair().clone();
let repair_peer_id = solana_sdk::pubkey::new_rand();

@@ -1653,7 +1709,11 @@ mod tests {
let cluster_slots = ClusterSlots::default();
let me = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp());
let cluster_info = Arc::new(new_test_cluster_info(me));
let serve_repair = ServeRepair::new(cluster_info.clone(), bank_forks);
let serve_repair = ServeRepair::new(
cluster_info.clone(),
bank_forks,
Arc::new(RwLock::new(HashSet::default())),
);
let identity_keypair = cluster_info.keypair().clone();
let mut outstanding_requests = OutstandingShredRepairs::default();
let rv = serve_repair.repair_request(
@@ -1984,7 +2044,11 @@ mod tests {
cluster_info.insert_info(contact_info2.clone());
cluster_info.insert_info(contact_info3.clone());
let identity_keypair = cluster_info.keypair().clone();
let serve_repair = ServeRepair::new(cluster_info, bank_forks);
let serve_repair = ServeRepair::new(
cluster_info,
bank_forks,
Arc::new(RwLock::new(HashSet::default())),
);

// If:
// 1) repair validator set doesn't exist in gossip
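The core of the change in serve_repair.rs: each decoded request now carries RepairRequestWithMeta, and when a batch exceeds MAX_REQUESTS_PER_ITERATION it is sorted by Reverse((whitelisted, stake)) before truncation, so whitelisted senders survive the cut ahead of higher-staked but non-whitelisted ones, and unstaked strangers are dropped first. A self-contained sketch of that ordering (simplified metadata struct, hypothetical cap of 2):

    use std::cmp::Reverse;

    // Simplified stand-in for RepairRequestWithMeta; only the fields used for
    // prioritization are kept, plus a label for the assertions below.
    struct RequestMeta {
        sender: &'static str,
        stake: u64,
        whitelisted: bool,
    }

    fn main() {
        const MAX_REQUESTS_PER_ITERATION: usize = 2; // hypothetical cap

        let mut requests = vec![
            RequestMeta { sender: "unstaked", stake: 0, whitelisted: false },
            RequestMeta { sender: "high-stake", stake: 1_000, whitelisted: false },
            RequestMeta { sender: "whitelisted", stake: 10, whitelisted: true },
        ];

        if requests.len() > MAX_REQUESTS_PER_ITERATION {
            // Same ordering as the commit: whitelisted first (true > false, and
            // Reverse puts the largest keys at the front), then by stake,
            // both descending.
            requests.sort_unstable_by_key(|r| Reverse((r.whitelisted, r.stake)));
            requests.truncate(MAX_REQUESTS_PER_ITERATION);
        }

        // The low-stake whitelisted request outranks the high-stake one, and
        // the unstaked, non-whitelisted request is the one that gets dropped.
        assert_eq!(requests[0].sender, "whitelisted");
        assert_eq!(requests[1].sender, "high-stake");
        assert_eq!(requests.len(), MAX_REQUESTS_PER_ITERATION);
    }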
4 changes: 4 additions & 0 deletions core/src/tvu.rs
@@ -80,7 +80,10 @@ pub struct TvuSockets {
pub struct TvuConfig {
pub max_ledger_shreds: Option<u64>,
pub shred_version: u16,
// Validators from which repairs are requested
pub repair_validators: Option<HashSet<Pubkey>>,
// Validators which should be given priority when serving repairs
pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
pub wait_for_vote_to_start_leader: bool,
pub replay_slots_concurrently: bool,
}
@@ -189,6 +192,7 @@ impl Tvu {
epoch_schedule,
duplicate_slots_reset_sender,
repair_validators: tvu_config.repair_validators,
repair_whitelist: tvu_config.repair_whitelist,
cluster_info: cluster_info.clone(),
cluster_slots: cluster_slots.clone(),
};
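TvuConfig now carries the shared whitelist alongside repair_validators, and Tvu::new passes both through to RepairInfo. A trimmed-down, std-only sketch of that wiring (struct contents reduced to the two repair fields; the Pubkey type and the inline construction are placeholders), showing that only the Arc handle moves, not a copy of the set:

    use std::{
        collections::HashSet,
        sync::{Arc, RwLock},
    };

    // Placeholder identity type; illustration only.
    type Pubkey = u64;

    // Reduced stand-ins for TvuConfig and RepairInfo, keeping only the two
    // repair-related fields touched by this commit.
    struct TvuConfig {
        repair_validators: Option<HashSet<Pubkey>>,
        repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
    }

    struct RepairInfo {
        repair_validators: Option<HashSet<Pubkey>>,
        repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
    }

    fn main() {
        // In the validator this would come from configuration; built inline here.
        let shared_whitelist = Arc::new(RwLock::new(HashSet::from([7u64])));
        let tvu_config = TvuConfig {
            repair_validators: None,
            repair_whitelist: shared_whitelist.clone(),
        };

        // As in Tvu::new above: the config's handles move into RepairInfo, so
        // the repair path observes the same underlying set.
        let repair_info = RepairInfo {
            repair_validators: tvu_config.repair_validators,
            repair_whitelist: tvu_config.repair_whitelist,
        };
        assert!(repair_info.repair_validators.is_none());
        assert!(repair_info.repair_whitelist.read().unwrap().contains(&7));
        assert!(Arc::ptr_eq(&shared_whitelist, &repair_info.repair_whitelist));
    }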
