From 4580f402692f564794455317bf76ebaf822948c0 Mon Sep 17 00:00:00 2001 From: christopher-henderson Date: Tue, 27 Oct 2020 19:57:17 -0700 Subject: [PATCH 1/4] some cleanup and refactor --- docs/classes/errors.rst | 2 - docs/classes/sync.rst | 3 -- pstream/_async/async_stream.py | 74 ++++++++++++++--------------- pstream/_async/{shim.py => util.py} | 74 ++++++++++++++--------------- pstream/_sync/stream.py | 33 ++++++------- pstream/_sync/util.py | 12 +++++ pstream/errors.py | 5 -- pstream/utils/__init__.py | 0 pstream/utils/defensive.py | 34 ------------- tests/_async/test_shim.py | 16 +++---- tests/sync/test_stream.py | 62 ++++++++++-------------- 11 files changed, 136 insertions(+), 179 deletions(-) rename pstream/_async/{shim.py => util.py} (61%) create mode 100644 pstream/_sync/util.py delete mode 100644 pstream/utils/__init__.py delete mode 100644 pstream/utils/defensive.py diff --git a/docs/classes/errors.rst b/docs/classes/errors.rst index 08344ef..034d05a 100644 --- a/docs/classes/errors.rst +++ b/docs/classes/errors.rst @@ -6,11 +6,9 @@ PStream: fluent, async, iteration =================================== - .. automodule:: pstream.errors :members: - Indices and tables ================== diff --git a/docs/classes/sync.rst b/docs/classes/sync.rst index 292145c..0ea427a 100644 --- a/docs/classes/sync.rst +++ b/docs/classes/sync.rst @@ -6,12 +6,9 @@ PStream: fluent, async, iteration =================================== -Hi! - .. autoclass:: pstream.Stream :members: - Indices and tables ================== diff --git a/pstream/_async/async_stream.py b/pstream/_async/async_stream.py index a7fe1da..4d5a0b3 100644 --- a/pstream/_async/async_stream.py +++ b/pstream/_async/async_stream.py @@ -19,7 +19,7 @@ # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. -from .shim import AsyncShim, shim, async_shim, not_infinite_a, not_infinite_s +from .util import AsyncAdaptor, unwrap, not_infinite from pstream.errors import InfiniteCollectionError from pstream._async.functors import * @@ -63,11 +63,11 @@ def __init__(self, initial: Collection[T] = None): """ if initial is None: initial = [] - self.stream = AsyncShim.new(initial) + self.stream = AsyncAdaptor.new(initial) self._infinite = False - @not_infinite_a - @async_shim + @not_infinite + @unwrap async def collect(self) -> List[T]: """ Evaluates the stream, consuming it and returning a list of the final output. @@ -83,7 +83,7 @@ async def collect(self) -> List[T]: """ return await collect(self.stream) - @shim + @unwrap def chain(self, *iterables: Collection[T]): """ Returns a stream that links an arbitrary number of iterators to this iterator, in a chain. @@ -100,8 +100,8 @@ def chain(self, *iterables: Collection[T]): self.stream = chain(self.stream, *iterables) return self - @not_infinite_a - @async_shim + @unwrap + @not_infinite async def count(self): """ Evaluates the stream, consuming it and returning a count of the number of elements in the stream. @@ -116,7 +116,7 @@ async def count(self): """ return await count(self.stream) - @shim + @unwrap def distinct(self): """ Returns a stream of distinct elements. Distinction is computed by applying the builtin `hash` function @@ -135,7 +135,7 @@ def distinct(self): self.stream = distinct(self.stream) return self - @shim + @unwrap def distinct_with(self, key: Callable[[T], U]): """ Returns a stream of distinct elements. Distinction is computed by applying the builtin `hash` function @@ -160,7 +160,7 @@ def distinct_with(self, key: Callable[[T], U]): self.stream = distinct_with(key, self.stream) return self - @shim + @unwrap def enumerate(self): """ Returns a stream that yields the current count and the element during iteration. @@ -187,7 +187,7 @@ def enumerate(self): self.stream = enumerate(self.stream) return self - @shim + @unwrap def flatten(self): """ Returns a stream that flattens one level of nesting in a stream of elements that are themselves iterators. @@ -213,7 +213,7 @@ def flatten(self): self.stream = flatten(self.stream) return self - @shim + @unwrap def filter(self, predicate: Callable[[T], bool]): """ Returns a stream that filters each element using `predicate`. Only elements for which `predicate` @@ -233,7 +233,7 @@ def filter(self, predicate: Callable[[T], bool]): self.stream = filter(predicate, self.stream) return self - @shim + @unwrap def filter_false(self, predicate: Callable[[T], bool]): """ Returns a stream that filters each element using `predicate`. Only elements for which `predicate` @@ -253,8 +253,8 @@ def filter_false(self, predicate: Callable[[T], bool]): self.stream = filter_false(predicate, self.stream) return self - @not_infinite_a - @async_shim + @unwrap + @not_infinite async def for_each(self, f: Callable[[T], None]): """ Evaluates the stream, consuming it and calling `f` for each element in the stream. @@ -284,8 +284,8 @@ async def for_each(self, f: Callable[[T], None]): """ await for_each(f, self.stream) - @not_infinite_s - @shim + @unwrap + @not_infinite def group_by(self, key: Callable[[T], U]): """ Returns a stream that groups elements together using the provided `key` function. @@ -325,7 +325,7 @@ def group_by(self, key: Callable[[T], U]): self.stream = group_by(key, self.stream) return self - @shim + @unwrap def inspect(self, f: Callable[[T], None]): """ Returns a stream that calls the function, `f`, with a reference to each element before yielding it. @@ -352,7 +352,7 @@ def inspect(self, f: Callable[[T], None]): self.stream = inspect(f, self.stream) return self - @shim + @unwrap def map(self, f: Callable[[T], U]): """ Returns a stream that maps each value using `f`. @@ -369,7 +369,7 @@ def map(self, f: Callable[[T], U]): self.stream = map(f, self.stream) return self - @shim + @unwrap def pool(self, size: int): """ Returns a stream that will collect up to `size` elements into a list before yielding. @@ -397,7 +397,7 @@ def pool(self, size: int): self.stream = pool(self.stream, size) return self - @shim + @unwrap def skip(self, n: int): """ Returns a stream that skips over `n` number of elements. @@ -414,7 +414,7 @@ def skip(self, n: int): self.stream = skip(self.stream, n) return self - @shim + @unwrap def skip_while(self, predicate: Callable[[T], bool]): """ Returns a stream that rejects elements while `predicate` returns `True`. @@ -433,8 +433,8 @@ def skip_while(self, predicate: Callable[[T], bool]): self.stream = skip_while(predicate, self.stream) return self - @not_infinite_s - @shim + @unwrap + @not_infinite def sort(self): """ Returns a stream whose elements are sorted. @@ -452,8 +452,8 @@ def sort(self): self.stream = sort(self.stream) return self - @not_infinite_s - @shim + @unwrap + @not_infinite def sort_with(self, key: Callable[[T], U]): """ Returns a stream whose elements are sorted using the provided key selection function. @@ -475,7 +475,7 @@ def sort_with(self, key: Callable[[T], U]): self.stream = sort_with(key, self.stream) return self - @shim + @unwrap def step_by(self, step: int): """ Returns a stream which steps over items by a custom amount. Regardless of the step, the first item @@ -497,8 +497,8 @@ def step_by(self, step: int): self.stream = step_by(self.stream, step) return self - @not_infinite_s - @async_shim + @unwrap + @not_infinite async def reduce(self, f: Callable[[T], U], accumulator: U) -> U: """ Evaluates the stream, consuming it and applying the function `f` to each item in the stream, @@ -522,7 +522,7 @@ async def reduce(self, f: Callable[[T], U], accumulator: U) -> U: """ return await reduce(f, self.stream, accumulator) - @shim + @unwrap def repeat(self, element: T): """ Returns a stream that repeats an element endlessly. @@ -557,7 +557,7 @@ def repeat(self, element: T): self._infinite = True return self - @shim + @unwrap def repeat_with(self, f: Callable[[], T]): """ Returns a stream that yields the output of `f` endlessly. @@ -592,8 +592,8 @@ def repeat_with(self, f: Callable[[], T]): self._infinite = True return self - @not_infinite_s - @shim + @unwrap + @not_infinite def reverse(self): """ Returns a stream whose elements are reversed. @@ -612,7 +612,7 @@ def reverse(self): self.stream = reverse(self.stream) return self - @shim + @unwrap def take(self, n: int): """ Returns a stream that only iterates over the first `n` elements. @@ -630,7 +630,7 @@ def take(self, n: int): self._infinite = False return self - @shim + @unwrap def take_while(self, predicate: Callable[[T], bool]): """ Returns a stream that only accepts elements while `predicate` returns `True`. @@ -651,7 +651,7 @@ def take_while(self, predicate: Callable[[T], bool]): self._infinite = False return self - @shim + @unwrap def tee(self, *receivers): """ Returns a stream whose elements will be appended to objects in `receivers`. @@ -675,7 +675,7 @@ def tee(self, *receivers): self.stream = inspect(other.append, self.stream) return self - @shim + @unwrap def zip(self, *iterables: Collection[T]): """ Returns a stream that iterates over one or more iterators simultaneously. diff --git a/pstream/_async/shim.py b/pstream/_async/util.py similarity index 61% rename from pstream/_async/shim.py rename to pstream/_async/util.py index 11b850a..3cd00fe 100644 --- a/pstream/_async/shim.py +++ b/pstream/_async/util.py @@ -19,54 +19,52 @@ # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. - +from inspect import iscoroutinefunction from collections.abc import AsyncIterator, AsyncIterable, Iterator, Iterable from functools import wraps from pstream.errors import InfiniteCollectionError -def not_infinite_a(fn): - async def inner(self, *args, **kwargs): - if self._infinite: - raise InfiniteCollectionError(fn) - return await fn(self, *args, **kwargs) +def not_infinite(fn): + if iscoroutinefunction(fn): + @wraps(fn) + async def inner(self, *args, **kwargs): + if self._infinite: + raise InfiniteCollectionError(fn) + return await fn(self, *args, **kwargs) + else: + @wraps(fn) + def inner(self, *args, **kwargs): + if self._infinite: + raise InfiniteCollectionError(fn) + return fn(self, *args, **kwargs) return inner -def not_infinite_s(fn): - def inner(self, *args, **kwargs): - if self._infinite: - raise InfiniteCollectionError(fn) - return fn(self, *args, **kwargs) - return inner - - -def shim(fn): - @wraps(fn) - def inner(self, *args): - if isinstance(self.stream, AsyncShim): - self.stream = self.stream.stream - try: - return fn(self, *args) - finally: - self.stream = AsyncShim.new(self.stream) - return inner - - -def async_shim(fn): - @wraps(fn) - async def inner(self, *args): - if isinstance(self.stream, AsyncShim): - self.stream = self.stream.stream - try: - return await fn(self, *args) - finally: - self.stream = AsyncShim.new(self.stream) +def unwrap(fn): + if iscoroutinefunction(fn): + @wraps(fn) + async def inner(self, *args, **kwargs): + if isinstance(self.stream, AsyncAdaptor): + self.stream = self.stream.stream + try: + return await fn(self, *args, **kwargs) + finally: + self.stream = AsyncAdaptor.new(self.stream) + else: + @wraps(fn) + def inner(self, *args, **kwargs): + if isinstance(self.stream, AsyncAdaptor): + self.stream = self.stream.stream + try: + return fn(self, *args, **kwargs) + finally: + self.stream = AsyncAdaptor.new(self.stream) return inner -class AsyncShim: +class AsyncAdaptor: @staticmethod def new(stream): @@ -75,9 +73,9 @@ def new(stream): if isinstance(stream, AsyncIterable): return stream.__aiter__() if isinstance(stream, Iterator): - return AsyncShim(stream) + return AsyncAdaptor(stream) if isinstance(stream, Iterable): - return AsyncShim(stream.__iter__()) + return AsyncAdaptor(stream.__iter__()) raise TypeError def __init__(self, stream): diff --git a/pstream/_sync/stream.py b/pstream/_sync/stream.py index c98ba2c..8e715ba 100644 --- a/pstream/_sync/stream.py +++ b/pstream/_sync/stream.py @@ -35,6 +35,8 @@ from collections import namedtuple, defaultdict +from pstream._sync.util import not_infinite + try: # Py3 from collections.abc import Iterator, Iterable @@ -43,7 +45,6 @@ from collections import Iterator, Iterable from pstream.errors import InfiniteCollectionError -from pstream.utils.defensive import must_be_callable class Stream(object): @@ -82,6 +83,7 @@ def chain(self, *iterables): self._stream = itertools.chain(self._stream, *iterables) return self + @not_infinite def count(self): """ Evaluates the stream, consuming it and returning a count of the number of elements in the stream. @@ -94,13 +96,12 @@ def count(self): >>> count = Stream(range(100)).filter(lambda x: x % 2 is 0).count() >>> assert count == 50 """ - if self._infinite: - raise InfiniteCollectionError(Stream.count) count = 0 for _ in self: count += 1 return count + @not_infinite def collect(self): """ Evaluates the stream, consuming it and returning a list of the final output. @@ -114,8 +115,6 @@ def collect(self): >>> got = stream.collect() >>> assert got == [2, 4, 6, 8] """ - if self._infinite: - raise InfiniteCollectionError(Stream.collect) return [_ for _ in self] def distinct(self): @@ -145,7 +144,6 @@ def inner(): self._stream = inner() return self - @must_be_callable def distinct_with(self, key): """ Returns a stream of distinct elements. Distinction is computed by applying the builtin `hash` function @@ -207,7 +205,6 @@ def enumerate(self): self._stream = enumerate(self._stream) return self.map(lambda enumeration: Stream.Enumeration(*enumeration)) - @must_be_callable def filter(self, predicate): """ Returns a stream that filters each element using `predicate`. Only elements for which `predicate` @@ -226,7 +223,6 @@ def filter(self, predicate): self._stream = filter(predicate, self._stream) return self - @must_be_callable def filter_false(self, predicate): """ Returns a stream that filters each element using `predicate`. Only elements for which `predicate` @@ -269,7 +265,7 @@ def flatten(self): self._stream = (x for stream in self._stream for x in stream) return self - @must_be_callable + @not_infinite def for_each(self, f): """ Evaluates the stream, consuming it and calling `f` for each element in the stream. @@ -299,6 +295,7 @@ def for_each(self, f): for x in self: f(x) + @not_infinite def group_by(self, key): """ Returns a stream that groups elements together using the provided `key` function. @@ -345,7 +342,6 @@ def inner(): self._stream = inner() return self - @must_be_callable def inspect(self, f): """ Returns a stream that calls the function, `f`, with a reference to each element before yielding it. @@ -377,7 +373,6 @@ def inner(): self._stream = inner() return self - @must_be_callable def map(self, f): """ Returns a stream that maps each value using `f`. @@ -394,7 +389,7 @@ def map(self, f): self._stream = map(f, self._stream) return self - @must_be_callable + @not_infinite def reduce(self, f, accumulator): """ Evaluates the stream, consuming it and applying the function `f` to each item in the stream, @@ -418,6 +413,7 @@ def reduce(self, f, accumulator): """ return functools.reduce(f, self, accumulator) + @not_infinite def reverse(self): """ Returns a stream whose elements are reversed. @@ -463,7 +459,6 @@ def inner(): self._stream = inner() return self - @must_be_callable def skip_while(self, predicate): """ Returns a stream that rejects elements while `predicate` returns `True`. @@ -482,6 +477,7 @@ def skip_while(self, predicate): self._stream = itertools.dropwhile(predicate, self._stream) return self + @not_infinite def sort(self): """ Returns a stream whose elements are sorted. @@ -498,6 +494,7 @@ def sort(self): """ return self.sort_with(None) + @not_infinite def sort_with(self, key): """ Returns a stream whose elements are sorted using the provided key selection function. @@ -563,7 +560,6 @@ def inner(): self._infinite = False return self - @must_be_callable def take_while(self, predicate): """ Returns a stream that only accepts elements while `predicate` returns `True`. @@ -694,9 +690,14 @@ def repeat(self, element): ... print(error) Stream.collect was called on an infinitely repeating iterator. If you use Stream.repeat, then you MUST include either a Stream.take or a Stream.take_while if you wish to call Stream.collect """ - return self.repeat_with(lambda: element) - @must_be_callable + def inner(): + while True: + yield element + self._stream = inner() + self._infinite = True + return self + def repeat_with(self, f): """ Returns a stream that yields the output of `f` endlessly. diff --git a/pstream/_sync/util.py b/pstream/_sync/util.py new file mode 100644 index 0000000..aa696f5 --- /dev/null +++ b/pstream/_sync/util.py @@ -0,0 +1,12 @@ +from functools import wraps + +from pstream.errors import InfiniteCollectionError + + +def not_infinite(fn): + @wraps(fn) + def inner(self, *args, **kwargs): + if self._infinite: + raise InfiniteCollectionError(fn) + return fn(self, *args, **kwargs) + return inner diff --git a/pstream/errors.py b/pstream/errors.py index 88b12b2..418d0de 100644 --- a/pstream/errors.py +++ b/pstream/errors.py @@ -27,8 +27,3 @@ def __init__(self, method): 'If you use Stream.repeat, then you MUST include either a ' 'Stream.take or a Stream.take_while if you wish to ' 'call {}'.format(name, name)) - - -class NotCallableError(ValueError): - def __init__(self, method, received): - super(NotCallableError, self).__init__('{} requires a callable. Received {}'.format(method, type(received))) diff --git a/pstream/utils/__init__.py b/pstream/utils/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/pstream/utils/defensive.py b/pstream/utils/defensive.py deleted file mode 100644 index 1b33f10..0000000 --- a/pstream/utils/defensive.py +++ /dev/null @@ -1,34 +0,0 @@ -# MIT License -# -# Copyright (c) 2020 Christopher Henderson, chris@chenderson.org -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. - -from functools import wraps - -from pstream.errors import NotCallableError - - -def must_be_callable(fn): - @wraps(fn) - def inner(self, f, *args): - if not callable(f): - raise NotCallableError(fn.__qualname__ if hasattr(fn, '__qualname__') else fn.__name__, f) - return fn(self, f, *args) - return inner diff --git a/tests/_async/test_shim.py b/tests/_async/test_shim.py index 1de5b03..bd8b521 100644 --- a/tests/_async/test_shim.py +++ b/tests/_async/test_shim.py @@ -24,7 +24,7 @@ from pstream import AsyncStream from pstream._async.functors import binary_function_stream_factory -from pstream._async.shim import AsyncShim +from pstream._async.util import AsyncAdaptor from tests._async.utils import run_to_completion from tests.sync.test_stream import expect @@ -60,20 +60,20 @@ def inner(): @run_to_completion async def test_async_iterable(self): stream = AsyncStream(TestShim.AsyncIterable(range(10))) - self.assertFalse(isinstance(stream.stream, AsyncShim)) + self.assertFalse(isinstance(stream.stream, AsyncAdaptor)) got = await stream.filter(lambda x: x % 2).collect() self.assertEqual(got, [1, 3, 5, 7, 9]) @run_to_completion async def test_iterable(self): stream = AsyncStream(TestShim.Iterable(range(10))) - self.assertTrue(isinstance(stream.stream, AsyncShim)) + self.assertTrue(isinstance(stream.stream, AsyncAdaptor)) got = await stream.filter(lambda x: x % 2).collect() self.assertEqual(got, [1, 3, 5, 7, 9]) @run_to_completion async def test_shim_iterable(self): - stream = AsyncShim(TestShim.Iterable(range(10))) + stream = AsyncAdaptor(TestShim.Iterable(range(10))) i = 0 async for x in stream: self.assertEqual(x, i) @@ -82,24 +82,24 @@ async def test_shim_iterable(self): @run_to_completion async def test_async_iterator(self): stream = AsyncStream(TestShim.AsyncIterable(range(10)).__aiter__()) - self.assertFalse(isinstance(stream.stream, AsyncShim)) + self.assertFalse(isinstance(stream.stream, AsyncAdaptor)) got = await stream.filter(lambda x: x % 2).collect() self.assertEqual(got, [1, 3, 5, 7, 9]) @run_to_completion async def test_iterator(self): stream = AsyncStream(TestShim.Iterable(range(10)).__iter__()) - self.assertTrue(isinstance(stream.stream, AsyncShim)) + self.assertTrue(isinstance(stream.stream, AsyncAdaptor)) got = await stream.filter(lambda x: x % 2).collect() self.assertEqual(got, [1, 3, 5, 7, 9]) @expect(TypeError) def test_value_error(self): - AsyncShim(1) + AsyncAdaptor(1) @expect(TypeError) def test_factory_value_error(self): - AsyncShim.new(1) + AsyncAdaptor.new(1) @expect(TypeError) def test_factory_error(self): diff --git a/tests/sync/test_stream.py b/tests/sync/test_stream.py index e04e3b5..e28dca1 100644 --- a/tests/sync/test_stream.py +++ b/tests/sync/test_stream.py @@ -24,7 +24,7 @@ import unittest from functools import wraps -from pstream.errors import InfiniteCollectionError, NotCallableError +from pstream.errors import InfiniteCollectionError from pstream import Stream @@ -91,13 +91,6 @@ def fingerprint(name): got = Stream(people).distinct_with(fingerprint).collect() self.assertEqual(got, ['Bob', 'Alice', 'Eve', 'Achmed']) - def test_distinct_with_bad_fn(self): - try: - Stream().distinct_with(1).collect() - except NotCallableError: - return - raise Exception - def test_enumerate(self): self.assertEqual(Stream([0, 1, 2]).enumerate().collect(), [(0, 0), (1, 1), (2, 2)]) @@ -110,10 +103,6 @@ def test_filter(self): def test_filter_false(self): self.assertEqual(Stream([1, 2, 3, 4]).filter_false(lambda x: x % 2 == 0).collect(), [1, 3]) - @expect(NotCallableError) - def test_filter_bad(self): - Stream().filter(None) - def test_filter_empty(self): self.assertEqual(Stream([]).filter(lambda x: x % 2 == 0).collect(), []) @@ -154,10 +143,6 @@ def increment(self, element): Stream([1, 2, 3, 4, 5]).for_each(count.increment) self.assertEqual(count.count, 15) - @expect(NotCallableError) - def test_for_each_bad(self): - Stream().for_each(None) - def test_for_each_empty(self): class Called: def __init__(self): @@ -193,10 +178,6 @@ def test_inspect(self): got = Stream([1, 2, 3, 4]).filter(lambda x: x % 2 == 0).inspect(inspector.visit).collect() self.assertEqual(got, inspector.copy) - @expect(NotCallableError) - def test_inspect_bad(self): - Stream().inspect(None) - def test_inspect_then(self): inspector = TestStream.Inspector() got = Stream([1, 2, 3, 4]).filter(lambda x: x % 2 == 0).inspect(inspector.visit).map(lambda x: x * 2).collect() @@ -206,10 +187,6 @@ def test_inspect_then(self): def test_map(self): self.assertEqual(Stream([1, 2, 3, 4]).map(lambda x: x * 2).collect(), [2, 4, 6, 8]) - @expect(NotCallableError) - def test_map_bad(self): - Stream().map(None) - def test_map_empty(self): self.assertEqual(Stream([]).map(lambda x: x * 2).collect(), []) @@ -257,10 +234,6 @@ def add(a, b): got = Stream(numbers).reduce(add, 0) assert got == 45 - @expect(NotCallableError) - def test_reduce_bad(self): - Stream().reduce(None, 0) - def test_repeat(self): s = Stream().repeat(5) for _ in range(100): @@ -286,6 +259,31 @@ def test_repeat_count(self): except InfiniteCollectionError: pass + def test_repeat_with(self): + s = Stream().repeat_with(lambda: 5) + for _ in range(100): + self.assertEqual(next(s), 5) + + def test_repeat_with_break_upstream(self): + s = Stream([1, 2, 3]).repeat_with(lambda: 5) + for _ in range(100): + self.assertEqual(next(s), 5) + + def test_repeat_with_terminator(self): + self.assertEqual(Stream().repeat_with(lambda: 5).take(5).collect(), [5, 5, 5, 5, 5]) + + def test_repeat_with_collection(self): + try: + Stream().repeat_with(lambda: 1).collect() + except InfiniteCollectionError: + pass + + def test_repeat_with_count(self): + try: + Stream().repeat_with(lambda: 1).count() + except InfiniteCollectionError: + pass + def test_reverse(self): self.assertEqual(Stream([1, 2, 3, 4]).reverse().collect(), [4, 3, 2, 1]) @@ -295,10 +293,6 @@ def test_skip(self): def test_skip_while(self): self.assertEqual(Stream([1, 2, 3, 4, 5, 6, 7, 8, 9]).skip_while(lambda x: x < 5).collect(), [5, 6, 7, 8, 9]) - @expect(NotCallableError) - def test_skip_while_bad(self): - Stream().skip_while(None) - def test_sort(self): arr = [12, 233, 4567, 344523, 7, 567, 34, 5678, 456, 23, 4, 7, 63, 45, 345] got = Stream(arr).sort().collect() @@ -341,10 +335,6 @@ def test_take_more_than_there_are(self): def test_take_while(self): self.assertEqual(Stream([1, 2, 3, 4, 5, 6, 7, 8, 9]).take_while(lambda x: x < 5).collect(), [1, 2, 3, 4]) - @expect(NotCallableError) - def test_take_while_bad(self): - Stream().take_while(None) - def test_tee(self): a = list() b = list() From 6f97c0ca03cd1560602420013b719c32123b7a3e Mon Sep 17 00:00:00 2001 From: christopher-henderson Date: Tue, 27 Oct 2020 20:16:29 -0700 Subject: [PATCH 2/4] fix py2 tests --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index a75e5e3..2249575 100644 --- a/Makefile +++ b/Makefile @@ -29,4 +29,4 @@ test_html: .PHONY: testpy2 testpy2: python -m pytest --cov=pstream -W error --cov-report=xml --cov-branch tests/sync - python -m flake8 pstream/_sync/*.py pstream/utils/*.py pstream/errors.py tests/sync/*.py --extend-ignore=F405,E501,F403,F401 + python -m flake8 pstream/_sync/*.py pstream/errors.py tests/sync/*.py --extend-ignore=F405,E501,F403,F401 From d076af9961eb50013e542204b684230bdf79e642 Mon Sep 17 00:00:00 2001 From: christopher-henderson Date: Wed, 28 Oct 2020 07:26:35 -0700 Subject: [PATCH 3/4] version bump --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 75c8531..98f28e8 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setuptools.setup( name="pstream", - version="0.0.25", + version="0.0.26", author="Christopher Henderson", author_email="chris@chenderson.org", description="Provides a Stream and AsyncStream for composing fluent lazily evaluated, _sync fusion, iterators.", From b19ccd4eff8357ea7433332a501d0cce38efd627 Mon Sep 17 00:00:00 2001 From: christopher-henderson Date: Wed, 28 Oct 2020 07:33:32 -0700 Subject: [PATCH 4/4] version bump --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 98f28e8..c7edc52 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ version="0.0.26", author="Christopher Henderson", author_email="chris@chenderson.org", - description="Provides a Stream and AsyncStream for composing fluent lazily evaluated, _sync fusion, iterators.", + description="Provides a Stream and AsyncStream for composing fluent lazily evaluated, sync/async fusion, iterators.", long_description=long_description, long_description_content_type="text/markdown", url="https://github.com/christopher-henderson/pstream",