Skip to content

Commit

Permalink
use async for blockio samplers (iopsystems#326)
Browse files Browse the repository at this point in the history
Convert blockio bpf samplers to use async
  • Loading branch information
brayniac authored Aug 29, 2024
1 parent 49bb575 commit 84d3dc4
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 178 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ edition = "2021"
description = "High resolution systems performance telemetry agent"

[workspace.package]
version = "3.18.2-alpha.6"
version = "3.18.2-alpha.7"
license = "MIT OR Apache-2.0"

[dependencies]
Expand Down
130 changes: 48 additions & 82 deletions src/samplers/block_io/linux/latency/mod.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,55 @@
#[distributed_slice(SAMPLERS)]
fn init(config: Arc<Config>) -> Box<dyn Sampler> {
if let Ok(s) = BlockIOLatency::new(config) {
Box::new(s)
} else {
Box::new(Nop {})
}
}
/// Collects BlockIO Latency stats using BPF and traces:
/// * `block_rq_insert`
/// * `block_rq_issue`
/// * `block_rq_complete`
///
/// And produces these stats:
/// * `blockio/latency`
static NAME: &str = "block_io_latency";

mod bpf {
include!(concat!(env!("OUT_DIR"), "/block_io_latency.bpf.rs"));
}

static NAME: &str = "block_io_latency";

use bpf::*;

use crate::common::bpf::*;
use crate::common::*;
use crate::samplers::block_io::stats::*;
use crate::samplers::block_io::*;
use crate::*;

/// Registers the block IO latency sampler onto the async runtime.
///
/// Does nothing when the sampler is disabled in the config or when the BPF
/// program fails to build (e.g. unsupported kernel); failures are silent by
/// design so the agent keeps running with the remaining samplers.
#[distributed_slice(ASYNC_SAMPLERS)]
fn spawn(config: Arc<Config>, runtime: &Runtime) {
    // check if sampler should be enabled
    if !(config.enabled(NAME) && config.bpf(NAME)) {
        return;
    }

    let bpf = AsyncBpfBuilder::new(ModSkelBuilder::default)
        .distribution("latency", &BLOCKIO_LATENCY)
        .collected_at(&METADATA_BLOCKIO_LATENCY_COLLECTED_AT)
        .runtime(
            &METADATA_BLOCKIO_LATENCY_RUNTIME,
            &METADATA_BLOCKIO_LATENCY_RUNTIME_HISTOGRAM,
        )
        .build();

    // bind the loaded BPF program directly instead of checking `is_err()` and
    // then calling `unwrap()`, which left a latent panic path
    let bpf = match bpf {
        Ok(bpf) => bpf,
        Err(_) => return,
    };

    runtime.spawn(async move {
        let mut sampler = AsyncBpfSampler::new(bpf, config.async_interval(NAME));

        loop {
            // the sampler signals when its BPF task has terminated; exit the
            // spawned task rather than spinning forever
            if sampler.is_finished() {
                return;
            }

            sampler.sample().await;
        }
    });
}

impl GetMap for ModSkel<'_> {
fn map(&self, name: &str) -> &libbpf_rs::Map {
Expand All @@ -29,84 +60,19 @@ impl GetMap for ModSkel<'_> {
}
}

/// Collects BlockIO Latency stats using BPF and traces:
/// * `block_rq_insert`
/// * `block_rq_issue`
/// * `block_rq_complete`
///
/// And produces these stats:
/// * `blockio/latency`
/// * `blockio/size`
pub struct BlockIOLatency {
bpf: Bpf<ModSkel<'static>>,
interval: Interval,
}

impl BlockIOLatency {
pub fn new(config: Arc<Config>) -> Result<Self, ()> {
// check if sampler should be enabled
if !(config.enabled(NAME) && config.bpf(NAME)) {
return Err(());
}

let open_object: &'static mut MaybeUninit<OpenObject> =
Box::leak(Box::new(MaybeUninit::uninit()));

let builder = ModSkelBuilder::default();
let mut skel = builder
.open(open_object)
.map_err(|e| error!("failed to open bpf builder: {e}"))?
.load()
.map_err(|e| error!("failed to load bpf program: {e}"))?;

impl OpenSkelExt for ModSkel<'_> {
fn log_prog_instructions(&self) {
debug!(
"{NAME} block_rq_insert() BPF instruction count: {}",
skel.progs.block_rq_insert.insn_cnt()
self.progs.block_rq_insert.insn_cnt()
);
debug!(
"{NAME} block_rq_issue() BPF instruction count: {}",
skel.progs.block_rq_issue.insn_cnt()
self.progs.block_rq_issue.insn_cnt()
);
debug!(
"{NAME} block_rq_complete() BPF instruction count: {}",
skel.progs.block_rq_complete.insn_cnt()
self.progs.block_rq_complete.insn_cnt()
);

skel.attach()
.map_err(|e| error!("failed to attach bpf program: {e}"))?;

let bpf = BpfBuilder::new(skel)
.distribution("latency", &BLOCKIO_LATENCY)
.build();

let now = Instant::now();

Ok(Self {
bpf,
interval: Interval::new(now, config.interval(NAME)),
})
}

pub fn refresh(&mut self, now: Instant) -> Result<(), ()> {
let elapsed = self.interval.try_wait(now)?;

METADATA_BLOCKIO_LATENCY_COLLECTED_AT.set(UnixInstant::EPOCH.elapsed().as_nanos());

self.bpf.refresh(elapsed);

Ok(())
}
}

impl Sampler for BlockIOLatency {
fn sample(&mut self) {
let now = Instant::now();

if self.refresh(now).is_ok() {
let elapsed = now.elapsed().as_nanos() as u64;

METADATA_BLOCKIO_LATENCY_RUNTIME.add(elapsed);
let _ = METADATA_BLOCKIO_LATENCY_RUNTIME_HISTOGRAM.increment(elapsed);
}
}
}
154 changes: 61 additions & 93 deletions src/samplers/block_io/linux/requests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,71 @@
#[distributed_slice(SAMPLERS)]
fn init(config: Arc<Config>) -> Box<dyn Sampler> {
if let Ok(s) = BlockIORequests::new(config) {
Box::new(s)
} else {
Box::new(Nop {})
}
}
/// Collects BlockIO Request stats using BPF and traces:
/// * `block_rq_complete`
///
/// And produces these stats:
/// * `blockio/*/operations`
/// * `blockio/*/bytes`
/// * `blockio/size`
static NAME: &str = "block_io_requests";

mod bpf {
include!(concat!(env!("OUT_DIR"), "/block_io_requests.bpf.rs"));
}

static NAME: &str = "block_io_requests";

use bpf::*;

use crate::common::bpf::*;
use crate::common::*;
use crate::samplers::block_io::stats::*;
use crate::samplers::block_io::*;
use crate::*;

/// Registers the block IO requests sampler onto the async runtime.
///
/// Does nothing when the sampler is disabled in the config or when the BPF
/// program fails to build (e.g. unsupported kernel); failures are silent by
/// design so the agent keeps running with the remaining samplers.
#[distributed_slice(ASYNC_SAMPLERS)]
fn spawn(config: Arc<Config>, runtime: &Runtime) {
    // check if sampler should be enabled
    if !(config.enabled(NAME) && config.bpf(NAME)) {
        return;
    }

    // per-operation counters; ordering must match the layout of the BPF
    // `counters` map
    let counters = vec![
        Counter::new(&BLOCKIO_READ_OPS, Some(&BLOCKIO_READ_OPS_HISTOGRAM)),
        Counter::new(&BLOCKIO_WRITE_OPS, Some(&BLOCKIO_WRITE_OPS_HISTOGRAM)),
        Counter::new(&BLOCKIO_FLUSH_OPS, Some(&BLOCKIO_FLUSH_OPS_HISTOGRAM)),
        Counter::new(&BLOCKIO_DISCARD_OPS, Some(&BLOCKIO_DISCARD_OPS_HISTOGRAM)),
        Counter::new(&BLOCKIO_READ_BYTES, Some(&BLOCKIO_READ_BYTES_HISTOGRAM)),
        Counter::new(&BLOCKIO_WRITE_BYTES, Some(&BLOCKIO_WRITE_BYTES_HISTOGRAM)),
        Counter::new(&BLOCKIO_FLUSH_BYTES, Some(&BLOCKIO_FLUSH_BYTES_HISTOGRAM)),
        Counter::new(
            &BLOCKIO_DISCARD_BYTES,
            Some(&BLOCKIO_DISCARD_BYTES_HISTOGRAM),
        ),
    ];

    let bpf = AsyncBpfBuilder::new(ModSkelBuilder::default)
        .counters("counters", counters)
        .distribution("size", &BLOCKIO_SIZE)
        .collected_at(&METADATA_BLOCKIO_REQUESTS_COLLECTED_AT)
        .runtime(
            &METADATA_BLOCKIO_REQUESTS_RUNTIME,
            &METADATA_BLOCKIO_REQUESTS_RUNTIME_HISTOGRAM,
        )
        .build();

    // bind the loaded BPF program directly instead of checking `is_err()` and
    // then calling `unwrap()`, which left a latent panic path
    let bpf = match bpf {
        Ok(bpf) => bpf,
        Err(_) => return,
    };

    runtime.spawn(async move {
        let mut sampler = AsyncBpfSampler::new(bpf, config.async_interval(NAME));

        loop {
            // the sampler signals when its BPF task has terminated; exit the
            // spawned task rather than spinning forever
            if sampler.is_finished() {
                return;
            }

            sampler.sample().await;
        }
    });
}

impl GetMap for ModSkel<'_> {
fn map(&self, name: &str) -> &libbpf_rs::Map {
Expand All @@ -30,90 +77,11 @@ impl GetMap for ModSkel<'_> {
}
}

/// Collects BlockIO stats using BPF and traces:
/// * `block_rq_complete`
///
/// And produces these stats:
/// * `blockio/*/operations`
/// * `blockio/*/bytes`
/// * `blockio/size`
pub struct BlockIORequests {
bpf: Bpf<ModSkel<'static>>,
interval: Interval,
}

impl BlockIORequests {
pub fn new(config: Arc<Config>) -> Result<Self, ()> {
// check if sampler should be enabled
if !(config.enabled(NAME) && config.bpf(NAME)) {
return Err(());
}

let open_object: &'static mut MaybeUninit<OpenObject> =
Box::leak(Box::new(MaybeUninit::uninit()));

let builder = ModSkelBuilder::default();
let mut skel = builder
.open(open_object)
.map_err(|e| error!("failed to open bpf builder: {e}"))?
.load()
.map_err(|e| error!("failed to load bpf program: {e}"))?;

impl OpenSkelExt for ModSkel<'_> {
fn log_prog_instructions(&self) {
debug!(
"{NAME} block_rq_complete() BPF instruction count: {}",
skel.progs.block_rq_complete.insn_cnt()
self.progs.block_rq_complete.insn_cnt()
);

skel.attach()
.map_err(|e| error!("failed to attach bpf program: {e}"))?;

let counters = vec![
Counter::new(&BLOCKIO_READ_OPS, Some(&BLOCKIO_READ_OPS_HISTOGRAM)),
Counter::new(&BLOCKIO_WRITE_OPS, Some(&BLOCKIO_WRITE_OPS_HISTOGRAM)),
Counter::new(&BLOCKIO_FLUSH_OPS, Some(&BLOCKIO_FLUSH_OPS_HISTOGRAM)),
Counter::new(&BLOCKIO_DISCARD_OPS, Some(&BLOCKIO_DISCARD_OPS_HISTOGRAM)),
Counter::new(&BLOCKIO_READ_BYTES, Some(&BLOCKIO_READ_BYTES_HISTOGRAM)),
Counter::new(&BLOCKIO_WRITE_BYTES, Some(&BLOCKIO_WRITE_BYTES_HISTOGRAM)),
Counter::new(&BLOCKIO_FLUSH_BYTES, Some(&BLOCKIO_FLUSH_BYTES_HISTOGRAM)),
Counter::new(
&BLOCKIO_DISCARD_BYTES,
Some(&BLOCKIO_DISCARD_BYTES_HISTOGRAM),
),
];

let bpf = BpfBuilder::new(skel)
.counters("counters", counters)
.distribution("size", &BLOCKIO_SIZE)
.build();

let now = Instant::now();

Ok(Self {
bpf,
interval: Interval::new(now, config.interval(NAME)),
})
}

pub fn refresh(&mut self, now: Instant) -> Result<(), ()> {
let elapsed = self.interval.try_wait(now)?;

METADATA_BLOCKIO_REQUESTS_COLLECTED_AT.set(UnixInstant::EPOCH.elapsed().as_nanos());

self.bpf.refresh(elapsed);

Ok(())
}
}

impl Sampler for BlockIORequests {
fn sample(&mut self) {
let now = Instant::now();

if self.refresh(now).is_ok() {
let elapsed = now.elapsed().as_nanos() as u64;

METADATA_BLOCKIO_REQUESTS_RUNTIME.add(elapsed);
let _ = METADATA_BLOCKIO_REQUESTS_RUNTIME_HISTOGRAM.increment(elapsed);
}
}
}

0 comments on commit 84d3dc4

Please sign in to comment.