Skip to content

Commit

Permalink
add kafka alert handler
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Mar 21, 2018
1 parent 6d9c9d8 commit a638862
Show file tree
Hide file tree
Showing 18 changed files with 958 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ alert will auto-recover.
This is because the new v2 API is not structured with static URLs, and so only the action can be defined and not the entire URL.
- [#1690](https://github.com/influxdata/kapacitor/issues/1690): Add https-private-key option to httpd config.
- [#1561](https://github.com/influxdata/kapacitor/issues/1561): Add .quiet to all nodes to silence any errors reported by the node.
- [#1826](https://github.com/influxdata/kapacitor/issues/1826): Add Kafka alert handler.

### Bugfixes
- [#1794](https://github.com/influxdata/kapacitor/issues/1794): Kapacitor ticks generating a hash instead of their actual given name.
Expand Down
14 changes: 14 additions & 0 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
alertservice "github.com/influxdata/kapacitor/services/alert"
"github.com/influxdata/kapacitor/services/hipchat"
"github.com/influxdata/kapacitor/services/httppost"
"github.com/influxdata/kapacitor/services/kafka"
"github.com/influxdata/kapacitor/services/mqtt"
"github.com/influxdata/kapacitor/services/opsgenie"
"github.com/influxdata/kapacitor/services/opsgenie2"
Expand Down Expand Up @@ -314,6 +315,19 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, d NodeDiagnostic) (a
n.IsStateChangesOnly = true
}

for _, k := range n.KafkaHandlers {
c := kafka.HandlerConfig{
Cluster: k.Cluster,
Topic: k.Topic,
Template: k.Template,
}
h, err := et.tm.KafkaService.Handler(c, ctx...)
if err != nil {
return nil, errors.Wrapf(err, "failed to create kafka handler")
}
an.handlers = append(an.handlers, h)
}

for _, a := range n.AlertaHandlers {
c := et.tm.AlertaService.DefaultHandlerConfig()
if a.Token != "" {
Expand Down
28 changes: 28 additions & 0 deletions etc/kapacitor/kapacitor.conf
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,34 @@ default-retention-policy = ""
# meaning alerts will only be sent if the alert state changes.
state-changes-only = false

[[kafka]]
# Configure Kafka
enabled = false
# ID is a unique identifier for this kafka cluster.
id = "localhost"
# Brokers is a list of host:port addresses of Kafka brokers.
brokers = []
# Timeout on network operations with the brokers.
# If 0 a default of 10s will be used.
timeout = 10s
# BatchSize is the number of messages that are batched before being sent to Kafka
# If 0 a default of 100 will be used.
batch-size = 100
# BatchTimeout is the maximum amount of time to wait before flushing an incomplete batch.
# If 0 a default of 1s will be used.
batch-timeout = 1s
# Use SSL enables ssl communication.
# Must be true for the other ssl options to take effect.
use-ssl = false
# Path to CA file
ssl-ca = ""
# Path to host cert file
ssl-cert = ""
# Path to cert key file
ssl-key = ""
# Use SSL but skip chain & host verification
insecure-skip-verify = false

[alerta]
# Configure Alerta.
enabled = false
Expand Down
45 changes: 45 additions & 0 deletions pipeline/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,10 @@ type AlertNodeData struct {
// Send alert using SNMPtraps.
// tick:ignore
SNMPTrapHandlers []*SNMPTrapHandler `tick:"SnmpTrap" json:"snmpTrap"`

// Send alert to Kafka topic
// tick:ignore
KafkaHandlers []*KafkaHandler `tick:"Kafka" json:"kafka"`
}

func newAlertNode(wants EdgeType) *AlertNode {
Expand Down Expand Up @@ -1818,3 +1822,44 @@ func (h *SNMPTrapHandler) validate() error {
}
return nil
}

// Send the alert to a Kafka topic.
//
// Example:
// [[kafka]]
// enabled = true
// id = "default"
// brokers = ["localhost:9092"]
//
// Example:
// stream
// |alert()
// .kafka()
// .cluster('default')
// .kafkaTopic('alerts')
//
//
// tick:property
func (n *AlertNodeData) Kafka() *KafkaHandler {
k := &KafkaHandler{
AlertNodeData: n,
}
n.KafkaHandlers = append(n.KafkaHandlers, k)
return k
}

// Kafka alert Handler
// tick:embedded:AlertNode.Kafka
type KafkaHandler struct {
*AlertNodeData `json:"-"`

// Cluster is the id of the configure kafka cluster
Cluster string `json:"cluster"`

// Kafka Topic
KafkaTopic string `json:"kafka-topic"`

// Template used to construct the message body
// If empty the alert data in JSON is sent as the message body.
Template string `json:"template"`
}
3 changes: 2 additions & 1 deletion pipeline/alert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ func TestAlertNode_MarshalJSON(t *testing.T) {
"opsGenie2": null,
"talk": null,
"mqtt": null,
"snmpTrap": null
"snmpTrap": null,
"kafka": null
}`,
},
}
Expand Down
3 changes: 2 additions & 1 deletion pipeline/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ func TestPipeline_MarshalJSON(t *testing.T) {
"opsGenie2": null,
"talk": null,
"mqtt": null,
"snmpTrap": null
"snmpTrap": null,
"kafka": null
},
{
"typeOf": "httpOut",
Expand Down
7 changes: 7 additions & 0 deletions pipeline/tick/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,13 @@ func (n *AlertNode) Build(a *pipeline.AlertNode) (ast.Node, error) {
Dot("token", h.Token)
}

for _, h := range a.KafkaHandlers {
n.Dot("kafka").
Dot("cluster", h.Cluster).
Dot("kafkaTopic", h.KafkaTopic).
Dot("template", h.Template)
}

for _, h := range a.AlertaHandlers {
n.Dot("alerta").
Dot("token", h.Token).
Expand Down
22 changes: 22 additions & 0 deletions pipeline/tick/alert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,28 @@ func TestAlertHipchat(t *testing.T) {
PipelineTickTestHelper(t, pipe, want)
}

func TestAlertKafka(t *testing.T) {
pipe, _, from := StreamFrom()
handler := from.Alert().Kafka()
handler.Cluster = "default"
handler.KafkaTopic = "test"
handler.Template = "tmpl"

want := `stream
|from()
|alert()
.id('{{ .Name }}:{{ .Group }}')
.message('{{ .ID }} is {{ .Level }}')
.details('{{ json . }}')
.history(21)
.kafka()
.cluster('default')
.kafkaTopic('test')
.template('tmpl')
`
PipelineTickTestHelper(t, pipe, want)
}

func TestAlertAlerta(t *testing.T) {
pipe, _, from := StreamFrom()
handler := from.Alert().Alerta()
Expand Down
5 changes: 5 additions & 0 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/influxdata/kapacitor/services/httppost"
"github.com/influxdata/kapacitor/services/influxdb"
"github.com/influxdata/kapacitor/services/k8s"
"github.com/influxdata/kapacitor/services/kafka"
"github.com/influxdata/kapacitor/services/load"
"github.com/influxdata/kapacitor/services/marathon"
"github.com/influxdata/kapacitor/services/mqtt"
Expand Down Expand Up @@ -82,6 +83,7 @@ type Config struct {
// Alert handlers
Alerta alerta.Config `toml:"alerta" override:"alerta"`
HipChat hipchat.Config `toml:"hipchat" override:"hipchat"`
Kafka kafka.Configs `toml:"kafka" override:"kafka,element-key=id"`
MQTT mqtt.Configs `toml:"mqtt" override:"mqtt,element-key=name"`
OpsGenie opsgenie.Config `toml:"opsgenie" override:"opsgenie"`
OpsGenie2 opsgenie2.Config `toml:"opsgenie2" override:"opsgenie2"`
Expand Down Expand Up @@ -263,6 +265,9 @@ func (c *Config) Validate() error {
if err := c.HipChat.Validate(); err != nil {
return errors.Wrap(err, "hipchat")
}
if err := c.Kafka.Validate(); err != nil {
return errors.Wrap(err, "kafka")
}
if err := c.MQTT.Validate(); err != nil {
return errors.Wrap(err, "mqtt")
}
Expand Down
14 changes: 14 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/influxdata/kapacitor/services/httppost"
"github.com/influxdata/kapacitor/services/influxdb"
"github.com/influxdata/kapacitor/services/k8s"
"github.com/influxdata/kapacitor/services/kafka"
"github.com/influxdata/kapacitor/services/load"
"github.com/influxdata/kapacitor/services/marathon"
"github.com/influxdata/kapacitor/services/mqtt"
Expand Down Expand Up @@ -229,6 +230,7 @@ func New(c *Config, buildInfo BuildInfo, diagService *diagnostic.Service) (*Serv
// Append Alert integration services
s.appendAlertaService()
s.appendHipChatService()
s.appendKafkaService()
if err := s.appendMQTTService(); err != nil {
return nil, errors.Wrap(err, "mqtt service")
}
Expand Down Expand Up @@ -731,6 +733,18 @@ func (s *Server) appendHipChatService() {
s.AppendService("hipchat", srv)
}

func (s *Server) appendKafkaService() {
c := s.config.Kafka
d := s.DiagService.NewKafkaHandler()
srv := kafka.NewService(c, d)

s.TaskMaster.KafkaService = srv
s.AlertService.KafkaService = srv

s.SetDynamicService("kafka", srv)
s.AppendService("kafka", srv)
}

func (s *Server) appendAlertaService() {
c := s.config.Alerta
d := s.DiagService.NewAlertaHandler()
Expand Down
55 changes: 55 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import (
"github.com/influxdata/kapacitor/services/httppost"
"github.com/influxdata/kapacitor/services/httppost/httpposttest"
"github.com/influxdata/kapacitor/services/k8s"
"github.com/influxdata/kapacitor/services/kafka"
"github.com/influxdata/kapacitor/services/kafka/kafkatest"
"github.com/influxdata/kapacitor/services/mqtt"
"github.com/influxdata/kapacitor/services/mqtt/mqtttest"
"github.com/influxdata/kapacitor/services/opsgenie"
Expand Down Expand Up @@ -8362,6 +8364,16 @@ func TestServer_ListServiceTests(t *testing.T) {
"cluster": "",
},
},
{
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/service-tests/kafka"},
Name: "kafka",
Options: client.ServiceTestOptions{
"cluster": "example",
"topic": "test",
"key": "key",
"message": "test kafka message",
},
},
{
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/service-tests/kubernetes"},
Name: "kubernetes",
Expand Down Expand Up @@ -9166,6 +9178,49 @@ func TestServer_AlertHandlers(t *testing.T) {
return nil
},
},
{
handler: client.TopicHandler{
Kind: "kafka",
Options: map[string]interface{}{
"cluster": "default",
"topic": "test",
},
},
setup: func(c *server.Config, ha *client.TopicHandler) (context.Context, error) {
ts, err := kafkatest.NewServer()
if err != nil {
return nil, err
}
ctxt := context.WithValue(nil, "server", ts)

c.Kafka = kafka.Configs{{
Enabled: true,
ID: "default",
Brokers: []string{ts.Addr.String()},
}}
return ctxt, nil
},
result: func(ctxt context.Context) error {
ts := ctxt.Value("server").(*kafkatest.Server)
time.Sleep(2 * time.Second)
ts.Close()
got, err := ts.Messages()
if err != nil {
return err
}
exp := []kafkatest.Message{{
Topic: "test",
Partition: 1,
Offset: 0,
Key: "id",
Message: string(adJSON) + "\n",
}}
if !cmp.Equal(exp, got) {
return fmt.Errorf("unexpected kafak messages -exp/+got:\n%s", cmp.Diff(exp, got))
}
return nil
},
},
{
handler: client.TopicHandler{
Kind: "log",
Expand Down
15 changes: 15 additions & 0 deletions services/alert/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/influxdata/kapacitor/services/hipchat"
"github.com/influxdata/kapacitor/services/httpd"
"github.com/influxdata/kapacitor/services/httppost"
"github.com/influxdata/kapacitor/services/kafka"
"github.com/influxdata/kapacitor/services/mqtt"
"github.com/influxdata/kapacitor/services/opsgenie"
"github.com/influxdata/kapacitor/services/opsgenie2"
Expand Down Expand Up @@ -85,6 +86,9 @@ type Service struct {
HipChatService interface {
Handler(hipchat.HandlerConfig, ...keyvalue.T) alert.Handler
}
KafkaService interface {
Handler(kafka.HandlerConfig, ...keyvalue.T) (alert.Handler, error)
}
MQTTService interface {
Handler(mqtt.HandlerConfig, ...keyvalue.T) alert.Handler
}
Expand Down Expand Up @@ -787,6 +791,17 @@ func (s *Service) createHandlerFromSpec(spec HandlerSpec) (handler, error) {
}
h = s.HipChatService.Handler(c, ctx...)
h = newExternalHandler(h)
case "kafka":
c := kafka.HandlerConfig{}
err = decodeOptions(spec.Options, &c)
if err != nil {
return handler{}, err
}
h, err = s.KafkaService.Handler(c, ctx...)
if err != nil {
return handler{}, err
}
h = newExternalHandler(h)
case "log":
c := DefaultLogHandlerConfig()
err = decodeOptions(spec.Options, &c)
Expand Down
21 changes: 21 additions & 0 deletions services/diagnostic/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/influxdata/kapacitor/services/httppost"
"github.com/influxdata/kapacitor/services/influxdb"
"github.com/influxdata/kapacitor/services/k8s"
"github.com/influxdata/kapacitor/services/kafka"
"github.com/influxdata/kapacitor/services/mqtt"
"github.com/influxdata/kapacitor/services/opsgenie"
"github.com/influxdata/kapacitor/services/opsgenie2"
Expand Down Expand Up @@ -409,6 +410,26 @@ func (h *HipChatHandler) Error(msg string, err error) {
h.l.Error(msg, Error(err))
}

// Kafka handler
type KafkaHandler struct {
l Logger
}

func (h *KafkaHandler) WithContext(ctx ...keyvalue.T) kafka.Diagnostic {
fields := logFieldsFromContext(ctx)

return &KafkaHandler{
l: h.l.With(fields...),
}
}

func (h *KafkaHandler) Error(msg string, err error) {
h.l.Error(msg, Error(err))
}
func (h *KafkaHandler) InsecureSkipVerify() {
h.l.Info("service is configured to skip ssl verification")
}

// HTTPD handler

type HTTPDHandler struct {
Expand Down
Loading

0 comments on commit a638862

Please sign in to comment.