Skip to content

Commit

Permalink
dvc: initial 'dvc pkg' implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Ruslan Kuprieiev <[email protected]>
  • Loading branch information
efiop committed Jun 17, 2019
1 parent b02f2b5 commit 5941ace
Show file tree
Hide file tree
Showing 20 changed files with 655 additions and 552 deletions.
1 change: 1 addition & 0 deletions .dvc/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@
/state-journal
/state-wal
/cache
/pkg
108 changes: 73 additions & 35 deletions dvc/command/pkg.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,50 @@ class CmdPkgInstall(CmdBase):
def run(self):
try:
self.repo.pkg.install(
self.args.address,
self.args.target_dir,
self.args.select,
self.args.file,
self.args.url, version=self.args.version, name=self.args.name
)
return 0
except DvcException:
logger.exception(
"failed to install package '{}'".format(self.args.address)
"failed to install package '{}'".format(self.args.url)
)
return 1


def add_parser(subparsers, parent_parser):
from dvc.command.config import parent_config_parser
class CmdPkgUninstall(CmdBase):
def run(self):
ret = 0
for target in self.args.targets:
try:
self.repo.pkg.uninstall(target)
except DvcException:
logger.exception(
"failed to uninstall package '{}'".format(target)
)
ret = 1
return ret


class CmdPkgImport(CmdBase):
def run(self):
try:
self.repo.pkg.imp(
self.args.name,
self.args.src,
out=self.args.out,
version=self.args.version,
)
return 0
except DvcException:
logger.exception(
"failed to import '{}' from package '{}'".format(
self.args.src, self.args.name
)
)
return 1


def add_parser(subparsers, parent_parser):
PKG_HELP = "Manage DVC packages."
pkg_parser = subparsers.add_parser(
"pkg",
Expand All @@ -48,42 +76,52 @@ def add_parser(subparsers, parent_parser):
PKG_INSTALL_HELP = "Install package."
pkg_install_parser = pkg_subparsers.add_parser(
"install",
parents=[parent_config_parser, parent_parser],
parents=[parent_parser],
description=append_doc_link(PKG_INSTALL_HELP, "pkg-install"),
help=PKG_INSTALL_HELP,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
pkg_install_parser.add_argument("url", help="Package URL.")
pkg_install_parser.add_argument(
"address",
nargs="?",
default="",
help="Package address: git://<url> or https://github.com/...",
"--version", nargs="?", help="Package version."
)
pkg_install_parser.add_argument(
"target_dir",
metavar="target",
"--name",
nargs="?",
default=".",
help="Target directory to deploy package outputs. "
"Default value is the current dir.",
help=(
"Package alias. If not specified, the name will be determined "
"from URL."
),
)
pkg_install_parser.add_argument(
"-s",
"--select",
metavar="OUT",
action="append",
default=[],
help="Select and persist only specified outputs from a package. "
"The parameter can be used multiple times. "
"All outputs will be selected by default.",
pkg_install_parser.set_defaults(func=CmdPkgInstall)

PKG_UNINSTALL_HELP = "Uninstall package(s)."
pkg_uninstall_parser = pkg_subparsers.add_parser(
"uninstall",
parents=[parent_parser],
description=append_doc_link(PKG_UNINSTALL_HELP, "pkg-uninstall"),
help=PKG_UNINSTALL_HELP,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
pkg_install_parser.add_argument(
"-f",
"--file",
help="Specify name of the stage file. It should be "
"either 'Dvcfile' or have a '.dvc' suffix (e.g. "
"'prepare.dvc', 'clean.dvc', etc). "
"By default the file has 'mod_' prefix and imported package name "
"followed by .dvc",
pkg_uninstall_parser.add_argument(
"targets", nargs="*", default=[None], help="Package name."
)
pkg_install_parser.set_defaults(func=CmdPkgInstall)
pkg_uninstall_parser.set_defaults(func=CmdPkgUninstall)

PKG_IMPORT_HELP = "Import data from package."
pkg_import_parser = pkg_subparsers.add_parser(
"import",
parents=[parent_parser],
description=append_doc_link(PKG_IMPORT_HELP, "pkg-import"),
help=PKG_IMPORT_HELP,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
pkg_import_parser.add_argument("name", help="Package name or url.")
pkg_import_parser.add_argument("src", help="Path to data in the package.")
pkg_import_parser.add_argument(
"-o", "--out", nargs="?", help="Destination path to put data to."
)
pkg_import_parser.add_argument(
"--version", nargs="?", help="Package version."
)
pkg_import_parser.set_defaults(func=CmdPkgImport)
13 changes: 11 additions & 2 deletions dvc/dependency/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@
from dvc.dependency.ssh import DependencySSH
from dvc.dependency.http import DependencyHTTP
from dvc.dependency.https import DependencyHTTPS
from .pkg import DependencyPKG

from dvc.remote import Remote
from dvc.pkg import Pkg


DEPS = [
DependencyGS,
Expand Down Expand Up @@ -44,6 +47,7 @@
SCHEMA = output.SCHEMA.copy()
del SCHEMA[schema.Optional(OutputBase.PARAM_CACHE)]
del SCHEMA[schema.Optional(OutputBase.PARAM_METRIC)]
SCHEMA[schema.Optional(DependencyPKG.PARAM_PKG)] = Pkg.SCHEMA


def _get(stage, p, info):
Expand All @@ -55,6 +59,10 @@ def _get(stage, p, info):
remote = Remote(stage.repo, name=parsed.netloc)
return DEP_MAP[remote.scheme](stage, p, info, remote=remote)

if info and info.get(DependencyPKG.PARAM_PKG):
pkg = info.pop(DependencyPKG.PARAM_PKG)
return DependencyPKG(pkg, stage, p, info)

for d in DEPS:
if d.supported(p):
return d(stage, p, info)
Expand All @@ -69,8 +77,9 @@ def loadd_from(stage, d_list):
return ret


def loads_from(stage, s_list):
def loads_from(stage, s_list, pkg=None):
ret = []
for s in s_list:
ret.append(_get(stage, s, {}))
info = {DependencyPKG.PARAM_PKG: pkg} if pkg else {}
ret.append(_get(stage, s, info))
return ret
40 changes: 40 additions & 0 deletions dvc/dependency/pkg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from __future__ import unicode_literals

import os
import copy

from dvc.utils.compat import urlparse

from .local import DependencyLOCAL


class DependencyPKG(DependencyLOCAL):
PARAM_PKG = "pkg"

def __init__(self, pkg, stage, *args, **kwargs):
self.pkg = stage.repo.pkg.get_pkg(**pkg)
super(DependencyLOCAL, self).__init__(stage, *args, **kwargs)

def _parse_path(self, remote, path):
out_path = os.path.join(
self.pkg.repo.root_dir, urlparse(path).path.lstrip("/")
)

out, = self.pkg.repo.find_outs_by_path(out_path)
self.info = copy.copy(out.info)
self._pkg_stage = copy.copy(out.stage.path)
return self.REMOTE.path_cls(out.cache_path)

@property
def is_in_repo(self):
return False

def dumpd(self):
ret = super(DependencyLOCAL, self).dumpd()
ret[self.PARAM_PKG] = self.pkg.dumpd()
return ret

def download(self, to, resume=False):
self.pkg.repo.fetch(self._pkg_stage)
to.info = copy.copy(self.info)
to.checkout()
98 changes: 96 additions & 2 deletions dvc/output/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from schema import Or, Optional

import dvc.prompt as prompt
from dvc.exceptions import DvcException
from dvc.utils.compat import str, urlparse
from dvc.remote.base import RemoteBASE
Expand Down Expand Up @@ -271,8 +272,8 @@ def verify_metric(self):
"verify metric is not supported for {}".format(self.scheme)
)

def download(self, to_info, resume=False):
self.remote.download([self.path_info], [to_info], resume=resume)
def download(self, to, resume=False):
self.remote.download([self.path_info], [to.path_info], resume=resume)

def checkout(self, force=False, progress_callback=None, tag=None):
if not self.use_cache:
Expand Down Expand Up @@ -323,3 +324,96 @@ def get_files_number(self):
def unprotect(self):
if self.exists:
self.remote.unprotect(self.path_info)

def _collect_used_dir_cache(self, remote=None, force=False, jobs=None):
"""Get a list of `info`s retaled to the given directory.
- Pull the directory entry from the remote cache if it was changed.
Example:
Given the following commands:
$ echo "foo" > directory/foo
$ echo "bar" > directory/bar
$ dvc add directory
It will return something similar to the following list:
[
{ 'path': 'directory/foo', 'md5': 'c157a79031e1', ... },
{ 'path': 'directory/bar', 'md5': 'd3b07384d113', ... },
]
"""

ret = []

if self.cache.changed_cache_file(self.checksum):
try:
self.repo.cloud.pull(
[
{
self.remote.PARAM_CHECKSUM: self.checksum,
"name": str(self),
}
],
jobs=jobs,
remote=remote,
show_checksums=False,
)
except DvcException:
logger.debug("failed to pull cache for '{}'".format(self))

if self.cache.changed_cache_file(self.checksum):
msg = (
"Missing cache for directory '{}'. "
"Cache for files inside will be lost. "
"Would you like to continue? Use '-f' to force."
)
if not force and not prompt.confirm(msg):
raise DvcException(
"unable to fully collect used cache"
" without cache for directory '{}'".format(self)
)
else:
return ret

for entry in self.dir_cache:
info = copy(entry)
path_info = self.path_info / entry[self.remote.PARAM_RELPATH]
info["name"] = str(path_info)
ret.append(info)

return ret

def get_used_cache(self, **kwargs):
"""Get a dumpd of the given `out`, with an entry including the branch.
The `used_cache` of an output is no more than its `info`.
In case that the given output is a directory, it will also
include the `info` of its files.
"""

if self.stage.is_pkg_import:
return []

if not self.use_cache:
return []

if not self.info:
logger.warning(
"Output '{}'({}) is missing version info. Cache for it will "
"not be collected. Use dvc repro to get your pipeline up to "
"date.".format(self, self.stage)
)
return []

ret = [{self.remote.PARAM_CHECKSUM: self.checksum, "name": str(self)}]

if not self.is_dir_checksum:
return ret

ret.extend(self._collect_used_dir_cache(**kwargs))

return ret
Loading

0 comments on commit 5941ace

Please sign in to comment.