diff --git a/Cargo.lock b/Cargo.lock index 1f69c32d1e..2ee308b7bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -363,20 +363,6 @@ dependencies = [ "tower-service", ] -[[package]] -name = "backoff" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" -dependencies = [ - "futures-core", - "getrandom", - "instant", - "pin-project-lite", - "rand", - "tokio", -] - [[package]] name = "backtrace" version = "0.3.69" @@ -3316,7 +3302,6 @@ name = "snarkos-node-cdn" version = "2.2.7" dependencies = [ "anyhow", - "backoff", "bincode", "colored", "futures", diff --git a/node/cdn/Cargo.toml b/node/cdn/Cargo.toml index 68ada00a96..aff5c26cba 100644 --- a/node/cdn/Cargo.toml +++ b/node/cdn/Cargo.toml @@ -23,10 +23,6 @@ parallel = [ "rayon" ] [dependencies.anyhow] version = "1.0.79" -[dependencies.backoff] -version = "0.4" -features = [ "tokio" ] - [dependencies.bincode] version = "1.0" diff --git a/node/cdn/src/blocks.rs b/node/cdn/src/blocks.rs index 6477ee658e..08eb4062f4 100644 --- a/node/cdn/src/blocks.rs +++ b/node/cdn/src/blocks.rs @@ -12,6 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Avoid a false positive from clippy: +// https://github.com/rust-lang/rust-clippy/issues/6446 +#![allow(clippy::await_holding_lock)] + use snarkvm::prelude::{ block::Block, store::{cow_to_copied, ConsensusStorage}, @@ -24,13 +28,12 @@ use snarkvm::prelude::{ use anyhow::{anyhow, bail, Result}; use colored::Colorize; -use core::ops::Range; -use futures::{Future, StreamExt}; -use parking_lot::RwLock; +use parking_lot::Mutex; use reqwest::Client; use std::{ + cmp, sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicU32, Ordering}, Arc, }, time::{Duration, Instant}, @@ -38,6 +41,12 @@ use std::{ /// The number of blocks per file. const BLOCKS_PER_FILE: u32 = 50; +/// The desired number of concurrent requests to the CDN. +const CONCURRENT_REQUESTS: u32 = 16; +/// Maximum number of pending sync blocks. +const MAXIMUM_PENDING_BLOCKS: u32 = BLOCKS_PER_FILE * CONCURRENT_REQUESTS * 2; +/// Maximum number of attempts for a request to the CDN. +const MAXIMUM_REQUEST_ATTEMPTS: u8 = 10; /// The supported network. const NETWORK_ID: u16 = 3; @@ -103,8 +112,16 @@ pub async fn load_blocks( return Err((start_height, anyhow!("The network ({}) is not supported", N::ID))); } + // Create a Client to maintain a connection pool throughout the sync. + let client = match Client::builder().build() { + Ok(client) => client, + Err(error) => { + return Err((start_height.saturating_sub(1), anyhow!("Failed to create a CDN request client - {error}"))); + } + }; + // Fetch the CDN height. - let cdn_height = match cdn_height::(base_url).await { + let cdn_height = match cdn_height::(&client, base_url).await { Ok(cdn_height) => cdn_height, Err(error) => return Err((start_height, error)), }; @@ -118,12 +135,12 @@ pub async fn load_blocks( // If the end height is not specified, set it to the CDN height. // If the end height is greater than the CDN height, set the end height to the CDN height. - let end_height = end_height.unwrap_or(cdn_height).min(cdn_height); + let end_height = cmp::min(end_height.unwrap_or(cdn_height), cdn_height); // If the end height is less than the start height, return. if end_height < start_height { return Err(( start_height, - anyhow!("The given end height ({end_height}) must be less than the start height ({start_height})"), + anyhow!("The given end height ({end_height}) must not be less than the start height ({start_height})"), )); } @@ -131,157 +148,202 @@ pub async fn load_blocks( let cdn_start = start_height - (start_height % BLOCKS_PER_FILE); // Set the CDN end height to the given end height. let cdn_end = end_height; - // Construct the CDN range. - let cdn_range = cdn_start..cdn_end; // If the CDN range is empty, return. - if cdn_range.is_empty() { + if cdn_start >= cdn_end { return Ok(cdn_end); } - // Create a Client to maintain a connection pool throughout the sync. - let client = match Client::builder().build() { - Ok(client) => client, - Err(error) => return Err((start_height, anyhow!("Failed to create a CDN request client: {error}"))), - }; - - // A tracker for the completed block height. - let completed_height: Arc> = Arc::new(RwLock::new(start_height)); - // A tracker to indicate if the sync failed. - let failed: Arc>> = Default::default(); + // A collection of dowloaded blocks pending insertion into the ledger. + let pending_blocks: Arc>>> = Default::default(); // Start a timer. let timer = Instant::now(); - futures::stream::iter(cdn_range.clone().step_by(BLOCKS_PER_FILE as usize)) - .map(|start| { - // If the Ctrl-C handler registered the signal, then stop the sync. - if shutdown.load(Ordering::Relaxed) { - info!("Skipping block sync (at {start}) - The node is shutting down"); - // Note: Calling 'exit' from here is not ideal, but the CDN sync happens before - // the node is even initialized, so it doesn't result in any other - // functionalities being shut down abruptly. - std::process::exit(0); - } + // Spawn a background task responsible for concurrent downloads. + let pending_blocks_clone = pending_blocks.clone(); + let base_url = base_url.to_owned(); + let shutdown_clone = shutdown.clone(); + tokio::spawn(async move { + download_block_bundles(client, base_url, cdn_start, cdn_end, pending_blocks_clone, shutdown_clone).await; + }); + + // A loop for inserting the pending blocks into the ledger. + let mut current_height = start_height.saturating_sub(1); + while current_height < end_height - 1 { + // If we are instructed to shut down, abort. + if shutdown.load(Ordering::Relaxed) { + info!("Stopping block sync at {} - shutting down", current_height); + // We can shut down cleanly from here, as the node hasn't been started yet. + std::process::exit(0); + } - // Prepare the end height. - let end = start + BLOCKS_PER_FILE; + let mut candidate_blocks = pending_blocks.lock(); - // If the sync *has not* failed, log the progress. - let ctx = format!("blocks {start} to {end}"); - if failed.read().is_none() { - debug!("Requesting {ctx} (of {cdn_end})"); - } + // Obtain the height of the nearest pending block. + let Some(next_height) = candidate_blocks.first().map(|b| b.height()) else { + debug!("No pending blocks yet"); + drop(candidate_blocks); + tokio::time::sleep(Duration::from_secs(3)).await; + continue; + }; - // Download the blocks with an exponential backoff retry policy. - let client_clone = client.clone(); - let base_url_clone = base_url.to_string(); - let failed_clone = failed.clone(); - handle_dispatch_error(move || { - let ctx = ctx.clone(); - let client = client_clone.clone(); - let base_url = base_url_clone.clone(); - let failed = failed_clone.clone(); - async move { - // If the sync failed, return with an empty vector. - if failed.read().is_some() { - return std::future::ready(Ok(vec![])).await - } - // Prepare the URL. - let blocks_url = format!("{base_url}/{start}.{end}.blocks"); - // Fetch the blocks. - let blocks: Vec> = match cdn_get(client, &blocks_url, &ctx).await { - Ok(blocks) => blocks, - Err(error) => { - error!("Failed to request {ctx} - {error}"); - failed.write().replace(error); - return std::future::ready(Ok(vec![])).await - } - }; - // Return the blocks. - std::future::ready(Ok(blocks)).await + // Wait if the nearest pending block is not the next one that can be inserted. + if next_height > current_height + 1 { + // There is a gap in pending blocks, we need to wait. + debug!("Waiting for the first relevant blocks ({} pending)", candidate_blocks.len()); + drop(candidate_blocks); + tokio::time::sleep(Duration::from_secs(1)).await; + continue; + } + + // Obtain the first BLOCKS_PER_FILE applicable blocks. + let retained_blocks = candidate_blocks.split_off(BLOCKS_PER_FILE as usize); + let next_blocks = std::mem::replace(&mut *candidate_blocks, retained_blocks); + drop(candidate_blocks); + + // Attempt to advance the ledger using the CDN block bundle. + let mut process_clone = process.clone(); + let shutdown_clone = shutdown.clone(); + current_height = tokio::task::spawn_blocking(move || { + for block in next_blocks.into_iter().filter(|b| (start_height..end_height).contains(&b.height())) { + // If we are instructed to shut down, abort. + if shutdown_clone.load(Ordering::Relaxed) { + info!("Stopping block sync at {} - the node is shutting down", current_height); + // We can shut down cleanly from here, as the node hasn't been started yet. + std::process::exit(0); } - }) - }) - .buffered(128) // The number of concurrent requests. - .for_each(|result| async { - // If the sync previously failed, return early. - if failed.read().is_some() { - return; + + // Insert the block into the ledger. + process_clone(block)?; + current_height += 1; + + // Log the progress. + log_progress::(timer, current_height, cdn_start, cdn_end, "block"); } - // Unwrap the blocks. - let mut blocks = match result { - Ok(blocks) => blocks, - Err(error) => { - failed.write().replace(error); - return; - } - }; + Ok(current_height) + }) + .await + .map_err(|e| (current_height, e.into()))? + .map_err(|e| (current_height, e))?; + } - // Only retain blocks that are at or above the start height and below the end height. - blocks.retain(|block| block.height() >= start_height && block.height() < end_height); + Ok(current_height) +} - #[cfg(debug_assertions)] - // Ensure the blocks are in order by height. - for (i, block) in blocks.iter().enumerate() { - if i > 0 { - assert_eq!(block.height(), blocks[i - 1].height() + 1); - } +async fn download_block_bundles( + client: Client, + base_url: String, + cdn_start: u32, + cdn_end: u32, + pending_blocks: Arc>>>, + shutdown: Arc, +) { + // Keep track of the number of concurrent requests. + let active_requests: Arc = Default::default(); + + let mut start = cdn_start; + while start < cdn_end - 1 { + // If we are instructed to shut down, stop downloading. + if shutdown.load(Ordering::Relaxed) { + break; + } + + // Avoid collecting too many blocks in order to restrict memory use. + let num_pending_blocks = pending_blocks.lock().len(); + if num_pending_blocks >= MAXIMUM_PENDING_BLOCKS as usize { + debug!("Maximum number of pending blocks reached ({num_pending_blocks}), waiting..."); + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + } + + // The number of concurrent requests is maintained at CONCURRENT_REQUESTS, unless the maximum + // number of pending blocks may be breached. + let active_request_count = active_requests.load(Ordering::Relaxed); + let num_requests = + cmp::min(CONCURRENT_REQUESTS, (MAXIMUM_PENDING_BLOCKS - num_pending_blocks as u32) / BLOCKS_PER_FILE) + .saturating_sub(active_request_count); + + // Spawn concurrent requests for bundles of blocks. + for i in 0..num_requests { + let start = start + i * BLOCKS_PER_FILE; + let end = start + BLOCKS_PER_FILE; + + // If this request would breach the upper limit, stop downloading. + if end > cdn_end + BLOCKS_PER_FILE { + debug!("Finishing network requests to the CDN..."); + break; } - // Use blocking tasks, as deserialization and adding blocks are expensive operations. - let mut process_clone = process.clone(); - let cdn_range_clone = cdn_range.clone(); - let completed_height_clone = completed_height.clone(); - let failed_clone = failed.clone(); - let result = tokio::task::spawn_blocking(move || { - // Fetch the last height in the blocks. - let curr_height = blocks.last().map(|block| block.height()).unwrap_or(start_height); - - // Process each of the blocks. - for block in blocks { - // Retrieve the block height. - let block_height = block.height(); - - // If the sync failed, set the failed flag, and return. - if let Err(error) = process_clone(block) { - let error = anyhow!("Failed to process block {block_height}: {error}"); - failed_clone.write().replace(error); - return; - } + let client_clone = client.clone(); + let base_url_clone = base_url.clone(); + let pending_blocks_clone = pending_blocks.clone(); + let active_requests_clone = active_requests.clone(); + let shutdown_clone = shutdown.clone(); + tokio::spawn(async move { + // Increment the number of active requests. + active_requests_clone.fetch_add(1, Ordering::Relaxed); + + let ctx = format!("blocks {start} to {end}"); + debug!("Requesting {ctx} (of {cdn_end})"); - // On success, update the completed height. - *completed_height_clone.write() = block_height; + // Prepare the URL. + let blocks_url = format!("{base_url_clone}/{start}.{end}.blocks"); + let ctx = format!("blocks {start} to {end}"); + // Download blocks, retrying on failure. + let mut attempts = 0; + let request_time = Instant::now(); + + loop { + // Fetch the blocks. + match cdn_get(client_clone.clone(), &blocks_url, &ctx).await { + Ok::>, _>(blocks) => { + // Keep the collection of pending blocks sorted by the height. + let mut pending_blocks = pending_blocks_clone.lock(); + for block in blocks { + match pending_blocks.binary_search_by_key(&block.height(), |b| b.height()) { + Ok(_idx) => warn!("Found a duplicate pending block at height {}", block.height()), + Err(idx) => pending_blocks.insert(idx, block), + } + } + debug!("Received {ctx} {}", format!("(in {:.2?})", request_time.elapsed()).dimmed()); + break; + } + Err(error) => { + // Increment the attempt counter, and wait with a linear backoff, or abort in + // case the maximum number of attempts has been breached. + attempts += 1; + if attempts > MAXIMUM_REQUEST_ATTEMPTS { + warn!("Maximum number of requests to {blocks_url} reached - shutting down..."); + shutdown_clone.store(true, Ordering::Relaxed); + break; + } + tokio::time::sleep(Duration::from_secs(attempts as u64)).await; + warn!("{error} - retrying ({attempts} attempt(s) so far)"); + } + } } - // Log the progress. - log_progress::(timer, curr_height, &cdn_range_clone, "block"); - }).await; + // Decrement the number of active requests. + active_requests_clone.fetch_sub(1, Ordering::Relaxed); + }); + } - // If the sync failed, set the failed flag. - if let Err(error) = result { - let error = anyhow!("Failed to process blocks: {error}"); - failed.write().replace(error); - } - }) - .await; - - // Retrieve the successfully completed height (does not include failed blocks). - let completed = *completed_height.read(); - // Return the result. - match Arc::try_unwrap(failed).unwrap().into_inner() { - // If the sync failed, return the completed height along with the error. - Some(error) => Err((completed, error)), - // Otherwise, return the completed height. - None => Ok(completed), + // Increase the starting block height for the subsequent requests. + start += BLOCKS_PER_FILE * num_requests; + + // A short sleep in order to allow some block processing to happen in the meantime. + tokio::time::sleep(Duration::from_secs(1)).await; } + + debug!("Finished network requests to the CDN"); } /// Retrieves the CDN height with the given base URL. /// /// Note: This function decrements the tip by a few blocks, to ensure the /// tip is not on a block that is not yet available on the CDN. -async fn cdn_height(base_url: &str) -> Result { +async fn cdn_height(client: &Client, base_url: &str) -> Result { // A representation of the 'latest.json' file object. #[derive(Deserialize, Serialize, Debug)] struct LatestState { @@ -289,32 +351,27 @@ async fn cdn_height(base_url: &str) -> Result { inclusive_height: u32, hash: String, } - // Create a request client. - let client = match reqwest::Client::builder().build() { - Ok(client) => client, - Err(error) => bail!("Failed to create a CDN request client: {error}"), - }; // Prepare the URL. let latest_json_url = format!("{base_url}/latest.json"); // Send the request. let response = match client.get(latest_json_url).send().await { Ok(response) => response, - Err(error) => bail!("Failed to fetch the CDN height: {error}"), + Err(error) => bail!("Failed to fetch the CDN height - {error}"), }; // Parse the response. let bytes = match response.bytes().await { Ok(bytes) => bytes, - Err(error) => bail!("Failed to parse the CDN height response: {error}"), + Err(error) => bail!("Failed to parse the CDN height response - {error}"), }; // Parse the bytes for the string. let latest_state_string = match bincode::deserialize::(&bytes) { Ok(string) => string, - Err(error) => bail!("Failed to deserialize the CDN height response: {error}"), + Err(error) => bail!("Failed to deserialize the CDN height response - {error}"), }; // Parse the string for the tip. let tip = match serde_json::from_str::(&latest_state_string) { Ok(latest) => latest.exclusive_height, - Err(error) => bail!("Failed to extract the CDN height response: {error}"), + Err(error) => bail!("Failed to extract the CDN height response - {error}"), }; // Decrement the tip by a few blocks to ensure the CDN is caught up. let tip = tip.saturating_sub(10); @@ -327,18 +384,18 @@ async fn cdn_get(client: Client, url: &str // Fetch the bytes from the given URL. let response = match client.get(url).send().await { Ok(response) => response, - Err(error) => bail!("Failed to fetch {ctx}: {error}"), + Err(error) => bail!("Failed to fetch {ctx} - {error}"), }; // Parse the response. let bytes = match response.bytes().await { Ok(bytes) => bytes, - Err(error) => bail!("Failed to parse {ctx}: {error}"), + Err(error) => bail!("Failed to parse {ctx} - {error}"), }; // Parse the objects. match tokio::task::spawn_blocking(move || bincode::deserialize::(&bytes)).await { Ok(Ok(objects)) => Ok(objects), - Ok(Err(error)) => bail!("Failed to deserialize {ctx}: {error}"), - Err(error) => bail!("Failed to join task for {ctx}: {error}"), + Ok(Err(error)) => bail!("Failed to deserialize {ctx} - {error}"), + Err(error) => bail!("Failed to join task for {ctx} - {error}"), } } @@ -346,18 +403,18 @@ async fn cdn_get(client: Client, url: &str fn log_progress( timer: Instant, current_index: u32, - cdn_range: &Range, + cdn_start: u32, + mut cdn_end: u32, object_name: &str, ) { - // Prepare the CDN start and end heights. - let cdn_start = cdn_range.start; - let cdn_end = cdn_range.end; + // Subtract 1, as the end of the range is exclusive. + cdn_end -= 1; // Compute the percentage completed. let percentage = current_index * 100 / cdn_end; // Compute the number of files processed so far. let num_files_done = 1 + (current_index - cdn_start) / OBJECTS_PER_FILE; // Compute the number of files remaining. - let num_files_remaining = 1 + (cdn_end - current_index) / OBJECTS_PER_FILE; + let num_files_remaining = 1 + (cdn_end.saturating_sub(current_index)) / OBJECTS_PER_FILE; // Compute the milliseconds per file. let millis_per_file = timer.elapsed().as_millis() / num_files_done as u128; // Compute the heuristic slowdown factor (in millis). @@ -370,52 +427,16 @@ fn log_progress( info!("Synced up to {object_name} {current_index} of {cdn_end} - {percentage}% complete {}", estimate.dimmed()); } -/// Executes the given closure, with a backoff policy, and returns the result. -pub(crate) async fn handle_dispatch_error<'a, T, F>(func: impl Fn() -> F + 'a) -> anyhow::Result -where - F: Future>, -{ - use backoff::{future::retry, ExponentialBackoff}; - - fn default_backoff() -> ExponentialBackoff { - ExponentialBackoff { - max_interval: Duration::from_secs(15), - max_elapsed_time: Some(Duration::from_secs(60)), - ..Default::default() - } - } - - fn from_anyhow_err(err: anyhow::Error) -> backoff::Error { - use backoff::Error; - - if let Ok(err) = err.downcast::() { - debug!("Server error: {err}; retrying..."); - Error::Transient { err: err.into(), retry_after: None } - } else { - Error::Transient { err: anyhow!("Block parse error"), retry_after: None } - } - } - - retry(default_backoff(), || async { func().await.map_err(from_anyhow_err) }).await -} - #[cfg(test)] mod tests { use crate::{ - blocks::{cdn_get, cdn_height, handle_dispatch_error, log_progress, BLOCKS_PER_FILE}, + blocks::{cdn_get, cdn_height, log_progress, BLOCKS_PER_FILE}, load_blocks, }; use snarkvm::prelude::{block::Block, Testnet3}; - use anyhow::{anyhow, Result}; use parking_lot::RwLock; - use std::{ - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, - time::Instant, - }; + use std::{sync::Arc, time::Instant}; type CurrentNetwork = Testnet3; @@ -444,10 +465,10 @@ mod tests { } #[test] - fn test_load_blocks_0_to_50() { - let start_height = 0; + fn test_load_blocks_1_to_50() { + let start_height = 1; let end_height = Some(50); - check_load_blocks(start_height, end_height, 50); + check_load_blocks(start_height, end_height, 49); } #[test] @@ -458,10 +479,10 @@ mod tests { } #[test] - fn test_load_blocks_0_to_123() { - let start_height = 0; + fn test_load_blocks_1_to_123() { + let start_height = 1; let end_height = Some(123); - check_load_blocks(start_height, end_height, 123); + check_load_blocks(start_height, end_height, 122); } #[test] @@ -474,8 +495,9 @@ mod tests { #[test] fn test_cdn_height() { let rt = tokio::runtime::Runtime::new().unwrap(); + let client = reqwest::Client::builder().build().unwrap(); rt.block_on(async { - let height = cdn_height::(TEST_BASE_URL).await.unwrap(); + let height = cdn_height::(&client, TEST_BASE_URL).await.unwrap(); assert!(height > 0); }); } @@ -495,45 +517,19 @@ mod tests { fn test_log_progress() { // This test sanity checks that basic arithmetic is correct (i.e. no divide by zero, etc.). let timer = Instant::now(); - let cdn_range = &(0..100); + let cdn_start = 0; + let cdn_end = 100; let object_name = "blocks"; - log_progress::<10>(timer, 0, cdn_range, object_name); - log_progress::<10>(timer, 10, cdn_range, object_name); - log_progress::<10>(timer, 20, cdn_range, object_name); - log_progress::<10>(timer, 30, cdn_range, object_name); - log_progress::<10>(timer, 40, cdn_range, object_name); - log_progress::<10>(timer, 50, cdn_range, object_name); - log_progress::<10>(timer, 60, cdn_range, object_name); - log_progress::<10>(timer, 70, cdn_range, object_name); - log_progress::<10>(timer, 80, cdn_range, object_name); - log_progress::<10>(timer, 90, cdn_range, object_name); - log_progress::<10>(timer, 100, cdn_range, object_name); - } - - #[test] - fn test_handle_dispatch_error() { - let counter = AtomicUsize::new(0); - - let result: Result<()> = tokio_test::block_on(handle_dispatch_error(|| async { - counter.fetch_add(1, Ordering::SeqCst); - Err(anyhow!("test error")) - })); - - assert!(result.is_err()); - assert!(counter.load(Ordering::SeqCst) >= 10); - } - - #[test] - fn test_handle_dispatch_error_success() { - let counter = AtomicUsize::new(0); - - let result = tokio_test::block_on(handle_dispatch_error(|| async { - counter.fetch_add(1, Ordering::SeqCst); - Ok(42) - })); - - assert!(result.is_ok()); - assert_eq!(result.unwrap(), 42); - assert_eq!(counter.load(Ordering::SeqCst), 1); + log_progress::<10>(timer, 0, cdn_start, cdn_end, object_name); + log_progress::<10>(timer, 10, cdn_start, cdn_end, object_name); + log_progress::<10>(timer, 20, cdn_start, cdn_end, object_name); + log_progress::<10>(timer, 30, cdn_start, cdn_end, object_name); + log_progress::<10>(timer, 40, cdn_start, cdn_end, object_name); + log_progress::<10>(timer, 50, cdn_start, cdn_end, object_name); + log_progress::<10>(timer, 60, cdn_start, cdn_end, object_name); + log_progress::<10>(timer, 70, cdn_start, cdn_end, object_name); + log_progress::<10>(timer, 80, cdn_start, cdn_end, object_name); + log_progress::<10>(timer, 90, cdn_start, cdn_end, object_name); + log_progress::<10>(timer, 100, cdn_start, cdn_end, object_name); } }