Skip to content

Commit

Permalink
Alert handler for Microsoft Teams
Browse files Browse the repository at this point in the history
  • Loading branch information
Mark Mindenhall committed Jun 13, 2018
1 parent 0e52907 commit f7d8d4e
Show file tree
Hide file tree
Showing 16 changed files with 646 additions and 2 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## v1.5.1 [unreleased]

### Features


## v1.5.0 [2018-05-17]

### Features
Expand Down
21 changes: 21 additions & 0 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/influxdata/kapacitor/services/slack"
"github.com/influxdata/kapacitor/services/smtp"
"github.com/influxdata/kapacitor/services/snmptrap"
"github.com/influxdata/kapacitor/services/teams"
"github.com/influxdata/kapacitor/services/telegram"
"github.com/influxdata/kapacitor/services/victorops"
"github.com/influxdata/kapacitor/tick/ast"
Expand Down Expand Up @@ -440,6 +441,26 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, d NodeDiagnostic) (a
h := et.tm.MQTTService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}

for _, t := range n.TeamsHandlers {
c := teams.HandlerConfig{
ChannelURL: t.ChannelURL,
}
h := et.tm.TeamsService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}
if len(n.TeamsHandlers) == 0 && (et.tm.TeamsService != nil && et.tm.TeamsService.Global()) {
c := teams.HandlerConfig{}
h := et.tm.TeamsService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
}
// If Teams has been configured with state changes only set it.
if et.tm.TeamsService != nil &&
et.tm.TeamsService.Global() &&
et.tm.TeamsService.StateChangesOnly() {
n.IsStateChangesOnly = true
}

// Parse level expressions
an.levels = make([]stateful.Expression, alert.Critical+1)
an.scopePools = make([]stateful.ScopePool, alert.Critical+1)
Expand Down
73 changes: 73 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ import (
"github.com/influxdata/kapacitor/services/swarm/swarmtest"
"github.com/influxdata/kapacitor/services/talk"
"github.com/influxdata/kapacitor/services/talk/talktest"
"github.com/influxdata/kapacitor/services/teams"
"github.com/influxdata/kapacitor/services/teams/teamstest"
"github.com/influxdata/kapacitor/services/telegram"
"github.com/influxdata/kapacitor/services/telegram/telegramtest"
"github.com/influxdata/kapacitor/services/victorops"
Expand Down Expand Up @@ -9687,6 +9689,77 @@ stream
}
}

func TestStream_AlertTeams(t *testing.T) {
ts := teamstest.NewServer()
defer ts.Close()

var script = `
stream
|from()
.measurement('cpu')
.where(lambda: "host" == 'serverA')
.groupBy('host')
|window()
.period(10s)
.every(10s)
|count('value')
|alert()
.id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}')
.info(lambda: "count" > 6.0)
.warn(lambda: "count" > 7.0)
.crit(lambda: "count" > 8.0)
.teams()
.teams()
.channelURL('%s')
`
// To test with live webhook, replace "ts.URL" in line below with your
// webhook URL. The test will fail, but ONE message will post to Teams.
script = fmt.Sprintf(script, ts.URL)

tmInit := func(tm *kapacitor.TaskMaster) {

c := teams.NewConfig()
c.Enabled = true
c.ChannelURL = ts.URL
sl := teams.NewService(c, diagService.NewTeamsHandler())
tm.TeamsService = sl
}
testStreamerNoOutput(t, "TestStream_Alert", script, 13*time.Second, tmInit)

exp := []interface{}{
teamstest.Request{
URL: "/",
Card: teams.Card{
CardType: "MessageCard",
Context: "http://schema.org/extensions",
Title: "CRITICAL: [kapacitor/cpu/serverA]",
Text: "kapacitor/cpu/serverA is CRITICAL",
Summary: "CRITICAL: [kapacitor/cpu/serverA] - kapacitor/cpu/serverA is CRITICAL...",
},
},
teamstest.Request{
URL: "/",
Card: teams.Card{
CardType: "MessageCard",
Context: "http://schema.org/extensions",
Title: "CRITICAL: [kapacitor/cpu/serverA]",
Text: "kapacitor/cpu/serverA is CRITICAL",
Summary: "CRITICAL: [kapacitor/cpu/serverA] - kapacitor/cpu/serverA is CRITICAL...",
},
},
}

ts.Close()
var got []interface{}
for _, g := range ts.Requests() {
got = append(got, g)
}

if err := compareListIgnoreOrder(got, exp, nil); err != nil {
t.Error(err)
}
}

func TestStream_AlertLog(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "TestStream_AlertLog")
if err != nil {
Expand Down
68 changes: 68 additions & 0 deletions pipeline/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type AlertNode struct{ *AlertNodeData }
// * Talk -- Post alert message to Talk client.
// * Telegram -- Post alert message to Telegram client.
// * MQTT -- Post alert message to MQTT.
// * Teams -- Post alert message to Microsoft Teams.
//
// See below for more details on configuring each handler.
//
Expand Down Expand Up @@ -385,6 +386,10 @@ type AlertNodeData struct {
// Send alert to Kafka topic
// tick:ignore
KafkaHandlers []*KafkaHandler `tick:"Kafka" json:"kafka"`

// Send alert to Microsoft Teams channel.
// tick:ignore
TeamsHandlers []*TeamsHandler `tick:"Teams" json:"teams"`
}

func newAlertNode(wants EdgeType) *AlertNode {
Expand Down Expand Up @@ -1878,3 +1883,66 @@ type KafkaHandler struct {
// If empty the alert data in JSON is sent as the message body.
Template string `json:"template"`
}

// Send the alert to a Microsoft Teams channel.
// To allow Kapacitor to post to Teams, to to the URL
// https://docs.microsoft.com/en-us/microsoftteams/platform/concepts/connectors#setting-up-a-custom-incoming-webhook
// and follow instructions to create a webhook for a Teams channel. Add the webhook URL to the configuration.
//
// Example:
// [teams]
// enabled = true
// channel-url = "https://outlook.office.com/webhook/..."
//
// In order to not post a message every alert interval
// use AlertNode.StateChangesOnly so that only events
// where the alert changed state are posted to the room.
//
// Example:
// stream
// |alert()
// .teams()
//
// Send alerts to Teams channel in the configuration file.
//
// Example:
// stream
// |alert()
// .teams()
// .channelURL('https://outlook.office.com/webhook/...')
//
// Send alerts to Teams channel with webhook (overrides configuration file).
//
// If the 'teams' section in the configuration has the option: global = true
// then all alerts are sent to Teams without the need to explicitly state it
// in the TICKscript.
//
// Example:
// [teams]
// enabled = true
// channel-url = "https://outlook.office.com/webhook/..."
// global = true
// state-changes-only = true
//
// Example:
// stream
// |alert()
//
// Send alert to Teams using default channel.
// tick:property
func (n *AlertNodeData) Teams() *TeamsHandler {
teams := &TeamsHandler{
AlertNodeData: n,
}
n.TeamsHandlers = append(n.TeamsHandlers, teams)
return teams
}

// tick:embedded:AlertNode.Teams
type TeamsHandler struct {
*AlertNodeData `json:"-"`

// Teams channel webhook URL to post messages.
// If empty uses the URL from the configuration.
ChannelURL string `json:"channel_url"`
}
3 changes: 2 additions & 1 deletion pipeline/alert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ func TestAlertNode_MarshalJSON(t *testing.T) {
"talk": null,
"mqtt": null,
"snmpTrap": null,
"kafka": null
"kafka": null,
"teams": null
}`,
},
}
Expand Down
3 changes: 2 additions & 1 deletion pipeline/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,8 @@ func TestPipeline_MarshalJSON(t *testing.T) {
"talk": null,
"mqtt": null,
"snmpTrap": null,
"kafka": null
"kafka": null,
"teams": null
},
{
"typeOf": "httpOut",
Expand Down
6 changes: 6 additions & 0 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"github.com/influxdata/kapacitor/services/swarm"
"github.com/influxdata/kapacitor/services/talk"
"github.com/influxdata/kapacitor/services/task_store"
"github.com/influxdata/kapacitor/services/teams"
"github.com/influxdata/kapacitor/services/telegram"
"github.com/influxdata/kapacitor/services/triton"
"github.com/influxdata/kapacitor/services/udf"
Expand Down Expand Up @@ -96,6 +97,7 @@ type Config struct {
Sensu sensu.Config `toml:"sensu" override:"sensu"`
Slack slack.Configs `toml:"slack" override:"slack,element-key=workspace"`
Talk talk.Config `toml:"talk" override:"talk"`
Teams teams.Config `toml:"teams" override:"teams"`
Telegram telegram.Config `toml:"telegram" override:"telegram"`
VictorOps victorops.Config `toml:"victorops" override:"victorops"`

Expand Down Expand Up @@ -162,6 +164,7 @@ func NewConfig() *Config {
c.Sensu = sensu.NewConfig()
c.Slack = slack.Configs{slack.NewDefaultConfig()}
c.Talk = talk.NewConfig()
c.Teams = teams.NewConfig()
c.SNMPTrap = snmptrap.NewConfig()
c.Telegram = telegram.NewConfig()
c.VictorOps = victorops.NewConfig()
Expand Down Expand Up @@ -305,6 +308,9 @@ func (c *Config) Validate() error {
if err := c.Talk.Validate(); err != nil {
return errors.Wrap(err, "talk")
}
if err := c.Teams.Validate(); err != nil {
return errors.Wrap(err, "teams")
}
if err := c.Telegram.Validate(); err != nil {
return errors.Wrap(err, "telegram")
}
Expand Down
14 changes: 14 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (
"github.com/influxdata/kapacitor/services/swarm"
"github.com/influxdata/kapacitor/services/talk"
"github.com/influxdata/kapacitor/services/task_store"
"github.com/influxdata/kapacitor/services/teams"
"github.com/influxdata/kapacitor/services/telegram"
"github.com/influxdata/kapacitor/services/triton"
"github.com/influxdata/kapacitor/services/udf"
Expand Down Expand Up @@ -243,6 +244,7 @@ func New(c *Config, buildInfo BuildInfo, diagService *diagnostic.Service) (*Serv
return nil, errors.Wrap(err, "httppost service")
}
s.appendSMTPService()
s.appendTeamsService()
s.appendTelegramService()
if err := s.appendSlackService(); err != nil {
return nil, errors.Wrap(err, "slack service")
Expand Down Expand Up @@ -956,6 +958,18 @@ func (s *Server) appendTritonService() {
s.AppendService("triton", srv)
}

func (s *Server) appendTeamsService() {
c := s.config.Teams
d := s.DiagService.NewTeamsHandler()
srv := teams.NewService(c, d)

s.TaskMaster.TeamsService = srv
s.AlertService.TeamsService = srv

s.SetDynamicService("teams", srv)
s.AppendService("teams", srv)
}

// Err returns an error channel that multiplexes all out of band errors received from all services.
func (s *Server) Err() <-chan error { return s.err }

Expand Down
Loading

0 comments on commit f7d8d4e

Please sign in to comment.