Skip to content

Commit

Permalink
Fix immutable inputs DCL bug. (pantsbuild#14016)
Browse files Browse the repository at this point in the history
Previously we used the double-checked-cell-async crate (See:
https://github.com/chrislearn/double-checked-cell-async/blob/46cd3b04eddddbe279282143fe8a936d5854588c/src/lib.rs#L228-L260),
which performed the second check of the double check lock with relaxed
ordering. The relevant code is snipped below and annotated:

```rust
pub struct DoubleCheckedCell<T> {
    value: UnsafeCell<Option<T>>,
    initialized: AtomicBool,
    lock: Mutex<()>,
}

impl<T> DoubleCheckedCell<T> {
    pub fn new() -> DoubleCheckedCell<T> {
        DoubleCheckedCell {
            value: UnsafeCell::new(None),
            initialized: AtomicBool::new(false),
            lock: Mutex::new(()),
        }
    }

    pub async fn get_or_try_init<Fut, E>(&self, init: Fut) -> Result<&T, E>
    where
        Fut: Future<Output = Result<T, E>>
    {
        // 1.) 1st load & check.
        if !self.initialized.load(Ordering::Acquire) {
            // 2.) Lock.
            let _lock = self.lock.lock().await;
            // 3.) 2nd load & check.
            if !self.initialized.load(Ordering::Relaxed) {
                {
                    // 4.) Critical section.
                    let result = init.await?;
                    let value = unsafe { &mut *self.value.get() };
                    value.replace(result);
                }
                // 5.) Store with lock held.
                self.initialized.store(true, Ordering::Release);
            }
        }
        let value = unsafe { &*self.value.get() };
        Ok(unsafe { value.as_ref().unchecked_unwrap() })
    }
}
```

Per the C++11 memory model used by Rust (See:
https://en.cppreference.com/w/cpp/language/memory_model), this would
seem to indicate the second load could be reordered to occur anywhere
after the 1st load with acquire ordering and anywhere before the store
with released ordering. If that second load was reordered to occur
before the lock was acquired, two threads could enter the critical
section in serial and the second thread would try to materialize to
paths that already were created and marked read-only. Switch to the
async-oncecell crate which performs both loads of the double-checked
lock with acquire ordering, ensuring they are not re-ordered with
respect to the interleaved non-atomics code.

Also fixup the materialization process to be atomic. We now cleanup
materialization chroots when materialization fails and only move their
contents to the destination path if the full materialization has
succeeded.

Fixes pantsbuild#13899
  • Loading branch information
jsirois authored Jan 8, 2022
1 parent 30b48b2 commit 1fe755b
Show file tree
Hide file tree
Showing 9 changed files with 134 additions and 73 deletions.
41 changes: 12 additions & 29 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/rust/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ cache = { path = "cache" }
concrete_time = { path = "concrete_time" }
crossbeam-channel = "0.4"
derivative = "2.2"
double-checked-cell-async = "2.0"
async-oncecell = "0.2"
either = "1.6"
fnv = "1.0.5"
fs = { path = "fs" }
Expand Down
2 changes: 1 addition & 1 deletion src/rust/engine/fs/store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async-trait = "=0.1.42"
protos = { path = "../../protos" }
bytes = "1.0"
concrete_time = { path = "../../concrete_time" }
double-checked-cell-async = "2.0"
async-oncecell = "0.2"
grpc_util = { path = "../../grpc_util" }
fs = { path = ".." }
futures = "0.3"
Expand Down
4 changes: 2 additions & 2 deletions src/rust/engine/fs/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};

use async_oncecell::OnceCell;
use async_trait::async_trait;
use bytes::Bytes;
use double_checked_cell_async::DoubleCheckedCell;
use fs::{default_cache_path, DigestEntry, FileContent, FileEntry, Permissions, RelativePath};
use futures::future::{self, BoxFuture, Either, FutureExt, TryFutureExt};
use grpc_util::prost::MessageExt;
Expand Down Expand Up @@ -235,7 +235,7 @@ impl Store {
upload_timeout: Duration,
rpc_retries: usize,
rpc_concurrency_limit: usize,
capabilities_cell_opt: Option<Arc<DoubleCheckedCell<ServerCapabilities>>>,
capabilities_cell_opt: Option<Arc<OnceCell<ServerCapabilities>>>,
batch_api_size_limit: usize,
) -> Result<Store, String> {
Ok(Store {
Expand Down
9 changes: 4 additions & 5 deletions src/rust/engine/fs/store/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use std::ops::Range;
use std::sync::Arc;
use std::time::{Duration, Instant};

use async_oncecell::OnceCell;
use bytes::{Bytes, BytesMut};
use double_checked_cell_async::DoubleCheckedCell;
use futures::Future;
use futures::StreamExt;
use grpc_util::retry::{retry_call, status_is_retryable};
Expand All @@ -32,7 +32,7 @@ pub struct ByteStore {
_rpc_attempts: usize,
byte_stream_client: Arc<ByteStreamClient<LayeredService>>,
cas_client: Arc<ContentAddressableStorageClient<LayeredService>>,
capabilities_cell: Arc<DoubleCheckedCell<ServerCapabilities>>,
capabilities_cell: Arc<OnceCell<ServerCapabilities>>,
capabilities_client: Arc<CapabilitiesClient<LayeredService>>,
batch_api_size_limit: usize,
}
Expand Down Expand Up @@ -76,7 +76,7 @@ impl ByteStore {
upload_timeout: Duration,
rpc_retries: usize,
rpc_concurrency_limit: usize,
capabilities_cell_opt: Option<Arc<DoubleCheckedCell<ServerCapabilities>>>,
capabilities_cell_opt: Option<Arc<OnceCell<ServerCapabilities>>>,
batch_api_size_limit: usize,
) -> Result<ByteStore, String> {
let tls_client_config = if cas_address.starts_with("https://") {
Expand Down Expand Up @@ -107,8 +107,7 @@ impl ByteStore {
_rpc_attempts: rpc_retries + 1,
byte_stream_client,
cas_client,
capabilities_cell: capabilities_cell_opt
.unwrap_or_else(|| Arc::new(DoubleCheckedCell::new())),
capabilities_cell: capabilities_cell_opt.unwrap_or_else(|| Arc::new(OnceCell::new())),
capabilities_client,
batch_api_size_limit,
})
Expand Down
2 changes: 1 addition & 1 deletion src/rust/engine/process_execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ parking_lot = "0.11"
itertools = "0.10"
serde = "1.0.104"
bincode = "1.2.1"
double-checked-cell-async = "2.0"
async-oncecell = "0.2"
rand = "0.8"
prost = "0.9"
prost-types = "0.9"
Expand Down
130 changes: 105 additions & 25 deletions src/rust/engine/process_execution/src/immutable_inputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,78 @@ use std::collections::{BTreeMap, HashMap};
use std::path::{Path, PathBuf};
use std::sync::Arc;

use double_checked_cell_async::DoubleCheckedCell;
use async_oncecell::OnceCell;
use fs::{Permissions, RelativePath};
use futures::TryFutureExt;
use hashing::Digest;
use parking_lot::Mutex;
use store::Store;
use tempfile::TempDir;

use crate::WorkdirSymlink;

async fn rename_readonly_directory(
src: impl AsRef<Path>,
dest: impl AsRef<Path>,
map_rename_err: impl Fn(std::io::Error) -> String,
) -> Result<(), String> {
// If you try to rename a read-only directory (mode 0o555) under masOS you get permission
// denied; so we temporarily make the directory writeable by the current process in order to be
// able to rename it without error.
#[cfg(target_os = "macos")]
{
use std::os::unix::fs::PermissionsExt;
tokio::fs::set_permissions(&src, std::fs::Permissions::from_mode(0o755))
.map_err(|e| {
format!(
"Failed to prepare {src:?} perms for a rename to {dest:?}: {err}",
src = src.as_ref(),
dest = dest.as_ref(),
err = e
)
})
.await?;
}
tokio::fs::rename(&src, &dest)
.map_err(map_rename_err)
.await?;
#[cfg(target_os = "macos")]
{
use std::os::unix::fs::PermissionsExt;
tokio::fs::set_permissions(&dest, std::fs::Permissions::from_mode(0o555))
.map_err(|e| {
format!(
"Failed to seal {dest:?} as read-only: {err}",
dest = dest.as_ref(),
err = e
)
})
.await?;
}
Ok(())
}

/// Holds Digests materialized into a temporary directory, for symlinking into local sandboxes.
pub struct ImmutableInputs {
store: Store,
// The TempDir that digests are materialized in.
workdir: TempDir,
// A map from Digest to the location it has been materialized at. The DoubleCheckedCell allows
// for cooperation between threads attempting to create Digests.
contents: Mutex<HashMap<Digest, Arc<DoubleCheckedCell<PathBuf>>>>,
contents: Mutex<HashMap<Digest, Arc<OnceCell<PathBuf>>>>,
}

impl ImmutableInputs {
pub fn new(store: Store, base: &Path) -> Result<Self, String> {
let workdir = TempDir::new_in(base).map_err(|e| {
format!(
"Failed to create temporary directory for immutable inputs: {}",
e
)
})?;
let workdir = tempfile::Builder::new()
.prefix("immutable_inputs")
.tempdir_in(base)
.map_err(|e| {
format!(
"Failed to create temporary directory for immutable inputs: {}",
e
)
})?;
Ok(Self {
store,
workdir,
Expand All @@ -39,28 +84,63 @@ impl ImmutableInputs {
/// Returns an absolute Path to immutably consume the given Digest from.
async fn path(&self, digest: Digest) -> Result<PathBuf, String> {
let cell = self.contents.lock().entry(digest).or_default().clone();
let value: Result<_, String> = cell
let value = cell
.get_or_try_init(async {
let digest_str = digest.hash.to_hex();

let path = self.workdir.path().join(digest_str);
if let Ok(meta) = tokio::fs::metadata(&path).await {
// TODO: If this error triggers, it indicates that we have previously checked out this
// directory, either due to a race condition, or due to a previous failure to
// materialize. See https://github.com/pantsbuild/pants/issues/13899
return Err(format!(
"Destination for immutable digest already exists: {:?}",
meta
));
}
let chroot = TempDir::new_in(self.workdir.path()).map_err(|e| {
format!(
"Failed to create a temporary directory for materialization of immutable input \
digest {:?}: {}",
digest, e
)
})?;
self
.store
.materialize_directory(path.clone(), digest, Permissions::ReadOnly)
.materialize_directory(chroot.path().to_path_buf(), digest, Permissions::ReadOnly)
.await?;
Ok(path)
let src = chroot.into_path();
let dest = self.workdir.path().join(digest.hash.to_hex());
rename_readonly_directory(&src, &dest, |e| {
// TODO(John Sirois): This diagnostic is over the top and should be trimmed down once
// we have confidence in the fix. We've had issues with permission denied errors in
// the past though; so all this information is in here to root-cause the issue should
// it persist.
let maybe_collision_metadata = std::fs::metadata(&dest);
let maybe_unwriteable_parent_metadata = dest
.parent()
.ok_or(format!(
"The destination directory for digest {:?} of {:?} has no parent dir.",
&digest, &dest
))
.map(|p| std::fs::metadata(&p));
format!(
"Failed to move materialized immutable input for {digest:?} from {src:?} to \
{dest:?}: {err}\n\
Parent directory (un-writeable parent dir?) metadata: {parent_metadata:?}\n\
Destination directory (collision?) metadata: {existing_metadata:?}\n\
Current immutable check outs (~dup fingerprints / differing sizes?): {contents:?}
",
digest = digest,
src = src,
dest = &dest,
// If the parent dir is un-writeable, which is unexpected, we will get permission
// denied on the rename.
parent_metadata = maybe_unwriteable_parent_metadata,
// If the destination directory already exists then we have a leaky locking regime or
// broken materialization failure cleanup.
existing_metadata = maybe_collision_metadata,
// Two digests that have different size_bytes but the same fingerprint is a bug in its
// own right, but would lead to making the same `digest_str` accessible via two
// different Digest keys here; so display all the keys and values to be able to spot
// this should it occur.
contents = self.contents.lock(),
err = e
)
})
.await?;
Ok::<_, String>(dest)
})
.await;
Ok(value?.clone())
.await?;
Ok(value.clone())
}

///
Expand Down
Loading

0 comments on commit 1fe755b

Please sign in to comment.