Skip to content

Commit

Permalink
RepoTree: add support for subrepo traversal (iterative#4381)
Browse files Browse the repository at this point in the history
Right now, no actual "functionality" is surfaced as
`subrepos` is always set to False by default for now.
  • Loading branch information
skshetry authored Aug 15, 2020
1 parent 8b41655 commit 0b1d4f1
Show file tree
Hide file tree
Showing 7 changed files with 347 additions and 76 deletions.
6 changes: 3 additions & 3 deletions dvc/repo/ls.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ def onerror(exc):
if not recursive:
for dname in dirs:
info = PathInfo(root) / dname
if not dvc_only or (
tree.dvctree and tree.dvctree.exists(info)
):
# pylint:disable=protected-access
_, dvctree = tree._get_tree_pairs(info) # noqa
if not dvc_only or (dvctree and dvctree.exists(info)):
dvc = tree.isdvc(info)
path = str(info.relative_to(path_info))
ret[path] = {
Expand Down
198 changes: 142 additions & 56 deletions dvc/repo/tree.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import logging
import os
import threading
from itertools import takewhile
from typing import Optional, Tuple

from funcy import wrap_with
from pygtrie import StringTrie

from dvc.dvcfile import is_valid_filename
from dvc.exceptions import OutputNotFoundError
Expand Down Expand Up @@ -248,78 +254,148 @@ class RepoTree(BaseTree): # pylint:disable=abstract-method
scheme = "local"
PARAM_CHECKSUM = "md5"

def __init__(self, repo, **kwargs):
def __init__(self, repo, subrepos=False, repo_factory=None, **kwargs):
super().__init__(repo, {"url": repo.root_dir})
if hasattr(repo, "dvc_dir"):
self.dvctree = DvcTree(repo, **kwargs)

if not repo_factory:
from dvc.repo import Repo

self.repo_factory = Repo
else:
# git-only erepo's do not need dvctree
self.dvctree = None
self.repo_factory = repo_factory

self._main_repo = repo
self.root_dir = repo.root_dir
self._traverse_subrepos = subrepos

self._discovered_subrepos = StringTrie(separator=os.sep)
self._discovered_subrepos[self.root_dir] = repo

self._dvctrees = {}
self._dvctree_configs = kwargs

if hasattr(repo, "dvc_dir"):
self._dvctrees[repo.root_dir] = DvcTree(repo, **kwargs)

def _get_repo(self, path):
repo = self._discovered_subrepos.get(path)
if repo:
return repo

prefix, repo = self._discovered_subrepos.longest_prefix(path)
if not prefix:
return None

parents = (parent.fspath for parent in PathInfo(path).parents)
dirs = [path] + list(takewhile(lambda p: p != prefix, parents))
dirs.reverse()
self._update(dirs, starting_repo=repo)
return self._discovered_subrepos.get(path)

@wrap_with(threading.Lock())
def _update(self, dirs, starting_repo):
repo = starting_repo
for d in dirs:
if self._is_dvc_repo(d):
repo = self.repo_factory(d)
self._dvctrees[repo.root_dir] = DvcTree(
repo, **self._dvctree_configs
)
self._discovered_subrepos[d] = repo

def _is_dvc_repo(self, dir_path):
if not self._traverse_subrepos:
return False

from dvc.repo import Repo

repo_path = os.path.join(dir_path, Repo.DVC_DIR)
return self._main_repo.tree.isdir(repo_path, use_dvcignore=False)

def _get_tree_pairs(self, path) -> Tuple["BaseTree", Optional["DvcTree"]]:
path = os.path.abspath(path)
repo = self._get_repo(path)
if not repo:
# path could be outside of the repo, so we just send them the main
# tree instead
return self._main_repo.tree, self._dvctrees.get(self.root_dir)

dvc_tree = self._dvctrees.get(repo.root_dir)
return repo.tree, dvc_tree

@property
def fetch(self):
if self.dvctree:
return self.dvctree.fetch
return False
return "fetch" in self._dvctree_configs

@property
def stream(self):
if self.dvctree:
return self.dvctree.stream
return False
return "stream" in self._dvctree_configs

def open(
self, path, mode="r", encoding="utf-8", **kwargs
): # pylint: disable=arguments-differ
if "b" in mode:
encoding = None

if self.dvctree and self.dvctree.exists(path):
return self.dvctree.open(
path, mode=mode, encoding=encoding, **kwargs
)
return self.repo.tree.open(path, mode=mode, encoding=encoding)
tree, dvc_tree = self._get_tree_pairs(path)
if dvc_tree and dvc_tree.exists(path):
return dvc_tree.open(path, mode=mode, encoding=encoding, **kwargs)
return tree.open(path, mode=mode, encoding=encoding)

def exists(
self, path, use_dvcignore=True
): # pylint: disable=arguments-differ
return self.repo.tree.exists(path) or (
self.dvctree and self.dvctree.exists(path)
)
tree, dvc_tree = self._get_tree_pairs(path)
return tree.exists(path) or (dvc_tree and dvc_tree.exists(path))

def isdir(self, path): # pylint: disable=arguments-differ
return self.repo.tree.isdir(path) or (
self.dvctree and self.dvctree.isdir(path)
)
tree, dvc_tree = self._get_tree_pairs(path)
return tree.isdir(path) or (dvc_tree and dvc_tree.isdir(path))

def isdvc(self, path, **kwargs):
return self.dvctree is not None and self.dvctree.isdvc(path, **kwargs)
_, dvc_tree = self._get_tree_pairs(path)
return dvc_tree is not None and dvc_tree.isdvc(path, **kwargs)

def isfile(self, path): # pylint: disable=arguments-differ
return self.repo.tree.isfile(path) or (
self.dvctree and self.dvctree.isfile(path)
)
tree, dvc_tree = self._get_tree_pairs(path)
return tree.isfile(path) or (dvc_tree and dvc_tree.isfile(path))

def isexec(self, path):
if self.dvctree and self.dvctree.exists(path):
return self.dvctree.isexec(path)
return self.repo.tree.isexec(path)
tree, dvc_tree = self._get_tree_pairs(path)
if dvc_tree and dvc_tree.exists(path):
return dvc_tree.isexec(path)
return tree.isexec(path)

def stat(self, path):
return self.repo.tree.stat(path)
tree, _ = self._get_tree_pairs(path)
return tree.stat(path)

def _walk_one(self, walk):
def _dvc_walk(self, walk):
try:
root, dirs, files = next(walk)
except StopIteration:
return
yield root, dirs, files
for _ in dirs:
yield from self._walk_one(walk)
yield from self._dvc_walk(walk)

def _subrepo_walk(self, dir_path, **kwargs):
tree, dvc_tree = self._get_tree_pairs(dir_path)
tree_walk = tree.walk(
dir_path, topdown=True, ignore_subrepos=not self._traverse_subrepos
)
if dvc_tree:
dvc_walk = dvc_tree.walk(dir_path, topdown=True, **kwargs)
else:
dvc_walk = None
yield from self._walk(tree_walk, dvc_walk, **kwargs)

def _walk(self, dvc_walk, repo_walk, dvcfiles=False):
def _walk(self, repo_walk, dvc_walk=None, dvcfiles=False):
assert repo_walk
try:
_, dvc_dirs, dvc_fnames = next(dvc_walk)
_, dvc_dirs, dvc_fnames = (
next(dvc_walk) if dvc_walk else (None, [], [])
)
repo_root, repo_dirs, repo_fnames = next(repo_walk)
except StopIteration:
return
Expand Down Expand Up @@ -347,12 +423,15 @@ def _walk(self, dvc_walk, repo_walk, dvcfiles=False):
repo_dirs[:] = [dirname for dirname in dirs if dirname in repo_set]

for dirname in dirs:
if dirname in shared:
yield from self._walk(dvc_walk, repo_walk, dvcfiles=dvcfiles)
dir_path = os.path.join(repo_root, dirname)
if self._is_dvc_repo(dir_path):
yield from self._subrepo_walk(dir_path, dvcfiles=dvcfiles)
elif dirname in shared:
yield from self._walk(repo_walk, dvc_walk, dvcfiles=dvcfiles)
elif dirname in dvc_set:
yield from self._walk_one(dvc_walk)
yield from self._dvc_walk(dvc_walk)
elif dirname in repo_set:
yield from self._walk_one(repo_walk)
yield from self._walk(repo_walk, None, dvcfiles=dvcfiles)

def walk(
self, top, topdown=True, onerror=None, dvcfiles=False, **kwargs
Expand Down Expand Up @@ -382,22 +461,28 @@ def walk(
onerror(NotADirectoryError(top))
return

dvc_exists = self.dvctree and self.dvctree.exists(top)
repo_exists = self.repo.tree.exists(top)
if dvc_exists and not repo_exists:
yield from self.dvctree.walk(
top, topdown=topdown, onerror=onerror, **kwargs
)
return
if repo_exists and not dvc_exists:
yield from self.repo.tree.walk(
top, topdown=topdown, onerror=onerror
tree, dvc_tree = self._get_tree_pairs(top)
dvc_exists = dvc_tree and dvc_tree.exists(top)
repo_exists = tree.exists(top)
if dvc_exists:
dvc_walk = dvc_tree.walk(top, topdown=topdown, **kwargs)
if repo_exists:
repo_walk = tree.walk(
top,
topdown=topdown,
ignore_subrepos=not self._traverse_subrepos,
)
yield from self._walk(repo_walk, dvc_walk, dvcfiles=dvcfiles)
else:
yield from dvc_walk
else:
repo_walk = tree.walk(
top,
topdown=topdown,
onerror=onerror,
ignore_subrepos=not self._traverse_subrepos,
)
return

dvc_walk = self.dvctree.walk(top, topdown=topdown, **kwargs)
repo_walk = self.repo.tree.walk(top, topdown=topdown)
yield from self._walk(dvc_walk, repo_walk, dvcfiles=dvcfiles)
yield from self._walk(repo_walk, None, dvcfiles=dvcfiles)

def walk_files(self, top, **kwargs): # pylint: disable=arguments-differ
for root, _, files in self.walk(top, **kwargs):
Expand All @@ -413,9 +498,10 @@ def get_file_hash(self, path_info):
"""
if not self.exists(path_info):
raise FileNotFoundError
if self.dvctree and self.dvctree.exists(path_info):
_, dvc_tree = self._get_tree_pairs(path_info)
if dvc_tree and dvc_tree.exists(path_info):
try:
return self.dvctree.get_file_hash(path_info)
return dvc_tree.get_file_hash(path_info)
except OutputNotFoundError:
pass
return file_md5(path_info, self)[0]
Expand Down Expand Up @@ -444,4 +530,4 @@ def copytree(self, top, dest):

@property
def hash_jobs(self): # pylint: disable=invalid-overridden-method
return self.repo.tree.hash_jobs
return self._main_repo.tree.hash_jobs
3 changes: 3 additions & 0 deletions dvc/tree/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,3 +240,6 @@ def walk_files(self, top): # pylint: disable=arguments-differ
for file in files:
# NOTE: os.path.join is ~5.5 times slower
yield f"{root}{os.sep}{file}"

def _reset(self):
return self.__dict__.pop("dvcignore", None)
3 changes: 3 additions & 0 deletions dvc/tree/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,3 +357,6 @@ def _remove_unpacked_dir(self, hash_):
info = self.hash_to_path_info(hash_)
path_info = info.with_name(info.name + self.UNPACKED_DIR_SUFFIX)
self.remove(path_info)

def _reset(self):
return self.__dict__.pop("dvcignore", None)
Loading

0 comments on commit 0b1d4f1

Please sign in to comment.