Skip to content

Commit

Permalink
[misc] A more conservative way of cleaning up dead records; solved is…
Browse files Browse the repository at this point in the history
…sue of repeatedly appending hostname suffix; more unit tests.

Signed-off-by: Xiangyu Bu <[email protected]>
  • Loading branch information
xybu committed Feb 4, 2017
1 parent 98fbb99 commit df1c417
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 111 deletions.
3 changes: 1 addition & 2 deletions onedrived/od_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,7 @@ def delete_temp_files(all_accounts):


def repo_updated_callback(repo):
global task_pool, webhook_server
if task_pool:
if task_pool and task_pool.outstanding_task_count == 0:
item_request = repo.authenticator.client.item(drive=repo.drive.id, path='/')
task_pool.add_task(merge_dir.MergeDirectoryTask(
repo=repo, task_pool=task_pool, rel_path='', item_request=item_request,
Expand Down
37 changes: 12 additions & 25 deletions onedrived/od_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ class ItemRecordType:

class ItemRecordStatus:
OK = 0
MARKED = 255


class RepositoryType:
Expand Down Expand Up @@ -107,7 +106,18 @@ def get_item_by_path(self, item_name, parent_relpath):
'created_time, modified_time, status, sha1_hash, record_time FROM items '
'WHERE name=? AND parent_path=? LIMIT 1', (item_name, parent_relpath))
rec = q.fetchone()
return ItemRecord(rec) if rec else rec
return ItemRecord(rec) if rec else None

def get_immediate_children_of_dir(self, relpath):
    """
    Load the records of every item whose parent_path equals the given path.

    :param str relpath: Relative path of the directory whose direct children are wanted.
    :return dict(str, ItemRecord): The matching records, keyed by item name.
    """
    query = ('SELECT id, type, name, parent_id, parent_path, etag, ctag, size, size_local, '
             'created_time, modified_time, status, sha1_hash, record_time FROM items '
             'WHERE parent_path=?')
    children = {}
    with self._lock:
        for row in self._conn.execute(query, (relpath,)).fetchall():
            if row:
                # Index 2 of the SELECT list is the "name" column.
                children[row[2]] = ItemRecord(row)
    return children

def delete_item(self, item_name, parent_relpath, is_folder=False):
"""
Expand Down Expand Up @@ -141,29 +151,6 @@ def move_item(self, item_name, parent_relpath, new_name, new_parent_relpath, is_
cursor.execute('UPDATE items SET parent_path=?, name=? WHERE parent_path=? AND name=?',
(new_parent_relpath, new_name, parent_relpath, item_name))

def unmark_items(self, item_name, parent_relpath, is_folder=False):
    """
    Reset the status of an item record to OK; for a folder, also reset every record stored beneath it.

    :param str item_name: Name of the item.
    :param str parent_relpath: Relative path of its parent item.
    :param True | False is_folder: True to indicate that the item is a folder (unmark all descendants too).
    """
    with self._lock, self._conn, closing(self._conn.cursor()) as cursor:
        if is_folder:
            item_relpath = parent_relpath + '/' + item_name
            # "%" and "_" are wildcards in a LIKE pattern. Escape them (and the escape
            # character itself) so that only true descendants of this folder match.
            like_prefix = item_relpath.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')
            cursor.execute("UPDATE items SET status=? WHERE parent_path=? OR parent_path LIKE ? ESCAPE '\\'",
                           (ItemRecordStatus.OK, item_relpath, like_prefix + '/%'))
        # The record of the item itself is always reset.
        cursor.execute('UPDATE items SET status=? WHERE parent_path=? AND name=?',
                       (ItemRecordStatus.OK, parent_relpath, item_name))

def mark_all_items(self, mark=ItemRecordStatus.MARKED):
    """
    Set the status column of every row in the items table to the given value.

    :param mark: Status value to store; defaults to ItemRecordStatus.MARKED.
    """
    params = (mark,)
    with self._lock:
        with self._conn:
            self._conn.execute('UPDATE items SET status=?', params)

def sweep_marked_items(self):
    """
    Delete every row of the items table whose status is MARKED and log how many rows were removed.
    """
    with self._lock:
        with self._conn:
            cursor = self._conn.execute('DELETE FROM items WHERE status=?', (ItemRecordStatus.MARKED,))
            deleted_count = cursor.rowcount
            logging.info('Deleted %d dead records from database.', deleted_count)

def update_item(self, item, parent_relpath, size_local=0, status=ItemRecordStatus.OK):
"""
:param onedrivesdk.model.item.Item item:
Expand Down
7 changes: 2 additions & 5 deletions onedrived/od_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,8 @@ def __init__(self):
self._lock = threading.Lock()

def close(self, n=1):
with self._lock:
self.queued_tasks.clear()
self.tasks_by_path.clear()
for _ in range(n):
self.semaphore.release()
for _ in range(n):
self.semaphore.release()

def add_task(self, task):
"""
Expand Down
114 changes: 62 additions & 52 deletions onedrived/od_tasks/merge_dir.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,38 @@
from ..od_api_helper import get_item_modified_datetime, item_request_call
from ..od_dateutils import datetime_to_timestamp, diff_timestamps
from ..od_hashutils import hash_match, sha1_value
from ..od_repo import ItemRecordType, ItemRecordStatus
from ..od_repo import ItemRecordType


def rename_with_suffix(parent_abspath, name, host_name):
    """
    Rename an entry by tagging its name with the host name, choosing a name that is not taken yet.

    The tag " (<host_name>)" is inserted between the base name and the extension. A tag that already
    ends the base name is not repeated. If the tagged name exists, a counter is placed before the tag
    ("<base> <n> (<host_name>)<ext>") and incremented until an unused name is found. When the base name
    itself ends with a counter (e.g. "report 2"), counting continues from that number.

    :param str parent_abspath: Absolute path of the directory that contains the entry.
    :param str name: Current name of the entry.
    :param str host_name: Host name to put in the suffix.
    :return str: The new name of the entry (without the directory part).
    """
    suffix = ' (' + host_name + ')'
    parent_abspath = parent_abspath + '/'

    # Calculate the file name without suffix.
    ent_name, ent_ext = os.path.splitext(name)
    if ent_name.endswith(suffix):
        ent_name = ent_name[:-len(suffix)]

    new_name = ent_name + suffix + ent_ext
    if os.path.exists(parent_abspath + new_name):
        count = 1
        base_name, sep, count_str = ent_name.rpartition(' ')
        # Only treat the last word as a counter when it really is one; otherwise keep the
        # whole base name ("my file" must not be cut down to "my"). isdecimal() is used
        # because it accepts exactly the digit characters that int() can parse.
        if sep and count_str.isdecimal() and count_str[0] != '0':
            ent_name = base_name
            count = int(count_str) + 1
        new_name = ent_name + ' ' + str(count) + suffix + ent_ext
        while os.path.exists(parent_abspath + new_name):
            count += 1
            new_name = ent_name + ' ' + str(count) + suffix + ent_ext
    shutil.move(parent_abspath + name, parent_abspath + new_name)
    return new_name


def get_os_stat(path):
    """
    Return the os.stat() result for a path, or None when the path does not exist.

    :param str path: Path to inspect.
    :return os.stat_result | None:
    """
    try:
        stat_result = os.stat(path)
    except FileNotFoundError:
        stat_result = None
    return stat_result


class MergeDirectoryTask(base.TaskBase):
Expand Down Expand Up @@ -73,63 +104,47 @@ def _list_local_names(self):
ent_list.add(ent)
return ent_list

def _rename_with_local_suffix(self, name):
    """
    Rename an entry of this task's local directory by tagging it with the host name.

    The tag " (<host_name>)" is inserted before the file extension so the extension is preserved,
    and a tag already present at the end of the base name is not added a second time. If the
    resulting name is taken, a counter is inserted before the tag and incremented until the name is free.

    :param str name: Current name of the entry inside self.local_abspath.
    :return str: The new name of the entry.
    """
    suffix = ' (' + self.repo.context.host_name + ')'
    dir_prefix = self.local_abspath + '/'

    # Work on the base name so the extension stays at the end of the new name.
    stem, ext = os.path.splitext(name)
    if stem.endswith(suffix):
        # Avoid stacking the host tag when an already-tagged entry is renamed again.
        stem = stem[:-len(suffix)]

    new_name = stem + suffix + ext
    count = 0
    while os.path.exists(dir_prefix + new_name):
        count += 1
        new_name = stem + ' ' + str(count) + suffix + ext
    shutil.move(dir_prefix + name, dir_prefix + new_name)
    return new_name

def handle(self):
if not os.path.isdir(self.local_abspath):
logging.error('Error: Local path "%s" is not a directory.' % self.local_abspath)
return

self.repo.context.watcher.rm_watch(self.repo, self.local_abspath)

try:
all_local_items = self._list_local_names()
except (IOError, OSError) as e:
logging.error('Error syncing "%s": %s.', self.local_abspath, e)
logging.error('Error merging dir "%s": %s.', self.local_abspath, e)
return

self.repo.context.watcher.rm_watch(self.repo, self.local_abspath)
all_records = self.repo.get_immediate_children_of_dir(self.rel_path)

if not self.assume_remote_unchanged or not self.parent_remote_unchanged:
try:
all_remote_items = item_request_call(self.repo, self.item_request.children.get)
except onedrivesdk.error.OneDriveError as e:
logging.error('Encountered API Error: %s. Skip directory "%s".', e, self.rel_path)
# Unmark the records under this dir so that they will not be swept in the next round.
if self.rel_path == '':
self.repo.mark_all_items(mark=ItemRecordStatus.OK)
else:
parent_relpath, dirname = os.path.split(self.rel_path)
self.repo.unmark_items(item_name=dirname, parent_relpath=parent_relpath, is_folder=True)
return

for remote_item in all_remote_items:
remote_is_folder = remote_item.folder is not None
all_local_items.discard(remote_item.name) # Remove remote item from untouched list.
if not self.repo.path_filter.should_ignore(self.rel_path + '/' + remote_item.name, remote_is_folder):
self._handle_remote_item(remote_item, all_local_items)
self._handle_remote_item(remote_item, all_local_items, all_records)
else:
logging.debug('Ignored remote path "%s/%s".', self.rel_path, remote_item.name)

for n in all_local_items:
self._handle_local_item(n)
self._handle_local_item(n, all_records)

for rec_name, rec in all_records.items():
logging.info('Record for item %s (%s/%s) is dead. Delete it it.', rec.item_id, rec.parent_path, rec_name)
self.repo.delete_item(rec_name, rec.parent_path, is_folder=rec.type == ItemRecordType.FOLDER)

self.repo.context.watcher.add_watch(self.repo, self.local_abspath)

def _rename_local_and_download_remote(self, remote_item, all_local_items):
all_local_items.add(self._rename_with_local_suffix(remote_item.name))
all_local_items.add(rename_with_suffix(self.local_abspath, remote_item.name, self.repo.context.host_name))
self.task_pool.add_task(
download_file.DownloadFileTask(self.repo, self.task_pool, remote_item, self.rel_path))

Expand Down Expand Up @@ -186,8 +201,6 @@ def _handle_remote_file_with_record(self, remote_item, item_record, item_stat, i
item_local_abspath, local_mtime_ts, remote_mtime_ts)
fix_owner_and_timestamp(item_local_abspath, self.repo.context.user_uid, remote_mtime_ts)
self.repo.update_item(remote_item, self.rel_path, item_stat.st_size)
else:
self.repo.unmark_items(item_record.item_name, item_record.parent_path, is_folder=False)
else:
# Content of local file has changed. Because we assume the remote item was synced before, we overwrite
# the remote item with local one.
Expand Down Expand Up @@ -301,7 +314,8 @@ def _handle_remote_folder(self, remote_item, item_local_abspath, record, all_loc
self.repo, self.task_pool, self.item_request, self.rel_path, remote_item.name))
return
# If the remote metadata doesn't agree with record, keep both by renaming the local file.
all_local_items.add(self._rename_with_local_suffix(remote_item.name))
all_local_items.add(
rename_with_suffix(self.local_abspath, remote_item.name, self.repo.context.host_name))

if not os.path.exists(item_local_abspath):
if remote_dir_matches_record:
Expand All @@ -324,24 +338,19 @@ def _handle_remote_folder(self, remote_item, item_local_abspath, record, all_loc
except OSError as e:
logging.error('Error occurred when merging directory "%s": %s', item_local_abspath, e)

@staticmethod
def get_os_stat(path):
    """
    Stat a path, reporting a missing path as None instead of raising.

    :param str path: Path to inspect.
    :return os.stat_result | None: The os.stat() result, or None if the path was not found.
    """
    try:
        stat_result = os.stat(path)
    except FileNotFoundError:
        return None
    return stat_result

def _handle_remote_item(self, remote_item, all_local_items):
def _handle_remote_item(self, remote_item, all_local_items, all_records):
"""
:param onedrivesdk.model.item.Item remote_item:
:param [str] all_local_items:
:param dict(str, onedrived.od_repo.ItemRecord) all_records:
"""
# So we have three pieces of information -- the remote item metadata, the record in database, and the inode
# on local file system. For the case of handling a remote item, the last two may be missing.
item_local_abspath = self.local_abspath + '/' + remote_item.name
record = self.repo.get_item_by_path(item_name=remote_item.name, parent_relpath=self.rel_path)
record = all_records.pop(remote_item.name, None)

try:
stat = self.get_os_stat(item_local_abspath)
stat = get_os_stat(item_local_abspath)
except OSError as e:
logging.error('Error occurred when accessing path "%s": %s.', item_local_abspath, e)
return
Expand All @@ -354,7 +363,7 @@ def _handle_remote_item(self, remote_item, all_local_items):
logging.info('Remote item "%s/%s" is neither a file nor a directory yet local counterpart exists. '
'Rename local item.', self.rel_path, remote_item.name)
try:
new_name = self._rename_with_local_suffix(remote_item.name)
new_name = rename_with_suffix(self.local_abspath, remote_item.name, self.repo.context.host_name)
all_local_items.add(new_name)
except OSError as e:
logging.error('Error renaming "%s/%s": %s. Skip this item due to unsolvable type conflict.',
Expand All @@ -379,11 +388,10 @@ def _handle_local_folder(self, item_name, item_record, item_local_abspath):
return
if item_record is not None and item_record.type == ItemRecordType.FOLDER:
if self.assume_remote_unchanged:
# The record is a dir but its children haven't been touched. Only unmark the record itself.
self.repo.unmark_items(item_name=item_name, parent_relpath=item_record.parent_path, is_folder=False)
rel_path = self.rel_path + '/' + item_name
self.task_pool.add_task(MergeDirectoryTask(
repo=self.repo, task_pool=self.task_pool, rel_path=self.rel_path + '/' + item_name,
item_request=self.repo.authenticator.client.item(drive=self.repo.drive.id, id=item_record.item_id),
repo=self.repo, task_pool=self.task_pool, rel_path=rel_path,
item_request=self.repo.authenticator.client.item(drive=self.repo.drive.id, path=rel_path),
assume_remote_unchanged=True, parent_remote_unchanged=self.assume_remote_unchanged))
else:
send2trash(item_local_abspath)
Expand Down Expand Up @@ -415,7 +423,6 @@ def _handle_local_folder(self, item_name, item_record, item_local_abspath):
if self.assume_remote_unchanged:
logging.info('Remote item for local dir "%s" is a file that has been deleted locally. '
'Delete the remote item and upload the file.', item_local_abspath)
self.repo.unmark_items(item_name=item_name, parent_relpath=item_record.parent_path, is_folder=False)
if not delete_item.DeleteRemoteItemTask(
repo=self.repo, task_pool=self.task_pool, parent_relpath=self.rel_path,
item_name=item_name, item_id=item_record.item_id, is_folder=False).handle():
Expand Down Expand Up @@ -456,7 +463,6 @@ def _handle_local_file(self, item_name, item_record, item_stat, item_local_abspa
if self.assume_remote_unchanged:
if not equal_ts:
fix_owner_and_timestamp(item_local_abspath, self.repo.context.user_uid, record_ts)
self.repo.unmark_items(item_name=item_name, parent_relpath=item_record.parent_path, is_folder=False)
else:
logging.debug('Local file "%s" used to exist remotely but not found. Delete it.',
item_local_abspath)
Expand All @@ -475,20 +481,24 @@ def _handle_local_file(self, item_name, item_record, item_stat, item_local_abspa
logging.error('Failed to delete outdated remote directory "%s/%s" of Drive %s.',
self.rel_path, item_name, self.repo.drive.id)
# Keep the record so that the branch can be revisited next time.
self.repo.unmark_items(item_name=item_name, parent_relpath=item_record.parent_path, is_folder=False)
return
logging.debug('Local file "%s" is new to OneDrive. Upload it.', item_local_abspath)

self.task_pool.add_task(upload_file.UploadFileTask(
self.repo, self.task_pool, self.item_request, self.rel_path, item_name))

def _handle_local_item(self, item_name):
def _handle_local_item(self, item_name, all_records):
"""
:param str item_name:
:param dict(str, onedrived.od_repo.ItemRecord) all_records:
:return:
"""
item_local_abspath = self.local_abspath + '/' + item_name
record = self.repo.get_item_by_path(item_name, self.rel_path)
record = all_records.pop(item_name, None)
try:
if os.path.isfile(item_local_abspath):
# stat can be None because the function can be called long after dir is listed.
stat = self.get_os_stat(item_local_abspath)
stat = get_os_stat(item_local_abspath)
self._handle_local_file(item_name, record, stat, item_local_abspath)
elif os.path.isdir(item_local_abspath):
self._handle_local_folder(item_name, record, item_local_abspath)
Expand Down
3 changes: 0 additions & 3 deletions onedrived/od_tasks/start_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ def __repr__(self):
def handle(self):
try:
if os.path.isdir(self.repo.local_root):
# Clean up dead records in the database.
self.repo.sweep_marked_items()
self.repo.mark_all_items()
# And add a recursive merge task to task queue.
item_request = self.repo.authenticator.client.item(drive=self.repo.drive.id, path='/')
self.task_pool.add_task(merge_dir.MergeDirectoryTask(self.repo, self.task_pool, '', item_request))
Expand Down
5 changes: 4 additions & 1 deletion onedrived/od_tasks/upload_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ def handle(self):
returned_item = item_request_call(self.repo, item_request.upload_async,
local_path=self.local_abspath, upload_status=self.update_progress)
if not isinstance(returned_item, onedrivesdk.Item):
returned_item = onedrivesdk.Item(returned_item._prop_dict)
if hasattr(returned_item, '_prop_dict'):
returned_item = onedrivesdk.Item(returned_item._prop_dict)
else:
returned_item = item_request_call(self.repo, item_request.get)
self.update_timestamp_and_record(returned_item, item_stat)
self.task_pool.release_path(self.local_abspath)
logging.info('Finished uploading file "%s".', self.local_abspath)
Expand Down
Loading

0 comments on commit df1c417

Please sign in to comment.