Skip to content

Commit

Permalink
Merge pull request influxdata#2202 from fhriley/mqtt_topic_template
Browse files Browse the repository at this point in the history
Add templating for MQTT topics
  • Loading branch information
nathanielc authored Aug 9, 2019
2 parents e6bc51b + 21f8cf2 commit f3673ce
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 12 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## unreleased

### Features

- [#2202](https://github.com/influxdata/kapacitor/pull/2202): Add templating for MQTT topics.

## v1.5.3 [2019-06-18]

### Features
Expand Down
5 changes: 4 additions & 1 deletion alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,10 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, d NodeDiagnostic) (a
QoS: mqtt.QoSLevel(m.Qos),
Retained: m.Retained,
}
h := et.tm.MQTTService.Handler(c, ctx...)
h, err := et.tm.MQTTService.Handler(c, ctx...)
if err != nil {
return nil, errors.Wrap(err, "failed to create MQTT handler")
}
an.handlers = append(an.handlers, h)
}
// Parse level expressions
Expand Down
47 changes: 47 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9731,6 +9731,53 @@ func TestServer_AlertHandlers(t *testing.T) {
return nil
},
},
{
handler: client.TopicHandler{
Kind: "mqtt",
Options: map[string]interface{}{
"topic": "test/{{.TaskName}}",
"qos": "at-least-once",
"retained": true,
},
},
setup: func(c *server.Config, ha *client.TopicHandler) (context.Context, error) {
cc := new(mqtttest.ClientCreator)
ctxt := context.WithValue(nil, "clientCreator", cc)
cfg := &mqtt.Config{
Enabled: true,
Name: "test",
URL: "tcp://mqtt.example.com:1883",
}

cfg.SetNewClientF(cc.NewClient)

c.MQTT = mqtt.Configs{*cfg}
return ctxt, nil
},
result: func(ctxt context.Context) error {
s := ctxt.Value("clientCreator").(*mqtttest.ClientCreator)
if got, exp := len(s.Clients), 1; got != exp {
return fmt.Errorf("unexpected number of clients created : exp %d got: %d", exp, got)
}
if got, exp := len(s.Configs), 1; got != exp {
return fmt.Errorf("unexpected number of configs received: exp %d got: %d", exp, got)
}
if got, exp := s.Configs[0].URL, "tcp://mqtt.example.com:1883"; exp != got {
return fmt.Errorf("unexpected config URL: exp %q got %q", exp, got)
}
got := s.Clients[0].PublishData
exp := []mqtttest.PublishData{{
Topic: "test/testAlertHandlers",
QoS: mqtt.AtLeastOnce,
Retained: true,
Message: []byte("message"),
}}
if !reflect.DeepEqual(exp, got) {
return fmt.Errorf("unexpected mqtt publish data:\nexp\n%+v\ngot\n%+v\n", exp, got)
}
return nil
},
},
{
handler: client.TopicHandler{
Kind: "opsgenie",
Expand Down
7 changes: 5 additions & 2 deletions services/alert/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type Service struct {
Handler(kafka.HandlerConfig, ...keyvalue.T) (alert.Handler, error)
}
MQTTService interface {
Handler(mqtt.HandlerConfig, ...keyvalue.T) alert.Handler
Handler(mqtt.HandlerConfig, ...keyvalue.T) (alert.Handler, error)
}
OpsGenieService interface {
Handler(opsgenie.HandlerConfig, ...keyvalue.T) alert.Handler
Expand Down Expand Up @@ -825,7 +825,10 @@ func (s *Service) createHandlerFromSpec(spec HandlerSpec) (handler, error) {
if err != nil {
return handler{}, err
}
h = s.MQTTService.Handler(c, ctx...)
h, err = s.MQTTService.Handler(c, ctx...)
if err != nil {
return handler{}, err
}
h = newExternalHandler(h)
case "opsgenie":
c := opsgenie.HandlerConfig{}
Expand Down
51 changes: 43 additions & 8 deletions services/mqtt/service.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package mqtt

import (
"bytes"
"fmt"
"log"
"sync"
text "text/template"

"github.com/influxdata/kapacitor/alert"
"github.com/influxdata/kapacitor/keyvalue"
Expand Down Expand Up @@ -64,6 +66,7 @@ const (
type Service struct {
diag Diagnostic

bufPool sync.Pool
mu sync.RWMutex
clients map[string]Client
configs map[string]Config
Expand Down Expand Up @@ -93,7 +96,12 @@ func NewService(cs Configs, d Diagnostic) (*Service, error) {
}

return &Service{
diag: d,
diag: d,
bufPool: sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
},
configs: configs,
clients: clients,
defaultBrokerName: defaultBrokerName,
Expand Down Expand Up @@ -208,21 +216,29 @@ func (s *Service) update(cs Configs) error {
return nil
}

func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) alert.Handler {
func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) (alert.Handler, error) {
d := s.diag.WithContext(ctx...)
d.CreatingAlertHandler(c)

topicTmpl, err := text.New("topic").Parse(c.Topic)
if err != nil {
return nil, err
}
c.TopicTemplate = topicTmpl

return &handler{
s: s,
c: c,
diag: d,
}
}, nil
}

type HandlerConfig struct {
BrokerName string `mapstructure:"broker-name"`
Topic string `mapstructure:"topic"`
QoS QoSLevel `mapstructure:"qos"`
Retained bool `mapstructure:"retained"`
BrokerName string `mapstructure:"broker-name"`
Topic string `mapstructure:"topic"`
QoS QoSLevel `mapstructure:"qos"`
Retained bool `mapstructure:"retained"`
TopicTemplate *text.Template `mapstructure:"topic-template"`
}

type handler struct {
Expand All @@ -233,11 +249,30 @@ type handler struct {

func (h *handler) Handle(event alert.Event) {
h.diag.HandlingEvent()
if err := h.s.Alert(h.c.BrokerName, h.c.Topic, h.c.QoS, h.c.Retained, event.State.Message); err != nil {
topic, err := h.renderTopic(h.c.TopicTemplate, event)
if err != nil {
h.diag.Error("failed to create MQTT topic from template", err)
} else if err = h.s.Alert(h.c.BrokerName, topic, h.c.QoS, h.c.Retained, event.State.Message); err != nil {
h.diag.Error("failed to post message to MQTT broker", err)
}
}

func (h *handler) renderTopic(topicTmpl *text.Template, event alert.Event) (string, error) {
td := event.TemplateData()

topicBuf := h.s.bufPool.Get().(*bytes.Buffer)
defer func() {
topicBuf.Reset()
h.s.bufPool.Put(topicBuf)
}()

err := topicTmpl.Execute(topicBuf, td)
if err != nil {
return "", err
}
return topicBuf.String(), nil
}

type testOptions struct {
BrokerName string `json:"broker-name"`
Topic string `json:"topic"`
Expand Down
2 changes: 1 addition & 1 deletion task_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ type TaskMaster struct {
Handler(smtp.HandlerConfig, ...keyvalue.T) alert.Handler
}
MQTTService interface {
Handler(mqtt.HandlerConfig, ...keyvalue.T) alert.Handler
Handler(mqtt.HandlerConfig, ...keyvalue.T) (alert.Handler, error)
}

OpsGenieService interface {
Expand Down

0 comments on commit f3673ce

Please sign in to comment.