Skip to content

Commit

Permalink
Syncing strategy refactoring (part 3) (paritytech#5737)
Browse files Browse the repository at this point in the history
# Description

This is a continuation of
paritytech#5666 that finally fixes
paritytech#5333.

This should allow developers to create custom syncing strategies or even
the whole syncing engine if they so desire. It also moved syncing engine
creation and addition of corresponding protocol outside
`build_network_advanced` method, which is something Bastian expressed as
desired in
paritytech#5 (comment)

Here I replaced strategy-specific types and methods in `SyncingStrategy`
trait with generic ones. Specifically `SyncingAction` is now used by all
strategies instead of strategy-specific types with conversions.
`StrategyKey` was an enum with a fixed set of options and now replaced
with an opaque type that strategies create privately and send to upper
layers as an opaque type. Requests and responses are now handled in a
generic way regardless of the strategy, which reduced and simplified
strategy API.

`PolkadotSyncingStrategy` now lives in its dedicated module (had to edit
.gitignore for this) like other strategies.

`build_network_advanced` takes generic `SyncingService` as an argument
alongside with a few other low-level types (that can probably be
extracted in the future as well) without any notion of specifics of the
way syncing is actually done. All the protocol and tasks are created
outside and not a part of the network anymore. It still adds a bunch of
protocols like for light client and some others that should eventually
be restructured making `build_network_advanced` just building generic
network and not application-specific protocols handling.

## Integration

Just like paritytech#5666
introduced `build_polkadot_syncing_strategy`, this PR introduces
`build_default_block_downloader`, but for convenience and to avoid
typical boilerplate a simpler high-level function
`build_default_syncing_engine` is added that will take care of creating
typical block downloader, syncing strategy and syncing engine, which is
what most users will be using going forward. `build_network` towards the
end of the PR was renamed to `build_network_advanced` and
`build_network`'s API was reverted to
pre-paritytech#5666, so most users
will not see much of a difference during upgrade unless they opt-in to
use new API.

## Review Notes

For `StrategyKey` I was thinking about using something like private type
and then storing `TypeId` inside instead of a static string in it, let
me know if that would preferred.

The biggest change happened to requests that different strategies make
and how their responses are handled. The most annoying thing here is
that block response decoding, in contrast to all other responses, is
dependent on request. This meant request had to be sent throughout the
system. While originally `Response` was `Vec<u8>`, I didn't want to
re-encode/decode request and response just to fit into that API, so I
ended up with `Box<dyn Any + Send>`. This allows responses to be truly
generic and each strategy will know how to downcast it back to the
concrete type when handling the response.

Import queue refactoring was needed to move `SyncingEngine` construction
out of `build_network` that awkwardly implemented for `SyncingService`,
but due to `&mut self` wasn't usable on `Arc<SyncingService>` for no
good reason. `Arc<SyncingService>` itself is of course useless, but
refactoring to replace it with just `SyncingService` was unfortunately
rejected in paritytech#5454

As usual I recommend to review this PR as a series of commits instead of
as the final diff, it'll make more sense that way.

# Checklist

* [x] My PR includes a detailed description as outlined in the
"Description" and its two subsections above.
* [x] My PR follows the [labeling requirements](

https://github.com/paritytech/polkadot-sdk/blob/master/docs/contributor/CONTRIBUTING.md#Process
) of this project (at minimum one label for `T` required)
* External contributors: ask maintainers to put the right label on your
PR.
* [x] I have made corresponding changes to the documentation (if
applicable)
  • Loading branch information
nazar-pc authored Nov 7, 2024
1 parent 3c7b9a0 commit 12d9052
Show file tree
Hide file tree
Showing 33 changed files with 1,695 additions and 1,306 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ artifacts
bin/node-template/Cargo.lock
nohup.out
polkadot_argument_parsing
polkadot.*
!docs/sdk/src/polkadot_sdk/polkadot.rs
pwasm-alloc/Cargo.lock
pwasm-libc/Cargo.lock
Expand Down
19 changes: 3 additions & 16 deletions cumulus/client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ use sc_consensus::{
use sc_network::{config::SyncMode, service::traits::NetworkService, NetworkBackend};
use sc_network_sync::SyncingService;
use sc_network_transactions::TransactionsHandlerController;
use sc_service::{
build_polkadot_syncing_strategy, Configuration, NetworkStarter, SpawnTaskHandle, TaskManager,
WarpSyncConfig,
};
use sc_service::{Configuration, NetworkStarter, SpawnTaskHandle, TaskManager, WarpSyncConfig};
use sc_telemetry::{log, TelemetryWorkerHandle};
use sc_utils::mpsc::TracingUnboundedSender;
use sp_api::ProvideRuntimeApi;
Expand Down Expand Up @@ -429,7 +426,7 @@ pub struct BuildNetworkParams<
pub async fn build_network<'a, Block, Client, RCInterface, IQ, Network>(
BuildNetworkParams {
parachain_config,
mut net_config,
net_config,
client,
transaction_pool,
para_id,
Expand Down Expand Up @@ -500,16 +497,6 @@ where
parachain_config.prometheus_config.as_ref().map(|config| &config.registry),
);

let syncing_strategy = build_polkadot_syncing_strategy(
parachain_config.protocol_id(),
parachain_config.chain_spec.fork_id(),
&mut net_config,
warp_sync_config,
client.clone(),
&spawn_handle,
parachain_config.prometheus_config.as_ref().map(|config| &config.registry),
)?;

sc_service::build_network(sc_service::BuildNetworkParams {
config: parachain_config,
net_config,
Expand All @@ -518,7 +505,7 @@ where
spawn_handle,
import_queue,
block_announce_validator_builder: Some(Box::new(move |_| block_announce_validator)),
syncing_strategy,
warp_sync_config,
block_relay: None,
metrics,
})
Expand Down
16 changes: 3 additions & 13 deletions cumulus/polkadot-omni-node/lib/src/nodes/manual_seal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use cumulus_primitives_core::ParaId;
use sc_consensus::{DefaultImportQueue, LongestChain};
use sc_consensus_manual_seal::rpc::{ManualSeal, ManualSealApiServer};
use sc_network::NetworkBackend;
use sc_service::{build_polkadot_syncing_strategy, Configuration, PartialComponents, TaskManager};
use sc_service::{Configuration, PartialComponents, TaskManager};
use sc_telemetry::TelemetryHandle;
use sp_runtime::traits::Header;
use std::{marker::PhantomData, sync::Arc};
Expand Down Expand Up @@ -85,24 +85,14 @@ impl<NodeSpec: NodeSpecT> ManualSealNode<NodeSpec> {
// Since this is a dev node, prevent it from connecting to peers.
config.network.default_peers_set.in_peers = 0;
config.network.default_peers_set.out_peers = 0;
let mut net_config = sc_network::config::FullNetworkConfiguration::<_, _, Net>::new(
let net_config = sc_network::config::FullNetworkConfiguration::<_, _, Net>::new(
&config.network,
config.prometheus_config.as_ref().map(|cfg| cfg.registry.clone()),
);
let metrics = Net::register_notification_metrics(
config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
);

let syncing_strategy = build_polkadot_syncing_strategy(
config.protocol_id(),
config.chain_spec.fork_id(),
&mut net_config,
None,
client.clone(),
&task_manager.spawn_handle(),
config.prometheus_config.as_ref().map(|config| &config.registry),
)?;

let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
Expand All @@ -112,7 +102,7 @@ impl<NodeSpec: NodeSpecT> ManualSealNode<NodeSpec> {
import_queue,
net_config,
block_announce_validator_builder: None,
syncing_strategy,
warp_sync_config: None,
block_relay: None,
metrics,
})?;
Expand Down
14 changes: 2 additions & 12 deletions polkadot/node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration};
use prometheus_endpoint::Registry;
#[cfg(feature = "full-node")]
use sc_service::KeystoreContainer;
use sc_service::{build_polkadot_syncing_strategy, RpcHandlers, SpawnTaskHandle};
use sc_service::{RpcHandlers, SpawnTaskHandle};
use sc_telemetry::TelemetryWorker;
#[cfg(feature = "full-node")]
use sc_telemetry::{Telemetry, TelemetryWorkerHandle};
Expand Down Expand Up @@ -1003,16 +1003,6 @@ pub fn new_full<
})
};

let syncing_strategy = build_polkadot_syncing_strategy(
config.protocol_id(),
config.chain_spec.fork_id(),
&mut net_config,
Some(WarpSyncConfig::WithProvider(warp_sync)),
client.clone(),
&task_manager.spawn_handle(),
config.prometheus_config.as_ref().map(|config| &config.registry),
)?;

let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
Expand All @@ -1022,7 +1012,7 @@ pub fn new_full<
spawn_handle: task_manager.spawn_handle(),
import_queue,
block_announce_validator_builder: None,
syncing_strategy,
warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)),
block_relay: None,
metrics,
})?;
Expand Down
25 changes: 25 additions & 0 deletions prdoc/pr_5737.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
title: Make syncing service an argument of `build_network`

doc:
- audience: Node Dev
description: |
`build_network` is accompanied with lower-level `build_network_advanced` with simpler API that does not create
syncing engine internally, but instead takes a handle to syncing service as an argument. In most cases typical
syncing engine with polkadot syncing strategy and default block downloader can be created with newly introduced
`sc_service::build_default_syncing_engine()` function, but lower-level `build_default_block_downloader` also
exists for those needing more customization.

These changes allow developers higher than ever control over syncing implementation, but `build_network` is still
available for easier high-level usage.

crates:
- name: cumulus-client-service
bump: patch
- name: polkadot-service
bump: patch
- name: sc-consensus
bump: major
- name: sc-service
bump: major
- name: sc-network-sync
bump: major
13 changes: 1 addition & 12 deletions substrate/bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use frame_system_rpc_runtime_api::AccountNonceApi;
use futures::prelude::*;
use kitchensink_runtime::RuntimeApi;
use node_primitives::Block;
use polkadot_sdk::sc_service::build_polkadot_syncing_strategy;
use sc_client_api::{Backend, BlockBackend};
use sc_consensus_babe::{self, SlotProportion};
use sc_network::{
Expand Down Expand Up @@ -514,16 +513,6 @@ pub fn new_full_base<N: NetworkBackend<Block, <Block as BlockT>::Hash>>(
Vec::default(),
));

let syncing_strategy = build_polkadot_syncing_strategy(
config.protocol_id(),
config.chain_spec.fork_id(),
&mut net_config,
Some(WarpSyncConfig::WithProvider(warp_sync)),
client.clone(),
&task_manager.spawn_handle(),
config.prometheus_config.as_ref().map(|config| &config.registry),
)?;

let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
Expand All @@ -533,7 +522,7 @@ pub fn new_full_base<N: NetworkBackend<Block, <Block as BlockT>::Hash>>(
spawn_handle: task_manager.spawn_handle(),
import_queue,
block_announce_validator_builder: None,
syncing_strategy,
warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)),
block_relay: None,
metrics,
})?;
Expand Down
14 changes: 7 additions & 7 deletions substrate/client/consensus/common/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ pub trait Verifier<B: BlockT>: Send + Sync {
///
/// The `import_*` methods can be called in order to send elements for the import queue to verify.
pub trait ImportQueueService<B: BlockT>: Send {
/// Import bunch of blocks, every next block must be an ancestor of the previous block in the
/// Import a bunch of blocks, every next block must be an ancestor of the previous block in the
/// list.
fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>);

Expand All @@ -132,21 +132,21 @@ pub trait ImportQueue<B: BlockT>: Send {
/// This method should behave in a way similar to `Future::poll`. It can register the current
/// task and notify later when more actions are ready to be polled. To continue the comparison,
/// it is as if this method always returned `Poll::Pending`.
fn poll_actions(&mut self, cx: &mut futures::task::Context, link: &mut dyn Link<B>);
fn poll_actions(&mut self, cx: &mut futures::task::Context, link: &dyn Link<B>);

/// Start asynchronous runner for import queue.
///
/// Takes an object implementing [`Link`] which allows the import queue to
/// influence the synchronization process.
async fn run(self, link: Box<dyn Link<B>>);
async fn run(self, link: &dyn Link<B>);
}

/// Hooks that the verification queue can use to influence the synchronization
/// algorithm.
pub trait Link<B: BlockT>: Send {
pub trait Link<B: BlockT>: Send + Sync {
/// Batch of blocks imported, with or without error.
fn blocks_processed(
&mut self,
&self,
_imported: usize,
_count: usize,
_results: Vec<(BlockImportResult<B>, B::Hash)>,
Expand All @@ -155,7 +155,7 @@ pub trait Link<B: BlockT>: Send {

/// Justification import result.
fn justification_imported(
&mut self,
&self,
_who: RuntimeOrigin,
_hash: &B::Hash,
_number: NumberFor<B>,
Expand All @@ -164,7 +164,7 @@ pub trait Link<B: BlockT>: Send {
}

/// Request a justification for the given block.
fn request_justification(&mut self, _hash: &B::Hash, _number: NumberFor<B>) {}
fn request_justification(&self, _hash: &B::Hash, _number: NumberFor<B>) {}
}

/// Block import successful result.
Expand Down
29 changes: 15 additions & 14 deletions substrate/client/consensus/common/src/import_queue/basic_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ impl<B: BlockT> ImportQueue<B> for BasicQueue<B> {
}

/// Poll actions from network.
fn poll_actions(&mut self, cx: &mut Context, link: &mut dyn Link<B>) {
fn poll_actions(&mut self, cx: &mut Context, link: &dyn Link<B>) {
if self.result_port.poll_actions(cx, link).is_err() {
log::error!(
target: LOG_TARGET,
Expand All @@ -190,9 +190,9 @@ impl<B: BlockT> ImportQueue<B> for BasicQueue<B> {
///
/// Takes an object implementing [`Link`] which allows the import queue to
/// influence the synchronization process.
async fn run(mut self, mut link: Box<dyn Link<B>>) {
async fn run(mut self, link: &dyn Link<B>) {
loop {
if let Err(_) = self.result_port.next_action(&mut *link).await {
if let Err(_) = self.result_port.next_action(link).await {
log::error!(target: "sync", "poll_actions: Background import task is no longer alive");
return
}
Expand Down Expand Up @@ -223,7 +223,7 @@ mod worker_messages {
async fn block_import_process<B: BlockT>(
mut block_import: BoxBlockImport<B>,
verifier: impl Verifier<B>,
mut result_sender: BufferedLinkSender<B>,
result_sender: BufferedLinkSender<B>,
mut block_import_receiver: TracingUnboundedReceiver<worker_messages::ImportBlocks<B>>,
metrics: Option<Metrics>,
) {
Expand Down Expand Up @@ -501,6 +501,7 @@ mod tests {
import_queue::Verifier,
};
use futures::{executor::block_on, Future};
use parking_lot::Mutex;
use sp_test_primitives::{Block, BlockNumber, Hash, Header};

#[async_trait::async_trait]
Expand Down Expand Up @@ -558,29 +559,29 @@ mod tests {

#[derive(Default)]
struct TestLink {
events: Vec<Event>,
events: Mutex<Vec<Event>>,
}

impl Link<Block> for TestLink {
fn blocks_processed(
&mut self,
&self,
_imported: usize,
_count: usize,
results: Vec<(Result<BlockImportStatus<BlockNumber>, BlockImportError>, Hash)>,
) {
if let Some(hash) = results.into_iter().find_map(|(r, h)| r.ok().map(|_| h)) {
self.events.push(Event::BlockImported(hash));
self.events.lock().push(Event::BlockImported(hash));
}
}

fn justification_imported(
&mut self,
&self,
_who: RuntimeOrigin,
hash: &Hash,
_number: BlockNumber,
_success: bool,
) {
self.events.push(Event::JustificationImported(*hash))
self.events.lock().push(Event::JustificationImported(*hash))
}
}

Expand Down Expand Up @@ -638,7 +639,7 @@ mod tests {
hash
};

let mut link = TestLink::default();
let link = TestLink::default();

// we send a bunch of tasks to the worker
let block1 = import_block(1);
Expand All @@ -653,22 +654,22 @@ mod tests {

// we poll the worker until we have processed 9 events
block_on(futures::future::poll_fn(|cx| {
while link.events.len() < 9 {
while link.events.lock().len() < 9 {
match Future::poll(Pin::new(&mut worker), cx) {
Poll::Pending => {},
Poll::Ready(()) => panic!("import queue worker should not conclude."),
}

result_port.poll_actions(cx, &mut link).unwrap();
result_port.poll_actions(cx, &link).unwrap();
}

Poll::Ready(())
}));

// all justification tasks must be done before any block import work
assert_eq!(
link.events,
vec![
&*link.events.lock(),
&[
Event::JustificationImported(justification1),
Event::JustificationImported(justification2),
Event::JustificationImported(justification3),
Expand Down
Loading

0 comments on commit 12d9052

Please sign in to comment.