Detailed documentation: https://crystaldoc.info/github/BT-OpenSource/crafka/main/index.html
Add this to your application's shard.yml:
dependencies:
  crafka:
    github: bt-opensource/crafka
require "crafka"
producer = Kafka::Producer.new({"bootstrap.servers" => "localhost:9092", "broker.address.family" => "v4"})
producer.produce(topic: "topic_name", payload: "my message".to_slice)
# Optionally
producer.poll # Serves queued callbacks
producer.flush # Wait for outstanding produce requests to complete
All available args to #produce: topic, payload, key, timestamp.
librdkafka recommends that rd_kafka_poll is called at regular intervals to serve queued callbacks. This functionality is built into Crafka.
By default, after each #produce, a Kafka::Producer will call poll if it hasn't polled in the last 5 seconds.
You can configure this with the poll_interval argument:
producer = Kafka::Producer.new(
  {"bootstrap.servers" => "localhost:9092", "broker.address.family" => "v4"},
  poll_interval: 30
)
To disable auto polling, set poll_interval to 0.
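The interval check described above can be sketched in isolation. The PollTimer class below is hypothetical and is not Crafka's source; it assumes poll_interval is given in seconds and only models the decision of whether to poll after a produce call.

```crystal
# Hypothetical stand-in for the producer's polling bookkeeping (not Crafka's code).
class PollTimer
  getter polls = 0

  # poll_interval is assumed to be in seconds; 0 disables auto polling.
  def initialize(@poll_interval : Int32, @last_poll : Time::Span = Time.monotonic)
  end

  # Called after each produce; returns true only when a poll would happen.
  def after_produce(now : Time::Span = Time.monotonic) : Bool
    return false if @poll_interval == 0
    return false if now - @last_poll < @poll_interval.seconds
    @last_poll = now
    @polls += 1 # a real producer would call poll here
    true
  end
end

timer = PollTimer.new(5, 0.seconds)
p timer.after_produce(3.seconds) # => false
p timer.after_produce(6.seconds) # => true
p timer.after_produce(8.seconds) # => false (only 2 seconds since the last poll)
```

Passing the clock value as an argument keeps the sketch deterministic; a real implementation would read the monotonic clock itself.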
To capture librdkafka's statistics, pass a stats_path argument to Kafka::Producer.new containing the location of a file to be written to.
Also ensure that you set statistics.interval.ms in your producer config.
producer = Kafka::Producer.new(
  {"bootstrap.servers" => "localhost:9092", "broker.address.family" => "v4", "statistics.interval.ms" => "5000"},
  stats_path: "/some/directory/librdkafka_stats.json"
)
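Once statistics are being written, they can be read back with Crystal's standard JSON module. The following is a minimal sketch, assuming the file holds a single librdkafka statistics JSON document; the inline sample is made up and contains only a few of the top-level fields librdkafka emits (name, type, msg_cnt, txmsgs).

```crystal
require "json"

# Made-up sample shaped like a small subset of a librdkafka statistics document.
# In practice this string would come from File.read on the stats_path file,
# assuming that file contains one JSON document.
sample = %({"name": "rdkafka#producer-1", "type": "producer", "msg_cnt": 3, "txmsgs": 120})

stats = JSON.parse(sample)
name = stats["name"].as_s
queued = stats["msg_cnt"].as_i64 # messages currently in the producer queues
sent = stats["txmsgs"].as_i64    # messages transmitted to brokers so far

puts "#{name}: #{queued} queued, #{sent} transmitted"
```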
consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9092", "broker.address.family" => "v4", "group.id" => "consumer_group_name"})
consumer.subscribe("topic_name")
consumer.each do |message|
  # message is an instance of Kafka::Message
  puts message.payload
end
consumer.close
consumer.subscribe("topic_name", "another_topic", "more_and_more")
consumer.subscribe("^starts_with") # subscribe to multiple with a regex
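To see which topics a pattern such as "^starts_with" would select, the match can be approximated locally. This is only an illustration with made-up topic names: it uses Crystal's own Regex, whereas the real matching is performed by librdkafka against the broker's topic list, so more complex patterns may behave differently.

```crystal
# Made-up topic names for illustration.
topics = ["starts_with_a", "starts_with_b", "other_topic"]

# Same pattern as in the subscribe call above.
pattern = Regex.new("^starts_with")

matched = topics.select { |topic| pattern.matches?(topic) }
p matched # => ["starts_with_a", "starts_with_b"]
```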
make setup
crystal spec
- Update shard.yml and src/crafka.cr with new version number
- Update CHANGELOG.md with changes
- Commit and tag commit
Originally forked from: https://github.com/CloudKarafka/kafka.cr