Skip to content

Commit

Permalink
Merge pull request influxdata#2319 from influxdata/fix/kafka-log
Browse files Browse the repository at this point in the history
fix(kafka): update kafka lib; make kafka errors unsilent
  • Loading branch information
docmerlin authored Apr 17, 2020
2 parents 94092cb + af19ca8 commit 71a67c4
Show file tree
Hide file tree
Showing 18 changed files with 641 additions and 384 deletions.
6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions services/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package kafka

import (
"crypto/tls"
"fmt"
"time"

"github.com/influxdata/influxdb/toml"
"github.com/influxdata/kapacitor/tlsconfig"
"github.com/pkg/errors"
kafka "github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go"
)

const (
Expand Down Expand Up @@ -75,7 +76,7 @@ func (c *Config) ApplyConditionalDefaults() {
}
}

func (c Config) WriterConfig() (kafka.WriterConfig, error) {
func (c Config) WriterConfig(diagnostic Diagnostic) (kafka.WriterConfig, error) {
var tlsCfg *tls.Config
if c.UseSSL {
t, err := tlsconfig.Create(c.SSLCA, c.SSLCert, c.SSLKey, c.InsecureSkipVerify)
Expand All @@ -88,6 +89,7 @@ func (c Config) WriterConfig() (kafka.WriterConfig, error) {
Timeout: time.Duration(c.Timeout),
TLS: tlsCfg,
}

return kafka.WriterConfig{
Brokers: c.Brokers,
Balancer: &kafka.LeastBytes{},
Expand All @@ -100,6 +102,9 @@ func (c Config) WriterConfig() (kafka.WriterConfig, error) {
// It also means that no errors will be captured from the WriteMessages method.
// As such we track the WriteStats for errors and report them with Kapacitor's normal diagnostics.
Async: true,
ErrorLogger: kafka.LoggerFunc(func(s string, x ...interface{}) {
diagnostic.Error("kafka client error", fmt.Errorf(s, x...))
}),
}, nil
}

Expand Down
20 changes: 13 additions & 7 deletions services/kafka/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ func NewCluster(c Config) *Cluster {
}
}

func (c *Cluster) WriteMessage(topic string, key, msg []byte) error {
w, err := c.writer(topic)
func (c *Cluster) WriteMessage(diagnostic Diagnostic, topic string, key, msg []byte) error {
w, err := c.writer(topic, diagnostic)
if err != nil {
return err
}
Expand All @@ -138,7 +138,7 @@ func (c *Cluster) WriteMessage(topic string, key, msg []byte) error {
})
}

func (c *Cluster) writer(topic string) (*writer, error) {
func (c *Cluster) writer(topic string, diagnostic Diagnostic) (*writer, error) {
c.mu.RLock()
w, ok := c.writers[topic]
c.mu.RUnlock()
Expand All @@ -147,10 +147,13 @@ func (c *Cluster) writer(topic string) (*writer, error) {
defer c.mu.Unlock()
w, ok = c.writers[topic]
if !ok {
wc, err := c.cfg.WriterConfig()
wc, err := c.cfg.WriterConfig(diagnostic)
if err != nil {
return nil, err
}
if topic == "" {
return nil, errors.New("topic must not be empty")
}
wc.Topic = topic
kw := kafka.NewWriter(wc)
// Create new writer
Expand Down Expand Up @@ -315,7 +318,7 @@ func (s *Service) Test(options interface{}) error {
if !ok {
return fmt.Errorf("unknown cluster %q", o.Cluster)
}
return c.WriteMessage(o.Topic, []byte(o.Key), []byte(o.Message))
return c.WriteMessage(s.diag, o.Topic, []byte(o.Key), []byte(o.Message))
}

type HandlerConfig struct {
Expand Down Expand Up @@ -347,12 +350,15 @@ func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) (alert.Handler, er
return nil, errors.Wrap(err, "failed to parse template")
}
}

diag := s.diag.WithContext(ctx...)

return &handler{
s: s,
cluster: cluster,
topic: c.Topic,
template: t,
diag: s.diag.WithContext(ctx...),
diag: diag,
}, nil
}

Expand All @@ -361,7 +367,7 @@ func (h *handler) Handle(event alert.Event) {
if err != nil {
h.diag.Error("failed to prepare kafka message body", err)
}
if err := h.cluster.WriteMessage(h.topic, []byte(event.State.ID), body); err != nil {
if err := h.cluster.WriteMessage(h.diag, h.topic, []byte(event.State.ID), body); err != nil {
h.diag.Error("failed to write message to kafka", err)
}
}
Expand Down
14 changes: 12 additions & 2 deletions vendor/github.com/segmentio/kafka-go/batch.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 71a67c4

Please sign in to comment.