Frafka is a Kafka implementation for Frizzle based on confluent-kafka-go.

Frizzle is a magic message (`Msg`) bus designed for parallel processing with many goroutines.
- `Receive()` messages from a configured `Source`
- Do your processing, possibly `Send()` each `Msg` on to one or more `Sink` destinations
- `Ack()` (or `Fail()`) the `Msg` to notify the `Source` that processing completed
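
For illustration, a typical receive/process/send loop looks roughly like the sketch below. This is a hedged sketch, not frafka's own code: it assumes frizzle's `Frizzle` interface exposes `Receive`/`Send`/`Ack`/`Fail` as shown, and `process` and the `"output"` destination are hypothetical stand-ins.

```go
package main

import (
	"log"

	"github.com/qntfy/frizzle"
)

// run sketches the cycle above: Receive each Msg, process it, Send the
// result on, then Ack (or Fail) to notify the Source. friz is assumed to
// be a frizzle.Frizzle wired to a frafka Source and Sink; process is a
// hypothetical user-supplied handler.
func run(friz frizzle.Frizzle, process func(frizzle.Msg) (frizzle.Msg, error)) {
	for msg := range friz.Receive() {
		out, err := process(msg)
		if err != nil {
			friz.Fail(msg) // processing failed; notify the Source
			continue
		}
		if err := friz.Send(out, "output"); err != nil { // "output" is a hypothetical Sink destination
			log.Printf("send: %v", err)
			friz.Fail(msg)
			continue
		}
		friz.Ack(msg) // processing completed
	}
}
```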
As of Go 1.11, frafka uses `go mod` for dependency management.
Frafka depends on the C library `librdkafka` (>= `v0.11.6`). For Debian 9+ (which includes the golang docker images), it has to be built from source. Fortunately, there's a script for that.
```sh
# Install librdkafka
curl --silent -OL https://raw.githubusercontent.com/confluentinc/confluent-kafka-go/v0.11.4/mk/bootstrap-librdkafka.sh
bash bootstrap-librdkafka.sh v0.11.4 /usr/local
ldconfig
```
Once that is installed, you should be good to go:

```sh
go get github.com/qntfy/frafka
cd frafka
go build
```
Frafka has integration tests which require a Kafka broker to test against. The `KAFKA_BROKERS` environment variable is used by the tests. simplesteph/kafka-stack-docker-compose has a great, simple docker-compose setup that is currently used in frafka CI.
```sh
curl --silent -L -o kafka.yml https://raw.githubusercontent.com/simplesteph/kafka-stack-docker-compose/v5.1.0/zk-single-kafka-single.yml
DOCKER_HOST_IP=127.0.0.1 docker-compose -f kafka.yml up -d
# takes a while to initialize; can use a tool like wait-for-it.sh in scripting
export KAFKA_BROKERS=127.0.0.1:9092
go test -v --cover ./...
```
Frafka Sources and Sinks are configured using Viper:

```go
func InitSink(config *viper.Viper) (*Sink, error)
func InitSource(config *viper.Viper) (*Source, error)
```
We typically initialize Viper through environment variables, but the client can do whatever it wants; it just needs to provide a configured Viper object with the relevant values. The application might use a prefix before the values below (a minimal initialization sketch follows the table).
| Variable | Required | Description | Default |
|---|---|---|---|
| KAFKA_BROKERS | required | address(es) of kafka brokers, space separated | |
| KAFKA_TOPICS | source | topic(s) to read from | |
| KAFKA_CONSUMER_GROUP | source | consumer group value for coordinating multiple clients | |
| KAFKA_CONSUME_LATEST_FIRST | source (optional) | start at the beginning or end of topic | earliest |
| KAFKA_MAX_BUFFER_KB | optional | how large a buffer to allow for prefetching and batch producing kafka messages\* | 16384 |
\* `KAFKA_MAX_BUFFER_KB` is passed through to librdkafka; the default is 16MB. The corresponding librdkafka config values are `queue.buffering.max.kbytes` (Producer) and `queued.max.messages.kbytes` (Consumer). Note that librdkafka creates one such buffer for the Producer (Sink) and one for each topic+partition being consumed by the Source. E.g., with the 16MB default, if you are consuming from 4 partitions and also producing, the theoretical max memory usage from these buffers would be 16 × (4+1) = 80MB.
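
By way of illustration, a minimal setup from environment variables might look like the following sketch. The env-binding details and the `Close()` calls are assumptions based on standard Viper usage and the frizzle `Source`/`Sink` interfaces; the optional `MYAPP` prefix is a hypothetical example.

```go
package main

import (
	"log"

	"github.com/qntfy/frafka"
	"github.com/spf13/viper"
)

func main() {
	// AutomaticEnv maps the KAFKA_* variables in the table above onto
	// viper keys (e.g. KAFKA_BROKERS -> kafka_brokers).
	v := viper.New()
	v.AutomaticEnv()
	// If your app prefixes its env vars (e.g. MYAPP_KAFKA_BROKERS),
	// uncomment the next line; "MYAPP" is a hypothetical prefix.
	// v.SetEnvPrefix("MYAPP")

	source, err := frafka.InitSource(v)
	if err != nil {
		log.Fatalf("init source: %v", err)
	}
	defer source.Close() // Close() assumed from the frizzle Source interface

	sink, err := frafka.InitSink(v)
	if err != nil {
		log.Fatalf("init sink: %v", err)
	}
	defer sink.Close() // Close() assumed from the frizzle Sink interface

	_ = source // hand these to frizzle, or use them directly
	_ = sink
}
```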
Since records are sent in batch fashion, Kafka may report errors or other information asynchronously. Events can be recovered via the channels returned by the `Sink.Events()` and `Source.Events()` methods. Partition changes and EOF are reported as non-error Events; other errors conform to the `error` interface. Where possible, Events retain the underlying type from confluent-kafka-go in case more information is desired.
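
A hedged sketch of draining one of these channels, assuming the event types behave as described above (the same pattern applies to `Sink.Events()`):

```go
package main

import (
	"log"

	"github.com/qntfy/frafka"
)

// monitorEvents logs async events from a frafka Source. Per the
// description above, error events satisfy the error interface, while
// partition changes and EOF arrive as non-error events.
func monitorEvents(source *frafka.Source) {
	for ev := range source.Events() {
		if err, ok := ev.(error); ok {
			log.Printf("async kafka error: %v", err)
			continue
		}
		log.Printf("kafka event: %v", ev)
	}
}
```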
Contributions welcome! Take a look at open issues.