Skip to content

Commit

Permalink
refactor: pass Block by value, remove an unused method
Browse files Browse the repository at this point in the history
Signed-off-by: ljedrz <[email protected]>
  • Loading branch information
ljedrz committed Aug 26, 2022
1 parent 41bb295 commit 248d909
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 43 deletions.
33 changes: 3 additions & 30 deletions snarkos/ledger/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
mod server;
pub use server::*;

use crate::{handle_dispatch_error, BlockDB, Data, ProgramDB};
use crate::{handle_dispatch_error, BlockDB, ProgramDB};
use snarkvm::prelude::*;

use colored::Colorize;
Expand Down Expand Up @@ -137,38 +137,11 @@ impl<N: Network> Ledger<N> {
self.ledger.write().add_to_memory_pool(transaction)
}

/// Advances the ledger to the next block.
pub async fn advance_to_next_block(self: &Arc<Self>) -> Result<Block<N>> {
let self_clone = self.clone();
let next_block = task::spawn_blocking(move || {
// Initialize an RNG.
let rng = &mut ::rand::thread_rng();
// Propose the next block.
self_clone.ledger.read().propose_next_block(&self_clone.private_key, rng)
})
.await??;

// Add the next block to the ledger.
self.add_next_block(&next_block).await?;

// Broadcast the block to all peers.
let peers = self.peers().read().clone();
for (_, sender) in peers.iter() {
let _ = sender
.send(crate::Message::<N>::BlockBroadcast(Data::Object(next_block.clone())))
.await;
}

// Return the next block.
Ok(next_block)
}

/// Attempts to add the given block to the ledger.
pub(crate) async fn add_next_block(self: &Arc<Self>, next_block: &Block<N>) -> Result<()> {
pub(crate) async fn add_next_block(self: &Arc<Self>, next_block: Block<N>) -> Result<()> {
// Add the next block to the ledger.
let self_clone = self.clone();
let next_block_clone = next_block.clone();
if let Err(error) = task::spawn_blocking(move || self_clone.ledger.write().add_next_block(&next_block_clone)).await? {
if let Err(error) = task::spawn_blocking(move || self_clone.ledger.write().add_next_block(&next_block)).await? {
// Log the error.
warn!("{error}");
return Err(error);
Expand Down
32 changes: 19 additions & 13 deletions snarkos/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,18 +123,21 @@ pub(crate) async fn handle_peer<N: Network>(
// Perform deferred deserialization.
let block = block_bytes.deserialize().await?;

let block_height = block.height();
let block_hash = block.hash();

// Check if the block can be added to the ledger.
if block.height() == ledger.ledger().read().latest_height() + 1 {
if block_height == ledger.ledger().read().latest_height() + 1 {
// Attempt to add the block to the ledger.
match ledger.add_next_block(&block).await {
Ok(_) => info!("Advanced to block {} ({})", block.height(), block.hash()),
Err(err) => warn!("Failed to process block {} (height: {}): {:?}",block.hash(),block.header().height(), err)
match ledger.add_next_block(block).await {
Ok(_) => info!("Advanced to block {} ({})", block_height, block_hash),
Err(err) => warn!("Failed to process block {} (height: {}): {:?}", block_hash, block_height, err)
};

// Send a ping.
peer.outbound.send(Message::<N>::Ping).await?;
} else {
trace!("Skipping block {} (height: {})", block.hash(), block.height());
trace!("Skipping block {} (height: {})", block_hash, block_height);
}
},
Message::TransactionBroadcast(transaction_bytes) => {
Expand Down Expand Up @@ -169,34 +172,37 @@ pub(crate) async fn handle_peer<N: Network>(
},
Message::BlockBroadcast(block_bytes) => {
// Perform deferred deserialization.
let block = block_bytes.deserialize().await?;
let block = block_bytes.clone().deserialize().await?;

let block_height = block.height();
let block_hash = block.hash();

// Check if the block can be added to the ledger.
if block.height() == ledger.ledger().read().latest_height() + 1 {
if block_height == ledger.ledger().read().latest_height() + 1 {
// Attempt to add the block to the ledger.
match ledger.add_next_block(&block).await {
match ledger.add_next_block(block).await {
Ok(_) => {
info!("Advanced to block {} ({})", block.height(), block.hash());
info!("Advanced to block {} ({})", block_height, block_hash);

// Broadcast block to all peers except the sender.
let peers = ledger.peers().read().clone();
tokio::spawn(async move {
for (_, sender) in peers.iter().filter(|(ip, _)| *ip != &peer.ip) {
let _ = sender.send(Message::<N>::BlockBroadcast(Data::Object(block.clone()))).await;
let _ = sender.send(Message::<N>::BlockBroadcast(block_bytes.clone())).await;
}
});
},
Err(err) => {
trace!(
"Failed to process block {} (height: {}): {:?}",
block.hash(),
block.header().height(),
block_hash,
block_height,
err
);
}
};
} else {
trace!("Skipping block {} (height: {})", block.hash(), block.height());
trace!("Skipping block {} (height: {})", block_hash, block_height);
}
}
}
Expand Down

0 comments on commit 248d909

Please sign in to comment.