Skip to content

Commit

Permalink
Actor: Adds App.reply_expires setting
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Jun 16, 2017
1 parent a0c58a2 commit 4c27d5e
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 9 deletions.
1 change: 1 addition & 0 deletions faust/actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ def _reply_topic(self, topic: str) -> TopicT:
partitions=1,
replicas=0,
deleting=True,
retention=self.app.reply_expires,
value_type=ReqRepResponse,
)

Expand Down
17 changes: 14 additions & 3 deletions faust/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import faust

from collections import defaultdict
from datetime import timedelta
from functools import wraps
from heapq import heappush, heappop
from itertools import chain
Expand All @@ -14,7 +15,7 @@
from uuid import uuid4

from . import transport
from .actors import ActorFun, Actor, ActorT, ReplyConsumer
from .actors import ActorFun, Actor, ActorT, ReplyConsumer, SinkT
from .exceptions import ImproperlyConfigured
from .sensors import SensorDelegate
from .topics import Topic, TopicManager, TopicManagerT
Expand Down Expand Up @@ -70,8 +71,12 @@
#: Can be customized by setting ``App(table_cleanup_interval=...)``.
TABLE_CLEANUP_INTERVAL = 30.0

#: Prefix used for reply topics.
REPLY_TOPIC_PREFIX = 'f-reply-'

#: Default expiry time for replies in seconds (float/timedelta).
DEFAULT_REPLY_EXPIRES = timedelta(days=1)

#: Format string for ``repr(app)``.
APP_REPR = """
<{name}({s.id}): {s.url} {s.state} actors({actors}) sources({sources})>
Expand Down Expand Up @@ -260,6 +265,7 @@ def __init__(self, id: str,
replication_factor: int = 1,
default_partitions: int = 8,
reply_to: str = None,
reply_expires: Seconds = DEFAULT_REPLY_EXPIRES,
Stream: SymbolArg = DEFAULT_STREAM_CLS,
Table: SymbolArg = DEFAULT_TABLE_CLS,
TableManager: SymbolArg = DEFAULT_TABLE_MANAGER_CLS,
Expand All @@ -280,6 +286,8 @@ def __init__(self, id: str,
self.replication_factor = replication_factor
self.default_partitions = default_partitions
self.reply_to = reply_to or REPLY_TOPIC_PREFIX + str(uuid4())
self.reply_expires = want_seconds(
reply_expires or DEFAULT_REPLY_EXPIRES)
self.avro_registry_url = avro_registry_url
self.Stream = symbol_by_name(Stream)
self.TableType = symbol_by_name(Table)
Expand Down Expand Up @@ -322,17 +330,20 @@ def topic(self, *topics: str,
config=config,
)

def actor(self, *,
def actor(self,
topic: Union[str, TopicT] = None,
*,
name: str = None,
concurrency: int = 1) -> Callable[[ActorFun], ActorT]:
concurrency: int = 1,
sink: Iterable[SinkT] = None) -> Callable[[ActorFun], ActorT]:
def _inner(fun: ActorFun) -> ActorT:
actor = Actor(
fun,
name=name,
app=self,
topic=topic,
concurrency=concurrency,
sink=sink,
on_error=self._on_actor_error,
)
self.actors[actor.name] = actor
Expand Down
18 changes: 12 additions & 6 deletions faust/types/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from ..utils.types.collections import NodeT
from ..utils.types.services import ServiceT
from ._coroutines import StreamCoroutine
from .actors import ActorFun, ActorT
from .actors import ActorFun, ActorT, SinkT
from .codecs import CodecArg
from .core import K, V
from .serializers import RegistryT
Expand Down Expand Up @@ -59,6 +59,7 @@ class AppT(ServiceT):
replication_factor: int
default_partitions: int # noqa: E704
reply_to: str
reply_expires: float
avro_registry_url: str
store: str

Expand All @@ -73,13 +74,15 @@ def __init__(self, id: str,
store: str = 'memory://',
avro_registry_url: str = None,
client_id: str = '',
commit_interval: Seconds = 1.0,
table_cleanup_interval: Seconds = 1.0,
commit_interval: Seconds = 9999.0,
table_cleanup_interval: Seconds = 9999.0,
key_serializer: CodecArg = 'json',
value_serializer: CodecArg = 'json',
num_standby_replicas: int = 0,
replication_factor: int = 1,
default_partitions: int = 8,
reply_to: str = None,
reply_expires: Seconds = 9999.0,
Stream: SymbolArg = '',
Table: SymbolArg = '',
TableManager: SymbolArg = '',
Expand All @@ -104,9 +107,12 @@ def topic(self, *topics: str,
...

@abc.abstractmethod
def actor(self, *,
topic: Union[str, TopicT],
concurrency: int = 1) -> Callable[[ActorFun], ActorT]:
def actor(self,
topic: Union[str, TopicT] = None,
*,
name: str = None,
concurrency: int = 1,
sink: Iterable[SinkT] = None) -> Callable[[ActorFun], ActorT]:
...

@abc.abstractmethod
Expand Down

0 comments on commit 4c27d5e

Please sign in to comment.