Improve Kafka alert throughput
nathanielc committed Jul 3, 2018
1 parent ea9612a commit de90dd9
Showing 5 changed files with 122 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -3,6 +3,7 @@
## v1.5.1 [unreleased]

- [#1982](https://github.com/influxdata/kapacitor/pull/1982): Fix KafkaTopic not working from TICKscript
- [#1989](https://github.com/influxdata/kapacitor/pull/1989): Improve Kafka alert throughput.

## v1.5.0 [2018-05-17]

10 changes: 7 additions & 3 deletions integrations/streamer_test.go
@@ -8575,9 +8575,10 @@ stream

tmInit := func(tm *kapacitor.TaskMaster) {
configs := kafka.Configs{{
Enabled: true,
ID: "default",
Brokers: []string{ts.Addr.String()},
Enabled: true,
ID: "default",
Brokers: []string{ts.Addr.String()},
BatchSize: 1,
}}
d := diagService.NewKafkaHandler().WithContext(keyvalue.KV("test", "kafka"))
tm.KafkaService = kafka.NewService(configs, d)
@@ -8594,6 +8595,9 @@ stream
},
}

// Wait for Kafka messages to be written
time.Sleep(time.Second)

ts.Close()
msgs, err := ts.Messages()
if err != nil {
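Because writes are now asynchronous, the test above sleeps for a full second before closing the test server. A less timing-sensitive alternative is to poll until the expected messages arrive. A minimal sketch, assuming the test server's Messages call can be wrapped in a counting closure and read before Close:

package kafka_test

import (
    "fmt"
    "time"
)

// waitForMessages polls count until it reports at least want messages or the
// timeout elapses. count is a hypothetical closure wrapping the test server's
// Messages call.
func waitForMessages(count func() (int, error), want int, timeout time.Duration) error {
    deadline := time.Now().Add(timeout)
    for {
        n, err := count()
        if err != nil {
            return err
        }
        if n >= want {
            return nil
        }
        if time.Now().After(deadline) {
            return fmt.Errorf("timed out: got %d of %d messages", n, want)
        }
        time.Sleep(10 * time.Millisecond)
    }
}

The fixed time.Sleep(time.Second) could then become waitForMessages(func() (int, error) { msgs, err := ts.Messages(); return len(msgs), err }, 1, 5*time.Second), failing fast instead of always paying the full second.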
7 changes: 7 additions & 0 deletions pipeline/alert.go
@@ -1853,6 +1853,13 @@ func (h *SNMPTrapHandler) validate() error {
// .cluster('default')
// .kafkaTopic('alerts')
//
// Messages are written to Kafka asynchronously.
// As such, errors are not reported for individual writes to Kafka; instead, an error counter is recorded.
//
// Kapacitor tracks these stats for Kafka:
//
// * write_errors - Reports the number of errors encountered when writing to Kafka for a given topic and cluster.
// * write_messages - Reports the number of messages written to Kafka for a given topic and cluster.
//
// tick:property
func (n *AlertNodeData) Kafka() *KafkaHandler {
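These counters are registered with Kapacitor's runtime stats, which the server publishes through its debug vars endpoint. A minimal sketch of reading them over HTTP, assuming a local Kapacitor on its default port 9092; the JSON shape modeled here is an assumption for illustration:

package main

import (
    "encoding/json"
    "fmt"
    "net/http"
)

// debugVars models only the slice of the debug output used here; the real
// payload is larger and this shape is an assumption for illustration.
type debugVars struct {
    Kapacitor map[string]struct {
        Name   string             `json:"name"`
        Tags   map[string]string  `json:"tags"`
        Values map[string]float64 `json:"values"`
    } `json:"kapacitor"`
}

func main() {
    resp, err := http.Get("http://localhost:9092/kapacitor/v1/debug/vars")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    var v debugVars
    if err := json.NewDecoder(resp.Body).Decode(&v); err != nil {
        panic(err)
    }
    // Print the Kafka writer stats for each cluster/topic pair.
    for _, s := range v.Kapacitor {
        if s.Name == "kafka" {
            fmt.Printf("cluster=%s topic=%s write_messages=%.0f write_errors=%.0f\n",
                s.Tags["cluster"], s.Tags["topic"],
                s.Values["write_messages"], s.Values["write_errors"])
        }
    }
}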
5 changes: 5 additions & 0 deletions services/kafka/config.go
@@ -94,7 +94,12 @@ func (c Config) WriterConfig() (kafka.WriterConfig, error) {
Dialer: dialer,
ReadTimeout: time.Duration(c.Timeout),
WriteTimeout: time.Duration(c.Timeout),
BatchSize: c.BatchSize,
BatchTimeout: time.Duration(c.BatchTimeout),
// Async=true allows internal batching of the messages to take place.
// It also means that no errors will be captured from the WriteMessages method.
// As such we track the WriterStats for errors and report them with Kapacitor's normal diagnostics.
Async: true,
}, nil
}

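To see the batching tradeoff in isolation, here is a self-contained sketch using segmentio/kafka-go directly; the broker address and topic are placeholders, and the sleep stands in for a real shutdown path:

package main

import (
    "context"
    "fmt"
    "time"

    kafka "github.com/segmentio/kafka-go"
)

func main() {
    w := kafka.NewWriter(kafka.WriterConfig{
        Brokers:      []string{"localhost:9092"}, // placeholder broker
        Topic:        "alerts",                   // placeholder topic
        BatchSize:    100,                        // flush after 100 messages...
        BatchTimeout: 10 * time.Second,           // ...or 10s, whichever comes first
        // Async means WriteMessages enqueues and returns immediately;
        // delivery errors are visible only through Stats().
        Async: true,
    })
    defer w.Close()

    err := w.WriteMessages(context.Background(), kafka.Message{
        Key:   []byte("alert-id"),
        Value: []byte(`{"level":"CRITICAL"}`),
    })
    fmt.Println("enqueue error:", err) // reflects enqueueing only, not delivery

    time.Sleep(time.Second) // give the background flush a moment
    stats := w.Stats()      // note: reading Stats() resets its counters
    fmt.Printf("messages=%d errors=%d\n", stats.Messages, stats.Errors)
}

With Async left false, WriteMessages would instead block until the batch is flushed and return any delivery error directly.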
107 changes: 102 additions & 5 deletions services/kafka/service.go
@@ -6,15 +6,24 @@ import (
"encoding/json"
"fmt"
"sort"
"strconv"
"sync"
"sync/atomic"
"text/template"
"time"

"github.com/influxdata/kapacitor/alert"
"github.com/influxdata/kapacitor/keyvalue"
"github.com/influxdata/kapacitor/server/vars"
"github.com/pkg/errors"
kafka "github.com/segmentio/kafka-go"
)

const (
statWriteMessageCount = "write_messages"
statWriteErrorCount = "write_errors"
)

type Diagnostic interface {
WithContext(ctx ...keyvalue.T) Diagnostic
InsecureSkipVerify()
@@ -25,13 +34,94 @@ type Cluster struct {
mu sync.RWMutex
cfg Config

writers map[string]*kafka.Writer
writers map[string]*writer
}

// writer wraps a kafka.Writer and tracks stats
type writer struct {
kafka *kafka.Writer

cluster,
topic string

wg sync.WaitGroup

statsKey string
ticker *time.Ticker
messageCount int64
errorCount int64
}

func (w *writer) Open() {
statsKey, statsMap := vars.NewStatistic("kafka", map[string]string{
"cluster": w.cluster,
"topic": w.topic,
})
w.statsKey = statsKey
// set up stats for the writer
writeErrors := &writeErrorCount{
w: w,
}
statsMap.Set(statWriteErrorCount, writeErrors)
writeMessages := &writeMessageCount{
w: w,
}
statsMap.Set(statWriteMessageCount, writeMessages)

w.ticker = time.NewTicker(time.Second)
w.wg.Add(1)
go func() {
defer w.wg.Done()
w.pollStats()
}()
}

func (w *writer) Close() {
w.ticker.Stop()
vars.DeleteStatistic(w.statsKey)
w.kafka.Close()
w.wg.Wait()
}

// pollStats periodically reads the writer Stats and accumulates the results.
// Calling the kafka.Writer.Stats() method resets the writer's internal counters.
// As a result, all reads go through this method so that no counts are dropped.
func (w *writer) pollStats() {
for range w.ticker.C {
stats := w.kafka.Stats()
atomic.AddInt64(&w.messageCount, stats.Messages)
atomic.AddInt64(&w.errorCount, stats.Errors)
}
}

// writeMessageCount implements the kexpvar.IntVar interface to expose message counts.
type writeMessageCount struct {
w *writer
}

func (w *writeMessageCount) IntValue() int64 {
return atomic.LoadInt64(&w.w.messageCount)
}
func (w *writeMessageCount) String() string {
return strconv.FormatInt(w.IntValue(), 10)
}

// writeErrorCount implements the kexpvar.IntVar interface to expose error counts.
type writeErrorCount struct {
w *writer
}

func (w *writeErrorCount) IntValue() int64 {
return atomic.LoadInt64(&w.w.errorCount)
}
func (w *writeErrorCount) String() string {
return strconv.FormatInt(w.IntValue(), 10)
}

func NewCluster(c Config) *Cluster {
return &Cluster{
cfg: c,
writers: make(map[string]*kafka.Writer),
writers: make(map[string]*writer),
}
}

@@ -40,13 +130,13 @@ func (c *Cluster) WriteMessage(topic string, key, msg []byte) error {
if err != nil {
return err
}
return w.WriteMessages(context.Background(), kafka.Message{
return w.kafka.WriteMessages(context.Background(), kafka.Message{
Key: key,
Value: msg,
})
}

func (c *Cluster) writer(topic string) (*kafka.Writer, error) {
func (c *Cluster) writer(topic string) (*writer, error) {
c.mu.RLock()
w, ok := c.writers[topic]
c.mu.RUnlock()
@@ -60,7 +150,14 @@ func (c *Cluster) writer(topic string) (*kafka.Writer, error) {
return nil, err
}
wc.Topic = topic
w = kafka.NewWriter(wc)
kw := kafka.NewWriter(wc)
// Create new writer
w = &writer{
kafka: kw,
cluster: c.cfg.ID,
topic: topic,
}
w.Open()
c.writers[topic] = w
}
}
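writeMessageCount and writeErrorCount exist only to satisfy Kapacitor's expvar-style stats interface. A minimal sketch of that pattern, assuming kexpvar.IntVar has roughly this shape (the exact definition lives in Kapacitor's expvar package and may differ):

package main

import (
    "fmt"
    "strconv"
    "sync/atomic"
)

// IntVar approximates the interface referenced by the comments above:
// an expvar-style variable that also exposes its value as an int64.
type IntVar interface {
    IntValue() int64
    String() string
}

// counter is a minimal IntVar backed by an atomic int64, mirroring how
// writeMessageCount and writeErrorCount read the writer's accumulated totals.
type counter struct{ n int64 }

func (c *counter) Add(delta int64) { atomic.AddInt64(&c.n, delta) }
func (c *counter) IntValue() int64 { return atomic.LoadInt64(&c.n) }
func (c *counter) String() string  { return strconv.FormatInt(c.IntValue(), 10) }

func main() {
    c := &counter{}
    c.Add(3)
    var v IntVar = c
    fmt.Println(v.IntValue(), v.String()) // 3 3
}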
