Skip to content

Commit

Permalink
progress: add main bars (iterative#3594)
Browse files Browse the repository at this point in the history
* progress: add main push/pull bar

Fixes iterative#3452
Related iterative#1840
Related iterative#3565

* adjust desc

* fix merge

Co-authored-by: Ruslan Kuprieiev <[email protected]>
Co-authored-by: Ruslan Kuprieiev <[email protected]>
  • Loading branch information
3 people authored Apr 25, 2020
1 parent e2981d2 commit b312895
Showing 1 changed file with 38 additions and 33 deletions.
71 changes: 38 additions & 33 deletions dvc/remote/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,9 +448,11 @@ def _process(
file_mode=self._file_mode,
)
status = STATUS_DELETED
desc = "Downloading"
else:
func = remote.upload
status = STATUS_NEW
desc = "Uploading"

if jobs is None:
jobs = remote.JOBS
Expand All @@ -466,42 +468,45 @@ def _process(
dir_plans = self._get_plans(download, remote, dir_status, status)
file_plans = self._get_plans(download, remote, file_status, status)

if len(dir_plans[0]) + len(file_plans[0]) == 0:
total = len(dir_plans[0]) + len(file_plans[0])
if total == 0:
return 0

with ThreadPoolExecutor(max_workers=jobs) as executor:
if download:
fails = sum(executor.map(func, *dir_plans))
fails += sum(executor.map(func, *file_plans))
else:
# for uploads, push files first, and any .dir files last

file_futures = {}
for from_info, to_info, name in zip(*file_plans):
file_futures[to_info] = executor.submit(
func, from_info, to_info, name
)
dir_futures = {}
for from_info, to_info, name in zip(*dir_plans):
wait_futures = {
future
for file_path, future in file_futures.items()
if file_path in dir_paths[to_info]
}
dir_futures[to_info] = executor.submit(
self._dir_upload,
func,
wait_futures,
from_info,
to_info,
name,
)
fails = sum(
future.result()
for future in concat(
file_futures.values(), dir_futures.values()
with Tqdm(total=total, unit="file", desc=desc) as pbar:
func = pbar.wrap_fn(func)
with ThreadPoolExecutor(max_workers=jobs) as executor:
if download:
fails = sum(executor.map(func, *dir_plans))
fails += sum(executor.map(func, *file_plans))
else:
# for uploads, push files first, and any .dir files last

file_futures = {}
for from_info, to_info, name in zip(*file_plans):
file_futures[to_info] = executor.submit(
func, from_info, to_info, name
)
dir_futures = {}
for from_info, to_info, name in zip(*dir_plans):
wait_futures = {
future
for file_path, future in file_futures.items()
if file_path in dir_paths[to_info]
}
dir_futures[to_info] = executor.submit(
self._dir_upload,
func,
wait_futures,
from_info,
to_info,
name,
)
fails = sum(
future.result()
for future in concat(
file_futures.values(), dir_futures.values()
)
)
)

if fails:
if download:
Expand Down

0 comments on commit b312895

Please sign in to comment.