Skip to content

Commit

Permalink
convert tcp connection state sampler to async (iopsystems#330)
Browse files Browse the repository at this point in the history
Converts the tcp connection state sampler to async
  • Loading branch information
brayniac authored Sep 10, 2024
1 parent 2e619e8 commit 23d3545
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 31 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ edition = "2021"
description = "High resolution systems performance telemetry agent"

[workspace.package]
version = "3.18.2-alpha.10"
version = "3.18.2-alpha.11"
license = "MIT OR Apache-2.0"

[dependencies]
Expand Down
59 changes: 31 additions & 28 deletions src/samplers/tcp/linux/connection_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,24 @@ use crate::common::*;
use crate::samplers::tcp::stats::*;
use crate::samplers::tcp::*;
use metriken::Gauge;
use std::fs::File;
use std::io::{Read, Seek};

#[distributed_slice(SAMPLERS)]
fn init(config: Arc<Config>) -> Box<dyn Sampler> {
if let Ok(s) = ConnectionState::new(config) {
Box::new(s)
} else {
Box::new(Nop {})
}
}
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncSeekExt};

const NAME: &str = "tcp_connection_state";

#[distributed_slice(ASYNC_SAMPLERS)]
fn spawn(config: Arc<Config>, runtime: &Runtime) {
runtime.spawn(async {
if let Ok(mut s) = ConnectionState::new(config) {
loop {
s.sample().await;
}
}
});
}

pub struct ConnectionState {
interval: Interval,
interval: AsyncInterval,
files: Vec<File>,
gauges: Vec<(&'static Lazy<Gauge>, i64)>,
}
Expand All @@ -44,13 +46,17 @@ impl ConnectionState {
(&TCP_CONN_STATE_NEW_SYN_RECV, 0),
];

let ipv4 = File::open("/proc/net/tcp").map_err(|e| {
error!("Failed to open /proc/net/tcp: {e}");
});
let ipv4 = std::fs::File::open("/proc/net/tcp")
.map(|f| File::from_std(f))
.map_err(|e| {
error!("Failed to open /proc/net/tcp: {e}");
});

let ipv6 = File::open("/proc/net/tcp6").map_err(|e| {
error!("Failed to open /proc/net/tcp6: {e}");
});
let ipv6 = std::fs::File::open("/proc/net/tcp6")
.map(|f| File::from_std(f))
.map_err(|e| {
error!("Failed to open /proc/net/tcp6: {e}");
});

let mut files: Vec<Result<File, ()>> = vec![ipv4, ipv6];

Expand All @@ -64,18 +70,15 @@ impl ConnectionState {
Ok(Self {
files,
gauges,
interval: Interval::new(Instant::now(), config.interval(NAME)),
interval: config.async_interval(NAME),
})
}
}

impl Sampler for ConnectionState {
fn sample(&mut self) {
let now = Instant::now();

if self.interval.try_wait(now).is_err() {
return;
}
#[async_trait]
impl AsyncSampler for ConnectionState {
async fn sample(&mut self) {
let (now, _elapsed) = self.interval.tick().await;

METADATA_TCP_CONNECTION_STATE_COLLECTED_AT.set(UnixInstant::EPOCH.elapsed().as_nanos());

Expand All @@ -86,9 +89,9 @@ impl Sampler for ConnectionState {

for file in self.files.iter_mut() {
// seek to start to cause reload of content
if file.rewind().is_ok() {
if file.rewind().await.is_ok() {
let mut data = String::new();
if file.read_to_string(&mut data).is_err() {
if file.read_to_string(&mut data).await.is_err() {
error!("error reading /proc/net/tcp");
return;
}
Expand Down

0 comments on commit 23d3545

Please sign in to comment.