Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AsyncFd::shutdown #23

Merged
merged 1 commit into from
Jan 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Add AsyncFd::shutdown
  • Loading branch information
Thomasdezeeuw committed Jan 7, 2023
commit 71962d433ab3f2cd7e97f922eb8b898e5b41bfa0
23 changes: 23 additions & 0 deletions src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ impl AsyncFd {
Recv::new(self, buf, flags)
}

/// Shuts down the read, write, or both halves of this connection.
pub const fn shutdown<'fd>(&'fd self, how: std::net::Shutdown) -> Shutdown<'fd> {
let how = match how {
std::net::Shutdown::Read => libc::SHUT_RD,
std::net::Shutdown::Write => libc::SHUT_WR,
std::net::Shutdown::Both => libc::SHUT_RDWR,
};
Shutdown::new(self, how)
}

/// Accept a new socket stream ([`AsyncFd`]).
///
/// If an accepted stream is returned, the remote address of the peer is
Expand Down Expand Up @@ -198,6 +208,19 @@ op_future! {
},
}

// Shutdown.
op_future! {
fn AsyncFd::shutdown -> (),
struct Shutdown<'fd> {
// Doesn't need any fields.
},
setup_state: flags: libc::c_int,
setup: |submission, fd, (), how| unsafe {
submission.shutdown(fd.fd, how);
},
map_result: |n| Ok(debug_assert!(n == 0)),
}

// Accept.
op_future! {
fn AsyncFd::accept -> (AsyncFd, SocketAddr),
Expand Down
6 changes: 6 additions & 0 deletions src/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,12 @@ impl Submission {
self.inner.len = len;
}

pub(crate) unsafe fn shutdown(&mut self, fd: RawFd, how: libc::c_int) {
self.inner.opcode = libc::IORING_OP_SHUTDOWN as u8;
self.inner.fd = fd;
self.inner.len = how as u32;
}

/// Create a accept submission starting.
///
/// Avaialable since Linux kernel 5.5.
Expand Down
34 changes: 33 additions & 1 deletion tests/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use std::io::{Read, Write};
use std::mem;
use std::net::{SocketAddr, SocketAddrV4, TcpListener, TcpStream};
use std::net::{Shutdown, SocketAddr, SocketAddrV4, TcpListener, TcpStream};
use std::pin::Pin;

use a10::io::ReadBufPool;
Expand Down Expand Up @@ -414,6 +414,38 @@ fn send_zc_extractor() {
assert_eq!(&buf[0..n], DATA2);
}

#[test]
fn shutdown() {
let sq = test_queue();
let waker = Waker::new();

// Bind a socket.
let listener = TcpListener::bind("127.0.0.1:0").expect("failed to bind listener");
let local_addr = match listener.local_addr().unwrap() {
SocketAddr::V4(addr) => addr,
_ => unreachable!(),
};

// Create a socket and connect the listener.
let stream = waker.block_on(tcp_ipv4_socket(sq));
let addr = addr_storage(&local_addr);
let addr_len = mem::size_of::<libc::sockaddr_in>() as libc::socklen_t;
let mut connect_future = stream.connect(addr, addr_len);
// Poll the future to schedule the operation.
assert!(poll_nop(Pin::new(&mut connect_future)).is_pending());

let (mut client, _) = listener.accept().expect("failed to accept connection");

waker.block_on(connect_future).expect("failed to connect");

waker
.block_on(stream.shutdown(Shutdown::Write))
.expect("failed to shutdown");
let mut buf = vec![0; 10];
let n = client.read(&mut buf).expect("failed to send data");
assert_eq!(n, 0);
}

fn addr_storage(addres: &SocketAddrV4) -> libc::sockaddr_storage {
// SAFETY: zeroed out `sockaddr_storage` is valid.
let mut addr: libc::sockaddr_storage = unsafe { mem::zeroed() };
Expand Down