remote: use .dir checksum existence to infer file contents existence (iterative#3632)

* repo: separate dir cache and file cache in memory

- `used_cache()`/`get_used_cache()` in repo/stage/output now return
  tuples of (dir_cache, file_cache) instead of a single flat, merged cache

* update tests for new get_used_cache behavior

* remote: if .dir checksum exists on remote, assume contents also exist

- affects all commands which use `cache_exists()` (remote status)

* push: only upload the .dir file after its full file contents have been uploaded

* gc: always remove .dir checksums first

* functional tests for push/gc

* repo: support nesting dir caches in NamedCache

* remote: NamedCache updates

* Fix tests

* Fix deepsource warnings

* tests: use pytest mocker fixture

* Update dvc/cache.py

Co-Authored-By: Saugat Pachhai <[email protected]>

Co-authored-by: Ruslan Kuprieiev <[email protected]>
Co-authored-by: Saugat Pachhai <[email protected]>
3 people authored Apr 14, 2020
1 parent 65854c4 commit 7275843
Showing 10 changed files with 280 additions and 50 deletions.
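
The change hinges on one invariant: push uploads a .dir file only after every file it references has been uploaded, and gc deletes .dir files before their contents. Finding a .dir checksum on a remote therefore implies that all of its contents are there as well, so remote status can skip the per-file existence checks. A minimal sketch of that shortcut follows; it is not part of the diff, `remote.cache_exists()` mirrors the DVC method used in the diff below, and `dir_children` is a made-up stand-in for the NamedCache bookkeeping.

def estimate_remote_exists(remote, checksums, dir_children, jobs=None):
    # dir_children: {dir_checksum: iterable of child checksums} (hypothetical)
    pending = set(checksums)
    exists = set()

    # Query the (few) .dir checksums first.
    for dir_checksum in remote.cache_exists(set(dir_children) & pending, jobs=jobs):
        children = set(dir_children[dir_checksum])
        # Safe to assume the children exist: push uploads the .dir file last
        # and gc removes it first.
        exists.update({dir_checksum} | children)
        pending -= {dir_checksum} | children

    # Only the remaining checksums need an explicit existence query.
    exists.update(remote.cache_exists(pending, jobs=jobs))
    return exists
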
76 changes: 69 additions & 7 deletions dvc/cache.py
@@ -71,9 +71,39 @@ def __init__(self, repo):
azure = _make_remote_property("azure")


class NamedCacheItem:
def __init__(self):
self.names = set()
self.children = defaultdict(NamedCacheItem)

def __eq__(self, other):
return self.names == other.names and self.children == other.children

def child_keys(self):
for key, child in self.children.items():
yield key
yield from child.child_keys()

def child_names(self):
for key, child in self.children.items():
yield key, child.names
yield from child.child_names()

def add(self, checksum, item):
self.children[checksum].update(item)

def update(self, item, suffix=""):
if suffix:
self.names.update(n + suffix for n in item.names)
else:
self.names.update(item.names)
for checksum, child_item in item.children.items():
self.children[checksum].update(child_item)


class NamedCache(object):
def __init__(self):
self._items = defaultdict(lambda: defaultdict(set))
self._items = defaultdict(lambda: defaultdict(NamedCacheItem))
self.external = defaultdict(set)

@classmethod
@@ -86,19 +116,51 @@ def __getitem__(self, key):
return self._items[key]

def add(self, scheme, checksum, name):
self._items[scheme][checksum].add(name)
"""Add a mapped name for the specified checksum."""
self._items[scheme][checksum].names.add(name)

def add_child_cache(self, checksum, cache, suffix=""):
"""Add/update child cache for the specified checksum."""
for scheme, src in cache._items.items():
dst = self._items[scheme][checksum].children
for child_checksum, item in src.items():
dst[child_checksum].update(item, suffix=suffix)

for repo_pair, files in cache.external.items():
self.external[repo_pair].update(files)

def add_external(self, url, rev, path):
self.external[url, rev].add(path)

def update(self, cache, suffix=""):
for scheme, src in cache._items.items():
dst = self._items[scheme]
for checksum, names in src.items():
if suffix:
dst[checksum].update(n + suffix for n in names)
else:
dst[checksum].update(names)
for checksum, item in src.items():
dst[checksum].update(item, suffix=suffix)

for repo_pair, files in cache.external.items():
self.external[repo_pair].update(files)

def scheme_keys(self, scheme):
"""Iterate over a flat list of all keys for the specified scheme,
including children.
"""
for key, item in self._items[scheme].items():
yield key
yield from item.child_keys()

def scheme_names(self, scheme):
"""Iterate over a flat list of checksum, names items for the specified
scheme, including children.
"""
for key, item in self._items[scheme].items():
yield key, item.names
yield from item.child_names()

def dir_keys(self, scheme):
return (
key for key, item in self._items[scheme].items() if item.children
)

def child_keys(self, scheme, checksum):
return self._items[scheme][checksum].child_keys()
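
The NamedCacheItem/NamedCache changes above make the used-cache index a two-level structure: top-level checksums, with the contents of a directory nested under its .dir checksum. A short, hypothetical usage sketch (checksums and names are made up; not part of the diff):

from dvc.cache import NamedCache

used = NamedCache()
used.add("local", "11112222.dir", "data")      # the directory's .dir checksum

contents = NamedCache()
contents.add("local", "aaaa1111", "data/foo")
contents.add("local", "bbbb2222", "data/bar")

# Nest the directory contents under their parent .dir checksum.
used.add_child_cache("11112222.dir", contents)

list(used.scheme_keys("local"))
# ['11112222.dir', 'aaaa1111', 'bbbb2222']     flat view, parents and children
list(used.dir_keys("local"))
# ['11112222.dir']                             only checksums that have children
list(used.child_keys("local", "11112222.dir"))
# ['aaaa1111', 'bbbb2222']                     contents of a single directory
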
9 changes: 5 additions & 4 deletions dvc/data_cloud.py
@@ -88,12 +88,13 @@ def pull(self, cache, jobs=None, remote=None, show_checksums=False):
return downloaded_items_num

def _save_pulled_checksums(self, cache):
for checksum in cache["local"].keys():
for checksum in cache.scheme_keys("local"):
cache_file = self.repo.cache.local.checksum_to_path_info(checksum)
if self.repo.cache.local.exists(cache_file):
# We can safely save here, as existing corrupted files will be
# removed upon status, while files corrupted during download
# will not be moved from tmp_file (see `RemoteBASE.download()`)
# We can safely save here, as existing corrupted files will
# be removed upon status, while files corrupted during
# download will not be moved from tmp_file
# (see `RemoteBASE.download()`)
self.repo.state.save(cache_file, checksum)

def status(self, cache, jobs=None, remote=None, show_checksums=False):
4 changes: 3 additions & 1 deletion dvc/output/base.py
@@ -436,7 +436,9 @@ def get_used_cache(self, **kwargs):
if not self.is_dir_checksum:
return ret

ret.update(self._collect_used_dir_cache(**kwargs))
ret.add_child_cache(
self.checksum, self._collect_used_dir_cache(**kwargs),
)

return ret

12 changes: 8 additions & 4 deletions dvc/remote/base.py
@@ -732,14 +732,18 @@ def all(self, jobs=None, name=None):
)

def gc(self, named_cache, jobs=None):
logger.debug("named_cache: {} jobs: {}".format(named_cache, jobs))
used = self.extract_used_local_checksums(named_cache)

if self.scheme != "":
used.update(named_cache[self.scheme])
used.update(named_cache.scheme_keys(self.scheme))

removed = False
for checksum in self.all(jobs, str(self.path_info)):
# checksums must be sorted to ensure we always remove .dir files first
for checksum in sorted(
self.all(jobs, str(self.path_info)),
key=self.is_dir_checksum,
reverse=True,
):
if checksum in used:
continue
path_info = self.checksum_to_path_info(checksum)
@@ -1247,7 +1251,7 @@ def _get_unpacked_dir_names(self, checksums):
return set()

def extract_used_local_checksums(self, named_cache):
used = set(named_cache["local"])
used = set(named_cache.scheme_keys("local"))
unpacked = self._get_unpacked_dir_names(used)
return used | unpacked
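
The gc change above depends on `is_dir_checksum()` being usable as a sort key: it returns True only for .dir checksums, so sorting with `reverse=True` moves them to the front and they are removed before the files they reference, keeping the '.dir exists, so contents exist' inference valid even if gc is interrupted. A tiny standalone illustration (checksum values are made up; `is_dir_checksum` below is a stand-in for the RemoteBASE helper):

def is_dir_checksum(checksum):
    # stand-in: in DVC, .dir checksums carry a ".dir" suffix
    return checksum.endswith(".dir")

checksums = ["aaaa1111", "cccc3333.dir", "bbbb2222"]

# True sorts after False, so reverse=True puts .dir entries first; sorted()
# is stable, so the relative order within each group is preserved.
print(sorted(checksums, key=is_dir_checksum, reverse=True))
# ['cccc3333.dir', 'aaaa1111', 'bbbb2222']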

160 changes: 136 additions & 24 deletions dvc/remote/local.py
@@ -2,9 +2,11 @@
import logging
import os
import stat
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed, ThreadPoolExecutor
from functools import partial

from funcy import concat

from shortuuid import uuid

from dvc.compat import fspath_py35
@@ -255,37 +257,102 @@ def status(
show_checksums=False,
download=False,
):
# Return flattened dict containing all status info
dir_status, file_status, _ = self._status(
named_cache,
remote,
jobs=jobs,
show_checksums=show_checksums,
download=download,
)
return dict(dir_status, **file_status)

def _status(
self,
named_cache,
remote,
jobs=None,
show_checksums=False,
download=False,
):
"""Return a tuple of (dir_status_info, file_status_info, dir_mapping).
dir_status_info contains status for .dir files, file_status_info
contains status for all other files, and dir_mapping is a dict of
{dir_path_info: set(file_path_info...)} which can be used to map
a .dir file to its file contents.
"""
logger.debug(
"Preparing to collect status from {}".format(remote.path_info)
)
md5s = list(named_cache[self.scheme])
md5s = set(named_cache.scheme_keys(self.scheme))

logger.debug("Collecting information from local cache...")
local_exists = self.cache_exists(md5s, jobs=jobs, name=self.cache_dir)
local_exists = frozenset(
self.cache_exists(md5s, jobs=jobs, name=self.cache_dir)
)

# This is a performance optimization. We can safely assume that,
# if the resources that we want to fetch are already cached,
# there's no need to check the remote storage for the existence of
# those files.
if download and sorted(local_exists) == sorted(md5s):
if download and local_exists == md5s:
remote_exists = local_exists
else:
logger.debug("Collecting information from remote cache...")
remote_exists = list(
remote.cache_exists(
md5s, jobs=jobs, name=str(remote.path_info)
remote_exists = set()
dir_md5s = set(named_cache.dir_keys(self.scheme))
if dir_md5s:
# If .dir checksum exists on the remote, assume directory
# contents also exists on the remote
for dir_checksum in remote._cache_object_exists(dir_md5s):
file_checksums = list(
named_cache.child_keys(self.scheme, dir_checksum)
)
logger.debug(
"'{}' exists on remote, "
"assuming '{}' files also exist".format(
dir_checksum, len(file_checksums)
)
)
md5s.remove(dir_checksum)
remote_exists.add(dir_checksum)
md5s.difference_update(file_checksums)
remote_exists.update(file_checksums)
if md5s:
remote_exists.update(
remote.cache_exists(
md5s, jobs=jobs, name=str(remote.path_info)
)
)
)

ret = {
checksum: {"name": checksum if show_checksums else " ".join(names)}
for checksum, names in named_cache[self.scheme].items()
}
self._fill_statuses(ret, local_exists, remote_exists)
def make_names(checksum, names):
return {"name": checksum if show_checksums else " ".join(names)}

dir_status = {}
file_status = {}
dir_paths = {}
for checksum, item in named_cache[self.scheme].items():
if item.children:
dir_status[checksum] = make_names(checksum, item.names)
file_status.update(
{
child_checksum: make_names(child_checksum, child.names)
for child_checksum, child in item.children.items()
}
)
dir_paths[remote.checksum_to_path_info(checksum)] = frozenset(
map(remote.checksum_to_path_info, item.child_keys())
)
else:
file_status[checksum] = make_names(checksum, item.names)

self._fill_statuses(dir_status, local_exists, remote_exists)
self._fill_statuses(file_status, local_exists, remote_exists)

self._log_missing_caches(ret)
self._log_missing_caches(dict(dir_status, **file_status))

return ret
return dir_status, file_status, dir_paths

@staticmethod
def _fill_statuses(checksum_info_dir, local_exists, remote_exists):
@@ -347,31 +414,76 @@ def _process(
if jobs is None:
jobs = remote.JOBS

status_info = self.status(
dir_status, file_status, dir_paths = self._status(
named_cache,
remote,
jobs=jobs,
show_checksums=show_checksums,
download=download,
)

plans = self._get_plans(download, remote, status_info, status)
dir_plans = self._get_plans(download, remote, dir_status, status)
file_plans = self._get_plans(download, remote, file_status, status)

if len(plans[0]) == 0:
if len(dir_plans[0]) + len(file_plans[0]) == 0:
return 0

if jobs > 1:
with ThreadPoolExecutor(max_workers=jobs) as executor:
fails = sum(executor.map(func, *plans))
else:
fails = sum(map(func, *plans))
with ThreadPoolExecutor(max_workers=jobs) as executor:
if download:
fails = sum(executor.map(func, *dir_plans))
fails += sum(executor.map(func, *file_plans))
else:
# for uploads, push files first, and any .dir files last

file_futures = {}
for from_info, to_info, name in zip(*file_plans):
file_futures[to_info] = executor.submit(
func, from_info, to_info, name
)
dir_futures = {}
for from_info, to_info, name in zip(*dir_plans):
wait_futures = {
future
for file_path, future in file_futures.items()
if file_path in dir_paths[to_info]
}
dir_futures[to_info] = executor.submit(
self._dir_upload,
func,
wait_futures,
from_info,
to_info,
name,
)
fails = sum(
future.result()
for future in concat(
file_futures.values(), dir_futures.values()
)
)

if fails:
if download:
raise DownloadError(fails)
raise UploadError(fails)

return len(plans[0])
return len(dir_plans[0]) + len(file_plans[0])

@staticmethod
def _dir_upload(func, futures, from_info, to_info, name):
for future in as_completed(futures):
if future.result():
# do not upload this .dir file if any file in this
# directory failed to upload
logger.debug(
"failed to upload full contents of '{}', "
"aborting .dir file upload".format(name)
)
logger.error(
"failed to upload '{}' to '{}'".format(from_info, to_info)
)
return 1
return func(from_info, to_info, name)

def push(self, named_cache, remote, jobs=None, show_checksums=False):
return self._process(
5 changes: 3 additions & 2 deletions dvc/repo/__init__.py
@@ -253,8 +253,9 @@ def used_cache(
`all_branches`/`all_tags`/`all_commits` to expand the scope.
Returns:
A dictionary with Schemes (representing output's location) as keys,
and a list with the outputs' `dumpd` as values.
A dictionary with Schemes (representing output's location) mapped
to items containing the output's `dumpd` names and the output's
children (if the given output is a directory).
"""
from dvc.cache import NamedCache

1 change: 0 additions & 1 deletion dvc/repo/fetch.py
@@ -6,7 +6,6 @@
from dvc.scm.base import CloneError
from dvc.path_info import PathInfo


logger = logging.getLogger(__name__)

