- "Attaching" should be deprecated and transactions should be used to send messages as we commit.
Forward requests to the node that owns the key:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams
The user must explicitly mark a table as public for it to be exposed in the HTTP interface:
user_to_amount = app.Table('user_to_amount', public=True)
List available tables:

    GET localhost:6666/api/table/
    200 {"results": ["user_to_amount"]}

List key/value pairs in a table (with pagination):

    GET localhost:6666/api/table/user_to_amount/?page=
    200 {"results": {"key": "value"}}

Get value by key:

    GET localhost:6666/api/table/user_to_amount/key/
    200 {"key": "value"}
If the Content-Type is set to text/html, return HTML pages that let the user browse key/value pairs.
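A rough sketch of the forwarding handler with aiohttp (``router.host_for_key``, ``LOCAL_HOST`` and ``tables`` are made-up names for this sketch, not an existing API):

    from aiohttp import web, ClientSession

    async def table_key_view(request):
        table_name = request.match_info['table']
        key = request.match_info['key']
        host = router.host_for_key(table_name, key)  # hypothetical lookup
        if host == LOCAL_HOST:
            # We own the partition for this key: serve from the local store.
            return web.json_response({key: tables[table_name][key]})
        # Another node owns it: proxy the request there.
        async with ClientSession() as session:
            url = f'http://{host}/api/table/{table_name}/{key}/'
            async with session.get(url) as response:
                return web.json_response(await response.json())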
- Stream/Stream join
- Table/Table join
- Stream/Table, Table/Stream join.
See faust/joins.py
The API is already exposed in faust.streams.Stream, but not implemented.
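For reference, a sketch of how usage might eventually look (the combine/field semantics here are guesses, since only the method stubs exist today):

    import faust

    class Withdrawal(faust.Record):
        user: str
        amount: float

    app = faust.App('join-example')
    withdrawals = app.stream(app.topic('withdrawals', value_type=Withdrawal))
    deposits = app.stream(app.topic('deposits', value_type=Withdrawal))

    # Hypothetical: combine the two streams and join them on the user field.
    joined = (withdrawals & deposits).join(Withdrawal.user)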
faust table
Show tables from the command-line (maybe use https://robpol86.github.io/terminaltables/)
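If terminaltables is used, rendering could be as simple as this (a sketch; how the command obtains the table's items is assumed):

    from terminaltables import AsciiTable

    def render_table(name, items):
        # items: iterable of (key, value) pairs from the table's store.
        rows = [['key', 'value']] + [[str(k), str(v)] for k, v in items]
        print(AsciiTable(rows, title=name).table)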
Partition assignor links:
- https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java (Kafka Streams' partition assignor)
- https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal (the partition assignor protocol used by Kafka)
- https://github.com/dpkp/kafka-python/blob/master/kafka/coordinator/assignors/roundrobin.py (kafka-python's round-robin partition assignor, a simpler example of a partition assignor)
- https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy (the recently added sticky partition assignment strategy)
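A skeleton of a custom assignor against kafka-python's interface, modelled on the round-robin example linked above (the toy logic in assign() is a placeholder for real sticky/standby-aware balancing):

    from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
    from kafka.coordinator.protocol import (
        ConsumerProtocolMemberAssignment, ConsumerProtocolMemberMetadata)

    class FaustAssignor(AbstractPartitionAssignor):
        """Skeleton for a Faust-specific partition assignor."""
        name = 'faust'
        version = 0

        @classmethod
        def metadata(cls, topics):
            # Extra userdata (e.g. standby info) could be encoded here.
            return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'')

        @classmethod
        def assign(cls, cluster, member_metadata):
            # Toy logic: give every partition of every subscribed topic to
            # the first member; a real assignor balances and adds standbys.
            members = sorted(member_metadata)
            assignment = {member_id: {} for member_id in members}
            all_topics = set()
            for metadata in member_metadata.values():
                all_topics.update(metadata.subscription)
            for topic in sorted(all_topics):
                partitions = cluster.partitions_for_topic(topic) or set()
                assignment[members[0]].setdefault(topic, []).extend(
                    sorted(partitions))
            return {
                member_id: ConsumerProtocolMemberAssignment(
                    cls.version, sorted(topics.items()), b'')
                for member_id, topics in assignment.items()
            }

        @classmethod
        def on_assignment(cls, assignment):
            pass  # react to our own assignment if needed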
Standby Tables
Like standby tasks in Kafka Streams.
Note: there are no standby streams; this is only for tables.
The TopicConductor also consumes data from the other partitions, so we can recover quickly if one of the nodes goes down.
Probably will have to change TopicConductor._topicmap to use (topic, partition) as the key.
New attribute: Table.standby
Used for introspection only, e.g. to quickly check whether a table is a standby, or for use in repr(table).
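A sketch of the consumption side, assuming an aiokafka consumer (``table.apply_changelog`` is an illustrative name): a standby replica just keeps tailing the changelog for partitions it does not actively own.

    from aiokafka import AIOKafkaConsumer, TopicPartition

    async def follow_changelog(table, changelog_topic, standby_partitions):
        # Keep the local store warm for fast failover.
        consumer = AIOKafkaConsumer(bootstrap_servers='localhost:9092')
        await consumer.start()
        consumer.assign(
            [TopicPartition(changelog_topic, p) for p in standby_partitions])
        try:
            async for record in consumer:
                table.apply_changelog(record.key, record.value)  # hypothetical
        finally:
            await consumer.stop()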
Nested data structures, like Mapping[str, List] and Mapping[str, Set], can be supported by treating the changelog as a database "transaction log".
For example, adding a new element to a Mapping of sets:
    import faust
    from faust import Stream
    from typing import AsyncIterable

    app = faust.App('pubsub-example')

    class SubReq(faust.Record):
        topic: str
        account: str

    class PubReq(faust.Record):
        topic: str
        message: str

    subscribers = app.Table('subscribers', type=set)

    @app.agent()
    async def subscribe(subscriptions: Stream[SubReq]) -> AsyncIterable[bool]:
        async for subscription in subscriptions:
            subscribers[subscription.topic].add(subscription.account)

    @app.agent()
    async def send_to_subscribers(requests):
        async for req in requests:
            for account in subscribers[req.topic]:
                # `accounts` is an assumed registry of connected clients.
                accounts.get(account).send_message(req.message)

    # `route` and `accept_methods` are assumed web-layer decorators.
    @route('/(?P<topic>[^/]+)/send/')
    @accept_methods('POST')
    async def send_to_subscribers_view(request):
        await send_to_subscribers.send(PubReq(
            topic=request.POST['topic'],
            message=request.POST['message'],
        ))
Adding an element produces the following changelog entry:

    KEY=topic VALUE={'action': 'add', 'value': new_member}

while removing an element produces:

    KEY=topic VALUE={'action': 'remove', 'value': old_member}
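When recovering, replaying these entries rebuilds the set. A minimal sketch of the apply step (names are illustrative):

    def apply_changelog_entry(table, key, event):
        # Rebuild nested structures by replaying 'add'/'remove' actions.
        s = table.setdefault(key, set())
        if event['action'] == 'add':
            s.add(event['value'])
        elif event['action'] == 'remove':
            s.discard(event['value'])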
- NOTE: Not sure how this would coexist with windowing, but it may just work by keying on (window, key).
Daemonization
Handled by supervisord/circus?
Sentry/Raven
faust command-line tool
DONE:
$ faust -A examples.simple worker
$ FAUSTAPP=examples.simple faust worker
TODO(?):
$ faust -A examples.simple status
$ faust -A examples.simple ping
$ faust -A examples.simple send topic [value [key]]
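For example, ``send`` could be a thin wrapper around the app's producer (a sketch; ``app.send`` usage here is an assumption about the final API):

    import asyncio

    def send_main(app, topic, value=None, key=None):
        # Hypothetical body of `faust -A ... send topic [value [key]]`.
        loop = asyncio.get_event_loop()
        loop.run_until_complete(app.send(topic, key=key, value=value))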
Need to write functional tests: test behavior, not coverage.
Need to dive into C to add callbacks to the C client so that it can be connected to the event loop.
There are already Node.js clients using librdkafka, so this should definitely be possible.
Look at confluent-kafka for inspiration.
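Until then, a common stopgap is to poll confluent-kafka from an executor thread so the event loop is never blocked (a sketch, not tied to any Faust API):

    import asyncio
    from confluent_kafka import Consumer

    async def consume(loop):
        consumer = Consumer({
            'bootstrap.servers': 'localhost:9092',
            'group.id': 'example',
        })
        consumer.subscribe(['mytopic'])
        try:
            while True:
                # poll() blocks, so run it in the default thread executor.
                msg = await loop.run_in_executor(None, consumer.poll, 1.0)
                if msg is None:
                    continue
                if msg.error():
                    continue  # real code would inspect the error
                print(msg.key(), msg.value())
        finally:
            consumer.close()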
- through() latency
- group_by() latency
GET localhost:6666/stats/
Returns general stats: events processed/s, total events, commit() latency, etc.
GET localhost:6666/stats/topic/mytopic/
Stats related to topic by name.
GET localhost:6666/stats/task/mytask/
Stats related to task by name.
GET localhost:6666/stats/table/mytable/
Stats related to table by table name.
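These could be served straight from the sensor state with aiohttp (a sketch; the monitor fields shown are assumptions):

    from aiohttp import web

    async def stats_view(request):
        monitor = request.app['monitor']  # hypothetical sensor/monitor object
        return web.json_response({
            'events_per_second': monitor.events_s,
            'events_total': monitor.events_total,
            'commit_latency': monitor.commit_latency,
        })

    web_app = web.Application()
    web_app.router.add_get('/stats/', stats_view)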
Show graphs in realtime: Wow factor+++ :-)
Find out if there are any obvious optimizations that can be applied as it's currently quite slow.
Introduction/README
Tutorial
Glossary (docs/glossary.rst)
User Guide (docs/userguide/)
Streams
Tables
Models
Availability
- partitioning
- recovery
- acknowledgements
Sensors
Deployment
- daemonization
- uvloop vs. asyncio
- debugging (aiomonitor)
- logging
Web API
These are very low-priority tasks, more of a convenience for anyone who wants to learn Python typing.
Add typing to (either .pyi header files, or fork projects):
- aiokafka
- kafka-python
- aiohttp
- avro-python3
WeakSet missing from mypy
Not really a task, but a note to keep checking whether this has been fixed in a future mypy version.
Things to replace Celery; maybe not in core, but in a separate library.
Chains
Chords/Barrier
- Synchronization should be possible:

    chord_id = uuid(); requests = [....]

Each agent then forwards a completion message to an agent that keeps track of counts:

    chord_unlock.send(key=chord_id, value=(chord_size, callback))

When the ``chord_unlock`` agent sees that ``count >= chord_size``, it calls the callback.
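A sketch of that bookkeeping agent (the table name, record fields, callback transport, and ``stream.items()`` yielding (key, value) pairs are all assumptions):

    import faust
    from faust import Stream

    class Completion(faust.Record):
        chord_size: int
        callback: str  # e.g. topic to notify when the chord completes

    app = faust.App('chord-example')
    chord_counts = app.Table('chord_counts', default=int)

    @app.agent()
    async def chord_unlock(completions: Stream[Completion]):
        # Count completions per chord_id; fire the callback once all
        # member tasks have reported in.
        async for chord_id, completion in completions.items():
            chord_counts[chord_id] += 1
            if chord_counts[chord_id] >= completion.chord_size:
                await app.send(completion.callback, value=chord_id)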