Skip to content

Commit

Permalink
Errors in --loop wait for changes rather than re-running immediately (pantsbuild#18367)
Browse files Browse the repository at this point in the history

As reported in pantsbuild#16727, errors occurring in a `--loop` currently retry
indefinitely, because we do not preserve the `Generation` value of a
Node in the case where an error is raised.

This change adjusts the signatures of `Graph` methods to preserve the
`Generation` value in case of errors, and additionally allows "pollers"
to be added to errored (`NotStarted`) `Node`s.

This fix is slightly different from what was suggested in a comment on
pantsbuild#16727: it does _not_ reintroduce memoization of errors, as that
would open a can of worms around "which" errors are safe to memoize. Instead,
only the method signatures involved in `poll`ing are affected, essentially by
transposing `Result<(Value, Generation), ...>` to
`(Result<Value, ...>, Generation)`.

Fixes pantsbuild#16727.
  • Loading branch information
stuhood authored Feb 26, 2023
1 parent 8477460 commit eb9eda8
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 75 deletions.
98 changes: 71 additions & 27 deletions src/rust/engine/graph/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl<N: Node> AsRef<N::Item> for EntryResult<N> {
}
}

pub type NodeResult<N> = Result<(<N as Node>::Item, Generation), <N as Node>::Error>;
pub type NodeResult<N> = (Result<<N as Node>::Item, <N as Node>::Error>, Generation);

#[derive(Debug)]
pub enum EntryState<N: Node> {
Expand All @@ -170,6 +170,7 @@ pub enum EntryState<N: Node> {
NotStarted {
run_token: RunToken,
generation: Generation,
pollers: Vec<oneshot::Sender<()>>,
previous_result: Option<EntryResult<N>>,
},
// A node that is running. A running node that has been marked dirty re-runs rather than
Expand Down Expand Up @@ -206,6 +207,7 @@ impl<N: Node> EntryState<N> {
EntryState::NotStarted {
run_token: RunToken::initial(),
generation: Generation::initial(),
pollers: Vec::new(),
previous_result: None,
}
}
Expand Down Expand Up @@ -259,27 +261,38 @@ impl<N: Node> Entry<N> {
pub async fn poll(&self, context: &N::Context, last_seen_generation: Generation) {
let recv = {
let mut state = self.state.lock();
match *state {
let pollers = match *state {
EntryState::Completed {
ref result,
generation,
ref mut pollers,
..
} if generation == last_seen_generation && result.poll_should_wait(context) => {
// The Node is currently clean with the observed generation: add a poller on the
// Completed node that will be notified when it is dirtied or dropped. If the Node moves
// to another state, the received will be notified that the sender was dropped, and it
// will be converted into a successful result.
let (send, recv) = oneshot::channel();
pollers.push(send);
recv
// The Node is clean in this context, and the last seen generation matches.
pollers
}
EntryState::NotStarted {
generation,
ref mut pollers,
..
} if generation == last_seen_generation => {
// The Node has not yet been started, but the last seen generation matches. This
// means that an error occurred on a previous run of the node, but it has already been
// observed by the caller.
pollers
}
_ => {
// The generation didn't match or the Node wasn't Completed. It should be requested
// without waiting.
return;
}
}
};

// Add a poller on the node that will be notified when it is dirtied or dropped. If the Node
// moves to another state, the receiver will be notified that the sender was dropped.
let (send, recv) = oneshot::channel();
pollers.push(send);
recv
};
// Wait outside of the lock.
let _ = recv.await;
Expand Down Expand Up @@ -308,7 +321,7 @@ impl<N: Node> Entry<N> {
generation: Generation,
previous_dep_generations: Option<Vec<Generation>>,
previous_result: Option<EntryResult<N>>,
) -> (EntryState<N>, AsyncValueReceiver<NodeResult<N>>) {
) -> (EntryState<N>, AsyncValueReceiver<NodeResult<N>>, Generation) {
// Increment the RunToken to uniquely identify this work.
let run_token = run_token.next();
let context = context_factory.clone_for(entry_id);
Expand Down Expand Up @@ -359,7 +372,7 @@ impl<N: Node> Entry<N> {
abort_item = sender.aborted() => {
if let Some(res) = abort_item {
// We were aborted via terminate: complete with the given res.
Some(res.map(|v| v.0))
Some(res.0)
} else {
// We were aborted via drop: exit.
context2
Expand Down Expand Up @@ -387,6 +400,7 @@ impl<N: Node> Entry<N> {
is_cleaning,
},
receiver,
generation,
)
}

Expand All @@ -409,10 +423,18 @@ impl<N: Node> Entry<N> {
// cases we return early without swapping the state of the Node.
match *state {
EntryState::Running {
ref pending_value, ..
ref pending_value,
generation,
..
} => {
if let Some(receiver) = pending_value.receiver() {
return async move { receiver.recv().await.ok_or_else(N::Error::invalidated)? }.boxed();
return async move {
receiver
.recv()
.await
.unwrap_or_else(|| (Err(N::Error::invalidated()), generation.next()))
}
.boxed();
}
// Else: this node was just canceled: fall through to restart it.
}
Expand All @@ -421,17 +443,19 @@ impl<N: Node> Entry<N> {
generation,
..
} if result.is_clean(context) => {
return future::ready(Ok((result.as_ref().clone(), generation))).boxed();
return future::ready((Ok(result.as_ref().clone()), generation)).boxed();
}
_ => (),
};

// Otherwise, we'll need to swap the state of the Node, so take it by value.
let (next_state, receiver) = match mem::replace(&mut *state, EntryState::initial()) {
let (next_state, receiver, generation) = match mem::replace(&mut *state, EntryState::initial())
{
EntryState::NotStarted {
run_token,
generation,
previous_result,
..
}
| EntryState::Running {
run_token,
Expand Down Expand Up @@ -490,7 +514,13 @@ impl<N: Node> Entry<N> {
// Swap in the new state, and return the receiver.
*state = next_state;

async move { receiver.recv().await.ok_or_else(N::Error::invalidated)? }.boxed()
async move {
receiver
.recv()
.await
.unwrap_or_else(|| (Err(N::Error::invalidated()), generation.next()))
}
.boxed()
}

///
Expand Down Expand Up @@ -521,6 +551,7 @@ impl<N: Node> Entry<N> {
EntryState::NotStarted {
run_token: run_token.next(),
generation,
pollers: Vec::new(),
previous_result,
}
}
Expand Down Expand Up @@ -582,10 +613,12 @@ impl<N: Node> Entry<N> {
if let Some(previous_result) = previous_result.as_mut() {
previous_result.dirty();
}
sender.send(Err(e));
generation = generation.next();
sender.send((Err(e), generation));
EntryState::NotStarted {
run_token: run_token.next(),
generation,
pollers: Vec::new(),
previous_result,
}
}
Expand All @@ -597,7 +630,7 @@ impl<N: Node> Entry<N> {
// Node was re-executed (ie not cleaned) and had a different result value.
generation = generation.next()
};
sender.send(Ok((next_result.as_ref().clone(), generation)));
sender.send((Ok(next_result.as_ref().clone()), generation));
EntryState::Completed {
result: next_result,
pollers: Vec::new(),
Expand All @@ -616,7 +649,7 @@ impl<N: Node> Entry<N> {
self.cacheable_with_output(Some(result.as_ref())),
has_uncacheable_deps,
);
sender.send(Ok((result.as_ref().clone(), generation)));
sender.send((Ok(result.as_ref().clone()), generation));
EntryState::Completed {
result,
pollers: Vec::new(),
Expand Down Expand Up @@ -710,6 +743,7 @@ impl<N: Node> Entry<N> {
*state = EntryState::NotStarted {
run_token: run_token.next(),
generation,
pollers: Vec::new(),
previous_result,
};
}
Expand All @@ -729,14 +763,18 @@ impl<N: Node> Entry<N> {
ref mut pollers,
..
} => {
// Notify all pollers (ignoring any that have gone away.)
for poller in pollers.drain(..) {
let _ = poller.send(());
}
// Drop the pollers, which will notify them of a change.
pollers.clear();
result.dirty();
return;
}
&mut EntryState::NotStarted { .. } => return,
&mut EntryState::NotStarted {
ref mut pollers, ..
} => {
// Drop the pollers, which will notify them of a change.
pollers.clear();
return;
}
&mut EntryState::Running { .. } if !self.node.cacheable() => {
// An uncacheable node cannot be interrupted.
return;
Expand All @@ -760,6 +798,7 @@ impl<N: Node> Entry<N> {
EntryState::NotStarted {
run_token,
generation,
pollers: Vec::new(),
previous_result,
}
}
Expand All @@ -776,8 +815,13 @@ impl<N: Node> Entry<N> {
pub(crate) fn terminate(&mut self, err: N::Error) {
let state = &mut *self.state.lock();
test_trace_log!("Terminating node {:?} with {:?}", self.node, err);
if let EntryState::Running { pending_value, .. } = state {
let _ = pending_value.try_abort(Err(err));
if let EntryState::Running {
pending_value,
generation,
..
} = state
{
let _ = pending_value.try_abort((Err(err), generation.next()));
};
}

Expand Down
22 changes: 9 additions & 13 deletions src/rust/engine/graph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ impl<N: Node> Graph<N> {
src_id: Option<EntryId>,
context: &N::Context,
dst_node: N,
) -> Result<(N::Item, Generation), N::Error> {
) -> (Result<N::Item, N::Error>, Generation) {
// Compute information about the dst under the Graph lock, and then release it.
let (dst_retry, mut entry, entry_id) = {
// Get or create the destination, and then insert the dep and return its state.
Expand Down Expand Up @@ -478,8 +478,7 @@ impl<N: Node> Graph<N> {
let context = context.clone();
loop {
match entry.get_node_result(&context, entry_id).await {
Ok(r) => break Ok(r),
Err(err) if err == N::Error::invalidated() => {
(Err(err), _) if err == N::Error::invalidated() => {
let node = {
let inner = self.inner.lock();
inner.unsafe_entry_for_id(entry_id).node().clone()
Expand All @@ -491,7 +490,7 @@ impl<N: Node> Graph<N> {
sleep(self.invalidation_delay).await;
continue;
}
Err(other_err) => break Err(other_err),
res => break res,
}
}
} else {
Expand All @@ -515,8 +514,8 @@ impl<N: Node> Graph<N> {
context: &N::Context,
dst_node: N,
) -> Result<N::Item, N::Error> {
let (res, _generation) = self.get_inner(src_id, context, dst_node).await?;
Ok(res)
let (res, _generation) = self.get_inner(src_id, context, dst_node).await;
res
}

///
Expand All @@ -536,7 +535,7 @@ impl<N: Node> Graph<N> {
token: Option<LastObserved>,
delay: Option<Duration>,
context: &N::Context,
) -> Result<(N::Item, LastObserved), N::Error> {
) -> (Result<N::Item, N::Error>, LastObserved) {
// If the node is currently clean at the given token, Entry::poll will delay until it has
// changed in some way.
if let Some(LastObserved(generation)) = token {
Expand All @@ -552,8 +551,8 @@ impl<N: Node> Graph<N> {
};

// Re-request the Node.
let (res, generation) = self.get_inner(None, context, node).await?;
Ok((res, LastObserved(generation)))
let (res, generation) = self.get_inner(None, context, node).await;
(res, LastObserved(generation))
}

///
Expand Down Expand Up @@ -595,10 +594,7 @@ impl<N: Node> Graph<N> {
.unwrap_or_else(|| panic!("Dependency not present in Graph."))
.clone();
async move {
let (_, generation) = dep_entry
.get_node_result(context, dep_id)
.await
.map_err(|_| ())?;
let (_, generation) = dep_entry.get_node_result(context, dep_id).await;
if generation == previous_dep_generation {
// Matched.
Ok(())
Expand Down
Loading

0 comments on commit eb9eda8

Please sign in to comment.