Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some cleanup and refactor #15

Merged
merged 4 commits into from
Oct 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ test_html:
.PHONY: testpy2
testpy2:
python -m pytest --cov=pstream -W error --cov-report=xml --cov-branch tests/sync
python -m flake8 pstream/_sync/*.py pstream/utils/*.py pstream/errors.py tests/sync/*.py --extend-ignore=F405,E501,F403,F401
python -m flake8 pstream/_sync/*.py pstream/errors.py tests/sync/*.py --extend-ignore=F405,E501,F403,F401
2 changes: 0 additions & 2 deletions docs/classes/errors.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@
PStream: fluent, async, iteration
===================================


.. automodule:: pstream.errors
:members:


Indices and tables
==================

Expand Down
3 changes: 0 additions & 3 deletions docs/classes/sync.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,9 @@
PStream: fluent, async, iteration
===================================

Hi!

.. autoclass:: pstream.Stream
:members:


Indices and tables
==================

Expand Down
74 changes: 37 additions & 37 deletions pstream/_async/async_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from .shim import AsyncShim, shim, async_shim, not_infinite_a, not_infinite_s
from .util import AsyncAdaptor, unwrap, not_infinite
from pstream.errors import InfiniteCollectionError
from pstream._async.functors import *

Expand Down Expand Up @@ -63,11 +63,11 @@ def __init__(self, initial: Collection[T] = None):
"""
if initial is None:
initial = []
self.stream = AsyncShim.new(initial)
self.stream = AsyncAdaptor.new(initial)
self._infinite = False

@not_infinite_a
@async_shim
@not_infinite
@unwrap
async def collect(self) -> List[T]:
"""
Evaluates the stream, consuming it and returning a list of the final output.
Expand All @@ -83,7 +83,7 @@ async def collect(self) -> List[T]:
"""
return await collect(self.stream)

@shim
@unwrap
def chain(self, *iterables: Collection[T]):
"""
Returns a stream that links an arbitrary number of iterators to this iterator, in a chain.
Expand All @@ -100,8 +100,8 @@ def chain(self, *iterables: Collection[T]):
self.stream = chain(self.stream, *iterables)
return self

@not_infinite_a
@async_shim
@unwrap
@not_infinite
async def count(self):
"""
Evaluates the stream, consuming it and returning a count of the number of elements in the stream.
Expand All @@ -116,7 +116,7 @@ async def count(self):
"""
return await count(self.stream)

@shim
@unwrap
def distinct(self):
"""
Returns a stream of distinct elements. Distinction is computed by applying the builtin `hash` function
Expand All @@ -135,7 +135,7 @@ def distinct(self):
self.stream = distinct(self.stream)
return self

@shim
@unwrap
def distinct_with(self, key: Callable[[T], U]):
"""
Returns a stream of distinct elements. Distinction is computed by applying the builtin `hash` function
Expand All @@ -160,7 +160,7 @@ def distinct_with(self, key: Callable[[T], U]):
self.stream = distinct_with(key, self.stream)
return self

@shim
@unwrap
def enumerate(self):
"""
Returns a stream that yields the current count and the element during iteration.
Expand All @@ -187,7 +187,7 @@ def enumerate(self):
self.stream = enumerate(self.stream)
return self

@shim
@unwrap
def flatten(self):
"""
Returns a stream that flattens one level of nesting in a stream of elements that are themselves iterators.
Expand All @@ -213,7 +213,7 @@ def flatten(self):
self.stream = flatten(self.stream)
return self

@shim
@unwrap
def filter(self, predicate: Callable[[T], bool]):
"""
Returns a stream that filters each element using `predicate`. Only elements for which `predicate`
Expand All @@ -233,7 +233,7 @@ def filter(self, predicate: Callable[[T], bool]):
self.stream = filter(predicate, self.stream)
return self

@shim
@unwrap
def filter_false(self, predicate: Callable[[T], bool]):
"""
Returns a stream that filters each element using `predicate`. Only elements for which `predicate`
Expand All @@ -253,8 +253,8 @@ def filter_false(self, predicate: Callable[[T], bool]):
self.stream = filter_false(predicate, self.stream)
return self

@not_infinite_a
@async_shim
@unwrap
@not_infinite
async def for_each(self, f: Callable[[T], None]):
"""
Evaluates the stream, consuming it and calling `f` for each element in the stream.
Expand Down Expand Up @@ -284,8 +284,8 @@ async def for_each(self, f: Callable[[T], None]):
"""
await for_each(f, self.stream)

@not_infinite_s
@shim
@unwrap
@not_infinite
def group_by(self, key: Callable[[T], U]):
"""
Returns a stream that groups elements together using the provided `key` function.
Expand Down Expand Up @@ -325,7 +325,7 @@ def group_by(self, key: Callable[[T], U]):
self.stream = group_by(key, self.stream)
return self

@shim
@unwrap
def inspect(self, f: Callable[[T], None]):
"""
Returns a stream that calls the function, `f`, with a reference to each element before yielding it.
Expand All @@ -352,7 +352,7 @@ def inspect(self, f: Callable[[T], None]):
self.stream = inspect(f, self.stream)
return self

@shim
@unwrap
def map(self, f: Callable[[T], U]):
"""
Returns a stream that maps each value using `f`.
Expand All @@ -369,7 +369,7 @@ def map(self, f: Callable[[T], U]):
self.stream = map(f, self.stream)
return self

@shim
@unwrap
def pool(self, size: int):
"""
Returns a stream that will collect up to `size` elements into a list before yielding.
Expand Down Expand Up @@ -397,7 +397,7 @@ def pool(self, size: int):
self.stream = pool(self.stream, size)
return self

@shim
@unwrap
def skip(self, n: int):
"""
Returns a stream that skips over `n` number of elements.
Expand All @@ -414,7 +414,7 @@ def skip(self, n: int):
self.stream = skip(self.stream, n)
return self

@shim
@unwrap
def skip_while(self, predicate: Callable[[T], bool]):
"""
Returns a stream that rejects elements while `predicate` returns `True`.
Expand All @@ -433,8 +433,8 @@ def skip_while(self, predicate: Callable[[T], bool]):
self.stream = skip_while(predicate, self.stream)
return self

@not_infinite_s
@shim
@unwrap
@not_infinite
def sort(self):
"""
Returns a stream whose elements are sorted.
Expand All @@ -452,8 +452,8 @@ def sort(self):
self.stream = sort(self.stream)
return self

@not_infinite_s
@shim
@unwrap
@not_infinite
def sort_with(self, key: Callable[[T], U]):
"""
Returns a stream whose elements are sorted using the provided key selection function.
Expand All @@ -475,7 +475,7 @@ def sort_with(self, key: Callable[[T], U]):
self.stream = sort_with(key, self.stream)
return self

@shim
@unwrap
def step_by(self, step: int):
"""
Returns a stream which steps over items by a custom amount. Regardless of the step, the first item
Expand All @@ -497,8 +497,8 @@ def step_by(self, step: int):
self.stream = step_by(self.stream, step)
return self

@not_infinite_s
@async_shim
@unwrap
@not_infinite
async def reduce(self, f: Callable[[T], U], accumulator: U) -> U:
"""
Evaluates the stream, consuming it and applying the function `f` to each item in the stream,
Expand All @@ -522,7 +522,7 @@ async def reduce(self, f: Callable[[T], U], accumulator: U) -> U:
"""
return await reduce(f, self.stream, accumulator)

@shim
@unwrap
def repeat(self, element: T):
"""
Returns a stream that repeats an element endlessly.
Expand Down Expand Up @@ -557,7 +557,7 @@ def repeat(self, element: T):
self._infinite = True
return self

@shim
@unwrap
def repeat_with(self, f: Callable[[], T]):
"""
Returns a stream that yields the output of `f` endlessly.
Expand Down Expand Up @@ -592,8 +592,8 @@ def repeat_with(self, f: Callable[[], T]):
self._infinite = True
return self

@not_infinite_s
@shim
@unwrap
@not_infinite
def reverse(self):
"""
Returns a stream whose elements are reversed.
Expand All @@ -612,7 +612,7 @@ def reverse(self):
self.stream = reverse(self.stream)
return self

@shim
@unwrap
def take(self, n: int):
"""
Returns a stream that only iterates over the first `n` elements.
Expand All @@ -630,7 +630,7 @@ def take(self, n: int):
self._infinite = False
return self

@shim
@unwrap
def take_while(self, predicate: Callable[[T], bool]):
"""
Returns a stream that only accepts elements while `predicate` returns `True`.
Expand All @@ -651,7 +651,7 @@ def take_while(self, predicate: Callable[[T], bool]):
self._infinite = False
return self

@shim
@unwrap
def tee(self, *receivers):
"""
Returns a stream whose elements will be appended to objects in `receivers`.
Expand All @@ -675,7 +675,7 @@ def tee(self, *receivers):
self.stream = inspect(other.append, self.stream)
return self

@shim
@unwrap
def zip(self, *iterables: Collection[T]):
"""
Returns a stream that iterates over one or more iterators simultaneously.
Expand Down
Loading