Skip to content


Repository files navigation

Kafka Konsumer


Kafka Konsumer provides an easy implementation of Kafka consumer with a built-in retry/exception manager (kafka-cronsumer).



go get


You can find a number of ready-to-run examples at this directory.

After running docker-compose up command, you can run any application you want.

Without Retry/Exception Manager
func main() {
    consumerCfg := &kafka.ConsumerConfig{
        Reader: kafka.ReaderConfig{
            Brokers: []string{"localhost:29092"},
            Topic:   "standart-topic",
            GroupID: "standart-cg",
        ConsumeFn:    consumeFn,
        RetryEnabled: false,

    consumer, _ := kafka.NewConsumer(consumerCfg)
    defer consumer.Stop()

func consumeFn(message kafka.Message) error {
    fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value))
    return nil
With Retry/Exception Option Enabled
func main() {
    consumerCfg := &kafka.ConsumerConfig{
        Reader: kafka.ReaderConfig{
            Brokers: []string{"localhost:29092"},
            Topic:   "standart-topic",
            GroupID: "standart-cg",
        RetryEnabled: true,
        RetryConfiguration: kafka.RetryConfiguration{
            Topic:         "retry-topic",
            StartTimeCron: "*/1 * * * *",
            WorkDuration:  50 * time.Second,
            MaxRetry:      3,
        ConsumeFn: consumeFn,

    consumer, _ := kafka.NewConsumer(consumerCfg)
    defer consumer.Stop()

func consumeFn(message kafka.Message) error {
    fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value))
    return nil
With Batch Option
func main() {
    consumerCfg := &kafka.ConsumerConfig{
        Reader: kafka.ReaderConfig{
            Brokers: []string{"localhost:29092"},
            Topic:   "standart-topic",
            GroupID: "standart-cg",
        LogLevel:     kafka.LogLevelDebug,
        RetryEnabled: true,
        RetryConfiguration: kafka.RetryConfiguration{
            Brokers:       []string{"localhost:29092"},
            Topic:         "retry-topic",
            StartTimeCron: "*/1 * * * *",
            WorkDuration:  50 * time.Second,
            MaxRetry:      3,
        BatchConfiguration: kafka.BatchConfiguration{
            MessageGroupLimit:    1000,
            MessageGroupDuration: time.Second,
            BatchConsumeFn:       batchConsumeFn,

    consumer, _ := kafka.NewConsumer(consumerCfg)
    defer consumer.Stop()


func batchConsumeFn(messages []kafka.Message) error {
    fmt.Printf("%d\n comes first %s", len(messages), messages[0].Value)
    return nil

With Grafana & Prometheus

In this example, we are demonstrating how to create Grafana dashboard and how to define alerts in Prometheus. You can see the example by going to the with-grafana folder in the examples folder and running the infrastructure with docker compose up and then the application.


With SASL-PLAINTEXT Authentication

Under the examples - with-sasl-plaintext folder, you can find an example of a consumer integration with SASL/PLAIN mechanism. To try the example, you can run the command docker compose up under the specified folder and then start the application.


config description default
reader Describes all segmentio kafka reader configurations
consumeFn Kafka consumer function, if retry enabled it, is also used to consume retriable messages
logLevel Describes log level; valid options are debug, info, warn, and error info
concurrency Number of goroutines used at listeners 1
retryEnabled Retry/Exception consumer is working or not false
rack see doc
retryConfiguration.startTimeCron Cron expression when retry consumer (kafka-cronsumer) starts to work at
retryConfiguration.workDuration Work duration exception consumer actively consuming messages
retryConfiguration.topic Retry/Exception topic names
retryConfiguration.brokers Retry topic brokers urls
retryConfiguration.maxRetry Maximum retry value for attempting to retry a message 3
retryConfiguration.tls.rootCAPath see doc ""
retryConfiguration.tls.intermediateCAPath Same with rootCA, if you want to specify two rootca you can use it with rootCAPath ""
retryConfiguration.sasl.authType SCRAM or PLAIN
retryConfiguration.sasl.username SCRAM OR PLAIN username
retryConfiguration.sasl.password SCRAM OR PLAIN password
batchConfiguration.messageGroupLimit Maximum number of messages in a batch
batchConfiguration.messageGroupDuration Maximum time to wait for a batch
tls.rootCAPath see doc ""
tls.intermediateCAPath Same with rootCA, if you want to specify two rootca you can use it with rootCAPath ""
sasl.authType SCRAM or PLAIN
sasl.username SCRAM OR PLAIN username
sasl.password SCRAM OR PLAIN password
logger If you want to custom logger info
apiEnabled Enabled metrics false
apiConfiguration.port Set API port 8090
apiConfiguration.healtCheckPath Set Health check path healthcheck
metricConfiguration.path Set metric endpoint path /metrics


Kafka Konsumer offers an API that handles exposing several metrics.

Exposed Metrics

Metric Name Description Value Type
kafka_konsumer_processed_messages_total Total number of processed messages. Counter
kafka_konsumer_processed_batch_messages_total Total number of processed batch messages. Counter
kafka_konsumer_unprocessed_messages_total Total number of unprocessed messages. Counter
kafka_konsumer_unprocessed_batch_messages_total Total number of unprocessed batch messages. Counter