Skip to content

Commit

Permalink
dvc: add size/nfiles for deps/outs (iterative#4836)
Browse files Browse the repository at this point in the history
* dvc: add size for deps/outs

Related to iterative#3256

* dvc: add nfiles for deps/outs

* dvc: put size/nfiles into the hash_info
  • Loading branch information
efiop authored Nov 4, 2020
1 parent e9a9415 commit e1b82c5
Show file tree
Hide file tree
Showing 23 changed files with 214 additions and 94 deletions.
39 changes: 28 additions & 11 deletions dvc/cache/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def _save_file(self, path_info, tree, hash_info, save_link=True, **kwargs):
# we need to update path and cache, since in case of reflink,
# or copy cache type moving original file results in updates on
# next executed command, which causes md5 recalculation
self.tree.state.save(path_info, hash_info.value)
self.tree.state.save(path_info, hash_info)
else:
if self.changed_cache(hash_info):
with tree.open(path_info, mode="rb") as fobj:
Expand All @@ -241,7 +241,7 @@ def _save_file(self, path_info, tree, hash_info, save_link=True, **kwargs):
if callback:
callback(1)

self.tree.state.save(cache_info, hash_info.value)
self.tree.state.save(cache_info, hash_info)

def _cache_is_copy(self, path_info):
"""Checks whether cache uses copies."""
Expand Down Expand Up @@ -287,6 +287,7 @@ def _get_dir_info_hash(self, dir_info):
hash_info = self.tree.get_file_hash(to_info)
hash_info.value += self.tree.CHECKSUM_DIR_SUFFIX
hash_info.dir_info = self._to_dict(dir_info)
hash_info.nfiles = len(dir_info)

return hash_info, to_info

Expand All @@ -305,7 +306,7 @@ def save_dir_info(self, dir_info, hash_info=None):
self.tree.makedirs(new_info.parent)
self.tree.move(tmp_info, new_info, mode=self.CACHE_MODE)

self.tree.state.save(new_info, hi.value)
self.tree.state.save(new_info, hi)

return hi

Expand All @@ -326,10 +327,10 @@ def _save_dir(self, path_info, tree, hash_info, save_link=True, **kwargs):
if save_link:
self.tree.state.save_link(path_info)
if self.tree.exists(path_info):
self.tree.state.save(path_info, hi.value)
self.tree.state.save(path_info, hi)

cache_info = self.tree.hash_to_path_info(hi.value)
self.tree.state.save(cache_info, hi.value)
self.tree.state.save(cache_info, hi)

@use_state
def save(self, path_info, tree, hash_info, save_link=True, **kwargs):
Expand Down Expand Up @@ -461,7 +462,7 @@ def _checkout_file(

self.link(cache_info, path_info)
self.tree.state.save_link(path_info)
self.tree.state.save(path_info, hash_info.value)
self.tree.state.save(path_info, hash_info)
if progress_callback:
progress_callback(str(path_info))

Expand Down Expand Up @@ -501,7 +502,7 @@ def _checkout_dir(
modified = True
self.safe_remove(entry_info, force=force)
self.link(entry_cache_info, entry_info)
self.tree.state.save(entry_info, entry_hash)
self.tree.state.save(entry_info, entry_hash_info)
if progress_callback:
progress_callback(str(entry_info))

Expand All @@ -511,7 +512,7 @@ def _checkout_dir(
)

self.tree.state.save_link(path_info)
self.tree.state.save(path_info, hash_info.value)
self.tree.state.save(path_info, hash_info)

# relink is not modified, assume it as nochange
return added, not added and modified and not relink
Expand Down Expand Up @@ -690,9 +691,20 @@ def _merge_dirs(self, ancestor_info, our_info, their_info):

# Sorting the list by path to ensure reproducibility
return sorted(
self._from_dict(merged), key=itemgetter(self.tree.PARAM_RELPATH)
self._from_dict(merged), key=itemgetter(self.tree.PARAM_RELPATH),
)

def _get_dir_size(self, dir_info):
def _getsize(entry):
return self.tree.getsize(
self.tree.hash_to_path_info(entry[self.tree.PARAM_CHECKSUM])
)

try:
return sum(_getsize(entry) for entry in dir_info)
except FileNotFoundError:
return None

def merge(self, ancestor_info, our_info, their_info):
assert our_info
assert their_info
Expand All @@ -706,7 +718,9 @@ def merge(self, ancestor_info, our_info, their_info):
their = self.get_dir_cache(their_info)

merged = self._merge_dirs(ancestor, our, their)
return self.save_dir_info(merged)
hash_info = self.save_dir_info(merged)
hash_info.size = self._get_dir_size(merged)
return hash_info

@use_state
def get_hash(self, tree, path_info):
Expand All @@ -715,9 +729,12 @@ def get_hash(self, tree, path_info):
assert hash_info.name == self.tree.PARAM_CHECKSUM
return hash_info

return self.save_dir_info(hash_info.dir_info, hash_info)
hi = self.save_dir_info(hash_info.dir_info, hash_info)
hi.size = hash_info.size
return hi

def set_dir_info(self, hash_info):
assert hash_info.isdir

hash_info.dir_info = self._to_dict(self.get_dir_cache(hash_info))
hash_info.nfiles = len(hash_info.dir_info)
3 changes: 2 additions & 1 deletion dvc/cache/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,8 @@ def pull(self, named_cache, remote, jobs=None, show_checksums=False):
# be removed upon status, while files corrupted during
# download will not be moved from tmp_file
# (see `BaseTree.download()`)
self.tree.state.save(cache_file, checksum)
hash_info = HashInfo(self.tree.PARAM_CHECKSUM, checksum)
self.tree.state.save(cache_file, hash_info)

return ret

Expand Down
28 changes: 24 additions & 4 deletions dvc/hash_info.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,46 @@
from collections import OrderedDict
from dataclasses import dataclass, field

HASH_DIR_SUFFIX = ".dir"


@dataclass
class HashInfo:
    """A named hash value with optional size/nfiles metadata.

    ``size`` and ``nfiles`` are informational only: they are declared with
    ``compare=False`` so two instances with the same name/value compare
    equal regardless of metadata.
    """

    PARAM_SIZE = "size"
    PARAM_NFILES = "nfiles"

    name: str
    value: str
    # Auxiliary fields -- excluded from equality comparisons.
    dir_info: dict = field(default=None, compare=False)
    size: int = field(default=None, compare=False)
    nfiles: int = field(default=None, compare=False)

    def __bool__(self):
        # A HashInfo with no value is considered "empty".
        return bool(self.value)

    @classmethod
    def from_dict(cls, d):
        """Build a HashInfo from a serialized dict.

        The dict holds exactly one ``name -> value`` pair plus optional
        "size"/"nfiles" keys; a falsy/empty dict yields an empty HashInfo.
        """
        _d = d.copy() if d else {}
        size = _d.pop(cls.PARAM_SIZE, None)
        nfiles = _d.pop(cls.PARAM_NFILES, None)

        if not _d:
            return cls(None, None)

        ((name, value),) = _d.items()
        return cls(name, value, size=size, nfiles=nfiles)

    def to_dict(self):
        """Serialize to an OrderedDict; empty when there is no value."""
        ret = OrderedDict()
        if not self:
            return ret

        ret[self.name] = self.value
        if self.size is not None:
            ret[self.PARAM_SIZE] = self.size
        if self.nfiles is not None:
            ret[self.PARAM_NFILES] = self.nfiles
        return ret

@property
def isdir(self):
Expand Down
3 changes: 3 additions & 0 deletions dvc/output/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from funcy import collecting, project
from voluptuous import And, Any, Coerce, Length, Lower, Required, SetTo

from dvc.hash_info import HashInfo
from dvc.output.base import BaseOutput
from dvc.output.gs import GSOutput
from dvc.output.hdfs import HDFSOutput
Expand Down Expand Up @@ -59,6 +60,8 @@
SCHEMA[BaseOutput.PARAM_METRIC] = BaseOutput.METRIC_SCHEMA
SCHEMA[BaseOutput.PARAM_PLOT] = bool
SCHEMA[BaseOutput.PARAM_PERSIST] = bool
SCHEMA[HashInfo.PARAM_SIZE] = int
SCHEMA[HashInfo.PARAM_NFILES] = int


def _get(
Expand Down
11 changes: 9 additions & 2 deletions dvc/output/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,8 +530,15 @@ def _check_can_merge(self, out):
my = self.dumpd()
other = out.dumpd()

my.pop(self.tree.PARAM_CHECKSUM)
other.pop(self.tree.PARAM_CHECKSUM)
ignored = [
self.tree.PARAM_CHECKSUM,
HashInfo.PARAM_SIZE,
HashInfo.PARAM_NFILES,
]

for opt in ignored:
my.pop(opt, None)
other.pop(opt, None)

if my != other:
raise MergeError(
Expand Down
8 changes: 7 additions & 1 deletion dvc/schema.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from voluptuous import Any, Optional, Required, Schema

from dvc import dependency, output
from dvc.hash_info import HashInfo
from dvc.output import CHECKSUMS_SCHEMA, BaseOutput
from dvc.parsing import FOREACH_KWD, IN_KWD, SET_KWD, USE_KWD, VARS_KWD
from dvc.stage.params import StageParams
Expand All @@ -18,7 +19,12 @@
StageParams.PARAM_ALWAYS_CHANGED: bool,
}

DATA_SCHEMA = {**CHECKSUMS_SCHEMA, Required("path"): str}
DATA_SCHEMA = {
**CHECKSUMS_SCHEMA,
Required("path"): str,
HashInfo.PARAM_SIZE: int,
HashInfo.PARAM_NFILES: int,
}
LOCK_FILE_STAGE_SCHEMA = {
Required(StageParams.PARAM_CMD): str,
StageParams.PARAM_DEPS: [DATA_SCHEMA],
Expand Down
6 changes: 5 additions & 1 deletion dvc/stage/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,13 @@ def __init__(self, stage):


def _get_cache_hash(cache, key=False):
    """Compute the sha256 used to address a stage-cache entry.

    When ``key`` is true, each out is reduced to its path (mutating
    ``cache["outs"]`` in place) so the hash identifies the stage rather
    than a particular run.

    ``size``/``nfiles`` are excluded from the hash so it stays stable
    when only that informational metadata changes.
    """
    from dvc.hash_info import HashInfo

    if key:
        cache["outs"] = [out["path"] for out in cache.get("outs", [])]
    return dict_sha256(
        cache, exclude=[HashInfo.PARAM_SIZE, HashInfo.PARAM_NFILES]
    )


def _get_stage_hash(stage):
Expand Down
18 changes: 9 additions & 9 deletions dvc/stage/serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,18 +134,18 @@ def to_pipeline_file(stage: "PipelineStage"):
def to_single_stage_lockfile(stage: "Stage") -> dict:
assert stage.cmd

def _dumpd(item):
ret = [
(item.PARAM_PATH, item.def_path),
*item.hash_info.to_dict().items(),
]

return OrderedDict(ret)

res = OrderedDict([("cmd", stage.cmd)])
params, deps = split_params_deps(stage)
deps, outs = [
[
OrderedDict(
[
(PARAM_PATH, item.def_path),
*item.hash_info.to_dict().items(),
]
)
for item in sort_by_path(items)
]
[_dumpd(item) for item in sort_by_path(items)]
for items in [deps, stage.outs]
]
params = _serialize_params_values(params)
Expand Down
5 changes: 5 additions & 0 deletions dvc/stage/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from dvc.utils.fs import path_isin

from ..dependency import ParamsDependency
from ..hash_info import HashInfo
from ..tree.local import LocalTree
from ..tree.s3 import S3Tree
from ..utils import dict_md5, format_link, relpath
Expand Down Expand Up @@ -136,6 +137,8 @@ def stage_dump_eq(stage_cls, old_d, new_d):
for out in outs:
out.pop(LocalTree.PARAM_CHECKSUM, None)
out.pop(S3Tree.PARAM_CHECKSUM, None)
out.pop(HashInfo.PARAM_SIZE, None)
out.pop(HashInfo.PARAM_NFILES, None)

# outs and deps are lists of dicts. To check equality, we need to make
# them independent of the order, so, we convert them to dicts.
Expand Down Expand Up @@ -171,6 +174,8 @@ def compute_md5(stage):
stage.PARAM_FROZEN,
BaseOutput.PARAM_METRIC,
BaseOutput.PARAM_PERSIST,
HashInfo.PARAM_SIZE,
HashInfo.PARAM_NFILES,
],
)

Expand Down
Loading

0 comments on commit e1b82c5

Please sign in to comment.