Skip to content

Commit

Permalink
refactor: improve the organization of node stats
Browse files Browse the repository at this point in the history
Signed-off-by: ljedrz <[email protected]>
  • Loading branch information
ljedrz committed May 5, 2021
1 parent 0d7148e commit e72a90e
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 148 deletions.
32 changes: 16 additions & 16 deletions network/src/inbound/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl<S: Storage + Send + Sync + 'static> Node<S> {
}
Err(e) => error!("Failed to accept a connection: {}", e),
}
node.stats.inbound_connection_requests.fetch_add(1, Ordering::Relaxed);
node.stats.connections.all_accepted.fetch_add(1, Ordering::Relaxed);
}
});

Expand Down Expand Up @@ -220,7 +220,7 @@ impl<S: Storage + Send + Sync + 'static> Node<S> {
pub async fn process_incoming_messages(&self, receiver: &mut Receiver) -> Result<(), NetworkError> {
let Message { direction, payload } = receiver.recv().await.ok_or(NetworkError::ReceiverFailedToParse)?;

self.stats.inbound_channel_items.fetch_sub(1, Ordering::SeqCst);
self.stats.inbound.queued_messages.fetch_sub(1, Ordering::SeqCst);

let source = if let Direction::Inbound(addr) = direction {
self.peer_book.update_last_seen(addr);
Expand All @@ -231,21 +231,21 @@ impl<S: Storage + Send + Sync + 'static> Node<S> {

match payload {
Payload::Transaction(transaction) => {
self.stats.recv_transactions.fetch_add(1, Ordering::Relaxed);
self.stats.inbound.transactions.fetch_add(1, Ordering::Relaxed);

if let Some(ref sync) = self.sync() {
sync.received_memory_pool_transaction(source, transaction).await?;
}
}
Payload::Block(block) => {
self.stats.recv_blocks.fetch_add(1, Ordering::Relaxed);
self.stats.inbound.blocks.fetch_add(1, Ordering::Relaxed);

if let Some(ref sync) = self.sync() {
sync.received_block(source, block, true).await?;
}
}
Payload::SyncBlock(block) => {
self.stats.recv_syncblocks.fetch_add(1, Ordering::Relaxed);
self.stats.inbound.syncblocks.fetch_add(1, Ordering::Relaxed);

if let Some(ref sync) = self.sync() {
sync.received_block(source, block, false).await?;
Expand All @@ -262,35 +262,35 @@ impl<S: Storage + Send + Sync + 'static> Node<S> {
}
}
Payload::GetBlocks(hashes) => {
self.stats.recv_getblocks.fetch_add(1, Ordering::Relaxed);
self.stats.inbound.getblocks.fetch_add(1, Ordering::Relaxed);

if let Some(ref sync) = self.sync() {
sync.received_get_blocks(source, hashes).await?;
}
}
Payload::GetMemoryPool => {
self.stats.recv_getmemorypool.fetch_add(1, Ordering::Relaxed);
self.stats.inbound.getmemorypool.fetch_add(1, Ordering::Relaxed);

if let Some(ref sync) = self.sync() {
sync.received_get_memory_pool(source).await?;
}
}
Payload::MemoryPool(mempool) => {
self.stats.recv_memorypool.fetch_add(1, Ordering::Relaxed);
self.stats.inbound.memorypool.fetch_add(1, Ordering::Relaxed);

if let Some(ref sync) = self.sync() {
sync.received_memory_pool(mempool)?;
}
}
Payload::GetSync(getsync) => {
self.stats.recv_getsync.fetch_add(1, Ordering::Relaxed);
self.stats.inbound.getsync.fetch_add(1, Ordering::Relaxed);

if let Some(ref sync) = self.sync() {
sync.received_get_sync(source, getsync).await?;
}
}
Payload::Sync(sync) => {
self.stats.recv_syncs.fetch_add(1, Ordering::Relaxed);
self.stats.inbound.syncs.fetch_add(1, Ordering::Relaxed);

if let Some(ref sync_handler) = self.sync() {
if sync.is_empty() {
Expand All @@ -302,17 +302,17 @@ impl<S: Storage + Send + Sync + 'static> Node<S> {
}
}
Payload::GetPeers => {
self.stats.recv_getpeers.fetch_add(1, Ordering::Relaxed);
self.stats.inbound.getpeers.fetch_add(1, Ordering::Relaxed);

self.send_peers(source).await;
}
Payload::Peers(peers) => {
self.stats.recv_peers.fetch_add(1, Ordering::Relaxed);
self.stats.inbound.peers.fetch_add(1, Ordering::Relaxed);

self.process_inbound_peers(peers);
}
Payload::Ping(block_height) => {
self.stats.recv_pings.fetch_add(1, Ordering::Relaxed);
self.stats.inbound.pings.fetch_add(1, Ordering::Relaxed);

self.peer_book.received_ping(source, block_height);

Expand All @@ -333,11 +333,11 @@ impl<S: Storage + Send + Sync + 'static> Node<S> {
// }
}
Payload::Pong => {
self.stats.recv_pongs.fetch_add(1, Ordering::Relaxed);
self.stats.inbound.pongs.fetch_add(1, Ordering::Relaxed);
// Skip as this case is already handled with priority in Inbound::listen_for_messages
}
Payload::Unknown => {
self.stats.recv_unknown.fetch_add(1, Ordering::Relaxed);
self.stats.inbound.unknown.fetch_add(1, Ordering::Relaxed);
warn!("Unknown payload received; this could indicate that the client you're using is out-of-date");
}
}
Expand Down Expand Up @@ -413,7 +413,7 @@ impl<S: Storage + Send + Sync + 'static> Node<S> {
if let Err(err) = self.inbound.sender.send(response).await {
error!("Failed to route a response for a message: {}", err);
} else {
self.stats.inbound_channel_items.fetch_add(1, Ordering::SeqCst);
self.stats.inbound.queued_messages.fetch_add(1, Ordering::SeqCst);
}
}
}
4 changes: 2 additions & 2 deletions network/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,10 @@ impl<S: Storage + Send + core::marker::Sync + 'static> Node<S> {
let incoming_task = task::spawn(async move {
loop {
if let Err(e) = node_clone.process_incoming_messages(&mut receiver).await {
node_clone.stats.recv_failure_count.fetch_add(1, Ordering::Relaxed);
node_clone.stats.inbound.all_failures.fetch_add(1, Ordering::Relaxed);
error!("Node error: {}", e);
} else {
node_clone.stats.recv_success_count.fetch_add(1, Ordering::Relaxed);
node_clone.stats.inbound.all_successes.fetch_add(1, Ordering::Relaxed);
}
}
});
Expand Down
10 changes: 5 additions & 5 deletions network/src/outbound/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,19 @@ impl<S: Storage + Send + Sync + 'static> Node<S> {
"Couldn't send a {} to {}: the send channel is full",
request, target_addr
);
self.stats.send_failure_count.fetch_add(1, Ordering::Relaxed);
self.stats.outbound.all_failures.fetch_add(1, Ordering::Relaxed);
}
Err(TrySendError::Closed(request)) => {
error!(
"Couldn't send a {} to {}: the send channel is closed",
request, target_addr
);
self.stats.send_failure_count.fetch_add(1, Ordering::Relaxed);
self.stats.outbound.all_failures.fetch_add(1, Ordering::Relaxed);
}
},
Err(_) => {
warn!("Failed to send a {}: peer is disconnected", request);
self.stats.send_failure_count.fetch_add(1, Ordering::Relaxed);
self.stats.outbound.all_failures.fetch_add(1, Ordering::Relaxed);
}
}
}
Expand Down Expand Up @@ -108,11 +108,11 @@ impl<S: Storage + Send + Sync + 'static> Node<S> {
if let Some(message) = receiver.recv().await {
match writer.write_message(&message.payload).await {
Ok(_) => {
self.stats.send_success_count.fetch_add(1, Ordering::Relaxed);
self.stats.outbound.all_successes.fetch_add(1, Ordering::Relaxed);
}
Err(error) => {
warn!("Failed to send a {}: {}", message, error);
self.stats.send_failure_count.fetch_add(1, Ordering::Relaxed);
self.stats.outbound.all_failures.fetch_add(1, Ordering::Relaxed);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion network/src/peers/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ impl<S: Storage + Send + Sync + 'static> Node<S> {
return Err(NetworkError::PeerAlreadyConnected);
}

self.stats.outbound_connection_requests.fetch_add(1, Ordering::Relaxed);
self.stats.connections.all_initiated.fetch_add(1, Ordering::Relaxed);

self.peer_book.set_connecting(remote_address)?;

Expand Down
74 changes: 47 additions & 27 deletions network/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,47 +20,67 @@ use std::sync::atomic::{AtomicU32, AtomicU64};
// interchangeable with prometheus metrics.
#[derive(Default)]
pub struct Stats {
/// The monotonic counter for the number of send requests that succeeded.
pub send_success_count: AtomicU64,
/// The monotonic counter for the number of send requests that failed.
pub send_failure_count: AtomicU64,
/// The number of successfully processed inbound messages.
pub recv_success_count: AtomicU64,
/// The number of inbound messages that couldn't be processed.
pub recv_failure_count: AtomicU64,
/// The current number of items in the inbound channel.
pub inbound_channel_items: AtomicU64,
/// The number of all connection requests the node has received.
pub inbound_connection_requests: AtomicU64,
/// The number of outbound connection requests.
pub outbound_connection_requests: AtomicU64,
/// Stats related to messages received by the node.
pub inbound: InboundStats,
/// Stats related to messages sent by the node.
pub outbound: OutboundStats,
/// Stats related to the node's connections.
pub connections: ConnectionStats,

/// The number of mined blocks.
pub blocks_mined: AtomicU32,
}

#[derive(Default)]
pub struct InboundStats {
/// The number of successfully processed inbound messages.
pub all_successes: AtomicU64,
/// The number of inbound messages that couldn't be processed.
pub all_failures: AtomicU64,

/// The current number of messages queued in the inbound channel.
pub queued_messages: AtomicU64,

/// The number of all received `Block` messages.
pub recv_blocks: AtomicU64,
pub blocks: AtomicU64,
/// The number of all received `GetBlocks` messages.
pub recv_getblocks: AtomicU64,
pub getblocks: AtomicU64,
/// The number of all received `GetMemoryPool` messages.
pub recv_getmemorypool: AtomicU64,
pub getmemorypool: AtomicU64,
/// The number of all received `GetPeers` messages.
pub recv_getpeers: AtomicU64,
pub getpeers: AtomicU64,
/// The number of all received `GetSync` messages.
pub recv_getsync: AtomicU64,
pub getsync: AtomicU64,
/// The number of all received `MemoryPool` messages.
pub recv_memorypool: AtomicU64,
pub memorypool: AtomicU64,
/// The number of all received `Peers` messages.
pub recv_peers: AtomicU64,
pub peers: AtomicU64,
/// The number of all received `Ping` messages.
pub recv_pings: AtomicU64,
pub pings: AtomicU64,
/// The number of all received `Pong` messages.
pub recv_pongs: AtomicU64,
pub pongs: AtomicU64,
/// The number of all received `Sync` messages.
pub recv_syncs: AtomicU64,
pub syncs: AtomicU64,
/// The number of all received `SyncBlock` messages.
pub recv_syncblocks: AtomicU64,
pub syncblocks: AtomicU64,
/// The number of all received `Transaction` messages.
pub recv_transactions: AtomicU64,
pub transactions: AtomicU64,
/// The number of all received `Unknown` messages.
pub recv_unknown: AtomicU64,
pub unknown: AtomicU64,
}

#[derive(Default)]
pub struct OutboundStats {
/// The number of messages successfully sent by the node.
pub all_successes: AtomicU64,
/// The number of messages that failed to be sent to peers.
pub all_failures: AtomicU64,
}

#[derive(Default)]
pub struct ConnectionStats {
/// The number of all connections the node has accepted.
pub all_accepted: AtomicU64,
/// The number of all connections the node has initiated.
pub all_initiated: AtomicU64,
}
44 changes: 22 additions & 22 deletions rpc/documentation/public_endpoints/getnodestats.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,29 @@ None

| Parameter | Type | Description |
|:------------------------------:|:----:|:---------------------------------------------------------:|
| `send_success_count` | u64 | The number of successfully sent messages |
| `send_failure_count` | u64 | The number of failures to send messages |
| `recv_success_count` | u64 | The number of successfully processed inbound messages |
| `recv_failure_count` | u64 | The number of inbound messages that couldn't be processed |
| `inbound_channel_items` | u64 | The number of inbound items queued to be processed |
| `inbound_connection_requests` | u64 | The number of connection requests the node has received |
| `outbound_connection_requests` | u64 | The number of connection requests the node has made |
| `number_of_connected_peers` | u16 | The number of currently connected peers |
| `number_of_connecting_peers` | u16 | The number of currently connecting peers |
| `blocks_mined` | u32 | The number of blocks the node has mined |
| `block_height` | u32 | The current block height of the node |
| `recv_blocks` | u64 | The number of all received Block messages |
| `recv_getmemorypool` | u64 | The number of all received GetMemoryPool messages |
| `recv_getpeers` | u64 | The number of all received GetPeers messages |
| `recv_getsync` | u64 | The number of all received GetSync messages |
| `recv_memorypool` | u64 | The number of all received MemoryPool messages |
| `recv_peers` | u64 | The number of all received Peers messages |
| `recv_pings` | u64 | The number of all received Ping messages |
| `recv_pongs` | u64 | The number of all received Pong messages |
| `recv_syncs` | u64 | The number of all received Sync messages |
| `recv_syncblocks` | u64 | The number of all received SyncBlock messages |
| `recv_transactions` | u64 | The number of all received Transaction messages |
| `recv_unknown` | u64 | The number of all received Unknown messages |
| `blocks_mined` | u32 | The number of blocks the node has mined |
| `connections.all_accepted` | u64 | The number of connection requests the node has received |
| `connections.all_initiated` | u64 | The number of connection requests the node has made |
| `connections.connected_peers` | u16 | The number of currently connected peers |
| `connections.connecting_peers` | u16 | The number of currently connecting peers |
| `inbound.all_successes` | u64 | The number of successfully processed inbound messages |
| `inbound.all_failures` | u64 | The number of inbound messages that couldn't be processed |
| `inbound.queued_messages` | u64 | The number of inbound items queued to be processed |
| `inbound.blocks` | u64 | The number of all received Block messages |
| `inbound.getmemorypool` | u64 | The number of all received GetMemoryPool messages |
| `inbound.getpeers` | u64 | The number of all received GetPeers messages |
| `inbound.getsync` | u64 | The number of all received GetSync messages |
| `inbound.memorypool` | u64 | The number of all received MemoryPool messages |
| `inbound.peers` | u64 | The number of all received Peers messages |
| `inbound.pings` | u64 | The number of all received Ping messages |
| `inbound.pongs` | u64 | The number of all received Pong messages |
| `inbound.syncs` | u64 | The number of all received Sync messages |
| `inbound.syncblocks` | u64 | The number of all received SyncBlock messages |
| `inbound.transactions` | u64 | The number of all received Transaction messages |
| `inbound.unknown` | u64 | The number of all received Unknown messages |
| `outbound.all_successes` | u64 | The number of successfully sent messages |
| `outbound.all_failures` | u64 | The number of failures to send messages |

### Example
```ignore
Expand Down
Loading

0 comments on commit e72a90e

Please sign in to comment.