Skip to content

Commit

Permalink
Merge pull request iterative#740 from efiop/master
Browse files Browse the repository at this point in the history
Add initial support for s3/gs outputs/deps/cache_dirs
efiop authored Jun 4, 2018
2 parents 456ba27 + dd610c0 commit 5d16991
Showing 31 changed files with 772 additions and 459 deletions.
226 changes: 27 additions & 199 deletions dvc/cache.py
Original file line number Diff line number Diff line change
@@ -1,210 +1,38 @@
import os
import json
import shutil

from dvc.state import State, LinkState
from dvc.system import System
from dvc.logger import Logger
from dvc.utils import move, remove
from dvc.lock import Lock
from dvc.exceptions import DvcException
from dvc.config import Config
from dvc.remote import Remote


class Cache(object):
CACHE_DIR = 'cache'
CACHE_DIR_LOCK = 'cache.lock'
CACHE_TYPES = ['reflink', 'hardlink', 'symlink', 'copy']
CACHE_TYPE_MAP = {
'copy': shutil.copyfile,
'symlink': System.symlink,
'hardlink': System.hardlink,
'reflink': System.reflink,
}

def __init__(self, root_dir, dvc_dir, cache_dir=None, cache_type=None):
self.cache_type = cache_type
def __init__(self, project):
config = project.config._config[Config.SECTION_CACHE]

cache_dir = cache_dir if cache_dir else self.CACHE_DIR
if os.path.isabs(cache_dir):
self.cache_dir = cache_dir
local = config.get(Config.SECTION_CACHE_LOCAL, None)
if local:
sect = project.config._config[Config.SECTION_REMOTE_FMT.format(local)]
else:
self.cache_dir = os.path.abspath(os.path.realpath(os.path.join(dvc_dir, cache_dir)))

if not os.path.exists(self.cache_dir):
os.mkdir(self.cache_dir)

self.state = State(self.cache_dir)
self.link_state = LinkState(root_dir, dvc_dir)
self.lock = Lock(self.cache_dir, name=self.CACHE_DIR_LOCK)

@staticmethod
def init(root_dir, dvc_dir, cache_dir=None):
return Cache(root_dir, dvc_dir, cache_dir=None)

def all(self):
with self.lock:
clist = []
for entry in os.listdir(self.cache_dir):
subdir = os.path.join(self.cache_dir, entry)
if not os.path.isdir(subdir):
continue

for cache in os.listdir(subdir):
path = os.path.join(subdir, cache)
clist.append(path)

return clist

def get(self, md5):
if not md5:
sect = {}
cache_dir = config.get(Config.SECTION_CACHE_DIR, self.CACHE_DIR)
if not os.path.isabs(cache_dir):
cache_dir = os.path.abspath(os.path.realpath(os.path.join(project.dvc_dir, cache_dir)))
sect[Config.SECTION_REMOTE_URL] = cache_dir
t = config.get(Config.SECTION_CACHE_TYPE, None)
if t:
sect[Config.SECTION_CACHE_TYPE] = t

self.local = Remote(project, sect)

self.s3 = self._get_remote(project, config, Config.SECTION_CACHE_S3)
self.gs = self._get_remote(project, config, Config.SECTION_CACHE_GS)
self.ssh = self._get_remote(project, config, Config.SECTION_CACHE_SSH)

def _get_remote(self, project, config, name):
remote = config.get(name, None)
if not remote:
return None

return os.path.join(self.cache_dir, md5[0:2], md5[2:])

def path_to_md5(self, path):
relpath = os.path.relpath(path, self.cache_dir)
return os.path.dirname(relpath) + os.path.basename(relpath)

def _changed(self, md5):
cache = self.get(md5)
if self.state.changed(cache, md5=md5):
if os.path.exists(cache):
Logger.warn('Corrupted cache file {}'.format(os.path.relpath(cache)))
remove(cache)
return True

return False

def changed(self, md5):
with self.lock:
return self._changed(md5)

def link(self, src, link):
dname = os.path.dirname(link)
if not os.path.exists(dname):
os.makedirs(dname)

if self.cache_type != None:
types = [self.cache_type]
else:
types = self.CACHE_TYPES

for typ in types:
try:
self.CACHE_TYPE_MAP[typ](src, link)
self.link_state.update(link)
return
except Exception as exc:
msg = 'Cache type \'{}\' is not supported'.format(typ)
Logger.debug(msg)
if typ == types[-1]:
raise DvcException(msg, cause=exc)

@staticmethod
def load_dir_cache(path):
if os.path.isabs(path):
relpath = os.path.relpath(path)
else:
relpath = path

try:
with open(path, 'r') as fd:
d = json.load(fd)
except Exception as exc:
msg = u'Failed to load dir cache \'{}\''
Logger.error(msg.format(relpath), exc)
return []

if not isinstance(d, list):
msg = u'Dir cache file format error \'{}\': skipping the file'
Logger.error(msg.format(relpath))
return []

return d

@staticmethod
def get_dir_cache(path):
res = {}
d = Cache.load_dir_cache(path)

for entry in d:
res[entry[State.PARAM_RELPATH]] = entry[State.PARAM_MD5]

return res

def dir_cache(self, cache):
res = {}
dir_cache = self.get_dir_cache(cache)

for relpath, md5 in dir_cache.items():
res[relpath] = self.get(md5)

return res

@staticmethod
def is_dir_cache(cache):
return cache.endswith(State.MD5_DIR_SUFFIX)

def _checkout(self, path, md5):
cache = self.get(md5)

if not cache or not os.path.exists(cache) or self._changed(md5):
if cache:
Logger.warn(u'\'{}({})\': cache file not found'.format(os.path.relpath(cache),
os.path.relpath(path)))
remove(path)
return

if os.path.exists(path):
msg = u'Data \'{}\' exists. Removing before checkout'
Logger.debug(msg.format(os.path.relpath(path)))
remove(path)

msg = u'Checking out \'{}\' with cache \'{}\''
Logger.debug(msg.format(os.path.relpath(path), os.path.relpath(cache)))

if not self.is_dir_cache(cache):
self.link(cache, path)
return

dir_cache = self.dir_cache(cache)
for relpath, c in dir_cache.items():
p = os.path.join(path, relpath)
self.link(c, p)

def checkout(self, path, md5):
with self.lock:
return self._checkout(path, md5)

def _save_file(self, path):
md5 = self.state.update(path)
cache = self.get(md5)
if self._changed(md5):
move(path, cache)
self.state.update(cache)
self._checkout(path, md5)

def _save_dir(self, path):
md5 = self.state.update(path)
cache = self.get(md5)
dname = os.path.dirname(cache)
dir_info = self.state.collect_dir(path)

for entry in dir_info:
relpath = entry[State.PARAM_RELPATH]
p = os.path.join(path, relpath)

self._save_file(p)

if not os.path.isdir(dname):
os.makedirs(dname)

with open(cache, 'w+') as fd:
json.dump(dir_info, fd, sort_keys=True)

def save(self, path):
with self.lock:
if os.path.isdir(path):
self._save_dir(path)
else:
self._save_file(path)
sect = project.config._config[Config.SECTION_REMOTE_FMT.format(remote)]
return Remote(project, sect)
8 changes: 4 additions & 4 deletions dvc/cloud/aws.py
Original file line number Diff line number Diff line change
@@ -4,10 +4,10 @@

import boto3

from dvc.config import Config
from dvc.logger import Logger
from dvc.progress import progress
from dvc.cloud.base import DataCloudError, DataCloudBase
from dvc.cloud.base import DataCloudBase, DataCloudError
from dvc.config import Config


def sizeof_fmt(num, suffix='B'):
@@ -81,7 +81,7 @@ def _pull_key(self, key, fname, no_progress_bar=False):
self._makedirs(fname)

tmp_file = self.tmp_file(fname)
name = os.path.relpath(fname, self._cloud_settings.cache.cache_dir)
name = os.path.relpath(fname, self._cloud_settings.cache.local.cache_dir)

if self._cmp_checksum(key, fname):
Logger.debug('File "{}" matches with "{}".'.format(fname, key.name))
@@ -126,7 +126,7 @@ def _new_key(self, path):

def _push_key(self, key, path):
""" push, aws version """
name = os.path.relpath(path, self._cloud_settings.cache.cache_dir)
name = os.path.relpath(path, self._cloud_settings.cache.local.cache_dir)
cb = self.create_cb_push(name, path)
try:
self.s3.Object(key.bucket, key.name).upload_file(path, Callback=cb)
17 changes: 8 additions & 9 deletions dvc/cloud/base.py
Original file line number Diff line number Diff line change
@@ -2,10 +2,10 @@
import re
import tempfile

from dvc.config import Config
from dvc.logger import Logger
from dvc.exceptions import DvcException
from dvc.config import Config, ConfigError
from dvc.cache import Cache
from dvc.remote.local import RemoteLOCAL


STATUS_UNKNOWN = 0
@@ -72,7 +72,6 @@ def storage_path(self):
Precedence: Storage, then cloud specific
"""

if self._cloud_settings.global_storage_path:
return self._cloud_settings.global_storage_path

@@ -108,7 +107,7 @@ def storage_prefix(self):

def cache_file_key(self, fname):
""" Key of a file within the bucket """
relpath = os.path.relpath(fname, self._cloud_settings.cache.cache_dir)
relpath = os.path.relpath(fname, self._cloud_settings.cache.local.cache_dir)
relpath = relpath.replace('\\', '/')
return '{}/{}'.format(self.storage_prefix, relpath).strip('/')

@@ -130,7 +129,7 @@ def collect(self, arg):
path, local = arg
ret = [path]

if not Cache.is_dir_cache(path):
if not RemoteLOCAL.is_dir_cache(path):
return ret

if local:
@@ -146,15 +145,15 @@ def collect(self, arg):
self._pull_key(key, tmp, no_progress_bar=True)
dir_path = tmp

for relpath, md5 in Cache.get_dir_cache(dir_path).items():
cache = self._cloud_settings.cache.get(md5)
for relpath, md5 in RemoteLOCAL.get_dir_cache(dir_path).items():
cache = self._cloud_settings.cache.local.get(md5)
ret.append(cache)

return ret

def _cmp_checksum(self, blob, fname):
md5 = self._cloud_settings.cache.path_to_md5(fname)
if self._cloud_settings.cache.state.changed(fname, md5=md5):
md5 = self._cloud_settings.cache.local.path_to_md5(fname)
if self._cloud_settings.cache.local.state.changed(fname, md5=md5):
return False

return True
4 changes: 2 additions & 2 deletions dvc/cloud/gcp.py
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ def connect(self):
def _pull_key(self, key, path, no_progress_bar=False):
self._makedirs(path)

name = os.path.relpath(path, self._cloud_settings.cache.cache_dir)
name = os.path.relpath(path, self._cloud_settings.cache.local.cache_dir)
tmp_file = self.tmp_file(path)

if self._cmp_checksum(key, path):
@@ -69,7 +69,7 @@ def _new_key(self, path):

def _push_key(self, key, path):
""" push, gcp version """
name = os.path.relpath(path, self._cloud_settings.cache.cache_dir)
name = os.path.relpath(path, self._cloud_settings.cache.local.cache_dir)

progress.update_target(name, 0, None)

2 changes: 1 addition & 1 deletion dvc/cloud/local.py
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@ class DataCloudLOCAL(DataCloudBase):
REGEX = r'^(?P<path>(/+|.:\\+).*)$'

def cache_file_key(self, path):
return os.path.relpath(os.path.abspath(path), self._cloud_settings.cache.cache_dir)
return os.path.relpath(os.path.abspath(path), self._cloud_settings.cache.local.cache_dir)

def _get_key(self, path):
key_name = self.cache_file_key(path)
2 changes: 1 addition & 1 deletion dvc/cloud/ssh.py
Original file line number Diff line number Diff line change
@@ -52,7 +52,7 @@ def get_sftp(self):
return self.ssh.open_sftp()

def cache_file_key(self, path):
relpath = os.path.relpath(os.path.abspath(path), self._cloud_settings.cache.cache_dir)
relpath = os.path.relpath(os.path.abspath(path), self._cloud_settings.cache.local.cache_dir)
return relpath.replace('\\', '/')

def _isfile_remote(self, path):
Loading

0 comments on commit 5d16991

Please sign in to comment.