diff --git a/dvc/cache.py b/dvc/cache.py new file mode 100644 index 0000000000..8a9b4a7d6e --- /dev/null +++ b/dvc/cache.py @@ -0,0 +1,25 @@ +import os + + +class Cache(object): + CACHE_DIR = 'cache' + + def __init__(self, dvc_dir): + self.cache_dir = os.path.join(dvc_dir, self.CACHE_DIR) + + @staticmethod + def init(dvc_dir): + cache_dir = os.path.join(dvc_dir, Cache.CACHE_DIR) + os.mkdir(cache_dir) + return Cache(dvc_dir) + + def all(self): + clist = [] + for cache in os.listdir(self.cache_dir): + path = os.path.join(self.cache_dir, cache) + if os.path.isfile(path): + clist.append(path) + return clist + + def get(self, md5): + return os.path.join(self.cache_dir, md5) diff --git a/dvc/cli.py b/dvc/cli.py index 5ed6338398..146c73ebe9 100644 --- a/dvc/cli.py +++ b/dvc/cli.py @@ -18,9 +18,8 @@ from dvc.command.instance_create import CmdInstanceCreate from dvc.command.config import CmdConfig from dvc.command.show_pipeline import CmdShowPipeline -from dvc.command.merge import CmdMerge from dvc.command.checkout import CmdCheckout -from dvc.state_file import StateFile +from dvc.stage import Stage from dvc import VERSION @@ -45,6 +44,7 @@ def parse_args(argv=None): action='store_true', default=False, help='Skip all git actions including reproducibility check and commits.') + parent_parser.add_argument( '-b', '--branch', @@ -84,10 +84,6 @@ def parse_args(argv=None): 'init', parents=[parent_parser], help='Initialize dvc over a directory (should already be a git dir).') - init_parser.add_argument( - '--data-dir', - default='data', - help='Data directory.') init_parser.set_defaults(func=CmdInit) # Run @@ -98,15 +94,20 @@ def parse_args(argv=None): run_parser.add_argument('-d', '--deps', action='append', - default = [], + default=[], help='Declare dependencies for reproducible cmd.') + run_parser.add_argument('-D', + '--deps-no-cache', + action='append', + default=[], + help='Declare dependencies that should not be cached for reproducible cmd.') run_parser.add_argument('-o', - 
'--out', + '--outs', action='append', default=[], help='Declare output data file (sync to cloud) for reproducible cmd.') - run_parser.add_argument('-g', - '--out-git', + run_parser.add_argument('-O', + '--outs-no-cache', action='append', default=[], help='Declare output regular file (sync to Git) for reproducible cmd.') @@ -117,6 +118,7 @@ def parse_args(argv=None): help='Lock data item - disable reproduction.') run_parser.add_argument('-f', '--file', + default=Stage.STAGE_FILE, help='Specify name of the state file') run_parser.add_argument('-c', '--cwd', @@ -132,10 +134,6 @@ def parse_args(argv=None): parent_sync_parser = argparse.ArgumentParser( add_help=False, parents=[parent_parser]) - parent_sync_parser.add_argument( - 'targets', - nargs='+', - help='File or directory to sync.') parent_sync_parser.add_argument('-j', '--jobs', type=int, @@ -171,7 +169,7 @@ def parse_args(argv=None): repro_parser.add_argument( 'targets', nargs='*', - default=[StateFile.DVCFILE_NAME], + default=[Stage.STAGE_FILE], help='Data items or stages to reproduce.') repro_parser.add_argument('-f', '--force', @@ -190,23 +188,9 @@ def parse_args(argv=None): 'remove', parents=[parent_parser], help='Remove data item from data directory.') - remove_parser.add_argument('target', - nargs='*', + remove_parser.add_argument('targets', + nargs='+', help='Target to remove - file or directory.') - remove_parser.add_argument('-l', - '--keep-in-cloud', - action='store_true', - default=False, - help='Do not remove data from cloud.') - remove_parser.add_argument('-r', - '--recursive', - action='store_true', - help='Remove directory recursively.') - remove_parser.add_argument('-c', - '--keep-in-cache', - action='store_true', - default=False, - help='Do not remove data from cache.') remove_parser.set_defaults(func=CmdRemove) # Add @@ -215,7 +199,7 @@ def parse_args(argv=None): parents=[parent_parser], help='Add files/directories to dvc') import_parser.add_argument( - 'input', + 'targets', nargs='+', 
help='Input files/directories') import_parser.set_defaults(func=CmdAdd) @@ -225,20 +209,16 @@ def parse_args(argv=None): 'lock', parents=[parent_parser], help='Lock') - lock_parser.add_argument('-l', - '--lock', - action='store_true', - default=False, - help='Lock data item - disable reproduction.') lock_parser.add_argument('-u', '--unlock', action='store_true', default=False, - help='Unlock data item - enable reproduction.') + help='Unlock stage - enable reproduction.') lock_parser.add_argument( 'files', nargs='*', - help='Data items to lock or unlock.') + default=[Stage.STAGE_FILE], + help='Stages to lock or unlock.') lock_parser.set_defaults(func=CmdLock) # Garbage collector @@ -406,13 +386,6 @@ def parse_args(argv=None): workflow_parser.set_defaults(func=CmdShowWorkflow) - # Merge - merge_parser = subparsers.add_parser( - 'merge', - parents=[parent_parser], - help='Merge') - merge_parser.set_defaults(func=CmdMerge) - # Checkout checkout_parser = subparsers.add_parser( 'checkout', diff --git a/dvc/cloud/instance_manager.py b/dvc/cloud/instance_manager.py index 48467b4cfa..9783674faf 100644 --- a/dvc/cloud/instance_manager.py +++ b/dvc/cloud/instance_manager.py @@ -3,8 +3,7 @@ class CloudSettings(object): - def __init__(self, path_factory, global_storage_path, cloud_config): - self.path_factory = path_factory + def __init__(self, global_storage_path, cloud_config): self.cloud_config = cloud_config self.global_storage_path = global_storage_path diff --git a/dvc/command/add.py b/dvc/command/add.py index c49a81207a..9111887725 100644 --- a/dvc/command/add.py +++ b/dvc/command/add.py @@ -1,60 +1,8 @@ -import os - from dvc.command.common.base import CmdBase -from dvc.logger import Logger -from dvc.state_file import StateFile -from dvc.path.data_item import DataItem class CmdAdd(CmdBase): - def __init__(self, settings): - super(CmdAdd, self).__init__(settings) - - def collect_file(self, fname): - return [self.settings.path_factory.data_item(fname)] - - def 
collect_dir(self, dname): - targets = [] - for root, dirs, files in os.walk(dname): - for fname in files: - targets += self.collect_file(os.path.join(root, fname)) - return targets - - def collect_targets(self, inputs): - targets = [] - for i in inputs: - if not os.path.isdir(i): - targets += self.collect_file(i) - else: - targets += self.collect_dir(i) - return targets - - def add_files(self, targets): - for data_item in targets: - data_item.move_data_to_cache() - - def create_state_files(self, targets): - """ - Create state files for all targets. - """ - for data_item in targets: - Logger.debug('Creating state file for {}'.format(data_item.data.relative)) - - fname = os.path.basename(data_item.data.relative + StateFile.STATE_FILE_SUFFIX) - out = StateFile.parse_deps_state(self.settings, [data_item.data.relative], - currdir=os.path.curdir) - state_file = StateFile(fname=fname, - cmd=None, - out=out, - out_git=[], - deps=[], - locked=True) - state_file.save() - Logger.debug('State file "{}" was created'.format(data_item.state.relative)) - def run(self): - targets = self.collect_targets(self.parsed_args.input) - self.add_files(targets) - self.create_state_files(targets) - msg = 'DVC add: {}'.format(str(self.parsed_args.input)) - self.commit_if_needed(msg) + for target in self.args.targets: + self.project.add(target) + return 0 diff --git a/dvc/command/checkout.py b/dvc/command/checkout.py index c251a04d59..cf558307b9 100644 --- a/dvc/command/checkout.py +++ b/dvc/command/checkout.py @@ -1,59 +1,7 @@ -import os - from dvc.command.common.base import CmdBase -from dvc.command.common.cache_dir import CacheDir -from dvc.config import ConfigI -from dvc.logger import Logger -from dvc.system import System class CmdCheckout(CmdBase): - def __init__(self, settings): - super(CmdCheckout, self).__init__(settings) - - @staticmethod - def cache_ok(item): - data = item.data.relative - cache = item.cache.relative - - if not os.path.isfile(data) or not os.path.isfile(cache): - 
return False - - if not System.samefile(data, cache): - return False - - return True - - @staticmethod - def checkout(items): - for item in items: - if CmdCheckout.cache_ok(item): - continue - - if os.path.isfile(item.data.relative): - os.remove(item.data.relative) - - System.hardlink(item.cache.relative, item.data.relative) - Logger.info('Checkout \'{}\''.format(item.data.relative)) - def run(self): - self.remove_not_tracked_hardlinks() - items = self.settings.path_factory.all_existing_data_items() - self.checkout(items) + self.project.checkout() return 0 - - def remove_not_tracked_hardlinks(self): - untracked_files = self.git.all_untracked_files() - - cache_dir = os.path.join(self.git.git_dir_abs, ConfigI.CACHE_DIR) - cached_files = CacheDir(cache_dir).find_caches(untracked_files) - - for file in cached_files: - Logger.info(u'Remove \'{}\''.format(file)) - os.remove(file) - - dir = os.path.dirname(file) - if not os.listdir(dir): - Logger.info(u'Remove empty directory \'{}\''.format(dir)) - os.removedirs(dir) - pass diff --git a/dvc/command/common/base.py b/dvc/command/common/base.py index 3b1bbd5fe9..99be905039 100644 --- a/dvc/command/common/base.py +++ b/dvc/command/common/base.py @@ -1,76 +1,20 @@ -from dvc.logger import Logger -from dvc.command.common.branch_changer import BranchChanger -from dvc.command.common.dvc_lock import DvcLock - +from dvc.project import Project class CmdBase(object): - def __init__(self, settings): - self._settings = settings - - if settings._parsed_args.quiet and not settings._parsed_args.verbose: - Logger.be_quiet() - elif not settings._parsed_args.quiet and settings._parsed_args.verbose: - Logger.be_verbose() - - @property - def settings(self): - return self._settings - - #NOTE: this name is really confusing. It should really be called "command" or smth, - # because it is only used for "command" argument from CmdRun. 
- @property - def args(self): - return self._settings.args - - @property - def cloud(self): - return self._settings.cloud - - @property - def parsed_args(self): - return self._settings._parsed_args - - @property - def config(self): - return self._settings.config + def __init__(self, args): + self.project = Project('.') + self.args = args - @property - def git(self): - return self._settings.git - - @property - def no_git_actions(self): - return self.parsed_args.no_git_actions - - def set_git_action(self, value): - self.parsed_args.no_git_actions = not value - - def set_locker(self, value): - self.parsed_args.no_lock = value - - def commit_if_needed(self, message, error=False): - if error or self.no_git_actions: - self.not_committed_changes_warning() - return 1 - else: - self.git.commit_all_changes_and_log_status(message) - return 0 - - @staticmethod - def not_committed_changes_warning(): - Logger.warn('changes were not committed to git') + if args.quiet and not args.verbose: + self.project.logger.be_quiet() + elif not args.quiet and args.verbose: + self.project.logger.be_verbose() def run_cmd(self): - with DvcLock(self.is_locker, self.git): - with BranchChanger(self.parsed_args.branch, self.parsed_args.new_branch, self.git): + with self.project.lock: + with self.project.scm.brancher(self.args.branch, self.args.new_branch): return self.run() # Abstract methods that have to be implemented by any inheritance class def run(self): pass - - @property - def is_locker(self): - if 'no_lock' in self.parsed_args.__dict__: - return not self.parsed_args.no_lock - return True diff --git a/dvc/command/common/branch_changer.py b/dvc/command/common/branch_changer.py index 3980aeaf24..94e6cf4208 100644 --- a/dvc/command/common/branch_changer.py +++ b/dvc/command/common/branch_changer.py @@ -7,7 +7,7 @@ def __init__(self, branch, msg): class BranchChanger(object): - def __init__(self, branch, new_branch, git): + def __init__(self, scm, branch, new_branch): if branch and new_branch: 
raise BranchChangerError("Commands conflict: --branch and --new-branch cannot be used at the same command") @@ -15,17 +15,17 @@ def __init__(self, branch, new_branch, git): self.branch = branch if branch else new_branch self.create_new = new_branch is not None - self.git = git + self.scm = scm def __enter__(self): if self.perform_action: - code, _, err = self.git.checkout(self.branch, self.create_new) + code, _, err = self.scm.checkout(self.branch, self.create_new) if code != 0: raise BranchChangerError(self.branch, err) return self def __exit__(self, type, value, traceback): if self.perform_action: - code, _, err = self.git.checkout_previous() + code, _, err = self.scm.checkout('-') if code != 0: raise BranchChangerError(self.branch, err) diff --git a/dvc/command/common/dvc_lock.py b/dvc/command/common/dvc_lock.py deleted file mode 100644 index 973d4afceb..0000000000 --- a/dvc/command/common/dvc_lock.py +++ /dev/null @@ -1,28 +0,0 @@ -import fasteners - -from dvc.command.common.common_error import CmdCommonError - - -class DvcLockerError(CmdCommonError): - def __init__(self, msg): - super(DvcLockerError, self).__init__('DVC locker error: {}'.format(msg)) - - -class DvcLock(object): - TIMEOUT = 5 - - def __init__(self, is_locker, git): - self.is_locker = is_locker - self.git = git - self.lock = None - - def __enter__(self): - if self.is_locker: - self.lock = fasteners.InterProcessLock(self.git.lock_file) - if not self.lock.acquire(timeout=self.TIMEOUT): - raise DvcLockerError('Cannot perform the cmd since DVC is busy and locked. 
Please retry the cmd later.') - return self.lock - - def __exit__(self, type, value, traceback): - if self.is_locker: - self.lock.release() diff --git a/dvc/command/config.py b/dvc/command/config.py index b948529d1c..4b731eab64 100644 --- a/dvc/command/config.py +++ b/dvc/command/config.py @@ -7,17 +7,6 @@ class CmdConfig(CmdBase): - def __init__(self, settings): - super(CmdConfig, self).__init__(settings) - - # Using configobj because it doesn't - # drop comments like configparser does. - self.configobj = configobj.ConfigObj(self._config_path, write_empty_values=True) - - @property - def _config_path(self): - return os.path.join(self.git.git_dir, Config.CONFIG_DIR, Config.CONFIG) - def _get_key(self, d, name, add=False): for k in d.keys(): if k.lower() == name.lower(): @@ -34,7 +23,7 @@ def unset(self): del self.configobj[self.section][self.opt] self.configobj.write() except Exception as exc: - Logger.error('Failed to unset \'{}\': {}'.format(self.parsed_args.name, exc)) + Logger.error('Failed to unset \'{}\': {}'.format(self.args.name, exc)) return 1 return 0 @@ -45,19 +34,19 @@ def show(self): def set(self): try: - self.configobj[self.section][self.opt] = self.parsed_args.value + self.configobj[self.section][self.opt] = self.args.value self.configobj.write() except Exception as exc: - Logger.error('Failed to set \'{}\' to \'{}\': {}'.format(self.parsed_args.name, - self.parsed_args.value, + Logger.error('Failed to set \'{}\' to \'{}\': {}'.format(self.args.name, + self.args.value, exc)) return 1 return 0 def check_opt(self): - _section, _opt = self.parsed_args.name.strip().split('.', 1) - add = (self.parsed_args.value != None and self.parsed_args.unset == False) + _section, _opt = self.args.name.strip().split('.', 1) + add = (self.args.value != None and self.args.unset == False) section = self._get_key(self.configobj, _section, add) @@ -76,13 +65,17 @@ def check_opt(self): return 0 def run(self): + # Using configobj because it doesn't + # drop comments like 
configparser does. + self.configobj = configobj.ConfigObj(self.project.config.config_file, write_empty_values=True) + if self.check_opt() != 0: return 1 - if self.parsed_args.unset: + if self.args.unset: return self.unset() - if self.parsed_args.value is None: + if self.args.value is None: return self.show() return self.set() diff --git a/dvc/command/data_sync.py b/dvc/command/data_sync.py index 434c90675d..6dc146de0a 100644 --- a/dvc/command/data_sync.py +++ b/dvc/command/data_sync.py @@ -1,31 +1,17 @@ -from dvc.command.common.base import CmdBase -from dvc.logger import Logger - import dvc.data_cloud as cloud +from dvc.command.common.base import CmdBase -class CmdDataBase(CmdBase): - def __init__(self, settings, cmd): - super(CmdDataBase, self).__init__(settings) - self.cmd = cmd - +class CmdDataPull(CmdBase): def run(self): - getattr(self.cloud, self.cmd)(self.parsed_args.targets, self.parsed_args.jobs) + self.project.pull(self.args.jobs) -class CmdDataPull(CmdDataBase): - def __init__(self, settings): - super(CmdDataPull, self).__init__(settings, 'pull') - - -class CmdDataPush(CmdDataBase): - def __init__(self, settings): - super(CmdDataPush, self).__init__(settings, 'push') +class CmdDataPush(CmdBase): + def run(self): + self.project.push(self.args.jobs) class CmdDataStatus(CmdBase): - def __init__(self, settings): - super(CmdDataStatus, self).__init__(settings) - def _show(self, status): for s in status: target, ret = s @@ -39,9 +25,9 @@ def _show(self, status): cloud.STATUS_NEW : 'new file:', } - Logger.info('\t{}\t{}'.format(prefix_map[ret], target.data.dvc)) + self.project.logger.info('\t{}\t{}'.format(prefix_map[ret], target)) def run(self): - status = self.cloud.status(self.parsed_args.targets, self.parsed_args.jobs) + status = self.project.status(self.args.jobs) self._show(status) return 0 diff --git a/dvc/command/gc.py b/dvc/command/gc.py index 7d138a553c..a4230f58d7 100644 --- a/dvc/command/gc.py +++ b/dvc/command/gc.py @@ -1,23 +1,7 @@ -import os - 
from dvc.command.common.base import CmdBase -from dvc.logger import Logger -from dvc.config import ConfigI -from dvc.state_file import StateFile class CmdGC(CmdBase): - def __init__(self, settings): - super(CmdGC, self).__init__(settings) - def run(self): - clist = [str(x) for x in StateFile.find_all_cache_files(self.git)] - - for cache in os.listdir(ConfigI.CACHE_DIR): - fname = os.path.join(ConfigI.CACHE_DIR, cache) - if os.path.basename(fname) in clist: - continue - os.remove(fname) - Logger.info('Cache \'{}\' was removed'.format(fname)) - + self.project.gc() return 0 diff --git a/dvc/command/init.py b/dvc/command/init.py index aeaa2f74bf..8402ec0172 100644 --- a/dvc/command/init.py +++ b/dvc/command/init.py @@ -1,112 +1,9 @@ -import os -from pathlib import Path +from dvc.project import Project -from dvc.command.common.base import CmdBase -from dvc.logger import Logger -from dvc.config import Config -from dvc.exceptions import DvcException -from dvc.path.data_item import DataItem +class CmdInit(object): + def __init__(self, args): + self.args = args - -class InitError(DvcException): - def __init__(self, msg): - DvcException.__init__(self, 'Init error: {}'.format(msg)) - - -class CmdInit(CmdBase): - CONFIG_TEMPLATE = '''[Global] -# Default target -Target = - -# Supported clouds: AWS, GCP -Cloud = AWS - -# This global storage path is going to be deprecated in the next version. -# Please use StoragePath from a specific cloud instead. 
-#StoragePath = - -# Log levels: Debug, Info, Warning and Error -LogLevel = Info - -[AWS] -CredentialPath = ~/.aws/credentials -CredentialSection = default - -StoragePath = dvc/tutorial - -# Default settings for AWS instances: -Type = t2.nano -Image = ami-2d39803a - -SpotPrice = -SpotTimeout = 300 - -KeyPairName = dvc-key -KeyPairDir = ~/.ssh -SecurityGroup = dvc-sg - -Region = us-east-1 -Zone = us-east-1a -SubnetId = - -Volume = my-100gb-drive-io - -Monitoring = false -EbsOptimized = false -AllDisksAsRAID0 = false - -[GCP] -StoragePath = -ProjectName = -''' - - def __init__(self, settings): - super(CmdInit, self).__init__(settings) - - @property - def is_locker(self): - return False - - def get_not_existing_path(self, *args): - path = Path(os.path.join(self.git.git_dir, *args)) - if path.exists(): - raise InitError('Path "{}" already exist'.format(path.name)) - return path - - def get_not_existing_conf_file_name(self): - file_name = os.path.join(self.git.git_dir, Config.CONFIG_DIR, Config.CONFIG) - if os.path.exists(file_name): - raise InitError('Configuration file "{}" already exist'.format(file_name)) - return file_name - - def run(self): - if not self.no_git_actions and not self.git.is_ready_to_go(): - return 1 - - if os.path.realpath(os.path.curdir) != self.settings.git.git_dir_abs: - Logger.error('DVC error: initialization could be done only from git root directory {}'.format( - self.settings.git.git_dir_abs - )) - return 1 - - config_dir_path = self.get_not_existing_path(Config.CONFIG_DIR) - cache_dir_path = self.get_not_existing_path(Config.CONFIG_DIR, Config.CACHE_DIR_NAME) - - conf_file_name = self.get_not_existing_conf_file_name() - - config_dir_path.mkdir() - cache_dir_path.mkdir() - Logger.info('Directories {}/, {}/ were created'.format( - config_dir_path.name, - os.path.join(config_dir_path.name, cache_dir_path.name))) - - conf_file = open(conf_file_name, 'wt') - conf_file.write(self.CONFIG_TEMPLATE) - conf_file.close() - - 
self.git.modify_gitignore([os.path.join(config_dir_path.name, cache_dir_path.name), - os.path.join(config_dir_path.name, os.path.basename(self.git.lock_file)), - '*' + DataItem.LOCAL_STATE_FILE_SUFFIX]) - - message = 'DVC init. cache dir {}'.format(cache_dir_path.name) - return self.commit_if_needed(message) + def run_cmd(self): + Project.init('.') + return 0 diff --git a/dvc/command/lock.py b/dvc/command/lock.py index 7a2b4ce2dd..66722b9102 100644 --- a/dvc/command/lock.py +++ b/dvc/command/lock.py @@ -1,41 +1,27 @@ from dvc.command.common.base import CmdBase -from dvc.logger import Logger -from dvc.state_file import StateFile +from dvc.stage import Stage class CmdLock(CmdBase): - def __init__(self, settings): - super(CmdLock, self).__init__(settings) - def run(self): - return self.lock_files(self.parsed_args.files, not self.parsed_args.unlock) - - def lock_files(self, files, target): - cmd = 'lock' if target else 'unlock' - - error = 0 - for file in files: + lock = not self.args.unlock + cmd = 'lock' if lock else 'unlock' + ret = 0 + for file in self.args.files: try: - data_item = self.settings.path_factory.existing_data_item(file) - state = StateFile.load(data_item, self.settings) + stage = Stage.load(self.project, file) - if state.locked and target: - Logger.warn('Data item {} is already locked'.format(data_item.data.relative)) - elif not state.locked and not target: - Logger.warn('Data item {} is already unlocked'.format(data_item.data.relative)) + if stage.locked and lock: + self.project.logger.warn('Stage {} is already locked'.format(file)) + elif not stage.locked and not lock: + self.project.logger.warn('Stage {} is already unlocked'.format(file)) else: - state.locked = target - Logger.debug('Saving status file for data item {}'.format(data_item.data.relative)) - state.save() - Logger.info('Data item {} was {}ed'.format(data_item.data.relative, cmd)) + stage.locked = lock + self.project.logger.debug('Saving stage file {}'.format(file)) + stage.dump() + 
self.project.logger.info('Stage {} was {}ed'.format(file, cmd)) except Exception as ex: - error += 1 - Logger.error('Unable to {} {}: {}'.format(cmd, file, ex)) - - if error > 0 and not self.no_git_actions: - Logger.error('Errors occurred. One or more repro cmd was not successful.') - self.not_committed_changes_warning() - else: - self.commit_if_needed('DVC lock: {}'.format(' '.join(self.args))) + ret = 1 + self.project.logger.error('Unable to {} {}: {}'.format(cmd, file, ex)) - return 0 + return ret diff --git a/dvc/command/merge.py b/dvc/command/merge.py deleted file mode 100644 index 1d6e8436db..0000000000 --- a/dvc/command/merge.py +++ /dev/null @@ -1,56 +0,0 @@ -import os - -from dvc.command.common.base import CmdBase -from dvc.config import ConfigI -from dvc.logger import Logger -from dvc.state_file import StateFile -from dvc.path.data_item import DataItem -from dvc.system import System -from dvc.command.checkout import CmdCheckout - - -class CmdMerge(CmdBase): - def __init__(self, settings): - super(CmdMerge, self).__init__(settings) - - def print_info(self, targets): - for fname in targets: - Logger.info('Restored original data after merge:') - Logger.info(' {}'.format(fname)) - - def collect_targets(self): - targets = [] - - for fname in self.git.get_last_merge_changed_files(): - if not StateFile._is_state_file(fname): - continue - - state = StateFile.load(fname) - if not state.cmd and state.locked: - targets.append(fname) - - return targets - - def checkout_targets(self, targets): - items = [] - for fname in targets: - self.git.checkout_file_before_last_merge(fname) - state = StateFile.load(fname) - for out in state.out: - item = self.settings.path_factory.data_item(os.path.join(state.cwd, out)) - items.append(item) - - CmdCheckout.checkout(items) - - msg = 'DVC merge files: {}'.format(' '.join(targets)) - self.commit_if_needed(msg) - - def run(self): - targets = self.collect_targets() - if not targets: - return 1 - - self.checkout_targets(targets) - 
self.print_info(targets) - - return 0 diff --git a/dvc/command/remove.py b/dvc/command/remove.py index 1293a1e2cf..45dcce1978 100644 --- a/dvc/command/remove.py +++ b/dvc/command/remove.py @@ -1,75 +1,8 @@ -import os +from dvc.command.common.base import CmdBase -from dvc.command.common.traverse import Traverse -from dvc.logger import Logger - - -class CmdRemove(Traverse): - def __init__(self, settings): - super(CmdRemove, self).__init__(settings, "remove") +class CmdRemove(CmdBase): def run(self): - if not self.parsed_args.target: - Logger.error('Nothing specified, nothing removed.') - return 1 - - return super(CmdRemove, self).run() - - def process_file(self, target): - Logger.debug(u'[Cmd-Remove] Remove file {}.'.format(target)) - - data_item = self._get_data_item(target) - - self._remove_cache_file(data_item) - self._remove_cloud_cache(data_item) - self._remove_state_file(data_item) - - os.remove(data_item.data.relative) - Logger.debug(u'[Cmd-Remove] Remove data item {}. Success.'.format(data_item.data.relative)) - pass - - def _remove_state_file(self, data_item): - if os.path.isfile(data_item.state.relative): - self._remove_dvc_path(data_item.state, 'state') - else: - Logger.warn(u'[Cmd-Remove] State file {} for data instance {} does not exist'.format( - data_item.state.relative, data_item.data.relative)) - - def _remove_cache_file(self, data_item): - if not self.parsed_args.keep_in_cache and os.path.isfile(data_item.cache.relative): - self._remove_dvc_path(data_item.cache, 'cache') - else: - if not self.parsed_args.keep_in_cache: - msg = u'[Cmd-Remove] Unable to find cache file {} for data item {}' - Logger.warn(msg.format(data_item.cache.relative, data_item.data.relative)) - pass - - def _remove_dvc_path(self, dvc_path, name): - Logger.debug(u'[Cmd-Remove] Remove {} {}.'.format(name, dvc_path.relative)) - os.remove(dvc_path.relative) - self.remove_dir_if_empty(dvc_path.relative) - Logger.debug(u'[Cmd-Remove] Remove {}. 
Success.'.format(name)) - - @staticmethod - def remove_dir_if_empty(file): - dir = os.path.dirname(file) - if dir != '' and not os.listdir(dir): - Logger.debug(u'[Cmd-Remove] Empty directory was removed {}.'.format(dir)) - os.rmdir(dir) - pass - - def traverse_dir_finalize(self, target): - os.rmdir(target) - - def is_recursive(self): - return self.parsed_args.recursive - - # Renaming - def remove_dir(self, target): - return self._traverse_dir(target) - - def remove_file(self, target): - self.process_file(target) - - def remove_all(self): - return self._traverse_all() + for target in self.args.targets: + self.project.remove(target) + return 0 diff --git a/dvc/command/repro.py b/dvc/command/repro.py index a9c295f346..c3b6a5e31e 100644 --- a/dvc/command/repro.py +++ b/dvc/command/repro.py @@ -1,155 +1,8 @@ -import os -import copy - -from dvc.command.run import CmdRun -from dvc.logger import Logger -from dvc.exceptions import DvcException -from dvc.state_file import StateFile -from dvc.system import System -from dvc.data_cloud import file_md5 - - -class ReproError(DvcException): - def __init__(self, msg): - DvcException.__init__(self, 'Run error: {}'.format(msg)) - - -class CmdRepro(CmdRun): - def __init__(self, settings): - super(CmdRepro, self).__init__(settings) +from dvc.command.common.base import CmdBase +class CmdRepro(CmdBase): def run(self): - recursive = not self.parsed_args.single_item - stages = [] - - for target in self.parsed_args.targets: - if StateFile._is_state_file(target): - stage = StateFile.load(target) - else: - stage = StateFile.find_by_output(self.settings, target) - - if stage: - stages.append(stage) - - self.repro_stages(stages, recursive, self.parsed_args.force) - names = [os.path.relpath(stage.path) for stage in stages] - return self.commit_if_needed('DVC repro: {}'.format(names)) - - def repro_stages(self, stages, recursive, force): - error = False - changed = False - - for stage in stages: - try: - change = ReproStage(self.settings, stage, 
recursive, force) - if change.reproduce(): - changed = True - Logger.info(u'Stage "{}" was reproduced.'.format(stage.path)) - else: - Logger.info(u'Reproduction is not required for stage "{}".'.format(stage.path)) - except ReproError as err: - Logger.error('Error in reproducing stage {}: {}'.format(stage.path, str(err))) - error = True - break - - if error and not self.no_git_actions: - Logger.error('Errors occurred. One or more repro cmd was not successful.') - self.not_committed_changes_warning() - - return changed and not error - - -class ReproStage(object): - def __init__(self, settings, stage, recursive, force): - self.git = settings.git - self.settings = settings - self._recursive = recursive - self._force = force - - self.stage = stage - - if not self.stage.cmd and not self.stage.locked: - msg = 'Error: stage "{}" is not locked, but has no command for reproduction' - raise ReproError(msg.format(stage.path)) - - def is_cache_exists(self): - for out in self.stage.out: - path = os.path.join(self.stage.cwd, out) - if not os.path.exists(path): - return False - return True - - def remove_output_files(self): - for out in self.stage.out: - path = os.path.join(self.stage.cwd, out) - Logger.debug('Removing output file {} before reproduction.'.format(path)) - try: - os.remove(path) - except Exception as ex: - msg = 'Output file {} cannot be removed before reproduction: {}' - Logger.debug(msg.format(path, ex)) - - def reproduce_run(self): - Logger.info('Reproducing run command for stage {}. Args: {}'.format( - self.stage.path, self.stage.cmd)) - - CmdRun.run_command(self.settings, self.stage) - - def reproduce_stage(self): - Logger.debug('Reproducing stage {}.'.format(self.stage.path)) - self.remove_output_files() - self.reproduce_run() - - def is_repro_required(self): - deps_changed = self.reproduce_deps() - if deps_changed or self._force or not self.is_cache_exists(): - return True - return False - - def reproduce(self): - Logger.debug('Reproduce stage {}. 
recursive={}, force={}'.format( - self.stage.path, self._recursive, self._force)) - - if self.stage.locked: - Logger.debug('Stage {} is not reproducible'.format(self.stage.path)) - return False - - if not self.is_repro_required(): - Logger.debug('Stage {} is up to date'.format(self.stage.path)) - return False - - Logger.debug('Stage {} is going to be reproduced'.format(self.stage.path)) - self.reproduce_stage() - return True - - def reproduce_dep(self, path, md5, recursive): - if not self.settings.path_factory.is_data_item(path): - if md5 != file_md5(os.path.join(self.git.git_dir_abs, path))[0]: - self.log_repro_reason('source {} was changed'.format(path)) - return True - return False - - stage = StateFile.find_by_output(self.settings, path) - if recursive: - ReproStage(self.settings, stage, self._recursive, self._force).reproduce() - - stage = StateFile.load(stage.path) - if md5 != stage.out[os.path.relpath(path, stage.cwd)]: - self.log_repro_reason('data item {} was changed - md5 sum doesn\'t match'.format(path)) - return True - - return False - - def reproduce_deps(self): - result = False - - for name,md5 in self.stage.deps.items(): - path = os.path.join(self.stage.cwd, name) - if self.reproduce_dep(path, md5, self._recursive): - result = True - - return result - - def log_repro_reason(self, reason): - msg = u'Repro is required for stage {} because of {}' - Logger.debug(msg.format(self.stage.path, reason)) + recursive = not self.args.single_item + self.project.reproduce(self.args.targets, + recursive=recursive, + force=self.args.force) diff --git a/dvc/command/run.py b/dvc/command/run.py index 296344753c..87f321d587 100644 --- a/dvc/command/run.py +++ b/dvc/command/run.py @@ -1,72 +1,14 @@ -import os - from dvc.command.common.base import CmdBase -from dvc.exceptions import DvcException -from dvc.logger import Logger -from dvc.state_file import StateFile -from dvc.executor import Executor - - -class RunError(DvcException): - def __init__(self, msg): - 
DvcException.__init__(self, 'Run error: {}'.format(msg)) class CmdRun(CmdBase): - def __init__(self, settings): - super(CmdRun, self).__init__(settings) - def run(self): - cmd = ' '.join(self.parsed_args.command) - - stage_file = self.get_stage_file() - if os.path.isfile(stage_file): - Logger.error("Stage file {} already exists".format(stage_file)) - return 1 - - state = StateFile(fname=os.path.join(self.parsed_args.cwd, stage_file), - cmd=cmd, - out=self.parsed_args.out, - out_git=self.parsed_args.out_git, - deps=self.parsed_args.deps, - locked=self.parsed_args.lock, - cwd=self.parsed_args.cwd) - - self.run_command(self.settings, state) - return self.commit_if_needed('DVC run: {}'.format(state.cmd)) - - def get_stage_file(self): - if self.parsed_args.file: - return self.parsed_args.file - - if self.parsed_args.out or self.parsed_args.out_git: - result = self.parsed_args.out[0] if self.parsed_args.out else self.parsed_args.out_git[0] - result = os.path.basename(result+StateFile.STATE_FILE_SUFFIX) - Logger.info(u'Using \'{}\' as a stage file'.format(result)) - return result - - Logger.info(u'Using \'{}\' as stage file'.format(StateFile.DVCFILE_NAME)) - return StateFile.DVCFILE_NAME - - @staticmethod - def run_command(settings, state): - Executor.exec_cmd_only_success(state.cmd, cwd=state.cwd, shell=True) - - CmdRun.move_output_to_cache(settings, state) - CmdRun.update_state_file(settings, state) - - @staticmethod - def update_state_file(settings, state): - Logger.debug('Update state file "{}"'.format(state.path)) - state.out = StateFile.parse_deps_state(settings, state.out, currdir=state.cwd) - state.out_git = StateFile.parse_deps_state(settings, state.out_git, currdir=state.cwd) - state.deps = StateFile.parse_deps_state(settings, state.deps, currdir=state.cwd) - state.save() - - @staticmethod - def move_output_to_cache(settings, state): - items = settings.path_factory.to_data_items(state.out)[0] - for item in items: - Logger.debug('Move output file "{}" to cache 
dir "{}" and create a hardlink'.format( - item.data.relative, item.cache_dir_abs)) - item.move_data_to_cache() + self.project.run(cmd=' '.join(self.args.command), + outs=self.args.outs, + outs_no_cache=self.args.outs_no_cache, + deps=self.args.deps, + deps_no_cache=self.args.deps_no_cache, + locked=self.args.lock, + fname=self.args.file, + cwd=self.args.cwd) + return 0 diff --git a/dvc/command/show_pipeline.py b/dvc/command/show_pipeline.py index be29d2c7c9..873378a452 100644 --- a/dvc/command/show_pipeline.py +++ b/dvc/command/show_pipeline.py @@ -1,17 +1,10 @@ import os import networkx as nx -from dvc.command.common.traverse import Traverse -from dvc.logger import Logger -from dvc.state_file import StateFile +from dvc.command.common.base import CmdBase -class CmdShowPipeline(Traverse): - def __init__(self, settings): - super(CmdShowPipeline, self).__init__(settings, "collect", do_not_start_from_root=False) - self.g = nx.DiGraph() - self.subs = [] - +class CmdShowPipeline(CmdBase): def draw(self, g, target, fname_suffix): fname = 'pipeline_' + fname_suffix try: @@ -36,7 +29,7 @@ def draw_targets(self, target): target = '.' return self.draw(self.g, target, target) - for t in self.parsed_args.target: + for t in self.args.target: fname_suffix = os.path.basename(os.path.normpath(t)) s = self.find_sub(t) @@ -47,16 +40,16 @@ def draw_targets(self, target): return 0 def run(self): - saved_targets = self.settings.parsed_args.target - self.settings.parsed_args.target = ['.'] - - ret = super(CmdShowPipeline, self).run() + self.g = nx.DiGraph() + self.subs = [] - self.settings.parsed_args.target = saved_targets + saved_targets = self.args.target + self.args.target = ['.'] + + for stage in self.project.stages(): + self.collect_stage(stage) - if ret != 0: - Logger.error('Failed to build dependency graph for the project') - return 1 + self.args.target = saved_targets # Try to find independent clusters which might occure # when a bunch of data items were used independently. 
@@ -64,27 +57,18 @@ def run(self): return self.draw_targets(saved_targets) - def process_file(self, target): - data_item = self._get_data_item(target) - name = data_item.data.relative + def collect_stage(self, stage): + name = os.path.relpath(stage.path, self.project.root_dir) state = StateFile.load(data_item, self.git) self.g.add_node(name) - for i in state.input_files: + for dep in stage.deps: + i = os.path.relpath(dep.path, self.project.root_dir) self.g.add_node(i) self.g.add_edge(i, name) - for o in state.output_files: - if o == name: - continue + for out in state.outs: + o = os.path.relpath(out.path, self.project.root_dir) self.g.add_node(o) self.g.add_edge(name, o) - - @property - def no_git_actions(self): - return True - - @staticmethod - def not_committed_changes_warning(): - pass diff --git a/dvc/command/show_workflow.py b/dvc/command/show_workflow.py index 8c0d2585a2..875f1a6743 100644 --- a/dvc/command/show_workflow.py +++ b/dvc/command/show_workflow.py @@ -1,27 +1,16 @@ from dvc.command.common.base import CmdBase -from dvc.logger import Logger +from dvc.git_wrapper import GitWrapper class CmdShowWorkflow(CmdBase): - def __init__(self, settings): - super(CmdShowWorkflow, self).__init__(settings) - def run(self): - target = self.settings.parsed_args.target + target = self.args.target if not target: - target = self._settings.config._config['Global'].get('Target', '') - Logger.debug(u'Set show workflow target as {}'.format(target)) + target = self.project.config._config['Global'].get('Target', '') + self.project.logger.debug(u'Set show workflow target as {}'.format(target)) - wf = self.git.get_all_commits(target, self.settings) - wf.build_graph(self.settings.parsed_args.dvc_commits, - self.settings.parsed_args.all_commits, - self.settings.parsed_args.max_commits) + wf = GitWrapper.get_all_commits(target, self.settings) + wf.build_graph(self.args.dvc_commits, + self.args.all_commits, + self.args.max_commits) return 0 - - @property - def no_git_actions(self): 
- return True - - @staticmethod - def not_committed_changes_warning(): - pass diff --git a/dvc/config.py b/dvc/config.py index f06d31394f..786833361e 100644 --- a/dvc/config.py +++ b/dvc/config.py @@ -14,67 +14,58 @@ def __init__(self, msg): DvcException.__init__(self, 'Config file error: {}'.format(msg)) -class ConfigI(object): - """ Basic config instance """ - CONFIG_DIR = '.dvc' - TARGET_FILE_DEFAULT = 'target' +class Config(object): CONFIG = 'config' - CACHE_DIR_NAME = 'cache' - - CACHE_DIR = os.path.join(CONFIG_DIR, CACHE_DIR_NAME) - - def __init__(self, cloud=None, conf_parser=None): - self._conf_parser = None - self._cloud = None - self.set(cloud, conf_parser) - - def set(self, cloud=None, conf_parser=None): - """ Set config params """ - self._cloud = cloud - self._conf_parser = conf_parser - - @property - def cache_dir(self): - """ Directory with cached data files """ - return self.CACHE_DIR - - @property - def cloud(self): - """ Cloud config """ - return self._cloud - - @property - def conf_parser(self): - return self._conf_parser - - -class Config(ConfigI): - """ Parsed config object """ - def __init__(self, conf_file=ConfigI.CONFIG, conf_pseudo_file=None): - """ - Params: - conf_file (String): configuration file - conf_pseudo_file (String): for unit testing, something that supports readline; - supersedes conf_file - """ - self._conf_file = conf_file - self._config = configparser.SafeConfigParser() + CONFIG_TEMPLATE = ''' +[Global] +# Supported clouds: AWS, GCP +Cloud = AWS + +# Log levels: Debug, Info, Warning and Error +LogLevel = Info + +[AWS] +CredentialPath = ~/.aws/credentials +CredentialSection = default + +StoragePath = dvc/tutorial + +# Default settings for AWS instances: +Type = t2.nano +Image = ami-2d39803a - if conf_pseudo_file is not None: - self._config.readfp(conf_pseudo_file) - else: - fname = os.path.join(self.CONFIG_DIR, conf_file) - if not os.path.isfile(fname): - raise ConfigError('Config file "{}" does not exist {}'.format(fname, 
os.getcwd())) - self._config.read(fname) +SpotPrice = +SpotTimeout = 300 - level = self._config['Global']['LogLevel'] - Logger.set_level(level) +KeyPairName = dvc-key +KeyPairDir = ~/.ssh +SecurityGroup = dvc-sg - super(Config, self).__init__(self._config['Global']['Cloud'], - self._config) +Region = us-east-1 +Zone = us-east-1a +SubnetId = - @property - def file(self): - """ Config file object """ - return self._conf_file +Volume = my-100gb-drive-io + +Monitoring = false +EbsOptimized = false +AllDisksAsRAID0 = false + +[GCP] +StoragePath = +ProjectName = +''' + + def __init__(self, dvc_dir): + self.dvc_dir = dvc_dir + self.config_file = os.path.join(dvc_dir, self.CONFIG) + + self._config = configparser.SafeConfigParser() + self._config.read(self.config_file) + Logger.set_level(self._config['Global']['LogLevel']) + + @staticmethod + def init(dvc_dir): + config_file = os.path.join(dvc_dir, Config.CONFIG) + open(config_file, 'w').write(Config.CONFIG_TEMPLATE) + return Config(dvc_dir) diff --git a/dvc/data_cloud.py b/dvc/data_cloud.py index 8a5752d2e9..7ae5244f9c 100644 --- a/dvc/data_cloud.py +++ b/dvc/data_cloud.py @@ -141,7 +141,7 @@ def storage_prefix(self): def cache_file_key(self, fname): """ Key of a file within the bucket """ - return '{}/{}'.format(self.storage_prefix, fname).strip('/') + return '{}/{}'.format(self.storage_prefix, os.path.basename(fname)).strip('/') @staticmethod def tmp_file(fname): @@ -154,57 +154,39 @@ def sanity_check(self): """ pass - def _import(self, bucket, path, item): + def _import(self, bucket, fin, fout): """ Cloud-specific method for importing data file. 
""" pass - def push(self, item): + def push(self, path): """ Cloud-specific method for pushing data """ pass - def pull(self, item): + def pull(self, path): """ Generic method for pulling data from the cloud """ - fname = item.cache.dvc - key_name = self.cache_file_key(fname) - return self._import(self.storage_bucket, key_name, item) - - def import_data(self, url, item): - """ Generic method for importing data """ - parsed = urlparse(url) - - return self._import(parsed.netloc, parsed.path, item) - - def sync(self, fname): - """ - Generic method for syncing data. It will decide on its own whether it needs - to push or pull data, depending on existance of local cache file. - """ - item = self._cloud_settings.path_factory.data_item(fname) - - if os.path.isfile(item.cache.dvc): - return self.push(item) - else: - return self.pull(item) + key_name = self.cache_file_key(path) + return self._import(self.storage_bucket, key_name, path) - def remove(self, item): + def remove(self, path): """ Cloud-specific method for removing data item from the cloud. """ pass - def _status(self, item): + def _status(self, path): """ Cloud-specific method for checking data item status. """ pass - def status(self, item): + def status(self, path): """ Generic method for checking data item status. 
""" - return STATUS_MAP.get(self._status(item), STATUS_UNKNOWN) + return STATUS_MAP.get(self._status(path), STATUS_UNKNOWN) + class DataCloudLOCAL(DataCloudBase): """ @@ -373,7 +355,7 @@ def import_data(self, url, item): class DataCloudAWS(DataCloudBase): """ DataCloud class for Amazon Web Services """ - def __init__(self, cloud_settings): # settings, config, cloud_config): + def __init__(self, cloud_settings): super(DataCloudAWS, self).__init__(cloud_settings) self._aws_creds = AWSCredentials(cloud_settings.cloud_config) @@ -455,11 +437,10 @@ def _download_tracker(fname): """ return fname + '.download' - def _import(self, bucket_name, key_name, data_item): + def _import(self, bucket_name, key_name, fname): bucket = self._get_bucket_aws(bucket_name) - fname = data_item.data.dvc tmp_file = self.tmp_file(fname) name = os.path.basename(fname) key = bucket.get_key(key_name) @@ -469,7 +450,7 @@ def _import(self, bucket_name, key_name, data_item): if self._cmp_checksum(key, fname): Logger.debug('File "{}" matches with "{}".'.format(fname, key_name)) - return data_item + return fname Logger.debug('Downloading cache file from S3 "{}/{}" to "{}"'.format(bucket.name, key_name, @@ -484,22 +465,11 @@ def _import(self, bucket_name, key_name, data_item): return None os.rename(tmp_file, fname) - data_item.move_data_to_cache() progress.finish_target(name) Logger.debug('Downloading completed') - return data_item - - def _read_upload_tracker(self, fname): - """ - Try reading upload tracker if present. 
- """ - try: - return open(self._upload_tracker(fname), 'r').read() - except Exception as exc: - Logger.debug("Failed to read upload tracker file for {}: {}".format(fname, exc)) - return None + return fname def _write_upload_tracker(self, fname, mp_id): """ @@ -604,48 +574,48 @@ def _push_multipart(self, key, fname): multipart.complete_upload() self._unlink_upload_tracker(fname) - def push(self, data_item): + def push(self, path): """ push, aws version """ - aws_key = self.cache_file_key(data_item.cache.dvc) + aws_key = self.cache_file_key(path) bucket = self._get_bucket_aws(self.storage_bucket) key = bucket.get_key(aws_key) if key: Logger.debug('File already uploaded to the cloud. Checksum validation...') - if self._cmp_checksum(key, data_item.cache.dvc): + if self._cmp_checksum(key, path): Logger.debug('File checksum matches. No uploading is needed.') - return data_item + return path Logger.debug('Checksum miss-match. Re-uploading is required.') key = bucket.new_key(aws_key) try: - self._push_multipart(key, data_item.cache.relative) + self._push_multipart(key, path) except Exception as exc: - Logger.error('Failed to upload "{}": {}'.format(data_item.cache.relative, exc)) + Logger.error('Failed to upload "{}": {}'.format(path, exc)) return None - progress.finish_target(os.path.basename(data_item.cache.relative)) + progress.finish_target(os.path.basename(path)) - return data_item + return path - def _status(self, data_item): - aws_key = self.cache_file_key(data_item.cache.dvc) + def _status(self, path): + aws_key = self.cache_file_key(path) bucket = self._get_bucket_aws(self.storage_bucket) key = bucket.get_key(aws_key) remote_exists = key is not None - local_exists = os.path.exists(data_item.cache.relative) + local_exists = os.path.exists(path) diff = None if remote_exists and local_exists: - diff = self._cmp_checksum(key, data_item.cache.dvc) + diff = self._cmp_checksum(key, path) return (local_exists, remote_exists, diff) - def remove(self, data_item): - 
aws_file_name = self.cache_file_key(data_item.cache.dvc) + def remove(self, path): + aws_file_name = self.cache_file_key(path) Logger.debug(u'[Cmd-Remove] Remove from cloud {}.'.format(aws_file_name)) @@ -702,11 +672,10 @@ def _cmp_checksum(blob, fname): return False - def _import(self, bucket_name, key, data_item): + def _import(self, bucket_name, key, fname): bucket = self._get_bucket_gc(bucket_name) - fname = data_item.data.dvc name = os.path.basename(fname) tmp_file = self.tmp_file(fname) @@ -717,7 +686,7 @@ def _import(self, bucket_name, key, data_item): if self._cmp_checksum(blob, fname): Logger.debug('File "{}" matches with "{}".'.format(fname, key)) - return data_item + return fname Logger.debug('Downloading cache file from gc "{}/{}"'.format(bucket.name, key)) @@ -732,51 +701,50 @@ def _import(self, bucket_name, key, data_item): return None os.rename(tmp_file, fname) - data_item.move_data_to_cache() progress.finish_target(name) Logger.debug('Downloading completed') - return data_item + return fname - def push(self, data_item): + def push(self, path): """ push, gcp version """ bucket = self._get_bucket_gc(self.storage_bucket) - blob_name = self.cache_file_key(data_item.cache.dvc) - name = os.path.basename(data_item.cache.dvc) + blob_name = self.cache_file_key(path) + name = os.path.basename(path) blob = bucket.get_blob(blob_name) if blob is not None and blob.exists(): - if self._cmp_checksum(blob, data_item.cache.dvc): - Logger.debug('checksum %s matches. Skipping upload' % data_item.cache.relative) - return data_item - Logger.debug('checksum %s mismatch. re-uploading' % data_item.cache.relative) + if self._cmp_checksum(blob, path): + Logger.debug('checksum %s matches. Skipping upload' % path) + return path + Logger.debug('checksum %s mismatch. 
re-uploading' % path) # same as in _import progress.update_target(name, 0, None) blob = bucket.blob(blob_name) - blob.upload_from_filename(data_item.cache.relative) + blob.upload_from_filename(path) progress.finish_target(name) - Logger.debug('uploading %s completed' % data_item.cache.relative) + Logger.debug('uploading %s completed' % path) - return data_item + return path - def _status(self, data_item): + def _status(self, path): """ status, gcp version """ bucket = self._get_bucket_gc(self.storage_bucket) - blob_name = self.cache_file_key(data_item.cache.dvc) + blob_name = self.cache_file_key(path) blob = bucket.get_blob(blob_name) remote_exists = blob is not None and blob.exists() - local_exists = os.path.exists(data_item.cache.relative) + local_exists = os.path.exists(path) diff = None if remote_exists and local_exists: - diff = self._cmp_checksum(blob, data_item.cache.dvc) + diff = self._cmp_checksum(blob, path) return (local_exists, remote_exists, diff) @@ -803,17 +771,8 @@ class DataCloud(object): '' : 'LOCAL', } - def __init__(self, settings): - assert isinstance(settings, dvc.settings.Settings) - - #To handle ConfigI case - if not hasattr(settings.config, '_config'): - self._settings = settings - self._cloud = DataCloudBase(None) - return - - self._settings = settings - self._config = self._settings.config._config + def __init__(self, config): + self._config = config cloud_type = self._config['Global'].get('Cloud', '').strip().upper() if cloud_type not in self.CLOUD_MAP.keys(): @@ -823,8 +782,7 @@ def __init__(self, settings): raise ConfigError('Can\'t find cloud section \'[%s]\' in config' % cloud_type) cloud_settings = self.get_cloud_settings(self._config, - cloud_type, - self._settings.path_factory) + cloud_type) self.typ = cloud_type self._cloud = self.CLOUD_MAP[cloud_type](cloud_settings) @@ -832,7 +790,7 @@ def __init__(self, settings): self.sanity_check() @staticmethod - def get_cloud_settings(config, cloud_type, path_factory): + def 
get_cloud_settings(config, cloud_type): """ Obtain cloud settings from config. """ @@ -841,7 +799,7 @@ def get_cloud_settings(config, cloud_type, path_factory): else: cloud_config = config[cloud_type] global_storage_path = config['Global'].get('StoragePath', None) - cloud_settings = CloudSettings(path_factory, global_storage_path, cloud_config) + cloud_settings = CloudSettings(global_storage_path, cloud_config) return cloud_settings def sanity_check(self): @@ -867,62 +825,11 @@ def sanity_check(self): self._cloud.sanity_check() - def _collect_target(self, target): - """ - Collect target as a file or directory. - """ - if self._settings.path_factory.is_data_item(target): - item = self._settings.path_factory.existing_data_item(target) - return [item] - elif os.path.isdir(target): - return self._settings.path_factory.all_existing_data_items(target) - - Logger.warn('Target "{}" does not exist'.format(target)) - - return [] - - def _collect_targets(self, targets): - """ - Collect every target as a data item. - """ - collected = [] - - for target in targets: - collected += self._collect_target(target) - - return collected - def _map_targets(self, func, targets, jobs): """ Process targets as data items in parallel. """ - collected = self._collect_targets(targets) - - return map_progress(func, collected, jobs) - - def _import(self, target): - """ - Generic method for importing targets in a cloud-agnostic way. 
- """ - url, item = target - parsed = urlparse(url) - - typ = self.SCHEME_MAP.get(parsed.scheme, None) - if typ == None: - Logger.error('Not supported scheme \'{}\''.format(parsed.scheme)) - return None - - #To handle ConfigI case - if not hasattr(self._settings.config, '_config'): - self._config = None - cloud_settings = None - else: - self._config = self._settings.config._config - cloud_settings = self.get_cloud_settings(self._config, typ, self._settings.path_factory) - - cloud = self.CLOUD_MAP[typ](cloud_settings) - - return cloud.import_data(url, item) + return map_progress(func, targets, jobs) def sync(self, targets, jobs=1): """ @@ -942,18 +849,6 @@ def pull(self, targets, jobs=1): """ return self._map_targets(self._cloud.pull, targets, jobs) - def import_data(self, targets, jobs=1): - """ - Import data items in a cloud-agnostic way. - """ - return map_progress(self._import, targets, jobs) - - def remove(self, item): - """ - Remove data items is a cloud-agnostic way. - """ - return self._cloud.remove(item) - def status(self, targets, jobs=1): """ Check status of data items in a cloud-agnostic way. 
diff --git a/dvc/exceptions.py b/dvc/exceptions.py index 11613c188d..f924677bce 100644 --- a/dvc/exceptions.py +++ b/dvc/exceptions.py @@ -1,3 +1,2 @@ class DvcException(Exception): - def __init__(self, msg): - Exception.__init__(self, msg) + pass diff --git a/dvc/git_wrapper.py b/dvc/git_wrapper.py index 1aee053c20..97b531acd9 100644 --- a/dvc/git_wrapper.py +++ b/dvc/git_wrapper.py @@ -7,7 +7,6 @@ from dvc.logger import Logger from dvc.config import Config from dvc.executor import Executor, ExecutorError -from dvc.path.data_item import DataItem from dvc.system import System from dvc.graph.workflow import Workflow from dvc.graph.commit import Commit diff --git a/dvc/lock.py b/dvc/lock.py new file mode 100644 index 0000000000..d6f21506ce --- /dev/null +++ b/dvc/lock.py @@ -0,0 +1,26 @@ +import os +import fasteners + +class Lock(object): + LOCK_FILE = 'lock' + TIMEOUT = 5 + + def __init__(self, dvc_dir): + self.lock_file = os.path.join(dvc_dir, self.LOCK_FILE) + self._lock = fasteners.InterProcessLock(self.lock_file) + + @staticmethod + def init(dvc_dir): + return Lock(dvc_dir) + + def lock(self): + self._lock.acquire(timeout=self.TIMEOUT) + + def unlock(self): + self._lock.release() + + def __enter__(self): + self.lock() + + def __exit__(self, type, value, tb): + self.unlock() diff --git a/dvc/main.py b/dvc/main.py index 276d60cd84..280055abab 100644 --- a/dvc/main.py +++ b/dvc/main.py @@ -1,27 +1,5 @@ -""" -main entry point / argument parsing for dvc -""" - -import sys -from dvc.settings import Settings -from dvc.logger import Logger -from dvc.config import ConfigError +from dvc.cli import parse_args def main(): - try: - settings = Settings(sys.argv[1:]) - instance = settings._parsed_args.func(settings) - except Exception as e: - # In case we didn't even manage to parse options - exc_info = '-v' in sys.argv or '--verbose' in sys.argv - Logger.error("Settings error: {}".format(e), exc_info=exc_info) - return 255 - - try: - ret = instance.run_cmd() - except 
Exception as e: - exc_info = settings.parsed_args.verbose - Logger.error("{} error: {}".format(instance.__class__.__name__, e), exc_info=exc_info) - return 254 - - return ret + args = parse_args() + return args.func(args).run_cmd() diff --git a/dvc/path/__init__.py b/dvc/path/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/dvc/path/data_item.py b/dvc/path/data_item.py deleted file mode 100644 index f9592919dc..0000000000 --- a/dvc/path/data_item.py +++ /dev/null @@ -1,114 +0,0 @@ -import os -import stat - -from dvc.config import ConfigI -from dvc.path.path import Path -from dvc.exceptions import DvcException -from dvc.system import System -from dvc.utils import cached_property -from dvc.data_cloud import file_md5 -from dvc.state_file import StateFile, LocalStateFile - - -class DataItemError(DvcException): - def __init__(self, msg): - super(DataItemError, self).__init__('Data item error: {}'.format(msg)) - - -class DataDirError(DvcException): - def __init__(self, msg): - super(DataDirError, self).__init__(msg) - - -class DataItemInConfigDirError(DataDirError): - def __init__(self, file): - msg = 'File "{}" is in config directory'.format(file) - super(DataDirError, self).__init__(msg) - - -class NotInGitDirError(DataDirError): - def __init__(self, file, git_dir): - msg = 'File "{}" is not in git directory "{}"'.format(file, git_dir) - super(NotInGitDirError, self).__init__(msg) - - -class DataItem(object): - LOCAL_STATE_FILE_PREFIX = '.' 
- LOCAL_STATE_FILE_SUFFIX = '.dvc_local_state' - CACHE_FILE_SEP = '_' - - def __init__(self, data_file, git, config, cache_file=None): - self._git = git - self._config = config - self._cache_file = cache_file - - self._data = Path(data_file, git) - - if not self._data.abs.startswith(self._git.git_dir_abs): - raise NotInGitDirError(data_file, self._git.git_dir_abs) - - if self._data.abs.startswith(os.path.join(self._git.git_dir_abs, self._config.CONFIG_DIR)): - raise DataItemInConfigDirError(data_file) - - def copy(self, cache_file=None): - if not cache_file: - cache_file = self._cache_file - - return DataItem(self._data.abs, self._git, self._config, cache_file) - - def __hash__(self): - return self.data.dvc.__hash__() - - def __eq__(self, other): - if other == None: - return False - - return self.data.dvc == other.data.dvc - - @property - def data(self): - return self._data - - def _state(self, prefix, suffix): - state_file = os.path.join(self._git.git_dir_abs, - os.path.dirname(self.data.dvc), - prefix + os.path.basename(self.data.dvc) + suffix) - return Path(state_file, self._git) - - @cached_property - def state(self): - return Path(StateFile.find(self).path, self._git) - - @cached_property - def cache_dir_abs(self): - return os.path.join(self._git.git_dir_abs, ConfigI.CACHE_DIR) - - @cached_property - def local_state(self): - return self._state(self.LOCAL_STATE_FILE_PREFIX, self.LOCAL_STATE_FILE_SUFFIX) - - @cached_property - def cache_dir(self): - return os.path.join(self._git.git_dir_abs, self._config.cache_dir) - - @property - def cache(self): - cache_dir = self.cache_dir_abs - - if self._cache_file: - file_name = os.path.relpath(os.path.realpath(self._cache_file), cache_dir) - else: - file_name = str(StateFile.find_md5(self)) - - cache_file = os.path.join(cache_dir, file_name) - return Path(cache_file, self._git) - - def move_data_to_cache(self): - md5 = file_md5(self.data.relative)[0] - self._cache_file = os.path.join(self.cache_dir_abs, md5) - 
self._git.modify_gitignore([self.data.relative]) - if not os.path.isfile(self.cache.relative): - System.hardlink(self.data.relative, self.cache.relative) - os.chmod(self.data.relative, stat.S_IREAD) - - LocalStateFile(self).save() diff --git a/dvc/path/factory.py b/dvc/path/factory.py deleted file mode 100644 index d81b6f1dbb..0000000000 --- a/dvc/path/factory.py +++ /dev/null @@ -1,59 +0,0 @@ -import os -import re - -from dvc.config import ConfigI -from dvc.path.data_item import DataItem, DataItemError, DataDirError -from dvc.path.path import Path -from dvc.path.stated_data_item import StatedDataItem -from dvc.system import System -from dvc.state_file import StateFile - - -class PathFactory(object): - def __init__(self, settings): - self._git = settings.git - self._config = settings.config - self._settings = settings - self._curr_dir_abs = System.realpath(os.curdir) - - def path(self, relative_raw): - return Path(relative_raw, self._git) - - def data_item(self, data_file, cache_file=None): - return DataItem(data_file, self._git, self._config, cache_file) - - def stated_data_item(self, state, data_file): - return StatedDataItem(state, data_file, self._git, self._config) - - def data_item_from_dvc_path(self, dvc_path, existing=True): - path = Path.from_dvc_path(dvc_path, self._git) - if existing: - return self.existing_data_item(path.relative) - else: - return self.data_item(path.relative) - - def is_data_item(self, fname): - return StateFile.find_by_output(self._settings, fname) != None - - def existing_data_item(self, fname): - if not self.is_data_item(fname): - raise DataItemError(u'No state file found for data file {}'.format(fname)) - - return DataItem(fname, self._git, self._config) - - def to_data_items(self, files): - result = [] - externally_created_files = [] - - for file in files: - try: - data_item = DataItem(file, self._git, self._config) - result.append(data_item) - except DataDirError: - externally_created_files.append(file) - - return result, 
externally_created_files - - def all_existing_data_items(self, subdir='.', cache_exists=True): - files = StateFile.find_all_data_files(self._git, subdir) - return self.to_data_items(files)[0] diff --git a/dvc/path/path.py b/dvc/path/path.py deleted file mode 100644 index de225eac1b..0000000000 --- a/dvc/path/path.py +++ /dev/null @@ -1,40 +0,0 @@ -import os - -from dvc.system import System - - -class Path(object): - def __init__(self, path, git): - if not os.path.isabs(path): - pwd = System.get_cwd() - path = os.path.normpath(os.path.join(pwd, path)) - - self._abs = path - self._dvc = os.path.relpath(self.abs, git.git_dir_abs) - self._relative = os.path.relpath(self._abs, git.curr_dir_abs) - self._filename = os.path.basename(self._abs) - self._dirname = os.path.dirname(self._abs) - - @staticmethod - def from_dvc_path(dvc_path, git): - return Path(os.path.join(git.git_dir_abs, dvc_path), git) - - @property - def dvc(self): - return self._dvc - - @property - def abs(self): - return self._abs - - @property - def relative(self): - return self._relative - - @property - def filename(self): - return self._filename - - @property - def dirname(self): - return self._dirname \ No newline at end of file diff --git a/dvc/path/stated_data_item.py b/dvc/path/stated_data_item.py deleted file mode 100644 index 15feb3ba68..0000000000 --- a/dvc/path/stated_data_item.py +++ /dev/null @@ -1,39 +0,0 @@ -from dvc.path.data_item import DataItem - - -class StatedDataItem(DataItem): - STATUS_UNTRACKED = '?' 
- STATUS_DELETE = 'D' - STATUS_MODIFIED = 'M' - STATUS_TYPE_CHANGED = 'T' - - def __init__(self, state, data_file, git, config, cache_file=None): - super(StatedDataItem, self).__init__(data_file, git, config, cache_file) - self._status = state - - @property - def status(self): - return self._status - - def _check_status(self, status): - return self._status.find(status) >= 0 - - @property - def is_removed(self): - return self._check_status(self.STATUS_DELETE) - - @property - def is_modified(self): - return self._check_status(self.STATUS_MODIFIED) \ - or self._check_status(self.STATUS_TYPE_CHANGED) - - @property - def is_new(self): - return self._check_status(self.STATUS_UNTRACKED) - - @property - def is_unusual(self): - return self.is_new or self.is_modified or self.is_removed - - def __repr__(self): - return u'({}, {})'.format(self.state, self.data.dvc) diff --git a/dvc/project.py b/dvc/project.py new file mode 100644 index 0000000000..ae86e1bc95 --- /dev/null +++ b/dvc/project.py @@ -0,0 +1,227 @@ +import os +import itertools +import networkx as nx + +from dvc.logger import Logger +from dvc.exceptions import DvcException +from dvc.stage import Stage, Output, Dependency +from dvc.config import Config +from dvc.state import State +from dvc.lock import Lock +from dvc.scm import SCM +from dvc.cache import Cache +from dvc.data_cloud import DataCloud + + +class PipelineError(DvcException): + pass + + +class StageNotInPipelineError(PipelineError): + pass + + +class StageNotFoundError(DvcException): + pass + + +class Pipeline(object): + + def __init__(self, project, G): + self.project = project + self.G = G + + def graph(self): + return self.G + + def stages(self): + return nx.get_node_attributes(self.G, 'stage') + + def changed(self, stage): + for node in nx.dfs_postorder_nodes(G, stage.path.relative_to(self.project.root_dir)): + if self.stages[node].changed(): + return True + return False + + def reproduce(self, stage): + if stage not in self.stages(): + raise 
StageNotInPipelineError() + + if not self.changed(stage): + raise PipelineNotChangedError() + + for node in nx.dfs_postorder_nodes(G, stage.path.relative_to(self.project.root_dir)): + self.stages[node].reproduce() + + stage.reproduce() + + +class Project(object): + DVC_DIR = '.dvc' + + def __init__(self, root_dir): + self.root_dir = os.path.abspath(root_dir) + self.dvc_dir = os.path.join(self.root_dir, self.DVC_DIR) + + self.scm = SCM(self.root_dir) + self.lock = Lock(self.dvc_dir) + self.cache = Cache(self.dvc_dir) + self.state = State(self.root_dir, self.dvc_dir) + self.config = Config(self.dvc_dir) + self.logger = Logger() + self.cloud = DataCloud(self.config._config) + + @staticmethod + def init(root_dir): + root_dir = os.path.abspath(root_dir) + dvc_dir = os.path.join(root_dir, Project.DVC_DIR) + os.mkdir(dvc_dir) + + config = Config.init(dvc_dir) + cache = Cache.init(dvc_dir) + state = State.init(root_dir, dvc_dir) + lock = Lock(dvc_dir) + + scm = SCM(root_dir) + scm.ignore_list([cache.cache_dir, + state.state_file, + lock.lock_file]) + scm.commit('DVC init') + + return Project(root_dir) + + def add(self, fname): + path = os.path.abspath(fname) + Stage.STAGE_FILE_SUFFIX + cwd = os.path.dirname(path) + outputs = [Output.loads(self, os.path.basename(fname), use_cache=True, cwd=cwd)] + stage = Stage(project=self, + path=path, + cmd=None, + cwd=cwd, + outs=outputs, + deps=[], + locked=True) + stage.save() + stage.dump() + + def remove(self, fname): + path = os.path.abspath(fname) + for out in self.outs(): + if out.path == path: + out.stage().remove() + + def _add_orphans(self, deps): + outs = [out.path for out in self.outs()] + for dep in deps: + if not dep.use_cache or dep.path in outs: + continue + self.add(dep.path) + + def run(self, cmd, deps, deps_no_cache, outs, outs_no_cache, locked, fname, cwd): + cwd = os.path.abspath(cwd) + path = os.path.join(cwd, fname) + outputs = Output.loads_from(self, outs, use_cache=True, cwd=cwd) + outputs += 
Output.loads_from(self, outs_no_cache, use_cache=False, cwd=cwd) + deps = Dependency.loads_from(self, deps, use_cache=True, cwd=cwd) + deps += Dependency.loads_from(self, deps_no_cache, use_cache=False, cwd=cwd) + + # Add files that were specified as cached dependencies and weren't added before + self._add_orphans(deps) + + stage = Stage(project=self, + path=path, + cmd=cmd, + cwd=cwd, + outs=outputs, + deps=deps, + locked=locked) + stage.run() + stage.dump() + + def reproduce(self, targets, recursive=False, force=False): + stages = nx.get_node_attributes(self.graph(), 'stage') + for target in targets: + node = os.path.relpath(os.path.abspath(target), self.root_dir) + if node not in stages: + raise StageNotFoundError(target) + + if recursive: + for node in nx.dfs_postorder_nodes(self.graph(), node): + stages[node].reproduce(force=force) + + stages[node].reproduce(force=force) + + def checkout(self): + for stage in self.stages(): + stage.checkout() + + def _used_cache(self): + clist = [] + for stage in self.stages(): + for entry in itertools.chain(stage.outs, stage.deps): + if not entry.use_cache: + continue + if entry.cache not in clist: + clist.append(entry.cache) + return clist + + def gc(self): + clist = self._used_cache() + for cache in self.cache.all(): + if cache in clist: + continue + os.unlink(cache) + self.logger.info('\'{}\' was removed'.format(cache)) + + def push(self, jobs=1): + self.cloud.push(self._used_cache(), jobs) + + def pull(self, jobs=1): + self.cloud.pull(self._used_cache(), jobs) + for stage in self.stages(): + for entry in itertools.chain(stage.outs, stage.deps): + if entry.use_cache: + entry.link() + + def status(self, jobs=1): + return self.cloud.status(self._used_cache(), jobs) + + def graph(self): + G = nx.DiGraph() + + for stage in self.stages(): + node = os.path.relpath(stage.path, self.root_dir) + G.add_node(node, stage=stage) + for dep in stage.deps: + dep_stage = dep.stage() + if not dep_stage: + continue + dep_node = 
class FileNotInRepoError(DvcException):
    pass


class Base(object):
    """No-op SCM backend used when the project is not under any VCS."""

    def __init__(self, root_dir):
        # NOTE: SCM() calls Base(root_dir); without this __init__ that call
        # raised TypeError for every non-git directory.
        self.root_dir = root_dir

    @staticmethod
    def is_repo(root_dir):
        return True

    def ignore(self, path):
        pass

    def ignore_list(self, p_list):
        return [self.ignore(path) for path in p_list]

    def add(self, path):
        pass

    def add_list(self, p_list):
        return [self.add(path) for path in p_list]

    def commit(self, msg):
        pass

    def checkout(self, branch):
        pass

    def branch(self, branch):
        pass

    def brancher(self, branch, new_branch):
        return BranchChanger(self, branch, new_branch)


class Git(Base):
    """SCM backend built on GitPython."""

    GITIGNORE = '.gitignore'
    GIT_DIR = '.git'

    def __init__(self, root_dir):
        super(Git, self).__init__(root_dir)
        self.repo = git.Repo(root_dir)

    @staticmethod
    def is_repo(root_dir):
        return os.path.isdir(os.path.join(root_dir, Git.GIT_DIR))

    def ignore(self, path):
        """Append basename(path) to the .gitignore next to it, once."""
        entry = os.path.basename(path)
        gitignore = os.path.join(os.path.dirname(path), self.GITIGNORE)

        if not gitignore.startswith(self.root_dir):
            raise FileNotInRepoError()

        # Use context managers so the file handles are always closed
        # (the original leaked both the read and the append handle).
        if os.path.exists(gitignore):
            with open(gitignore, 'r') as fd:
                if entry in fd.read():
                    return

        with open(gitignore, 'a') as fd:
            fd.write('\n' + entry)

    def add(self, path):
        self.repo.index.add(path)

    def commit(self, msg):
        self.repo.index.commit(msg)

    def checkout(self, branch, create_new=False):
        if create_new:
            self.repo.git.checkout('HEAD', b=branch)
        else:
            self.repo.git.checkout(branch)

    def branch(self, branch):
        self.repo.git.branch(branch)


def SCM(root_dir):
    """Return the SCM backend for root_dir: Git if it is a git repo,
    otherwise the no-op Base."""
    if Git.is_repo(root_dir):
        return Git(root_dir)

    return Base(root_dir)
this data. + # We remove data and link it to existing cache to save + # some space. + os.unlink(self.path) + src = self.cache + link = self.path + elif not checkout: + src = self.path + link = self.cache + else: + raise OutputNoCacheError() + + System.hardlink(src, link) + + def checkout(self): + if not self.use_cache: + return + self.link(checkout=True) + + def mtime(self): + return os.path.getmtime(self.path) + + def update(self, md5=None): + self.md5 = md5 + if not self.md5: + self.md5 = file_md5(self.path)[0] + self.project.state.update(self.path, self.md5, self.mtime()) + + def save(self): + if not self.use_cache: + return + + self.project.scm.ignore(self.path) + self.link() + os.chmod(self.path, stat.S_IREAD) + + def dumpd(self, cwd): + return { + Output.PARAM_PATH: os.path.relpath(self.path, cwd), + Output.PARAM_MD5: self.md5, + Output.PARAM_CACHE: self.use_cache + } + + @classmethod + def loadd(cls, project, d, cwd='.'): + path = os.path.join(cwd, d[Output.PARAM_PATH]) + md5 = d[Output.PARAM_MD5] + use_cache = d[Output.PARAM_CACHE] + return cls(project, path, md5, use_cache=use_cache) + + @classmethod + def loadd_from(cls, project, d_list, cwd='.'): + return [cls.loadd(project, x, cwd=cwd) for x in d_list] + + @classmethod + def loads(cls, project, s, use_cache=False, cwd='.'): + return cls(project, os.path.join(cwd, s), None, use_cache=use_cache) + + @classmethod + def loads_from(cls, project, s_list, use_cache=False, cwd='.'): + return [cls.loads(project, x, use_cache, cwd=cwd) for x in s_list] + + def stage(self): + for stage in self.project.stages(): + for out in stage.outs: + if self.path == out.path: + return stage + return None + + +class Dependency(Output): + def update(self): + md5 = None + state = self.project.state.get(self.path) + if state and state.mtime == self.mtime(): + md5 = state.md5 + msg = '{} using md5 from state file for dependency' + self.project.logger.debug(msg.format(self.path)) + + super(Dependency, self).update(md5=md5) + + 
class Stage(object):
    """A single pipeline stage: a command plus its dependencies and
    outputs, serialized to a 'Dvcfile' or '*.dvc' yaml file."""

    STAGE_FILE = 'Dvcfile'
    STAGE_FILE_SUFFIX = '.dvc'

    PARAM_CMD = 'cmd'
    PARAM_DEPS = 'deps'
    PARAM_OUTS = 'outs'
    PARAM_LOCKED = 'locked'

    def __init__(self, project, path=None, cmd=None, cwd=None,
                 deps=None, outs=None, locked=False):
        # NOTE: use None sentinels instead of mutable default arguments --
        # a shared [] default would leak outs/deps between Stage instances.
        self.project = project
        self.path = path
        self.cmd = cmd
        self.cwd = cwd
        self.outs = [] if outs is None else outs
        self.deps = [] if deps is None else deps
        self.locked = locked

    @staticmethod
    def is_stage_file(path):
        """A stage file is a regular file named 'Dvcfile' or ending in '.dvc'."""
        if not os.path.isfile(path):
            return False

        return (path.endswith(Stage.STAGE_FILE_SUFFIX)
                or os.path.basename(path) == Stage.STAGE_FILE)

    def changed(self):
        """True if any output or dependency of this stage changed."""
        for entry in itertools.chain(self.outs, self.deps):
            if entry.changed():
                self.project.logger.debug("{} changed".format(self.path))
                return True
        return False

    def remove_outs(self):
        for out in self.outs:
            self.project.logger.debug("Removing '{}'".format(out.path))
            os.unlink(out.path)

    def remove(self):
        self.remove_outs()
        os.unlink(self.path)

    def reproduce(self, force=False):
        if not self.changed() and not force:
            return

        if self.cmd:
            # Removing outputs only if we actually have a command to
            # regenerate them.
            self.remove_outs()
        self.run()
        self.project.logger.debug("{} reproduced".format(self.path))

    @staticmethod
    def loadd(project, d, path):
        """Build a Stage from a deserialized stage-file dict."""
        path = os.path.abspath(path)
        cwd = os.path.dirname(path)

        return Stage(project=project,
                     path=path,
                     cmd=d[Stage.PARAM_CMD],
                     cwd=cwd,
                     deps=Dependency.loadd_from(project, d[Stage.PARAM_DEPS], cwd=cwd),
                     outs=Output.loadd_from(project, d[Stage.PARAM_OUTS], cwd=cwd),
                     locked=d[Stage.PARAM_LOCKED])

    @staticmethod
    def load(project, fname):
        with open(fname, 'r') as fd:
            # safe_load: stage files are plain data; full yaml.load can
            # construct arbitrary python objects from an untrusted file.
            return Stage.loadd(project, yaml.safe_load(fd), fname)

    def dumpd(self):
        """Serialize this stage to a plain dict for the yaml stage file."""
        return {
            Stage.PARAM_CMD: self.cmd,
            Stage.PARAM_DEPS: [x.dumpd(self.cwd) for x in self.deps],
            Stage.PARAM_OUTS: [x.dumpd(self.cwd) for x in self.outs],
            Stage.PARAM_LOCKED: self.locked,
        }

    def dump(self, fname=None):
        with open(fname or self.path, 'w') as fd:
            yaml.dump(self.dumpd(), fd, default_flow_style=False)

    def save(self):
        for dep in self.deps:
            dep.update()
            dep.save()

        for out in self.outs:
            out.update()
            out.save()

    def run(self):
        if self.cmd:
            Executor.exec_cmd_only_success(self.cmd, cwd=str(self.cwd), shell=True)
        self.save()

    def checkout(self):
        for entry in itertools.chain(self.outs, self.deps):
            entry.checkout()
existing: + return self.add(path, md5, mtime) + + state = StateEntry(self.root_dir, path, md5, mtime) + self._db.update(state.dumpd(), self._q.path == state.dvc_path) + + return state + + def add(self, path, md5, mtime): + entry = StateEntry(self.root_dir, path, md5, mtime) + self._db.insert(entry.dumpd()) + return entry + + def get(self, path): + d_list = self._db.search(self._q.path == os.path.relpath(path, self.root_dir)) + if not len(d_list): + return None + + if len(d_list) > 1: + raise StateDuplicateError() + + return StateEntry.loadd(self.root_dir, d_list[0]) diff --git a/dvc/state_file.py b/dvc/state_file.py deleted file mode 100644 index 4e2af5ae6c..0000000000 --- a/dvc/state_file.py +++ /dev/null @@ -1,223 +0,0 @@ -import os -import yaml - -from dvc.exceptions import DvcException -from dvc.data_cloud import file_md5 -from dvc.path.path import Path - - -class StateFileError(DvcException): - def __init__(self, msg): - DvcException.__init__(self, 'State file error: {}'.format(msg)) - - -class StateFileBase(object): - @staticmethod - def _save(fname, data): - with open(fname, 'w') as fd: - yaml.dump(data, fd, default_flow_style=False) - - @staticmethod - def _load(fname, state_class, *args): - with open(fname, 'r') as fd: - data = yaml.load(fd) - return state_class.loadd(data, *args) - - -class StateFile(StateFileBase): - DVCFILE_NAME = 'Dvcfile' - STATE_FILE_SUFFIX = '.dvc' - - PARAM_CMD = 'cmd' - PARAM_DEPS = 'deps' - PARAM_LOCKED = 'locked' - PARAM_OUT = 'out' - PARAM_OUT_GIT = 'out-git' - - def __init__(self, - fname=None, - cmd=None, - out=None, - out_git=None, - deps=None, - locked=None, - cwd=''): - super(StateFile, self).__init__() - - self.cmd = cmd - self.out = out - self.out_git = out_git - self.deps = deps - self.locked = locked - - self.path = os.path.abspath(fname) if fname else None - self.cwd = cwd - - @staticmethod - def parse_deps_state(settings, deps, currdir=None): - state = {} - for dep in deps: - if 
settings.path_factory.is_data_item(dep): - item = settings.path_factory.data_item(dep) - md5 = StateFile.find_md5(item) - else: - md5 = file_md5(os.path.join(settings.git.git_dir_abs, dep))[0] - - if currdir: - name = os.path.relpath(dep, currdir) - else: - name = dep - - state[name] = md5 - return state - - @staticmethod - def loadd(data, fname=None): - return StateFile(fname=fname, - cmd=data.get(StateFile.PARAM_CMD, None), - out=data.get(StateFile.PARAM_OUT, None), - out_git=data.get(StateFile.PARAM_OUT_GIT, None), - deps=data.get(StateFile.PARAM_DEPS, []), - locked=data.get(StateFile.PARAM_LOCKED, None)) - - @staticmethod - def load(fname): - return StateFile._load(fname, StateFile, fname) - - @staticmethod - def _is_state_file(path): - return (path.endswith(StateFile.STATE_FILE_SUFFIX) or \ - os.path.basename(path) == StateFile.DVCFILE_NAME) and \ - os.path.isfile(path) - - @staticmethod - def _find_state(fname, dname): - name = os.path.relpath(fname, dname) - for entry in os.listdir(dname): - state_file = os.path.join(dname, entry) - if not StateFile._is_state_file(state_file): - continue - state = StateFile.load(os.path.join(dname, state_file)) - if name in state.out: - return state - return None - - @staticmethod - def _find(name, start, finish): - dname = start - fname = name - while True: - state = StateFile._find_state(fname, dname) - if state: - return state - - if dname == finish: - break - - dname = os.path.dirname(dname) - return None - - @staticmethod - def find(data_item): - return StateFile._find(data_item.data.abs, data_item.data.dirname, data_item._git.git_dir_abs) - - @staticmethod - def find_by_output(settings, output): - path = os.path.abspath(output) - return StateFile._find(path, os.path.dirname(path), settings.git.git_dir_abs) - - @staticmethod - def find_all_state_files(git, subdir='.'): - states = [] - for root, dirs, files in os.walk(os.path.join(git.git_dir_abs, subdir)): - for fname in files: - path = os.path.join(root, fname) - - if 
not StateFile._is_state_file(path): - continue - - states.append(path) - return states - - @staticmethod - def find_all_states(git, subdir='.'): - state_files = StateFile.find_all_state_files(git, subdir) - return [StateFile.load(state_file) for state_file in state_files] - - @staticmethod - def find_all_data_files(git, subdir='.'): - states = StateFile.find_all_states(git, subdir) - files = [] - for state in states: - for out in state.out: - files.append(os.path.join(state.cwd, out)) - return files - - @staticmethod - def find_all_cache_files(git, subdir='.'): - states = StateFile.find_all_states(git, subdir) - cache_files = [] - for state in states: - for out,md5 in state.out.items(): - cache_files.append(md5) - return cache_files - - @staticmethod - def find_md5(data_item): - state = StateFile.find(data_item) - name = os.path.relpath(data_item.data.abs, os.path.dirname(state.path)) - return state.out[name] - - @staticmethod - def loads(content): - data = yaml.load(content) - return StateFile.loadd(data) - - def save(self): - res = { - self.PARAM_CMD: self.cmd, - self.PARAM_OUT: self.out, - self.PARAM_OUT_GIT: self.out_git, - self.PARAM_DEPS: self.deps, - self.PARAM_LOCKED: self.locked - } - - self._save(self.path, res) - - -class LocalStateFile(StateFileBase): - MAGIC = 'DVC-Local-State' - VERSION = '0.1' - - PARAM_DATA_TIMESTAMP = 'DataTimestamp' - PARAM_CACHE_TIMESTAMP = 'CacheTimestamp' - - def __init__(self, data_item, data_timestamp=None, cache_timestamp=None): - super(LocalStateFile, self).__init__() - - self.data_item = data_item - self.data_timestamp = data_timestamp - self.cache_timestamp = cache_timestamp - - if not data_timestamp: - self.data_timestamp = os.path.getmtime(self.data_item.data.relative) - if not cache_timestamp: - self.cache_timestamp = os.path.getmtime(self.data_item.cache.relative) - - @staticmethod - def loadd(data): - return LocalStateFile(None, - data.get(LocalStateFile.PARAM_DATA_TIMESTAMP, None), - 
data.get(LocalStateFile.PARAM_CACHE_TIMESTAMP, None)) - - @staticmethod - def load(data_item): - return LocalStateFile._load(data_item.local_state.relative, LocalStateFile) - - def save(self): - res = { - self.PARAM_DATA_TIMESTAMP : self.data_timestamp, - self.PARAM_CACHE_TIMESTAMP : self.cache_timestamp - } - - self._save(self.data_item.local_state.relative, res) diff --git a/functest/common.sh b/functest/common.sh index 1cb1c759cf..d54f82356f 100644 --- a/functest/common.sh +++ b/functest/common.sh @@ -132,7 +132,6 @@ function dvc_create_repo() { mkdir data cp $DATA_CACHE/* data - dvc add data } function dvc_clean_cloud_aws() { diff --git a/functest/run.sh b/functest/run.sh index 3f9b1068ad..ea81b60384 100755 --- a/functest/run.sh +++ b/functest/run.sh @@ -5,12 +5,13 @@ set -e source common.sh TESTS+=" test-init.sh" -TESTS+=" test-add.sh" +TESTS+=" test-add-remove.sh" TESTS+=" test-sync.sh" TESTS+=" test-repro-code.sh" -TESTS+=" test-merge.sh" +#TESTS+=" test-merge.sh" TESTS+=" test-checkout.sh" TESTS+=" test-gc.sh" +TESTS+=" test-lock.sh" for t in $TESTS; do rm -rf $TEST_REPO @@ -18,4 +19,4 @@ for t in $TESTS; do ./$t done -rm -rf $TEST_REPO +#rm -rf $TEST_REPO diff --git a/functest/test-add-remove.sh b/functest/test-add-remove.sh new file mode 100755 index 0000000000..72fca7729d --- /dev/null +++ b/functest/test-add-remove.sh @@ -0,0 +1,22 @@ +#!/bin/bash + +set -e + +source common.sh + +dvc_create_repo + +dvc add data/foo +dvc_check_files data/foo.dvc data/foo + +dvc remove data/foo +if [ -f "data/foo" ]; then + echo "data/foo was not removed" + dvc_fail +fi +if [ -f "data/foo.dvc" ]; then + echo "data/foo.dvc was not removed" + dvc_fail +fi + +dvc_pass diff --git a/functest/test-add.sh b/functest/test-add.sh deleted file mode 100755 index 4b0655db87..0000000000 --- a/functest/test-add.sh +++ /dev/null @@ -1,13 +0,0 @@ -#!/bin/bash - -set -e - -source common.sh - -dvc_create_repo - -dvc_info "Add file" -cp $DATA_CACHE/foo data/local -dvc add data/local 
-dvc_check_files "data/local data/local.dvc data/.local.dvc_local_state" -dvc_pass diff --git a/functest/test-checkout.sh b/functest/test-checkout.sh index b58e7c0626..4d2a0897d6 100755 --- a/functest/test-checkout.sh +++ b/functest/test-checkout.sh @@ -6,7 +6,9 @@ source common.sh dvc_create_repo +dvc run -D code/code.sh -d data/foo -o data/foo1 bash code/code.sh data/foo data/foo1 + rm -f data/foo dvc checkout -dvc_check_files data/foo data/foo.dvc +dvc_check_files data/foo dvc_pass diff --git a/functest/test-lock.sh b/functest/test-lock.sh new file mode 100755 index 0000000000..3dfe6ca690 --- /dev/null +++ b/functest/test-lock.sh @@ -0,0 +1,15 @@ +#!/bin/bash + +set -e + +source common.sh + +dvc_create_repo + +dvc run -D code/code.sh -d data/foo -o data/foo1 bash code/code.sh data/foo data/foo1 +cat Dvcfile | grep 'locked' | grep 'false' || dvc_fatal "stage locked after 'dvc run'" +dvc lock +cat Dvcfile | grep 'locked' | grep 'true' || dvc_fatal "stage not locked after 'dvc lock'" +dvc lock --unlock +cat Dvcfile | grep 'locked' | grep 'false' || dvc_fatal "stage locked after 'dvc lock --unlock'" +dvc_pass diff --git a/functest/test-repro-code.sh b/functest/test-repro-code.sh index 519468e972..27f071b81a 100755 --- a/functest/test-repro-code.sh +++ b/functest/test-repro-code.sh @@ -10,23 +10,22 @@ function test_generic() { dvc_create_repo dvc_info 'Copy foo into foo1' - dvc run -d code/code.sh -d data/foo -o data/foo1 bash code/code.sh data/foo data/foo1 + dvc run -D code/code.sh -d data/foo -o data/foo1 bash code/code.sh data/foo data/foo1 dvc_info 'Modify code' echo " " >> code/code.sh git commit -am 'Change code' dvc_info 'Reproduce foo1' - dvc repro data/foo1 + dvc repro dvc_check_files data/foo1 data/foo1.dvc if [ "$(cat data/foo1)" != "foo" ]; then dvc_fail fi dvc_info 'Modify foo' - dvc remove data/foo + rm -f data/foo cp $DATA_CACHE/bar data/foo - dvc add data/foo dvc_info 'Reproduce foo1 as default target' dvc repro @@ -48,9 +47,9 @@ function 
test_partial() { git commit -m "copy code" dvc_info "Create repro chain foo -> foo1 -> foo2 -> foo3" - dvc run -f copy_foo_foo1.dvc -d code/code1.sh -d data/foo -o data/foo1 cp data/foo data/foo1 - dvc run -f copy_foo1_foo2.dvc -d code/code2.sh -d data/foo1 -o data/foo2 cp data/foo1 data/foo2 - dvc run -f copy_foo2_foo3.dvc -d code/code3.sh -d data/foo2 -o data/foo3 cp data/foo2 data/foo3 + dvc run -f copy_foo_foo1.dvc -D code/code1.sh -d data/foo -o data/foo1 cp data/foo data/foo1 + dvc run -f copy_foo1_foo2.dvc -D code/code2.sh -d data/foo1 -o data/foo2 cp data/foo1 data/foo2 + dvc run -f copy_foo2_foo3.dvc -D code/code3.sh -d data/foo2 -o data/foo3 cp data/foo2 data/foo3 dvc_info "Save original timestamps" FOO_TS=$(dvc_timestamp data/foo) @@ -64,7 +63,7 @@ function test_partial() { git commit -m "modify code1.sh" dvc_info "Reproduce data/foo3" - dvc repro data/foo3 + dvc repro copy_foo2_foo3.dvc dvc_info "Check timestamps" if [ "$FOO_TS" != "$(dvc_timestamp data/foo)" ]; then @@ -72,8 +71,8 @@ function test_partial() { dvc_fail fi - if [ "$FOO1_TS" == "$(dvc_timestamp data/foo1)" ]; then - dvc_error "data/foo1 timestamp did not change" + if [ "$FOO1_TS" != "$(dvc_timestamp data/foo1)" ]; then + dvc_error "data/foo1 timestamp changed" dvc_fail fi diff --git a/functest/test-sync.sh b/functest/test-sync.sh index d9e3ebfa33..94463cded7 100755 --- a/functest/test-sync.sh +++ b/functest/test-sync.sh @@ -5,28 +5,30 @@ set -e source common.sh function test_sync() { + dvc run -D code/code.sh -d data/foo -o data/foo1 bash code/code.sh data/foo data/foo1 + dvc_info "Checking status" - dvc status data/ | grep "new file" || dvc_fail + dvc status | grep "new file" || dvc_fail dvc_info "Pushing data" - dvc push data/ + dvc push dvc_info "Checking status" - dvc status data/ + dvc status dvc_info "Removing all cache" rm -rf .dvc/cache/* rm -rf data/foo data/bar dvc_info "Checking status" - dvc status data/ | grep "deleted" || dvc_fail + dvc status | grep "deleted" || dvc_fail 
dvc_info "Pulling data" - dvc pull data/ + dvc pull dvc_check_files data/foo data/bar dvc_info "Checking status" - dvc status data/ + dvc status } function test_aws() { diff --git a/requirements.txt b/requirements.txt index cd8e3ff868..8a630af32a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -91,3 +91,5 @@ networkx==1.11 gcs-oauth2-boto-plugin==1.14 xmltodict==0.11.0 pyyaml==3.12 +tinydb==3.7.0 +gitpython==2.1.8 diff --git a/setup.py b/setup.py index 2d1ef2b839..1a005398bf 100644 --- a/setup.py +++ b/setup.py @@ -96,6 +96,8 @@ 'networkx==1.11', 'gcs-oauth2-boto-plugin==1.14', 'pyyaml==3.12', + 'tinydb==3.7.0', + 'gitpython==2.1.8', ] if platform.system() == 'Darwin':