Skip to content

Commit

Permalink
remote: support saving RepoTree objects directly to cache (iterative#3825)
Browse files Browse the repository at this point in the history

* DvcTree: use out.dir_cache instead of out.collect_use_dir_cache

* RepoTree: support filtering out dvcfiles during walk()

* RepoTree: support recursive isdvc()

* remote: support computing checksums for tree objects

* RepoTree: pull dir contents on walk() if in `fetch` mode

* tests: add func test for DVC dir pull on RepoTree.walk()

* DvcTree: support granular open()

* RepoTree: add get_file_checksum()

* remote: revert checksum changes

* remote: add tree support for cache.save()

* tests: test saving directly to cache from RepoTree

* remote: local cache needs its own WorkingTree instance when repo.tree is
GitTree

* RepoTree: state context should be entered by open()/walk() caller

* RepoTree: only pull dir contents when cache has changed

* tree: update tree tests

* fix ds warnings, update comments

* fix windows tests
  • Loading branch information
pmrowla authored May 21, 2020
1 parent 4dac209 commit 75e795c
Show file tree
Hide file tree
Showing 9 changed files with 383 additions and 136 deletions.
25 changes: 15 additions & 10 deletions dvc/output/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,17 @@ def unprotect(self):
if self.exists:
self.remote.unprotect(self.path_info)

def get_dir_cache(self, **kwargs):
if not self.is_dir_checksum:
raise DvcException("cannot get dir cache for file checksum")
if self.cache.changed_cache_file(self.checksum):
self.repo.cloud.pull(
NamedCache.make("local", self.checksum, str(self)),
show_checksums=False,
**kwargs,
)
return self.dir_cache

def collect_used_dir_cache(
self, remote=None, force=False, jobs=None, filter_info=None
):
Expand All @@ -371,16 +382,10 @@ def collect_used_dir_cache(

cache = NamedCache()

if self.cache.changed_cache_file(self.checksum):
try:
self.repo.cloud.pull(
NamedCache.make("local", self.checksum, str(self)),
jobs=jobs,
remote=remote,
show_checksums=False,
)
except DvcException:
logger.debug(f"failed to pull cache for '{self}'")
try:
self.get_dir_cache(jobs=jobs, remote=remote)
except DvcException:
logger.debug(f"failed to pull cache for '{self}'")

if self.cache.changed_cache_file(self.checksum):
msg = (
Expand Down
117 changes: 83 additions & 34 deletions dvc/remote/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from dvc.progress import Tqdm
from dvc.remote.index import RemoteIndex, RemoteIndexNoop
from dvc.remote.slow_link_detection import slow_link_guard
from dvc.scm.tree import is_working_tree
from dvc.state import StateNoop
from dvc.utils import tmp_fname
from dvc.utils.fs import makedirs, move
Expand Down Expand Up @@ -253,13 +254,17 @@ def get_dir_checksum(self, path_info):
raise RemoteCacheRequiredError(path_info)

dir_info = self._collect_dir(path_info)
return self._save_dir_info(dir_info, path_info)

def _save_dir_info(self, dir_info, path_info=None):
checksum, tmp_info = self._get_dir_info_checksum(dir_info)
new_info = self.cache.checksum_to_path_info(checksum)
if self.cache.changed_cache_file(checksum):
self.cache.makedirs(new_info.parent)
self.cache.move(tmp_info, new_info, mode=self.CACHE_MODE)

self.state.save(path_info, checksum)
if path_info:
self.state.save(path_info, checksum)
self.state.save(new_info, checksum)

return checksum
Expand Down Expand Up @@ -454,27 +459,33 @@ def _do_link(self, from_info, to_info, link_method):
"Created '%s': %s -> %s", self.cache_types[0], from_info, to_info,
)

def _save_file(self, path_info, checksum, save_link=True):
def _save_file(self, path_info, checksum, save_link=True, tree=None):
assert checksum

cache_info = self.checksum_to_path_info(checksum)
if self.changed_cache(checksum):
self.move(path_info, cache_info, mode=self.CACHE_MODE)
self.link(cache_info, path_info)
elif self.iscopy(path_info) and self._cache_is_copy(path_info):
# Default relink procedure involves unneeded copy
self.unprotect(path_info)
if tree:
if self.changed_cache(checksum):
with tree.open(path_info, mode="rb") as fobj:
self.copy_fobj(fobj, cache_info)
else:
self.remove(path_info)
self.link(cache_info, path_info)
if self.changed_cache(checksum):
self.move(path_info, cache_info, mode=self.CACHE_MODE)
self.link(cache_info, path_info)
elif self.iscopy(path_info) and self._cache_is_copy(path_info):
# Default relink procedure involves unneeded copy
self.unprotect(path_info)
else:
self.remove(path_info)
self.link(cache_info, path_info)

if save_link:
self.state.save_link(path_info)
if save_link:
self.state.save_link(path_info)

# we need to update path and cache, since in case of reflink or copy
# cache type, moving the original file results in updates on the
# next executed command, which causes md5 recalculation
self.state.save(path_info, checksum)
if not tree or is_working_tree(tree):
self.state.save(path_info, checksum)
self.state.save(cache_info, checksum)

def _cache_is_copy(self, path_info):
Expand All @@ -499,22 +510,43 @@ def _cache_is_copy(self, path_info):
self.cache_type_confirmed = True
return self.cache_types[0] == "copy"

def _save_dir(self, path_info, checksum, save_link=True):
cache_info = self.checksum_to_path_info(checksum)
dir_info = self.get_dir_cache(checksum)
def _save_dir(self, path_info, checksum, save_link=True, tree=None):
if tree:
checksum = self._save_tree(path_info, tree)
else:
dir_info = self.get_dir_cache(checksum)

for entry in Tqdm(
dir_info, desc="Saving " + path_info.name, unit="file"
):
entry_info = path_info / entry[self.PARAM_RELPATH]
entry_checksum = entry[self.PARAM_CHECKSUM]
self._save_file(entry_info, entry_checksum, save_link=False)
for entry in Tqdm(
dir_info, desc="Saving " + path_info.name, unit="file"
):
entry_info = path_info / entry[self.PARAM_RELPATH]
entry_checksum = entry[self.PARAM_CHECKSUM]
self._save_file(entry_info, entry_checksum, save_link=False)

if save_link:
self.state.save_link(path_info)
if save_link:
self.state.save_link(path_info)

cache_info = self.checksum_to_path_info(checksum)
self.state.save(cache_info, checksum)
self.state.save(path_info, checksum)
if not tree or is_working_tree(tree):
self.state.save(path_info, checksum)

def _save_tree(self, path_info, tree):
# save tree directory to cache, collect dir cache during walk and
# return the resulting dir checksum
dir_info = []
for fname in tree.walk_files(path_info):
checksum = tree.get_file_checksum(fname)
file_info = {
self.PARAM_CHECKSUM: checksum,
self.PARAM_RELPATH: fname.relative_to(path_info).as_posix(),
}
self._save_file(fname, checksum, tree=tree)
dir_info.append(file_info)

return self._save_dir_info(
sorted(dir_info, key=itemgetter(self.PARAM_RELPATH))
)

def is_empty(self, path_info):
return False
Expand Down Expand Up @@ -543,22 +575,36 @@ def walk_files(self, path_info):
def protect(path_info):
pass

def save(self, path_info, checksum_info, save_link=True):
def save(self, path_info, checksum_info, save_link=True, tree=None):
if path_info.scheme != self.scheme:
raise RemoteActionNotImplemented(
f"save {path_info.scheme} -> {self.scheme}", self.scheme,
)

checksum = checksum_info[self.PARAM_CHECKSUM]
self._save(path_info, checksum, save_link)
if tree:
# the checksum to save will be computed during the tree walk
checksum = None
else:
checksum = checksum_info[self.PARAM_CHECKSUM]
self._save(path_info, checksum, save_link, tree)

def _save(self, path_info, checksum, save_link=True, tree=None):
if tree:
logger.debug("Saving tree path '%s' to cache.", path_info)
else:
to_info = self.checksum_to_path_info(checksum)
logger.debug("Saving '%s' to '%s'.", path_info, to_info)

def _save(self, path_info, checksum, save_link=True):
to_info = self.checksum_to_path_info(checksum)
logger.debug("Saving '%s' to '%s'.", path_info, to_info)
if self.isdir(path_info):
self._save_dir(path_info, checksum, save_link)
if tree:
isdir = tree.isdir
save_link = False
else:
isdir = self.isdir

if isdir(path_info):
self._save_dir(path_info, checksum, save_link, tree)
return
self._save_file(path_info, checksum, save_link)
self._save_file(path_info, checksum, save_link, tree)

def _handle_transfer_exception(
self, from_info, to_info, exception, operation
Expand Down Expand Up @@ -697,6 +743,9 @@ def move(self, from_info, to_info, mode=None):
def copy(self, from_info, to_info):
raise RemoteActionNotImplemented("copy", self.scheme)

def copy_fobj(self, fobj, to_info):
raise RemoteActionNotImplemented("copy_fobj", self.scheme)

def symlink(self, from_info, to_info):
raise RemoteActionNotImplemented("symlink", self.scheme)

Expand Down
60 changes: 51 additions & 9 deletions dvc/remote/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,17 @@
)
from dvc.remote.index import RemoteIndexNoop
from dvc.scheme import Schemes
from dvc.scm.tree import is_working_tree
from dvc.scm.tree import WorkingTree, is_working_tree
from dvc.system import System
from dvc.utils import file_md5, relpath, tmp_fname
from dvc.utils.fs import copyfile, makedirs, move, remove, walk_files
from dvc.utils.fs import (
copy_fobj_to_file,
copyfile,
makedirs,
move,
remove,
walk_files,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -61,6 +68,21 @@ def cache_dir(self):
def cache_dir(self, value):
self.path_info = PathInfo(value) if value else None

@cached_property
def _work_tree(self):
if self.repo:
return WorkingTree(self.repo.root_dir)
return None

@property
def work_tree(self):
# When using repo.brancher, repo.tree may change to/from WorkingTree to
# GitTree arbitrarily. When repo.tree is GitTree, local cache needs to
# use its own WorkingTree instance.
if self.repo and not is_working_tree(self.repo.tree):
return self._work_tree
return None

@classmethod
def supported(cls, config):
return True
Expand Down Expand Up @@ -92,8 +114,11 @@ def get(self, md5):
return self.checksum_to_path_info(md5).url

def exists(self, path_info):
assert is_working_tree(self.repo.tree)
assert isinstance(path_info, str) or path_info.scheme == "local"
if not self.repo:
return os.path.exists(path_info)
if self.work_tree and self.work_tree.exists(path_info):
return True
return self.repo.tree.exists(path_info)

def makedirs(self, path_info):
Expand Down Expand Up @@ -126,13 +151,19 @@ def is_empty(self, path_info):

return False

@staticmethod
def isfile(path_info):
return os.path.isfile(path_info)
def isfile(self, path_info):
if not self.repo:
return os.path.isfile(path_info)
if self.work_tree and self.work_tree.isfile(path_info):
return True
return self.repo.tree.isfile(path_info)

@staticmethod
def isdir(path_info):
return os.path.isdir(path_info)
def isdir(self, path_info):
if not self.repo:
return os.path.isdir(path_info)
if self.work_tree and self.work_tree.isdir(path_info):
return True
return self.repo.tree.isdir(path_info)

def iscopy(self, path_info):
return not (
Expand Down Expand Up @@ -187,6 +218,17 @@ def copy(self, from_info, to_info):
self.remove(tmp_info)
raise

def copy_fobj(self, fobj, to_info):
self.makedirs(to_info.parent)
tmp_info = to_info.parent / tmp_fname(to_info.name)
try:
copy_fobj_to_file(fobj, tmp_info)
os.chmod(tmp_info, self._file_mode)
os.rename(tmp_info, to_info)
except Exception:
self.remove(tmp_info)
raise

@staticmethod
def symlink(from_info, to_info):
System.symlink(from_info, to_info)
Expand Down
17 changes: 9 additions & 8 deletions dvc/repo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,17 +488,18 @@ def open_by_relpath(self, path, remote=None, mode="r", encoding=None):
tree = RepoTree(self, stream=True)
path = os.path.join(self.root_dir, path)
try:
with tree.open(
os.path.join(self.root_dir, path),
mode=mode,
encoding=encoding,
remote=remote,
) as fobj:
yield fobj
with self.state:
with tree.open(
os.path.join(self.root_dir, path),
mode=mode,
encoding=encoding,
remote=remote,
) as fobj:
yield fobj
except FileNotFoundError as exc:
raise FileMissingError(path) from exc
except IsADirectoryError as exc:
raise DvcIsADirectoryError from exc
raise DvcIsADirectoryError(f"'{path}' is a directory") from exc

def close(self):
self.scm.close()
Expand Down
Loading

0 comments on commit 75e795c

Please sign in to comment.