Skip to content

Commit

Permalink
Merge pull request influxdata#1345 from influxdata/feature/tr-mqtt-alert
Browse files Browse the repository at this point in the history
Add an MQTT Alert Handler
  • Loading branch information
timraymond authored Jul 24, 2017
2 parents a16064c + dac5399 commit fc8f75d
Show file tree
Hide file tree
Showing 80 changed files with 10,460 additions and 82 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- [#1413](https://github.com/influxdata/kapacitor/issues/1413): Add subscriptions modes to InfluxDB subscriptions.
- [#1436](https://github.com/influxdata/kapacitor/issues/1436): Add linear fill support for QueryNode.
- [#1345](https://github.com/influxdata/kapacitor/issues/1345): Add MQTT Alert Handler

### Bugfixes

Expand Down
10 changes: 8 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ required = ["github.com/benbjohnson/tmpl","github.com/golang/protobuf/protoc-gen
branch = "master"
name = "github.com/shurcooL/markdownfmt"

[[constraint]]
name = "github.com/eclipse/paho.mqtt.golang"
version = "~1.0.0"

# Force the Azure projects to be a specific older version that Prometheus needs
[[override]]
name = "github.com/Azure/azure-sdk-for-go"
Expand Down
11 changes: 11 additions & 0 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
alertservice "github.com/influxdata/kapacitor/services/alert"
"github.com/influxdata/kapacitor/services/hipchat"
"github.com/influxdata/kapacitor/services/httppost"
"github.com/influxdata/kapacitor/services/mqtt"
"github.com/influxdata/kapacitor/services/opsgenie"
"github.com/influxdata/kapacitor/services/pagerduty"
"github.com/influxdata/kapacitor/services/pushover"
Expand Down Expand Up @@ -377,6 +378,16 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
an.handlers = append(an.handlers, h)
}

for _, m := range n.MQTTHandlers {
c := mqtt.HandlerConfig{
BrokerName: m.BrokerName,
Topic: m.Topic,
QoS: mqtt.QoSLevel(m.Qos),
Retained: m.Retained,
}
h := et.tm.MQTTService.Handler(c, l)
an.handlers = append(an.handlers, h)
}
// Parse level expressions
an.levels = make([]stateful.Expression, alert.Critical+1)
an.scopePools = make([]stateful.ScopePool, alert.Critical+1)
Expand Down
32 changes: 32 additions & 0 deletions etc/kapacitor/kapacitor.conf
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,38 @@ default-retention-policy = ""
# The default authorName.
author_name = "Kapacitor"

# MQTT client configuration.
# Mutliple different clients may be configured by
# repeating [[mqtt]] sections.
[[mqtt]]
enabled = false
# Unique name for this broker configuration
name = "localhost"
# Whether this broker configuration is the default
default = true
# URL of the MQTT broker.
# Possible protocols include:
# tcp - Raw TCP network connection
# ssl - TLS protected TCP network connection
# ws - Websocket network connection
url = "tcp://localhost:1883"

# TLS/SSL configuration
# A CA can be provided without a key/cert pair
# ssl-ca = "/etc/kapacitor/ca.pem"
# Absolutes paths to pem encoded key and cert files.
# ssl-cert = "/etc/kapacitor/cert.pem"
# ssl-key = "/etc/kapacitor/key.pem"

# Unique ID for this MQTT client.
# If empty used the value of "name"
client-id = ""

# Username
username = ""
# Password
password = ""

##################################
# Input Methods, same as InfluxDB
#
Expand Down
42 changes: 42 additions & 0 deletions pipeline/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const defaultDetailsTmpl = "{{ json . }}"
// * Pushover -- Send alert to Pushover.
// * Talk -- Post alert message to Talk client.
// * Telegram -- Post alert message to Telegram client.
// * MQTT -- Post alert message to MQTT.
//
// See below for more details on configuring each handler.
//
Expand Down Expand Up @@ -351,6 +352,10 @@ type AlertNode struct {
// tick:ignore
TalkHandlers []*TalkHandler `tick:"Talk"`

// Send alert to MQTT
// tick:ignore
MQTTHandlers []*MQTTHandler `tick:"Mqtt"`

// Send alert using SNMPtraps.
// tick:ignore
SNMPTrapHandlers []*SNMPTrapHandler `tick:"SnmpTrap"`
Expand Down Expand Up @@ -1001,6 +1006,43 @@ func (a *AlertaHandler) Services(service ...string) *AlertaHandler {
return a
}

// Send alert to an MQTT broker
// tick:property
func (a *AlertNode) Mqtt(topic string) *MQTTHandler {
m := &MQTTHandler{
AlertNode: a,
Topic: topic,
}
a.MQTTHandlers = append(a.MQTTHandlers, m)
return m
}

// tick:embedded:AlertNode.Mqtt
type MQTTHandler struct {
*AlertNode

// BrokerName is the name of the configured MQTT broker to use when publishing the alert.
// If empty defaults to the configured default broker.
BrokerName string

// The topic where alerts will be dispatched to
Topic string

// The Qos that will be used to deliver the alerts
//
// Valid values are:
//
// * 0 - At most once delivery
// * 1 - At least once delivery
// * 2 - Exactly once delivery
//
Qos int64

// Retained indicates whether this alert should be delivered to
// clients that were not connected to the broker at the time of the alert.
Retained bool
}

// Send the alert to Sensu.
//
// Example:
Expand Down
6 changes: 6 additions & 0 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/influxdata/kapacitor/services/k8s"
"github.com/influxdata/kapacitor/services/logging"
"github.com/influxdata/kapacitor/services/marathon"
"github.com/influxdata/kapacitor/services/mqtt"
"github.com/influxdata/kapacitor/services/nerve"
"github.com/influxdata/kapacitor/services/opsgenie"
"github.com/influxdata/kapacitor/services/pagerduty"
Expand Down Expand Up @@ -76,6 +77,7 @@ type Config struct {
// Alert handlers
Alerta alerta.Config `toml:"alerta" override:"alerta"`
HipChat hipchat.Config `toml:"hipchat" override:"hipchat"`
MQTT mqtt.Configs `toml:"mqtt" override:"mqtt,element-key=name"`
OpsGenie opsgenie.Config `toml:"opsgenie" override:"opsgenie"`
PagerDuty pagerduty.Config `toml:"pagerduty" override:"pagerduty"`
Pushover pushover.Config `toml:"pushover" override:"pushover"`
Expand Down Expand Up @@ -138,6 +140,7 @@ func NewConfig() *Config {

c.Alerta = alerta.NewConfig()
c.HipChat = hipchat.NewConfig()
c.MQTT = mqtt.Configs{}
c.OpsGenie = opsgenie.NewConfig()
c.PagerDuty = pagerduty.NewConfig()
c.Pushover = pushover.NewConfig()
Expand Down Expand Up @@ -245,6 +248,9 @@ func (c *Config) Validate() error {
if err := c.HipChat.Validate(); err != nil {
return err
}
if err := c.MQTT.Validate(); err != nil {
return err
}
if err := c.OpsGenie.Validate(); err != nil {
return err
}
Expand Down
20 changes: 20 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/influxdata/kapacitor/services/k8s"
"github.com/influxdata/kapacitor/services/logging"
"github.com/influxdata/kapacitor/services/marathon"
"github.com/influxdata/kapacitor/services/mqtt"
"github.com/influxdata/kapacitor/services/nerve"
"github.com/influxdata/kapacitor/services/noauth"
"github.com/influxdata/kapacitor/services/opsgenie"
Expand Down Expand Up @@ -205,6 +206,9 @@ func New(c *Config, buildInfo BuildInfo, logService logging.Interface) (*Server,
// Append Alert integration services
s.appendAlertaService()
s.appendHipChatService()
if err := s.appendMQTTService(); err != nil {
return nil, errors.Wrap(err, "mqtt service")
}
s.appendOpsGenieService()
s.appendPagerDutyService()
s.appendPushoverService()
Expand Down Expand Up @@ -455,6 +459,22 @@ func (s *Server) appendAuthService() {
s.AppendService("auth", srv)
}

func (s *Server) appendMQTTService() error {
cs := s.config.MQTT
l := s.LogService.NewLogger("[mqtt] ", log.LstdFlags)
srv, err := mqtt.NewService(cs, l)
if err != nil {
return err
}

s.TaskMaster.MQTTService = srv
s.AlertService.MQTTService = srv

s.SetDynamicService("mqtt", srv)
s.AppendService("mqtt", srv)
return nil
}

func (s *Server) appendOpsGenieService() {
c := s.config.OpsGenie
l := s.LogService.NewLogger("[opsgenie] ", log.LstdFlags)
Expand Down
Loading

0 comments on commit fc8f75d

Please sign in to comment.