forked from iterative/dvc
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
8 changed files
with
235 additions
and
84 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
#!/bin/bash
#
# Wrapper: run the NeatLynx data-sync command with the package importable.
# Requires NEATLYNX_HOME to point at the repository root.

# "$@" (quoted) forwards each argument intact; a bare $@ would word-split
# arguments containing whitespace. The script path is quoted for the same
# reason, in case NEATLYNX_HOME contains spaces.
PYTHONPATH=$NEATLYNX_HOME python "$NEATLYNX_HOME/neatlynx/cmd_data_sync.py" "$@"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
import os | ||
import hashlib | ||
from boto.s3.connection import S3Connection | ||
|
||
from neatlynx.cmd_base import CmdBase, Logger | ||
from neatlynx.exceptions import NeatLynxException | ||
|
||
|
||
class DataSyncError(NeatLynxException):
    """Raised when a data-sync operation fails; prefixes the message."""

    def __init__(self, msg):
        super(DataSyncError, self).__init__('Data sync error: {}'.format(msg))
|
||
|
||
def sizeof_fmt(num, suffix='B'):
    """Format a byte count as a human-readable string (e.g. '1.5KB').

    Divides by 1024 until the value fits under one unit step; falls back
    to the 'Y' (yotta) prefix for values beyond the listed units.
    """
    value = num
    for prefix in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z']:
        if abs(value) < 1024.0:
            return "%3.1f%s%s" % (value, prefix, suffix)
        value /= 1024.0
    return "%.1f%s%s" % (value, 'Y', suffix)
|
||
|
||
def percent_cb(complete, total):
    """Progress callback for boto transfers: log bytes done vs. total."""
    message = '{} transferred out of {}'.format(sizeof_fmt(complete),
                                                sizeof_fmt(total))
    Logger.verbose(message)
|
||
|
||
def file_md5(fname):
    """Return the hex MD5 digest of the file at *fname*.

    Reads in 100 KiB chunks so arbitrarily large files are hashed
    without loading them fully into memory.
    """
    digest = hashlib.md5()
    with open(fname, "rb") as fobj:
        while True:
            chunk = fobj.read(1024 * 100)
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest()
|
||
|
||
class CmdDataSync(CmdBase):
    """Sync NeatLynx data files with an S3 bucket.

    The sync target is either a symlink pointing into the local cache
    directory, or a directory tree containing such symlinks. For each
    cache file: if it exists locally it is uploaded (skipped when the
    S3 ETag matches the local MD5), otherwise it is downloaded.
    """

    def __init__(self):
        CmdBase.__init__(self)

        conn = S3Connection(self.config.aws_access_key_id,
                            self.config.aws_secret_access_key)

        # Lazily create the bucket on first use so a fresh AWS account works.
        bucket_name = self.config.aws_storage_bucket
        self._bucket = conn.lookup(bucket_name)
        if not self._bucket:
            self._bucket = conn.create_bucket(bucket_name)
            Logger.info('S3 bucket "{}" was created'.format(bucket_name))

    def define_args(self, parser):
        """Register command-line arguments for the sync command."""
        self.add_string_arg(parser, 'target', 'Target to sync - file or directory')

    def run(self):
        """Entry point: dispatch on the target type (symlink or directory).

        Raises:
            DataSyncError: if the target is missing or is neither a
                symlink nor a directory.
        """
        target = self.args.target
        if os.path.islink(target):
            return self.sync_symlink(target)

        if os.path.isdir(target):
            return self.sync_dir(target)

        raise DataSyncError('File "{}" does not exist'.format(target))

    def sync_dir(self, dir):
        """Recursively sync a directory; entries must be dirs or symlinks.

        Raises:
            DataSyncError: on any regular (non-symlink) file, which has
                no cache backing and therefore cannot be synced.
        """
        for entry in os.listdir(dir):
            fname = os.path.join(dir, entry)
            if os.path.isdir(fname):
                self.sync_dir(fname)
            elif os.path.islink(fname):
                self.sync_symlink(fname)
            else:
                raise DataSyncError('Unsupported file type "{}"'.format(fname))

    def sync_symlink(self, file):
        """Resolve *file*'s symlink target to a cache path and sync it."""
        cache_file_rel_data = os.path.join(os.path.dirname(file), os.readlink(file))
        # NOTE(review): the nested relpath call looks suspicious — the inner
        # call was probably meant to be realpath. Preserved as-is; confirm
        # against callers before changing.
        cache_file = os.path.relpath(os.path.relpath(cache_file_rel_data),
                                     os.path.realpath(os.curdir))

        # Local cache hit -> push to the cloud; miss -> pull from the cloud.
        if os.path.isfile(cache_file):
            self.sync_to_cloud(cache_file)
        else:
            self.sync_from_cloud(cache_file)

    def sync_from_cloud(self, cache_file):
        """Download *cache_file* from S3 into the local cache.

        Raises:
            DataSyncError: if the corresponding key is absent in the bucket.
        """
        s3_file = self._get_target_s3_name(cache_file)
        key = self._bucket.get_key(s3_file)
        if not key:
            raise DataSyncError('File "{}" does not exist in the cloud'.format(cache_file))

        Logger.info('Downloading cache file "{}" from S3 {}/{}'.format(
            cache_file, self._bucket.name, s3_file))
        key.get_contents_to_filename(cache_file, cb=percent_cb)
        Logger.info('Downloading completed')

    def _get_target_s3_name(self, cache_file):
        """Map a local cache path to its S3 key under the configured prefix."""
        cache_file_rel = os.path.relpath(cache_file, self.config.cache_dir)
        # S3 keys always use '/' regardless of the local OS separator.
        cache_file_rel = cache_file_rel.replace(os.sep, '/').strip('/')

        target_file = self.config.aws_storage_prefix + '/' + cache_file_rel
        return target_file

    def sync_to_cloud(self, cache_file):
        """Upload *cache_file* to S3, skipping when the remote copy matches.

        The S3 ETag (for non-multipart uploads) is the MD5 of the content,
        so a matching local MD5 means no upload is needed.
        """
        target_file = self._get_target_s3_name(cache_file)

        key = self._bucket.get_key(target_file)
        if key:
            Logger.verbose('File already uploaded to the cloud. Checking checksum...')

            md5_cloud = key.etag[1:-1]  # strip the quotes boto keeps in etag
            md5_local = file_md5(cache_file)
            if md5_cloud == md5_local:
                Logger.verbose('File checksum matches. No uploading is needed.')
                return

            Logger.info('Checksum mismatch. Re-uploading is required.')

        Logger.info('Uploading cache file "{}" to S3 {}/{}'.format(
            cache_file, self._bucket.name, target_file))
        key = self._bucket.new_key(target_file)
        key.set_contents_from_filename(cache_file, cb=percent_cb)
        Logger.info('Uploading completed')
|
||
|
||
if __name__ == '__main__':
    # Run the command directly; known NeatLynx failures are logged and
    # reported as exit status 1, anything else propagates.
    import sys
    try:
        status = CmdDataSync().run()
    except NeatLynxException as e:
        Logger.error(e)
        status = 1
    sys.exit(status)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.