From cc8dbef25e260ec4d413670fc3465ce4c4696347 Mon Sep 17 00:00:00 2001
From: Jonathan Cammisuli
Date: Mon, 27 Nov 2023 09:38:27 -0500
Subject: [PATCH] fix(core): improve file gathering performance (#20377)

---
 Cargo.lock                                    | 42 ++++++-------
 Cargo.toml                                    |  4 +-
 packages/nx/Cargo.toml                        |  4 +-
 packages/nx/src/native/hasher.rs              | 36 +++++------
 packages/nx/src/native/index.d.ts             |  1 -
 packages/nx/src/native/index.js               |  3 +-
 .../native/plugins/js/ts_import_locators.rs   | 15 ++---
 packages/nx/src/native/walker.rs              | 62 ++++++++-----------
 packages/nx/src/native/workspace/context.rs   | 42 +++++++++----
 9 files changed, 103 insertions(+), 106 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c66f40ad7953e..79cf191b7a86f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -449,9 +449,9 @@ dependencies = [
 
 [[package]]
 name = "futures"
-version = "0.3.28"
+version = "0.3.29"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40"
+checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335"
 dependencies = [
  "futures-channel",
  "futures-core",
@@ -464,9 +464,9 @@
 
 [[package]]
 name = "futures-channel"
-version = "0.3.28"
+version = "0.3.29"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2"
+checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb"
 dependencies = [
  "futures-core",
  "futures-sink",
@@ -474,15 +474,15 @@
 
 [[package]]
 name = "futures-core"
-version = "0.3.28"
+version = "0.3.29"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c"
+checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c"
 
 [[package]]
 name = "futures-executor"
-version = "0.3.28"
+version = "0.3.29"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0"
+checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc"
 dependencies = [
  "futures-core",
  "futures-task",
@@ -491,15 +491,15 @@
 
 [[package]]
 name = "futures-io"
-version = "0.3.28"
+version = "0.3.29"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964"
+checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa"
 
 [[package]]
 name = "futures-macro"
-version = "0.3.28"
+version = "0.3.29"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
+checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -508,21 +508,21 @@
 
 [[package]]
 name = "futures-sink"
-version = "0.3.28"
+version = "0.3.29"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e"
+checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817"
 
 [[package]]
 name = "futures-task"
-version = "0.3.28"
+version = "0.3.29"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65"
+checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2"
 
 [[package]]
 name = "futures-util"
-version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" +checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" dependencies = [ "futures-channel", "futures-core", @@ -1378,8 +1378,6 @@ dependencies = [ "parking_lot", "rayon", "regex", - "serde", - "serde_json", "swc_common", "swc_ecma_ast", "swc_ecma_dep_graph", @@ -1560,9 +1558,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.56" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b63bdb0cd06f1f4dedf69b254734f9b45af66e4a031e42a7480257d9898b435" +checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" dependencies = [ "unicode-ident", ] diff --git a/Cargo.toml b/Cargo.toml index a12d1730c4c77..217f75ad61044 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,8 +1,8 @@ [workspace] -resolver = "2" +resolver = '2' members = [ - 'packages/nx' + 'packages/nx', ] [profile.release] diff --git a/packages/nx/Cargo.toml b/packages/nx/Cargo.toml index 5d71864a74320..5f52800e8bce0 100644 --- a/packages/nx/Cargo.toml +++ b/packages/nx/Cargo.toml @@ -7,7 +7,7 @@ edition = '2021' anyhow = "1.0.71" colored = "2" crossbeam-channel = '0.5' -dashmap = { version = "5.5.3", features= ["rayon"] } +dashmap = { version = "5.5.3", features = ["rayon"] } fs_extra = "1.3.0" globset = "0.4.10" hashbrown = { version = "0.14.0", features = ["rayon"] } @@ -25,8 +25,6 @@ napi-derive = '2.9.3' nom = '7.1.3' regex = "1.9.1" rayon = "1.7.0" -serde = "1" -serde_json = "1" thiserror = "1.0.40" tokio = { version = "1.28.2", features = ["fs"] } tracing = "0.1.37" diff --git a/packages/nx/src/native/hasher.rs b/packages/nx/src/native/hasher.rs index df6e213dc7e75..6cbcf2bd36a6f 100644 --- a/packages/nx/src/native/hasher.rs +++ b/packages/nx/src/native/hasher.rs @@ -1,6 +1,7 @@ -use crate::native::utils::Normalize; -use crate::native::walker::nx_walker; -use std::collections::HashMap; +use std::fs::File; +use std::io::{BufRead, BufReader}; +use std::path::Path; +use tracing::trace; use xxhash_rust::xxh3; pub fn hash(content: &[u8]) -> String { @@ -16,25 +17,24 @@ pub fn hash_array(input: Vec) -> String { #[napi] pub fn hash_file(file: String) -> Option { - let Ok(content) = std::fs::read(file) else { + hash_file_path(file) +} + +#[inline] +pub fn hash_file_path>(path: P) -> Option { + let path = path.as_ref(); + let Ok(file) = File::open(path) else { + trace!("could not open file: {path:?}"); return None; }; - Some(hash(&content)) -} + let mut buffer = BufReader::new(file); + let Ok(content) = buffer.fill_buf() else { + trace!("could not read file: {path:?}"); + return None; + }; -#[napi] -pub fn hash_files(workspace_root: String) -> HashMap { - nx_walker(workspace_root, |rec| { - let mut collection: HashMap = HashMap::new(); - for (path, content) in rec { - collection.insert( - path.to_normalized_string(), - xxh3::xxh3_64(&content).to_string(), - ); - } - collection - }) + Some(hash(content)) } #[cfg(test)] diff --git a/packages/nx/src/native/index.d.ts b/packages/nx/src/native/index.d.ts index c2dd43e47cc04..201881840be6b 100644 --- a/packages/nx/src/native/index.d.ts +++ b/packages/nx/src/native/index.d.ts @@ -23,7 +23,6 @@ export function remove(src: string): void export function copy(src: string, dest: string): void export function hashArray(input: Array): string export function hashFile(file: string): string | null -export function 
-export function hashFiles(workspaceRoot: string): Record<string, string>
 export function findImports(projectFileMap: Record<string, Array<string>>): Array<ImportResult>
 /**
  * Transfer the project graph from the JS world to the Rust world, so that we can pass the project graph via memory quicker
diff --git a/packages/nx/src/native/index.js b/packages/nx/src/native/index.js
index 558f4466de7fa..61c3a60ae4da5 100644
--- a/packages/nx/src/native/index.js
+++ b/packages/nx/src/native/index.js
@@ -246,7 +246,7 @@ if (!nativeBinding) {
   throw new Error(`Failed to load native binding`)
 }
 
-const { expandOutputs, getFilesForOutputs, remove, copy, hashArray, hashFile, hashFiles, ImportResult, findImports, transferProjectGraph, HashPlanner, TaskHasher, EventType, Watcher, WorkspaceContext, WorkspaceErrors } = nativeBinding
+const { expandOutputs, getFilesForOutputs, remove, copy, hashArray, hashFile, ImportResult, findImports, transferProjectGraph, HashPlanner, TaskHasher, EventType, Watcher, WorkspaceContext, WorkspaceErrors } = nativeBinding
 
 module.exports.expandOutputs = expandOutputs
 module.exports.getFilesForOutputs = getFilesForOutputs
@@ -254,7 +254,6 @@ module.exports.remove = remove
 module.exports.copy = copy
 module.exports.hashArray = hashArray
 module.exports.hashFile = hashFile
-module.exports.hashFiles = hashFiles
 module.exports.ImportResult = ImportResult
 module.exports.findImports = findImports
 module.exports.transferProjectGraph = transferProjectGraph
diff --git a/packages/nx/src/native/plugins/js/ts_import_locators.rs b/packages/nx/src/native/plugins/js/ts_import_locators.rs
index e384498d331a2..f298f95ad6e39 100644
--- a/packages/nx/src/native/plugins/js/ts_import_locators.rs
+++ b/packages/nx/src/native/plugins/js/ts_import_locators.rs
@@ -1361,16 +1361,11 @@ import('./dynamic-import.vue')
         ancestors.next();
         let root = PathBuf::from(ancestors.next().unwrap());
 
-        let files = nx_walker(root.clone(), move |receiver| {
-            let mut files = vec![];
-            let glob = build_glob_set(&["**/*.[jt]s"]).unwrap();
-            for (path, _) in receiver {
-                if glob.is_match(&path) {
-                    files.push(root.join(path).to_normalized_string());
-                }
-            }
-            files
-        });
+        let glob = build_glob_set(&["**/*.[jt]s"]).unwrap();
+        let files = nx_walker(root.clone())
+            .filter(|(full_path, _)| glob.is_match(full_path))
+            .map(|(full_path, _)| full_path.to_normalized_string())
+            .collect::<Vec<_>>();
 
         let results: HashMap<_, _> =
             find_imports(HashMap::from([(String::from("nx"), files.clone())]))
diff --git a/packages/nx/src/native/walker.rs b/packages/nx/src/native/walker.rs
index 44759578cf0ab..a04b5e973d2c1 100644
--- a/packages/nx/src/native/walker.rs
+++ b/packages/nx/src/native/walker.rs
@@ -2,8 +2,9 @@
 use std::path::{Path, PathBuf};
 use std::thread;
 use std::thread::available_parallelism;
 
-use crossbeam_channel::{unbounded, Receiver};
+use crossbeam_channel::unbounded;
 use ignore::WalkBuilder;
+use tracing::trace;
 
 use crate::native::glob::build_glob_set;
@@ -35,11 +36,9 @@ where
 }
 
 /// Walk the directory and ignore files from .gitignore and .nxignore
-pub fn nx_walker<P, Fn, Re>(directory: P, f: Fn) -> Re
+pub fn nx_walker<P>(directory: P) -> impl Iterator<Item = (PathBuf, PathBuf)>
 where
     P: AsRef<Path>,
-    Fn: FnOnce(Receiver<(PathBuf, Vec<u8>)>) -> Re + Send + 'static,
-    Re: Send + 'static,
 {
     let directory = directory.as_ref();
     let nx_ignore = directory.join(".nxignore");
@@ -59,10 +58,11 @@ where
 
     let cpus = available_parallelism().map_or(2, |n| n.get()) - 1;
 
-    let (sender, receiver) = unbounded::<(PathBuf, Vec<u8>)>();
+    let (sender, receiver) = unbounded();
 
-    let receiver_thread = thread::spawn(|| f(receiver));
+    trace!(?directory, "walking");
 
+    let now = std::time::Instant::now();
     walker.threads(cpus).build_parallel().run(|| {
         let tx = sender.clone();
         Box::new(move |entry| {
@@ -72,27 +72,29 @@ where
                 return Continue;
             };
 
-            let Ok(content) = std::fs::read(dir_entry.path()) else {
+            if dir_entry.file_type().is_some_and(|d| d.is_dir()) {
                 return Continue;
-            };
+            }
 
             let Ok(file_path) = dir_entry.path().strip_prefix(directory) else {
                 return Continue;
             };
 
-            tx.send((file_path.into(), content)).ok();
+            tx.send((dir_entry.path().to_owned(), file_path.to_owned()))
+                .ok();
 
             Continue
         })
     });
 
+    trace!("walked in {:?}", now.elapsed());
+    let receiver_thread = thread::spawn(move || receiver.into_iter());
     drop(sender);
     receiver_thread.join().unwrap()
 }
 
 #[cfg(test)]
 mod test {
-    use std::collections::HashMap;
     use std::{assert_eq, vec};
 
     use assert_fs::prelude::*;
@@ -124,32 +126,21 @@ mod test {
     #[test]
     fn it_walks_a_directory() {
         // handle empty workspaces
-        let content = nx_walker("/does/not/exist", |rec| {
-            let mut paths = vec![];
-            for (path, _) in rec {
-                paths.push(path);
-            }
-            paths
-        });
+        let content = nx_walker("/does/not/exist").collect::<Vec<_>>();
 
         assert!(content.is_empty());
 
         let temp_dir = setup_fs();
-        let content = nx_walker(temp_dir, |rec| {
-            let mut paths = HashMap::new();
-            for (path, content) in rec {
-                paths.insert(path, content);
-            }
-            paths
-        });
+        let mut content = nx_walker(&temp_dir).collect::<Vec<_>>();
+        content.sort();
 
         assert_eq!(
             content,
-            HashMap::from([
-                (PathBuf::from("baz/qux.txt"), "content@qux".into()),
-                (PathBuf::from("foo.txt"), "content1".into()),
-                (PathBuf::from("test.txt"), "content".into()),
-                (PathBuf::from("bar.txt"), "content2".into()),
-            ])
+            vec![
+                (temp_dir.join("bar.txt"), PathBuf::from("bar.txt")),
+                (temp_dir.join("baz/qux.txt"), PathBuf::from("baz/qux.txt")),
+                (temp_dir.join("foo.txt"), PathBuf::from("foo.txt")),
+                (temp_dir.join("test.txt"), PathBuf::from("test.txt")),
+            ]
         );
     }
 
@@ -180,13 +171,10 @@ nested/child-two/
         )
         .unwrap();
 
-        let mut file_names = nx_walker(temp_dir, |rec| {
-            let mut file_names = vec![];
-            for (path, _) in rec {
-                file_names.push(path.to_normalized_string());
-            }
-            file_names
-        });
+        let mut file_names = nx_walker(temp_dir)
+            .into_iter()
+            .map(|(_, p)| p.to_normalized_string())
+            .collect::<Vec<_>>();
 
         file_names.sort();
 
diff --git a/packages/nx/src/native/workspace/context.rs b/packages/nx/src/native/workspace/context.rs
index 3021d818cb458..315d785b22265 100644
--- a/packages/nx/src/native/workspace/context.rs
+++ b/packages/nx/src/native/workspace/context.rs
@@ -1,14 +1,15 @@
 use napi::bindgen_prelude::External;
 use std::collections::HashMap;
 
-use crate::native::hasher::hash;
+use crate::native::hasher::{hash, hash_file_path};
 use crate::native::utils::Normalize;
 use napi::bindgen_prelude::*;
 use rayon::prelude::*;
 use std::ops::Deref;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
-use std::thread;
+use std::thread::available_parallelism;
+use std::{cmp, thread};
 
 use crate::native::logger::enable_logger;
 use crate::native::project_graph::utils::{find_project_for_path, ProjectRootMappings};
@@ -30,6 +31,7 @@ pub struct WorkspaceContext {
 }
 
 type Files = Vec<(PathBuf, String)>;
+
 struct FilesWorker(Option<Arc<(Mutex<Files>, Condvar)>>);
 impl FilesWorker {
     fn gather_files(workspace_root: &Path) -> Self {
@@ -49,16 +51,34 @@ impl FilesWorker {
             trace!("locking files");
             let (lock, cvar) = &*files_lock_clone;
             let mut workspace_files = lock.lock();
-            let files = nx_walker(workspace_root, |rec| {
-                let mut file_hashes: Vec<(PathBuf, String)> = vec![];
-                for (path, content) in rec {
-                    file_hashes.push((path, hash(&content)));
-                }
-                file_hashes
-            });
-            workspace_files.extend(files);
-            workspace_files.par_sort();
+            let files = nx_walker(workspace_root).collect::<Vec<_>>();
+            let num_parallelism = cmp::max(available_parallelism().map_or(2, |n| n.get()) / 3, 2);
+            let chunks = files.len() / num_parallelism;
+
+            let now = std::time::Instant::now();
+
+            let mut files = if chunks < num_parallelism {
+                files
+                    .iter()
+                    .filter_map(|(full_path, path)| {
+                        hash_file_path(full_path).map(|hash| (path.to_owned(), hash))
+                    })
+                    .collect::<Vec<_>>()
+            } else {
+                files
+                    .par_chunks(chunks)
+                    .flat_map_iter(|chunks| {
+                        chunks.iter().filter_map(|(full_path, path)| {
+                            hash_file_path(full_path).map(|hash| (path.to_owned(), hash))
+                        })
+                    })
+                    .collect::<Vec<_>>()
+            };
+
+            files.par_sort();
+            trace!("hashed and sorted workspace files in {:?}", now.elapsed());
+            *workspace_files = files;
 
             let files_len = workspace_files.len();
             trace!(?files_len, "files retrieved");