Skip to content

Commit

Permalink
Fix occupied core handling (paritytech#4691)
Browse files Browse the repository at this point in the history
Co-authored-by: eskimor <[email protected]>
Co-authored-by: Andrei Sandu <[email protected]>
  • Loading branch information
3 people authored Jun 7, 2024
1 parent 2a89cc2 commit 9dfe0fe
Show file tree
Hide file tree
Showing 14 changed files with 270 additions and 135 deletions.
2 changes: 1 addition & 1 deletion polkadot/runtime/parachains/src/assigner_coretime/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ fn run_to_block(
Scheduler::initializer_initialize(b + 1);

// In the real runtime this is expected to be called by the `InclusionInherent` pallet.
Scheduler::free_cores_and_fill_claimqueue(BTreeMap::new(), b + 1);
Scheduler::free_cores_and_fill_claim_queue(BTreeMap::new(), b + 1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ fn run_to_block(
OnDemandAssigner::on_initialize(b + 1);

// In the real runtime this is expected to be called by the `InclusionInherent` pallet.
Scheduler::free_cores_and_fill_claimqueue(BTreeMap::new(), b + 1);
Scheduler::free_cores_and_fill_claim_queue(BTreeMap::new(), b + 1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ fn run_to_block(
Scheduler::initializer_initialize(b + 1);

// In the real runtime this is expected to be called by the `InclusionInherent` pallet.
Scheduler::free_cores_and_fill_claimqueue(BTreeMap::new(), b + 1);
Scheduler::free_cores_and_fill_claim_queue(BTreeMap::new(), b + 1);
}
}

Expand Down
49 changes: 33 additions & 16 deletions polkadot/runtime/parachains/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,17 @@ pub(crate) struct BenchBuilder<T: paras_inherent::Config> {
/// will correspond to core index 3. There must be one entry for each core with a dispute
/// statement set.
dispute_sessions: Vec<u32>,
/// Paras here will both be backed in the inherent data and already occupying a core (which is
/// freed via bitfields).
///
/// Map from para id to number of validity votes. Core indices are generated based on
/// `elastic_paras` configuration. Each para id in `elastic_paras` gets the
/// specified amount of consecutive cores assigned to it. If a para id is not present
/// in `elastic_paras` it gets assigned to a single core.
backed_and_concluding_paras: BTreeMap<u32, u32>,

/// Paras which don't yet occupy a core, but will after the inherent has been processed.
backed_in_inherent_paras: BTreeMap<u32, u32>,
/// Map from para id (seed) to number of chained candidates.
elastic_paras: BTreeMap<u32, u8>,
/// Make every candidate include a code upgrade by setting this to `Some` where the interior
Expand Down Expand Up @@ -132,6 +138,7 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
dispute_statements: BTreeMap::new(),
dispute_sessions: Default::default(),
backed_and_concluding_paras: Default::default(),
backed_in_inherent_paras: Default::default(),
elastic_paras: Default::default(),
code_upgrade: None,
fill_claimqueue: true,
Expand Down Expand Up @@ -167,6 +174,12 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
self
}

/// Set a map from para id seed to number of validity votes for votes in inherent data.
pub(crate) fn set_backed_in_inherent_paras(mut self, backed: BTreeMap<u32, u32>) -> Self {
self.backed_in_inherent_paras = backed;
self
}

/// Set a map from para id seed to number of cores assigned to it.
pub(crate) fn set_elastic_paras(mut self, elastic_paras: BTreeMap<u32, u8>) -> Self {
self.elastic_paras = elastic_paras;
Expand Down Expand Up @@ -753,8 +766,8 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
///
/// Note that this API only allows building scenarios where the `backed_and_concluding_paras`
/// are mutually exclusive with the cores for disputes. So
/// `backed_and_concluding_paras.len() + dispute_sessions.len()` must be less than the max
/// number of cores.
/// `backed_and_concluding_paras.len() + dispute_sessions.len() + backed_in_inherent_paras.len()`
/// must not exceed the max number of cores.
pub(crate) fn build(self) -> Bench<T> {
// Make sure relevant storage is cleared. This is just to get the asserts to work when
// running tests because it seems the storage is not cleared in between.
Expand All @@ -771,8 +784,10 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
.sum::<usize>()
.saturating_sub(self.elastic_paras.len() as usize);

let used_cores =
self.dispute_sessions.len() + self.backed_and_concluding_paras.len() + extra_cores;
let used_cores = self.dispute_sessions.len() +
self.backed_and_concluding_paras.len() +
self.backed_in_inherent_paras.len() +
extra_cores;

assert!(used_cores <= max_cores);
let fill_claimqueue = self.fill_claimqueue;
Expand All @@ -793,8 +808,12 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
&builder.elastic_paras,
used_cores,
);

let mut backed_in_inherent = BTreeMap::new();
backed_in_inherent.append(&mut builder.backed_and_concluding_paras.clone());
backed_in_inherent.append(&mut builder.backed_in_inherent_paras.clone());
let backed_candidates = builder.create_backed_candidates(
&builder.backed_and_concluding_paras,
&backed_in_inherent,
&builder.elastic_paras,
builder.code_upgrade,
);
Expand Down Expand Up @@ -845,12 +864,16 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
scheduler::AvailabilityCores::<T>::set(cores);

core_idx = 0u32;

// We need entries in the claim queue for those:
all_cores.append(&mut builder.backed_in_inherent_paras.clone());

if fill_claimqueue {
let cores = all_cores
.keys()
.flat_map(|para_id| {
(0..elastic_paras.get(&para_id).cloned().unwrap_or(1))
.filter_map(|_para_local_core_idx| {
.map(|_para_local_core_idx| {
let ttl = configuration::ActiveConfig::<T>::get().scheduler_params.ttl;
// Load an assignment into provider so that one is present to pop
let assignment =
Expand All @@ -859,17 +882,11 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
ParaId::from(*para_id),
);

let entry = (
CoreIndex(core_idx),
[ParasEntry::new(assignment, now + ttl)].into(),
);
let res = if builder.unavailable_cores.contains(&core_idx) {
None
} else {
Some(entry)
};
core_idx += 1;
res
(
CoreIndex(core_idx - 1),
[ParasEntry::new(assignment, now + ttl)].into(),
)
})
.collect::<Vec<(CoreIndex, VecDeque<ParasEntry<_>>)>>()
})
Expand Down
6 changes: 6 additions & 0 deletions polkadot/runtime/parachains/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ pub enum InconsistentError<BlockNumber> {
InconsistentExecutorParams { inner: ExecutorParamError },
/// TTL should be bigger than lookahead
LookaheadExceedsTTL,
/// Lookahead is zero, while it must be at least 1 for parachains to work.
LookaheadZero,
/// Passed in queue size for on-demand was too large.
OnDemandQueueSizeTooLarge,
/// Number of delay tranches cannot be 0.
Expand Down Expand Up @@ -432,6 +434,10 @@ where
return Err(LookaheadExceedsTTL)
}

if self.scheduler_params.lookahead == 0 {
return Err(LookaheadZero)
}

if self.scheduler_params.on_demand_queue_max_size > ON_DEMAND_MAX_QUEUE_MAX_SIZE {
return Err(OnDemandQueueSizeTooLarge)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ benchmarks! {
.collect();

let scenario = BenchBuilder::<T>::new()
.set_backed_and_concluding_paras(cores_with_backed.clone())
.set_backed_in_inherent_paras(cores_with_backed.clone())
.build();

let mut benchmark = scenario.data.clone();
Expand Down Expand Up @@ -161,7 +161,7 @@ benchmarks! {
.collect();

let scenario = BenchBuilder::<T>::new()
.set_backed_and_concluding_paras(cores_with_backed.clone())
.set_backed_in_inherent_paras(cores_with_backed.clone())
.set_code_upgrade(v)
.build();

Expand Down
19 changes: 10 additions & 9 deletions polkadot/runtime/parachains/src/paras_inherent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ impl<T: Config> Pallet<T> {
.chain(freed_disputed.into_iter().map(|core| (core, FreedReason::Concluded)))
.chain(freed_timeout.into_iter().map(|c| (c, FreedReason::TimedOut)))
.collect::<BTreeMap<CoreIndex, FreedReason>>();
scheduler::Pallet::<T>::free_cores_and_fill_claimqueue(freed, now);
scheduler::Pallet::<T>::free_cores_and_fill_claim_queue(freed, now);

METRICS.on_candidates_processed_total(backed_candidates.len() as u64);

Expand All @@ -570,25 +570,26 @@ impl<T: Config> Pallet<T> {
.map(|b| *b)
.unwrap_or(false);

let mut scheduled: BTreeMap<ParaId, BTreeSet<CoreIndex>> = BTreeMap::new();
let mut total_scheduled_cores = 0;
let mut eligible: BTreeMap<ParaId, BTreeSet<CoreIndex>> = BTreeMap::new();
let mut total_eligible_cores = 0;

for (core_idx, para_id) in scheduler::Pallet::<T>::scheduled_paras() {
total_scheduled_cores += 1;
scheduled.entry(para_id).or_default().insert(core_idx);
for (core_idx, para_id) in scheduler::Pallet::<T>::eligible_paras() {
total_eligible_cores += 1;
log::trace!(target: LOG_TARGET, "Found eligible para {:?} on core {:?}", para_id, core_idx);
eligible.entry(para_id).or_default().insert(core_idx);
}

let initial_candidate_count = backed_candidates.len();
let backed_candidates_with_core = sanitize_backed_candidates::<T>(
backed_candidates,
&allowed_relay_parents,
concluded_invalid_hashes,
scheduled,
eligible,
core_index_enabled,
);
let count = count_backed_candidates(&backed_candidates_with_core);

ensure!(count <= total_scheduled_cores, Error::<T>::UnscheduledCandidate);
ensure!(count <= total_eligible_cores, Error::<T>::UnscheduledCandidate);

METRICS.on_candidates_sanitized(count as u64);

Expand Down Expand Up @@ -1422,7 +1423,7 @@ fn map_candidates_to_cores<T: configuration::Config + scheduler::Config + inclus
} else {
log::debug!(
target: LOG_TARGET,
"Paraid: {:?} has no scheduled cores but {} candidates were supplied.",
"Paraid: {:?} has no entry in scheduled cores but {} candidates were supplied.",
para_id,
backed_candidates.len()
);
Expand Down
20 changes: 10 additions & 10 deletions polkadot/runtime/parachains/src/paras_inherent/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ mod enter {
.unwrap();

// The current schedule is empty prior to calling `create_inherent_enter`.
assert!(scheduler::Pallet::<Test>::claimqueue_is_empty());
assert!(scheduler::Pallet::<Test>::claim_queue_is_empty());

// Nothing is filtered out (including the backed candidates.)
assert_eq!(
Expand Down Expand Up @@ -257,7 +257,7 @@ mod enter {
.unwrap();

// The current schedule is empty prior to calling `create_inherent_enter`.
assert!(scheduler::Pallet::<Test>::claimqueue_is_empty());
assert!(scheduler::Pallet::<Test>::claim_queue_is_empty());

assert!(pallet::OnChainVotes::<Test>::get().is_none());

Expand Down Expand Up @@ -372,7 +372,7 @@ mod enter {
let mut inherent_data = InherentData::new();
inherent_data.put_data(PARACHAINS_INHERENT_IDENTIFIER, &scenario.data).unwrap();

assert!(!scheduler::Pallet::<Test>::claimqueue_is_empty());
assert!(!scheduler::Pallet::<Test>::claim_queue_is_empty());

// The right candidates have been filtered out (the ones for cores 0,4,5)
assert_eq!(
Expand Down Expand Up @@ -618,7 +618,7 @@ mod enter {
.unwrap();

// The current schedule is empty prior to calling `create_inherent_enter`.
assert!(scheduler::Pallet::<Test>::claimqueue_is_empty());
assert!(scheduler::Pallet::<Test>::claim_queue_is_empty());

let multi_dispute_inherent_data =
Pallet::<Test>::create_inherent_inner(&inherent_data.clone()).unwrap();
Expand Down Expand Up @@ -690,7 +690,7 @@ mod enter {
.unwrap();

// The current schedule is empty prior to calling `create_inherent_enter`.
assert!(scheduler::Pallet::<Test>::claimqueue_is_empty());
assert!(scheduler::Pallet::<Test>::claim_queue_is_empty());

let limit_inherent_data =
Pallet::<Test>::create_inherent_inner(&inherent_data.clone()).unwrap();
Expand Down Expand Up @@ -762,7 +762,7 @@ mod enter {
.unwrap();

// The current schedule is empty prior to calling `create_inherent_enter`.
assert!(scheduler::Pallet::<Test>::claimqueue_is_empty());
assert!(scheduler::Pallet::<Test>::claim_queue_is_empty());

// Nothing is filtered out (including the backed candidates.)
let limit_inherent_data =
Expand Down Expand Up @@ -849,7 +849,7 @@ mod enter {
.unwrap();

// The current schedule is empty prior to calling `create_inherent_enter`.
assert!(scheduler::Pallet::<Test>::claimqueue_is_empty());
assert!(scheduler::Pallet::<Test>::claim_queue_is_empty());

// Nothing is filtered out (including the backed candidates.)
let limit_inherent_data =
Expand Down Expand Up @@ -1818,7 +1818,7 @@ mod sanitizers {
]);

// Update scheduler's claimqueue with the parachains
scheduler::Pallet::<Test>::set_claimqueue(BTreeMap::from([
scheduler::Pallet::<Test>::set_claim_queue(BTreeMap::from([
(
CoreIndex::from(0),
VecDeque::from([ParasEntry::new(
Expand Down Expand Up @@ -2001,7 +2001,7 @@ mod sanitizers {
]);

// Update scheduler's claimqueue with the parachains
scheduler::Pallet::<Test>::set_claimqueue(BTreeMap::from([
scheduler::Pallet::<Test>::set_claim_queue(BTreeMap::from([
(
CoreIndex::from(0),
VecDeque::from([ParasEntry::new(
Expand Down Expand Up @@ -2542,7 +2542,7 @@ mod sanitizers {
]);

// Update scheduler's claimqueue with the parachains
scheduler::Pallet::<Test>::set_claimqueue(BTreeMap::from([
scheduler::Pallet::<Test>::set_claim_queue(BTreeMap::from([
(
CoreIndex::from(0),
VecDeque::from([ParasEntry::new(
Expand Down
4 changes: 2 additions & 2 deletions polkadot/runtime/parachains/src/runtime_api_impl/v10.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub fn availability_cores<T: initializer::Config>() -> Vec<CoreState<T::Hash, Bl
//
// At the end of a session we clear the claim queues: Without this update call, nothing would be
// scheduled to the client.
scheduler::Pallet::<T>::free_cores_and_fill_claimqueue(Vec::new(), now);
scheduler::Pallet::<T>::free_cores_and_fill_claim_queue(Vec::new(), now);

let time_out_for = scheduler::Pallet::<T>::availability_timeout_predicate();

Expand Down Expand Up @@ -305,7 +305,7 @@ pub fn validation_code<T: initializer::Config>(

/// Implementation for the `candidate_pending_availability` function of the runtime API.
#[deprecated(
note = "`candidate_pending_availability` will be removed. Use `candidates_pending_availability` to query
note = "`candidate_pending_availability` will be removed. Use `candidates_pending_availability` to query
all candidates pending availability"
)]
pub fn candidate_pending_availability<T: initializer::Config>(
Expand Down
15 changes: 12 additions & 3 deletions polkadot/runtime/parachains/src/runtime_api_impl/vstaging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

//! Put implementations of functions from staging APIs here.
use crate::{inclusion, initializer, scheduler};
use crate::{configuration, inclusion, initializer, scheduler};
use polkadot_primitives::{CommittedCandidateReceipt, CoreIndex, Id as ParaId};
use sp_runtime::traits::One;
use sp_std::{
Expand All @@ -32,12 +32,21 @@ pub fn claim_queue<T: scheduler::Config>() -> BTreeMap<CoreIndex, VecDeque<ParaI
//
// At the end of a session we clear the claim queues: Without this update call, nothing would be
// scheduled to the client.
<scheduler::Pallet<T>>::free_cores_and_fill_claimqueue(Vec::new(), now);
<scheduler::Pallet<T>>::free_cores_and_fill_claim_queue(Vec::new(), now);
let config = configuration::ActiveConfig::<T>::get();
// Extra sanity check: the configured lookahead should already never be smaller than 1:
let n_lookahead = config.scheduler_params.lookahead.max(1);

scheduler::ClaimQueue::<T>::get()
.into_iter()
.map(|(core_index, entries)| {
(core_index, entries.into_iter().map(|e| e.para_id()).collect())
// On cores timing out, the internal claim queue may temporarily be longer than it
// should be, as the timed out assignment might get pushed back to an already full
// claim queue:
(
core_index,
entries.into_iter().map(|e| e.para_id()).take(n_lookahead as usize).collect(),
)
})
.collect()
}
Expand Down
Loading

0 comments on commit 9dfe0fe

Please sign in to comment.