Skip to content

Commit

Permalink
Eagerly clean running nodes rather than interrupting them. (pantsbuil…
Browse files Browse the repository at this point in the history
…d#18855)

As reported in pantsbuild#10705: our current strategy for dirtying running nodes
eagerly interrupts them, even if cleaning their dependencies might show
that it is not necessary. In practice, this means that almost any file
change in the repository will cause a run of pants to restart (most) of
what it was doing.

This change moves to dirtying running nodes (in `Entry::dirty`) by
notifying them that their dependencies might have changed (via
`AsyncValueSender.interrupt`, which is received in
`spawn_node_execution`), and allowing them to attempt to clean the
dependencies that they have observed so far. If the node determines that
the dependencies it has observed so far are unchanged, it continues
running without being interrupted: otherwise, it is canceled as it would
have been before this change.

Since dependency generations used to be collected lazily in `complete`
(meaning that they weren't available to be used to clean a node _while_
it was running), pantsbuild#18835 made `graph::Context` concrete, and this change
begins recording dependency generations on the concrete `Context` as
they are requested by a node.

This change fixes pantsbuild#10705, meaning that it should be possible to make
edits to unrelated files in more cases, without causing running
processes (e.g. those spawned by `test` or `run`) to restart. For
example, try adding comments to relevant `BUILD` files while a test is
running, and observe that it does not restart unless the edit impacts
the inputs to the test run.
  • Loading branch information
stuhood authored May 1, 2023
1 parent a2a174a commit 0c673b9
Show file tree
Hide file tree
Showing 8 changed files with 442 additions and 415 deletions.
57 changes: 27 additions & 30 deletions src/rust/engine/async_value/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@
// Arc<Mutex> can be more clear than needing to grok Orderings:
#![allow(clippy::mutex_atomic)]

use std::pin::pin;
use std::sync::{Arc, Weak};

use tokio::sync::{oneshot, watch};
use tokio::sync::{mpsc, watch};

///
/// A cancellable value computed by one sender, and broadcast to multiple receivers.
Expand All @@ -37,27 +38,32 @@ use tokio::sync::{oneshot, watch};
/// 2. implicitly if all receivers go away
///
/// NB: This is currently a `tokio::sync::watch` (which supports the second case), plus a
/// separate cancellation signal via `tokio::sync::oneshot` (to support the first case).
/// separate channel to support the first case, and to support other types of feedback to the
/// process producing the value.
///
#[derive(Debug)]
pub struct AsyncValue<T: Clone + Send + Sync + 'static> {
pub struct AsyncValue<T: Clone + Send + Sync + 'static, I> {
item_receiver: Weak<watch::Receiver<Option<T>>>,
abort_sender: Option<oneshot::Sender<T>>,
interrupt_sender: mpsc::UnboundedSender<I>,
}

impl<T: Clone + Send + Sync + 'static> AsyncValue<T> {
pub fn new() -> (AsyncValue<T>, AsyncValueSender<T>, AsyncValueReceiver<T>) {
let (abort_sender, abort_receiver) = oneshot::channel();
impl<T: Clone + Send + Sync + 'static, I> AsyncValue<T, I> {
pub fn new() -> (
AsyncValue<T, I>,
AsyncValueSender<T, I>,
AsyncValueReceiver<T>,
) {
let (interrupt_sender, interrupt_receiver) = mpsc::unbounded_channel();
let (item_sender, item_receiver) = watch::channel(None);
let item_receiver = Arc::new(item_receiver);
(
AsyncValue {
item_receiver: Arc::downgrade(&item_receiver),
abort_sender: Some(abort_sender),
interrupt_sender,
},
AsyncValueSender {
item_sender,
abort_receiver,
interrupt_receiver,
},
AsyncValueReceiver { item_receiver },
)
Expand All @@ -74,12 +80,11 @@ impl<T: Clone + Send + Sync + 'static> AsyncValue<T> {
.map(|item_receiver| AsyncValueReceiver { item_receiver })
}

pub fn try_abort(&mut self, t: T) -> Result<(), T> {
if let Some(abort_sender) = self.abort_sender.take() {
abort_sender.send(t)
} else {
Ok(())
}
pub fn try_interrupt(&mut self, i: I) -> Result<(), I> {
self
.interrupt_sender
.send(i)
.map_err(|send_error| send_error.0)
}
}

Expand Down Expand Up @@ -109,29 +114,21 @@ impl<T: Clone + Send + Sync + 'static> AsyncValueReceiver<T> {
}
}

pub struct AsyncValueSender<T: Clone + Send + Sync + 'static> {
pub struct AsyncValueSender<T: Clone + Send + Sync + 'static, I> {
item_sender: watch::Sender<Option<T>>,
abort_receiver: oneshot::Receiver<T>,
interrupt_receiver: mpsc::UnboundedReceiver<I>,
}

impl<T: Clone + Send + Sync + 'static> AsyncValueSender<T> {
impl<T: Clone + Send + Sync + 'static, I> AsyncValueSender<T, I> {
pub fn send(self, item: T) {
let _ = self.item_sender.send(Some(item));
}

pub async fn aborted(&mut self) -> Option<T> {
pub async fn interrupted(&mut self) -> Option<I> {
let mut recv = pin!(self.interrupt_receiver.recv());
tokio::select! {
res = &mut self.abort_receiver => {
match res {
Ok(res) => {
// Aborted with a value.
Some(res)
},
Err(_) => {
// Was dropped.
None
},
}
res = &mut recv => {
res
}
_ = self.item_sender.closed() => { None }
}
Expand Down
20 changes: 10 additions & 10 deletions src/rust/engine/async_value/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ use tokio::time::sleep;

#[tokio::test]
async fn send() {
let (_value, sender, receiver) = AsyncValue::new();
let (_value, sender, receiver) = AsyncValue::<_, ()>::new();
let _send_task = tokio::spawn(async move { sender.send(42) });
assert_eq!(Some(42), receiver.recv().await);
}

#[tokio::test]
async fn cancel_explicit() {
let (value, mut sender, receiver) = AsyncValue::<()>::new();
let (value, mut sender, receiver) = AsyncValue::<(), ()>::new();

// A task that will never do any meaningful work, and just wait to be canceled.
let _send_task = tokio::spawn(async move { sender.aborted().await });
let _send_task = tokio::spawn(async move { sender.interrupted().await });

// Ensure that a value is not received.
tokio::select! {
Expand All @@ -33,10 +33,10 @@ async fn cancel_explicit() {

#[tokio::test]
async fn cancel_implicit() {
let (value, mut sender, receiver) = AsyncValue::<()>::new();
let (value, mut sender, receiver) = AsyncValue::<(), ()>::new();

// A task that will never do any meaningful work, and just wait to be canceled.
let send_task = tokio::spawn(async move { sender.aborted().await });
let send_task = tokio::spawn(async move { sender.interrupted().await });

// Ensure that a value is not received.
tokio::select! {
Expand All @@ -52,20 +52,20 @@ async fn cancel_implicit() {
}

#[tokio::test]
async fn abort_explicit() {
let (mut value, mut sender, receiver) = AsyncValue::<()>::new();
async fn interrupt_explicit() {
let (mut value, mut sender, receiver) = AsyncValue::<(), ()>::new();

// A task that will never do any meaningful work, and just wait to be canceled.
let send_task = tokio::spawn(async move { sender.aborted().await });
let send_task = tokio::spawn(async move { sender.interrupted().await });

// Ensure that a value is not received.
tokio::select! {
_ = sleep(Duration::from_secs(1)) => {},
_ = receiver.recv() => { panic!("Should have continued to wait.") }
}

// Explicitly abort the task, and confirm that it exits and cancels the work.
value.try_abort(()).unwrap();
// Explicitly interrupt the task, and confirm that it exits and cancels the work.
value.try_interrupt(()).unwrap();
assert_eq!(Some(()), send_task.await.unwrap());
assert_eq!(None, receiver.recv().await);
}
74 changes: 65 additions & 9 deletions src/rust/engine/graph/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ use std::ops::Deref;
use std::sync::atomic::{self, AtomicU32, AtomicUsize};
use std::sync::Arc;

use parking_lot::Mutex;
use workunit_store::RunId;

use crate::node::{CompoundNode, EntryId, Node};
use crate::entry::Generation;
use crate::node::{CompoundNode, EntryId, Node, NodeError};
use crate::Graph;

struct InnerContext<N: Node + Send> {
Expand All @@ -17,6 +19,12 @@ struct InnerContext<N: Node + Send> {
graph: Graph<N>,
}

#[derive(Clone, Default)]
pub(crate) struct DepState {
pub(crate) generations: Vec<(EntryId, Generation)>,
pub(crate) has_uncacheable_deps: bool,
}

///
/// A context passed between running Nodes which is used to request and record dependencies.
///
Expand All @@ -26,13 +34,15 @@ struct InnerContext<N: Node + Send> {
#[derive(Clone)]
pub struct Context<N: Node + Send> {
entry_id: Option<EntryId>,
dep_state: Arc<Mutex<Option<DepState>>>,
inner: Arc<InnerContext<N>>,
}

impl<N: Node + Send> Context<N> {
pub(crate) fn new(graph: Graph<N>, context: N::Context, run_id: RunId) -> Self {
Self {
entry_id: None,
dep_state: Arc::default(),
inner: Arc::new(InnerContext {
context,
run_id: AtomicU32::new(run_id.0),
Expand All @@ -46,17 +56,18 @@ impl<N: Node + Send> Context<N> {
/// Get the future value for the given Node implementation.
///
pub async fn get<CN: CompoundNode<N>>(&self, node: CN) -> Result<CN::Item, N::Error> {
let node_result = self
let (node_result, _generation) = self
.inner
.graph
.get(self.entry_id, self, node.into())
.await?;
Ok(node_result.try_into().unwrap_or_else(|_| {
panic!(
.get_inner(self.entry_id, self, node.into())
.await;

node_result?.try_into().map_err(|_| {
N::Error::generic(format!(
"The CompoundNode implementation for {} was ambiguous.",
std::any::type_name::<CN>()
)
}))
))
})
}

pub fn run_id(&self) -> RunId {
Expand All @@ -82,14 +93,59 @@ impl<N: Node + Send> Context<N> {
&self.inner.stats
}

pub(crate) fn dep_record(
&self,
dep_id: EntryId,
generation: Generation,
uncacheable: bool,
) -> Result<(), N::Error> {
let mut maybe_dep_state = self.dep_state.lock();
if let Some(dep_state) = maybe_dep_state.as_mut() {
dep_state.generations.push((dep_id, generation));
dep_state.has_uncacheable_deps |= uncacheable;
Ok(())
} else {
// This case can occur if a Node has spawned background work which continues to attempt
// to request dependencies in the background.
Err(N::Error::generic(format!(
"Could not request additional dependencies for {:?}: the Node has completed.",
self.entry_id
)))
}
}

///
/// Gets the dependency generations which have been computed for this Node so far. May not be
/// called after `complete` has been called for a node.
///
pub(crate) fn dep_generations_so_far(&self, node: &N) -> Vec<(EntryId, Generation)> {
(*self.dep_state.lock())
.clone()
.unwrap_or_else(|| panic!("Node {node} has already completed."))
.generations
}

///
/// Completes the Context for this EntryId, returning the dependency generations that were
/// recorded while it was running. May only be called once.
///
pub(crate) fn complete(&self, node: &N) -> DepState {
self
.dep_state
.lock()
.take()
.unwrap_or_else(|| panic!("Node {node} was completed multiple times."))
}

///
/// Creates a clone of this Context to be used for a different Node.
///
/// To clone a Context for use by the _same_ Node, `Clone` is used directly.
///
pub(crate) fn clone_for(&self, entry_id: EntryId) -> Self {
Context {
Self {
entry_id: Some(entry_id),
dep_state: Arc::new(Mutex::new(Some(DepState::default()))),
inner: self.inner.clone(),
}
}
Expand Down
Loading

0 comments on commit 0c673b9

Please sign in to comment.