Skip to content

Commit

Permalink
chainHead: Add support for storage pagination and cancellation (parit…
Browse files Browse the repository at this point in the history
…ytech#14755)

* chainHead/api: Add `chain_head_unstable_continue` method

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead/subscriptions: Register operations for pagination

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead/subscriptions: Merge limits with registered operation

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead/subscriptions: Expose the operation state

Signed-off-by: Alexandru Vasile <[email protected]>

* chain_head/storage: Generate WaitingForContinue event

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead: Use the continue operation

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead/tests: Adjust testing to the new storage interface

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead/config: Make pagination limit configurable

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead/tests: Adjust chainHeadConfig

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead/tests: Check pagination and continue method

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead/api: Add `chainHead_unstable_stopOperation` method

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead/subscription: Add shared atomic state for efficient alloc

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead: Implement operation stop

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead/tests: Check that storage ops can be cancelled

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead/storage: Change docs for query_storage_iter_pagination

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead/subscriptions: Fix merge conflicts

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead: Replace `async-channel` with `tokio::sync`

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead/subscription: Add comment about the sender/recv continue

Signed-off-by: Alexandru Vasile <[email protected]>

---------

Signed-off-by: Alexandru Vasile <[email protected]>
  • Loading branch information
lexnv authored Aug 24, 2023
1 parent 6b07b97 commit 92633bb
Show file tree
Hide file tree
Showing 6 changed files with 749 additions and 152 deletions.
27 changes: 27 additions & 0 deletions client/rpc-spec-v2/src/chain_head/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,31 @@ pub trait ChainHeadApi<Hash> {
/// This method is unstable and subject to change in the future.
#[method(name = "chainHead_unstable_unpin", blocking)]
fn chain_head_unstable_unpin(&self, follow_subscription: String, hash: Hash) -> RpcResult<()>;

/// Resumes a storage fetch started with `chainHead_storage` after it has generated an
/// `operationWaitingForContinue` event.
///
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[method(name = "chainHead_unstable_continue", blocking)]
fn chain_head_unstable_continue(
&self,
follow_subscription: String,
operation_id: String,
) -> RpcResult<()>;

/// Stops an operation started with chainHead_unstable_body, chainHead_unstable_call, or
/// chainHead_unstable_storage. If the operation was still in progress, this interrupts it. If
/// the operation was already finished, this call has no effect.
///
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[method(name = "chainHead_unstable_stopOperation", blocking)]
fn chain_head_unstable_stop_operation(
&self,
follow_subscription: String,
operation_id: String,
) -> RpcResult<()>;
}
82 changes: 63 additions & 19 deletions client/rpc-spec-v2/src/chain_head/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ pub struct ChainHeadConfig {
pub subscription_max_pinned_duration: Duration,
/// The maximum number of ongoing operations per subscription.
pub subscription_max_ongoing_operations: usize,
/// The maximum number of items reported by the `chainHead_storage` before
/// pagination is required.
pub operation_max_storage_items: usize,
}

/// Maximum pinned blocks across all connections.
Expand All @@ -78,12 +81,17 @@ const MAX_PINNED_DURATION: Duration = Duration::from_secs(60);
/// Note: The lower limit imposed by the spec is 16.
const MAX_ONGOING_OPERATIONS: usize = 16;

/// The maximum number of items the `chainHead_storage` can return
/// before paginations is required.
const MAX_STORAGE_ITER_ITEMS: usize = 5;

impl Default for ChainHeadConfig {
fn default() -> Self {
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: MAX_PINNED_DURATION,
subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS,
operation_max_storage_items: MAX_STORAGE_ITER_ITEMS,
}
}
}
Expand All @@ -100,6 +108,9 @@ pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
subscriptions: Arc<SubscriptionManagement<Block, BE>>,
/// The hexadecimal encoded hash of the genesis block.
genesis_hash: String,
/// The maximum number of items reported by the `chainHead_storage` before
/// pagination is required.
operation_max_storage_items: usize,
/// Phantom member to pin the block type.
_phantom: PhantomData<Block>,
}
Expand All @@ -124,6 +135,7 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
config.subscription_max_ongoing_operations,
backend,
)),
operation_max_storage_items: config.operation_max_storage_items,
genesis_hash,
_phantom: PhantomData,
}
Expand Down Expand Up @@ -232,7 +244,7 @@ where
follow_subscription: String,
hash: Block::Hash,
) -> RpcResult<MethodResponse> {
let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => return Ok(MethodResponse::LimitReached),
Expand All @@ -243,6 +255,8 @@ where
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
};

let operation_id = block_guard.operation().operation_id();

let event = match self.client.block(hash) {
Ok(Some(signed_block)) => {
let extrinsics = signed_block
Expand All @@ -252,7 +266,7 @@ where
.map(|extrinsic| hex_string(&extrinsic.encode()))
.collect();
FollowEvent::<Block::Hash>::OperationBodyDone(OperationBodyDone {
operation_id: block_guard.operation_id(),
operation_id: operation_id.clone(),
value: extrinsics,
})
},
Expand All @@ -268,16 +282,13 @@ where
return Err(ChainHeadRpcError::InvalidBlock.into())
},
Err(error) => FollowEvent::<Block::Hash>::OperationError(OperationError {
operation_id: block_guard.operation_id(),
operation_id: operation_id.clone(),
error: error.to_string(),
}),
};

let _ = block_guard.response_sender().unbounded_send(event);
Ok(MethodResponse::Started(MethodResponseStarted {
operation_id: block_guard.operation_id(),
discarded_items: None,
}))
Ok(MethodResponse::Started(MethodResponseStarted { operation_id, discarded_items: None }))
}

fn chain_head_unstable_header(
Expand Down Expand Up @@ -337,7 +348,7 @@ where
.transpose()?
.map(ChildInfo::new_default_from_vec);

let block_guard =
let mut block_guard =
match self.subscriptions.lock_block(&follow_subscription, hash, items.len()) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) |
Expand All @@ -349,17 +360,21 @@ where
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
};

let storage_client = ChainHeadStorage::<Client, Block, BE>::new(self.client.clone());
let operation_id = block_guard.operation_id();
let mut storage_client = ChainHeadStorage::<Client, Block, BE>::new(
self.client.clone(),
self.operation_max_storage_items,
);
let operation = block_guard.operation();
let operation_id = operation.operation_id();

// The number of operations we are allowed to execute.
let num_operations = block_guard.num_reserved();
let num_operations = operation.num_reserved();
let discarded = items.len().saturating_sub(num_operations);
let mut items = items;
items.truncate(num_operations);

let fut = async move {
storage_client.generate_events(block_guard, hash, items, child_trie);
storage_client.generate_events(block_guard, hash, items, child_trie).await;
};

self.executor
Expand All @@ -379,7 +394,7 @@ where
) -> RpcResult<MethodResponse> {
let call_parameters = Bytes::from(parse_hex_param(call_parameters)?);

let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => {
Expand All @@ -401,28 +416,26 @@ where
.into())
}

let operation_id = block_guard.operation().operation_id();
let event = self
.client
.executor()
.call(hash, &function, &call_parameters, CallContext::Offchain)
.map(|result| {
FollowEvent::<Block::Hash>::OperationCallDone(OperationCallDone {
operation_id: block_guard.operation_id(),
operation_id: operation_id.clone(),
output: hex_string(&result),
})
})
.unwrap_or_else(|error| {
FollowEvent::<Block::Hash>::OperationError(OperationError {
operation_id: block_guard.operation_id(),
operation_id: operation_id.clone(),
error: error.to_string(),
})
});

let _ = block_guard.response_sender().unbounded_send(event);
Ok(MethodResponse::Started(MethodResponseStarted {
operation_id: block_guard.operation_id(),
discarded_items: None,
}))
Ok(MethodResponse::Started(MethodResponseStarted { operation_id, discarded_items: None }))
}

fn chain_head_unstable_unpin(
Expand All @@ -443,4 +456,35 @@ where
Err(_) => Err(ChainHeadRpcError::InvalidBlock.into()),
}
}

fn chain_head_unstable_continue(
&self,
follow_subscription: String,
operation_id: String,
) -> RpcResult<()> {
let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id) else {
return Ok(())
};

if !operation.submit_continue() {
// Continue called without generating a `WaitingForContinue` event.
Err(ChainHeadRpcError::InvalidContinue.into())
} else {
Ok(())
}
}

fn chain_head_unstable_stop_operation(
&self,
follow_subscription: String,
operation_id: String,
) -> RpcResult<()> {
let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id) else {
return Ok(())
};

operation.stop_operation();

Ok(())
}
}
Loading

0 comments on commit 92633bb

Please sign in to comment.