
Commit

Data item class renaming
dmpetrov committed Mar 26, 2017
1 parent eaff710 commit 7990422
Showing 15 changed files with 135 additions and 134 deletions.
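
The rename is mechanical but touches every layer that handles tracked files: the path factory (data_path → data_item, existing_data_path → existing_data_item, to_data_path → to_data_items), the Git wrapper (abs_paths_to_nlx → abs_paths_to_dvc), and all their call sites. For orientation, a minimal sketch — not the actual DVC source — of the interface the renamed data item objects expose, inferred from the call sites in the hunks below:

import os
from collections import namedtuple

# Each location is addressable both relative to the repo root ('dvc') and
# to the current working directory ('relative'); names inferred from usage.
PathPair = namedtuple('PathPair', ['relative', 'dvc'])

class DataItem(object):
    """One tracked file: the data symlink, its cache copy, its state file."""
    def __init__(self, data, cache, state):
        self.data = data    # user-visible path inside the data directory
        self.cache = cache  # content-bearing file inside the cache directory
        self.state = state  # metadata file that `dvc repro` reads

    def create_symlink(self):
        # Point the data path at the cache file, as import/run do below.
        rel = os.path.relpath(self.cache.relative,
                              os.path.dirname(self.data.relative))
        os.symlink(rel, self.data.relative)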
4 changes: 2 additions & 2 deletions dvc/cmd_base.py
@@ -106,5 +106,5 @@ def run(self):
     def get_cache_file_s3_name(self, cache_file):
         cache_prefix_file_name = os.path.relpath(os.path.realpath(cache_file), os.path.realpath(self.git.git_dir))
         file_name = os.path.relpath(cache_prefix_file_name, self.config.cache_dir)
-        nlx_file_path_trim = file_name.replace(os.sep, '/').strip('/')
-        return self.config.aws_storage_prefix + '/' + nlx_file_path_trim
+        dvc_file_path_trim = file_name.replace(os.sep, '/').strip('/')
+        return self.config.aws_storage_prefix + '/' + dvc_file_path_trim
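
The only change here is the variable prefix (nlx_ → dvc_); the S3 naming logic is untouched. A standalone restatement of its steps, with hypothetical paths, to make the key construction concrete:

import os

def s3_name(cache_file, git_dir, cache_dir, aws_storage_prefix):
    # Same steps as get_cache_file_s3_name above, as a pure function.
    cache_prefix = os.path.relpath(os.path.realpath(cache_file),
                                   os.path.realpath(git_dir))
    file_name = os.path.relpath(cache_prefix, cache_dir)
    dvc_file_path_trim = file_name.replace(os.sep, '/').strip('/')
    return aws_storage_prefix + '/' + dvc_file_path_trim

# With hypothetical values git_dir='/repo', cache_dir='cache' and
# aws_storage_prefix='dvc-storage', the cache file
# '/repo/cache/dir1/matrix.p' becomes 'dvc-storage/dir1/matrix.p'.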
28 changes: 14 additions & 14 deletions dvc/cmd_data_import.py
@@ -71,40 +71,40 @@ def import_file(self, input, output, is_reproducible):
         if os.path.isdir(output):
             output = os.path.join(output, os.path.basename(input))
 
-        data_path = self.path_factory.data_path(output)
+        data_item = self.path_factory.data_item(output)
 
-        if os.path.exists(data_path.data.relative):
-            raise DataImportError('Output file "{}" already exists'.format(data_path.data.relative))
-        if not os.path.isdir(os.path.dirname(data_path.data.relative)):
+        if os.path.exists(data_item.data.relative):
+            raise DataImportError('Output file "{}" already exists'.format(data_item.data.relative))
+        if not os.path.isdir(os.path.dirname(data_item.data.relative)):
             raise DataImportError('Output file directory "{}" does not exists'.format(
-                os.path.dirname(data_path.data.relative)))
+                os.path.dirname(data_item.data.relative)))
 
-        cache_dir = os.path.dirname(data_path.cache.relative)
+        cache_dir = os.path.dirname(data_item.cache.relative)
         if not os.path.exists(cache_dir):
             os.makedirs(cache_dir)
 
         if CmdDataImport.is_url(input):
             Logger.debug('Downloading file {} ...'.format(input))
-            self.download_file(input, data_path.cache.relative)
+            self.download_file(input, data_item.cache.relative)
             Logger.debug('Input file "{}" was downloaded to cache "{}"'.format(
-                input, data_path.cache.relative))
+                input, data_item.cache.relative))
         else:
-            copyfile(input, data_path.cache.relative)
+            copyfile(input, data_item.cache.relative)
             Logger.debug('Input file "{}" was copied to cache "{}"'.format(
-                input, data_path.cache.relative))
+                input, data_item.cache.relative))
 
-        data_path.create_symlink()
+        data_item.create_symlink()
         Logger.debug('Symlink from data file "{}" to the cache file "{}" was created'.
-                     format(data_path.data.relative, data_path.cache.relative))
+                     format(data_item.data.relative, data_item.cache.relative))
 
-        state_file = StateFile(data_path.state.relative,
+        state_file = StateFile(data_item.state.relative,
                                self.git,
                                [input],
                                [output],
                                [],
                                is_reproducible)
         state_file.save()
-        Logger.debug('State file "{}" was created'.format(data_path.state.relative))
+        Logger.debug('State file "{}" was created'.format(data_item.state.relative))
         pass
 
     URL_REGEX = re.compile(
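
The StateFile written at the end of import_file is what dvc repro later loads. Its positional arguments, as used at this call site and again in cmd_run.py below (the comments are annotations, not part of the commit):

state_file = StateFile(
    data_item.state.relative,  # where the state (metadata) file is written
    self.git,                  # git wrapper, records the repo snapshot
    [input],                   # input files this artifact depends on
    [output],                  # output files produced
    [],                        # code sources -- none for a plain import
    is_reproducible)           # whether `dvc repro` may re-create it
state_file.save()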
38 changes: 19 additions & 19 deletions dvc/cmd_data_remove.py
@@ -71,8 +71,8 @@ def remove_dir(self, target):
         if not self.args.recursive:
             raise DataRemoveError('Directory "%s" cannot be removed. Use --recurcive flag.' % target)
 
-        data_path = self.path_factory.data_path(target)
-        if data_path.data_dvc_short == '':
+        data_item = self.path_factory.data_item(target)
+        if data_item.data_dvc_short == '':
             raise DataRemoveError('Data directory "%s" cannot be removed' % target)
 
         return self.remove_dir_file_by_file(target)
@@ -86,35 +86,35 @@ def remove_dir_if_empty(file):
 
     def remove_data_instance(self, target):
         # it raises exception if not a symlink is provided
-        data_path = self.path_factory.existing_data_path(target)
+        data_item = self.path_factory.existing_data_item(target)
 
-        self._remove_cache_file(data_path)
-        self._remove_state_file(data_path)
-        self._remove_cloud_cache(data_path)
+        self._remove_cache_file(data_item)
+        self._remove_state_file(data_item)
+        self._remove_cloud_cache(data_item)
 
-        os.remove(data_path.data.relative)
+        os.remove(data_item.data.relative)
         pass
 
-    def _remove_cloud_cache(self, data_path):
+    def _remove_cloud_cache(self, data_item):
         if not self.args.keep_in_cloud:
-            aws_key = self.cache_file_aws_key(data_path.cache.dvc)
+            aws_key = self.cache_file_aws_key(data_item.cache.dvc)
             self.remove_from_cloud(aws_key)
 
-    def _remove_state_file(self, data_path):
-        if os.path.isfile(data_path.state.relative):
-            os.remove(data_path.state.relative)
-            self.remove_dir_if_empty(data_path.state.relative)
+    def _remove_state_file(self, data_item):
+        if os.path.isfile(data_item.state.relative):
+            os.remove(data_item.state.relative)
+            self.remove_dir_if_empty(data_item.state.relative)
         else:
             Logger.warn(u'State file {} for data instance {} does not exist'.format(
-                data_path.state.relative, data_path.data.relative))
+                data_item.state.relative, data_item.data.relative))
 
-    def _remove_cache_file(self, data_path):
-        if not self.args.keep_in_cache and os.path.isfile(data_path.cache.relative):
-            os.remove(data_path.cache.relative)
-            self.remove_dir_if_empty(data_path.cache.relative)
+    def _remove_cache_file(self, data_item):
+        if not self.args.keep_in_cache and os.path.isfile(data_item.cache.relative):
+            os.remove(data_item.cache.relative)
+            self.remove_dir_if_empty(data_item.cache.relative)
         else:
             if not self.args.keep_in_cache:
-                Logger.warn(u'Unable to find cache file for data instance %s' % data_path.data.relative)
+                Logger.warn(u'Unable to find cache file for data instance %s' % data_item.data.relative)
 
     def remove_from_cloud(self, aws_file_name):
         conn = S3Connection(self.config.aws_access_key_id, self.config.aws_secret_access_key)
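
remove_dir_if_empty appears here only as hunk context, called with the path of a file that was just deleted. A plausible sketch of its behavior — an assumption, not the commit's code:

import os

def remove_dir_if_empty(file):
    # Assumed behavior: prune the parent directory once its last
    # cache/state file has been removed.
    dir_name = os.path.dirname(file)
    if os.path.isdir(dir_name) and not os.listdir(dir_name):
        os.rmdir(dir_name)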
30 changes: 15 additions & 15 deletions dvc/cmd_data_sync.py
@@ -53,8 +53,8 @@ def define_args(self, parser):
 
     def run(self):
         if os.path.islink(self.args.target):
-            data_path = self.path_factory.existing_data_path(self.args.target)
-            return self.sync_symlink(data_path)
+            data_item = self.path_factory.existing_data_item(self.args.target)
+            return self.sync_symlink(data_item)
 
         if os.path.isdir(self.args.target):
             return self.sync_dir(self.args.target)
@@ -67,48 +67,48 @@ def sync_dir(self, dir):
             if os.path.isdir(fname):
                 self.sync_dir(fname)
             elif os.path.islink(fname):
-                self.sync_symlink(self.path_factory.existing_data_path(fname))
+                self.sync_symlink(self.path_factory.existing_data_item(fname))
             else:
                 raise DataSyncError('Unsupported file type "{}"'.format(fname))
         pass
 
-    def sync_symlink(self, data_path):
-        if os.path.isfile(data_path.cache.relative):
-            self.sync_to_cloud(data_path)
+    def sync_symlink(self, data_item):
+        if os.path.isfile(data_item.cache.relative):
+            self.sync_to_cloud(data_item)
         else:
-            self.sync_from_cloud(data_path)
+            self.sync_from_cloud(data_item)
         pass
 
-    def sync_from_cloud(self, data_path):
-        aws_key = self.cache_file_aws_key(data_path.cache.dvc)
+    def sync_from_cloud(self, item):
+        aws_key = self.cache_file_aws_key(item.cache.dvc)
         key = self._bucket.get_key(aws_key)
         if not key:
             raise DataSyncError('File "{}" does not exist in the cloud'.format(aws_key))
 
         Logger.printing('Downloading cache file from S3 "{}/{}"'.format(self._bucket.name,
                                                                         aws_key))
-        key.get_contents_to_filename(data_path.cache.relative, cb=percent_cb)
+        key.get_contents_to_filename(item.cache.relative, cb=percent_cb)
         Logger.printing('Downloading completed')
         pass
 
-    def sync_to_cloud(self, data_path):
-        aws_key = self.cache_file_aws_key(data_path.cache.dvc)
+    def sync_to_cloud(self, data_item):
+        aws_key = self.cache_file_aws_key(data_item.cache.dvc)
         key = self._bucket.get_key(aws_key)
         if key:
             Logger.debug('File already uploaded to the cloud. Checksum validation...')
 
             md5_cloud = key.etag[1:-1]
-            md5_local = file_md5(data_path.cache.relative)
+            md5_local = file_md5(data_item.cache.relative)
             if md5_cloud == md5_local:
                 Logger.debug('File checksum matches. No uploading is needed.')
                 return
 
             Logger.printing('Checksum miss-match. Re-uploading is required.')
 
-        Logger.printing('Uploading cache file "{}" to S3 "{}"'.format(data_path.cache.relative,
+        Logger.printing('Uploading cache file "{}" to S3 "{}"'.format(data_item.cache.relative,
                                                                       aws_key))
         key = self._bucket.new_key(aws_key)
-        key.set_contents_from_filename(data_path.cache.relative, cb=percent_cb)
+        key.set_contents_from_filename(data_item.cache.relative, cb=percent_cb)
         Logger.printing('Uploading completed')
         pass
 
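
sync_to_cloud trusts the S3 ETag as a checksum: boto returns it wrapped in quotes (hence etag[1:-1]), and for a single-part upload it equals the MD5 of the object, so comparing it against a local MD5 detects a stale copy. A sketch of a file_md5 equivalent, assuming the helper streams the file (the real implementation may differ):

import hashlib

def file_md5(fname, chunk_size=1024 * 1024):
    # Stream the file so large cache files need not fit in memory.
    md5 = hashlib.md5()
    with open(fname, 'rb') as fd:
        for chunk in iter(lambda: fd.read(chunk_size), b''):
            md5.update(chunk)
    return md5.hexdigest()

Note that this comparison only holds for non-multipart uploads; a multipart ETag is not a plain MD5 of the content.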
40 changes: 20 additions & 20 deletions dvc/cmd_repro.py
@@ -55,29 +55,29 @@ def run(self):
         if not self.skip_git_actions and not self.git.is_ready_to_go():
             return 1
 
-        data_path_list, external_files = self.path_factory.to_data_path(self.args.target)
-        if external_files:
+        data_item_list, external_files_names = self.path_factory.to_data_items(self.args.target)
+        if external_files_names:
             Logger.error('Files from outside of the data directory "{}" could not be reproduced: {}'.
-                         format(self.config.data_dir, ' '.join(external_files)))
+                         format(self.config.data_dir, ' '.join(external_files_names)))
             return 1
 
         error = False
         changed = False
-        for data_path in data_path_list:
+        for data_item in data_item_list:
             try:
-                repro_change = ReproChange(data_path, self)
+                repro_change = ReproChange(data_item, self)
                 if repro_change.reproduce():
                     changed = True
                     Logger.info(u'Data file "{}" was reproduced.'.format(
-                        data_path.data.relative
+                        data_item.data.relative
                     ))
                 else:
                     Logger.info(u'Reproduction is not required for data file "{}".'.format(
-                        data_path.data.relative
+                        data_item.data.relative
                     ))
             except ReproError as err:
                 Logger.error('Error in reproducing data file {}: {}'.format(
-                    data_path.data.relative, str(err)
+                    data_item.data.relative, str(err)
                 ))
                 error = True
                 break
@@ -100,19 +100,19 @@ def run(self):
 
 
 class ReproChange(object):
-    def __init__(self, data_path, cmd_obj):
-        self._data_path = data_path
+    def __init__(self, data_item, cmd_obj):
+        self._data_item = data_item
         self.git = cmd_obj.git
         self._cmd_obj = cmd_obj
-        self._state = StateFile.load(data_path.state.relative, self.git)
+        self._state = StateFile.load(data_item.state.relative, self.git)
 
         cmd_obj._code = self._state.code_sources # HACK!!!
 
        argv = self._state.norm_argv
 
        if not argv:
            raise ReproError('Error: parameter {} is nor defined in state file "{}"'.
-                             format(StateFile.PARAM_NORM_ARGV, data_path.state.relative))
+                             format(StateFile.PARAM_NORM_ARGV, data_item.state.relative))
        if len(argv) < 2:
            raise ReproError('Error: reproducible command in state file "{}" is too short'.
                             format(self._state.file))
@@ -129,39 +129,39 @@ def were_direct_dependencies_changed(self):
 
     def reproduce_data_file(self):
         Logger.debug('Reproducing data file "{}". Removing the file...'.format(
-            self._data_path.data.relative))
-        os.remove(self._data_path.data.relative)
+            self._data_item.data.relative))
+        os.remove(self._data_item.data.relative)
 
         Logger.debug('Reproducing data file "{}". Re-runs command: {}'.format(
-            self._data_path.data.relative, ' '.join(self._repro_argv)))
+            self._data_item.data.relative, ' '.join(self._repro_argv)))
         return self._cmd_obj.run_command(self._repro_argv)
 
     def reproduce(self, force=False):
         were_input_files_changed = False
 
         if not self._state.is_reproducible:
-            Logger.debug('Data file "{}" is not reproducible'.format(self._data_path.data.relative))
+            Logger.debug('Data file "{}" is not reproducible'.format(self._data_item.data.relative))
             return False
 
         for input_file in self._state.input_files:
             try:
-                data_path = self._cmd_obj.path_factory.data_path(input_file)
+                data_item = self._cmd_obj.path_factory.data_item(input_file)
             except NotInDataDirError:
                 raise ReproError(u'The dependency files "{}" is not a data file'.format(input_file))
             except Exception as ex:
                 raise ReproError(u'The dependency files "{}" can not be reproduced: {}'.format(
                     input_file, ex))
 
-            change = ReproChange(data_path, self._cmd_obj)
+            change = ReproChange(data_item, self._cmd_obj)
             if change.reproduce(force):
                 were_input_files_changed = True
 
-        was_source_code_changed = self.git.were_files_changed(self._data_path.data.relative,
+        was_source_code_changed = self.git.were_files_changed(self._data_item.data.relative,
                                                               self._state.code_sources)
 
         if not force and not was_source_code_changed and not were_input_files_changed:
             Logger.debug('Data file "{}" is up to date'.format(
-                self._data_path.data.relative))
+                self._data_item.data.relative))
             return False
 
         return self.reproduce_data_file()
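
reproduce() is the heart of the command: it recurses into every input file, turning each into its own ReproChange, and re-builds the current file when its code or any transitive input changed. The control flow, condensed into a sketch (hypothetical names; the real method also re-runs the stored command via reproduce_data_file):

def reproduce(item, force=False):
    if not item.is_reproducible:          # opt-out recorded in the state file
        return False
    inputs_changed = False
    for dep in item.input_files:          # depth-first walk of dependencies
        if reproduce(dep, force):
            inputs_changed = True
    if force or item.code_changed() or inputs_changed:
        item.rerun_command()              # reproduce_data_file() equivalent
        return True
    return False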
27 changes: 14 additions & 13 deletions dvc/cmd_run.py
@@ -1,4 +1,5 @@
 import os
+import sys
 import shutil
 import fasteners
 
@@ -90,28 +91,28 @@ def run_command(self, argv, stdout=None, stderr=None):
             self.remove_new_files(repo_change)
             return False
 
-        changed_files_nlx = self.git.abs_paths_to_nlx(repo_change.changed_files)
-        output_files = changed_files_nlx + self.git.abs_paths_to_nlx(self.declaration_output_files)
-        args_files_nlx = self.git.abs_paths_to_nlx(self.get_data_files_from_args(argv))
+        changed_files_nlx = self.git.abs_paths_to_dvc(repo_change.changed_files)
+        output_files = changed_files_nlx + self.git.abs_paths_to_dvc(self.declaration_output_files)
+        args_files_nlx = self.git.abs_paths_to_dvc(self.get_data_files_from_args(argv))
 
         input_files_from_args = list(set(args_files_nlx) - set(changed_files_nlx))
-        input_files = self.git.abs_paths_to_nlx(input_files_from_args + self.declaration_input_files)
+        input_files = self.git.abs_paths_to_dvc(input_files_from_args + self.declaration_input_files)
 
-        for data_path in repo_change.data_paths_for_changed_files:
-            dirname = os.path.dirname(data_path.cache.relative)
+        for data_item in repo_change.data_items_for_changed_files:
+            dirname = os.path.dirname(data_item.cache.relative)
             if not os.path.isdir(dirname):
                 os.makedirs(dirname)
 
             Logger.debug('Move output file "{}" to cache dir "{}" and create a symlink'.format(
-                data_path.data.relative, data_path.cache.relative))
-            shutil.move(data_path.data.relative, data_path.cache.relative)
+                data_item.data.relative, data_item.cache.relative))
+            shutil.move(data_item.data.relative, data_item.cache.relative)
 
-            data_path.create_symlink()
+            data_item.create_symlink()
 
-            nlx_code_sources = map(lambda x: self.git.abs_paths_to_nlx([x])[0], self.code)
+            nlx_code_sources = map(lambda x: self.git.abs_paths_to_dvc([x])[0], self.code)
 
-            Logger.debug('Create state file "{}"'.format(data_path.state.relative))
-            state_file = StateFile(data_path.state.relative, self.git,
+            Logger.debug('Create state file "{}"'.format(data_item.state.relative))
+            state_file = StateFile(data_item.state.relative, self.git,
                                    input_files,
                                    output_files,
                                    nlx_code_sources,
@@ -166,7 +167,7 @@ def get_data_files_from_args(self, argv):
         for arg in argv:
             try:
                 if os.path.isfile(arg):
-                    self.path_factory.data_path(arg)
+                    self.path_factory.data_item(arg)
                     result.append(arg)
             except NotInDataDirError:
                 pass
2 changes: 1 addition & 1 deletion dvc/git_wrapper.py
@@ -173,7 +173,7 @@ def abs_paths_to_relative(files):
 
         return result
 
-    def abs_paths_to_nlx(self, files):
+    def abs_paths_to_dvc(self, files):
         result = []
         for file in files:
             result.append(os.path.relpath(os.path.abspath(file), self.git_dir_abs))