Skip to content

Commit

Permalink
Implement MQTT Alert Handler
Browse files Browse the repository at this point in the history
Much of this code was pulled from the guide published here:
https://github.com/influxdata/kapacitor/blob/master/alert/HANDLERS.md .

This also adds eclipse/paho.mqtt.golang as a dependency to Kapacitor.
This client was chosen primarily because it is the same one that is used
in Telegraf. It is actively developed, and is widely used by other
popular projects in the community (e.g. Gobot).
  • Loading branch information
timraymond authored and nathanielc committed Jul 24, 2017
1 parent a16064c commit c50c13c
Show file tree
Hide file tree
Showing 72 changed files with 9,938 additions and 2 deletions.
10 changes: 8 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ required = ["github.com/benbjohnson/tmpl","github.com/golang/protobuf/protoc-gen
branch = "master"
name = "github.com/shurcooL/markdownfmt"

[[constraint]]
name = "github.com/eclipse/paho.mqtt.golang"
version = "~1.0.0"

# Force the Azure projects to be a specific older version that Prometheus needs
[[override]]
name = "github.com/Azure/azure-sdk-for-go"
Expand Down
11 changes: 11 additions & 0 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
alertservice "github.com/influxdata/kapacitor/services/alert"
"github.com/influxdata/kapacitor/services/hipchat"
"github.com/influxdata/kapacitor/services/httppost"
"github.com/influxdata/kapacitor/services/mqtt"
"github.com/influxdata/kapacitor/services/opsgenie"
"github.com/influxdata/kapacitor/services/pagerduty"
"github.com/influxdata/kapacitor/services/pushover"
Expand Down Expand Up @@ -377,6 +378,16 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
an.handlers = append(an.handlers, h)
}

for _, m := range n.MQTTHandlers {
c := et.tm.MQTTService.DefaultHandlerConfig()
if m.Topic != "" {
c.Topic = m.Topic
c.QoS = mqtt.QoSLevel(m.QoS)
c.Retained = m.Retained
}
h := et.tm.MQTTService.Handler(c, l)
an.handlers = append(an.handlers, h)
}
// Parse level expressions
an.levels = make([]stateful.Expression, alert.Critical+1)
an.scopePools = make([]stateful.ScopePool, alert.Critical+1)
Expand Down
30 changes: 30 additions & 0 deletions pipeline/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const defaultDetailsTmpl = "{{ json . }}"
// * Pushover -- Send alert to Pushover.
// * Talk -- Post alert message to Talk client.
// * Telegram -- Post alert message to Telegram client.
// * MQTT -- Post alert message to MQTT.
//
// See below for more details on configuring each handler.
//
Expand Down Expand Up @@ -351,6 +352,10 @@ type AlertNode struct {
// tick:ignore
TalkHandlers []*TalkHandler `tick:"Talk"`

// Send alert to MQTT
// tick:ignore
MQTTHandlers []*MQTTHandler `tick:"Mqtt"`

// Send alert using SNMPtraps.
// tick:ignore
SNMPTrapHandlers []*SNMPTrapHandler `tick:"SnmpTrap"`
Expand Down Expand Up @@ -1001,6 +1006,31 @@ func (a *AlertaHandler) Services(service ...string) *AlertaHandler {
return a
}

// Send alert to an MQTT broker
// tick:property
func (a *AlertNode) Mqtt(topic string) *MQTTHandler {
m := &MQTTHandler{
AlertNode: a,
Topic: topic,
}
a.MQTTHandlers = append(a.MQTTHandlers, m)
return m
}

type MQTTHandler struct {
*AlertNode

// The topic where alerts will be dispatched to
Topic string

// The QoS that will be used to deliver the alerts
QoS int64

// Retained indicates whether this alert should be delivered to
// clients that were not connected to the broker at the time of the alert
Retained bool
}

// Send the alert to Sensu.
//
// Example:
Expand Down
6 changes: 6 additions & 0 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/influxdata/kapacitor/services/k8s"
"github.com/influxdata/kapacitor/services/logging"
"github.com/influxdata/kapacitor/services/marathon"
"github.com/influxdata/kapacitor/services/mqtt"
"github.com/influxdata/kapacitor/services/nerve"
"github.com/influxdata/kapacitor/services/opsgenie"
"github.com/influxdata/kapacitor/services/pagerduty"
Expand Down Expand Up @@ -76,6 +77,7 @@ type Config struct {
// Alert handlers
Alerta alerta.Config `toml:"alerta" override:"alerta"`
HipChat hipchat.Config `toml:"hipchat" override:"hipchat"`
MQTT mqtt.Config `toml:"mqtt" override:"mqtt"`
OpsGenie opsgenie.Config `toml:"opsgenie" override:"opsgenie"`
PagerDuty pagerduty.Config `toml:"pagerduty" override:"pagerduty"`
Pushover pushover.Config `toml:"pushover" override:"pushover"`
Expand Down Expand Up @@ -138,6 +140,7 @@ func NewConfig() *Config {

c.Alerta = alerta.NewConfig()
c.HipChat = hipchat.NewConfig()
c.MQTT = mqtt.NewConfig()
c.OpsGenie = opsgenie.NewConfig()
c.PagerDuty = pagerduty.NewConfig()
c.Pushover = pushover.NewConfig()
Expand Down Expand Up @@ -245,6 +248,9 @@ func (c *Config) Validate() error {
if err := c.HipChat.Validate(); err != nil {
return err
}
if err := c.MQTT.Validate(); err != nil {
return err
}
if err := c.OpsGenie.Validate(); err != nil {
return err
}
Expand Down
14 changes: 14 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/influxdata/kapacitor/services/k8s"
"github.com/influxdata/kapacitor/services/logging"
"github.com/influxdata/kapacitor/services/marathon"
"github.com/influxdata/kapacitor/services/mqtt"
"github.com/influxdata/kapacitor/services/nerve"
"github.com/influxdata/kapacitor/services/noauth"
"github.com/influxdata/kapacitor/services/opsgenie"
Expand Down Expand Up @@ -205,6 +206,7 @@ func New(c *Config, buildInfo BuildInfo, logService logging.Interface) (*Server,
// Append Alert integration services
s.appendAlertaService()
s.appendHipChatService()
s.appendMQTTService()
s.appendOpsGenieService()
s.appendPagerDutyService()
s.appendPushoverService()
Expand Down Expand Up @@ -455,6 +457,18 @@ func (s *Server) appendAuthService() {
s.AppendService("auth", srv)
}

func (s *Server) appendMQTTService() {
c := s.config.MQTT
l := s.LogService.NewLogger("[mqtt] ", log.LstdFlags)
srv := mqtt.NewService(c, l)

s.TaskMaster.MQTTService = srv
s.AlertService.MQTTService = srv

s.SetDynamicService("mqtt", srv)
s.AppendService("mqtt", srv)
}

func (s *Server) appendOpsGenieService() {
c := s.config.OpsGenie
l := s.LogService.NewLogger("[opsgenie] ", log.LstdFlags)
Expand Down
14 changes: 14 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/influxdata/kapacitor/services/hipchat/hipchattest"
"github.com/influxdata/kapacitor/services/httppost"
"github.com/influxdata/kapacitor/services/k8s"
"github.com/influxdata/kapacitor/services/mqtt"
"github.com/influxdata/kapacitor/services/opsgenie"
"github.com/influxdata/kapacitor/services/opsgenie/opsgenietest"
"github.com/influxdata/kapacitor/services/pagerduty"
Expand All @@ -60,6 +61,9 @@ var udfDir string
func init() {
dir, _ := os.Getwd()
udfDir = filepath.Clean(filepath.Join(dir, "../udf"))
mqtt.NewClient = func(c mqtt.Config) mqtt.Client {
return &mqtt.MockClient{}
}
}

func TestServer_Ping(t *testing.T) {
Expand Down Expand Up @@ -7250,6 +7254,16 @@ func TestServer_ListServiceTests(t *testing.T) {
"id": "",
},
},
{
Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/service-tests/mqtt"},
Name: "mqtt",
Options: client.ServiceTestOptions{
"topic": "",
"message": "test MQTT message",
"qos": "AtMostOnce",
"retained": false,
},
},
{
Link: client.Link{Relation: "self", Href: "/kapacitor/v1/service-tests/nerve"},
Name: "nerve",
Expand Down
13 changes: 13 additions & 0 deletions services/alert/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/influxdata/kapacitor/services/hipchat"
"github.com/influxdata/kapacitor/services/httpd"
"github.com/influxdata/kapacitor/services/httppost"
"github.com/influxdata/kapacitor/services/mqtt"
"github.com/influxdata/kapacitor/services/opsgenie"
"github.com/influxdata/kapacitor/services/pagerduty"
"github.com/influxdata/kapacitor/services/pushover"
Expand Down Expand Up @@ -65,6 +66,10 @@ type Service struct {
HipChatService interface {
Handler(hipchat.HandlerConfig, *log.Logger) alert.Handler
}
MQTTService interface {
DefaultHandlerConfig() mqtt.HandlerConfig
Handler(mqtt.HandlerConfig, *log.Logger) alert.Handler
}
OpsGenieService interface {
Handler(opsgenie.HandlerConfig, *log.Logger) alert.Handler
}
Expand Down Expand Up @@ -732,6 +737,14 @@ func (s *Service) createHandlerFromSpec(spec HandlerSpec) (handler, error) {
return handler{}, err
}
h = newExternalHandler(h)
case "mqtt":
c := s.MQTTService.DefaultHandlerConfig()
err = decodeOptions(spec.Options, &c)
if err != nil {
return handler{}, err
}
h := s.MQTTService.Handler(c, s.logger)
h = newExternalHandler(h)
case "opsgenie":
c := opsgenie.HandlerConfig{}
err = decodeOptions(spec.Options, &c)
Expand Down
105 changes: 105 additions & 0 deletions services/mqtt/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package mqtt

import (
"errors"
"log"
"time"

pahomqtt "github.com/eclipse/paho.mqtt.golang"
)

// Client describes an immutable MQTT client, designed to accommodate the
// incongruencies between real clients and mock clients.
type Client interface {
Connect() error
Disconnect()
Publish(string, byte, bool, string) error
}

// NewClient produces a disconnected MQTT client
var NewClient = func(c Config) Client {
return &PahoClient{
host: c.Broker(),
port: c.Port,
username: c.Username,
password: c.Password,
clientID: c.ClientID,
}
}

type PahoClient struct {
host string
port uint16

username string
password string

clientID string

client pahomqtt.Client
}

var _ Client = &PahoClient{}

// DefaultQuiesceTimeout is the duration the client will wait for outstanding
// messages to be published before forcing a disconnection
const DefaultQuiesceTimeout time.Duration = 250 * time.Millisecond

func (p *PahoClient) Connect() error {
log.Printf("Current config: %#v\n", p)
opts := pahomqtt.NewClientOptions()
opts.AddBroker(p.host)
opts.SetClientID(p.clientID)
opts.SetUsername(p.username)
opts.SetPassword(p.password)

// Using a clean session forces the broker to dispose of client session
// information after disconnecting. Retention of this is useful for
// constrained clients. Since Kapacitor is only publishing, it has no
// storage requirements and can reduce load on the broker by using a clean
// session.
opts.SetCleanSession(true)

p.client = pahomqtt.NewClient(opts)
token := p.client.Connect()
log.Printf("Current config: %#v\n", p)

token.Wait() // Tokens are futures

return token.Error()
}

func (p *PahoClient) Disconnect() {
if p.client != nil {
p.client.Disconnect(uint(DefaultQuiesceTimeout / time.Millisecond))
}
}

var foo int = 1

func (p *PahoClient) Publish(topic string, qos byte, retained bool, message string) error {
p.client.Publish(topic, qos, retained, message)
return nil
}

var _ Client = &MockClient{}

type MockClient struct {
connected bool
}

func (m *MockClient) Connect() error {
m.connected = true
return nil
}

func (m *MockClient) Disconnect() {
m.connected = false
}

func (m *MockClient) Publish(topic string, qos byte, retained bool, message string) error {
if !m.connected {
return errors.New("Publish() called before Connect()")
}
return nil
}
Loading

0 comments on commit c50c13c

Please sign in to comment.