Skip to content

Commit

Permalink
repofs: use repo paths
Browse files (browse the repository at this point in the history)
  • Loading branch information
efiop committed Apr 18, 2022
1 parent b3b0ec1 commit 9d75e82
Show file tree
Hide file tree
Showing 20 changed files with 213 additions and 199 deletions.
5 changes: 2 additions & 3 deletions dvc/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ def get_url(path, repo=None, rev=None, remote=None):
directory in the remote storage.
"""
with Repo.open(repo, rev=rev, subrepos=True, uninitialized=True) as _repo:
fs_path = _repo.fs.path.join(_repo.root_dir, path)
with reraise(FileNotFoundError, PathMissingError(path, repo)):
info = _repo.repo_fs.info(fs_path)
info = _repo.repo_fs.info(path)

if not info["isdvc"]:
raise OutputNotFoundError(path, repo)

cloud = info["repo"].cloud
dvc_path = _repo.fs.path.relpath(fs_path, info["repo"].root_dir)
_, _, dvc_fs, dvc_path = _repo.repo_fs._get_fs_pair(path)

if not os.path.isabs(path):
dvc_path = dvc_path.replace("\\", "/")
Expand Down
9 changes: 3 additions & 6 deletions dvc/data/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def __init__(
checksum: Optional[str] = None,
**kwargs,
):
from dvc.fs.repo import RepoFileSystem
super().__init__(fs_path, fs, hash_info, **kwargs)
self.checksum = checksum or fs.checksum(fs_path)

Expand Down Expand Up @@ -60,9 +61,6 @@ def to_bytes(self):
# ReferenceHashFiles should currently only be serialized in
# memory and not to disk
fs_path = self.fs_path
if isinstance(self.fs, RepoFileSystem):
fs_path = self.fs.path.relpath(fs_path, self.fs.root_dir)

dict_ = {
self.PARAM_PATH: fs_path,
self.PARAM_HASH: self.hash_info,
Expand Down Expand Up @@ -95,11 +93,10 @@ def from_bytes(cls, data: bytes, fs_cache: Optional[dict] = None):
if not fs:
config = dict(config_pairs)
if RepoFileSystem.PARAM_REPO_URL in config:
fs = RepoFileSystem(**config)
fs_path = fs.path.join(fs.root_dir, fs_path)
fs_cls = RepoFileSystem
else:
fs_cls = get_fs_cls(config, scheme=scheme)
fs = fs_cls(**config)
fs = fs_cls(**config)
return ReferenceHashFile(
fs_path,
fs,
Expand Down
15 changes: 9 additions & 6 deletions dvc/dependency/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,14 @@ def _get_used_and_obj(
if locked and self.def_repo.get(self.PARAM_REV_LOCK) is None:
self.def_repo[self.PARAM_REV_LOCK] = rev

path = os.path.abspath(os.path.join(repo.root_dir, self.def_path))
if not obj_only:
try:
for odb, obj_ids in repo.used_objs(
[path],
[
os.path.abspath(
os.path.join(repo.root_dir, self.def_path)
)
],
force=True,
jobs=kwargs.get("jobs"),
recursive=True,
Expand All @@ -132,7 +135,7 @@ def _get_used_and_obj(
try:
staging, _, staged_obj = stage(
local_odb,
path,
self.def_path,
repo.repo_fs,
local_odb.fs.PARAM_CHECKSUM,
)
Expand Down Expand Up @@ -172,17 +175,17 @@ def iter_objs():
continue
if (
obj.fs.repo_url in checked_urls
or obj.fs.root_dir in checked_urls
or obj.fs._root_dir in checked_urls
):
continue
self_url = self.repo.url or self.repo.root_dir
if (
obj.fs.repo_url is not None
and obj.fs.repo_url == self_url
or obj.fs.root_dir == self.repo.root_dir
or obj.fs._root_dir == self.repo.root_dir
):
raise CircularImportError(self, obj.fs.repo_url, self_url)
checked_urls.update([obj.fs.repo_url, obj.fs.root_dir])
checked_urls.update([obj.fs.repo_url, obj.fs._root_dir])

def get_obj(self, filter_info=None, **kwargs):
locked = kwargs.get("locked", True)
Expand Down
3 changes: 2 additions & 1 deletion dvc/fs/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ def overlaps(self, left, right):
return self.isin_or_eq(left, right) or self.isin(right, left)

def relpath(self, path, start):
assert start
if not start:
return path
return self.flavour.relpath(path, start=start)

def relparts(self, path, base):
Expand Down
94 changes: 54 additions & 40 deletions dvc/fs/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,19 @@ def __init__(

self._main_repo = repo
self.hash_jobs = repo.fs.hash_jobs
self.root_dir: str = repo.root_dir
self._root_dir: str = repo.root_dir
self._traverse_subrepos = subrepos

self._subrepos_trie = PathStringTrie()
"""Keeps track of each and every path with the corresponding repo."""

self._subrepos_trie[self.root_dir] = repo
self._subrepos_trie[self._root_dir] = repo

self._dvcfss = {}
"""Keep a dvcfs instance of each repo."""

if hasattr(repo, "dvc_dir"):
self._dvcfss[repo.root_dir] = DvcFileSystem(repo=repo)
self._dvcfss[self._root_dir] = DvcFileSystem(repo=repo)

@property
def repo_url(self):
Expand All @@ -131,7 +131,7 @@ def repo_url(self):
def config(self):
return {
self.PARAM_REPO_URL: self.repo_url,
self.PARAM_REPO_ROOT: self.root_dir,
self.PARAM_REPO_ROOT: self._root_dir,
self.PARAM_REV: getattr(self._main_repo.fs, "rev", None),
self.PARAM_CACHE_DIR: os.path.abspath(
self._main_repo.odb.local.cache_dir
Expand Down Expand Up @@ -202,13 +202,13 @@ def _get_repo(self, path: str) -> Optional["Repo"]:

prefix, repo = self._subrepos_trie.longest_prefix(path)
if not prefix:
return None
return self._main_repo

parents = (parent for parent in self.path.parents(path))
dirs = [path] + list(takewhile(lambda p: p != prefix, parents))
dirs.reverse()
self._update(dirs, starting_repo=repo)
return self._subrepos_trie.get(path)
return self._subrepos_trie.get(path) or self._main_repo

@wrap_with(threading.Lock())
def _update(self, dirs, starting_repo):
Expand Down Expand Up @@ -240,22 +240,31 @@ def _get_fs_pair(
"""
Returns a pair of fss based on repo the path falls in, using prefix.
"""
path = os.path.abspath(path)
if os.path.isabs(path):
return None, None, self._main_repo.dvcfs, path

# fallback to the top-level repo if repo was not found
# this can happen if the path is outside of the repo
repo = self._get_repo(path) or self._main_repo
parts = self.path.parts(path)
if parts and parts[0] == os.curdir:
parts = parts[1:]

dvc_fs = self._dvcfss.get(repo.root_dir)
fs_path = self._main_repo.fs.path.join(
self._main_repo.root_dir, *parts
)
repo = self._get_repo(fs_path)
fs = repo.fs

if path.startswith(repo.root_dir):
dvc_path = path[len(repo.root_dir) + 1 :]
repo_parts = fs.path.relparts(repo.root_dir, self._main_repo.root_dir)
if repo_parts[0] == os.curdir:
repo_parts = repo_parts[1:]

dvc_path = dvc_path.replace("\\", "/")
else:
dvc_path = path
dvc_parts = parts[len(repo_parts) :]
if dvc_parts and dvc_parts[0] == os.curdir:
dvc_parts = dvc_parts[1:]

return repo.fs, path, dvc_fs, dvc_path
dvc_fs = self._dvcfss.get(repo.root_dir)
dvc_path = dvc_fs.path.join(*dvc_parts) if dvc_parts else ""

return fs, fs_path, dvc_fs, dvc_path

def open(
self, path, mode="r", encoding="utf-8", **kwargs
Expand Down Expand Up @@ -397,14 +406,16 @@ def get_file(
self, from_info, to_file, callback=DEFAULT_CALLBACK, **kwargs
):
fs, fs_path, dvc_fs, dvc_path = self._get_fs_pair(from_info)
try:
fs.get_file( # pylint: disable=protected-access
fs_path, to_file, callback=callback, **kwargs
)
return
except FileNotFoundError:
if not dvc_fs:
raise

if fs:
try:
fs.get_file( # pylint: disable=protected-access
fs_path, to_file, callback=callback, **kwargs
)
return
except FileNotFoundError:
if not dvc_fs:
raise

dvc_fs.get_file( # pylint: disable=protected-access
dvc_path, to_file, callback=callback, **kwargs
Expand All @@ -417,25 +428,28 @@ def info(self, path, **kwargs):
dvcignore = repo.dvcignore
ignore_subrepos = kwargs.get("ignore_subrepos", True)

try:
dvc_info = dvc_fs.info(dvc_path)
except FileNotFoundError:
dvc_info = None
dvc_info = None
if dvc_fs:
try:
dvc_info = dvc_fs.info(dvc_path)
except FileNotFoundError:
pass

try:
fs_info = fs.info(fs_path)
if dvcignore.is_ignored(
fs, fs_path, ignore_subrepos=ignore_subrepos
):
fs_info = None
except (FileNotFoundError, NotADirectoryError):
if not dvc_info:
raise
fs_info = None
fs_info = None
if fs:
try:
fs_info = fs.info(fs_path)
if dvcignore.is_ignored(
fs, fs_path, ignore_subrepos=ignore_subrepos
):
fs_info = None
except (FileNotFoundError, NotADirectoryError):
if not dvc_info:
raise

# NOTE: if some parent in fs_path turns out to be a file, it means
# that that whole repofs branch doesn't exist.
if not fs_info and dvc_info:
if fs and not fs_info and dvc_info:
for parent in self.path.parents(fs_path):
try:
if fs.info(parent)["type"] != "directory":
Expand Down
5 changes: 5 additions & 0 deletions dvc/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,11 @@ def is_in_repo(self):
os.path.realpath(self.fs_path), self.repo.root_dir
)

@property
def repo_path(self):
assert self.is_in_repo
return relpath(self.fs_path, self.repo.root_dir)

@property
def use_scm_ignore(self):
if not self.is_in_repo:
Expand Down
1 change: 0 additions & 1 deletion dvc/repo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,6 @@ def open_by_relpath(self, path, remote=None, mode="r", encoding=None):
from dvc.fs.repo import RepoFileSystem

fs = RepoFileSystem(self, subrepos=True)
path = self.fs.path.join(self.root_dir, path)
try:
with fs.open(
path, mode=mode, encoding=encoding, remote=remote
Expand Down
24 changes: 13 additions & 11 deletions dvc/repo/collect.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,22 @@ def _collect_paths(
from dvc.fs.repo import RepoFileSystem
from dvc.utils import relpath

fs_paths = [os.path.abspath(target) for target in targets]
fs = RepoFileSystem(repo)

target_paths = []
for fs_path in fs_paths:
for target in targets:
if os.path.isabs(target):
target = relpath(target, repo.root_dir)

if recursive and fs.isdir(fs_path):
target_paths.extend(repo.dvcignore.find(fs, fs_path))
if recursive and fs.isdir(target):
target_paths.extend(repo.dvcignore.find(fs, target))

if not fs.exists(fs_path):
rel = relpath(fs_path)
if not fs.exists(target):
if rev == "workspace" or rev == "":
logger.warning("'%s' was not found in current workspace.", rel)
logger.warning("'%s' was not found in current workspace.", target)
else:
logger.warning("'%s' was not found at: '%s'.", rel, rev)
target_paths.append(fs_path)
logger.warning("'%s' was not found at: '%s'.", target, rev)
target_paths.append(target)
return target_paths


Expand All @@ -58,12 +58,14 @@ def _filter_duplicates(
) -> Tuple[Outputs, StrPaths]:
res_outs: Outputs = []
fs_res_paths = fs_paths
from dvc.utils import relpath

for out in outs:
if out.fs_path in fs_paths:
rel = relpath(out.fs_path, out.stage.repo.root_dir)
if rel in fs_paths:
res_outs.append(out)
# MUTATING THE SAME LIST!!
fs_res_paths.remove(out.fs_path)
fs_res_paths.remove(rel)

return res_outs, fs_res_paths

Expand Down
5 changes: 1 addition & 4 deletions dvc/repo/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ def get(url, path, out=None, rev=None, jobs=None):
with external_repo(
url=url, rev=rev, cache_dir=tmp_dir, cache_types=cache_types
) as repo:
from_fs_path = os.path.abspath(os.path.join(repo.root_dir, path))
repo.repo_fs.download(
from_fs_path, os.path.abspath(out), jobs=jobs
)
repo.repo_fs.download(path, os.path.abspath(out), jobs=jobs)
finally:
remove(tmp_dir)
4 changes: 1 addition & 3 deletions dvc/repo/ls.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ def ls(url, path=None, rev=None, recursive=None, dvc_only=False):
from . import Repo

with Repo.open(url, rev=rev, subrepos=True, uninitialized=True) as repo:
fs_path = repo.root_dir
if path:
fs_path = os.path.abspath(repo.fs.path.join(fs_path, path))
fs_path = path or ""

ret = _ls(repo, fs_path, recursive, dvc_only)

Expand Down
5 changes: 3 additions & 2 deletions dvc/repo/metrics/show.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,12 @@ def _read_metrics(repo, metrics, rev, onerror=None):

res = {}
for metric in metrics:
if not fs.isfile(metric):
fs_path = repo.fs.path.relpath(metric, repo.root_dir)
if not fs.isfile(fs_path):
continue

res[fs.path.relpath(metric, os.getcwd())] = _read_metric(
metric, fs, rev, onerror=onerror
fs_path, fs, rev, onerror=onerror
)

return res
Expand Down
4 changes: 2 additions & 2 deletions dvc/repo/plots/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,8 @@ def _collect_plots(
recursive=recursive,
)

result = {plot.fs_path: _plot_props(plot) for plot in plots}
result.update({fs_path: {} for fs_path in fs_paths})
result = {relpath(plot.fs_path, repo.root_dir): _plot_props(plot) for plot in plots}
result.update({relpath(fs_path, repo.root_dir): {} for fs_path in fs_paths})
return result


Expand Down
2 changes: 1 addition & 1 deletion dvc/repo/reproduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def _get_stage_files(stage: "Stage") -> typing.Iterator[str]:
if (
not dep.use_scm_ignore
and dep.is_in_repo
and not stage.repo.repo_fs.isdvc(dep.fs_path)
and not stage.repo.repo_fs.isdvc(dep.fs.path.relpath(dep.fs_path, stage.repo.root_dir))
):
yield dep.fs_path
for out in stage.outs:
Expand Down
Loading

0 comments on commit 9d75e82

Please sign in to comment.