Skip to content

Commit

Permalink
Add concrete Context type to the graph crate (pantsbuild#18835)
Browse files Browse the repository at this point in the history
The `graph` crate currently has a `trait Context`, but no concrete
`Context`. As the use-cases for the `Context` have expanded, many
implementation details of the `Graph` have ended up as public in the
trait, even though they would be better off as private.

This change extracts a concrete `struct graph::Context` which holds an
opaque user context. `graph::Context` has `impl Deref` to the user
context type, which minimizes edits to callsites. There should be no
changes in performance or behavior.

(The primary motivation for this change is that pantsbuild#18855 needs to record
additional information on the `Context`, which felt like the straw that
broke the camel's back and justified splitting out a concrete
`Context`.)
  • Loading branch information
stuhood authored Apr 30, 2023
1 parent 89aac45 commit 0047da1
Showing 15 changed files with 353 additions and 382 deletions.
1 change: 1 addition & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/rust/engine/graph/Cargo.toml
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@ parking_lot = "0.12"
petgraph = "0.6"
task_executor = { path = "../task_executor" }
tokio = { version = "1.21", features = ["time", "parking_lot"] }
workunit_store = { path = "../workunit_store" }

[dev-dependencies]
rand = "0.8"
111 changes: 111 additions & 0 deletions src/rust/engine/graph/src/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2018 Pants project contributors (see CONTRIBUTORS.md).
// Licensed under the Apache License, Version 2.0 (see LICENSE).

use std::ops::Deref;
use std::sync::atomic::{self, AtomicU32, AtomicUsize};
use std::sync::Arc;

use workunit_store::RunId;

use crate::node::{CompoundNode, EntryId, Node};
use crate::Graph;

struct InnerContext<N: Node + Send> {
context: N::Context,
run_id: AtomicU32,
stats: Stats,
graph: Graph<N>,
}

///
/// A context passed between running Nodes which is used to request and record dependencies.
///
/// Parametrized by:
/// N: Node - The Node type that this Context is being used for.
///
#[derive(Clone)]
pub struct Context<N: Node + Send> {
entry_id: Option<EntryId>,
inner: Arc<InnerContext<N>>,
}

impl<N: Node + Send> Context<N> {
pub(crate) fn new(graph: Graph<N>, context: N::Context, run_id: RunId) -> Self {
Self {
entry_id: None,
inner: Arc::new(InnerContext {
context,
run_id: AtomicU32::new(run_id.0),
stats: Stats::default(),
graph,
}),
}
}

///
/// Get the future value for the given Node implementation.
///
pub async fn get<CN: CompoundNode<N>>(&self, node: CN) -> Result<CN::Item, N::Error> {
let node_result = self
.inner
.graph
.get(self.entry_id, self, node.into())
.await?;
Ok(node_result.try_into().unwrap_or_else(|_| {
panic!(
"The CompoundNode implementation for {} was ambiguous.",
std::any::type_name::<CN>()
)
}))
}

pub fn run_id(&self) -> RunId {
RunId(self.inner.run_id.load(atomic::Ordering::SeqCst))
}

pub fn new_run_id(&self) {
self.inner.run_id.store(
self.inner.graph.generate_run_id().0,
atomic::Ordering::SeqCst,
);
}

pub fn context(&self) -> &N::Context {
&self.inner.context
}

pub fn graph(&self) -> &Graph<N> {
&self.inner.graph
}

pub(crate) fn stats(&self) -> &Stats {
&self.inner.stats
}

///
/// Creates a clone of this Context to be used for a different Node.
///
/// To clone a Context for use by the _same_ Node, `Clone` is used directly.
///
pub(crate) fn clone_for(&self, entry_id: EntryId) -> Self {
Context {
entry_id: Some(entry_id),
inner: self.inner.clone(),
}
}
}

impl<N: Node> Deref for Context<N> {
type Target = N::Context;

fn deref(&self) -> &Self::Target {
&self.inner.context
}
}

#[derive(Default)]
pub(crate) struct Stats {
pub ran: AtomicUsize,
pub cleaning_succeeded: AtomicUsize,
pub cleaning_failed: AtomicUsize,
}
83 changes: 36 additions & 47 deletions src/rust/engine/graph/src/entry.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
// Copyright 2022 Pants project contributors (see CONTRIBUTORS.md).
// Licensed under the Apache License, Version 2.0 (see LICENSE).
use std::mem;
use std::sync::atomic;
use std::sync::Arc;

use crate::node::{EntryId, Node, NodeContext, NodeError};
use crate::context::Context;
use crate::node::{EntryId, Node, NodeError};
use crate::test_trace_log;

use async_value::{AsyncValue, AsyncValueReceiver, AsyncValueSender};
use futures::channel::oneshot;
use futures::future::{self, BoxFuture, FutureExt};
use parking_lot::Mutex;
use workunit_store::RunId;

///
/// A token that uniquely identifies one run of a Node in the Graph. Each run of a Node (via
/// `N::Context::spawn`) has a different RunToken associated with it. When a run completes, if
/// the current RunToken of its Node no longer matches the RunToken of the spawned work (because
/// the Node was `cleared`), the work is discarded. See `Entry::complete` for more information.
/// A token that uniquely identifies one run of a Node in the Graph. Each run of a Node has a
/// different RunToken associated with it. When a run completes, if the current RunToken of its
/// Node no longer matches the RunToken of the spawned work (because the Node was `cleared`), the
/// work is discarded. See `Entry::complete` for more information.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RunToken(u32);
@@ -64,34 +67,34 @@ pub enum EntryResult<N: Node> {
Dirty(N::Item),
/// Similar to Clean, but the value may only be consumed in the same Run that produced it, and
/// _must_ (unlike UncacheableDependencies) be recomputed in a new Run.
Uncacheable(N::Item, <<N as Node>::Context as NodeContext>::RunId),
Uncacheable(N::Item, RunId),
/// A value that was computed from an Uncacheable node, and is thus Run-specific. If the Run id
/// of a consumer matches, the value can be considered to be Clean: otherwise, is considered to
/// be Dirty.
UncacheableDependencies(N::Item, <<N as Node>::Context as NodeContext>::RunId),
UncacheableDependencies(N::Item, RunId),
}

impl<N: Node> EntryResult<N> {
fn new(
item: N::Item,
context: &N::Context,
context: &Context<N>,
cacheable: bool,
has_uncacheable_deps: bool,
) -> EntryResult<N> {
if !cacheable {
EntryResult::Uncacheable(item, context.run_id().clone())
EntryResult::Uncacheable(item, context.run_id())
} else if has_uncacheable_deps {
EntryResult::UncacheableDependencies(item, context.run_id().clone())
EntryResult::UncacheableDependencies(item, context.run_id())
} else {
EntryResult::Clean(item)
}
}

fn is_clean(&self, context: &N::Context) -> bool {
fn is_clean(&self, context: &Context<N>) -> bool {
match self {
EntryResult::Clean(..) => true,
EntryResult::Uncacheable(_, run_id) => context.run_id() == run_id,
EntryResult::UncacheableDependencies(.., run_id) => context.run_id() == run_id,
EntryResult::Uncacheable(_, run_id) => context.run_id() == *run_id,
EntryResult::UncacheableDependencies(.., run_id) => context.run_id() == *run_id,
EntryResult::Dirty(..) => false,
}
}
@@ -105,15 +108,15 @@ impl<N: Node> EntryResult<N> {

/// Returns true if this result should block for polling (because there is no work to do
/// currently to clean it).
fn poll_should_wait(&self, context: &N::Context) -> bool {
fn poll_should_wait(&self, context: &Context<N>) -> bool {
match self {
EntryResult::Uncacheable(_, run_id) => context.run_id() == run_id,
EntryResult::Uncacheable(_, run_id) => context.run_id() == *run_id,
EntryResult::Dirty(..) => false,
EntryResult::Clean(..) | EntryResult::UncacheableDependencies(_, _) => true,
}
}

fn peek(&self, context: &N::Context) -> Option<N::Item> {
fn peek(&self, context: &Context<N>) -> Option<N::Item> {
if self.is_clean(context) {
Some(self.as_ref().clone())
} else {
@@ -134,7 +137,7 @@ impl<N: Node> EntryResult<N> {
}

/// Assert that the value is in "a dirty state", and move it to a clean state.
fn clean(&mut self, context: &N::Context, cacheable: bool, has_uncacheable_deps: bool) {
fn clean(&mut self, context: &Context<N>, cacheable: bool, has_uncacheable_deps: bool) {
let value = match self {
EntryResult::Dirty(value) => value.clone(),
EntryResult::UncacheableDependencies(value, _) => value.clone(),
@@ -258,7 +261,7 @@ impl<N: Node> Entry<N> {
/// be changed in any way. If the node is not clean, or the generation mismatches, returns
/// immediately.
///
pub async fn poll(&self, context: &N::Context, last_seen_generation: Generation) {
pub async fn poll(&self, context: &Context<N>, last_seen_generation: Generation) {
let recv = {
let mut state = self.state.lock();
let pollers = match *state {
@@ -301,7 +304,7 @@ impl<N: Node> Entry<N> {
///
/// If the Future for this Node has already completed, returns a clone of its result.
///
pub fn peek(&self, context: &N::Context) -> Option<N::Item> {
pub fn peek(&self, context: &Context<N>) -> Option<N::Item> {
let state = self.state.lock();
match *state {
EntryState::Completed { ref result, .. } => result.peek(context),
@@ -314,7 +317,7 @@ impl<N: Node> Entry<N> {
/// the Graph lock and call back into the graph lock to set the final value.
///
pub(crate) fn spawn_node_execution(
context_factory: &N::Context,
context_factory: &Context<N>,
node: &N,
entry_id: EntryId,
run_token: RunToken,
@@ -343,11 +346,17 @@ impl<N: Node> Entry<N> {
// If dependency generations mismatched or failed to fetch, clear the node's dependencies
// and indicate that it should re-run.
context.graph().cleaning_failed(entry_id, run_token);
context.stats().cleaning_failed += 1;
context
.stats()
.cleaning_failed
.fetch_add(1, atomic::Ordering::SeqCst);
false
} else {
// Dependencies have not changed: Node is clean.
context.stats().cleaning_succeeded += 1;
context
.stats()
.cleaning_succeeded
.fetch_add(1, atomic::Ordering::SeqCst);
true
}
} else {
@@ -362,12 +371,12 @@ impl<N: Node> Entry<N> {
} else {
// The Node needs to (re-)run!
let res = node.run(context.clone()).await;
context.stats().ran += 1;
context.stats().ran.fetch_add(1, atomic::Ordering::SeqCst);
Some(res)
}
};

context_factory.spawn(async move {
let _join = context2.graph().executor.clone().native_spawn(async move {
let maybe_res = tokio::select! {
abort_item = sender.aborted() => {
if let Some(res) = abort_item {
@@ -414,7 +423,7 @@ impl<N: Node> Entry<N> {
///
pub(crate) fn get_node_result(
&mut self,
context: &N::Context,
context: &Context<N>,
entry_id: EntryId,
) -> BoxFuture<NodeResult<N>> {
let mut state = self.state.lock();
@@ -576,7 +585,7 @@ impl<N: Node> Entry<N> {
///
pub(crate) fn complete(
&mut self,
context: &N::Context,
context: &Context<N>,
result_run_token: RunToken,
dep_generations: Vec<Generation>,
sender: AsyncValueSender<NodeResult<N>>,
@@ -856,34 +865,14 @@ impl<N: Node> Entry<N> {
}
}

pub fn is_clean(&self, context: &N::Context) -> bool {
match *self.state.lock() {
EntryState::NotStarted {
ref previous_result,
..
}
| EntryState::Running {
ref previous_result,
..
} => {
if let Some(result) = previous_result {
result.is_clean(context)
} else {
true
}
}
EntryState::Completed { ref result, .. } => result.is_clean(context),
}
}

pub(crate) fn has_uncacheable_deps(&self) -> bool {
match *self.state.lock() {
EntryState::Completed { ref result, .. } => result.has_uncacheable_deps(),
EntryState::NotStarted { .. } | EntryState::Running { .. } => false,
}
}

pub(crate) fn format(&self, context: &N::Context) -> String {
pub(crate) fn format(&self, context: &Context<N>) -> String {
let state = match self.peek(context) {
Some(ref nr) => {
let item = format!("{nr:?}");
Loading

0 comments on commit 0047da1

Please sign in to comment.