Skip to content

Commit

Permalink
fix: don't stop the node abruptly when advancing blocks
Browse files Browse the repository at this point in the history
Signed-off-by: ljedrz <[email protected]>
  • Loading branch information
ljedrz committed Dec 12, 2023
1 parent bb2926c commit bf38ef7
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 45 deletions.
3 changes: 2 additions & 1 deletion .integration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ mod tests {
// Perform the sync.
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let completed_height = sync_ledger_with_cdn(TEST_BASE_URL, ledger.clone(), None).await.unwrap();
let completed_height =
sync_ledger_with_cdn(TEST_BASE_URL, ledger.clone(), Default::default()).await.unwrap();
assert_eq!(completed_height, ledger.latest_height());
});
}
Expand Down
5 changes: 4 additions & 1 deletion cli/src/commands/developer/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,12 @@ impl Scan {
// Construct the runtime.
let rt = tokio::runtime::Runtime::new()?;

// Create a placeholder shutdown flag.
let _sf = Default::default();

// Scan the blocks via the CDN.
rt.block_on(async move {
let _ = snarkos_node_cdn::load_blocks(&cdn, cdn_request_start, Some(cdn_request_end), None, move |block| {
let _ = snarkos_node_cdn::load_blocks(&cdn, cdn_request_start, Some(cdn_request_end), _sf, move |block| {
// Check if the block is within the requested range.
if block.height() < start_height || block.height() > end_height {
return Ok(());
Expand Down
17 changes: 14 additions & 3 deletions node/bft/ledger-service/src/ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,27 @@ use snarkvm::{
};

use indexmap::IndexMap;
use std::{fmt, ops::Range, sync::Arc};
use std::{
fmt,
ops::Range,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};

/// A core ledger service.
pub struct CoreLedgerService<N: Network, C: ConsensusStorage<N>> {
ledger: Ledger<N, C>,
coinbase_verifying_key: Arc<CoinbaseVerifyingKey<N>>,
shutdown: Arc<AtomicBool>,
}

impl<N: Network, C: ConsensusStorage<N>> CoreLedgerService<N, C> {
/// Initializes a new core ledger service.
pub fn new(ledger: Ledger<N, C>) -> Self {
pub fn new(ledger: Ledger<N, C>, shutdown: Arc<AtomicBool>) -> Self {
let coinbase_verifying_key = Arc::new(ledger.coinbase_puzzle().coinbase_verifying_key().clone());
Self { ledger, coinbase_verifying_key }
Self { ledger, coinbase_verifying_key, shutdown }
}
}

Expand Down Expand Up @@ -283,6 +291,9 @@ impl<N: Network, C: ConsensusStorage<N>> LedgerService<N> for CoreLedgerService<
/// Adds the given block as the next block in the ledger.
#[cfg(feature = "ledger-write")]
fn advance_to_next_block(&self, block: &Block<N>) -> Result<()> {
if self.shutdown.load(Ordering::Relaxed) {
bail!("The node is shutting down; no longer advancing blocks.");
}
self.ledger.advance_to_next_block(block)?;
tracing::info!("\n\nAdvanced to block {} at round {} - {}\n", block.height(), block.round(), block.hash());
Ok(())
Expand Down
10 changes: 7 additions & 3 deletions node/bft/ledger-service/src/translucent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ use snarkvm::{
},
prelude::{narwhal::BatchCertificate, Field, Network, Result},
};
use std::{fmt, ops::Range};
use std::{
fmt,
ops::Range,
sync::{atomic::AtomicBool, Arc},
};

pub struct TranslucentLedgerService<N: Network, C: ConsensusStorage<N>> {
inner: CoreLedgerService<N, C>,
Expand All @@ -41,8 +45,8 @@ impl<N: Network, C: ConsensusStorage<N>> fmt::Debug for TranslucentLedgerService

impl<N: Network, C: ConsensusStorage<N>> TranslucentLedgerService<N, C> {
/// Initializes a new ledger service wrapper.
pub fn new(ledger: Ledger<N, C>) -> Self {
Self { inner: CoreLedgerService::new(ledger) }
pub fn new(ledger: Ledger<N, C>, shutdown: Arc<AtomicBool>) -> Self {
Self { inner: CoreLedgerService::new(ledger, shutdown) }
}
}

Expand Down
2 changes: 1 addition & 1 deletion node/bft/tests/common/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl TestNetwork {
for (id, account) in accounts.into_iter().enumerate() {
let mut rng = TestRng::fixed(id as u64);
let gen_ledger = genesis_ledger(gen_key, committee.clone(), balances.clone(), &mut rng);
let ledger = Arc::new(TranslucentLedgerService::new(gen_ledger));
let ledger = Arc::new(TranslucentLedgerService::new(gen_ledger, Default::default()));
let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), MAX_GC_ROUNDS);

let (primary, bft) = if config.bft {
Expand Down
21 changes: 11 additions & 10 deletions node/cdn/src/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ const NETWORK_ID: u16 = 3;
pub async fn sync_ledger_with_cdn<N: Network, C: ConsensusStorage<N>>(
base_url: &str,
ledger: Ledger<N, C>,
stop_sync: Option<Arc<AtomicBool>>,
shutdown: Arc<AtomicBool>,
) -> Result<u32, (u32, anyhow::Error)> {
// Fetch the node height.
let start_height = ledger.latest_height() + 1;
// Load the blocks from the CDN into the ledger.
let ledger_clone = ledger.clone();
let result = load_blocks(base_url, start_height, None, stop_sync, move |block: Block<N>| {
let result = load_blocks(base_url, start_height, None, shutdown, move |block: Block<N>| {
ledger_clone.advance_to_next_block(&block)
})
.await;
Expand Down Expand Up @@ -95,7 +95,7 @@ pub async fn load_blocks<N: Network>(
base_url: &str,
start_height: u32,
end_height: Option<u32>,
stop_sync: Option<Arc<AtomicBool>>,
shutdown: Arc<AtomicBool>,
process: impl FnMut(Block<N>) -> Result<()> + Clone + Send + Sync + 'static,
) -> Result<u32, (u32, anyhow::Error)> {
// If the network is not supported, return.
Expand Down Expand Up @@ -154,12 +154,13 @@ pub async fn load_blocks<N: Network>(

futures::stream::iter(cdn_range.clone().step_by(BLOCKS_PER_FILE as usize))
.map(|start| {
// If signalled to stop the sync, log it and exit.
if let Some(stop_sync) = &stop_sync {
if stop_sync.load(Ordering::Relaxed) {
info!("Stopping block sync");
std::process::exit(0);
}
// Stop syncing if the shutdown has begun.
if shutdown.load(Ordering::Relaxed) {
info!("Stopping block sync");
// Calling it from here isn't ideal, but the CDN sync happens before
// the node is even initialized, so it doesn't result in any other
// functionalities being shut down abruptly.
std::process::exit(0);
}

// Prepare the end height.
Expand Down Expand Up @@ -430,7 +431,7 @@ mod tests {

let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let completed_height = load_blocks(TEST_BASE_URL, start, end, None, process).await.unwrap();
let completed_height = load_blocks(TEST_BASE_URL, start, end, Default::default(), process).await.unwrap();
assert_eq!(blocks.read().len(), expected);
if expected > 0 {
assert_eq!(blocks.read().last().unwrap().height(), completed_height);
Expand Down
13 changes: 7 additions & 6 deletions node/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
cdn: Option<String>,
dev: Option<u16>,
) -> Result<Self> {
// Prepare a flag to stop CDN syncing in case of interruption via Ctrl-C.
let stop_sync = if cdn.is_some() { Some(Default::default()) } else { None };
// Prepare the shutdown flag.
let shutdown: Arc<AtomicBool> = Default::default();

// Initialize the signal handler.
let signal_node = Self::handle_signals(stop_sync.clone());
let signal_node = Self::handle_signals(shutdown.clone());

// Initialize the ledger.
let ledger = Ledger::<N, C>::load(genesis.clone(), dev)?;
Expand All @@ -95,15 +95,16 @@ impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
// Initialize the CDN.
if let Some(base_url) = cdn {
// Sync the ledger with the CDN.
if let Err((_, error)) = snarkos_node_cdn::sync_ledger_with_cdn(&base_url, ledger.clone(), stop_sync).await
if let Err((_, error)) =
snarkos_node_cdn::sync_ledger_with_cdn(&base_url, ledger.clone(), shutdown.clone()).await
{
crate::log_clean_error(dev);
return Err(error);
}
}

// Initialize the ledger service.
let ledger_service = Arc::new(CoreLedgerService::<N, C>::new(ledger.clone()));
let ledger_service = Arc::new(CoreLedgerService::<N, C>::new(ledger.clone(), shutdown.clone()));
// Initialize the sync module.
let sync = BlockSync::new(BlockSyncMode::Router, ledger_service.clone());

Expand All @@ -128,7 +129,7 @@ impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
genesis,
coinbase_puzzle,
handles: Default::default(),
shutdown: Default::default(),
shutdown,
};

// Initialize the REST server.
Expand Down
7 changes: 5 additions & 2 deletions node/src/prover/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,11 @@ impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
genesis: Block<N>,
dev: Option<u16>,
) -> Result<Self> {
// Prepare the shutdown flag.
let shutdown: Arc<AtomicBool> = Default::default();

// Initialize the signal handler.
let signal_node = Self::handle_signals(None);
let signal_node = Self::handle_signals(shutdown.clone());

// Initialize the ledger service.
let ledger_service = Arc::new(ProverLedgerService::new());
Expand Down Expand Up @@ -123,7 +126,7 @@ impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
puzzle_instances: Default::default(),
max_puzzle_instances: u8::try_from(max_puzzle_instances)?,
handles: Default::default(),
shutdown: Default::default(),
shutdown,
_phantom: Default::default(),
};
// Initialize the routing.
Expand Down
29 changes: 17 additions & 12 deletions node/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ use snarkos_node_router::{messages::NodeType, Routing};
use snarkvm::prelude::{Address, Network, PrivateKey, ViewKey};

use once_cell::sync::OnceCell;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};

#[async_trait]
Expand Down Expand Up @@ -49,9 +52,9 @@ pub trait NodeInterface<N: Network>: Routing<N> {
}

/// Handles OS signals for the node to intercept and perform a clean shutdown.
/// The optional `stop_sync` flag can be used to cleanly terminate the syncing process.
/// The optional `shutdown_flag` flag can be used to cleanly terminate the syncing process.
/// Note: Only Ctrl-C is supported; it should work on both Unix-family systems and Windows.
fn handle_signals(stop_sync: Option<Arc<AtomicBool>>) -> Arc<OnceCell<Self>> {
fn handle_signals(shutdown_flag: Arc<AtomicBool>) -> Arc<OnceCell<Self>> {
// In order for the signal handler to be started as early as possible, a reference to the node needs
// to be passed to it at a later time.
let node: Arc<OnceCell<Self>> = Default::default();
Expand All @@ -60,16 +63,18 @@ pub trait NodeInterface<N: Network>: Routing<N> {
tokio::task::spawn(async move {
match tokio::signal::ctrl_c().await {
Ok(()) => {
// Check if the `stop_sync` flag is present.
if let Some(flag) = stop_sync {
// Set it to notify the sync that it should halt.
flag.store(true, Ordering::Relaxed);
// For simplicity, the shutdown will be triggered from within sync internals.
return;
}
// If the node is already available, shut it down.
if let Some(node) = node_clone.get() {
node.shut_down().await;
} else {
// If the node is not available yet, set the shutdown flag individually.
shutdown_flag.store(true, Ordering::Relaxed);
}

// A best-effort attempt to let any ongoing activity conclude.
tokio::time::sleep(Duration::from_secs(3)).await;

// Terminate the process.
std::process::exit(0);
}
Err(error) => error!("tokio::signal::ctrl_c encountered an error: {}", error),
Expand Down
13 changes: 7 additions & 6 deletions node/src/validator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
cdn: Option<String>,
dev: Option<u16>,
) -> Result<Self> {
// Prepare a flag to stop CDN syncing in case of interruption via Ctrl-C.
let stop_sync = if cdn.is_some() { Some(Default::default()) } else { None };
// Prepare the shutdown flag.
let shutdown: Arc<AtomicBool> = Default::default();

// Initialize the signal handler.
let signal_node = Self::handle_signals(stop_sync.clone());
let signal_node = Self::handle_signals(shutdown.clone());

// Initialize the ledger.
let ledger = Ledger::load(genesis, dev)?;
Expand All @@ -95,15 +95,16 @@ impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
// Initialize the CDN.
if let Some(base_url) = cdn {
// Sync the ledger with the CDN.
if let Err((_, error)) = snarkos_node_cdn::sync_ledger_with_cdn(&base_url, ledger.clone(), stop_sync).await
if let Err((_, error)) =
snarkos_node_cdn::sync_ledger_with_cdn(&base_url, ledger.clone(), shutdown.clone()).await
{
crate::log_clean_error(dev);
return Err(error);
}
}

// Initialize the ledger service.
let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone()));
let ledger_service = Arc::new(CoreLedgerService::new(ledger.clone(), shutdown.clone()));
// Initialize the sync module.
let sync = BlockSync::new(BlockSyncMode::Gateway, ledger_service.clone());

Expand Down Expand Up @@ -133,7 +134,7 @@ impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
rest: None,
sync,
handles: Default::default(),
shutdown: Default::default(),
shutdown,
};
// Initialize the transaction pool.
node.initialize_transaction_pool(dev)?;
Expand Down

0 comments on commit bf38ef7

Please sign in to comment.