Move Session to its own file. (pantsbuild#11222)
### Problem

`Scheduler` and `Session` play distinct roles, and each has grown large. The `Scheduler` is global and typically created once per process, whereas a `Session` is created once per run, so there will (soon) be more than one of them in existence concurrently.

But both still live in the same file, which makes it slightly harder to understand and isolate their roles.

### Solution

Move `Session` to its own module: `session.rs`.
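
For orientation, here is a minimal, hedged sketch of the pattern that moves into `session.rs`: a cheaply cloneable `Session` handle wrapping an `Arc<InnerSession>`, with interior mutability so callers never see locks (the doc comment on the moved code below makes this explicit). `build_id` matches a real field visible in the diff; `run_counter` and `bump_run` are hypothetical stand-ins used only to show the locking pattern.

```rust
// Minimal sketch only: the real session.rs introduced by this commit is much larger.
use std::sync::{Arc, Mutex};

struct InnerSession {
    // Per-run state (the real struct also holds roots, display, workunit_store, ...).
    build_id: String,
    // Interior mutability: callers mutate through &Session without holding locks themselves.
    run_counter: Mutex<u64>,
}

/// A cheap-to-clone handle; all clones share one InnerSession for the run.
#[derive(Clone)]
pub struct Session(Arc<InnerSession>);

impl Session {
    pub fn new(build_id: String) -> Session {
        Session(Arc::new(InnerSession {
            build_id,
            run_counter: Mutex::new(0),
        }))
    }

    pub fn build_id(&self) -> &str {
        &self.0.build_id
    }

    // Locking happens inside the method, mirroring how the real Session hides its locks.
    pub fn bump_run(&self) -> u64 {
        let mut counter = self.0.run_counter.lock().unwrap();
        *counter += 1;
        *counter
    }
}

fn main() {
    let session = Session::new("example-build".to_owned());
    let clone = session.clone();
    assert_eq!(clone.bump_run(), 1);
    println!("session {} ran {} time(s)", session.build_id(), session.bump_run());
}
```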

[ci skip-build-wheels]
stuhood authored Nov 20, 2020
1 parent b7409ff commit 6b79455
Showing 4 changed files with 290 additions and 265 deletions.
2 changes: 1 addition & 1 deletion src/rust/engine/src/context.rs
@@ -13,7 +13,7 @@ use std::time::Duration;
use crate::core::Failure;
use crate::intrinsics::Intrinsics;
use crate::nodes::{NodeKey, WrappedNode};
use crate::scheduler::Session;
use crate::session::Session;
use crate::tasks::{Rule, Tasks};
use crate::types::Types;

4 changes: 3 additions & 1 deletion src/rust/engine/src/lib.rs
@@ -40,12 +40,14 @@ mod intrinsics;
mod nodes;
mod scheduler;
mod selectors;
mod session;
mod tasks;
mod types;

pub use crate::context::{Core, ExecutionStrategyOptions, RemotingOptions};
pub use crate::core::{Failure, Function, Key, Params, TypeId, Value};
pub use crate::intrinsics::Intrinsics;
pub use crate::scheduler::{ExecutionRequest, ExecutionTermination, Scheduler, Session};
pub use crate::scheduler::{ExecutionRequest, ExecutionTermination, Scheduler};
pub use crate::session::Session;
pub use crate::tasks::{Rule, Tasks};
pub use crate::types::Types;
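
The hunk above declares the new module and re-exports `Session` from the crate root, so the crate's public path stays stable. A toy sketch (stand-in unit structs, not the real types) of why both paths keep naming the same type:

```rust
// Sketch of the declare-and-re-export pattern from lib.rs; toy types only.
mod session {
    pub struct Session; // stand-in for the real Session
}

// Re-exporting at the crate root preserves the existing public path,
// so users of the engine crate can keep importing `engine::Session`.
pub use crate::session::Session;

fn main() {
    // Canonical path inside the crate after the move:
    let _a: crate::session::Session = session::Session;
    // Path through the crate-root re-export (what external callers see):
    let _b: crate::Session = Session;
}
```

Within the crate itself, imports switch to `crate::session::Session` (as the `context.rs` hunk above shows), while the re-export keeps the existing external import path working.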
275 changes: 12 additions & 263 deletions src/rust/engine/src/scheduler.rs
@@ -3,32 +3,28 @@

use std::collections::{BTreeMap, HashMap, HashSet};
use std::convert::TryInto;
use std::future::Future;
use std::io;
use std::path::{Path, PathBuf};
use std::sync::{mpsc, Arc};
use std::time::{Duration, Instant};

use futures::{future, FutureExt};
use futures::future;

use crate::context::{Context, Core};
use crate::core::{Failure, Params, TypeId, Value};
use crate::externs;
use crate::nodes::{NodeKey, Select, Visualizer};
use crate::nodes::{Select, Visualizer};
use crate::session::{ExecutionEvent, ObservedValueResult, Root, Session};

use cpython::Python;
use futures::compat::Future01CompatExt;
use graph::{InvalidationResult, LastObserved};
use hashing::{Digest, EMPTY_DIGEST};
use log::{debug, info, warn};
use parking_lot::{Mutex, RwLock};
use task_executor::Executor;
use tempfile::TempDir;
use tokio::process;
use ui::ConsoleUI;
use uuid::Uuid;
use watch::Invalidatable;
use workunit_store::{UserMetadataPyValue, WorkunitStore};

pub enum ExecutionTermination {
// Raised as a vanilla keyboard interrupt on the python side.
@@ -39,255 +35,6 @@ pub enum ExecutionTermination {
Fatal(String),
}

enum ExecutionEvent {
Completed(Vec<ObservedValueResult>),
Stderr(String),
}

type ObservedValueResult = Result<(Value, Option<LastObserved>), Failure>;

// Root requests are limited to Select nodes, which produce (python) Values.
type Root = Select;

// When enabled, the interval at which all stragglers that have been running for longer than a
// threshold should be logged. The threshold might become configurable, but this might not need
// to be.
const STRAGGLER_LOGGING_INTERVAL: Duration = Duration::from_secs(30);

///
/// An enum for the two cases of `--[no-]dynamic-ui`.
///
enum SessionDisplay {
// The dynamic UI is enabled, and the ConsoleUI should interact with a TTY.
ConsoleUI(ConsoleUI),
// The dynamic UI is disabled, and we should use only logging.
Logging {
straggler_threshold: Duration,
straggler_deadline: Option<Instant>,
},
}

///
/// A Session represents a related series of requests (generally: one run of the pants CLI) on an
/// underlying Scheduler, and is a useful scope for metrics.
///
/// Both Scheduler and Session are exposed to python and expected to be used by multiple threads, so
/// they use internal mutability in order to avoid exposing locks to callers.
///
struct InnerSession {
// The total size of the graph at Session-creation time.
preceding_graph_size: usize,
// The set of roots that have been requested within this session, with associated LastObserved
// times if they were polled.
roots: Mutex<HashMap<Root, Option<LastObserved>>>,
// The display mechanism to use in this Session.
display: Mutex<SessionDisplay>,
// A place to store info about workunits in rust part
workunit_store: WorkunitStore,
// The unique id for this Session: used for metrics gathering purposes.
build_id: String,
// Per-Session values that have been set for this session.
session_values: Mutex<Value>,
// An id used to control the visibility of uncacheable rules. Generally this is identical for an
// entire Session, but in some cases (in particular, a `--loop`) the caller wants to retain the
// same Session while still observing new values for uncacheable rules like Goals.
//
// TODO: Figure out how the `--loop` interplays with metrics. It's possible that for metrics
// purposes, each iteration of a loop should be considered to be a new Session, but for now the
// Session/build_id would be stable.
run_id: Mutex<Uuid>,
should_report_workunits: bool,
workunit_metadata_map: RwLock<HashMap<UserMetadataPyValue, Value>>,
}

#[derive(Clone)]
pub struct Session(Arc<InnerSession>);

impl Session {
pub fn new(
scheduler: &Scheduler,
should_render_ui: bool,
build_id: String,
should_report_workunits: bool,
session_values: Value,
) -> Session {
let workunit_store = WorkunitStore::new(!should_render_ui);
let display = Mutex::new(if should_render_ui {
SessionDisplay::ConsoleUI(ConsoleUI::new(
workunit_store.clone(),
scheduler.core.local_parallelism,
))
} else {
SessionDisplay::Logging {
// TODO: This threshold should likely be configurable, but the interval we render at
// probably does not need to be.
straggler_threshold: Duration::from_secs(60),
straggler_deadline: None,
}
});

let inner_session = InnerSession {
preceding_graph_size: scheduler.core.graph.len(),
roots: Mutex::new(HashMap::new()),
display,
workunit_store,
build_id,
session_values: Mutex::new(session_values),
run_id: Mutex::new(Uuid::new_v4()),
should_report_workunits,
workunit_metadata_map: RwLock::new(HashMap::new()),
};
Session(Arc::new(inner_session))
}

pub fn with_metadata_map<F, T>(&self, f: F) -> T
where
F: Fn(&mut HashMap<UserMetadataPyValue, Value>) -> T,
{
f(&mut self.0.workunit_metadata_map.write())
}

fn extend(&self, new_roots: Vec<(Root, Option<LastObserved>)>) {
let mut roots = self.0.roots.lock();
roots.extend(new_roots);
}

fn zip_last_observed(&self, inputs: &[Root]) -> Vec<(Root, Option<LastObserved>)> {
let roots = self.0.roots.lock();
inputs
.iter()
.map(|root| {
let last_observed = roots.get(root).cloned().unwrap_or(None);
(root.clone(), last_observed)
})
.collect()
}

fn root_nodes(&self) -> Vec<NodeKey> {
let roots = self.0.roots.lock();
roots.keys().map(|r| r.clone().into()).collect()
}

pub fn session_values(&self) -> Value {
self.0.session_values.lock().clone()
}

pub fn preceding_graph_size(&self) -> usize {
self.0.preceding_graph_size
}

pub fn should_report_workunits(&self) -> bool {
self.0.should_report_workunits
}

pub fn workunit_store(&self) -> WorkunitStore {
self.0.workunit_store.clone()
}

pub fn build_id(&self) -> &String {
&self.0.build_id
}

pub fn run_id(&self) -> Uuid {
let run_id = self.0.run_id.lock();
*run_id
}

pub fn new_run_id(&self) {
let mut run_id = self.0.run_id.lock();
*run_id = Uuid::new_v4();
}

pub async fn write_stdout(&self, msg: &str) -> Result<(), String> {
if let SessionDisplay::ConsoleUI(ref mut ui) = *self.0.display.lock() {
ui.write_stdout(msg).await
} else {
print!("{}", msg);
Ok(())
}
}

pub fn write_stderr(&self, msg: &str) {
if let SessionDisplay::ConsoleUI(ref mut ui) = *self.0.display.lock() {
ui.write_stderr(msg);
} else {
eprint!("{}", msg);
}
}

pub async fn with_console_ui_disabled<T>(&self, f: impl Future<Output = T>) -> T {
match *self.0.display.lock() {
SessionDisplay::ConsoleUI(ref mut ui) => ui.with_console_ui_disabled(f).await,
SessionDisplay::Logging { .. } => f.await,
}
}

fn maybe_display_initialize(&self, executor: &Executor, sender: &mpsc::Sender<ExecutionEvent>) {
let result = match *self.0.display.lock() {
SessionDisplay::ConsoleUI(ref mut ui) => {
let sender = sender.clone();
ui.initialize(
executor.clone(),
Box::new(move |msg: &str| {
// If we fail to send, it's because the execute loop has exited: we fail the callback to
// have the logging module directly log to stderr at that point.
sender
.send(ExecutionEvent::Stderr(msg.to_owned()))
.map_err(|_| ())
}),
)
}
SessionDisplay::Logging {
ref mut straggler_deadline,
..
} => {
*straggler_deadline = Some(Instant::now() + STRAGGLER_LOGGING_INTERVAL);
Ok(())
}
};
if let Err(e) = result {
warn!("{}", e);
}
}

pub async fn maybe_display_teardown(&self) {
let teardown = match *self.0.display.lock() {
SessionDisplay::ConsoleUI(ref mut ui) => ui.teardown().boxed(),
SessionDisplay::Logging {
ref mut straggler_deadline,
..
} => {
*straggler_deadline = None;
async { Ok(()) }.boxed()
}
};
if let Err(e) = teardown.await {
warn!("{}", e);
}
}

fn maybe_display_render(&self) {
match *self.0.display.lock() {
SessionDisplay::ConsoleUI(ref mut ui) => ui.render(),
SessionDisplay::Logging {
straggler_threshold,
ref mut straggler_deadline,
} => {
if straggler_deadline
.map(|sd| sd < Instant::now())
.unwrap_or(false)
{
*straggler_deadline = Some(Instant::now() + STRAGGLER_LOGGING_INTERVAL);
self
.0
.workunit_store
.log_straggling_workunits(straggler_threshold);
}
}
}
}
}

pub struct ExecutionRequest {
// Set of roots for an execution, in the order they were declared.
pub roots: Vec<Root>,
@@ -338,10 +85,12 @@ impl Scheduler {

pub fn visualize(&self, session: &Session, path: &Path) -> io::Result<()> {
let context = Context::new(self.core.clone(), session.clone());
self
.core
.graph
.visualize(Visualizer::default(), &session.root_nodes(), path, &context)
self.core.graph.visualize(
Visualizer::default(),
&session.roots_nodes(),
path,
&context,
)
}

pub fn add_root_select(
@@ -393,7 +142,7 @@ impl Scheduler {
self
.core
.graph
.visit_live_reachable(&session.root_nodes(), &context, |n, _| {
.visit_live_reachable(&session.roots_nodes(), &context, |n, _| {
if n.fs_subject().is_some() {
count += 1;
}
@@ -543,7 +292,7 @@ impl Scheduler {
sender: mpsc::Sender<ExecutionEvent>,
) {
let context = Context::new(self.core.clone(), session.clone());
let roots = session.zip_last_observed(&request.roots);
let roots = session.roots_zip_last_observed(&request.roots);
let poll = request.poll;
let poll_delay = request.poll_delay;
let core = context.core.clone();
@@ -569,7 +318,7 @@
results: Vec<ObservedValueResult>,
) -> Vec<Result<Value, Failure>> {
// Store the roots that were operated on and their LastObserved values.
session.extend(
session.roots_extend(
results
.iter()
.zip(roots.iter())
274 changes: 274 additions & 0 deletions src/rust/engine/src/session.rs (new file; diff not loaded on this page)
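
Among the code that moved into that new file is the `SessionDisplay` logic shown as removed lines in `scheduler.rs` above. Its logging mode follows a deadline-based pattern: whenever `maybe_display_render` finds the straggler deadline in the past, it logs long-running workunits and pushes the deadline forward by `STRAGGLER_LOGGING_INTERVAL`. Below is a self-contained sketch of that pattern, with `std::sync::Mutex` and a stub `println!` standing in for `parking_lot` and the real `ConsoleUI`/`WorkunitStore`.

```rust
use std::sync::Mutex;
use std::time::{Duration, Instant};

// Mirrors the constant in the moved code.
const STRAGGLER_LOGGING_INTERVAL: Duration = Duration::from_secs(30);

#[allow(dead_code)]
enum SessionDisplay {
    ConsoleUi, // stand-in for ConsoleUI(ConsoleUI)
    Logging {
        straggler_threshold: Duration,
        straggler_deadline: Option<Instant>,
    },
}

struct Display(Mutex<SessionDisplay>);

impl Display {
    // Sketch of Session::maybe_display_render: lock the display, branch on the mode,
    // and in Logging mode emit straggler info at most once per interval.
    fn maybe_render(&self) {
        match *self.0.lock().unwrap() {
            SessionDisplay::ConsoleUi => {
                // ui.render() in the real code.
            }
            SessionDisplay::Logging {
                straggler_threshold,
                ref mut straggler_deadline,
            } => {
                if straggler_deadline.map(|d| d < Instant::now()).unwrap_or(false) {
                    *straggler_deadline = Some(Instant::now() + STRAGGLER_LOGGING_INTERVAL);
                    // In the real code: workunit_store.log_straggling_workunits(threshold).
                    println!("logging workunits running longer than {:?}", straggler_threshold);
                }
            }
        }
    }
}

fn main() {
    let display = Display(Mutex::new(SessionDisplay::Logging {
        straggler_threshold: Duration::from_secs(60),
        straggler_deadline: Some(Instant::now() - Duration::from_secs(1)),
    }));
    display.maybe_render(); // deadline already passed, so this logs and resets it
}
```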
