Skip to content

Commit

Permalink
Actor: Sink can now be: actor, topic or callable/async callable
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Jun 14, 2017
1 parent f5e8267 commit a8f8d32
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 3 deletions.
9 changes: 7 additions & 2 deletions faust/actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import typing
from collections import defaultdict
from typing import (
Any, AsyncIterable, AsyncIterator, Awaitable,
Any, AsyncIterable, AsyncIterator, Awaitable, Callable,
Iterable, List, MutableMapping, MutableSequence, Union, cast,
)
from uuid import uuid4
Expand Down Expand Up @@ -319,7 +319,12 @@ async def _slurp(self, res: ActorInstanceT, it: AsyncIterator):

async def _delegate_to_sinks(self, value: Any) -> None:
for sink in self._sinks:
await maybe_async(sink(value))
if isinstance(sink, ActorT):
await cast(ActorT, sink).send(value=value)
elif isinstance(sink, TopicT):
await cast(TopicT, sink).send(value=value)
else:
await maybe_async(cast(Callable, sink)(value))

async def _reply(self, key: Any, value: Any, req: ReqRepRequest) -> None:
assert req.reply_to
Expand Down
5 changes: 4 additions & 1 deletion faust/types/actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ class AppT: ... # noqa

ActorErrorHandler = Callable[['ActorT', Exception], Awaitable]
ActorFun = Callable[[AsyncIterator], Union[Awaitable, AsyncIterable]]
SinkT = Callable[[Any], Union[Awaitable, None]]

#: A sink can be: Actor, Topic,
#: or callable/async callable taking value as argument.
SinkT = Union['ActorT', TopicT, Callable[[Any], Union[Awaitable, None]]]


class ActorT(ServiceT):
Expand Down

0 comments on commit a8f8d32

Please sign in to comment.