Skip to content

Commit

Permalink
Add ability to create database and rp with InfluxDBOutNode (influxdat…
Browse files Browse the repository at this point in the history
…a#881)

* add ability to create database and rp with InfluxDBOutNode

* close connection
  • Loading branch information
Nathaniel Cook authored Sep 9, 2016
1 parent 65c9e19 commit 613b247
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 1 deletion.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

- [#873](https://github.com/influxdata/kapacitor/pull/873): Add TCP alert handler
- [#869](https://github.com/influxdata/kapacitor/issues/869): Add ability to set alert message as a field
- [#854](https://github.com/influxdata/kapacitor/issues/854): Add `.create` property to InfluxDBOut node, which when set will create the database
and retention policy on task start.

### Bugfixes

Expand Down
31 changes: 31 additions & 0 deletions influxdb_out.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package kapacitor

import (
"bytes"
"errors"
"log"
"sync"
"time"

"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/influxdb"
"github.com/influxdata/kapacitor/models"
Expand Down Expand Up @@ -51,6 +53,35 @@ func (i *InfluxDBOutNode) runOut([]byte) error {
// Start the write buffer
i.wb.start()

// Create the database and retention policy
if i.i.CreateFlag {
var err error
var conn influxdb.Client
if i.i.Cluster != "" {
conn, err = i.et.tm.InfluxDBService.NewNamedClient(i.i.Cluster)
} else {
conn, err = i.et.tm.InfluxDBService.NewDefaultClient()
}
if err != nil {
i.logger.Printf("E! failed to connect to InfluxDB cluster %q to create database", i.i.Cluster)
} else {
var createDb bytes.Buffer
createDb.WriteString("CREATE DATABASE ")
createDb.WriteString(influxql.QuoteIdent(i.i.Database))
if i.i.RetentionPolicy != "" {
createDb.WriteString(" WITH NAME ")
createDb.WriteString(influxql.QuoteIdent(i.i.RetentionPolicy))
}
resp, err := conn.Query(influxdb.Query{Command: createDb.String()})
if err != nil {
i.logger.Printf("E! failed to create database %q on cluster %q: %v", i.i.Database, i.i.Cluster, err)
} else if resp.Err != "" {
i.logger.Printf("E! failed to create database %q on cluster %q: %s", i.i.Database, i.i.Cluster, resp.Err)
}
conn.Close()
}
}

switch i.Wants() {
case pipeline.StreamEdge:
for p, ok := i.ins[0].NextPoint(); ok; p, ok = i.ins[0].NextPoint() {
Expand Down
119 changes: 119 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6945,6 +6945,125 @@ stream
}
}
}
func TestStream_InfluxDBOut_CreateDatabase(t *testing.T) {

var script = `
stream
|from()
.measurement('cpu')
.where(lambda: "host" == 'nonexistant')
|influxDBOut()
.create()
.database('db')
`

done := make(chan error, 1)
var createQuery string

influxdb := NewMockInfluxDBService(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/ping" {
w.WriteHeader(http.StatusNoContent)
return
}
createQuery = r.URL.Query().Get("q")
done <- nil
}))

name := "TestStream_InfluxDBOut"

// Create a new execution env
tm := kapacitor.NewTaskMaster("testStreamer", logService)
tm.HTTPDService = httpService
tm.TaskStore = taskStore{}
tm.DeadmanService = deadman{}
tm.InfluxDBService = influxdb
tm.Open()

//Create the task
task, err := tm.NewTask(name, script, kapacitor.StreamTask, dbrps, 0, nil)
if err != nil {
t.Fatal(err)
}

//Start the task
et, err := tm.StartTask(task)
if err != nil {
t.Fatal(err)
}

t.Log(string(et.Task.Dot()))
defer tm.Close()

// Wait till we received a request
if e := <-done; e != nil {
t.Error(e)
}

expCreateQuery := `CREATE DATABASE db`
if createQuery != expCreateQuery {
t.Errorf("unexpected create database query: got %q exp: %q", createQuery, expCreateQuery)
}
}
func TestStream_InfluxDBOut_CreateDatabaseAndRP(t *testing.T) {

var script = `
stream
|from()
.measurement('cpu')
.where(lambda: "host" == 'nonexistant')
|influxDBOut()
.create()
.database('db')
.retentionPolicy('rp')
`

done := make(chan error, 1)
var createQuery string

influxdb := NewMockInfluxDBService(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/ping" {
w.WriteHeader(http.StatusNoContent)
return
}
createQuery = r.URL.Query().Get("q")
done <- nil
}))

name := "TestStream_InfluxDBOut"

// Create a new execution env
tm := kapacitor.NewTaskMaster("testStreamer", logService)
tm.HTTPDService = httpService
tm.TaskStore = taskStore{}
tm.DeadmanService = deadman{}
tm.InfluxDBService = influxdb
tm.Open()

//Create the task
task, err := tm.NewTask(name, script, kapacitor.StreamTask, dbrps, 0, nil)
if err != nil {
t.Fatal(err)
}

//Start the task
et, err := tm.StartTask(task)
if err != nil {
t.Fatal(err)
}

t.Log(string(et.Task.Dot()))
defer tm.Close()

// Wait till we received a request
if e := <-done; e != nil {
t.Error(e)
}

expCreateQuery := `CREATE DATABASE db WITH NAME rp`
if createQuery != expCreateQuery {
t.Errorf("unexpected create database query: got %q exp: %q", createQuery, expCreateQuery)
}
}

func TestStream_Selectors(t *testing.T) {

Expand Down
19 changes: 18 additions & 1 deletion pipeline/influxdb_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,11 @@ type InfluxDBOutNode struct {
// Default: 10s
FlushInterval time.Duration
// Static set of tags to add to all data points before writing them.
//tick:ignore
// tick:ignore
Tags map[string]string `tick:"Tag"`
// Create the specified database and retention policy
// tick:ignore
CreateFlag bool `tick:"Create"`
}

func newInfluxDBOutNode(wants EdgeType) *InfluxDBOutNode {
Expand All @@ -72,3 +75,17 @@ func (i *InfluxDBOutNode) Tag(key, value string) *InfluxDBOutNode {
i.Tags[key] = value
return i
}

// Create indicates that both the database and retention policy
// will be created, when the task is started.
// If the retention policy name is empty than no
// retention policy will be specified and
// the default retention policy name will be created.
//
// If the database already exists nothing happens.
//
// tick:property
func (i *InfluxDBOutNode) Create() *InfluxDBOutNode {
i.CreateFlag = true
return i
}

0 comments on commit 613b247

Please sign in to comment.