Skip to content

Commit

Permalink
Merge pull request ProvableHQ#1219 from niklaslong/sync-state-decoupling
Browse files Browse the repository at this point in the history
Decouple sync state from peer quality
  • Loading branch information
ljedrz authored Oct 14, 2021
2 parents 6dfcbd9 + d5ba84c commit 2b8b22d
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 82 deletions.
22 changes: 5 additions & 17 deletions network/src/peers/peer/inbound_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ impl Peer {
time_received: Option<Instant>,
payload: Payload,
) -> Result<(), NetworkError> {
self.quality.see();
self.quality.num_messages_received += 1;
self.register_received_message();
metrics::increment_counter!(inbound::ALL_SUCCESSES);

let source = self.address;
Expand Down Expand Up @@ -65,7 +64,7 @@ impl Peer {
}
Payload::Block(block, height) => {
metrics::increment_counter!(inbound::BLOCKS);
self.quality.blocks_received_from += 1;
self.sync_state.blocks_received_from += 1;

if node.sync().is_some() {
let node = node.clone();
Expand Down Expand Up @@ -98,7 +97,7 @@ impl Peer {
}
Payload::SyncBlock(block, height) => {
metrics::increment_counter!(inbound::SYNCBLOCKS);
self.quality.blocks_synced_from += 1;
self.sync_state.blocks_synced_from += 1;

if node.sync().is_some() {
let node = node.clone();
Expand Down Expand Up @@ -206,7 +205,7 @@ impl Peer {
Payload::Ping(block_height) => {
network.write_payload(&Payload::Pong).await?;
debug!("Sent a '{}' message to {}", Payload::Pong, self.address);
self.quality.block_height = block_height;
self.block_height = block_height;
metrics::increment_counter!(PINGS);

// Pongs are sent without going through the outbound handler,
Expand All @@ -221,18 +220,7 @@ impl Peer {
}
}
Payload::Pong => {
if self.quality.expecting_pong {
let rtt = self
.quality
.last_ping_sent
.map(|x| x.elapsed().as_millis() as u64)
.unwrap_or(u64::MAX);
trace!("RTT for {} is {}ms", source, rtt);
self.quality.expecting_pong = false;
self.quality.rtt_ms = rtt;
} else {
self.fail();
}
self.stop_ping_measurement();
metrics::increment_counter!(PONGS);
}
Payload::Unknown => {
Expand Down
36 changes: 6 additions & 30 deletions network/src/peers/peer/outbound_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,19 +92,16 @@ impl Peer {
PeerAction::Disconnect => Ok(PeerResponse::Disconnect),
PeerAction::Send(message, time_received) => {
match &message {
Payload::Ping(_) => {
self.quality.expecting_pong = true;
self.quality.last_ping_sent = Some(Instant::now());
}
Payload::Ping(_) => self.start_ping_measurement(),
Payload::Block(block, _) => {
if self.block_received_cache.contains(&block[..]) {
metrics::increment_counter!(metrics::outbound::ALL_CACHE_HITS);
return Ok(PeerResponse::None);
}
self.quality.blocks_sent_to += 1;
self.sync_state.blocks_sent_to += 1;
}
Payload::SyncBlock(..) => {
self.quality.blocks_synced_to += 1;
self.sync_state.blocks_synced_to += 1;
}
_ => (),
}
Expand Down Expand Up @@ -146,37 +143,16 @@ impl Peer {
}
}
PeerAction::CancelSync => {
if self.quality.remaining_sync_blocks > self.quality.total_sync_blocks / 2 {
warn!(
"Was expecting {} more sync blocks from {}",
self.quality.remaining_sync_blocks, self.address,
);
self.quality.remaining_sync_blocks = 0;
self.quality.total_sync_blocks = 0;
self.fail();
} else if self.quality.remaining_sync_blocks > 0 {
trace!(
"Was expecting {} more sync blocks from {}",
self.quality.remaining_sync_blocks,
self.address,
);
self.quality.remaining_sync_blocks = 0;
self.quality.total_sync_blocks = 0;
}
self.cancel_sync();
Ok(PeerResponse::None)
//todo: should we notify the peer we are no longer expecting anything from them?
}
PeerAction::GotSyncBlock => {
if self.quality.remaining_sync_blocks > 0 {
self.quality.remaining_sync_blocks -= 1;
} else {
trace!("received unexpected or late sync block from {}", self.address);
}
self.register_received_sync_block();
Ok(PeerResponse::None)
}
PeerAction::ExpectingSyncBlocks(amount) => {
self.quality.remaining_sync_blocks += amount;
self.quality.total_sync_blocks += amount;
self.increment_sync_expectations(amount);
Ok(PeerResponse::None)
}
PeerAction::SoftFail => {
Expand Down
102 changes: 93 additions & 9 deletions network/src/peers/peer/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,29 @@

use anyhow::*;
use chrono::Utc;
use serde::{Deserialize, Serialize};
use snarkos_metrics::wrapped_mpsc;
use std::{
net::SocketAddr,
time::{Duration, Instant},
};

use super::PeerQuality;
use super::{PeerQuality, SyncState};
use crate::{message::Payload, BlockCache, NetworkError, Node};

use super::{network::*, outbound_handler::*};
/// A data structure containing information about a peer.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone)]
pub struct Peer {
/// The address of the node's listener socket.
pub address: SocketAddr,
/// The latest broadcast block height of the peer.
pub block_height: u32,
/// Quantifies the node's connection quality with the peer.
pub quality: PeerQuality,
/// Tracks the node's sync state with the peer.
pub sync_state: SyncState,

#[serde(skip)]
/// The cache of received blocks from the peer.
pub block_received_cache: BlockCache<{ crate::PEER_BLOCK_CACHE_SIZE }>,
}

Expand All @@ -42,13 +47,21 @@ const FAILURE_THRESHOLD: usize = 5;

impl Peer {
pub fn new(address: SocketAddr, data: Option<&snarkos_storage::Peer>) -> Self {
let mut block_height = 0;
let mut quality: PeerQuality = Default::default();
let mut sync_state: SyncState = Default::default();

if let Some(data) = data {
block_height = data.block_height;
quality.sync_from_storage(data);
sync_state.sync_from_storage(data);
}

Self {
address,
block_height,
quality,
sync_state,

block_received_cache: BlockCache::default(),
}
Expand All @@ -57,14 +70,14 @@ impl Peer {
pub fn serialize(&self) -> snarkos_storage::Peer {
snarkos_storage::Peer {
address: self.address,
block_height: self.quality.block_height,
block_height: self.block_height,
first_seen: self.quality.first_seen,
last_seen: self.quality.last_seen,
last_connected: self.quality.last_connected,
blocks_synced_to: self.quality.blocks_synced_to,
blocks_synced_from: self.quality.blocks_synced_from,
blocks_received_from: self.quality.blocks_received_from,
blocks_sent_to: self.quality.blocks_sent_to,
blocks_synced_to: self.sync_state.blocks_synced_to,
blocks_synced_from: self.sync_state.blocks_synced_from,
blocks_received_from: self.sync_state.blocks_received_from,
blocks_sent_to: self.sync_state.blocks_sent_to,
connection_attempt_count: self.quality.connection_attempt_count,
connection_success_count: self.quality.connected_count,
connection_transient_fail_count: self.quality.connection_transient_fail_count,
Expand Down Expand Up @@ -175,6 +188,77 @@ impl Peer {
}

pub(super) fn set_disconnected(&mut self) {
self.sync_state.reset();
self.quality.disconnected();
}

/// Starts a round-trip-time measurement for this peer.
///
/// Records the current instant as the time the `Ping` was sent and flags that
/// a `Pong` is now expected in return.
pub fn start_ping_measurement(&mut self) {
    let quality = &mut self.quality;

    quality.last_ping_sent = Some(Instant::now());
    quality.expecting_pong = true;
}

/// Finishes a round-trip-time measurement when a `Pong` arrives.
///
/// If no `Pong` was expected, the peer is charged with a failure and nothing
/// else changes. Otherwise the time elapsed since the last `Ping` is stored as
/// the RTT in milliseconds and the "expecting pong" flag is cleared.
pub fn stop_ping_measurement(&mut self) {
    if self.quality.expecting_pong {
        // With no recorded ping timestamp the RTT is reported as `u64::MAX`.
        let elapsed_ms = match self.quality.last_ping_sent {
            // `as_millis` yields a `u128`; the cast narrows it to `u64`.
            Some(sent_at) => sent_at.elapsed().as_millis() as u64,
            None => u64::MAX,
        };

        trace!("RTT for {} is {}ms", self.address, elapsed_ms);

        self.quality.rtt_ms = elapsed_ms;
        self.quality.expecting_pong = false;
    } else {
        // An unsolicited `Pong` counts against the peer.
        self.fail();
    }
}

/// Records that a message was received from this peer.
///
/// Notifies the quality tracker via `see()` and bumps the received-message
/// counter by one.
pub fn register_received_message(&mut self) {
    let quality = &mut self.quality;

    quality.see();
    quality.num_messages_received += 1;
}

/// Accounts for one sync block received from this peer.
///
/// Decrements the number of outstanding sync blocks. If none were outstanding,
/// the block is unexpected or late: this is only logged and the counter stays
/// at zero.
pub fn register_received_sync_block(&mut self) {
    // `checked_sub` returns `None` exactly when the counter is already 0,
    // so the subtraction can never underflow.
    match self.sync_state.remaining_sync_blocks.checked_sub(1) {
        Some(still_outstanding) => {
            self.sync_state.remaining_sync_blocks = still_outstanding;
        }
        None => {
            trace!("received unexpected or late sync block from {}", self.address);
        }
    }
}

/// Registers that `amount` additional sync blocks are expected from this peer.
///
/// Both the outstanding count and the total requested count grow by `amount`.
pub fn increment_sync_expectations(&mut self, amount: u32) {
    let sync_state = &mut self.sync_state;

    sync_state.remaining_sync_blocks += amount;
    sync_state.total_sync_blocks += amount;
}

/// Cancels the sync currently in progress with this peer, if any.
///
/// Does nothing when no sync blocks are outstanding. Otherwise the shortfall is
/// logged and the sync expectations are reset. If more than half of the
/// requested blocks are still missing, the message is logged as a warning and
/// the peer is charged with a failure; otherwise it is logged at trace level.
pub fn cancel_sync(&mut self) {
    let outstanding = self.sync_state.remaining_sync_blocks;

    if outstanding > 0 {
        let mostly_missing = outstanding > self.sync_state.total_sync_blocks / 2;
        let note = format!("Was expecting {} more sync blocks from {}", outstanding, self.address);

        // The tracing macros take their level statically, hence one branch
        // per log level.
        if mostly_missing {
            warn!("{}", note);
            self.fail();
        } else {
            trace!("{}", note);
        }

        self.sync_state.reset();
    }
}
}
71 changes: 49 additions & 22 deletions network/src/peers/peer/peer_quality.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,49 +21,78 @@ use std::time::Instant;
use chrono::{DateTime, Utc};

#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)]
pub struct PeerQuality {
pub block_height: u32,
pub last_seen: Option<DateTime<Utc>>,
#[serde(skip)]
pub expecting_pong: bool,
#[serde(skip)]
pub last_ping_sent: Option<Instant>,
/// The time it took to send a `Ping` to the peer and for it to respond with a `Pong`.
pub rtt_ms: u64,
/// The number of failures associated with the peer; grounds for dismissal.
pub failures: Vec<DateTime<Utc>>,
pub struct SyncState {
/// The total number of requested sync blocks.
pub total_sync_blocks: u32,
/// The number of remaining blocks to sync with.
pub remaining_sync_blocks: u32,
pub num_messages_received: u64,
pub first_seen: Option<DateTime<Utc>>,
pub last_connected: Option<DateTime<Utc>>,
pub last_disconnected: Option<DateTime<Utc>>,

/// The number of sync blocks sent to this peer.
pub blocks_synced_to: u32,
/// The number of sync blocks received from this peer.
pub blocks_synced_from: u32,
/// The number of blocks received from this peer.
pub blocks_received_from: u32,
/// The number of blocks sent to this peer.
pub blocks_sent_to: u32,
}

impl SyncState {
    /// Loads the block counters from a stored peer record.
    ///
    /// Only the four sent/received/synced counters are copied; the in-flight
    /// sync expectations are left untouched.
    pub fn sync_from_storage(&mut self, data: &snarkos_storage::Peer) {
        let Self {
            blocks_synced_to,
            blocks_synced_from,
            blocks_received_from,
            blocks_sent_to,
            ..
        } = self;

        *blocks_synced_to = data.blocks_synced_to;
        *blocks_synced_from = data.blocks_synced_from;
        *blocks_received_from = data.blocks_received_from;
        *blocks_sent_to = data.blocks_sent_to;
    }

    /// Clears the in-flight sync expectations (total and remaining sync blocks).
    ///
    /// The sent/received/synced block counters are not affected.
    pub fn reset(&mut self) {
        self.total_sync_blocks = 0;
        self.remaining_sync_blocks = 0;
    }
}

/// Connection-quality bookkeeping for a single peer: failure history, contact
/// timestamps, connection counters, ping round-trip time and message count.
#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)]
pub struct PeerQuality {
/// The timestamps of the failures associated with the peer; grounds for dismissal.
pub failures: Vec<DateTime<Utc>>,

/// The last time the node interacted with this peer.
pub last_seen: Option<DateTime<Utc>>,
/// The first time the node interacted with this peer.
pub first_seen: Option<DateTime<Utc>>,
/// The last time the node was connected to this peer.
pub last_connected: Option<DateTime<Utc>>,
/// The last time the node was disconnected from this peer.
pub last_disconnected: Option<DateTime<Utc>>,

/// The number of times we have attempted to connect to this peer.
pub connection_attempt_count: u64,
/// The number of failed connection attempts since the last connection success.
pub connection_transient_fail_count: u64,
/// The number of times we have connected to this peer.
pub connected_count: u64,
/// The number of times we have disconnected from this peer.
pub disconnected_count: u64,

/// Set to `true` if this node has sent a `Ping` and is expecting a `Pong` in return.
/// Skipped during (de)serialization.
#[serde(skip)]
pub expecting_pong: bool,
/// The timestamp of the last sent `Ping` to this peer.
/// Skipped during (de)serialization.
#[serde(skip)]
pub last_ping_sent: Option<Instant>,
/// The time, in milliseconds, it took to send a `Ping` to the peer and for it to
/// respond with a `Pong`.
pub rtt_ms: u64,

/// The number of messages received from this peer.
pub num_messages_received: u64,
}

impl PeerQuality {
pub fn sync_from_storage(&mut self, peer: &snarkos_storage::Peer) {
self.block_height = peer.block_height;
self.last_seen = peer.last_seen;
self.first_seen = peer.first_seen;
self.last_connected = peer.last_connected;
self.blocks_synced_to = peer.blocks_synced_to;
self.blocks_synced_from = peer.blocks_synced_from;
self.blocks_received_from = peer.blocks_received_from;
self.blocks_sent_to = peer.blocks_sent_to;

self.connection_attempt_count = peer.connection_attempt_count;
self.connection_transient_fail_count = peer.connection_transient_fail_count;
self.connected_count = peer.connection_success_count;
Expand Down Expand Up @@ -109,8 +138,6 @@ impl PeerQuality {
self.last_disconnected = Some(disconnect_timestamp);
self.disconnected_count += 1;
self.expecting_pong = false;
self.remaining_sync_blocks = 0;
self.total_sync_blocks = 0;

if let Some(last_connected) = self.last_connected {
if let Ok(elapsed) = disconnect_timestamp.signed_duration_since(last_connected).to_std() {
Expand Down
2 changes: 1 addition & 1 deletion network/src/peers/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ impl Node {
.cloned()
.collect();

candidates.sort_unstable_by(|x, y| y.quality.block_height.cmp(&x.quality.block_height));
candidates.sort_unstable_by(|x, y| y.block_height.cmp(&x.block_height));

candidates.truncate(count - random_count);
candidates
Expand Down
Loading

0 comments on commit 2b8b22d

Please sign in to comment.