Skip to content

Commit

Permalink
record how many bytes were written to Scribe
Browse files Browse the repository at this point in the history
Summary:
Like it says in the title. This looks like a genuinely useful thing to be
tracking considering e.g. the Scribe category for this is already over quota.

Reviewed By: stepancheg

Differential Revision: D62493445

fbshipit-source-id: cd5c5ab41f4af489681a2866ab742f1d7d4b2103
  • Loading branch information
krallin authored and facebook-github-bot committed Sep 11, 2024
1 parent 7c239bf commit 450385c
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 0 deletions.
11 changes: 11 additions & 0 deletions app/buck2_client_ctx/src/subscribers/recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ pub(crate) struct InvocationRecorder<'a> {
initial_sink_success_count: Option<u64>,
initial_sink_failure_count: Option<u64>,
initial_sink_dropped_count: Option<u64>,
initial_sink_bytes_written: Option<u64>,
sink_max_buffer_depth: u64,
soft_error_categories: HashSet<String>,
concurrent_command_blocking_duration: Option<Duration>,
Expand Down Expand Up @@ -266,6 +267,7 @@ impl<'a> InvocationRecorder<'a> {
initial_sink_success_count: None,
initial_sink_failure_count: None,
initial_sink_dropped_count: None,
initial_sink_bytes_written: None,
sink_max_buffer_depth: 0,
soft_error_categories: HashSet::new(),
concurrent_command_blocking_duration: None,
Expand Down Expand Up @@ -420,6 +422,7 @@ impl<'a> InvocationRecorder<'a> {
let mut sink_success_count = None;
let mut sink_failure_count = None;
let mut sink_dropped_count = None;
let mut sink_bytes_written = None;
let mut re_upload_bytes = None;
let mut re_download_bytes = None;

Expand Down Expand Up @@ -450,6 +453,10 @@ impl<'a> InvocationRecorder<'a> {
calculate_diff_if_some(&snapshot.sink_failures, &self.initial_sink_failure_count);
sink_dropped_count =
calculate_diff_if_some(&snapshot.sink_dropped, &self.initial_sink_dropped_count);
sink_bytes_written = calculate_diff_if_some(
&snapshot.sink_bytes_written,
&self.initial_sink_bytes_written,
);
re_upload_bytes = calculate_diff_if_some(
&Some(snapshot.re_upload_bytes),
&self.initial_re_upload_bytes,
Expand Down Expand Up @@ -627,6 +634,7 @@ impl<'a> InvocationRecorder<'a> {
sink_success_count,
sink_failure_count,
sink_dropped_count,
sink_bytes_written,
sink_max_buffer_depth: Some(self.sink_max_buffer_depth),
soft_error_categories: std::mem::take(&mut self.soft_error_categories)
.into_iter()
Expand Down Expand Up @@ -1129,6 +1137,9 @@ impl<'a> InvocationRecorder<'a> {
if self.initial_sink_dropped_count.is_none() {
self.initial_sink_dropped_count = update.sink_dropped;
}
if self.initial_sink_bytes_written.is_none() {
self.initial_sink_bytes_written = update.sink_bytes_written;
}
self.sink_max_buffer_depth = max(self.sink_max_buffer_depth, update.sink_buffer_depth());

if self.initial_re_upload_bytes.is_none() {
Expand Down
2 changes: 2 additions & 0 deletions app/buck2_data/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,7 @@ message Snapshot {
optional uint64 sink_buffer_depth = 107;
// Cumulative count of messages that were dropped (i.e. not even processed).
optional uint64 sink_dropped = 108;
optional uint64 sink_bytes_written = 111;

// Network statistics for "interesting" network interfaces.
map<string, NetworkInterfaceStats> network_interface_stats = 109;
Expand Down Expand Up @@ -1915,6 +1916,7 @@ message InvocationRecord {
optional uint64 sink_success_count = 51;
optional uint64 sink_failure_count = 52;
optional uint64 sink_dropped_count = 53;
optional uint64 sink_bytes_written = 101;
optional uint64 sink_max_buffer_depth = 54;
// Version number of watchman
optional string watchman_version = 55;
Expand Down
3 changes: 3 additions & 0 deletions app/buck2_events/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ pub struct EventSinkStats {
pub buffered: u64,
/// How many messages were not even enqueued by this sink.
pub dropped: u64,
/// How many bytes were written into this sink.
pub bytes_written: u64,
}

impl EventSinkStats {
Expand All @@ -238,6 +240,7 @@ impl EventSinkStats {
failures_unknown,
buffered: _,
dropped: _,
bytes_written: _,
} = self;
*failures_invalid_request
+ *failures_unauthorized
Expand Down
1 change: 1 addition & 0 deletions app/buck2_events/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ mod fbcode {
failures_unknown: counters.failures_unknown,
buffered: counters.queue_depth,
dropped: counters.dropped,
bytes_written: counters.bytes_written,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions app/buck2_server/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ impl SnapshotCollector {
failures_unknown,
buffered,
dropped,
bytes_written,
} = metrics;
snapshot.sink_successes = Some(successes);
snapshot.sink_failures = Some(metrics.failures());
Expand All @@ -199,6 +200,7 @@ impl SnapshotCollector {
snapshot.sink_failures_unknown = Some(failures_unknown);
snapshot.sink_buffer_depth = Some(buffered);
snapshot.sink_dropped = Some(dropped);
snapshot.sink_bytes_written = Some(bytes_written);
}
}

Expand Down
12 changes: 12 additions & 0 deletions shed/scribe_client/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ pub struct ProducerCounters {
pub queue_depth: u64,
// How many messages were dropped before we even enqueued them (e.g. because the internal buffer is full).
pub dropped: u64,
/// How many bytes were written into this sink.
pub bytes_written: u64,
}

impl ProducerCounters {
Expand All @@ -111,6 +113,7 @@ impl ProducerCounters {
failures_unknown,
queue_depth: _,
dropped: _,
bytes_written: _,
} = self;
*failures_invalid_request
+ *failures_unauthorized
Expand All @@ -135,6 +138,7 @@ struct ProducerCountersData {
failures_timed_out: AtomicU64,
failures_unknown: AtomicU64,
dropped: AtomicU64,
bytes_written: AtomicU64,
}

// This congestion control has 2 states (phases) each of which changes its behavior depending on whether
Expand Down Expand Up @@ -316,6 +320,7 @@ impl ScribeProducer {
.failures_unknown
.load(atomic::Ordering::Relaxed),
dropped: self.counters.dropped.load(atomic::Ordering::Relaxed),
bytes_written: self.counters.bytes_written.load(atomic::Ordering::Relaxed),
// So we get an accurate snapshot of the queue depth when scraping
// metrics, do this here and now rather than in the background.
queue_depth: self.queue.len() as u64,
Expand Down Expand Up @@ -443,6 +448,13 @@ impl ScribeProducer {
self.counters
.successes
.fetch_add(1, atomic::Ordering::Relaxed);

// Unwrap safety: an individual message cant't be so large its length
// can't be repsentable as 64 bits.
self.counters.bytes_written.fetch_add(
message.message.message.len().try_into().unwrap(),
atomic::Ordering::Relaxed,
);
success_count += 1;
false
}
Expand Down

0 comments on commit 450385c

Please sign in to comment.