Skip to content

Commit

Permalink
Refactor pass feature status to deserialized packet via packet meta (s…
Browse files Browse the repository at this point in the history
…olana-labs#31549)

Add a flag to packet, set its value by packet_deserializer when received by banking_stage with working_bank
  • Loading branch information
tao-stones authored May 11, 2023
1 parent 4d4dddc commit 49f44f5
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 23 deletions.
3 changes: 2 additions & 1 deletion core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,8 @@ impl BankingStage {
),
};

let mut packet_receiver = PacketReceiver::new(id, packet_receiver);
let mut packet_receiver =
PacketReceiver::new(id, packet_receiver, bank_forks.clone());
let poh_recorder = poh_recorder.clone();

let committer = Committer::new(
Expand Down
14 changes: 11 additions & 3 deletions core/src/banking_stage/packet_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@ use {
},
crossbeam_channel::RecvTimeoutError,
solana_measure::{measure::Measure, measure_us},
solana_runtime::bank_forks::BankForks,
solana_sdk::{saturating_add_assign, timing::timestamp},
std::{sync::atomic::Ordering, time::Duration},
std::{
sync::{atomic::Ordering, Arc, RwLock},
time::Duration,
},
};

pub struct PacketReceiver {
Expand All @@ -20,10 +24,14 @@ pub struct PacketReceiver {
}

impl PacketReceiver {
pub fn new(id: u32, banking_packet_receiver: BankingPacketReceiver) -> Self {
pub fn new(
id: u32,
banking_packet_receiver: BankingPacketReceiver,
bank_forks: Arc<RwLock<BankForks>>,
) -> Self {
Self {
id,
packet_deserializer: PacketDeserializer::new(banking_packet_receiver),
packet_deserializer: PacketDeserializer::new(banking_packet_receiver, bank_forks),
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/immutable_deserialized_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl ImmutableDeserializedPacket {

// drop transaction if prioritization fails.
let mut priority_details = sanitized_transaction
.get_transaction_priority_details()
.get_transaction_priority_details(packet.meta().round_compute_unit_price())
.ok_or(DeserializedPacketError::PrioritizationFailure)?;

// set priority to zero for vote transactions
Expand Down
40 changes: 34 additions & 6 deletions core/src/packet_deserializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ use {
},
crossbeam_channel::RecvTimeoutError,
solana_perf::packet::PacketBatch,
std::time::{Duration, Instant},
solana_runtime::bank_forks::BankForks,
std::{
sync::{Arc, RwLock},
time::{Duration, Instant},
},
};

/// Results from deserializing packet batches.
Expand All @@ -26,12 +30,18 @@ pub struct ReceivePacketResults {
pub struct PacketDeserializer {
/// Receiver for packet batches from sigverify stage
packet_batch_receiver: BankingPacketReceiver,
/// Provides root bank for deserializer to check feature activation
bank_forks: Arc<RwLock<BankForks>>,
}

impl PacketDeserializer {
pub fn new(packet_batch_receiver: BankingPacketReceiver) -> Self {
pub fn new(
packet_batch_receiver: BankingPacketReceiver,
bank_forks: Arc<RwLock<BankForks>>,
) -> Self {
Self {
packet_batch_receiver,
bank_forks,
}
}

Expand All @@ -42,9 +52,16 @@ impl PacketDeserializer {
capacity: usize,
) -> Result<ReceivePacketResults, RecvTimeoutError> {
let (packet_count, packet_batches) = self.receive_until(recv_timeout, capacity)?;

// Note: this can be removed after feature `round_compute_unit_price` is activated in
// mainnet-beta
let _working_bank = self.bank_forks.read().unwrap().working_bank();
let round_compute_unit_price_enabled = false; // TODO get from working_bank.feature_set

Ok(Self::deserialize_and_collect_packets(
packet_count,
&packet_batches,
round_compute_unit_price_enabled,
))
}

Expand All @@ -53,6 +70,7 @@ impl PacketDeserializer {
fn deserialize_and_collect_packets(
packet_count: usize,
banking_batches: &[BankingPacketBatch],
round_compute_unit_price_enabled: bool,
) -> ReceivePacketResults {
let mut passed_sigverify_count: usize = 0;
let mut failed_sigverify_count: usize = 0;
Expand All @@ -66,8 +84,11 @@ impl PacketDeserializer {
passed_sigverify_count += packet_indexes.len();
failed_sigverify_count += packet_batch.len().saturating_sub(packet_indexes.len());

deserialized_packets
.extend(Self::deserialize_packets(packet_batch, &packet_indexes));
deserialized_packets.extend(Self::deserialize_packets(
packet_batch,
&packet_indexes,
round_compute_unit_price_enabled,
));
}

if let Some(tracer_packet_stats) = &banking_batch.1 {
Expand Down Expand Up @@ -136,9 +157,14 @@ impl PacketDeserializer {
fn deserialize_packets<'a>(
packet_batch: &'a PacketBatch,
packet_indexes: &'a [usize],
round_compute_unit_price_enabled: bool,
) -> impl Iterator<Item = ImmutableDeserializedPacket> + 'a {
packet_indexes.iter().filter_map(move |packet_index| {
ImmutableDeserializedPacket::new(packet_batch[*packet_index].clone()).ok()
let mut packet_clone = packet_batch[*packet_index].clone();
packet_clone
.meta_mut()
.set_round_compute_unit_price(round_compute_unit_price_enabled);
ImmutableDeserializedPacket::new(packet_clone).ok()
})
}
}
Expand All @@ -160,7 +186,7 @@ mod tests {

#[test]
fn test_deserialize_and_collect_packets_empty() {
let results = PacketDeserializer::deserialize_and_collect_packets(0, &[]);
let results = PacketDeserializer::deserialize_and_collect_packets(0, &[], false);
assert_eq!(results.deserialized_packets.len(), 0);
assert!(results.new_tracer_stats_option.is_none());
assert_eq!(results.passed_sigverify_count, 0);
Expand All @@ -177,6 +203,7 @@ mod tests {
let results = PacketDeserializer::deserialize_and_collect_packets(
packet_count,
&[BankingPacketBatch::new((packet_batches, None))],
false,
);
assert_eq!(results.deserialized_packets.len(), 2);
assert!(results.new_tracer_stats_option.is_none());
Expand All @@ -195,6 +222,7 @@ mod tests {
let results = PacketDeserializer::deserialize_and_collect_packets(
packet_count,
&[BankingPacketBatch::new((packet_batches, None))],
false,
);
assert_eq!(results.deserialized_packets.len(), 1);
assert!(results.new_tracer_stats_option.is_none());
Expand Down
4 changes: 3 additions & 1 deletion runtime/src/prioritization_fee_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,9 @@ impl PrioritizationFeeCache {
continue;
}

let priority_details = sanitized_transaction.get_transaction_priority_details();
let round_compute_unit_price_enabled = false; // TODO: bank.feture_set.is_active(round_compute_unit_price)
let priority_details = sanitized_transaction
.get_transaction_priority_details(round_compute_unit_price_enabled);
let account_locks = sanitized_transaction
.get_account_locks(bank.get_transaction_account_lock_limit());

Expand Down
39 changes: 28 additions & 11 deletions runtime/src/transaction_priority_details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@ pub struct TransactionPriorityDetails {
}

pub trait GetTransactionPriorityDetails {
fn get_transaction_priority_details(&self) -> Option<TransactionPriorityDetails>;
fn get_transaction_priority_details(
&self,
round_compute_unit_price_enabled: bool,
) -> Option<TransactionPriorityDetails>;

fn process_compute_budget_instruction<'a>(
instructions: impl Iterator<Item = (&'a Pubkey, &'a CompiledInstruction)>,
_round_compute_unit_price_enabled: bool,
) -> Option<TransactionPriorityDetails> {
let mut compute_budget = ComputeBudget::default();
let prioritization_fee_details = compute_budget
Expand All @@ -27,6 +31,7 @@ pub trait GetTransactionPriorityDetails {
false, // stop supporting prioritization by request_units_deprecated instruction
true, // enable request heap frame instruction
true, // enable support set accounts data size instruction
// TODO: round_compute_unit_price_enabled: bool
)
.ok()?;
Some(TransactionPriorityDetails {
Expand All @@ -37,14 +42,26 @@ pub trait GetTransactionPriorityDetails {
}

impl GetTransactionPriorityDetails for SanitizedVersionedTransaction {
fn get_transaction_priority_details(&self) -> Option<TransactionPriorityDetails> {
Self::process_compute_budget_instruction(self.get_message().program_instructions_iter())
fn get_transaction_priority_details(
&self,
round_compute_unit_price_enabled: bool,
) -> Option<TransactionPriorityDetails> {
Self::process_compute_budget_instruction(
self.get_message().program_instructions_iter(),
round_compute_unit_price_enabled,
)
}
}

impl GetTransactionPriorityDetails for SanitizedTransaction {
fn get_transaction_priority_details(&self) -> Option<TransactionPriorityDetails> {
Self::process_compute_budget_instruction(self.message().program_instructions_iter())
fn get_transaction_priority_details(
&self,
round_compute_unit_price_enabled: bool,
) -> Option<TransactionPriorityDetails> {
Self::process_compute_budget_instruction(
self.message().program_instructions_iter(),
round_compute_unit_price_enabled,
)
}
}

Expand Down Expand Up @@ -78,7 +95,7 @@ mod tests {
let sanitized_versioned_transaction =
SanitizedVersionedTransaction::try_new(versioned_transaction).unwrap();
assert_eq!(
sanitized_versioned_transaction.get_transaction_priority_details(),
sanitized_versioned_transaction.get_transaction_priority_details(false),
Some(TransactionPriorityDetails {
priority: 0,
compute_unit_limit:
Expand All @@ -91,7 +108,7 @@ mod tests {
let sanitized_transaction =
SanitizedTransaction::try_from_legacy_transaction(transaction).unwrap();
assert_eq!(
sanitized_transaction.get_transaction_priority_details(),
sanitized_transaction.get_transaction_priority_details(false),
Some(TransactionPriorityDetails {
priority: 0,
compute_unit_limit:
Expand All @@ -118,7 +135,7 @@ mod tests {
let sanitized_versioned_transaction =
SanitizedVersionedTransaction::try_new(versioned_transaction).unwrap();
assert_eq!(
sanitized_versioned_transaction.get_transaction_priority_details(),
sanitized_versioned_transaction.get_transaction_priority_details(false),
Some(TransactionPriorityDetails {
priority: 0,
compute_unit_limit: requested_cu as u64,
Expand All @@ -129,7 +146,7 @@ mod tests {
let sanitized_transaction =
SanitizedTransaction::try_from_legacy_transaction(transaction).unwrap();
assert_eq!(
sanitized_transaction.get_transaction_priority_details(),
sanitized_transaction.get_transaction_priority_details(false),
Some(TransactionPriorityDetails {
priority: 0,
compute_unit_limit: requested_cu as u64,
Expand All @@ -154,7 +171,7 @@ mod tests {
let sanitized_versioned_transaction =
SanitizedVersionedTransaction::try_new(versioned_transaction).unwrap();
assert_eq!(
sanitized_versioned_transaction.get_transaction_priority_details(),
sanitized_versioned_transaction.get_transaction_priority_details(false),
Some(TransactionPriorityDetails {
priority: requested_price,
compute_unit_limit:
Expand All @@ -167,7 +184,7 @@ mod tests {
let sanitized_transaction =
SanitizedTransaction::try_from_legacy_transaction(transaction).unwrap();
assert_eq!(
sanitized_transaction.get_transaction_priority_details(),
sanitized_transaction.get_transaction_priority_details(false),
Some(TransactionPriorityDetails {
priority: requested_price,
compute_unit_limit:
Expand Down
17 changes: 17 additions & 0 deletions sdk/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ bitflags! {
const REPAIR = 0b0000_0100;
const SIMPLE_VOTE_TX = 0b0000_1000;
const TRACER_PACKET = 0b0001_0000;
/// to be set by bank.feature_set.is_active(round_compute_unit_price::id()) at the moment
/// the packet is built.
/// This field can be removed when the above feature gate is adopted by mainnet-beta.
const ROUND_COMPUTE_UNIT_PRICE = 0b0010_0000;
}
}

Expand Down Expand Up @@ -214,6 +218,14 @@ impl Meta {
self.flags.set(PacketFlags::SIMPLE_VOTE_TX, is_simple_vote);
}

#[inline]
pub fn set_round_compute_unit_price(&mut self, round_compute_unit_price: bool) {
self.flags.set(
PacketFlags::ROUND_COMPUTE_UNIT_PRICE,
round_compute_unit_price,
);
}

#[inline]
pub fn forwarded(&self) -> bool {
self.flags.contains(PacketFlags::FORWARDED)
Expand All @@ -233,6 +245,11 @@ impl Meta {
pub fn is_tracer_packet(&self) -> bool {
self.flags.contains(PacketFlags::TRACER_PACKET)
}

#[inline]
pub fn round_compute_unit_price(&self) -> bool {
self.flags.contains(PacketFlags::ROUND_COMPUTE_UNIT_PRICE)
}
}

impl Default for Meta {
Expand Down

0 comments on commit 49f44f5

Please sign in to comment.