Skip to content

Commit

Permalink
Merge pull request iterative#2426 from mroutis/close-1652
Browse files Browse the repository at this point in the history
ssh: support linking
  • Loading branch information
efiop authored Sep 9, 2019
2 parents af8a7aa + 3c5bcbd commit 2c003fd
Show file tree
Hide file tree
Showing 14 changed files with 391 additions and 276 deletions.
8 changes: 6 additions & 2 deletions dvc/command/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import argparse
import logging
import uuid
import itertools

try:
import psutil
Expand Down Expand Up @@ -69,9 +70,12 @@ def run(self):
def get_fs_type(path):
partition = {
pathlib.Path(part.mountpoint): (part.fstype, part.device)
for part in psutil.disk_partitions()
for part in psutil.disk_partitions(all=True)
}
for parent in pathlib.Path(path).parents:

path = pathlib.Path(path)

for parent in itertools.chain([path], path.parents):
if parent in partition:
return partition[parent]
return ("unkown", "none")
Expand Down
76 changes: 73 additions & 3 deletions dvc/remote/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from multiprocessing import cpu_count
from functools import partial
from concurrent.futures import ThreadPoolExecutor
from copy import copy
from dvc.remote.slow_link_detection import slow_link_guard

import dvc.prompt as prompt
from dvc.config import Config
Expand Down Expand Up @@ -79,6 +81,7 @@ class RemoteBASE(object):
PARAM_RELPATH = "relpath"
CHECKSUM_DIR_SUFFIX = ".dir"
CHECKSUM_JOBS = max(1, min(4, cpu_count() // 2))
DEFAULT_CACHE_TYPES = ["copy"]

state = StateNoop()

Expand Down Expand Up @@ -119,6 +122,15 @@ def __init__(self, repo, config):
self.no_traverse = config.get(Config.SECTION_REMOTE_NO_TRAVERSE, True)
self._dir_info = {}

types = config.get(Config.SECTION_CACHE_TYPE, None)
if types:
if isinstance(types, str):
types = [t.strip() for t in types.split(",")]
self.cache_types = types
else:
self.cache_types = copy(self.DEFAULT_CACHE_TYPES)
self.cache_type_confirmed = False

def __repr__(self):
return "{class_name}: '{path_info}'".format(
class_name=type(self).__name__,
Expand Down Expand Up @@ -352,7 +364,47 @@ def changed(self, path_info, checksum_info):
return False

def link(self, from_info, to_info):
self.copy(from_info, to_info)
self._link(from_info, to_info, self.cache_types)

def _link(self, from_info, to_info, link_types):
assert self.isfile(from_info)

self.makedirs(to_info.parent)

self._try_links(from_info, to_info, link_types)

@slow_link_guard
def _try_links(self, from_info, to_info, link_types):
while link_types:
link_method = getattr(self, link_types[0])
try:
self._do_link(from_info, to_info, link_method)
self.cache_type_confirmed = True
return

except DvcException as exc:
msg = "Cache type '{}' is not supported: {}"
logger.debug(msg.format(link_types[0], str(exc)))
del link_types[0]

raise DvcException("no possible cache types left to try out.")

def _do_link(self, from_info, to_info, link_method):
if self.exists(to_info):
raise DvcException("Link '{}' already exists!".format(to_info))

link_method(from_info, to_info)

if self.protected:
self.protect(to_info)

msg = "Created {}'{}': {} -> {}".format(
"protected " if self.protected else "",
self.cache_types[0],
from_info,
to_info,
)
logger.debug(msg)

def _save_file(self, path_info, checksum, save_link=True):
assert checksum
Expand Down Expand Up @@ -391,9 +443,15 @@ def is_empty(self, path_info):
return False

def isfile(self, path_info):
raise NotImplementedError
"""Optional: Overwrite only if the remote has a way to distinguish
between a directory and a file.
"""
return True

def isdir(self, path_info):
"""Optional: Overwrite only if the remote has a way to distinguish
between a directory and a file.
"""
return False

def walk(self, path_info):
Expand Down Expand Up @@ -512,6 +570,15 @@ def move(self, from_info, to_info):
def copy(self, from_info, to_info):
raise RemoteActionNotImplemented("copy", self.scheme)

def symlink(self, from_info, to_info):
raise RemoteActionNotImplemented("symlink", self.scheme)

def hardlink(self, from_info, to_info):
raise RemoteActionNotImplemented("hardlink", self.scheme)

def reflink(self, from_info, to_info):
raise RemoteActionNotImplemented("reflink", self.scheme)

def exists(self, path_info):
raise NotImplementedError

Expand Down Expand Up @@ -720,7 +787,10 @@ def _link_matches(self, path_info):
return True

def makedirs(self, path_info):
raise NotImplementedError
"""Optional: Implement only if the remote needs to create
directories before copying/linking/moving data
"""
pass

def _checkout_dir(
self, path_info, checksum, force, progress_callback=None
Expand Down
141 changes: 50 additions & 91 deletions dvc/remote/local/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
from __future__ import unicode_literals

from copy import copy

from dvc.scheme import Schemes
from dvc.remote.local.slow_link_detection import slow_link_guard
from dvc.utils.compat import str, fspath_py35, open

import os
import stat
import errno
from shortuuid import uuid
import shutil
import logging
from functools import partial

Expand Down Expand Up @@ -51,12 +47,6 @@ class RemoteLOCAL(RemoteBASE):
UNPACKED_DIR_SUFFIX = ".unpacked"

DEFAULT_CACHE_TYPES = ["reflink", "copy"]
CACHE_TYPE_MAP = {
"copy": shutil.copyfile,
"symlink": System.symlink,
"hardlink": System.hardlink,
"reflink": System.reflink,
}

SHARED_MODE_MAP = {None: (0o644, 0o755), "group": (0o664, 0o775)}

Expand All @@ -71,15 +61,6 @@ def __init__(self, repo, config):
# cache files are set to be read-only for everyone
self._file_mode = stat.S_IREAD | stat.S_IRGRP | stat.S_IROTH

types = config.get(Config.SECTION_CACHE_TYPE, None)
if types:
if isinstance(types, str):
types = [t.strip() for t in types.split(",")]
self.cache_types = types
else:
self.cache_types = copy(self.DEFAULT_CACHE_TYPES)
self.cache_type_confirmed = False

# A clunky way to detect cache dir
storagepath = config.get(Config.SECTION_LOCAL_STORAGEPATH, None)
cache_dir = config.get(Config.SECTION_REMOTE_URL, storagepath)
Expand Down Expand Up @@ -126,80 +107,14 @@ def get(self, md5):

return self.checksum_to_path_info(md5).url

def exists(self, path_info):
@staticmethod
def exists(path_info):
assert path_info.scheme == "local"
return os.path.lexists(fspath_py35(path_info))

def makedirs(self, path_info):
makedirs(path_info, exist_ok=True, mode=self._dir_mode)

def link(self, from_info, to_info):
self._link(from_info, to_info, self.cache_types)

def _link(self, from_info, to_info, link_types):
from_path = from_info.fspath
to_path = to_info.fspath

assert os.path.isfile(from_path)

dname = os.path.dirname(to_path)
if not os.path.exists(dname):
os.makedirs(dname)

# NOTE: just create an empty file for an empty cache
if os.path.getsize(from_path) == 0:
open(to_path, "w+").close()

msg = "Created empty file: {} -> {}".format(from_path, to_path)
logger.debug(msg)
return

self._try_links(from_info, to_info, link_types)

@classmethod
def _get_link_method(cls, link_type):
try:
return cls.CACHE_TYPE_MAP[link_type]
except KeyError:
raise DvcException(
"Cache type: '{}' not supported!".format(link_type)
)

def _do_link(self, from_info, to_info, link_method):
if self.exists(to_info):
raise DvcException("Link '{}' already exists!".format(to_info))
else:
link_method(from_info.fspath, to_info.fspath)

if self.protected:
self.protect(to_info)

msg = "Created {}'{}': {} -> {}".format(
"protected " if self.protected else "",
self.cache_types[0],
from_info,
to_info,
)
logger.debug(msg)

@slow_link_guard
def _try_links(self, from_info, to_info, link_types):
i = len(link_types)
while i > 0:
link_method = self._get_link_method(link_types[0])
try:
self._do_link(from_info, to_info, link_method)
self.cache_type_confirmed = True
return

except DvcException as exc:
msg = "Cache type '{}' is not supported: {}"
logger.debug(msg.format(link_types[0], str(exc)))
del link_types[0]
i -= 1

raise DvcException("no possible cache types left to try out.")

def already_cached(self, path_info):
assert path_info.scheme in ["", "local"]

Expand All @@ -221,12 +136,18 @@ def is_empty(self, path_info):

return False

def isfile(self, path_info):
@staticmethod
def isfile(path_info):
return os.path.isfile(fspath_py35(path_info))

def isdir(self, path_info):
@staticmethod
def isdir(path_info):
return os.path.isdir(fspath_py35(path_info))

@staticmethod
def getsize(path_info):
return os.path.getsize(fspath_py35(path_info))

def walk(self, path_info):
return dvc_walk(path_info, self.repo.dvcignore)

Expand All @@ -253,6 +174,44 @@ def move(self, from_info, to_info):

move(from_info, to_info, mode=mode)

@staticmethod
def copy(from_info, to_info):
System.copy(from_info, to_info)

@staticmethod
def symlink(from_info, to_info):
System.symlink(from_info, to_info)

def hardlink(self, from_info, to_info):
# If there are a lot of empty files (which happens a lot in datasets),
# and the cache type is `hardlink`, we might reach link limits and
# will get something like: `too many links error`
#
# This is because all those empty files will have the same checksum
# (i.e. 68b329da9893e34099c7d8ad5cb9c940), therfore, they will be
# linked to the same file in the cache.
#
# From https://en.wikipedia.org/wiki/Hard_link
# * ext4 limits the number of hard links on a file to 65,000
# * Windows with NTFS has a limit of 1024 hard links on a file
#
# That's why we simply create an empty file rather than a link.
if self.getsize(from_info) == 0:
self.open(to_info, "w").close()

logger.debug(
"Created empty file: {src} -> {dest}".format(
src=str(from_info), dest=str(to_info)
)
)
return

System.hardlink(from_info, to_info)

@staticmethod
def reflink(from_info, to_info):
System.reflink(from_info, to_info)

def cache_exists(self, checksums, jobs=None, name=None):
return [
checksum
Expand Down Expand Up @@ -286,8 +245,8 @@ def _download(
name=name,
)

def open(self, path_info, mode="r", encoding=None):
assert mode in {"r", "rt", "rb"}
@staticmethod
def open(path_info, mode="r", encoding=None):
return open(fspath_py35(path_info), mode=mode, encoding=encoding)

def _group(self, checksum_infos, show_checksums=False):
Expand Down
Loading

0 comments on commit 2c003fd

Please sign in to comment.