Skip to content

Commit

Permalink
Make metrics collection async
Browse files Browse the repository at this point in the history
Summary: Modern dice has the state in a separate thread, so access requires futures

Reviewed By: stepancheg

Differential Revision: D43824897

fbshipit-source-id: 20c9966a759be0a2679410dc4f0320773ad703b1
  • Loading branch information
bobyangyf authored and facebook-github-bot committed Mar 8, 2023
1 parent 7b6b8cf commit 75d47c3
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 10 deletions.
3 changes: 2 additions & 1 deletion app/buck2_server/src/daemon/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,8 @@ impl DaemonApi for BuckdServer {
data.materializer.dupe(),
data.scribe_sink.dupe() as _,
)
.create_snapshot(),
.create_snapshot()
.await,
)
} else {
None
Expand Down
7 changes: 5 additions & 2 deletions app/buck2_server/src/heartbeat_guard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl HeartbeatGuard {
let mut interval = tokio::time::interval(Duration::from_secs(1));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
let snapshot = collector.create_snapshot();
let snapshot = collector.create_snapshot().await;
match events.lock().expect("Poisoned lock").as_ref() {
Some(events) => events.instant_event(Box::new(snapshot)),
None => break,
Expand All @@ -70,7 +70,10 @@ impl Drop for HeartbeatGuard {
// Synchronously remove access for sending new heartbeats.
if let Some(events) = maybe_events.take() {
// Send one last snapshot.
events.instant_event(Box::new(self.collector.create_snapshot()));
let collector = self.collector.dupe();
tokio::spawn(async move {
events.instant_event(Box::new(collector.create_snapshot().await));
});
}
// Cancel the task as well.
self.handle.abort();
Expand Down
8 changes: 4 additions & 4 deletions app/buck2_server/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ impl SnapshotCollector {
}

/// Create a new Snapshot.
pub fn create_snapshot(&self) -> buck2_data::Snapshot {
pub async fn create_snapshot(&self) -> buck2_data::Snapshot {
let mut snapshot = Self::pre_initialization_snapshot(self.daemon_start_time);
self.add_daemon_metrics(&mut snapshot);
self.add_re_metrics(&mut snapshot);
self.add_io_metrics(&mut snapshot);
self.add_dice_metrics(&mut snapshot);
self.add_dice_metrics(&mut snapshot).await;
self.add_materializer_metrics(&mut snapshot);
self.add_sink_metrics(&mut snapshot);
snapshot
Expand Down Expand Up @@ -156,8 +156,8 @@ impl SnapshotCollector {
}
}

fn add_dice_metrics(&self, snapshot: &mut buck2_data::Snapshot) {
let metrics = self.dice.metrics();
async fn add_dice_metrics(&self, snapshot: &mut buck2_data::Snapshot) {
let metrics = self.dice.metrics().await;
snapshot.dice_key_count = metrics.key_count as u64;
snapshot.dice_currently_running_key_count = metrics.currently_running_key_count as u64;
snapshot.dice_active_transaction_count = metrics.active_transaction_count;
Expand Down
4 changes: 2 additions & 2 deletions dice/dice/src/api/dice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ impl Dice {
self.implementation.detect_cycles()
}

pub fn metrics(&self) -> Metrics {
self.implementation.metrics()
pub async fn metrics(&self) -> Metrics {
self.implementation.metrics().await
}

/// Wait until all active versions have exited.
Expand Down
2 changes: 1 addition & 1 deletion dice/dice/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ impl DiceImplementation {
}
}

pub fn metrics(&self) -> Metrics {
pub async fn metrics(&self) -> Metrics {
match self {
DiceImplementation::Legacy(dice) => dice.metrics(),
DiceImplementation::Modern(dice) => dice.metrics(),
Expand Down

0 comments on commit 75d47c3

Please sign in to comment.