diff --git a/onedrived/od_watcher.py b/onedrived/od_watcher.py
index 8f19c53..1195e82 100644
--- a/onedrived/od_watcher.py
+++ b/onedrived/od_watcher.py
@@ -16,8 +16,7 @@ class LocalRepositoryWatcher:
 
-    FLAGS = _inotify_flags.CREATE | _inotify_flags.CLOSE_WRITE | _inotify_flags.DELETE | \
-            _inotify_flags.DELETE_SELF | _inotify_flags.MOVE_SELF | _inotify_masks.MOVE
+    FLAGS = _inotify_flags.CREATE | _inotify_flags.CLOSE_WRITE | _inotify_flags.DELETE | _inotify_masks.MOVE
 
     BUSY_RETRY_INTERVAL_SEC = 30
 
     FD_READ_DELAY_MSEC = 200
 
@@ -181,10 +180,16 @@ def _handle_move_pair(self, move_pair, to_repo, from_repo=None):
                     (from_item_record.type == ItemRecordType.FOLDER) == (_inotify_flags.ISDIR in to_flags):
                 logging.info('Use Move API to move item "%s/%s" in Drive %s to "%s/%s".',
                              from_parent_relpath, from_ev.name, from_repo.drive.id, to_parent_relpath, to_ev.name)
-                self.task_pool.add_task(tasks.move_item.MoveItemTask(
-                    repo=to_repo, task_pool=self.task_pool, parent_relpath=from_parent_relpath, item_name=from_ev.name,
-                    new_parent_relpath=to_parent_relpath, new_name=to_ev.name, item_id=from_item_record.item_id,
-                    is_folder=_inotify_flags.ISDIR in from_flags))
+                if tasks.move_item.MoveItemTask(
+                        repo=to_repo, task_pool=self.task_pool, parent_relpath=from_parent_relpath, item_name=from_ev.name,
+                        new_parent_relpath=to_parent_relpath, new_name=to_ev.name, item_id=from_item_record.item_id,
+                        is_folder=_inotify_flags.ISDIR in from_flags).handle():
+                    if _inotify_flags.ISDIR in to_flags:
+                        self.add_watch(to_parent_dir + '/' + to_ev.name)
+                else:
+                    logging.error('Failed to use Move API to move item "%s/%s". Fallback to dir merge.',
+                                  from_parent_dir, from_ev.name)
+                    self._add_merge_dir_task(to_repo, to_parent_relpath)
                 return
         self._handle_unpaired_move_from(from_ev, from_flags, from_parent_dir, from_parent_relpath,
@@ -311,6 +316,7 @@ def _handle_unpaired_move_to(self, to_ev, to_flags, to_repo,
             self.task_pool.add_task(tasks.create_folder.CreateFolderTask(
                 repo=to_repo, task_pool=self.task_pool, item_name=to_ev.name, parent_relpath=to_parent_relpath,
                 upload_if_success=True, abort_if_local_gone=True))
+            # After the directory is created, it will be merged and thus the watcher updated.
         else:
             to_dir_request = self._get_item_request_by_relpath(to_repo, to_parent_relpath)
             self.task_pool.add_task(tasks.upload_file.UploadFileTask(
@@ -325,11 +331,6 @@ def handle_event(self, ev, flags, move_pairs):
         """
         parent_dir = self.watch_descriptors[ev.wd]
 
-        if _inotify_flags.DELETE_SELF in flags or _inotify_flags.MOVE_SELF in flags:
-            self.watch_descriptors.pop(ev.wd)
-            logging.info('Delete watcher on path "%s".', parent_dir)
-            return
-
         repo = self._path_to_repo(parent_dir)
         if repo is None:
             logging.warning('Repo not found for %s on path "%s". Flags={%s}.',
@@ -347,6 +348,9 @@
                           str(ev), parent_dir + '/' + ev.name, ','.join([str(f) for f in flags]))
             return
 
+        if event_isdir and (_inotify_flags.MOVED_FROM in flags or _inotify_flags.DELETE in flags):
+            self.rm_watch(parent_dir + '/' + ev.name)
+
         if ev.cookie in move_pairs:
             # Event is part of a move-from + move-to sequence. Handle the two events at move-to time.
             if _inotify_flags.MOVED_TO in flags:
@@ -410,10 +414,8 @@ def _recognize_event_patterns(events):
         return move_pairs, all_events
 
     def process_events(self):
-        if not self._lock.acquire(blocking=False):
+        while not self._lock.acquire(blocking=True, timeout=self.BUSY_RETRY_INTERVAL_SEC):
             logging.warning('Failed to acquire the lock. Will retry in %d sec.', self.BUSY_RETRY_INTERVAL_SEC)
-            self.loop.call_later(self.BUSY_RETRY_INTERVAL_SEC, self.process_events)
-            return
         events = self.notifier.read(timeout=0, read_delay=self.FD_READ_DELAY_MSEC)
         if len(events):
             move_pairs, all_events = self._recognize_event_patterns(events)