kafka-clj provides a producer and a consumer that support a basic fetch API as well as a managed sequence interface. Multifetch is not yet supported.
Download and start Kafka.
Pull dependencies with Leiningen:
$ lein deps
And run the example:
$ lein run-example
;; Producer: send one message, then a batch of two, to partition 0 of topic "test".
(with-open [p (producer "localhost" 9092)]
  (produce p "test" 0 "Message 1")
  (produce p "test" 0 ["Message 2" "Message 3"]))
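;; Simple consumer: request up to 10 offsets (time -1 means latest), then
;; fetch up to 1000000 bytes starting at the last offset returned.
(with-open [c (consumer "localhost" 9092)]
  (let [offs (offsets c "test" 0 -1 10)]
    (consume c "test" 0 (last offs) 1000000)))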
;; Consumer sequence: print each message as it arrives.
(with-open [c (consumer "localhost" 9092)]
  (doseq [m (consume-seq c "test" 0 {:blocking true})]
    (println m)))
consume-seq supports the following options:
- :blocking boolean, default false. When false, the sequence ends (returns nil) the first time a fetch returns no new messages. When true, the sequence retries the fetch up to :repeat-count times, waiting :repeat-timeout milliseconds between attempts.
- :repeat-count int, number of attempts to fetch new messages before terminating, default 10.
- :repeat-timeout int, wait time in milliseconds between fetch attempts, default 1000.
- :offset long, offset to start reading from; initialized to the highest offset if not provided.
- :max-size int, maximum result message size, default 1000000.
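The interaction between :blocking, :repeat-count and :repeat-timeout can be modelled roughly as below. This is a standalone sketch of the documented behaviour, not kafka-clj's implementation: fetch-with-retry, message-seq and fetch-fn are hypothetical names, and fetch-fn stands in for whatever performs the actual fetch.

```clojure
;; Hypothetical model of the documented retry options; not kafka-clj's code.
(defn fetch-with-retry
  "Calls fetch-fn until it returns a non-empty collection. Returns nil after a
  single empty fetch when :blocking is false, or after :repeat-count empty
  fetches when :blocking is true."
  [fetch-fn {:keys [blocking repeat-count repeat-timeout]
             :or   {blocking false repeat-count 10 repeat-timeout 1000}}]
  (loop [attempts-left (if blocking repeat-count 1)]
    (when (pos? attempts-left)
      (let [msgs (seq (fetch-fn))]
        (cond
          msgs                msgs
          (> attempts-left 1) (do (Thread/sleep repeat-timeout)
                                  (recur (dec attempts-left)))
          :else               nil)))))

(defn message-seq
  "Lazy sequence of messages that ends once fetch-with-retry gives up."
  [fetch-fn opts]
  (lazy-seq
    (when-let [msgs (fetch-with-retry fetch-fn opts)]
      (concat msgs (message-seq fetch-fn opts)))))
```

In this model, a non-blocking sequence stops at the first empty fetch, while a blocking one tolerates up to :repeat-count consecutive empty fetches before it terminates.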
Load the kafka.print namespace for basic print-dup/read-string serialization, or kafka.serializeable for Java object serialization. For custom serialization, implement the Pack and Unpack protocols.
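As a rough illustration of the custom serialization approach, the sketch below round-trips strings as UTF-8 bytes. The Pack and Unpack protocols and the RawMessage record defined here are hypothetical stand-ins so that the example is self-contained; the method names and message type in kafka-clj's own protocols may differ, so check the library source before adapting it.

```clojure
;; Hypothetical stand-ins for kafka-clj's protocols and message type.
(defprotocol Pack
  (pack [this] "Convert a value into a message."))

(defprotocol Unpack
  (unpack [this] "Read a value back from a message."))

;; Assumed message wrapper holding the raw payload as a byte array.
(defrecord RawMessage [payload])

;; Custom serialization: strings are encoded as UTF-8 bytes.
(extend-type String
  Pack
  (pack [this]
    (->RawMessage (.getBytes this "UTF-8"))))

;; Custom deserialization: decode the payload back into a string.
(extend-type RawMessage
  Unpack
  (unpack [this]
    (String. ^bytes (:payload this) "UTF-8")))
```

With these definitions, (unpack (pack "Message 1")) returns the original string.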
Questions? Email adam.smyczek _at_ gmail.com.