KQ: Kafka-based Job Queue for Python

KQ (Kafka Queue) is a lightweight Python library that lets you queue and execute jobs asynchronously using Apache Kafka. It uses kafka-python under the hood.

Announcements

  • KQ version 2.0.0 is now out!
  • Please see the releases page for the latest updates.

Requirements

Installation

To install a stable version from PyPI (recommended):

~$ pip install kq

To install the latest version directly from GitHub:

~$ pip install -e git+git@github.com:joowani/kq.git@master#egg=kq

You may need to use sudo depending on your environment.

Getting Started

First, ensure that your Kafka instance is up and running:

~$ ./kafka-server-start.sh -daemon server.properties

Define your KQ worker module:

# my_worker.py

import logging

from kafka import KafkaConsumer
from kq import Worker

# Set up logging.
formatter = logging.Formatter('[%(levelname)s] %(message)s')
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger = logging.getLogger('kq.worker')
logger.setLevel(logging.DEBUG)
logger.addHandler(stream_handler)

# Set up a Kafka consumer.
consumer = KafkaConsumer(
    bootstrap_servers='127.0.0.1:9092',
    group_id='group',
    auto_offset_reset='latest'
)

# Set up a worker.
worker = Worker(topic='topic', consumer=consumer)
worker.start()

Start your worker:

~$ python my_worker.py
[INFO] Starting Worker(hosts=127.0.0.1:9092, topic=topic, group=group) ...

Enqueue a function call:

import requests

from kafka import KafkaProducer
from kq import Queue

# Set up a Kafka producer.
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')

# Set up a queue.
queue = Queue(topic='topic', producer=producer)

# Enqueue a function call.
job = queue.enqueue(requests.get, 'https://www.google.com')
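
Conceptually, enqueuing turns a function call into bytes that can travel as the value of a Kafka message. The sketch below illustrates that idea with the standard library only. The names serialize_call and run_serialized are hypothetical, and pickle is an assumed stand-in for whatever serializer KQ actually uses:

```python
import pickle
import uuid


def serialize_call(func, *args, **kwargs):
    """Pack a function call into bytes suitable for a message payload."""
    job = {
        'id': uuid.uuid4().hex[:8],  # short job ID, like the one in the worker log
        'func': func,
        'args': args,
        'kwargs': kwargs,
    }
    # pickle stores importable functions by reference (module + name),
    # so the consuming process must be able to import the same function.
    return pickle.dumps(job)


def run_serialized(payload):
    """Unpack the bytes on the consuming side and execute the call."""
    job = pickle.loads(payload)
    return job['func'](*job['args'], **job['kwargs'])


payload = serialize_call(pow, 2, 10)   # what a producer would send
result = run_serialized(payload)       # what a worker would compute
```

One consequence of serializing by reference is that any function you enqueue should be importable on the worker side as well.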

Sit back and watch the worker process it in the background:

~$ python my_worker.py
[INFO] Starting Worker(hosts=127.0.0.1:9092, topic=topic, group=group) ...
[INFO] Processing Message(topic=topic, partition=0, offset=0) ...
[INFO] Executing job c7bf2359: requests.api.get('https://www.google.com')
[INFO] Job c7bf2359 returned: <Response [200]>
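
The log above follows a simple pattern: read a message, decode the job, run it, report the outcome. A rough sketch of such a loop is shown below, assuming jobs are pickled (func, args, kwargs) tuples and using an in-memory list in place of a Kafka topic. The process_messages function, the logger name and the status labels are hypothetical, not KQ's internals:

```python
import logging
import pickle

logger = logging.getLogger('sketch.worker')  # hypothetical logger name


def process_messages(messages):
    """Execute every serialized job in `messages`; return (status, value) pairs."""
    outcomes = []
    for offset, payload in enumerate(messages):
        logger.info('Processing message at offset %d ...', offset)
        try:
            func, args, kwargs = pickle.loads(payload)
        except Exception:
            # Undecodable payloads are skipped instead of crashing the loop.
            logger.exception('Message at offset %d is not a valid job', offset)
            outcomes.append(('invalid', None))
            continue
        try:
            value = func(*args, **kwargs)
        except Exception as exc:
            # A failing job is recorded, but it must not stop the worker.
            logger.exception('Job raised an exception')
            outcomes.append(('failure', exc))
        else:
            logger.info('Job returned: %r', value)
            outcomes.append(('success', value))
    return outcomes


# An in-memory list stands in for a Kafka topic partition.
topic = [
    pickle.dumps((len, ('kafka',), {})),   # succeeds
    b'not a job',                          # cannot be decoded
    pickle.dumps((int, ('oops',), {})),    # raises ValueError when run
]
outcomes = process_messages(topic)
```

Keeping decoding errors separate from job errors makes it easier to tell a corrupt message apart from a bug in the enqueued function.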

NEW in 2.0.0: You can now specify the job timeout, message key and partition:

job = queue.using(timeout=5, key=b'foo', partition=0).enqueue(requests.get, 'https://www.google.com')
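
To get a feel for what a job timeout means in practice, here is a minimal standard-library sketch that gives up on a call after a number of seconds. It is an illustration only: run_with_timeout is a hypothetical helper, and KQ may enforce timeouts in a different way:

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout


def run_with_timeout(func, timeout, *args, **kwargs):
    """Run func(*args, **kwargs); return ('success', value) or ('timeout', None)."""
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(func, *args, **kwargs)
    try:
        return 'success', future.result(timeout=timeout)
    except FutureTimeout:
        return 'timeout', None
    finally:
        # Do not block on a call that is still running. A thread cannot be
        # killed, so the call finishes in the background.
        executor.shutdown(wait=False)


fast = run_with_timeout(sum, 1.0, [1, 2, 3])      # finishes well within 1 second
slow = run_with_timeout(time.sleep, 0.05, 0.5)    # sleeps longer than its timeout
```

Note that this approach only stops waiting for the result; the underlying call keeps running until it returns on its own.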

Check out the documentation for more information.

Contributing

Please have a look at this page before submitting a pull request. Thanks!

Credits

This project was inspired by RQ.