Skip to content

Commit

Permalink
[network] Add per application network traffic metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
gregnazario authored and aptos-bot committed Apr 5, 2022
1 parent f4ecf8e commit 995dfc9
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 8 deletions.
62 changes: 62 additions & 0 deletions network/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,3 +474,65 @@ pub static NETWORK_RATE_LIMIT_METRICS: Lazy<HistogramVec> = Lazy::new(|| {
)
.unwrap()
});

/// Histogram family for inbound network traffic, broken down per application protocol.
///
/// Registered as `aptos_network_app_inbound_traffic` with the label dimensions
/// `role_type`, `network_id`, `peer_id`, `protocol_id` and `metric`.
///
/// The registration result is unwrapped, so a registration error panics when this
/// static is first initialized.
///
/// NOTE(review): no bucket boundaries are supplied, so whatever defaults
/// `register_histogram_vec!` applies are in effect — confirm they suit the values
/// observed through this metric.
pub static NETWORK_APPLICATION_INBOUND_METRIC: Lazy<HistogramVec> = Lazy::new(|| {
    let label_names = ["role_type", "network_id", "peer_id", "protocol_id", "metric"];
    register_histogram_vec!(
        "aptos_network_app_inbound_traffic",
        "Network Inbound Traffic by application",
        &label_names
    )
    .unwrap()
});

/// Records one inbound message size observation for an application protocol.
///
/// The observation goes into [`NETWORK_APPLICATION_INBOUND_METRIC`], labelled with
/// the role, network id and short-form peer id taken from `network_context`, the
/// string form of `protocol_id`, and the fixed `metric` label value `"size"`.
///
/// * `network_context` - source of the role, network id and peer id labels.
/// * `protocol_id` - protocol the traffic is attributed to.
/// * `size` - value to observe; converted to `f64` before being recorded.
pub fn network_application_inbound_traffic(
    network_context: NetworkContext,
    protocol_id: ProtocolId,
    size: u64,
) {
    // Bind the intermediate values so the borrowed label strings have named owners.
    let role = network_context.role();
    let network_id = network_context.network_id();
    let short_peer_id = network_context.peer_id().short_str();

    // Order must match the label names the histogram was registered with.
    let label_values: [&str; 5] = [
        role.as_str(),
        network_id.as_str(),
        short_peer_id.as_str(),
        protocol_id.as_str(),
        "size",
    ];

    let histogram = NETWORK_APPLICATION_INBOUND_METRIC.with_label_values(&label_values);
    histogram.observe(size as f64);
}

/// Histogram family for outbound network traffic, broken down per application protocol.
///
/// Registered as `aptos_network_app_outbound_traffic` with the label dimensions
/// `role_type`, `network_id`, `peer_id`, `protocol_id` and `metric`.
///
/// The registration result is unwrapped, so a registration error panics when this
/// static is first initialized.
///
/// NOTE(review): no bucket boundaries are supplied, so whatever defaults
/// `register_histogram_vec!` applies are in effect — confirm they suit the values
/// observed through this metric.
pub static NETWORK_APPLICATION_OUTBOUND_METRIC: Lazy<HistogramVec> = Lazy::new(|| {
    let label_names = ["role_type", "network_id", "peer_id", "protocol_id", "metric"];
    register_histogram_vec!(
        "aptos_network_app_outbound_traffic",
        "Network Outbound Traffic by application",
        &label_names
    )
    .unwrap()
});

/// Records one outbound message size observation for an application protocol.
///
/// The observation goes into [`NETWORK_APPLICATION_OUTBOUND_METRIC`], labelled with
/// the role, network id and short-form peer id taken from `network_context`, the
/// string form of `protocol_id`, and the fixed `metric` label value `"size"`.
///
/// * `network_context` - source of the role, network id and peer id labels.
/// * `protocol_id` - protocol the traffic is attributed to.
/// * `size` - value to observe; converted to `f64` before being recorded.
pub fn network_application_outbound_traffic(
    network_context: NetworkContext,
    protocol_id: ProtocolId,
    size: u64,
) {
    // Bind the intermediate values so the borrowed label strings have named owners.
    let role = network_context.role();
    let network_id = network_context.network_id();
    let short_peer_id = network_context.peer_id().short_str();

    // Order must match the label names the histogram was registered with.
    let label_values: [&str; 5] = [
        role.as_str(),
        network_id.as_str(),
        short_peer_id.as_str(),
        protocol_id.as_str(),
        "size",
    ];

    let histogram = NETWORK_APPLICATION_OUTBOUND_METRIC.with_label_values(&label_values);
    histogram.observe(size as f64);
}
21 changes: 17 additions & 4 deletions network/src/peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
//! [`PeerManager`]: crate::peer_manager::PeerManager
use crate::{
counters::{self, RECEIVED_LABEL, SENT_LABEL},
counters::{
self, network_application_inbound_traffic, network_application_outbound_traffic,
RECEIVED_LABEL, SENT_LABEL,
},
logging::NetworkSchema,
peer_manager::{PeerManagerError, TransportNotification},
protocols::{
Expand Down Expand Up @@ -489,10 +492,10 @@ where
peer_id.short_str(),
protocol_id
);

let data_len = data.len() as u64;
counters::direct_send_messages(&self.network_context, RECEIVED_LABEL).inc();
counters::direct_send_bytes(&self.network_context, RECEIVED_LABEL)
.inc_by(data.len() as u64);
counters::direct_send_bytes(&self.network_context, RECEIVED_LABEL).inc_by(data_len);
network_application_inbound_traffic(self.network_context, message.protocol_id, data_len);

let notif = PeerNotification::RecvMessage(Message {
protocol_id,
Expand Down Expand Up @@ -529,6 +532,11 @@ where
PeerRequest::SendDirectSend(message) => {
let message_len = message.mdata.len();
let protocol_id = message.protocol_id;
network_application_outbound_traffic(
self.network_context,
protocol_id,
message_len as u64,
);
let message = NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id,
priority: Priority::default(),
Expand Down Expand Up @@ -557,6 +565,11 @@ where
}
PeerRequest::SendRpc(request) => {
let protocol_id = request.protocol_id;
network_application_outbound_traffic(
self.network_context,
protocol_id,
request.data.len() as u64,
);
if let Err(e) = self
.outbound_rpcs
.handle_outbound_request(request, write_reqs_tx)
Expand Down
18 changes: 14 additions & 4 deletions network/src/protocols/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
use crate::{
counters::{
self, CANCELED_LABEL, DECLINED_LABEL, FAILED_LABEL, RECEIVED_LABEL, REQUEST_LABEL,
self, network_application_inbound_traffic, network_application_outbound_traffic,
CANCELED_LABEL, DECLINED_LABEL, FAILED_LABEL, RECEIVED_LABEL, REQUEST_LABEL,
RESPONSE_LABEL, SENT_LABEL,
},
logging::NetworkSchema,
Expand Down Expand Up @@ -234,6 +235,7 @@ impl InboundRpcs {
// Collect counters for received request.
counters::rpc_messages(network_context, REQUEST_LABEL, RECEIVED_LABEL).inc();
counters::rpc_bytes(network_context, REQUEST_LABEL, RECEIVED_LABEL).inc_by(req_len);
network_application_inbound_traffic(self.network_context, protocol_id, req_len);
let timer =
counters::inbound_rpc_handler_latency(network_context, protocol_id).start_timer();

Expand Down Expand Up @@ -354,7 +356,7 @@ pub struct OutboundRpcs {
/// Maps a `RequestId` into a handle to a task in the `outbound_rpc_tasks`
/// completion queue. When a new `RpcResponse` message comes in, we will use
/// this map to notify the corresponding task that its response has arrived.
pending_outbound_rpcs: HashMap<RequestId, oneshot::Sender<RpcResponse>>,
pending_outbound_rpcs: HashMap<RequestId, (ProtocolId, oneshot::Sender<RpcResponse>)>,
/// Only allow this many concurrent outbound rpcs at one time from this remote
/// peer. New outbound requests exceeding this limit will be dropped.
max_concurrent_outbound_rpcs: u32,
Expand Down Expand Up @@ -442,13 +444,15 @@ impl OutboundRpcs {
// Collect counters for requests sent.
counters::rpc_messages(network_context, REQUEST_LABEL, SENT_LABEL).inc();
counters::rpc_bytes(network_context, REQUEST_LABEL, SENT_LABEL).inc_by(req_len);
network_application_outbound_traffic(self.network_context, protocol_id, req_len);

// Create channel over which response is delivered to outbound_rpc_task.
let (response_tx, response_rx) = oneshot::channel::<RpcResponse>();

// Store send-side in the pending map so we can notify outbound_rpc_task
// when the rpc response has arrived.
self.pending_outbound_rpcs.insert(request_id, response_tx);
self.pending_outbound_rpcs
.insert(request_id, (protocol_id, response_tx));

// A future that waits for the rpc response with a timeout. We create the
// timeout out here to start the timer as soon as we push onto the queue
Expand Down Expand Up @@ -581,8 +585,14 @@ impl OutboundRpcs {
let peer_id = &self.remote_peer_id;
let request_id = response.request_id;

let is_canceled = if let Some(response_tx) = self.pending_outbound_rpcs.remove(&request_id)
let is_canceled = if let Some((protocol_id, response_tx)) =
self.pending_outbound_rpcs.remove(&request_id)
{
network_application_inbound_traffic(
self.network_context,
protocol_id,
response.raw_response.len() as u64,
);
response_tx.send(response).is_err()
} else {
true
Expand Down

0 comments on commit 995dfc9

Please sign in to comment.