Skip to content

Commit

Permalink
utils: extend to_chunks functionality, can specify number of chunks or length of chunk
Browse files Browse the repository at this point in the history
  • Loading branch information
pared committed Jun 28, 2019
1 parent 3d5c42e commit 7f8da34
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 28 deletions.
2 changes: 1 addition & 1 deletion dvc/remote/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ def exists_with_progress(chunks):
if self.no_traverse and hasattr(self, "batch_exists"):
with ThreadPoolExecutor(max_workers=self.JOBS) as executor:
path_infos = [self.checksum_to_path_info(x) for x in checksums]
chunks = to_chunks(path_infos, self.JOBS)
chunks = to_chunks(path_infos, num_chunks=self.JOBS)
results = executor.map(exists_with_progress, chunks)
in_remote = itertools.chain.from_iterable(results)
ret = list(itertools.compress(checksums, in_remote))
Expand Down
6 changes: 3 additions & 3 deletions dvc/remote/local/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,9 @@ def _get_chunks(self, download, remote, status_info, status, jobs):
from_infos = cache

return (
to_chunks(from_infos, jobs),
to_chunks(to_infos, jobs),
to_chunks(names, jobs),
to_chunks(from_infos, num_chunks=jobs),
to_chunks(to_infos, num_chunks=jobs),
to_chunks(names, num_chunks=jobs),
)

def _process(
Expand Down
2 changes: 1 addition & 1 deletion dvc/remote/ssh/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def _exists(chunk_and_channel):

with ThreadPoolExecutor(max_workers=max_workers) as executor:
paths = [path_info.path for path_info in path_infos]
chunks = to_chunks(paths, max_workers)
chunks = to_chunks(paths, num_chunks=max_workers)
chunks_and_channels = zip(chunks, channels)
outcome = executor.map(_exists, chunks_and_channels)
results = list(itertools.chain.from_iterable(outcome))
Expand Down
6 changes: 4 additions & 2 deletions dvc/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import logging

from dvc.config import Config
from dvc.utils import remove, current_timestamp, relpath, chunk
from dvc.utils import remove, current_timestamp, relpath, to_chunks
from dvc.exceptions import DvcException
from dvc.utils.fs import get_mtime_and_size, get_inode
from dvc.utils.compat import fspath_py35
Expand Down Expand Up @@ -450,7 +450,9 @@ def remove_unused_links(self, used):
remove(path)
unused.append(relpath)

for chunk_unused in chunk(unused, SQLITE_MAX_VARIABLES_NUMBER):
for chunk_unused in to_chunks(
unused, chunk_size=SQLITE_MAX_VARIABLES_NUMBER
):
cmd = "DELETE FROM {} WHERE path IN ({})".format(
self.LINK_STATE_TABLE, ",".join(["?"] * len(chunk_unused))
)
Expand Down
36 changes: 24 additions & 12 deletions dvc/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,16 +183,33 @@ def remove(path):
raise


def to_chunks(l, jobs):
n = int(math.ceil(len(l) / jobs))
def _split(list_to_split, chunk_size):
return [
list_to_split[i : i + chunk_size]
for i in range(0, len(list_to_split), chunk_size)
]


def _to_chunks_by_chunks_number(list_to_split, num_chunks):
chunk_size = int(math.ceil(float(len(list_to_split)) / num_chunks))

if len(list_to_split) == 1:
return [list_to_split]

if len(l) == 1:
return [l]
if chunk_size == 0:
chunk_size = 1

if n == 0:
n = 1
return _split(list_to_split, chunk_size)

return [l[x : x + n] for x in range(0, len(l), n)]

def to_chunks(list_to_split, num_chunks=None, chunk_size=None):
    """Split ``list_to_split`` into a list of consecutive chunks.

    Exactly one of the two sizing arguments has to be supplied; a falsy
    value (``None`` or ``0``) counts as "not supplied".

    Args:
        list_to_split: a sized, sliceable sequence (e.g. a list).
        num_chunks: requested number of chunks. The chunk length is derived
            by ``_to_chunks_by_chunks_number`` by rounding up, so fewer
            chunks than requested may be returned.
        chunk_size: maximum length of every chunk.

    Returns:
        A list of chunks, in the original order.

    Raises:
        ValueError: if both or neither of ``num_chunks`` / ``chunk_size``
            are supplied, or if the supplied one is negative.
    """
    if bool(num_chunks) == bool(chunk_size):
        raise ValueError(
            "One and only one of 'num_chunks', 'chunk_size' must be defined"
        )

    # Exactly one of the two is truthy at this point.
    requested = chunk_size or num_chunks
    if requested < 0:
        # A negative value leads to a negative slicing step further down,
        # which would silently return [] and lose every element.
        raise ValueError(
            "'num_chunks' and 'chunk_size' must be positive numbers"
        )

    if chunk_size:
        return _split(list_to_split, chunk_size)
    return _to_chunks_by_chunks_number(list_to_split, num_chunks)


# NOTE: Check if we are in a bundle
Expand Down Expand Up @@ -382,8 +399,3 @@ def relpath(path, start=os.curdir):
):
return path
return os.path.relpath(path, start)


def chunk(iterable, chunk_size):
    """Lazily yield consecutive pieces of ``iterable``.

    Sized, sliceable inputs (lists, tuples, strings, ...) are cut with
    slicing, so every piece has the same type as the input. Inputs without
    a length (generators, iterators) are consumed step by step and yielded
    as lists.

    Args:
        iterable: the data to cut into pieces.
        chunk_size: maximum number of items per piece; must be positive.

    Yields:
        Pieces of at most ``chunk_size`` items; only the last one may be
        shorter.

    Raises:
        ValueError: if ``chunk_size`` is zero or negative. As this is a
            generator, the error surfaces on the first iteration.
    """
    # A negative step would make the range below empty and silently yield
    # nothing; reject it the same way a zero step is rejected.
    if chunk_size <= 0:
        raise ValueError("'chunk_size' must be a positive number")

    try:
        total = len(iterable)
    except TypeError:
        # No len(): treat the input as a plain iterator.
        from itertools import islice

        iterator = iter(iterable)
        while True:
            piece = list(islice(iterator, chunk_size))
            if not piece:
                return
            yield piece
    else:
        for start in range(0, total, chunk_size):
            yield iterable[start : start + chunk_size]
32 changes: 23 additions & 9 deletions tests/unit/utils/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,30 @@
import pytest

from dvc.utils import chunk
from dvc.utils import to_chunks


@pytest.mark.parametrize(
"list_to_chunk, chunk_size, expected_chunks",
[
([1, 2, 3, 4], 1, [[1], [2], [3], [4]]),
([1, 2, 3, 4], 2, [[1, 2], [3, 4]]),
([1, 2, 3, 4], 3, [[1, 2, 3], [4]]),
],
"chunk_size, expected_chunks",
[(1, [[1], [2], [3], [4]]), (2, [[1, 2], [3, 4]]), (3, [[1, 2, 3], [4]])],
)
def test_chunk(list_to_chunk, chunk_size, expected_chunks):
result = list(chunk(list_to_chunk, chunk_size))
def test_to_chunks_chunk_size(chunk_size, expected_chunks):
list_to_chunk = [1, 2, 3, 4]
result = list(to_chunks(list_to_chunk, chunk_size=chunk_size))
assert result == expected_chunks


@pytest.mark.parametrize(
    "num_chunks, chunk_size",
    [
        (1, 2),
        (None, None),
    ],
)
def test_to_chunks_should_raise(num_chunks, chunk_size):
    """Passing both sizing arguments, or neither of them, is rejected."""
    items = [1, 2, 3]
    with pytest.raises(ValueError):
        to_chunks(items, num_chunks, chunk_size)


@pytest.mark.parametrize(
    "num_chunks, expected_chunks",
    [
        (4, [[1], [2], [3], [4]]),
        (3, [[1, 2], [3, 4]]),
        (2, [[1, 2], [3, 4]]),
    ],
)
def test_to_chunks_num_chunks(num_chunks, expected_chunks):
    """Splitting a four-item list by a requested number of chunks."""
    chunks = to_chunks([1, 2, 3, 4], num_chunks=num_chunks)
    assert chunks == expected_chunks

0 comments on commit 7f8da34

Please sign in to comment.