kafka-go requires Go version 1.15 or later.
p := kafka.NewProducer(kafka.ProducerConfig{
Version: "",
Brokers: []string{broker},
})
_, _, err := p.Publish(context.Background(), topic, "key", []byte(string("val")))
// config
singleConf := kafka.ConsumerConfig{
Version: "",
Brokers: []string{"127.0.0.1:9092"},
Group: "test-group",
Topic: "test",
CacheCapacity: 100,
ConnectTimeout: time.Millisecond * time.Duration(5000),
}
batchConf := kafka.BatchConsumerConf{
CacheCapacity: 100,
Consumers: 4,
Processors: 4,
}
consumer := kafka.NewConsumer(singleConf, nil)
bc := kafka.NewBatchConsumer(batchConf, consumer, kafka.WithHandle(func(ctx context.Context, key string, data []byte) error {
log.Info("receive msg:", "value", data)
time.Sleep(time.Millisecond * 500)
return nil
}))
bc.Start()
imports(
"github.com/oofpgDLD/kafka-go"
"github.com/oofpgDLD/kafka-go/trace"
)
// new consumer with trace interceptor
func newConsumer(singleConf kafka.ConsumerConfig, batchConf kafka.BatchConsumerConf) {
consumer := kafka.NewConsumer(singleConf, nil)
bc := kafka.NewBatchConsumer(batchConf, consumer, kafka.WithHandle(func(ctx context.Context, key string, data []byte) error {
log.Info("receive msg:", "value", data)
time.Sleep(time.Millisecond * 500)
return nil
}), xkafka.WithBatchConsumerInterceptors(trace.ConsumeInterceptor))
}
// new producer with trace interceptor
func newProducer() {
p := kafka.NewProducer(kafka.ProducerConfig{
Version: "",
Brokers: []string{broker},
}, xkafka.WithProducerInterceptors(trace.ProducerInterceptor))
}
[kafka producer and consumer test | kafka 消费者和生产者测试]
kafka-go is under the MIT license. See the LICENSE file for details.