diff --git a/rplugin/python3/defx/kind/sftp.py b/rplugin/python3/defx/kind/sftp.py index 710f074..2f538ab 100644 --- a/rplugin/python3/defx/kind/sftp.py +++ b/rplugin/python3/defx/kind/sftp.py @@ -15,10 +15,12 @@ import re import time import typing +import site +from paramiko import SFTPFile, SFTPClient from defx.action import ActionAttr from defx.action import ActionTable -from defx.kind.file import Kind +from defx.kind.file import Kind as Base from defx.clipboard import ClipboardAction from defx.context import Context from defx.defx import Defx @@ -26,6 +28,9 @@ from defx.util import readable, fnamemodify from defx.view import View +site.addsitedir(str(Path(__file__).parent.parent)) +from sftp.sftp_path import SFTPPath + _action_table: typing.Dict[str, ActionTable] = {} ACTION_FUNC = typing.Callable[[View, Defx, Context], None] @@ -42,7 +47,7 @@ def inner_wrapper(view: View, defx: Defx, context: Context) -> None: return wrapper -class Kind(Kind): +class Kind(Base): def __init__(self, vim: Nvim) -> None: self.vim = vim @@ -54,9 +59,9 @@ def get_actions(self) -> typing.Dict[str, ActionTable]: return actions -def check_overwrite(view: View, dest: Path, src: Path) -> Path: +def check_overwrite(view: View, dest: SFTPPath, src: SFTPPath) -> str: if not src.exists() or not dest.exists(): - return Path('') + return '' s_stat = src.stat() s_mtime = s_stat.st_mtime @@ -70,36 +75,23 @@ def check_overwrite(view: View, dest: Path, src: Path) -> Path: choice: int = view._vim.call('defx#util#confirm', f'{dest} already exists. Overwrite?', '&Force\n&No\n&Rename\n&Time\n&Underbar', 0) - ret: Path = Path('') + ret = '' if choice == 1: - ret = dest + ret = str(dest) elif choice == 2: - ret = Path('') + ret = '' elif choice == 3: - ret = Path(view._vim.call( + ret = view._vim.call( 'defx#util#input', f'{src} -> ', str(dest), - ('dir' if src.is_dir() else 'file'))) + ('dir' if src.is_dir() else 'file')) elif choice == 4 and d_mtime < s_mtime: - ret = src + ret = str(src) elif choice == 5: - ret = Path(str(dest) + '_') + ret = str(dest) + '_' return ret -def execute_job(view: View, args: typing.List[str]) -> None: - view._vim.call('defx#util#close_async_job') - - if view._vim.call('has', 'nvim'): - jobfunc = 'jobstart' - jobopts = {} - else: - jobfunc = 'job_start' - jobopts = {'in_io': 'null', 'out_io': 'null', 'err_io': 'null'} - - view._vim.vars['defx#_async_job'] = view._vim.call(jobfunc, args, jobopts) - - def switch(view: View) -> None: windows = [x for x in range(1, view._vim.call('winnr', '$') + 1) if view._vim.call('getwinvar', x, '&buftype') == ''] @@ -111,45 +103,58 @@ def switch(view: View) -> None: view._vim.command('noautocmd rightbelow vnew') -@action(name='cd') -def _cd(view: View, defx: Defx, context: Context) -> None: - """ - Change the current directory. - """ - source_name = defx._source.name - is_parent = context.args and context.args[0] == '..' - prev_cwd = Path(defx._cwd) - - if is_parent: - path = prev_cwd.parent - else: - if context.args: - if len(context.args) > 1: - source_name = context.args[0] - path = Path(context.args[1]) - else: - path = Path(context.args[0]) - else: - path = Path.home() - path = prev_cwd.joinpath(path) - if not readable(path): - error(view._vim, f'{path} is invalid.') - path = path.resolve() - if source_name == 'file' and not path.is_dir(): - error(view._vim, f'{path} is invalid.') - return - - view.cd(defx, source_name, str(path), context.cursor) - if is_parent: - view.search_file(prev_cwd, defx._index) - - @action(name='check_redraw', attr=ActionAttr.NO_TAGETS) def _check_redraw(view: View, defx: Defx, context: Context) -> None: # slow for remote path pass +''' +@action(name='copy') +def _copy(view: View, defx: Defx, context: Context) -> None: + if not context.targets: + return + + message = 'Copy to the clipboard: {}'.format( + str(context.targets[0]['action__path']) + if len(context.targets) == 1 + else str(len(context.targets)) + ' files') + view.print_msg(message) + + view._clipboard.action = ClipboardAction.COPY + view._clipboard.candidates = context.targets + + +@action(name='link') +def _link(view: View, defx: Defx, context: Context) -> None: + if not context.targets: + return + + message = 'Link to the clipboard: {}'.format( + str(context.targets[0]['action__path']) + if len(context.targets) == 1 + else str(len(context.targets)) + ' files') + view.print_msg(message) + + view._clipboard.action = ClipboardAction.LINK + view._clipboard.candidates = context.targets + + +@action(name='move') +def _move(view: View, defx: Defx, context: Context) -> None: + if not context.targets: + return + + message = 'Move to the clipboard: {}'.format( + str(context.targets[0]['action__path']) + if len(context.targets) == 1 + else str(len(context.targets)) + ' files') + view.print_msg(message) + view._clipboard.action = ClipboardAction.MOVE + view._clipboard.candidates = context.targets +''' + + @action(name='new_directory') def _new_directory(view: View, defx: Defx, context: Context) -> None: """ @@ -164,12 +169,14 @@ def _new_directory(view: View, defx: Defx, context: Context) -> None: else: cwd = str(Path(candidate['action__path']).parent) - new_filename = cwd_input( - view._vim, cwd, - 'Please input a new directory name: ', '', 'file') + new_filename: str = str(view._vim.call( + 'defx#util#input', + 'Please input a new filename: ', '', 'file')) if not new_filename: return - filename = Path(cwd).joinpath(new_filename) + + client: SFTPClient = defx._source.client + filename = SFTPPath(client, cwd).joinpath(new_filename) if not filename: return @@ -196,13 +203,15 @@ def _new_file(view: View, defx: Defx, context: Context) -> None: else: cwd = str(Path(candidate['action__path']).parent) - new_filename = cwd_input( - view._vim, cwd, - 'Please input a new filename: ', '', 'file') + new_filename: str = str(view._vim.call( + 'defx#util#input', + 'Please input a new filename: ', '', 'file')) if not new_filename: return + + client: SFTPClient = defx._source.client isdir = new_filename[-1] == '/' - filename = Path(cwd).joinpath(new_filename) + filename = SFTPPath(client, cwd).joinpath(new_filename) if not filename: return @@ -234,20 +243,17 @@ def _new_multiple_files(view: View, defx: Defx, context: Context) -> None: else: cwd = str(Path(candidate['action__path']).parent) - save_cwd = view._vim.call('getcwd') - cd(view._vim, cwd) - str_filenames: str = view._vim.call( 'input', 'Please input new filenames: ', '', 'file') - cd(view._vim, save_cwd) if not str_filenames: return None + client: SFTPClient = defx._source.client for name in shlex.split(str_filenames): is_dir = name[-1] == '/' - filename = Path(cwd).joinpath(name) + filename = SFTPPath(client, cwd).joinpath(name) if filename.exists(): error(view._vim, f'{filename} already exists') continue @@ -268,6 +274,7 @@ def _paste(view: View, defx: Defx, context: Context) -> None: candidate = view.get_cursor_candidate(context.cursor) if not candidate: return + client: SFTPClient = defx._source.client if candidate['is_opened_tree'] or candidate['is_root']: cwd = str(candidate['action__path']) @@ -278,12 +285,12 @@ def _paste(view: View, defx: Defx, context: Context) -> None: dest = None for index, candidate in enumerate(view._clipboard.candidates): path = candidate['action__path'] - dest = Path(cwd).joinpath(path.name) + dest = SFTPPath(client, cwd).joinpath(path.name) if dest.exists(): overwrite = check_overwrite(view, dest, path) - if overwrite == Path(''): + if overwrite == '': continue - dest = overwrite + dest = SFTPPath(client, overwrite) if not path.exists() or path == dest: continue @@ -304,7 +311,7 @@ def _paste(view: View, defx: Defx, context: Context) -> None: else: shutil.copy2(str(path), dest) elif action == ClipboardAction.MOVE: - shutil.move(str(path), cwd) + client.rename(str(path), cwd) # Check rename if not path.is_dir(): @@ -346,7 +353,7 @@ def _remove(view: View, defx: Defx, context: Context) -> None: path = target['action__path'] if path.is_dir(): - shutil.rmtree(str(path)) + path.rmdir() else: path.unlink() @@ -354,6 +361,11 @@ def _remove(view: View, defx: Defx, context: Context) -> None: view._vim.call('bufnr', str(path))) +@action(name='remove_trash', attr=ActionAttr.REDRAW) +def _remove_trash(view: View, defx: Defx, context: Context) -> None: + view.print_msg('remove_trash is not supported') + + @action(name='rename') def _rename(view: View, defx: Defx, context: Context) -> None: """ @@ -368,15 +380,15 @@ def _rename(view: View, defx: Defx, context: Context) -> None: {'buffer_name': 'defx'}) return + client: SFTPClient = defx._source.client for target in context.targets: old = target['action__path'] - new_filename = cwd_input( - view._vim, defx._cwd, - f'Old name: {old}\nNew name: ', str(old), 'file') + new_filename: str = view._vim.call( + 'input', f'Old name: {old}\nNew name: ', str(old), 'file') view._vim.command('redraw') if not new_filename: return - new = Path(defx._cwd).joinpath(new_filename) + new = SFTPPath(client, new_filename) if not new or new == old: continue if str(new).lower() != str(old).lower() and new.exists(): diff --git a/rplugin/python3/defx/sftp/sftp_path.py b/rplugin/python3/defx/sftp/sftp_path.py index b04c2e6..72c8d16 100644 --- a/rplugin/python3/defx/sftp/sftp_path.py +++ b/rplugin/python3/defx/sftp/sftp_path.py @@ -7,7 +7,7 @@ def _get_real_path(path): - m = re.search('//.+@(.*)', path) + m = re.search('sftp//.+@(.*)', path) if m: m_path = re.search('[^/]*/(.*)', m.groups()[0]) if m_path: @@ -19,47 +19,90 @@ def _get_real_path(path): class SFTPPath(PurePosixPath): - def __new__(cls, sftp: SFTPClient, path: str, stat: SFTPAttributes = None): + def __new__(cls, client: SFTPClient, path: str, + stat: SFTPAttributes = None): self = super().__new__(cls, path) - self.sftp: SFTPClient = sftp - # self.path: str = _get_real_path(path) + self.client: SFTPClient = client self.path: str = path + # self.uri: str = path # sftp://user@host/path + # self._head = head # sftp://user@host/ self._stat: SFTPAttributes = stat return self + @classmethod + def parse_path(cls, path: str) -> typing.Tuple[str]: + head, path_str = re.match('(//[^/]+)?/?(.*)', path).groups() + if head is None: + path_str = '/' + path_str + return (head, path_str) + def __eq__(self, other): return self.__str__() == str(other) def __str__(self): return self.path + def exists(self): + try: + return bool(self.stat()) + except FileNotFoundError: + return False + + def is_dir(self) -> bool: + mode = self.stat().st_mode + return not stat.S_ISREG(mode) + + def is_symlink(self) -> bool: + mode = self.stat().st_mode + return stat.S_ISLNK(mode) + def iterdir(self) -> typing.Iterable(SFTPPath): - for f in self.sftp.listdir_attr(self.path): + for f in self.client.listdir_attr(self.path): yield self.joinpath(f.filename, f) + def joinpath(self, name: str, stat: SFTPAttributes = None): + sep = '' if self.path == '/' else '/' + new_path = self.path + sep + name + return SFTPPath(self.client, new_path, stat) + + def mkdir(self, parents=False, exist_ok=False): + # TODO: mkdir recursively + self.client.mkdir(self.path) + + @property + def parent(self): + if self.path == '/': + return self + parts = self.path.split('/') + new_path = '/'.join(parts[:-1]) + return SFTPPath(self.client, new_path) + def relative_to(self, other) -> SFTPPath: return self - def joinpath(self, name: str, stat: SFTPAttributes = None): - new_path = self.path + '/' + name - return SFTPPath(self.sftp, new_path, stat) + def rename(self, new: SFTPPath) -> SFTPPath: + self.client.rename(self.path, new.path) - def exists(self): - try: - bool(self.stat()) - except FileNotFoundError: - return False + def resolve(self) -> SFTPPath: + client = self.client + new_path = client.normalize(self.path) + return SFTPPath(client, new_path) + + def rmdir(self): + # TODO: remove recursively + self.client.rmdir(self.path) def stat(self) -> SFTPAttributes: if self._stat: return self._stat else: - return self.sftp.stat(self.path) + return self.client.stat(self.path) - def is_dir(self) -> bool: - mode = self.stat().st_mode - return not stat.S_ISREG(mode) + def touch(self, exist_ok=True): + self.client.open(self.path, mode='x') - def is_symlink(self) -> bool: - mode = self.stat().st_mode - return stat.S_ISLNK(mode) + def unlink(self, missing_ok=False): + self.client.unlink(self.path) + +if __name__ == '__main__': + print(SFTPPath.parse_path('//hoge@13.4.3')) diff --git a/rplugin/python3/defx/source/sftp.py b/rplugin/python3/defx/source/sftp.py index cbe26ec..5239ca1 100644 --- a/rplugin/python3/defx/source/sftp.py +++ b/rplugin/python3/defx/source/sftp.py @@ -26,6 +26,7 @@ def __init__(self, vim: Nvim) -> None: self.client: SFTPClient = None self.username: str = '' self.hostname: str = '' + self.path_head: str = '' self.vars = { 'root': None, @@ -40,8 +41,11 @@ def init_client(self, hostname, username) -> None: def get_root_candidate( self, context: Context, path: Path ) -> typing.Dict[str, typing.Any]: - path = self._parse_arg(str(path)) - word = "//{}@{}".format(self.username, self.hostname) + str(path) + self.vim.call('defx#util#print_message', str(path)) + path_str = self._parse_arg(str(path)) + path = SFTPPath(self.client, path_str) + word = str(path) + self.vim.call('defx#util#print_message', str(path)) if word[-1:] != '/': word += '/' if self.vars['root']: @@ -56,8 +60,8 @@ def get_root_candidate( def gather_candidates( self, context: Context, path: Path ) -> typing.List[typing.Dict[str, typing.Any]]: - path = self._parse_arg(str(path)) - self.vim.call('defx#util#print_message', str(path)) + path_str = self._parse_arg(str(path)) + path = SFTPPath(self.client, path_str) candidates = [] for f in path.iterdir(): @@ -68,22 +72,22 @@ def gather_candidates( }) return candidates - def _parse_arg(self, path: str) -> None: - m = re.search('//(.+)@(.+)', path) # include username? + def _parse_arg(self, path: str) -> str: + head, rmt_path = SFTPPath.parse_path(path) + if head is None: + return path + m = re.match('//(.+)@(.+)', head) # include username? if m: - username, tail = m.groups() - m_path = re.search('([^/]+)/(.*)', tail) - if m_path: - hostname, file = m_path.groups() - else: - hostname = tail - file = '.' - if (username != self.username or - hostname != self.hostname): - # TODO: error handling(cannot connect) - self.init_client(hostname, username) - self.username = username - self.hostname = hostname - return SFTPPath(self.client, self.client.normalize(file)) + username, hostname = m.groups() else: - return SFTPPath(self.client, path) + hostname = re.match('//(.+)', head).groups()[0] + username = '' + if (username != self.username or + hostname != self.hostname): + # TODO: error handling(cannot connect) + self.init_client(hostname, username) + self.username = username + self.hostname = hostname + if rmt_path == '': + rmt_path = '.' + return self.client.normalize(rmt_path)