kafka-clj

kafka-clj provides a producer and a consumer that supports a basic fetch API as well as a managed sequence interface. Multifetch is not yet supported.

Quick Start

Download and start Kafka.

Pull dependencies with Leiningen:

$ lein deps

And run the example:

$ lein run-example

Usage

Sending messages

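;; Open a producer; with-open closes it when the body finishes.
(with-open [p (producer "localhost" 9092)]
  ;; Send a single message to topic "test", partition 0.
  (produce p "test" 0 "Message 1")
  ;; Send several messages in one call.
  (produce p "test" 0 ["Message 2" "Message 3"]))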

Simple consumer

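(with-open [c (consumer "localhost" 9092)]
  ;; Query up to 10 offsets for topic "test", partition 0 (-1 is the time argument).
  (let [offs (offsets c "test" 0 -1 10)]
    ;; Fetch from the last returned offset, with a max size of 1000000.
    (consume c "test" 0 (last offs) 1000000)))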

Consumer sequence

(with-open [c (consumer "localhost" 9092)]
  (doseq [m (consume-seq c "test" 0 {:blocking true})]
    (println m)))

The following options are supported:

  • :blocking boolean, default false. When false, the sequence ends (returns nil) the first time a fetch returns no new messages. When true, the sequence retries the fetch up to :repeat-count times, waiting :repeat-timeout milliseconds between attempts.
  • :repeat-count int, number of attempts to fetch new messages before terminating, default 10.
  • :repeat-timeout int, wait time in milliseconds between fetch attempts, default 1000.
  • :offset long, offset to start from; initialized to the highest offset if not provided.
  • :max-size int, max result message size, default 1000000.
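
The interplay of :blocking, :repeat-count and :repeat-timeout can be illustrated with a small self-contained sketch. This is not kafka-clj's implementation: polling-seq, make-fake-fetch and fetch-fn are hypothetical names, and resetting the retry budget after every successful fetch is an assumption.

```clojure
;; Hypothetical sketch of the retry behaviour described above; not kafka-clj's code.
(defn polling-seq
  "Lazy seq of messages obtained by repeatedly calling fetch-fn, a no-arg
  function (hypothetical) that returns a possibly empty collection."
  [fetch-fn {:keys [blocking repeat-count repeat-timeout]
             :or   {blocking false, repeat-count 10, repeat-timeout 1000}}]
  (letfn [(step [attempts-left]
            (lazy-seq
              (let [msgs (fetch-fn)]
                (cond
                  ;; New messages: emit them and reset the retry budget (assumption).
                  (seq msgs) (concat msgs (step repeat-count))
                  ;; Empty fetch in blocking mode: wait, then try again.
                  (and blocking (pos? attempts-left))
                  (do (Thread/sleep (long repeat-timeout))
                      (step (dec attempts-left)))
                  ;; Otherwise the sequence ends.
                  :else nil))))]
    (step repeat-count)))

;; Stand-in for a broker: hands out one pre-canned batch per call, then empty batches.
(defn make-fake-fetch [batches]
  (let [remaining (atom batches)]
    (fn []
      (let [batch (first @remaining)]
        (swap! remaining rest)
        (or batch [])))))

;; Non-blocking: stops at the first empty fetch, so "c" is never reached.
(def non-blocking
  (doall (polling-seq (make-fake-fetch [["a" "b"] [] ["c"]]) {})))
;; => ("a" "b")

;; Blocking: retries after the empty fetch and picks up "c".
(def blocking
  (doall (polling-seq (make-fake-fetch [["a" "b"] [] ["c"]])
                      {:blocking true :repeat-count 2 :repeat-timeout 1})))
;; => ("a" "b" "c")
```

With :blocking true the sequence still terminates once the retry attempts are exhausted, so a doseq over it eventually returns.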

Serialization

Load the namespace kafka.print for basic print-dup/read-string serialization, or kafka.serializeable for Java object serialization. For custom serialization, implement the Pack and Unpack protocols.
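
As a rough illustration of what print-dup/read-string serialization means, the following sketch round-trips a Clojure value through a UTF-8 byte array using only clojure.core. The helper names to-bytes and from-bytes are hypothetical and are not part of kafka-clj; the library's own Pack and Unpack implementations may differ.

```clojure
;; Hypothetical helpers illustrating the print-dup/read-string idea; not kafka-clj's API.
(defn to-bytes
  "Prints x in its print-dup form and encodes the text as UTF-8 bytes."
  [x]
  (let [^String s (binding [*print-dup* true] (pr-str x))]
    (.getBytes s "UTF-8")))

(defn from-bytes
  "Decodes UTF-8 bytes and reads the value back with read-string."
  [^bytes bs]
  (read-string (String. bs "UTF-8")))

;; Round trip: the value read back equals the original.
(def original {:id 1 :tags ["a" "b"]})
(def restored (from-bytes (to-bytes original)))
(println (= original restored))
```

Because read-string can evaluate code embedded in its input, this style of serialization suits trusted data only.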

Questions? Email adam.smyczek _at_ gmail.com.