Skip to content

Commit

Permalink
fix(socket source): Avoid creating unix sockets too early (vectordotd…
Browse files Browse the repository at this point in the history
…ev#13021)

* fix(socket source): Avoid creating unix sockets too early

`vector validate` was creating the unix socket because it constructs all
components (when `--no-environment` is not passed).

This was a regression from vectordotdev#12115

Ideally we'd do something more sophisticated here in the future, but the
pattern for this sort of failure is simply to `panic` to cause Vector to
shutdown (see the `bind` call).

Fixes: vectordotdev#13018

Signed-off-by: Jesse Szwedko <[email protected]>
  • Loading branch information
jszwedko authored Jun 8, 2022
1 parent c3c2392 commit fe98d71
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 14 deletions.
46 changes: 40 additions & 6 deletions src/sources/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,9 @@ mod test {
#[cfg(unix)]
use {
super::{unix::UnixConfig, Mode},
crate::test_util::wait_for,
futures::{SinkExt, Stream},
std::future::ready,
std::os::unix::fs::PermissionsExt,
std::path::PathBuf,
tokio::{
Expand Down Expand Up @@ -1113,9 +1115,25 @@ mod test {
.unwrap();
tokio::spawn(server);

let meta = std::fs::metadata(in_path).unwrap();
// S_IFSOCK 0140000 socket
assert_eq!(0o140555, meta.permissions().mode());
wait_for(|| {
match std::fs::metadata(&in_path) {
Ok(meta) => {
match meta.permissions().mode() {
// S_IFSOCK 0140000 socket
0o140555 => ready(true),
perm => {
println!("socket has different permissions: {:?}", perm);
ready(false)
}
}
}
Err(_) => {
println!("socket doesn't exist yet");
ready(false)
}
}
})
.await;
}

////////////// UNIX STREAM TESTS //////////////
Expand Down Expand Up @@ -1219,8 +1237,24 @@ mod test {
.unwrap();
tokio::spawn(server);

let meta = std::fs::metadata(in_path).unwrap();
// S_IFSOCK 0140000 socket
assert_eq!(0o140421, meta.permissions().mode());
wait_for(|| {
match std::fs::metadata(&in_path) {
Ok(meta) => {
match meta.permissions().mode() {
// S_IFSOCK 0140000 socket
0o140421 => ready(true),
perm => {
println!("socket has different permissions: {:?}", perm);
ready(false)
}
}
}
Err(_) => {
println!("socket doesn't exist yet");
ready(false)
}
}
})
.await;
}
}
9 changes: 5 additions & 4 deletions src/sources/util/unix_datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ pub fn build_unix_datagram_source(
shutdown: ShutdownSignal,
out: SourceSender,
) -> crate::Result<Source> {
let socket = UnixDatagram::bind(&listen_path).expect("Failed to bind to datagram socket");
info!(message = "Listening.", path = ?listen_path, r#type = "unix_datagram");
Ok(Box::pin(async move {
let socket = UnixDatagram::bind(&listen_path).expect("Failed to bind to datagram socket");
info!(message = "Listening.", path = ?listen_path, r#type = "unix_datagram");

change_socket_permissions(&listen_path, socket_file_mode)?;
change_socket_permissions(&listen_path, socket_file_mode)
.expect("Failed to set socket permissions");

Ok(Box::pin(async move {
let result = listen(socket, max_length, decoder, shutdown, handle_events, out).await;

// Delete socket file.
Expand Down
11 changes: 7 additions & 4 deletions src/sources/util/unix_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@ pub fn build_unix_stream_source(
shutdown: ShutdownSignal,
out: SourceSender,
) -> crate::Result<Source> {
let listener = UnixListener::bind(&listen_path).expect("Failed to bind to listener socket");
info!(message = "Listening.", path = ?listen_path, r#type = "unix");
dbg!("hello");
Ok(Box::pin(async move {
dbg!("creating");
let listener = UnixListener::bind(&listen_path).expect("Failed to bind to listener socket");
info!(message = "Listening.", path = ?listen_path, r#type = "unix");

change_socket_permissions(&listen_path, socket_file_mode)?;
change_socket_permissions(&listen_path, socket_file_mode)
.expect("Failed to set socket permssions");

Ok(Box::pin(async move {
let connection_open = OpenGauge::new();
let stream = UnixListenerStream::new(listener).take_until(shutdown.clone());
tokio::pin!(stream);
Expand Down

0 comments on commit fe98d71

Please sign in to comment.