Skip to content

Commit

Permalink
Add a raw TCP handler to alert node.
Browse files Browse the repository at this point in the history
  • Loading branch information
Ross McDonald committed Sep 7, 2016
1 parent c4953cf commit 8e42b9e
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 0 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# Changelog

## v1.0.1 [unreleased]

### Release Notes

### Features

- [#873](https://github.com/influxdata/kapacitor/pull/873): Add TCP alert handler

### Bugfixes

## v1.0.0 [2016-09-02]

### Release Notes
Expand Down
32 changes: 32 additions & 0 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
html "html/template"
"log"
"net"
"net/http"
"os"
"os/exec"
Expand Down Expand Up @@ -173,6 +174,11 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *
an.handlers = append(an.handlers, func(ad *AlertData) { an.handlePost(post, ad) })
}

for _, tcp := range n.TcpHandlers {
tcp := tcp
an.handlers = append(an.handlers, func(ad *AlertData) { an.handleTcp(tcp, ad) })
}

for _, email := range n.EmailHandlers {
email := email
an.handlers = append(an.handlers, func(ad *AlertData) { an.handleEmail(email, ad) })
Expand Down Expand Up @@ -910,6 +916,32 @@ func (a *AlertNode) handlePost(post *pipeline.PostHandler, ad *AlertData) {
return
}

func (a *AlertNode) handleTcp(tcp *pipeline.TcpHandler, ad *AlertData) {
buf := a.bufPool.Get().(*bytes.Buffer)
defer func() {
buf.Reset()
a.bufPool.Put(buf)
}()

err := json.NewEncoder(buf).Encode(ad)
if err != nil {
a.logger.Println("E! failed to marshal alert data json", err)
return
}

conn, err := net.Dial("tcp", tcp.Address)
if err != nil {
a.logger.Println("E! failed to connect", err)
return
}
defer conn.Close()

buf.WriteByte("\n")
conn.Write(buf.Bytes())

return
}

func (a *AlertNode) handleEmail(email *pipeline.EmailHandler, ad *AlertData) {
if a.et.tm.SMTPService != nil {
err := a.et.tm.SMTPService.SendMail(email.ToList, ad.Message, ad.Details)
Expand Down
25 changes: 25 additions & 0 deletions pipeline/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const defaultLogFileMode = 0600
//
// * log -- log alert data to file.
// * post -- HTTP POST data to a specified URL.
// * tcp -- Send data to a specified address via raw TCP.
// * email -- Send and email with alert data.
// * exec -- Execute a command passing alert data over STDIN.
// * HipChat -- Post alert message to HipChat room.
Expand Down Expand Up @@ -79,6 +80,7 @@ const defaultLogFileMode = 0600
// .crit(lambda: "value" > 30)
// .post("http://example.com/api/alert")
// .post("http://another.example.com/api/alert")
// .tcp("exampleendpoint.com:5678")
// .email('[email protected]')
//
//
Expand Down Expand Up @@ -283,6 +285,10 @@ type AlertNode struct {
// tick:ignore
PostHandlers []*PostHandler `tick:"Post"`

// Send the JSON alert data to the specified endpoint via TCP.
// tick:ignore
TcpHandlers []*TcpHandler `tick:"Tcp"`

// Email handlers
// tick:ignore
EmailHandlers []*EmailHandler `tick:"Email"`
Expand Down Expand Up @@ -464,6 +470,25 @@ type PostHandler struct {
URL string
}

// Send JSON alert data to a specified address over TCP.
// tick:property
func (a *AlertNode) Tcp(address string) *TcpHandler {
tcp := &TcpHandler{
AlertNode: a,
Address: address,
}
a.TcpHandlers = append(a.TcpHandlers, tcp)
return tcp
}

// tick:embedded:AlertNode.Tcp
type TcpHandler struct {
*AlertNode

// The endpoint address.
Address string
}

// Email the alert data.
//
// If the To list is empty, the To addresses from the configuration are used.
Expand Down

0 comments on commit 8e42b9e

Please sign in to comment.