Skip to content

Commit

Permalink
Record zipkin traces for remote file operations (pantsbuild#8124)
Browse files Browse the repository at this point in the history
### Problem

We want to enable more accurate troubleshooting of dependency issues
by recording the time taken by remote file operations in the zipkin trace
produced by pants.

### Solution

On the python side, pass the session down to the rust ffi so that
mutations to the workunit_store contained in that session get
propagated.

On the rust side, take the session ptr from the ffi interface and
propagate the workunit_store down to the following functions in the
remote store:
* ByteStore::store_bytes,
* ByteStore::load_bytes_with, and
* ByteStore::list_missing_digests.

In each of these functions, record a `WorkUnit` for the work that is done
and add it to the `WorkUnitStore`.

### Result

This is enough to populate the zipkin trace as the python side is
already converting the `engine`'s `WorkUnit`s into zipkin spans and
passing them to the zipkin server through `py_zipkin`.

Still TODO (in future PRs):
* Decide how to use the workunit_store to extract relevant information
  in fs_util (for now, it's simply passed down and ignored)
* Decide if we need to use the WorkUnitStore in src/node.rs and in
  process_execution/cache.rs.
* Create an integration test for this change (currently, it was manually
  verified that each of the three functions appears in the zipkin trace
  when running a pants command remotely)
  • Loading branch information
pierrechevalier83 authored and stuhood committed Aug 5, 2019
1 parent cdb2207 commit 2108156
Show file tree
Hide file tree
Showing 16 changed files with 754 additions and 358 deletions.
75 changes: 27 additions & 48 deletions src/python/pants/engine/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,50 +285,6 @@ def _run_and_return_roots(self, session, execution_request):
self._native.lib.nodes_destroy(raw_roots)
return roots

def capture_snapshots(self, path_globs_and_roots):
  """Synchronously captures a Snapshot for each PathGlobs, rooted at its paired root directory.

  This call blocks, so it should be avoided where possible.

  :param path_globs_and_roots tuple<PathGlobsAndRoot>: The PathGlobs to capture, each paired
    with the root directory relative to which it should be captured.
  :returns: A tuple of Snapshots.
  """
  wrapped_globs = self._to_value(_PathGlobsAndRootCollection(path_globs_and_roots))
  raw_result = self._native.lib.capture_snapshots(self._scheduler, wrapped_globs)
  # The native call returns a PyResult; unwrap it into a value or a raised exception.
  return self._raise_or_return(raw_result)

def merge_directories(self, directory_digests):
  """Merges any number of directories into a single Digest.

  :param directory_digests: Tuple of DirectoryDigests.
  :return: A Digest.
  """
  wrapped_digests = self._to_value(_DirectoryDigests(directory_digests))
  # The native call returns a PyResult; unwrap it into a value or a raised exception.
  return self._raise_or_return(
    self._native.lib.merge_directories(self._scheduler, wrapped_digests)
  )

def materialize_directories(self, directories_paths_and_digests):
  """Creates the specified directories on the file system.

  :param directories_paths_and_digests tuple<DirectoryToMaterialize>: Tuple of the path and
    digest of the directories to materialize.
  :returns: Nothing or an error.
  """
  # Reject duplicate destination paths and paths where one is a prefix of another.
  destination_paths = [dpad.path for dpad in directories_paths_and_digests]
  check_no_overlapping_paths(destination_paths)

  wrapped = self._to_value(_DirectoriesToMaterialize(directories_paths_and_digests))
  # The native call returns a PyResult; unwrap it into a value or a raised exception.
  return self._raise_or_return(
    self._native.lib.materialize_directories(self._scheduler, wrapped)
  )

def lease_files_in_graph(self):
  """Delegates to the native engine to lease files referenced by the graph.

  Fire-and-forget: the native call's result is not inspected on the Python side.
  """
  self._native.lib.lease_files_in_graph(self._scheduler)

Expand Down Expand Up @@ -572,19 +528,42 @@ def capture_snapshots(self, path_globs_and_roots):
directory relative to which each should be captured.
:returns: A tuple of Snapshots.
"""
return self._scheduler.capture_snapshots(path_globs_and_roots)
result = self._scheduler._native.lib.capture_snapshots(
self._scheduler._scheduler,
self._session,
self._scheduler._to_value(_PathGlobsAndRootCollection(path_globs_and_roots)),
)
return self._scheduler._raise_or_return(result)

def merge_directories(self, directory_digests):
return self._scheduler.merge_directories(directory_digests)
"""Merges any number of directories.
:param directory_digests: Tuple of DirectoryDigests.
:return: A Digest.
"""
result = self._scheduler._native.lib.merge_directories(
self._scheduler._scheduler,
self._session,
self._scheduler._to_value(_DirectoryDigests(directory_digests)),
)
return self._scheduler._raise_or_return(result)

def materialize_directories(self, directories_paths_and_digests):
"""Creates the specified directories on the file system.
:param directories_paths_and_digests tuple<DirectoryToMaterialize>: Tuple of the path and
digest of the directories to materialize.
:returns: Nothing or an error.
"""
return self._scheduler.materialize_directories(directories_paths_and_digests)
# Ensure there isn't more than one of the same directory paths and paths do not have the same prefix.
dir_list = [dpad.path for dpad in directories_paths_and_digests]
check_no_overlapping_paths(dir_list)

result = self._scheduler._native.lib.materialize_directories(
self._scheduler._scheduler,
self._session,
self._scheduler._to_value(_DirectoriesToMaterialize(directories_paths_and_digests)),
)
return self._scheduler._raise_or_return(result)

def lease_files_in_graph(self):
self._scheduler.lease_files_in_graph()
Expand Down
41 changes: 23 additions & 18 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 14 additions & 2 deletions src/rust/engine/engine_cffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,7 @@ pub extern "C" fn match_path_globs(path_globs: Handle, paths_buf: BufferBuffer)
#[no_mangle]
pub extern "C" fn capture_snapshots(
scheduler_ptr: *mut Scheduler,
session_ptr: *mut Session,
path_globs_and_root_tuple_wrapper: Handle,
) -> PyResult {
let values = externs::project_multi(&path_globs_and_root_tuple_wrapper.into(), "dependencies");
Expand Down Expand Up @@ -746,6 +747,7 @@ pub extern "C" fn capture_snapshots(
return e.into();
}
};
let workunit_store = with_session(session_ptr, |session| session.workunit_store());

with_scheduler(scheduler_ptr, |scheduler| {
let core = scheduler.core.clone();
Expand All @@ -761,6 +763,7 @@ pub extern "C" fn capture_snapshots(
root,
path_globs,
digest_hint,
workunit_store.clone(),
)
.map(move |snapshot| nodes::Snapshot::store_snapshot(&core, &snapshot))
})
Expand All @@ -775,6 +778,7 @@ pub extern "C" fn capture_snapshots(
#[no_mangle]
pub extern "C" fn merge_directories(
scheduler_ptr: *mut Scheduler,
session_ptr: *mut Session,
directories_value: Handle,
) -> PyResult {
let digests_result: Result<Vec<hashing::Digest>, String> =
Expand All @@ -789,6 +793,7 @@ pub extern "C" fn merge_directories(
return e.into();
}
};
let workunit_store = with_session(session_ptr, |session| session.workunit_store());

with_scheduler(scheduler_ptr, |scheduler| {
scheduler
Expand All @@ -797,6 +802,7 @@ pub extern "C" fn merge_directories(
.block_on(store::Snapshot::merge_directories(
scheduler.core.store(),
digests,
workunit_store,
))
.map(|dir| nodes::Snapshot::store_directory(&scheduler.core, &dir))
.into()
Expand All @@ -806,6 +812,7 @@ pub extern "C" fn merge_directories(
#[no_mangle]
pub extern "C" fn materialize_directories(
scheduler_ptr: *mut Scheduler,
session_ptr: *mut Session,
directories_paths_and_digests_value: Handle,
) -> PyResult {
let values = externs::project_multi(&directories_paths_and_digests_value.into(), "dependencies");
Expand All @@ -826,13 +833,18 @@ pub extern "C" fn materialize_directories(
return e.into();
}
};

let workunit_store = with_session(session_ptr, |session| session.workunit_store());
with_scheduler(scheduler_ptr, |scheduler| {
scheduler.core.executor.block_on(
futures::future::join_all(
dir_and_digests
.into_iter()
.map(|(dir, digest)| scheduler.core.store().materialize_directory(dir, digest))
.map(|(dir, digest)| {
scheduler
.core
.store()
.materialize_directory(dir, digest, workunit_store.clone())
})
.collect::<Vec<_>>(),
)
.map(|_| ()),
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/fs/brfs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ store = { path = "../store" }
task_executor = { path = "../../task_executor" }
time = "0.1.39"
tokio = "0.1"
workunit_store = { path = "../../workunit_store" }

[dev-dependencies]
bytes = "0.4.5"
Expand Down
Loading

0 comments on commit 2108156

Please sign in to comment.