diff --git a/CHANGELOG.md b/CHANGELOG.md
index da3e2e8a4..a4d9f749b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,7 @@
 ## v1.5.1 [unreleased]
 
 - [#1982](https://github.com/influxdata/kapacitor/pull/1982): Fix KafkaTopic not working from TICKscript
+- [#1989](https://github.com/influxdata/kapacitor/pull/1989): Improve Kafka alert throughput.
 
 ## v1.5.0 [2018-05-17]
 
diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go
index c382bdbf2..4f489a9c9 100644
--- a/integrations/streamer_test.go
+++ b/integrations/streamer_test.go
@@ -8575,9 +8575,10 @@ stream
 	tmInit := func(tm *kapacitor.TaskMaster) {
 
 		configs := kafka.Configs{{
-			Enabled: true,
-			ID:      "default",
-			Brokers: []string{ts.Addr.String()},
+			Enabled:   true,
+			ID:        "default",
+			Brokers:   []string{ts.Addr.String()},
+			BatchSize: 1,
 		}}
 		d := diagService.NewKafkaHandler().WithContext(keyvalue.KV("test", "kafka"))
 		tm.KafkaService = kafka.NewService(configs, d)
@@ -8594,6 +8595,9 @@ stream
 		},
 	}
 
+	// Wait for Kafka messages to be written
+	time.Sleep(time.Second)
+
 	ts.Close()
 	msgs, err := ts.Messages()
 	if err != nil {
diff --git a/pipeline/alert.go b/pipeline/alert.go
index ceb2333a9..a1785abae 100644
--- a/pipeline/alert.go
+++ b/pipeline/alert.go
@@ -1853,6 +1853,13 @@ func (h *SNMPTrapHandler) validate() error {
 //            .cluster('default')
 //            .kafkaTopic('alerts')
 //
+// Messages are written to Kafka asynchronously.
+// As such, errors are not reported for individual writes to Kafka; rather, an error counter is recorded.
+//
+// Kapacitor tracks these stats for Kafka:
+//
+//    * write_errors - Reports the number of errors encountered when writing to Kafka for a given topic and cluster.
+//    * write_messages - Reports the number of messages written to Kafka for a given topic and cluster.
 //
 // tick:property
 func (n *AlertNodeData) Kafka() *KafkaHandler {
diff --git a/services/kafka/config.go b/services/kafka/config.go
index 62a1dd121..39c3e4914 100644
--- a/services/kafka/config.go
+++ b/services/kafka/config.go
@@ -94,7 +94,12 @@ func (c Config) WriterConfig() (kafka.WriterConfig, error) {
 		Dialer:       dialer,
 		ReadTimeout:  time.Duration(c.Timeout),
 		WriteTimeout: time.Duration(c.Timeout),
+		BatchSize:    c.BatchSize,
 		BatchTimeout: time.Duration(c.BatchTimeout),
+		// Async=true allows internal batching of the messages to take place.
+		// It also means that no errors will be captured from the WriteMessages method.
+		// As such we track the WriteStats for errors and report them with Kapacitor's normal diagnostics.
+		Async: true,
 	}, nil
 }
 
diff --git a/services/kafka/service.go b/services/kafka/service.go
index 837b39da6..c74a2b7a1 100644
--- a/services/kafka/service.go
+++ b/services/kafka/service.go
@@ -6,15 +6,24 @@ import (
 	"encoding/json"
 	"fmt"
 	"sort"
+	"strconv"
 	"sync"
+	"sync/atomic"
 	"text/template"
+	"time"
 
 	"github.com/influxdata/kapacitor/alert"
 	"github.com/influxdata/kapacitor/keyvalue"
+	"github.com/influxdata/kapacitor/server/vars"
 	"github.com/pkg/errors"
 	kafka "github.com/segmentio/kafka-go"
 )
 
+const (
+	statWriteMessageCount = "write_messages"
+	statWriteErrorCount   = "write_errors"
+)
+
 type Diagnostic interface {
 	WithContext(ctx ...keyvalue.T) Diagnostic
 	InsecureSkipVerify()
@@ -25,13 +34,94 @@ type Cluster struct {
 	mu  sync.RWMutex
 	cfg Config
 
-	writers map[string]*kafka.Writer
+	writers map[string]*writer
+}
+
+// writer wraps a kafka.Writer and tracks stats.
+type writer struct {
+	kafka *kafka.Writer
+
+	cluster,
+	topic string
+
+	wg sync.WaitGroup
+
+	statsKey     string
+	ticker       *time.Ticker
+	messageCount int64
+	errorCount   int64
+}
+
+func (w *writer) Open() {
+	statsKey, statsMap := vars.NewStatistic("kafka", map[string]string{
+		"cluster": w.cluster,
+		"topic":   w.topic,
+	})
+	w.statsKey = statsKey
+	// Set up stats for the writer.
+	writeErrors := &writeErrorCount{
+		w: w,
+	}
+	statsMap.Set(statWriteErrorCount, writeErrors)
+	writeMessages := &writeMessageCount{
+		w: w,
+	}
+	statsMap.Set(statWriteMessageCount, writeMessages)
+
+	w.ticker = time.NewTicker(time.Second)
+	w.wg.Add(1)
+	go func() {
+		defer w.wg.Done()
+		w.pollStats()
+	}()
+}
+
+func (w *writer) Close() {
+	w.ticker.Stop()
+	vars.DeleteStatistic(w.statsKey)
+	w.kafka.Close()
+	w.wg.Wait()
+}
+
+// pollStats periodically reads the writer Stats and accumulates the results.
+// A read operation on the kafka.Writer.Stats() method causes the internal counters to be reset.
+// As a result we control all reads through this method.
+func (w *writer) pollStats() {
+	for range w.ticker.C {
+		stats := w.kafka.Stats()
+		atomic.AddInt64(&w.messageCount, stats.Messages)
+		atomic.AddInt64(&w.errorCount, stats.Errors)
+	}
+}
+
+// writeMessageCount implements the kexpvar.IntVar interface to expose message counts.
+type writeMessageCount struct {
+	w *writer
+}
+
+func (w *writeMessageCount) IntValue() int64 {
+	return atomic.LoadInt64(&w.w.messageCount)
+}
+func (w *writeMessageCount) String() string {
+	return strconv.FormatInt(w.IntValue(), 10)
+}
+
+// writeErrorCount implements the kexpvar.IntVar interface to expose error counts.
+type writeErrorCount struct {
+	w *writer
+}
+
+func (w *writeErrorCount) IntValue() int64 {
+	return atomic.LoadInt64(&w.w.errorCount)
+}
+func (w *writeErrorCount) String() string {
+	return strconv.FormatInt(w.IntValue(), 10)
 }
 
 func NewCluster(c Config) *Cluster {
 	return &Cluster{
 		cfg:     c,
-		writers: make(map[string]*kafka.Writer),
+		writers: make(map[string]*writer),
 	}
 }
 
@@ -40,13 +130,13 @@ func (c *Cluster) WriteMessage(topic string, key, msg []byte) error {
 	if err != nil {
 		return err
 	}
-	return w.WriteMessages(context.Background(), kafka.Message{
+	return w.kafka.WriteMessages(context.Background(), kafka.Message{
 		Key:   key,
 		Value: msg,
 	})
 }
 
-func (c *Cluster) writer(topic string) (*kafka.Writer, error) {
+func (c *Cluster) writer(topic string) (*writer, error) {
 	c.mu.RLock()
 	w, ok := c.writers[topic]
 	c.mu.RUnlock()
@@ -60,7 +150,14 @@ func (c *Cluster) writer(topic string) (*kafka.Writer, error) {
 				return nil, err
 			}
 			wc.Topic = topic
-			w = kafka.NewWriter(wc)
+			kw := kafka.NewWriter(wc)
+			// Create a new writer that wraps the kafka.Writer.
+			w = &writer{
+				kafka:   kw,
+				cluster: c.cfg.ID,
+				topic:   topic,
+			}
+			w.Open()
 			c.writers[topic] = w
 		}
 	}
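
The asynchronous-writer pattern introduced by this change can be exercised on its own with segmentio/kafka-go. The sketch below is illustrative only: the broker address, topic, batch settings, and message contents are assumptions, not part of this PR. It shows why the writer wrapper accumulates counters in a dedicated loop: with Async set, WriteMessages returns before delivery and reports no per-message errors, and each call to Stats() resets the writer's internal counters, so all reads should funnel through one place.

package main

import (
	"context"
	"fmt"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	// Broker address, topic, and batch settings are placeholders for illustration.
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:      []string{"localhost:9092"},
		Topic:        "alerts",
		BatchSize:    100,
		BatchTimeout: 10 * time.Second,
		// Async lets the writer batch messages internally; WriteMessages
		// returns immediately and does not surface per-message errors.
		Async: true,
	})
	defer w.Close()

	// With Async=true the returned error does not reflect delivery failures.
	_ = w.WriteMessages(context.Background(), kafka.Message{
		Key:   []byte("alert-id"),
		Value: []byte(`{"level":"CRITICAL"}`),
	})

	// Stats() resets the writer's counters on every read, so accumulate the
	// values in one loop, mirroring the pollStats goroutine in this PR.
	var messages, errs int64
	for i := 0; i < 3; i++ {
		time.Sleep(time.Second)
		stats := w.Stats()
		messages += stats.Messages
		errs += stats.Errors
	}
	fmt.Printf("write_messages=%d write_errors=%d\n", messages, errs)
}

Within Kapacitor these accumulated counts are exposed through the vars package as the write_messages and write_errors statistics, tagged by cluster and topic, as documented in the pipeline/alert.go comment above.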