From 624ccfc7444b55f67e473569976480f80942aea4 Mon Sep 17 00:00:00 2001
From: Alexander Schepanovski
Date: Wed, 17 Jul 2019 22:50:00 +0700
Subject: [PATCH 1/4] remote: honor --jobs for status collection

---
 dvc/remote/base.py           | 4 ++--
 dvc/remote/local/__init__.py | 6 +++---
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/dvc/remote/base.py b/dvc/remote/base.py
index 8e35ffe419..7b7198d802 100644
--- a/dvc/remote/base.py
+++ b/dvc/remote/base.py
@@ -589,7 +589,7 @@ def changed_cache(self, checksum):
             return self._changed_dir_cache(checksum)
         return self.changed_cache_file(checksum)
 
-    def cache_exists(self, checksums):
+    def cache_exists(self, checksums, jobs=None):
         """Check if the given checksums are stored in the remote.
 
         There are two ways of performing this check:
@@ -618,7 +618,7 @@ def exists_with_progress(chunks):
             return self.batch_exists(chunks, callback=progress_callback)
 
         if self.no_traverse and hasattr(self, "batch_exists"):
-            with ThreadPoolExecutor(max_workers=self.JOBS) as executor:
+            with ThreadPoolExecutor(max_workers=jobs or self.JOBS) as executor:
                 path_infos = [self.checksum_to_path_info(x) for x in checksums]
                 chunks = to_chunks(path_infos, num_chunks=self.JOBS)
                 results = executor.map(exists_with_progress, chunks)
diff --git a/dvc/remote/local/__init__.py b/dvc/remote/local/__init__.py
index 564cda289c..c35502df7a 100644
--- a/dvc/remote/local/__init__.py
+++ b/dvc/remote/local/__init__.py
@@ -241,7 +241,7 @@ def move(self, from_info, to_info):
         move(inp, tmp)
         move(tmp, outp)
 
-    def cache_exists(self, md5s):
+    def cache_exists(self, md5s, jobs=None):
         return [
             checksum
             for checksum in progress(md5s)
@@ -306,7 +306,7 @@ def status(
         md5s = list(ret)
 
         logger.info("Collecting information from local cache...")
-        local_exists = self.cache_exists(md5s)
+        local_exists = self.cache_exists(md5s, jobs=jobs)
 
         # This is a performance optimization. We can safely assume that,
         # if the resources that we want to fetch are already cached,
@@ -316,7 +316,7 @@ def status(
             remote_exists = local_exists
         else:
             logger.info("Collecting information from remote cache...")
-            remote_exists = list(remote.cache_exists(md5s))
+            remote_exists = list(remote.cache_exists(md5s, jobs=jobs))
 
         self._fill_statuses(ret, local_exists, remote_exists)

From 31f1ded3153ab5c12b90f3899d800dc09d334336 Mon Sep 17 00:00:00 2001
From: Alexander Schepanovski
Date: Sun, 21 Jul 2019 13:38:44 +0700
Subject: [PATCH 2/4] remote: don't close sftp connections until parent ssh conn closed

Since this is multiplexing over the same TCP connection, no system
resources are withheld, so this should be a no-brainer optimization.

As a side effect this should fix the "open max - close all - try open -
Administratively prohibited" error @darabi was experiencing in #2280.
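For context, here is a rough standalone sketch (not part of this patch) of
the paramiko behavior the change relies on: every open_sftp() call opens one
more channel multiplexed over the single TCP transport, so keeping channels
around costs no extra sockets. Host and username below are placeholders.

    import paramiko

    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect("example.com", username="user")  # placeholder host/user

    # Each SFTP session is just another channel on the same TCP connection.
    channels = []
    while True:
        try:
            channels.append(client.open_sftp())
        except paramiko.ssh_exception.ChannelException:
            # the server's per-connection channel limit was hit
            # (sshd's MaxSessions, 10 by default)
            break

    print("opened {} sftp channels over one transport".format(len(channels)))
    client.close()  # closing the transport closes all channels with it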
---
 dvc/remote/ssh/__init__.py   | 18 ++++-----
 dvc/remote/ssh/connection.py | 71 +++++++++++++-----------------------
 2 files changed, 34 insertions(+), 55 deletions(-)

diff --git a/dvc/remote/ssh/__init__.py b/dvc/remote/ssh/__init__.py
index a50d812e8f..2ed8b06e84 100644
--- a/dvc/remote/ssh/__init__.py
+++ b/dvc/remote/ssh/__init__.py
@@ -153,15 +153,15 @@ def _exists(chunk_and_channel):
             return ret
 
         with self.ssh(path_infos[0]) as ssh:
-            with ssh.open_max_sftp_channels() as channels:
-                max_workers = len(channels)
-
-                with ThreadPoolExecutor(max_workers=max_workers) as executor:
-                    paths = [path_info.path for path_info in path_infos]
-                    chunks = to_chunks(paths, num_chunks=max_workers)
-                    chunks_and_channels = zip(chunks, channels)
-                    outcome = executor.map(_exists, chunks_and_channels)
-                    results = list(itertools.chain.from_iterable(outcome))
+            channels = ssh.open_max_sftp_channels()
+            max_workers = len(channels)
+
+            with ThreadPoolExecutor(max_workers=max_workers) as executor:
+                paths = [path_info.path for path_info in path_infos]
+                chunks = to_chunks(paths, num_chunks=max_workers)
+                chunks_and_channels = zip(chunks, channels)
+                outcome = executor.map(_exists, chunks_and_channels)
+                results = list(itertools.chain.from_iterable(outcome))
 
         return results
 
diff --git a/dvc/remote/ssh/connection.py b/dvc/remote/ssh/connection.py
index 2ce7de360c..8362857b8c 100644
--- a/dvc/remote/ssh/connection.py
+++ b/dvc/remote/ssh/connection.py
@@ -3,7 +3,6 @@
 import logging
 import errno
 import stat
-from contextlib import contextmanager
 from funcy import cached_property
 
 try:
@@ -61,26 +60,22 @@ def __init__(self, host, *args, **kwargs):
         self._ssh.connect(host, *args, **kwargs)
         self._ssh.get_transport().set_keepalive(10)
 
-        self._sftp = None
-        self._sftp_alive = False
+        self._sftp_channels = []
 
-    def _sftp_connect(self):
-        if not self._sftp or not self._sftp_alive:
-            self._sftp = self._ssh.open_sftp()
-            self._sftp_alive = True
+    @property
+    def sftp(self):
+        if not self._sftp_channels:
+            self._sftp_channels = [self._ssh.open_sftp()]
+        return self._sftp_channels[0]
 
     def close(self):
-        if self._sftp:
-            self._sftp.close()
-            self._sftp_alive = False
-
+        for sftp in self._sftp_channels:
+            sftp.close()
         self._ssh.close()
 
     def st_mode(self, path):
-        self._sftp_connect()
-
         with ignore_file_not_found():
-            return self._sftp.stat(path).st_mode
+            return self.sftp.stat(path).st_mode
 
         return 0
 
@@ -97,8 +92,6 @@ def islink(self, path):
         return stat.S_ISLNK(self.st_mode(path))
 
     def makedirs(self, path):
-        self._sftp_connect()
-
         # Single stat call will say whether this is a dir, a file or a link
         st_mode = self.st_mode(path)
 
@@ -117,7 +110,7 @@ def makedirs(self, path):
 
         if tail:
             try:
-                self._sftp.mkdir(path)
+                self.sftp.mkdir(path)
             except IOError as e:
                 # Since paramiko errors are very vague we need to recheck
                 # whether it's because path already exists or something else
@@ -129,11 +122,8 @@ def walk(self, directory, topdown=True):
         # used as a template.
         #
         # [1] https://github.com/python/cpython/blob/master/Lib/os.py
-
-        self._sftp_connect()
-
-        try:
-            dir_entries = self._sftp.listdir_attr(directory)
+        try:
+            dir_entries = self.sftp.listdir_attr(directory)
         except IOError as exc:
             raise DvcException(
                 "couldn't get the '{}' remote directory files list".format(
@@ -169,7 +159,7 @@ def walk_files(self, directory):
 
     def _remove_file(self, path):
         with ignore_file_not_found():
-            self._sftp.remove(path)
+            self.sftp.remove(path)
 
     def _remove_dir(self, path):
         for root, dirs, files in self.walk(path, topdown=False):
@@ -181,52 +171,45 @@ def _remove_dir(self, path):
             for dname in dirs:
                 path = posixpath.join(root, dname)
                 with ignore_file_not_found():
-                    self._sftp.rmdir(dname)
+                    self.sftp.rmdir(dname)
 
         with ignore_file_not_found():
-            self._sftp.rmdir(path)
+            self.sftp.rmdir(path)
 
     def remove(self, path):
-        self._sftp_connect()
-
         if self.isdir(path):
             self._remove_dir(path)
         else:
             self._remove_file(path)
 
     def download(self, src, dest, no_progress_bar=False, progress_title=None):
-        self._sftp_connect()
-
         if no_progress_bar:
-            self._sftp.get(src, dest)
+            self.sftp.get(src, dest)
         else:
             if not progress_title:
                 progress_title = os.path.basename(src)
 
-            self._sftp.get(src, dest, callback=create_cb(progress_title))
+            self.sftp.get(src, dest, callback=create_cb(progress_title))
             progress.finish_target(progress_title)
 
     def move(self, src, dst):
         self.makedirs(posixpath.dirname(dst))
-        self._sftp_connect()
-        self._sftp.rename(src, dst)
+        self.sftp.rename(src, dst)
 
     def upload(self, src, dest, no_progress_bar=False, progress_title=None):
-        self._sftp_connect()
-
         self.makedirs(posixpath.dirname(dest))
         tmp_file = tmp_fname(dest)
 
         if no_progress_bar:
-            self._sftp.put(src, tmp_file)
+            self.sftp.put(src, tmp_file)
         else:
             if not progress_title:
                 progress_title = posixpath.basename(dest)
 
-            self._sftp.put(src, tmp_file, callback=create_cb(progress_title))
+            self.sftp.put(src, tmp_file, callback=create_cb(progress_title))
             progress.finish_target(progress_title)
 
-        self._sftp.rename(tmp_file, dest)
+        self.sftp.rename(tmp_file, dest)
 
     def execute(self, cmd):
         stdin, stdout, stderr = self._ssh.exec_command(cmd)
@@ -307,18 +290,14 @@ def cp(self, src, dest):
         self.makedirs(posixpath.dirname(dest))
         self.execute("cp {} {}".format(src, dest))
 
-    @contextmanager
     def open_max_sftp_channels(self):
-        try:
-            channels = []
+        # If there is more than one, we've already opened the max amount
+        if len(self._sftp_channels) <= 1:
             while True:
                 try:
-                    channels.append(self._ssh.open_sftp())
+                    self._sftp_channels.append(self._ssh.open_sftp())
                 except paramiko.ssh_exception.ChannelException:
-                    if not channels:
+                    if not self._sftp_channels:
                         raise
                     break
-            yield channels
-        finally:
-            for channel in channels:
-                channel.close()
+        return self._sftp_channels

From 6afc2a38ba63b443771d2850fae3687f153ba097 Mon Sep 17 00:00:00 2001
From: Alexander Schepanovski
Date: Sun, 21 Jul 2019 13:40:29 +0700
Subject: [PATCH 3/4] test: close pools on tests cleanup

This avoids weird errors when pools are collected by the gc in an
already broken-down test environment.
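close_pools() itself isn't part of this diff; roughly, the mechanism it
relies on looks like the sketch below. funcy's memoize keeps its cache on
the wrapped function as `.memory`, so every pool ever created through
get_pool() can be found and closed deterministically. The Pool class here
is a stub standing in for the real one in dvc/remote/pool.py.

    from funcy import memoize


    class Pool(object):
        """Stub standing in for dvc.remote.pool.Pool."""

        def __init__(self, conn_func, *args, **kwargs):
            self._conns = []

        def close(self):
            for conn in self._conns:
                conn.close()


    @memoize
    def get_pool(conn_func, *args, **kwargs):
        return Pool(conn_func, *args, **kwargs)


    def close_pools():
        # Close every memoized pool explicitly instead of leaving it to
        # the gc, which may only run after the environment is torn down.
        for pool in get_pool.memory.values():
            pool.close()
        get_pool.memory.clear()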
---
 tests/conftest.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/tests/conftest.py b/tests/conftest.py
index 1d983ea7bb..8010ff18f0 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -169,3 +169,11 @@ def erepo(repo_dir):
         yield repo
     finally:
         repo.tearDown()
+
+
+@pytest.fixture(scope="session", autouse=True)
+def _close_pools():
+    from dvc.remote.pool import close_pools
+
+    yield
+    close_pools()

From 1f90b426fb2217974b3cf3d1c3f05b3e950e772c Mon Sep 17 00:00:00 2001
From: Alexander Schepanovski
Date: Sun, 21 Jul 2019 13:41:04 +0700
Subject: [PATCH 4/4] remote: prettify get_connection/pool() signatures

---
 dvc/remote/pool.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/dvc/remote/pool.py b/dvc/remote/pool.py
index e15fdb689a..3f92056db9 100644
--- a/dvc/remote/pool.py
+++ b/dvc/remote/pool.py
@@ -4,8 +4,8 @@
 
 
 @contextmanager
-def get_connection(*args, **kwargs):
-    pool = get_pool(*args, **kwargs)
+def get_connection(conn_func, *args, **kwargs):
+    pool = get_pool(conn_func, *args, **kwargs)
     conn = pool.get_connection()
     try:
         yield conn
@@ -17,8 +17,8 @@ def get_connection(*args, **kwargs):
 
 
 @memoize
-def get_pool(*args, **kwargs):
-    return Pool(*args, **kwargs)
+def get_pool(conn_func, *args, **kwargs):
+    return Pool(conn_func, *args, **kwargs)
 
 
 def close_pools():
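A hypothetical usage sketch of the clarified signature (not from the
codebase; host, username and path are placeholders, and it assumes the
connection class in dvc/remote/ssh/connection.py is importable as
SSHConnection). Any callable that builds a connection works as conn_func;
calls with the same (conn_func, args, kwargs) hit the same memoized Pool,
so connections are reused instead of being re-established every time:

    from dvc.remote.pool import get_connection
    from dvc.remote.ssh.connection import SSHConnection

    # First call creates the pool (and an SSHConnection); the second call
    # with identical arguments reuses the same memoized pool.
    with get_connection(SSHConnection, "example.com", username="user") as conn:
        conn.makedirs("/tmp/some/remote/dir")

    with get_connection(SSHConnection, "example.com", username="user") as conn:
        conn.isdir("/tmp/some/remote/dir")

On a clean exit the connection presumably goes back to the pool for reuse;
the pool itself stays memoized until close_pools() is called.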