Skip to content

Commit

Permalink
Since I'm committing to lockness, bring back the Arc
Browse files Browse the repository at this point in the history
The whole point of removing the Arc was that we had our own atomic and could therefore replicate the same behavior, but if we're using lockness, we'll be able to send errors from the drop impl.

Signed-off-by: Alex Saveau <[email protected]>
  • Loading branch information
SUPERCILEX committed Nov 23, 2022
1 parent d551596 commit 0dade84
Showing 1 changed file with 57 additions and 104 deletions.
161 changes: 57 additions & 104 deletions fuc_engine/src/ops/remove.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ use std::{
ffi::{CStr, CString, OsString},
fmt::Debug,
fs, io,
mem::ManuallyDrop,
num::NonZeroUsize,
os::unix::ffi::OsStringExt,
path::{Path, MAIN_SEPARATOR},
ptr::NonNull,
sync::atomic::{AtomicIsize, Ordering},
sync::Arc,
thread,
};

Expand Down Expand Up @@ -103,7 +101,6 @@ async fn run_deletion_scheduler<'a, F: IntoIterator<Item = Cow<'a, Path>>>(
let node = TreeNode {
path: CString::new(OsString::from(file.into_owned()).into_vec()).unwrap(),
parent: None,
remaining_children: AtomicIsize::new(0),
};
let tx = tx.clone();
move || delete_dir(node, &tx)
Expand All @@ -130,108 +127,57 @@ fn delete_dir(
node: TreeNode,
tasks: &UnboundedSender<JoinHandle<Result<(), Error>>>,
) -> Result<(), Error> {
// TODO don't always allocate this.
// TODO fix memory leaks on errors
let node = Box::leak(Box::new(node));

{
thread_local! {
static BUF: UnsafeCell<Vec<u8>> = UnsafeCell::new(Vec::with_capacity(8192));
}

let mut children = 0;

BUF.with(|buf| {
let dir = open(
node.path.as_c_str(),
OFlag::O_RDONLY | OFlag::O_DIRECTORY,
Mode::empty(),
)
.map_io_err(|| format!("Failed to open directory: {:?}", node.path))?;

for file in RawDir::new(&dir, unsafe { &mut *buf.get() }.spare_capacity_mut()) {
const DOT: &CStr = CStr::from_bytes_with_nul(b".\0").ok().unwrap();
const DOT_DOT: &CStr = CStr::from_bytes_with_nul(b"..\0").ok().unwrap();

let file =
file.map_io_err(|| format!("Failed to read directory: {:?}", node.path))?;
if file.name == DOT || file.name == DOT_DOT {
continue;
}

if file.file_type == FileType::Directory {
// TODO fix the error handling with respect to children not getting updated
children += 1;
tasks
.send(task::spawn_blocking({
let node = TreeNode {
path: {
let prefix = node.path.to_bytes();
let name = file.name.to_bytes_with_nul();

let mut path =
Vec::with_capacity(prefix.len() + 1 + name.len());
path.extend_from_slice(prefix);
path.push(MAIN_SEPARATOR as u8);
path.extend_from_slice(name);
unsafe { CString::from_vec_with_nul_unchecked(path) }
},
parent: Some(node.into()),
remaining_children: AtomicIsize::new(0),
};
let tasks = tasks.clone();
move || delete_dir(node, &tasks)
}))
.map_err(|_| Error::Internal)?;
} else {
unlinkat(&dir, file.name, UnlinkatFlags::NoRemoveDir)
.map_io_err(|| format!("Failed to delete file: {file:?}"))?;
}
}
Ok(())
})?;

if children > 0 {
children = children
+ node
.remaining_children
.fetch_add(children, Ordering::Relaxed);
debug_assert!(children >= 0, "Deleted more directories than we have!");
if children > 0 {
return Ok(());
}
}
thread_local! {
static BUF: UnsafeCell<Vec<u8>> = UnsafeCell::new(Vec::with_capacity(8192));
}

let mut next = Some(NonNull::from(node));
while let Some(node) = next {
let node = ManuallyDrop::new(unsafe { Box::from_raw(node.as_ptr()) });

next = node.parent;
unlinkat(AT_FDCWD, node.path.as_c_str(), UnlinkatFlags::RemoveDir)
.map_io_err(|| format!("Failed to delete directory: {:?}", node.path))?;

// We must be the last user of this allocation b/c:
// - If we came from outside the loop, we would have exited above in the
// children check.
// - If we're coming from inside the loop, the remaining_children check operates
// on the parent and would block the next iteration.
ManuallyDrop::into_inner(node);
// TODO don't always allocate this.
let node = Arc::new(node);

BUF.with(|buf| {
let dir = open(
node.path.as_c_str(),
OFlag::O_RDONLY | OFlag::O_DIRECTORY,
Mode::empty(),
)
.map_io_err(|| format!("Failed to open directory: {:?}", node.path))?;

for file in RawDir::new(&dir, unsafe { &mut *buf.get() }.spare_capacity_mut()) {
const DOT: &CStr = CStr::from_bytes_with_nul(b".\0").ok().unwrap();
const DOT_DOT: &CStr = CStr::from_bytes_with_nul(b"..\0").ok().unwrap();

let file = file.map_io_err(|| format!("Failed to read directory: {:?}", node.path))?;
if file.name == DOT || file.name == DOT_DOT {
continue;
}

if let Some(parent) = next {
// TODO using Relaxed here is almost certainly wrong. Do some more research and
// figure out the correct ordering.
if unsafe { parent.as_ref() }
.remaining_children
.fetch_sub(1, Ordering::Relaxed)
!= 1
{
// There are still active children, let the last of them do the cleanup.
break;
if file.file_type == FileType::Directory {
tasks
.send(task::spawn_blocking({
let node = TreeNode {
path: {
let prefix = node.path.to_bytes();
let name = file.name.to_bytes_with_nul();

let mut path = Vec::with_capacity(prefix.len() + 1 + name.len());
path.extend_from_slice(prefix);
path.push(MAIN_SEPARATOR as u8);
path.extend_from_slice(name);
unsafe { CString::from_vec_with_nul_unchecked(path) }
},
parent: Some(node.clone()),
};
let tasks = tasks.clone();
move || delete_dir(node, &tasks)
}))
.map_err(|_| Error::Internal)?;
} else {
unlinkat(&dir, file.name, UnlinkatFlags::NoRemoveDir)
.map_io_err(|| format!("Failed to delete file: {file:?}"))?;
}
}
}

Ok(())
})?;
Ok(())
}

Expand All @@ -256,8 +202,15 @@ impl<T> IoErr<Result<T, Error>> for Result<T, Errno> {

struct TreeNode {
path: CString,
parent: Option<NonNull<TreeNode>>,
remaining_children: AtomicIsize,
// TODO use this to send the done signal when None
parent: Option<Arc<TreeNode>>,
}

unsafe impl Send for TreeNode {}
impl Drop for TreeNode {
fn drop(&mut self) {
// TODO Send this error over lockness
unlinkat(AT_FDCWD, self.path.as_c_str(), UnlinkatFlags::RemoveDir)
.map_io_err(|| format!("Failed to delete directory: {:?}", self.path))
.unwrap();
}
}

0 comments on commit 0dade84

Please sign in to comment.