Skip to content

Commit

Permalink
Merge pull request autonomys#338 from subspace/literally-follow-prima…
Browse files Browse the repository at this point in the history
…ry-block

Make the executor literally follow each primary block
  • Loading branch information
liuchengxu authored Apr 10, 2022
2 parents 9a338f4 + f46ffd8 commit fef53fa
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 20 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions crates/sc-consensus-subspace/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,13 @@ impl<Block: BlockT> SubspaceLink<Block> {
self.archived_segment_notification_stream.clone()
}

/// Get stream with notifications about each imported block.
pub fn imported_block_notification_stream(
&self,
) -> SubspaceNotificationStream<(NumberFor<Block>, mpsc::Sender<RootBlock>)> {
self.imported_block_notification_stream.clone()
}

/// Get blocks that are expected to be included at specified block number.
pub fn root_blocks_for_block(&self, block_number: NumberFor<Block>) -> Vec<RootBlock> {
self.root_blocks
Expand Down
1 change: 1 addition & 0 deletions crates/subspace-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ sp-runtime = { version = "6.0.0", git = "https://github.com/paritytech/substrate
sp-timestamp = { version = "4.0.0-dev", git = "https://github.com/paritytech/substrate", rev = "c364008a6c7da8456e17967f55edf51e45146998" }
sp-transaction-pool = { version = "4.0.0-dev", git = "https://github.com/paritytech/substrate", rev = "c364008a6c7da8456e17967f55edf51e45146998" }
sp-trie = { version = "6.0.0", git = "https://github.com/paritytech/substrate", rev = "c364008a6c7da8456e17967f55edf51e45146998" }
subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" }
subspace-fraud-proof = { version = "0.1.0", path = "../subspace-fraud-proof" }
subspace-runtime-primitives = { version = "0.1.0", path = "../subspace-runtime-primitives" }
substrate-frame-rpc-system = { version = "4.0.0-dev", git = "https://github.com/paritytech/substrate", rev = "c364008a6c7da8456e17967f55edf51e45146998" }
Expand Down
18 changes: 17 additions & 1 deletion crates/subspace-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
pub mod rpc;

use futures::channel::mpsc;
use polkadot_overseer::{BlockInfo, Handle, Overseer};
use sc_client_api::ExecutorProvider;
use sc_consensus::BlockImport;
Expand All @@ -29,12 +30,13 @@ use sc_consensus_subspace::{
use sc_executor::{NativeElseWasmExecutor, NativeExecutionDispatch};
use sc_service::{error::Error as ServiceError, Configuration, TaskManager};
use sc_telemetry::{Telemetry, TelemetryWorker};
use sp_api::{ConstructRuntimeApi, TransactionFor};
use sp_api::{ConstructRuntimeApi, NumberFor, TransactionFor};
use sp_blockchain::HeaderBackend;
use sp_consensus::{CanAuthorWithNativeVersion, Error as ConsensusError, SelectChain};
use sp_consensus_slots::Slot;
use sp_runtime::traits::{BlakeTwo256, Block as BlockT, Header as HeaderT};
use std::sync::Arc;
use subspace_core_primitives::RootBlock;
use subspace_runtime_primitives::{
opaque::{Block, BlockId},
AccountId, Balance, Index as Nonce,
Expand Down Expand Up @@ -343,6 +345,9 @@ pub struct NewFull<C> {
pub new_slot_notification_stream: SubspaceNotificationStream<NewSlotNotification>,
/// Block signing stream.
pub block_signing_notification_stream: SubspaceNotificationStream<BlockSigningNotification>,
/// Imported block stream.
pub imported_block_notification_stream:
SubspaceNotificationStream<(NumberFor<Block>, mpsc::Sender<RootBlock>)>,
}

/// Builds a new service for a full client.
Expand Down Expand Up @@ -396,6 +401,7 @@ where
let new_slot_notification_stream = subspace_link.new_slot_notification_stream();
let block_signing_notification_stream = subspace_link.block_signing_notification_stream();
let archived_segment_notification_stream = subspace_link.archived_segment_notification_stream();
let imported_block_notification_stream = subspace_link.imported_block_notification_stream();

if config.role.is_authority() {
let proposer_factory = sc_basic_authorship::ProposerFactory::new(
Expand Down Expand Up @@ -512,6 +518,7 @@ where
backend,
new_slot_notification_stream,
block_signing_notification_stream,
imported_block_notification_stream,
})
}

Expand All @@ -522,6 +529,10 @@ pub async fn create_overseer<RuntimeApi, ExecutorDispatch, SC>(
task_manager: &TaskManager,
select_chain: SC,
new_slot_notification_stream: SubspaceNotificationStream<NewSlotNotification>,
imported_block_notification_stream: SubspaceNotificationStream<(
NumberFor<Block>,
mpsc::Sender<RootBlock>,
)>,
) -> Result<Handle, Error>
where
RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi, ExecutorDispatch>>
Expand Down Expand Up @@ -561,6 +572,11 @@ where

let forward = polkadot_overseer::forward_events(
client,
Box::pin(
imported_block_notification_stream
.subscribe()
.then(|(block_number, _)| async move { block_number }),
),
Box::pin(new_slot_notification_stream.subscribe().then(
|slot_notification| async move {
let slot_info = slot_notification.new_slot_info;
Expand Down
1 change: 1 addition & 0 deletions cumulus/parachain-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ where
&primary_chain_full_node.task_manager,
primary_chain_full_node.select_chain.clone(),
primary_chain_full_node.new_slot_notification_stream.clone(),
primary_chain_full_node.imported_block_notification_stream.clone(),
)
.await
.map_err(|error| sc_service::Error::Other(format!("Failed to create overseer: {}", error)))?;
Expand Down
1 change: 1 addition & 0 deletions cumulus/test/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ where
&primary_chain_full_node.task_manager,
primary_chain_full_node.select_chain.clone(),
primary_chain_full_node.new_slot_notification_stream.clone(),
primary_chain_full_node.imported_block_notification_stream.clone(),
)
.await
.map_err(|error| sc_service::Error::Other(format!("Failed to create overseer: {}", error)))?;
Expand Down
70 changes: 51 additions & 19 deletions polkadot/node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@ use std::{

use futures::{channel::mpsc, select, stream::FusedStream, SinkExt, StreamExt};

use sc_client_api::{BlockBackend, BlockImportNotification, BlockchainEvents};
use sc_client_api::{BlockBackend, BlockImportNotification};
use sp_api::{ApiError, ProvideRuntimeApi};
use sp_blockchain::HeaderBackend;
use sp_runtime::generic::DigestItem;
use sp_runtime::{
generic::DigestItem,
traits::{Header as HeaderT, NumberFor},
};

use cirrus_node_primitives::{CollationGenerationConfig, ExecutorSlotInfo};
use sp_executor::{BundleEquivocationProof, ExecutorApi, FraudProof, InvalidTransactionProof};
Expand Down Expand Up @@ -247,19 +250,27 @@ enum Event {

/// Glues together the [`Overseer`] and `BlockchainEvents` by forwarding
/// import and finality notifications to it.
pub async fn forward_events<P: BlockchainEvents<Block>>(
client: Arc<P>,
pub async fn forward_events<C: HeaderBackend<Block>>(
client: Arc<C>,
mut imports: impl FusedStream<Item = NumberFor<Block>> + Unpin,
mut slots: impl FusedStream<Item = ExecutorSlotInfo> + Unpin,
mut handle: Handle,
) {
let mut imports = client.import_notification_stream();

loop {
select! {
i = imports.next() => {
match i {
Some(block) => {
handle.block_imported(block.into()).await;
Some(block_number) => {
let header = client
.header(BlockId::Number(block_number))
.expect("Header of imported block must exist; qed")
.expect("Header of imported block must exist; qed");
let block = BlockInfo {
hash: header.hash(),
parent_hash: *header.parent_hash(),
number: *header.number(),
};
handle.block_imported(block).await;
}
None => break,
}
Expand Down Expand Up @@ -318,15 +329,20 @@ where
// Notify about active leaves on startup before starting the loop
for (hash, number) in std::mem::take(&mut self.leaves) {
let _ = self.active_leaves.insert(hash, number);
let update = ActivatedLeaf { hash, number };
if let Err(error) = self.update_activated_leave(update).await {
let updated_leaf = ActivatedLeaf { hash, number };
if let Err(error) = self.on_activated_leaf(updated_leaf).await {
tracing::error!(
target: LOG_TARGET,
"Collation generation processing error: {error}"
);
}
}

// TODO: remove this once the config can be initialized in [`Self::new`].
let mut config_initialized = false;
// Only a few dozens of backlog blocks.
let mut imports_backlog = Vec::new();

while let Some(msg) = self.events_rx.next().await {
match msg {
Event::MsgToSubsystem(message) => {
Expand All @@ -337,24 +353,40 @@ where
);
}
},
// TODO: we still need the context of block, e.g., executor gossips no message
// to the primary node during the major sync.
Event::BlockImported(block) => {
self.block_imported(block).await?;
if !config_initialized {
if self.config.is_some() {
// Process the backlog first once the config has been initialized.
if !imports_backlog.is_empty() {
for b in imports_backlog.drain(..) {
self.block_imported(b).await?;
}
}
config_initialized = true;
self.block_imported(block).await?;
} else {
imports_backlog.push(block);
}
} else {
self.block_imported(block).await?;
}
},
Event::NewSlot(slot_info) => {
if let Err(error) = self.update_new_slot(slot_info).await {
Event::NewSlot(slot_info) =>
if let Err(error) = self.on_new_slot(slot_info).await {
tracing::error!(
target: LOG_TARGET,
"Collation generation processing error: {error}"
);
}
},
},
}
}

Ok(())
}

async fn update_activated_leave(&self, activated_leaf: ActivatedLeaf) -> Result<(), ApiError> {
async fn on_activated_leaf(&self, activated_leaf: ActivatedLeaf) -> Result<(), ApiError> {
// follow the procedure from the guide
if let Some(config) = &self.config {
// TODO: invoke this on finalized block?
Expand All @@ -369,7 +401,7 @@ where
Ok(())
}

async fn update_new_slot(&self, slot_info: ExecutorSlotInfo) -> Result<(), ApiError> {
async fn on_new_slot(&self, slot_info: ExecutorSlotInfo) -> Result<(), ApiError> {
if let Some(config) = &self.config {
let client = &self.primary_chain_client;
let best_hash = client.info().best_hash;
Expand Down Expand Up @@ -442,13 +474,13 @@ where
},
};

let update = ActivatedLeaf { hash: block.hash, number: block.number };
let updated_leaf = ActivatedLeaf { hash: block.hash, number: block.number };

if let Some(number) = self.active_leaves.remove(&block.parent_hash) {
debug_assert_eq!(block.number.saturating_sub(1), number);
}

if let Err(error) = self.update_activated_leave(update).await {
if let Err(error) = self.on_activated_leaf(updated_leaf).await {
tracing::error!(target: LOG_TARGET, "Collation generation processing error: {error}");
}

Expand Down

0 comments on commit fef53fa

Please sign in to comment.