Skip to content

Commit

Permalink
[WIP] Introduce wdir for DVC stage files (iterative#1658)
Browse files Browse the repository at this point in the history
* introduce wdir logic to superceed and expand -f and -cwd

* add stage file name and wdir validation tests and fix some checks
  • Loading branch information
shcheklein authored and efiop committed Feb 25, 2019
1 parent 220b4b2 commit 3853690
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 78 deletions.
14 changes: 8 additions & 6 deletions dvc/command/run.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import unicode_literals

import os
import argparse

import dvc.logger as logger
Expand Down Expand Up @@ -38,6 +37,7 @@ def run(self):
deps=self.args.deps,
fname=self.args.file,
cwd=self.args.cwd,
wdir=self.args.wdir,
no_exec=self.args.no_exec,
overwrite=overwrite,
ignore_build_cache=self.args.ignore_build_cache,
Expand Down Expand Up @@ -128,11 +128,13 @@ def add_parser(subparsers, parent_parser):
"a stage filename.",
)
run_parser.add_argument(
"-c",
"--cwd",
default=os.path.curdir,
help="Directory within your repo to run your command and place "
"stage file in.",
"-c", "--cwd", default=None, help="Deprecated, use -w and -f instead."
)
run_parser.add_argument(
"-w",
"--wdir",
default=None,
help="Directory within your repo to run your command in.",
)
run_parser.add_argument(
"--no-exec",
Expand Down
12 changes: 6 additions & 6 deletions dvc/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,26 +67,26 @@ def __init__(self, output):
)


class WorkingDirectoryAsOutputError(DvcException):
class StagePathAsOutputError(DvcException):
"""Thrown if directory that stage is going to be saved in is specified as
an output of anothe stage.
an output of another stage.
Args:
cwd (str): path to the directory.
fname (str): path to the stage file that has cwd specified as an
output.
"""

def __init__(self, cwd, fname):
assert isinstance(cwd, str) or isinstance(cwd, builtin_str)
def __init__(self, wdir, fname):
assert isinstance(wdir, str) or isinstance(wdir, builtin_str)
assert isinstance(fname, str) or isinstance(fname, builtin_str)
msg = (
"current working directory '{cwd}' is specified as an output in"
" '{fname}'. Use another CWD to prevent any data removal.".format(
cwd=cwd, fname=fname
cwd=wdir, fname=fname
)
)
super(WorkingDirectoryAsOutputError, self).__init__(msg)
super(StagePathAsOutputError, self).__init__(msg)


class CircularDependencyError(DvcException):
Expand Down
4 changes: 2 additions & 2 deletions dvc/output/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(

if not os.path.isabs(p):
p = self.remote.to_ospath(p)
p = os.path.join(stage.cwd, p)
p = os.path.join(stage.wdir, p)
p = os.path.abspath(os.path.normpath(p))

self.path_info = {"scheme": "local", "path": p}
Expand Down Expand Up @@ -58,7 +58,7 @@ def dumpd(self):
ret = super(OutputLOCAL, self).dumpd()
if self.is_local:
path = self.remote.unixpath(
os.path.relpath(self.path, self.stage.cwd)
os.path.relpath(self.path, self.stage.wdir)
)
else:
path = self.url
Expand Down
13 changes: 4 additions & 9 deletions dvc/repo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ def graph(self, stages=None, from_directory=None):
import networkx as nx
from dvc.exceptions import (
OutputDuplicationError,
WorkingDirectoryAsOutputError,
StagePathAsOutputError,
)

G = nx.DiGraph()
Expand All @@ -314,15 +314,10 @@ def graph(self, stages=None, from_directory=None):
outs.append(out)

for stage in stages:
path_dir = os.path.dirname(stage.path) + os.sep
for out in outs:
overlaps = stage.cwd == out.path or stage.cwd.startswith(
out.path + os.sep
)

if overlaps:
raise WorkingDirectoryAsOutputError(
stage.cwd, stage.relpath
)
if path_dir.startswith(out.path + os.sep):
raise StagePathAsOutputError(stage.wdir, stage.relpath)

# collect the whole DAG
for stage in stages:
Expand Down
4 changes: 2 additions & 2 deletions dvc/repo/move.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def move(self, from_path, to_path):
import dvc.output as Output
from dvc.stage import Stage

from_out = Output.loads_from(Stage(self, cwd=os.curdir), [from_path])[0]
from_out = Output.loads_from(Stage(self), [from_path])[0]

to_path = _expand_target_path(from_path, to_path)

Expand All @@ -52,7 +52,7 @@ def move(self, from_path, to_path):
os.path.basename(to_path) + Stage.STAGE_FILE_SUFFIX,
)

stage.cwd = os.path.abspath(
stage.wdir = os.path.abspath(
os.path.join(os.curdir, os.path.dirname(to_path))
)

Expand Down
6 changes: 4 additions & 2 deletions dvc/repo/run.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import os
from __future__ import unicode_literals


def run(
Expand All @@ -10,7 +10,8 @@ def run(
metrics=None,
metrics_no_cache=None,
fname=None,
cwd=os.curdir,
cwd=None,
wdir=None,
no_exec=False,
overwrite=False,
ignore_build_cache=False,
Expand All @@ -36,6 +37,7 @@ def run(
fname=fname,
cmd=cmd,
cwd=cwd,
wdir=wdir,
outs=outs,
outs_no_cache=outs_no_cache,
metrics=metrics,
Expand Down
115 changes: 76 additions & 39 deletions dvc/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,16 @@ def __init__(self, msg):
super(StageFileBadNameError, self).__init__(msg)


class StageBadCwdError(DvcException):
def __init__(self, cwd):
msg = "stage cwd '{}' is outside of the current dvc repo"
super(StageBadCwdError, self).__init__(msg.format(cwd))
class StagePathOutsideError(DvcException):
def __init__(self, path):
msg = "stage working or file path '{}' is outside of dvc repo"
super(StagePathOutsideError, self).__init__(msg.format(path))


class StagePathNotFoundError(DvcException):
def __init__(self, path):
msg = "stage working or file path '{}' does not exist"
super(StagePathNotFoundError, self).__init__(msg.format(path))


class StageCommitError(DvcException):
Expand Down Expand Up @@ -102,13 +108,15 @@ class Stage(object):

PARAM_MD5 = "md5"
PARAM_CMD = "cmd"
PARAM_WDIR = "wdir"
PARAM_DEPS = "deps"
PARAM_OUTS = "outs"
PARAM_LOCKED = "locked"

SCHEMA = {
Optional(PARAM_MD5): Or(str, None),
Optional(PARAM_CMD): Or(str, None),
Optional(PARAM_WDIR): Or(str, None),
Optional(PARAM_DEPS): Or(And(list, Schema([dependency.SCHEMA])), None),
Optional(PARAM_OUTS): Or(And(list, Schema([output.SCHEMA])), None),
Optional(PARAM_LOCKED): bool,
Expand All @@ -119,7 +127,7 @@ def __init__(
repo,
path=None,
cmd=None,
cwd=os.curdir,
wdir=os.curdir,
deps=None,
outs=None,
md5=None,
Expand All @@ -133,7 +141,7 @@ def __init__(
self.repo = repo
self.path = path
self.cmd = cmd
self.cwd = cwd
self.wdir = wdir
self.outs = outs
self.deps = deps
self.md5 = md5
Expand Down Expand Up @@ -290,33 +298,36 @@ def validate(d, fname=None):
raise StageFileFormatError(fname, exc)

@classmethod
def _stage_fname_cwd(cls, fname, cwd, outs, add):
if fname and cwd:
return (fname, cwd)
def _stage_fname(cls, fname, outs, add):
if fname:
return fname

if not outs:
return (cls.STAGE_FILE, cwd if cwd else os.getcwd())
return cls.STAGE_FILE

out = outs[0]
if out.scheme == "local":
path = os.path
else:
path = posixpath

if not fname:
fname = path.basename(out.path) + cls.STAGE_FILE_SUFFIX
fname = path.basename(out.path) + cls.STAGE_FILE_SUFFIX

if not cwd or (add and out.is_local):
cwd = path.dirname(out.path)
if add and out.is_local:
fname = path.join(path.dirname(out.path), fname)

return (fname, cwd)
return fname

@staticmethod
def _check_inside_repo(repo, cwd):
def _check_stage_path(repo, path):
assert repo is not None
proj_dir = os.path.realpath(repo.root_dir)
if not os.path.realpath(cwd).startswith(proj_dir):
raise StageBadCwdError(cwd)

if not os.path.exists(os.path.realpath(path)):
raise StagePathNotFoundError(path)

proj_dir = os.path.realpath(repo.root_dir) + os.path.sep
if not (os.path.realpath(path) + os.path.sep).startswith(proj_dir):
raise StagePathOutsideError(path)

@property
def is_cached(self):
Expand Down Expand Up @@ -359,14 +370,14 @@ def create(
metrics=None,
metrics_no_cache=None,
fname=None,
cwd=os.curdir,
cwd=None,
wdir=None,
locked=False,
add=False,
overwrite=True,
ignore_build_cache=False,
remove_outs=False,
):

if outs is None:
outs = []
if deps is None:
Expand All @@ -378,7 +389,20 @@ def create(
if metrics_no_cache is None:
metrics_no_cache = []

stage = Stage(repo=repo, cwd=cwd, cmd=cmd, locked=locked)
# Backward compatibility for `cwd` option
if wdir is None and cwd is not None:
if fname is not None and os.path.basename(fname) != fname:
raise StageFileBadNameError(
"stage file name '{fname}' may not contain subdirectories"
" if '-c|--cwd' (deprecated) is specified. Use '-w|--wdir'"
" along with '-f' to specify stage file path and working"
" directory.".format(fname=fname)
)
wdir = cwd
else:
wdir = os.curdir if wdir is None else wdir

stage = Stage(repo=repo, wdir=wdir, cmd=cmd, locked=locked)

stage.outs = output.loads_from(stage, outs, use_cache=True)
stage.outs += output.loads_from(
Expand All @@ -393,22 +417,18 @@ def create(
stage._check_circular_dependency()
stage._check_duplicated_arguments()

if fname is not None and os.path.basename(fname) != fname:
raise StageFileBadNameError(
"stage file name '{fname}' should not contain subdirectories."
" Use '-c|--cwd' to change location of the stage file.".format(
fname=fname
)
)
fname = Stage._stage_fname(fname, stage.outs, add=add)
wdir = os.path.abspath(wdir)

fname, cwd = Stage._stage_fname_cwd(fname, cwd, stage.outs, add=add)

Stage._check_inside_repo(repo, cwd)
if cwd is not None:
path = os.path.join(wdir, fname)
else:
path = os.path.abspath(fname)

cwd = os.path.abspath(cwd)
path = os.path.join(cwd, fname)
Stage._check_stage_path(repo, wdir)
Stage._check_stage_path(repo, os.path.dirname(path))

stage.cwd = cwd
stage.wdir = wdir
stage.path = path

# NOTE: remove outs before we check build cache
Expand Down Expand Up @@ -463,11 +483,16 @@ def load(repo, fname):
d = yaml.safe_load(fd) or {}

Stage.validate(d, fname=os.path.relpath(fname))
path = os.path.abspath(fname)

stage = Stage(
repo=repo,
path=os.path.abspath(fname),
cwd=os.path.dirname(os.path.abspath(fname)),
path=path,
wdir=os.path.abspath(
os.path.join(
os.path.dirname(path), d.get(Stage.PARAM_WDIR, ".")
)
),
cmd=d.get(Stage.PARAM_CMD),
md5=d.get(Stage.PARAM_MD5),
locked=d.get(Stage.PARAM_LOCKED, False),
Expand All @@ -484,6 +509,7 @@ def dumpd(self):
for key, value in {
Stage.PARAM_MD5: self.md5,
Stage.PARAM_CMD: self.cmd,
Stage.PARAM_WDIR: self.wdir,
Stage.PARAM_LOCKED: self.locked,
Stage.PARAM_DEPS: [d.dumpd() for d in self.deps],
Stage.PARAM_OUTS: [o.dumpd() for o in self.outs],
Expand All @@ -501,9 +527,13 @@ def dump(self, fname=None):
file=os.path.relpath(fname)
)
)
d = self.dumpd()

d[Stage.PARAM_WDIR] = os.path.relpath(
self.wdir, os.path.dirname(fname)
)
with open(fname, "w") as fd:
yaml.safe_dump(self.dumpd(), fd, default_flow_style=False)
yaml.safe_dump(d, fd, default_flow_style=False)

self.repo.files_to_git_add.append(os.path.relpath(fname))

Expand All @@ -516,6 +546,13 @@ def _compute_md5(self):
if self.PARAM_MD5 in d.keys():
del d[self.PARAM_MD5]

# Ignore the wdir default value. In this case stage file w/o
# wdir has the same md5 as a file with the default value specified.
# It's important for backward compatibility with pipelines that
# didn't have WDIR in their stage files.
if d.get(self.PARAM_WDIR) == ".":
del d[self.PARAM_WDIR]

# NOTE: excluding parameters that don't affect the state of the
# pipeline. Not excluding `OutputLOCAL.PARAM_CACHE`, because if
# it has changed, we might not have that output in our cache.
Expand Down Expand Up @@ -614,7 +651,7 @@ def _run(self):

p = subprocess.Popen(
self.cmd,
cwd=self.cwd,
cwd=self.wdir,
shell=True,
env=fix_env(os.environ),
executable=executable,
Expand Down
Loading

0 comments on commit 3853690

Please sign in to comment.