Kafka Konsumer provides an easy implementation of a Kafka consumer with a built-in retry/exception manager (kafka-cronsumer).
- Added the ability to manipulate Kafka message headers.
- Added a transactional retry feature. Set it to `false` if you want to apply the exception/retry strategy only to failed messages.
- Enabled manual commit in both single and batch consuming modes.
- Enabled consumer pause/resume functionality. Please refer to its example and the how-it-works documentation.
- Bumped kafka-cronsumer to the latest version:
  - Backoff strategy support (linear and exponential options)
  - Added a message key for retried messages
  - Added an `x-error-message` header to show what error occurred while processing the message
- Reduced memory allocation.
- Increased throughput by changing the internal concurrency structure.
You can get the latest version via `go get github.com/Trendyol/kafka-konsumer/v2@latest`.
- You need to change the import path from `github.com/Trendyol/kafka-konsumer` to `github.com/Trendyol/kafka-konsumer/v2`.
- You need to change your consume function to the pointer signature.
- We moved `messageGroupDuration` from `batchConfiguration.messageGroupDuration` to the root level, because this field is also used by the single (non-batch) consumer.

```sh
go get github.com/Trendyol/kafka-konsumer/v2@latest
```
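For instance, the consume-function change amounts to switching the message parameter from a value to a pointer (a sketch with bodies elided):

```go
// v1: message is passed by value
func consumeFn(message kafka.Message) error { /* ... */ }

// v2: message is passed by pointer
func consumeFn(message *kafka.Message) error { /* ... */ }
```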
You can find a number of ready-to-run examples in this directory. After running the `docker-compose up` command, you can run any example you want.
Simple Consumer
```go
package main

import (
	"fmt"

	"github.com/Trendyol/kafka-konsumer/v2"
)

func main() {
	consumerCfg := &kafka.ConsumerConfig{
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"},
			Topic:   "standart-topic",
			GroupID: "standart-cg",
		},
		ConsumeFn:    consumeFn,
		RetryEnabled: false,
	}

	consumer, _ := kafka.NewConsumer(consumerCfg)
	defer consumer.Stop()

	consumer.Consume()
}

func consumeFn(message *kafka.Message) error {
	fmt.Printf("Message From %s with value %s\n", message.Topic, string(message.Value))
	return nil
}
```
Simple Consumer With Retry/Exception Option
```go
package main

import (
	"fmt"
	"time"

	"github.com/Trendyol/kafka-konsumer/v2"
)

func main() {
	consumerCfg := &kafka.ConsumerConfig{
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"},
			Topic:   "standart-topic",
			GroupID: "standart-cg",
		},
		RetryEnabled: true,
		RetryConfiguration: kafka.RetryConfiguration{
			Topic:         "retry-topic",
			StartTimeCron: "*/1 * * * *",
			WorkDuration:  50 * time.Second,
			MaxRetry:      3,
		},
		ConsumeFn: consumeFn,
	}

	consumer, _ := kafka.NewConsumer(consumerCfg)
	defer consumer.Stop()

	consumer.Consume()
}

func consumeFn(message *kafka.Message) error {
	fmt.Printf("Message From %s with value %s\n", message.Topic, string(message.Value))
	return nil
}
```
With Batch Option
```go
package main

import (
	"fmt"
	"time"

	"github.com/Trendyol/kafka-konsumer/v2"
)

func main() {
	consumerCfg := &kafka.ConsumerConfig{
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"},
			Topic:   "standart-topic",
			GroupID: "standart-cg",
		},
		LogLevel:     kafka.LogLevelDebug,
		RetryEnabled: true,
		RetryConfiguration: kafka.RetryConfiguration{
			Brokers:       []string{"localhost:29092"},
			Topic:         "retry-topic",
			StartTimeCron: "*/1 * * * *",
			WorkDuration:  50 * time.Second,
			MaxRetry:      3,
		},
		MessageGroupDuration: time.Second,
		BatchConfiguration: kafka.BatchConfiguration{
			MessageGroupLimit: 1000,
			BatchConsumeFn:    batchConsumeFn,
		},
	}

	consumer, _ := kafka.NewConsumer(consumerCfg)
	defer consumer.Stop()

	consumer.Consume()
}

func batchConsumeFn(messages []*kafka.Message) error {
	fmt.Printf("%d messages received; first value: %s\n", len(messages), messages[0].Value)
	return nil
}
```
With Disabling Transactional Retry
```go
package main

import (
	"errors"
	"time"

	"github.com/Trendyol/kafka-konsumer/v2"
)

func main() {
	consumerCfg := &kafka.ConsumerConfig{
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"},
			Topic:   "standart-topic",
			GroupID: "standart-cg",
		},
		LogLevel:           kafka.LogLevelDebug,
		RetryEnabled:       true,
		TransactionalRetry: kafka.NewBoolPtr(false),
		RetryConfiguration: kafka.RetryConfiguration{
			Brokers:       []string{"localhost:29092"},
			Topic:         "retry-topic",
			StartTimeCron: "*/1 * * * *",
			WorkDuration:  50 * time.Second,
			MaxRetry:      3,
		},
		MessageGroupDuration: time.Second,
		BatchConfiguration: kafka.BatchConfiguration{
			MessageGroupLimit: 1000,
			BatchConsumeFn:    batchConsumeFn,
		},
	}

	consumer, _ := kafka.NewConsumer(consumerCfg)
	defer consumer.Stop()

	consumer.Consume()
}

func batchConsumeFn(messages []*kafka.Message) error {
	// With transactional retry disabled, you can add custom error handling
	// here and flag individual messages as failed.
	for i := range messages {
		if i%2 == 0 {
			messages[i].IsFailed = true
		}
	}
	// You must still return an error here so the flagged messages are retried.
	return errors.New("err")
}
```
Please refer to the Tracing Example.
Please refer to the Pause/Resume Example.
In this example, we demonstrate how to create a Grafana dashboard and how to define alerts in Prometheus. You can see the example by going to the with-grafana folder under the examples folder, running the infrastructure with `docker compose up`, and then starting the application.
Under the with-sasl-plaintext folder in examples, you can find an example of a consumer integration with the SASL/PLAIN mechanism. To try the example, run `docker compose up` under that folder and then start the application.
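As a rough sketch, the SASL-related fields from the configuration table below would be wired into the consumer config along these lines. Field and struct names here (`SASL`, `SASLConfig`, `Type`) are assumptions based on the `sasl.authType`, `sasl.username`, and `sasl.password` entries in the table; verify them against the with-sasl-plaintext example before use.

```go
// Hypothetical sketch; check the with-sasl-plaintext example
// for the exact struct and field names.
consumerCfg := &kafka.ConsumerConfig{
	Reader: kafka.ReaderConfig{
		Brokers: []string{"localhost:29092"},
		Topic:   "standart-topic",
		GroupID: "standart-cg",
	},
	SASL: &kafka.SASLConfig{
		Type:     "PLAIN", // or "SCRAM"
		Username: "client",
		Password: "client-secret",
	},
	ConsumeFn:    consumeFn,
	RetryEnabled: false,
}
```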
| config | description | default |
|---|---|---|
| `reader` | Describes all segmentio kafka reader configurations | |
| `consumeFn` | Kafka consumer function; if retry is enabled, it is also used to consume retriable messages | |
| `logLevel` | Describes the log level; valid options are `debug`, `info`, `warn`, and `error` | info |
| `concurrency` | Number of goroutines used by listeners | 1 |
| `retryEnabled` | Whether the retry/exception consumer is enabled | false |
| `transactionalRetry` | Set to false to apply the exception/retry strategy only to failed messages | true |
| `commitInterval` | Indicates the interval at which offsets are committed to the broker | 1s |
| `rack` | see doc | |
| `clientId` | see doc | |
| `messageGroupDuration` | Maximum time to wait for a batch | |
| `dial.Timeout` | see doc | no timeout |
| `dial.KeepAlive` | see doc | not enabled |
| `transport.DialTimeout` | see doc | 5s |
| `transport.IdleTimeout` | see doc | 30s |
| `transport.MetadataTTL` | see doc | 6s |
| `transport.MetadataTopics` | see doc | all topics in cluster |
| `distributedTracingEnabled` | Indicates whether OpenTelemetry support is on for consume and produce operations | false |
| `distributedTracingConfiguration.TracerProvider` | see doc | otel.GetTracerProvider() |
| `distributedTracingConfiguration.Propagator` | see doc | otel.GetTextMapPropagator() |
| `retryConfiguration.clientId` | see doc | |
| `retryConfiguration.startTimeCron` | Cron expression defining when the retry consumer (kafka-cronsumer) starts to work | |
| `retryConfiguration.workDuration` | Duration for which the exception consumer actively consumes messages | |
| `retryConfiguration.topic` | Retry/exception topic name | |
| `retryConfiguration.brokers` | Retry topic broker URLs | |
| `retryConfiguration.maxRetry` | Maximum number of times a message is retried | 3 |
| `retryConfiguration.tls.rootCAPath` | see doc | "" |
| `retryConfiguration.tls.intermediateCAPath` | Works like rootCAPath; use it alongside rootCAPath when you need to specify a second root CA | "" |
| `retryConfiguration.sasl.authType` | `SCRAM` or `PLAIN` | |
| `retryConfiguration.sasl.username` | SCRAM or PLAIN username | |
| `retryConfiguration.sasl.password` | SCRAM or PLAIN password | |
| `batchConfiguration.messageGroupLimit` | Maximum number of messages in a batch | |
| `batchConfiguration.batchConsumeFn` | Kafka batch consumer function; if retry is enabled, it is also used to consume retriable messages | |
| `batchConfiguration.preBatchFn` | Function for transforming messages before batch consuming starts | |
| `tls.rootCAPath` | see doc | "" |
| `tls.intermediateCAPath` | Works like rootCAPath; use it alongside rootCAPath when you need to specify a second root CA | "" |
| `sasl.authType` | `SCRAM` or `PLAIN` | |
| `sasl.username` | SCRAM or PLAIN username | |
| `sasl.password` | SCRAM or PLAIN password | |
| `logger` | Custom logger implementation | info |
| `apiEnabled` | Enables the metrics API | false |
| `apiConfiguration.port` | API port | 8090 |
| `apiConfiguration.healtCheckPath` | Health check path | healthcheck |
| `metricConfiguration.path` | Metric endpoint path | /metrics |
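For example, `batchConfiguration.preBatchFn` could deduplicate a batch by message key before it reaches `batchConsumeFn`. This is a sketch: `dedupeByKey` is a hypothetical name, and the exact pre-batch function signature should be checked against the library.

```go
// Hypothetical pre-batch hook: keep only the last message seen per key,
// preserving the order in which keys first appeared.
func dedupeByKey(messages []*kafka.Message) []*kafka.Message {
	latest := make(map[string]*kafka.Message, len(messages))
	order := make([]string, 0, len(messages))
	for _, m := range messages {
		key := string(m.Key)
		if _, seen := latest[key]; !seen {
			order = append(order, key)
		}
		latest[key] = m // later messages overwrite earlier ones
	}
	out := make([]*kafka.Message, 0, len(order))
	for _, k := range order {
		out = append(out, latest[k])
	}
	return out
}
```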
Kafka Konsumer offers an API that exposes several metrics.
| Metric Name | Description | Value Type |
|---|---|---|
| kafka_konsumer_processed_messages_total_current | Total number of processed messages | Counter |
| kafka_konsumer_unprocessed_messages_total_current | Total number of unprocessed messages | Counter |