Skip to content

Commit

Permalink
feat: tricklenode (influxdata#2530)
Browse files Browse the repository at this point in the history
* feat: tricklenode

* chore: update changelog

* test: fix TestTrickle ast test

* chore: cleanup imports
  • Loading branch information
docmerlin authored Jun 2, 2021
1 parent 22315fb commit fef0d30
Show file tree
Hide file tree
Showing 13 changed files with 267 additions and 28 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

### Features
- [#2484](https://github.com/influxdata/kapacitor/pull/2484): Add Zenoss alert event handler.
- [#2512](https://github.com/influxdata/kapacitor/pull/2512): Pull in auth code from Kapacitor Enterprise.
- [#2493](https://github.com/influxdata/kapacitor/pull/2493): Route kafka alerts to partitions by ID, and allow for configuring the hashing strategy.
- [#2512](https://github.com/influxdata/kapacitor/pull/2512): Pull in auth code from Kapacitor Enterprise.
- [#2530](https://github.com/influxdata/kapacitor/pull/2530): Add a trickle node that converts batches to streams, the inverse of the window node.
- [#2544](https://github.com/influxdata/kapacitor/pull/2544): Add a skeleton for Flux tasks in Kapacitor.
- [#2555](https://github.com/influxdata/kapacitor/pull/2555): Run Flux tasks with the built-in Flux engine.
- [#2559](https://github.com/influxdata/kapacitor/pull/2559): Support Flux tasks in the Kapacitor CLI.
Expand Down
55 changes: 39 additions & 16 deletions integrations/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ import (
"github.com/influxdata/kapacitor/alert"
"github.com/influxdata/kapacitor/clock"
"github.com/influxdata/kapacitor/models"
alertservice "github.com/influxdata/kapacitor/services/alert"
"github.com/influxdata/kapacitor/services/httppost"
"github.com/influxdata/kapacitor/services/storage/storagetest"
"github.com/influxdata/wlog"
)

Expand Down Expand Up @@ -2605,6 +2602,42 @@ data
testBatcherWithOutput(t, "TestBatch_StateTracking", script, 8*time.Second, er, false)
}

// TestBatch_Trickle verifies TrickleNode end to end: batched query results
// are trickled back into individual stream points, re-windowed over 10s,
// and exposed via httpOut for comparison against the expected result.
func TestBatch_Trickle(t *testing.T) {
	// TICKscript under test: 4s batches, grouped by host, then
	// |trickle() converts the batch edge to a stream edge so that
	// |window() (a stream-only node) can be applied downstream.
	var script = `
var data = batch
|query('SELECT value FROM "telegraf"."default"."cpu"')
.period(4s)
.every(4s)
.groupBy('host')
data
|trickle()
|window().period(10s)
|httpOut('TestBatch_Trickle')`
	// Expected output: the replayed points regrouped into one window per
	// series. NOTE(review): the query text selects from "cpu" while the
	// expected series are named cpu_usage_idle — presumably the replay
	// fixture (TestBatch_Trickle.0.brpl), not the query string, supplies
	// the data in this harness; confirm against testBatcherWithOutput.
	er := models.Result{
		Series: models.Rows{
			{
				Name:    "cpu_usage_idle",
				Tags:    map[string]string{"cpu": "cpu-total"},
				Columns: []string{"time", "mean"},
				Values: [][]interface{}{
					{time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC), 90.38281469458698},
					{time.Date(1971, 1, 1, 0, 0, 1, 0, time.UTC), 80.38281469458698},
				},
			},
			{
				Name:    "cpu_usage_idle",
				Tags:    map[string]string{"cpu": "cpu0"},
				Columns: []string{"time", "mean"},
				Values: [][]interface{}{
					{time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC), 83.56930693069836},
				},
			},
		},
	}

	testBatcherWithOutput(t, "TestBatch_Trickle", script, 40*time.Second, er, false)
}

func TestBatch_StateCount(t *testing.T) {
var script = `
var data = batch
Expand Down Expand Up @@ -3689,27 +3722,18 @@ batch

// Helper test function for batcher
func testBatcher(t *testing.T, name, script string) (clock.Setter, *kapacitor.ExecutingTask, <-chan error, *kapacitor.TaskMaster) {
t.Helper()
if testing.Verbose() {
wlog.SetLevel(wlog.DEBUG)
} else {
wlog.SetLevel(wlog.OFF)
}

// Create a new execution env
d := diagService.NewKapacitorHandler()
tm := kapacitor.NewTaskMaster("testBatcher", newServerInfo(), d)
httpdService := newHTTPDService()
tm.HTTPDService = httpdService
tm.TaskStore = taskStore{}
tm.DeadmanService = deadman{}
tm.HTTPPostService, _ = httppost.NewService(nil, diagService.NewHTTPPostHandler())
as := alertservice.NewService(diagService.NewAlertServiceHandler())
as.StorageService = storagetest.New()
as.HTTPDService = httpdService
if err := as.Open(); err != nil {
tm, err := createTaskMaster("testBatcher")
if err != nil {
t.Fatal(err)
}
tm.AlertService = as
tm.Open()

// Create task
Expand Down Expand Up @@ -3764,7 +3788,6 @@ func testBatcherWithOutput(
if err != nil {
t.Error(err)
}

// Get the result
output, err := et.GetOutput(name)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion integrations/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func Bench(b *testing.B, tasksCount, pointCount, expectedProcessedCount int, tic
for i := 0; i < b.N; i++ {
// Do not time setup
b.StopTimer()
tm, err := createTaskMaster()
tm, err := createTaskMaster("testStreamer")
if err != nil {
b.Fatal(err)
}
Expand Down
18 changes: 9 additions & 9 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11735,7 +11735,7 @@ stream
`

// Create a new execution env
tm, err := createTaskMaster()
tm, err := createTaskMaster("testStreamer")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -11791,7 +11791,7 @@ stream
},
}
// Create a new execution env
tm, err := createTaskMaster()
tm, err := createTaskMaster("testStreamer")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -11923,7 +11923,7 @@ stream
},
}
// Create a new execution env
tm, err := createTaskMaster()
tm, err := createTaskMaster("testStreamer")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -12348,7 +12348,7 @@ stream
name := "TestStream_InfluxDBOut"

// Create a new execution env
tm, err := createTaskMaster()
tm, err := createTaskMaster("testStreamer")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -12408,7 +12408,7 @@ stream
name := "TestStream_InfluxDBOut"

// Create a new execution env
tm, err := createTaskMaster()
tm, err := createTaskMaster("testStreamer")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -13459,7 +13459,7 @@ func testStreamer(
}

// Create a new execution env
tm, err := createTaskMaster()
tm, err := createTaskMaster("testStreamer")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -13540,7 +13540,7 @@ func testStreamerWithInputChannel(
}

// Create a new execution env
tm, err := createTaskMaster()
tm, err := createTaskMaster("testStreamer")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -13779,9 +13779,9 @@ func compareListIgnoreOrder(got, exp []interface{}, cmpF func(got, exp interface
return nil
}

func createTaskMaster() (*kapacitor.TaskMaster, error) {
func createTaskMaster(name string) (*kapacitor.TaskMaster, error) {
d := diagService.NewKapacitorHandler()
tm := kapacitor.NewTaskMaster("testStreamer", newServerInfo(), d)
tm := kapacitor.NewTaskMaster(name, newServerInfo(), d)
httpdService := newHTTPDService()
tm.HTTPDService = httpdService
tm.TaskStore = taskStore{}
Expand Down
34 changes: 34 additions & 0 deletions integrations/testdata/TestBatch_Trickle.0.brpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{
"name": "cpu_usage_idle",
"tags": {
"cpu": "cpu-total"
},
"points": [
{
"fields": {
"mean": 90.38281469458698
},
"time": "2015-10-30T17:14:12Z"
},
{
"fields": {
"mean": 80.38281469458698
},
"time": "2015-10-30T17:14:13Z"
}
]
}
{
"name": "cpu_usage_idle",
"tags": {
"cpu": "cpu0"
},
"points": [
{
"fields": {
"mean": 83.56930693069836
},
"time": "2015-10-30T17:14:12Z"
}
]
}
11 changes: 11 additions & 0 deletions pipeline/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,3 +535,14 @@ func (n *chainnode) Sideload() *SideloadNode {
n.linkChild(s)
return s
}

// Trickle creates a node that converts batch edges (such as windowed
// data) into stream edges. It panics if this node does not provide a
// batch edge, since there is nothing to trickle on a stream.
func (n *chainnode) Trickle() *TrickleNode {
	if n.Provides() != BatchEdge {
		panic("cannot Trickle stream edge")
	}

	t := newTrickleNode()
	n.linkChild(t)
	return t
}
2 changes: 2 additions & 0 deletions pipeline/tick/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ func (a *AST) Create(n pipeline.Node, parents []ast.Node) (ast.Node, error) {
case *pipeline.StreamNode:
s := StreamNode{}
return s.Build()
case *pipeline.TrickleNode:
return NewTrickle(parents).Build(node)
case *pipeline.BatchNode:
b := BatchNode{}
return b.Build()
Expand Down
3 changes: 2 additions & 1 deletion pipeline/tick/tick_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/pipeline/tick"
"github.com/influxdata/kapacitor/tick/stateful"
Expand Down Expand Up @@ -116,7 +117,7 @@ func PipelineTickTestHelper(t *testing.T, pipe *pipeline.Pipeline, want string,
}

if got != want {
t.Errorf("unexpected TICKscript:\ngot:\n%v\nwant:\n%v\n", got, want)
t.Errorf("unexpected TICKscript:\n %s", cmp.Diff(got, want))
t.Log(got) // print is helpful to get the correct format.
}

Expand Down
26 changes: 26 additions & 0 deletions pipeline/tick/trickle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package tick

import (
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/tick/ast"
)

// TrickleNode converts the TrickleNode pipeline node into the TICKScript AST.
// (The node takes no properties; Build renders it as a bare |trickle() call.)
type TrickleNode struct {
	Function
}

// NewTrickle creates a TrickleNode function builder attached to the
// given parent AST nodes.
func NewTrickle(parents []ast.Node) *TrickleNode {
	t := new(TrickleNode)
	t.Parents = parents
	return t
}

// Build renders the trickle operation as a TICKscript AST node by
// appending a |trickle() pipe call, returning the resulting chain and
// any builder error accumulated so far. The *pipeline.TrickleNode
// argument is unused — trickle has no properties to render — and is
// accepted only to match the other builders' Build signatures.
func (n *TrickleNode) Build(s *pipeline.TrickleNode) (ast.Node, error) {
	n.Pipe("trickle")
	return n.prev, n.err
}
20 changes: 20 additions & 0 deletions pipeline/tick/trickle_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package tick_test

import (
"testing"
"time"
)

// TestTrickle checks the TICKscript rendering of a trickle node: a
// windowed stream followed by |trickle() must render back to the
// expected script text.
func TestTrickle(t *testing.T) {
	pipe, _, from := StreamFrom()
	// Trickle is only valid after a batch edge, so a window is inserted
	// between from() and trickle().
	w := from.Window()
	w.Every = time.Second
	w.Trickle()
	// NOTE(review): the helper compares the rendered script to this text
	// verbatim, so its exact whitespace is significant.
	want := `stream
|from()
|window()
.every(1s)
|trickle()
`
	PipelineTickTestHelper(t, pipe, want)
}
53 changes: 53 additions & 0 deletions pipeline/trickle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package pipeline

import (
"encoding/json"
"fmt"
)

// TrickleNode converts batch edges into stream edges; children of a
// trickle node are treated as if they are in a stream. It is the
// inverse of a window node. Trickle may only be chained after a node
// that provides a batch edge (such as a window or a batch query) —
// chaining it onto a stream edge panics at pipeline-construction time.
// Example:
//    stream
//        |from()
//        |window()
//            .period(10s)
//            .every(10s)
//        |trickle()
type TrickleNode struct {
	chainnode `json:"-"`
}

// newTrickleNode constructs a TrickleNode that wants a batch edge in
// and provides a stream edge out.
func newTrickleNode() *TrickleNode {
	node := new(TrickleNode)
	node.chainnode = newBasicChainNode("trickle", BatchEdge, StreamEdge)
	return node
}

// MarshalJSON converts TrickleNode to JSON.
// Only the node's type and ID are serialized; trickle has no properties.
// tick:ignore
func (n *TrickleNode) MarshalJSON() ([]byte, error) {
	type typeOnly struct {
		TypeOf
	}
	raw := typeOnly{
		TypeOf: TypeOf{
			Type: "trickle",
			ID:   n.ID(),
		},
	}
	return json.Marshal(&raw)
}

// UnmarshalJSON converts JSON to a TrickleNode.
// It rejects payloads whose type field is not "trickle" and restores
// only the node's ID, since trickle has no properties.
// tick:ignore
func (n *TrickleNode) UnmarshalJSON(data []byte) error {
	var raw struct {
		TypeOf
	}
	if err := json.Unmarshal(data, &raw); err != nil {
		return err
	}
	if raw.Type != "trickle" {
		return fmt.Errorf("error unmarshaling node %d of type %s as TrickleNode", raw.ID, raw.Type)
	}
	n.setID(raw.ID)
	return nil
}
2 changes: 2 additions & 0 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,8 @@ func (et *ExecutingTask) createNode(p pipeline.Node, d NodeDiagnostic) (n Node,
n, err = newStateCountNode(et, t, d)
case *pipeline.SideloadNode:
n, err = newSideloadNode(et, t, d)
case *pipeline.TrickleNode:
n = newTrickleNode(et, t, d)
case *pipeline.BarrierNode:
n, err = newBarrierNode(et, t, d)
default:
Expand Down
Loading

0 comments on commit fef0d30

Please sign in to comment.