Skip to content

Commit

Permalink
Download packages in parallel (conda#11841)
Browse files Browse the repository at this point in the history
* format package_cache_data.py with black
* download packages in threads
* improve test coverage
* Apply suggestions from code review

Co-authored-by: Jannis Leidel <[email protected]>
  • Loading branch information
dholth and jezdez authored Sep 30, 2022
1 parent 07e9fb5 commit 7ffe442
Show file tree
Hide file tree
Showing 5 changed files with 522 additions and 223 deletions.
28 changes: 23 additions & 5 deletions conda/base/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from errno import ENOENT
from functools import lru_cache
from logging import getLogger
from typing import Optional
import os
from os.path import abspath, basename, expanduser, isdir, isfile, join, split as path_split
import platform
Expand Down Expand Up @@ -180,10 +181,16 @@ class Context(Configuration):
# multithreading in various places
_default_threads = ParameterLoader(PrimitiveParameter(0, element_type=int),
aliases=('default_threads',))
# download repodata
_repodata_threads = ParameterLoader(PrimitiveParameter(0, element_type=int),
aliases=('repodata_threads',))
_verify_threads = ParameterLoader(PrimitiveParameter(0, element_type=int),
aliases=('verify_threads',))
# download packages; determined experimentally
_fetch_threads = ParameterLoader(
PrimitiveParameter(5, element_type=int), aliases=("fetch_threads",)
)
_verify_threads = ParameterLoader(
PrimitiveParameter(0, element_type=int), aliases=("verify_threads",)
)
# this one actually defaults to 1 - that is handled in the property below
_execute_threads = ParameterLoader(PrimitiveParameter(0, element_type=int),
aliases=('execute_threads',))
Expand Down Expand Up @@ -460,15 +467,19 @@ def platform(self):
return _platform_map.get(sys.platform, 'unknown')

@property
def default_threads(self):
def default_threads(self) -> Optional[int]:
return self._default_threads if self._default_threads else None

@property
def repodata_threads(self):
def repodata_threads(self) -> Optional[int]:
return self._repodata_threads if self._repodata_threads else self.default_threads

@property
def verify_threads(self):
def fetch_threads(self) -> Optional[int]:
return self._fetch_threads if self._fetch_threads else self.default_threads

@property
def verify_threads(self) -> Optional[int]:
if self._verify_threads:
threads = self._verify_threads
elif self.default_threads:
Expand Down Expand Up @@ -952,6 +963,7 @@ def category_map(self):
"repodata_fns",
"use_only_tar_bz2",
"repodata_threads",
"fetch_threads",
),
"Basic Conda Configuration": ( # TODO: Is there a better category name here?
"envs_dirs",
Expand Down Expand Up @@ -1296,6 +1308,12 @@ def description_map(self):
see much benefit here.
"""
),
fetch_threads=dals(
"""
Threads to use when downloading packages. When not set,
defaults to None, which uses the default ThreadPoolExecutor behavior.
"""
),
force_reinstall=dals(
"""
Ensure that any user-requested package for the current operation is uninstalled
Expand Down
18 changes: 15 additions & 3 deletions conda/common/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):

class ProgressBar(object):

def __init__(self, description, enabled=True, json=False):
def __init__(self, description, enabled=True, json=False, position=None, leave=True):
"""
Args:
description (str):
Expand All @@ -457,8 +457,15 @@ def __init__(self, description, enabled=True, json=False):
elif enabled:
bar_format = "{desc}{bar} | {percentage:3.0f}% "
try:
self.pbar = tqdm(desc=description, bar_format=bar_format, ascii=True, total=1,
file=sys.stdout)
self.pbar = tqdm(
desc=description,
bar_format=bar_format,
ascii=True,
total=1,
file=sys.stdout,
position=position,
leave=leave,
)
except EnvironmentError as e:
if e.errno in (EPIPE, ESHUTDOWN):
self.enabled = False
Expand All @@ -481,6 +488,11 @@ def update_to(self, fraction):
def finish(self):
self.update_to(1)

def refresh(self):
"""Force refresh i.e. once 100% has been reached"""
if self.enabled and not self.json:
self.pbar.refresh()

@swallow_broken_pipe
def close(self):
if self.enabled and self.json:
Expand Down
Loading

0 comments on commit 7ffe442

Please sign in to comment.