Skip to content

Commit

Permalink
Add info about event count to LatencyMeter.
Browse files Browse the repository at this point in the history
  • Loading branch information
lifr0m committed Nov 17, 2024
1 parent 20fe511 commit 9279675
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/exchanges/binance/spot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub async fn spawn(books: HashMap<Pair, Arc<Mutex<Book>>>) {

let (lat_tx, lat_rx) = mpsc::unbounded_channel();
let _lat_meter = LatencyMeter::new(String::from(LOG_PREFIX), difference::LATENCY_CHECK_INTERVAL, lat_rx);

for books in HashMapChunks::new(books, STREAMS_PER_CONNECTION) {
tokio::spawn(loop_connection(books, Arc::clone(&r_tb), Arc::clone(&w_tb), lat_tx.clone()));
}
Expand Down
14 changes: 7 additions & 7 deletions src/latency_meter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,40 +10,40 @@ pub struct LatencyMeter {
impl LatencyMeter {
pub fn new(prefix: String, interval: Duration, mut rx: mpsc::UnboundedReceiver<Duration>) -> Self {
let vec = Arc::new(Mutex::new(Vec::new()));

let recv_jh = tokio::spawn({
let vec = Arc::clone(&vec);

async move {
while let Some(latency) = rx.recv().await {
vec.lock().unwrap().push(latency);
}
}
});

let check_jh = tokio::spawn({
let vec = Arc::clone(&vec);
let mut interval = tokio::time::interval(interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

async move {
loop {
interval.tick().await;

{
let mut vec = vec.lock().unwrap();

if !vec.is_empty() {
let mean = vec.iter().sum::<Duration>() / vec.len() as u32;
eprintln!("{prefix}: high latency - {mean:?}");
eprintln!("{prefix}: high latency - {mean:?} ({} events)", vec.len());

vec.clear();
}
}
}
}
});

Self { recv_jh, check_jh }
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ async fn main() {
for (place, spawner) in &spawners {
tokio::spawn(spawner(books[place].clone()));
}

loop {
do_some_calculations(copy_books(&books));
tokio::time::sleep(CALCULATION_INTERVAL).await;
Expand Down

0 comments on commit 9279675

Please sign in to comment.