Skip to content

Commit

Permalink
api: prepare for dvcx summon/publish
Browse files Browse the repository at this point in the history
Also removed `type: python` summons from `dvc.api`
  • Loading branch information
Suor committed Jan 29, 2020
1 parent e6d0d29 commit 614515e
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 135 deletions.
196 changes: 61 additions & 135 deletions dvc/api.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
from builtins import open as builtin_open
import importlib
import os
import sys
from contextlib import contextmanager, _GeneratorContextManager as GCM
import threading

from funcy import wrap_with
from funcy import cached_property, lmap
import ruamel.yaml
from voluptuous import Schema, Required, Invalid

Expand All @@ -18,10 +15,6 @@ class SummonError(DvcException):
pass


class SummonErrorNoObjectFound(SummonError):
pass


class UrlNotDvcRepoError(DvcException):
"""Thrown if given url is not a DVC repository.
Expand Down Expand Up @@ -100,14 +93,11 @@ def _make_repo(repo_url=None, rev=None):


class SummonFile(object):
DEF_NAME = "dvcsummon.yaml"
DOBJ_SECTION = "dvc-objects"

DEFAULT_FILENAME = "dvcsummon.yaml"
SCHEMA = Schema(
{
Required(DOBJ_SECTION): {
Required("dvc-objects", default={}): {
str: {
"description": str,
"meta": dict,
Required("summon"): {
Required("type"): str,
Expand All @@ -119,43 +109,10 @@ class SummonFile(object):
}
)

PYTHON_SCHEMA = Schema(
{
Required("type"): "python",
Required("call"): str,
"args": dict,
"deps": [str],
}
)

def __init__(self, repo_obj, summon_file=None):
def __init__(self, repo_obj, summon_file):
self.repo = repo_obj
self.filename = summon_file or SummonFile.DEF_NAME
self.path = os.path.join(self.repo.root_dir, summon_file)
self.dobjs = self._read_summon_content().get(self.DOBJ_SECTION)

def _read_summon_content(self):
try:
with builtin_open(self.path, "r") as fobj:
return SummonFile.SCHEMA(ruamel.yaml.safe_load(fobj.read()))
except FileNotFoundError as exc:
raise SummonError("Summon file not found") from exc
except ruamel.yaml.YAMLError as exc:
raise SummonError("Failed to parse summon file") from exc
except Invalid as exc:
raise SummonError(str(exc)) from exc

def _write_summon_content(self):
try:
with builtin_open(self.path, "w") as fobj:
content = SummonFile.SCHEMA(self.dobjs)
ruamel.yaml.serialize_all(content, fobj)
except ruamel.yaml.YAMLError as exc:
raise SummonError(
"Summon file '{}' schema error".format(self.path)
) from exc
except Exception as exc:
raise SummonError(str(exc)) from exc
self.filename = summon_file
self._path = os.path.join(self.repo.root_dir, summon_file)

@staticmethod
@contextmanager
Expand All @@ -169,7 +126,7 @@ def prepare(repo=None, rev=None, summon_file=None):
Returns a SummonFile instance, which contains references to a Repo
object, named object specification and resolved paths to deps.
"""
summon_file = summon_file or SummonFile.DEF_NAME
summon_file = summon_file or SummonFile.DEFAULT_FILENAME
with _make_repo(repo, rev=rev) as _repo:
_require_dvc(_repo)
try:
Expand All @@ -179,108 +136,77 @@ def prepare(repo=None, rev=None, summon_file=None):
str(exc) + " at '{}' in '{}'".format(summon_file, _repo)
) from exc.__cause__

@staticmethod
def deps_paths(dobj):
return dobj["summon"].get("deps", [])

def deps_abs_paths(self, dobj):
return [
os.path.join(self.repo.root_dir, p) for p in self.deps_paths(dobj)
]
@cached_property
def objects(self):
return self._read_yaml()["dvc-objects"]

def outs(self, dobj):
return [
self.repo.find_out_by_relpath(d) for d in self.deps_paths(dobj)
]
def _read_yaml(self):
try:
with builtin_open(self._path, mode="r") as fd:
return self.SCHEMA(ruamel.yaml.safe_load(fd.read()))
except FileNotFoundError as exc:
raise SummonError("Summon file not found") from exc
except ruamel.yaml.YAMLError as exc:
raise SummonError("Failed to parse summon file") from exc
except Invalid as exc:
raise SummonError(str(exc)) from None

def pull(self, dobj):
outs = self.outs(dobj)
def _write_yaml(self, objects):
try:
with builtin_open(self._path, "w") as fd:
content = self.SCHEMA({"dvc-objects": objects})
ruamel.yaml.safe_dump(content, fd)
except Invalid as exc:
raise SummonError(str(exc)) from None

with self.repo.state:
for out in outs:
self.repo.cloud.pull(out.get_used_cache())
out.checkout()
def abs(self, path):
return os.path.join(self.repo.root_dir, path)

def push(self, dobj):
paths = self.deps_abs_paths(dobj)
def pull(self, targets):
self.repo.pull([self.abs(target) for target in targets])

with self.repo.state:
for path in paths:
self.repo.add(path)
self.repo.add(path)
def pull_deps(self, dobj):
self.pull(dobj["summon"].get("deps", []))

def get_dobject(self, name):
def get(self, name):
"""
Given a summonable object's name, search for it on the given content
Given a summonable object's name, search for it in this file
and return its description.
"""

if name not in self.dobjs:
raise SummonErrorNoObjectFound(
"No object with name '{}' in file '{}'".format(name, self.path)
if name not in self.objects:
raise SummonError(
"No object with name '{}' in '{}'".format(name, self.filename)
)

return self.dobjs[name]
return self.objects[name]

def set(self, name, dobj, overwrite=True):
if not os.path.exists(self._path):
self.objects = self.SCHEMA({})["dvc-objects"]

def update_dobj(self, name, new_dobj, overwrite=True):
if (new_dobj[name] not in self.dobjs) or overwrite:
self.dobjs[name] = new_dobj
else:
if name in self.objects and not overwrite:
raise SummonError(
"DVC-object '{}' already exist in '{}'".format(
name, self.filename
)
"There is an existing summonable object named '{}' in '{}:{}'."
" Use SummonFile.set(..., overwrite=True) to"
" overwrite it.".format(name, self.repo.url, self.filename)
)

self._write_summon_content()


@wrap_with(threading.Lock())
def _invoke_method(call, args, path):
# XXX: Some issues with this approach:
# * Import will pollute sys.modules
# * sys.path manipulation is "theoretically" not needed,
# but tests are failing for an unknown reason.
cwd = os.getcwd()

try:
os.chdir(path)
sys.path.insert(0, path)
method = _import_string(call)
return method(**args)
finally:
os.chdir(cwd)
sys.path.pop(0)


def summon(
name, repo=None, rev=None, summon_file=SummonFile.DEF_NAME, args=None
):
"""Instantiate an object described in the `summon_file`."""
with SummonFile.prepare(repo, rev, summon_file) as desc:
dobj = desc.get_dobject(name)
try:
summon_dict = SummonFile.PYTHON_SCHEMA(dobj["summon"])
except Invalid as exc:
raise SummonError(str(exc)) from exc
self.objects[name] = dobj
self._write_yaml(self.objects)

desc.pull(dobj)
_args = {**summon_dict.get("args", {}), **(args or {})}
return _invoke_method(summon_dict["call"], _args, desc.repo.root_dir)


def _import_string(import_name):
"""Imports an object based on a string.
Useful to delay import to not load everything on startup.
Use dotted notation in `import_name`, e.g. 'dvc.remote.gs.RemoteGS'.
# Add deps and push to remote
deps = dobj["summon"].get("deps", [])
stages = []
if deps:
stages = self.repo.add(
lmap(self.abs, deps), fname=self.abs(name + ".dvc")
)
self.repo.push()

:return: imported object
"""
if "." in import_name:
module, obj = import_name.rsplit(".", 1)
else:
return importlib.import_module(import_name)
return getattr(importlib.import_module(module), obj)
# Create commit and push
self.repo.scm.add([self._path] + [stage.path for stage in stages])
self.repo.scm.commit("Add {} to {}".format(name, self.filename))
self.repo.scm.push()


def _require_dvc(repo):
Expand Down
3 changes: 3 additions & 0 deletions dvc/scm/git/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ def checkout(self, branch, create_new=False):
else:
self.repo.git.checkout(branch)

def push(self):
self.repo.remote().push()

def branch(self, branch):
self.repo.git.branch(branch)

Expand Down

0 comments on commit 614515e

Please sign in to comment.