Skip to content

Commit

Permalink
add sideload node and service
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Nov 8, 2017
1 parent 5f80c34 commit 216b381
Show file tree
Hide file tree
Showing 141 changed files with 1,321 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
- [#1578](https://github.com/influxdata/kapacitor/pull/1578): Add support for exposing logs via the API.
- [#1605](https://github.com/influxdata/kapacitor/issues/1605): Add support for {{ .Duration }} on Alert Message property.
- [#1644](https://github.com/influxdata/kapacitor/issues/1644): Add support for [JSON lines](https://en.wikipedia.org/wiki/JSON_Streaming#Line_delimited_JSON) for steaming HTTP logs.
- [#1637](https://github.com/influxdata/kapacitor/issues/1637): Add new node Sideload, that allows loading data from files into the stream of data. Data can be loaded using a hierarchy.

### Bugfixes

Expand Down
16 changes: 16 additions & 0 deletions client/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -2357,6 +2357,22 @@ GET /kapacitor/v1/ping
| ---- | ------- |
| 204 | Success |

### Sideload Reload

You can trigger a reload of all sideload sources by making a POST request to `kapacitor/v1/sideload/reload`, with an empty body.

#### Example

```
POST /kapacitor/v1/sideload/reload
```

#### Response

| Code | Meaning |
| ---- | ------- |
| 204 | Success |


### Debug Vars

Expand Down
6 changes: 2 additions & 4 deletions edge/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,12 +315,10 @@ func (pm *pointMessage) ToRow() *models.Row {
Name: pm.name,
Tags: pm.tags,
}
row.Columns = make([]string, len(pm.fields)+1)
row.Columns = make([]string, 1, len(pm.fields)+1)
row.Columns[0] = "time"
i := 1
for f := range pm.fields {
row.Columns[i] = f
i++
row.Columns = append(row.Columns, f)
}
// Sort all columns but leave time as first
sort.Strings(row.Columns[1:])
Expand Down
2 changes: 1 addition & 1 deletion integrations/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3187,7 +3187,7 @@ func testBatcher(t *testing.T, name, script string) (clock.Setter, *kapacitor.Ex
var data io.ReadCloser
for i := 0; err == nil; {
f := fmt.Sprintf("%s.%d.brpl", name, i)
data, err = os.Open(path.Join("data", f))
data, err = os.Open(path.Join("testdata", f))
if err == nil {
allData = append(allData, data)
i++
Expand Down
238 changes: 234 additions & 4 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"html"
"io/ioutil"
Expand Down Expand Up @@ -49,6 +50,7 @@ import (
"github.com/influxdata/kapacitor/services/pushover/pushovertest"
"github.com/influxdata/kapacitor/services/sensu"
"github.com/influxdata/kapacitor/services/sensu/sensutest"
"github.com/influxdata/kapacitor/services/sideload"
"github.com/influxdata/kapacitor/services/slack"
"github.com/influxdata/kapacitor/services/slack/slacktest"
"github.com/influxdata/kapacitor/services/smtp"
Expand All @@ -73,7 +75,12 @@ import (
var diagService *diagnostic.Service

func init() {
diagService = diagnostic.NewService(diagnostic.NewConfig(), ioutil.Discard, ioutil.Discard)
flag.Parse()
out := ioutil.Discard
if testing.Verbose() {
out = os.Stderr
}
diagService = diagnostic.NewService(diagnostic.NewConfig(), out, out)
diagService.Open()
}

Expand Down Expand Up @@ -9532,7 +9539,7 @@ stream
t.Fatal(err)
}
name := "TestStream_KapacitorLoopback"
data, err := os.Open(path.Join(dir, "data", name+".srpl"))
data, err := os.Open(path.Join(dir, "testdata", name+".srpl"))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -9664,7 +9671,7 @@ stream
t.Fatal(err)
}
name := "TestStream_KapacitorLoopback"
data, err := os.Open(path.Join(dir, "data", name+".srpl"))
data, err := os.Open(path.Join(dir, "testdata", name+".srpl"))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -9731,6 +9738,229 @@ stream
}
}

func TestStream_Sideload(t *testing.T) {
wd, err := os.Getwd()
if err != nil {
t.Fatal(err)
}
var script = fmt.Sprintf(`
stream
|from()
.database('dbname')
.retentionPolicy('rpname')
.measurement('m')
.groupBy('t0', 't1', 't2')
|sideload()
.source('file://%s/testdata/sideload')
.order('t0/{{.t0}}.yml', 't1/{{.t1}}.yml', 't2/{{.t2}}.yml')
.field('f1', 0)
.field('f2', 0.0)
.tag('t3', 'one')
|log()
|httpOut('TestStream_Sideload')
`, wd)

er := models.Result{
Series: models.Rows{
{
Name: "m",
Tags: map[string]string{"t0": "a", "t1": "m", "t2": "x", "t3": "one"},
Columns: []string{"time", "f1", "f2", "value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
0.0,
0.0,
1.0,
},
},
},
{
Name: "m",
Tags: map[string]string{"t0": "b", "t1": "n", "t2": "y", "t3": "why"},
Columns: []string{"time", "f1", "f2", "value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
2.0,
3.5,
1.0,
},
},
},
{
Name: "m",
Tags: map[string]string{"t0": "c", "t1": "o", "t2": "y", "t3": "why"},
Columns: []string{"time", "f1", "f2", "value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
12.0,
13.5,
1.0,
},
},
},
},
}
tmInit := func(tm *kapacitor.TaskMaster) {
tm.SideloadService = sideload.NewService(diagService.NewSideloadHandler())
}

testStreamerWithOutput(t, "TestStream_Sideload", script, 1*time.Second, er, true, tmInit)
}

func TestStream_Sideload_JSON(t *testing.T) {
wd, err := os.Getwd()
if err != nil {
t.Fatal(err)
}
var script = fmt.Sprintf(`
stream
|from()
.database('dbname')
.retentionPolicy('rpname')
.measurement('m')
.groupBy('t0', 't1', 't2')
|sideload()
.source('file://%s/testdata/sideload')
.order('t0/{{.t0}}.json', 't1/{{.t1}}.json', 't2/{{.t2}}.yml')
.field('f1', 0)
.field('f2', 0.0)
.tag('t3', 'one')
|log()
|httpOut('TestStream_Sideload')
`, wd)

er := models.Result{
Series: models.Rows{
{
Name: "m",
Tags: map[string]string{"t0": "a", "t1": "m", "t2": "x", "t3": "one"},
Columns: []string{"time", "f1", "f2", "value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
0.0,
0.0,
1.0,
},
},
},
{
Name: "m",
Tags: map[string]string{"t0": "b", "t1": "n", "t2": "y", "t3": "why"},
Columns: []string{"time", "f1", "f2", "value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
2.0,
3.5,
1.0,
},
},
},
{
Name: "m",
Tags: map[string]string{"t0": "c", "t1": "o", "t2": "y", "t3": "why"},
Columns: []string{"time", "f1", "f2", "value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
12.0,
13.5,
1.0,
},
},
},
},
}
tmInit := func(tm *kapacitor.TaskMaster) {
tm.SideloadService = sideload.NewService(diagService.NewSideloadHandler())
}

testStreamerWithOutput(t, "TestStream_Sideload", script, 1*time.Second, er, true, tmInit)
}

func TestStream_Sideload_Multiple(t *testing.T) {
wd, err := os.Getwd()
if err != nil {
t.Fatal(err)
}
var script = fmt.Sprintf(`
stream
|from()
.database('dbname')
.retentionPolicy('rpname')
.measurement('m')
.groupBy('t0', 't1', 't2')
|sideload()
.source('file://%[1]s/testdata/sideload')
.order('t0/{{.t0}}.yml', 't1/{{.t1}}.yml', 't2/{{.t2}}.yml')
.field('f1', 0)
.field('f2', 0.0)
.tag('t3', 'one')
|sideload()
.source('file://%[1]s/testdata/sideload')
.order('t0/{{.t0}}.yml', 't1/{{.t1}}.yml', 't2/{{.t2}}.yml')
.field('other', -1.0)
|log()
|httpOut('TestStream_Sideload')
`, wd)

er := models.Result{
Series: models.Rows{
{
Name: "m",
Tags: map[string]string{"t0": "a", "t1": "m", "t2": "x", "t3": "one"},
Columns: []string{"time", "f1", "f2", "other", "value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
0.0,
0.0,
-1.0,
1.0,
},
},
},
{
Name: "m",
Tags: map[string]string{"t0": "b", "t1": "n", "t2": "y", "t3": "why"},
Columns: []string{"time", "f1", "f2", "other", "value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
2.0,
3.5,
56.0,
1.0,
},
},
},
{
Name: "m",
Tags: map[string]string{"t0": "c", "t1": "o", "t2": "y", "t3": "why"},
Columns: []string{"time", "f1", "f2", "other", "value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
12.0,
13.5,
56.0,
1.0,
},
},
},
},
}
tmInit := func(tm *kapacitor.TaskMaster) {
tm.SideloadService = sideload.NewService(diagService.NewSideloadHandler())
}

testStreamerWithOutput(t, "TestStream_Sideload", script, 1*time.Second, er, true, tmInit)
}

func TestStream_InfluxDBOut(t *testing.T) {

var script = `
Expand Down Expand Up @@ -10970,7 +11200,7 @@ func testStreamer(
if err != nil {
t.Fatal(err)
}
data, err := os.Open(path.Join(dir, "data", name+".srpl"))
data, err := os.Open(path.Join(dir, "testdata", name+".srpl"))
if err != nil {
t.Fatal(err)
}
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
9 changes: 9 additions & 0 deletions integrations/testdata/TestStream_Sideload.srpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
dbname
rpname
m,t0=a,t1=m,t2=x value=1 0000000000
dbname
rpname
m,t0=b,t1=n,t2=y value=1 0000000000
dbname
rpname
m,t0=c,t1=o,t2=y value=1 0000000000
File renamed without changes.
File renamed without changes.
File renamed without changes.
3 changes: 3 additions & 0 deletions integrations/testdata/sideload/t0/b.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"f1": 2
}
1 change: 1 addition & 0 deletions integrations/testdata/sideload/t0/b.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
f1: 2
3 changes: 3 additions & 0 deletions integrations/testdata/sideload/t0/c.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"f1": 12
}
1 change: 1 addition & 0 deletions integrations/testdata/sideload/t0/c.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
f1: 12
3 changes: 3 additions & 0 deletions integrations/testdata/sideload/t1/n.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"f2": 3.5
}
1 change: 1 addition & 0 deletions integrations/testdata/sideload/t1/n.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
f2: 3.5
4 changes: 4 additions & 0 deletions integrations/testdata/sideload/t1/o.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"f1": 42,
"f2": 13.5
}
2 changes: 2 additions & 0 deletions integrations/testdata/sideload/t1/o.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
f1: 42
f2: 13.5
4 changes: 4 additions & 0 deletions integrations/testdata/sideload/t2/y.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"t3": "why",
"other": 56
}
2 changes: 2 additions & 0 deletions integrations/testdata/sideload/t2/y.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
t3: why
other: 56
Loading

0 comments on commit 216b381

Please sign in to comment.