aiokafka

asyncio client for Kafka

AIOKafkaProducer

AIOKafkaProducer is a high-level, asynchronous message producer.

Example of AIOKafkaProducer usage:

import asyncio
from aiokafka import AIOKafkaProducer

async def produce(loop):
    # loop= is what the aiokafka release documented here expects;
    # later releases deprecate the argument.
    producer = AIOKafkaProducer(loop=loop, bootstrap_servers='localhost:1234')
    await producer.start()
    try:
        # Awaiting send() yields a future; awaiting that future yields the delivery result
        future1 = await producer.send('foobar', b'some_message_bytes')
        resp = await future1
        print("Message was produced to partition %i with offset %i" % (resp.partition, resp.offset))
        # Send with an explicit key, then to an explicit partition
        future2 = await producer.send('foobar', key=b'foo', value=b'bar')
        future3 = await producer.send('foobar', b'message for partition 1', partition=1)
        await asyncio.wait([future2, future3])
    finally:
        # Stop the producer even if a send fails
        await producer.stop()

loop = asyncio.new_event_loop()
loop.run_until_complete(produce(loop))
loop.close()
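
The two-step await in the example above (first `send()`, then the future it returns) can be tried without a broker. The following is a minimal sketch that assumes a hypothetical in-memory `StubProducer` and `RecordMetadata` tuple; neither is part of aiokafka, and the stub only mimics the shape of the calls shown above.

```python
import asyncio
from collections import namedtuple

# Hypothetical stand-in for the result object printed in the example above.
RecordMetadata = namedtuple("RecordMetadata", ["topic", "partition", "offset"])


class StubProducer:
    """Illustrative in-memory stand-in; this is NOT aiokafka's implementation."""

    def __init__(self):
        self._next_offset = {}  # (topic, partition) -> next offset to hand out

    async def send(self, topic, value=None, key=None, partition=0):
        # Step 1: accept the message and hand back a future for its result.
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        offset = self._next_offset.get((topic, partition), 0)
        self._next_offset[(topic, partition)] = offset + 1
        # Step 2 happens later: the future is resolved on a following loop iteration.
        loop.call_soon(future.set_result, RecordMetadata(topic, partition, offset))
        return future


async def demo():
    producer = StubProducer()
    # Hand over three messages without waiting for each result in turn.
    futures = [await producer.send("foobar", b"msg-%d" % i) for i in range(3)]
    # Wait for all results at once; gather() preserves the input order.
    return await asyncio.gather(*futures)


results = asyncio.run(demo())
print([r.offset for r in results])
```

Collecting the futures first and awaiting them together keeps several messages in flight, whereas awaiting each future right after its `send()` serializes them.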

AIOKafkaConsumer

AIOKafkaConsumer is a high-level, asynchronous message consumer. It interacts with the assigned Kafka group coordinator node so that multiple consumers can share the consumption of topics between them (requires Kafka >= 0.9.0.0).
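
To make the load-balancing idea concrete, here is a toy sketch of how a set of partitions might be spread over the consumers in a group. The function `assign_round_robin` is hypothetical and written only for illustration; it is not aiokafka's code, and the strategy a real group uses may differ.

```python
def assign_round_robin(partitions, consumers):
    """Deal partitions out to consumers one at a time, in sorted order.

    Hypothetical helper for illustration only; not part of aiokafka.
    """
    if not consumers:
        raise ValueError("at least one consumer is required")
    ordered = sorted(consumers)
    assignment = {consumer: [] for consumer in ordered}
    for index, partition in enumerate(sorted(partitions)):
        # Consumer i receives partitions i, i + n, i + 2n, ... for n consumers.
        assignment[ordered[index % len(ordered)]].append(partition)
    return assignment


# Five partitions shared by two consumers.
print(assign_round_robin(range(5), ["c1", "c2"]))
# → {'c1': [0, 2, 4], 'c2': [1, 3]}
```

When a consumer joins or leaves, rerunning the assignment with the new member list redistributes the partitions, which is the effect the group coordinator provides for real consumers.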

Example of AIOKafkaConsumer usage:

import asyncio
from kafka.common import KafkaError
from aiokafka import AIOKafkaConsumer

async def consume_task(consumer):
    while True:
        try:
            msg = await consumer.getone()
            print("consumed: ", msg.topic, msg.partition, msg.offset, msg.value)
        except KafkaError as err:
            print("error while consuming message: ", err)

loop = asyncio.new_event_loop()
# loop= is what the aiokafka release documented here expects;
# later releases deprecate the argument.
consumer = AIOKafkaConsumer('topic1', 'topic2', loop=loop, bootstrap_servers='localhost:1234')
loop.run_until_complete(consumer.start())
# Schedule the consuming coroutine as a task on the loop
c_task = loop.create_task(consume_task(consumer))
try:
    loop.run_forever()
finally:
    # Cancel the consuming task and wait for the cancellation to finish
    c_task.cancel()
    loop.run_until_complete(asyncio.gather(c_task, return_exceptions=True))
    loop.run_until_complete(consumer.stop())
    loop.close()
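
The shutdown sequence for a background consuming task (cancel it, then wait for the cancellation to be processed) can be exercised without a broker. This is a minimal sketch that assumes a hypothetical `StubConsumer` backed by an `asyncio.Queue`; it is not aiokafka's consumer and only exposes a `getone()` coroutine like the one used above.

```python
import asyncio


class StubConsumer:
    """Hypothetical stand-in exposing only getone(); NOT aiokafka's consumer."""

    def __init__(self, messages):
        self._queue = asyncio.Queue()
        for message in messages:
            self._queue.put_nowait(message)

    async def getone(self):
        # Waits once the queue is empty, like a consumer waiting for new data.
        return await self._queue.get()


async def consume_task(consumer, seen):
    # Runs until cancelled, recording every message it receives.
    while True:
        seen.append(await consumer.getone())


async def main():
    seen = []
    task = asyncio.ensure_future(consume_task(StubConsumer([b"a", b"b"]), seen))
    await asyncio.sleep(0.01)  # give the task time to drain the queue
    task.cancel()
    # Wait until the task has actually processed the cancellation.
    await asyncio.gather(task, return_exceptions=True)
    return seen, task.cancelled()


seen, cancelled = asyncio.run(main())
print(seen, cancelled)
```

Waiting on the cancelled task before tearing anything else down avoids destroying a task that is still pending when the loop closes.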

Running tests

Docker is required to run tests. See https://docs.docker.com/engine/installation for installation notes.

Setting up test requirements (assuming you are inside a virtualenv on Ubuntu 14.04+):

sudo apt-get install -y libsnappy-dev
pip install flake8 pytest pytest-cov pytest-catchlog docker-py python-snappy coveralls .

Running tests:

make cov

To run tests against a specific version of Kafka (the default is 0.9.0.1), set the KAFKA_VERSION variable:

make cov KAFKA_VERSION=0.8.2.1