Skip to content

Commit

Permalink
Add pipeline tick to AST helper functions
Browse files Browse the repository at this point in the history
  • Loading branch information
goller committed Oct 30, 2017
1 parent b6aeeb6 commit 20025dc
Show file tree
Hide file tree
Showing 27 changed files with 127 additions and 348 deletions.
173 changes: 19 additions & 154 deletions pipeline/tick/alert_test.go

Large diffs are not rendered by default.

9 changes: 1 addition & 8 deletions pipeline/tick/combine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ func TestCombine(t *testing.T) {
combine.Delimiter = "cup"
combine.Tolerance = time.Hour + 10*time.Minute
combine.Max = 1
got, err := PipelineTick(pipe)
if err != nil {
t.Fatalf("Unexpected error building pipeline %v", err)
}

want := `stream
|from()
Expand All @@ -53,8 +49,5 @@ func TestCombine(t *testing.T) {
.tolerance(70m)
.max(1)
`
if got != want {
t.Errorf("TestCombine = %v, want %v", got, want)
t.Log(got) // print is helpful to get the correct format.
}
PipelineTickTestHelper(t, pipe, want)
}
1 change: 1 addition & 0 deletions pipeline/tick/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func (n *DefaultNode) Build(d *pipeline.DefaultNode) (ast.Node, error) {
for k := range d.Tags {
tagKeys = append(tagKeys, k)
}
sort.Strings(tagKeys)
for _, k := range tagKeys {
n.Dot("tag", k, d.Tags[k])
}
Expand Down
11 changes: 3 additions & 8 deletions pipeline/tick/default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,15 @@ func TestDefault(t *testing.T) {
def.Field("judgement", "plantiff")
def.Field("finance", "loan")
def.Tag("vocabulary", "volcano")
got, err := PipelineTick(pipe)
if err != nil {
t.Fatalf("Unexpected error building pipeline %v", err)
}
def.Tag("make", "toyota")

want := `stream
|from()
|default()
.field('finance', 'loan')
.field('judgement', 'plantiff')
.tag('make', 'toyota')
.tag('vocabulary', 'volcano')
`
if got != want {
t.Errorf("TestDefault = %v, want %v", got, want)
t.Log(got) // print is helpful to get the correct format.
}
PipelineTickTestHelper(t, pipe, want)
}
11 changes: 2 additions & 9 deletions pipeline/tick/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,6 @@ func TestDelete(t *testing.T) {
delete.Tag("race")
delete.Tag("city")

got, err := PipelineTick(pipe)
if err != nil {
t.Fatalf("Unexpected error building pipeline %v", err)
}

want := `stream
|from()
|delete()
Expand All @@ -25,8 +20,6 @@ func TestDelete(t *testing.T) {
.tag('race')
.tag('city')
`
if got != want {
t.Errorf("TestDelete = %v, want %v", got, want)
t.Log(got) // print is helpful to get the correct format.
}

PipelineTickTestHelper(t, pipe, want)
}
9 changes: 1 addition & 8 deletions pipeline/tick/derivative_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@ func TestDerivative(t *testing.T) {
d.As = "very important"
d.Unit = time.Hour
d.NonNegative()
got, err := PipelineTick(pipe)
if err != nil {
t.Fatalf("Unexpected error building pipeline %v", err)
}

want := `stream
|from()
Expand All @@ -23,8 +19,5 @@ func TestDerivative(t *testing.T) {
.unit(1h)
.nonNegative()
`
if got != want {
t.Errorf("TestDerivative = %v, want %v", got, want)
t.Log(got) // print is helpful to get the correct format.
}
PipelineTickTestHelper(t, pipe, want)
}
10 changes: 1 addition & 9 deletions pipeline/tick/eval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,6 @@ func TestEval(t *testing.T) {
})
eval.As("multiply", "divide").Tags("cells").Keep("petri", "dish").Quiet()

got, err := PipelineTick(pipe)
if err != nil {
t.Fatalf("Unexpected error building pipeline %v", err)
}

want := `stream
|from()
|eval(lambda: lambda: "cpu" != 'cpu-total' AND lambda: "host" =~ /logger\d+/)
Expand All @@ -50,8 +45,5 @@ func TestEval(t *testing.T) {
.quiet()
.keep('petri', 'dish')
`
if got != want {
t.Errorf("TestEval = %v, want %v", got, want)
t.Log(got) // print is helpful to get the correct format.
}
PipelineTickTestHelper(t, pipe, want)
}
9 changes: 1 addition & 8 deletions pipeline/tick/flatten_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@ func TestFlatten(t *testing.T) {
flatten.Delimiter = "blackline"
flatten.Tolerance = time.Second
flatten.DropOriginalFieldName()
got, err := PipelineTick(pipe)
if err != nil {
t.Fatalf("Unexpected error building pipeline %v", err)
}

want := `stream
|from()
Expand All @@ -25,8 +21,5 @@ func TestFlatten(t *testing.T) {
.tolerance(1s)
.dropOriginalFieldName()
`
if got != want {
t.Errorf("TestFlatten = %v, want %v", got, want)
t.Log(got) // print is helpful to get the correct format.
}
PipelineTickTestHelper(t, pipe, want)
}
10 changes: 1 addition & 9 deletions pipeline/tick/from_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,6 @@ func TestFrom(t *testing.T) {
from.Truncate = time.Second
from.Round = time.Second

got, err := PipelineTick(pipe)
if err != nil {
t.Fatalf("Unexpected error building pipeline %v", err)
}

want := `stream
|from()
.database('mydb')
Expand All @@ -59,8 +54,5 @@ func TestFrom(t *testing.T) {
.where(lambda: lambda: "cpu" != 'cpu-total' AND lambda: "host" =~ /logger\d+/)
.groupBy('this', 'that', 'these', 'those')
`
if got != want {
t.Errorf("TestFrom = %v, want %v", got, want)
t.Log(got) // print is helpful to get the correct format.
}
PipelineTickTestHelper(t, pipe, want)
}
9 changes: 1 addition & 8 deletions pipeline/tick/group_by_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ func TestGroupBy(t *testing.T) {
logger := from.Log()
logger.Level = "Coxeter"
logger.GroupBy("simplex", "cube", "orthoplpex", "demicube").Exclude("4_21", "2_41", "1_42").ByMeasurement()
got, err := PipelineTick(pipe)
if err != nil {
t.Fatalf("Unexpected error building pipeline %v", err)
}

want := `stream
|from()
Expand All @@ -22,8 +18,5 @@ func TestGroupBy(t *testing.T) {
.exclude('4_21', '2_41', '1_42')
.byMeasurement()
`
if got != want {
t.Errorf("TestGroupBy = %v, want %v", got, want)
t.Log(got) // print is helpful to get the correct format.
}
PipelineTickTestHelper(t, pipe, want)
}
9 changes: 1 addition & 8 deletions pipeline/tick/http_out_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,10 @@ import (
func TestHTTPOut(t *testing.T) {
pipe, _, from := StreamFrom()
from.HttpOut("There is never any ending to Paris – Hemingway")
got, err := PipelineTick(pipe)
if err != nil {
t.Fatalf("Unexpected error building pipeline %v", err)
}

want := `stream
|from()
|httpOut('There is never any ending to Paris – Hemingway')
`
if got != want {
t.Errorf("TestHTTPOut = %v, want %v", got, want)
t.Log(got) // print is helpful to get the correct format.
}
PipelineTickTestHelper(t, pipe, want)
}
9 changes: 1 addition & 8 deletions pipeline/tick/http_post_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@ func TestHTTPPost(t *testing.T) {
pipe, _, from := StreamFrom()
post := from.HttpPost("http://influx1.local:8086/query", "http://influx2.local:8086/query")
post.Endpoint("endpoint1").Endpoint("endpoint2").Header("Authorization", "Basic GOTO 10").Header("X-Forwarded-For", `10 PRINT "HELLO WORLD"`)
got, err := PipelineTick(pipe)
if err != nil {
t.Fatalf("Unexpected error building pipeline %v", err)
}

want := `stream
|from()
Expand All @@ -21,8 +17,5 @@ func TestHTTPPost(t *testing.T) {
.header('Authorization', 'Basic GOTO 10')
.header('X-Forwarded-For', '10 PRINT "HELLO WORLD"')
`
if got != want {
t.Errorf("TestHTTPPost = %v, want %v", got, want)
t.Log(got) // print is helpful to get the correct format.
}
PipelineTickTestHelper(t, pipe, want)
}
10 changes: 1 addition & 9 deletions pipeline/tick/influx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,6 @@ func TestInfluxQL(t *testing.T) {
influx.As = "my"
influx.UsePointTimes()

got, err := PipelineTick(pipe)
if err != nil {
t.Fatalf("Unexpected error building pipeline %v", err)
}

want := `stream
|from()
|mean('streets')
Expand All @@ -23,8 +18,5 @@ func TestInfluxQL(t *testing.T) {
.as('my')
.usePointTimes()
`
if got != want {
t.Errorf("TestInfluxQL = %v, want %v", got, want)
t.Log(got) // print is helpful to get the correct format.
}
PipelineTickTestHelper(t, pipe, want)
}
10 changes: 1 addition & 9 deletions pipeline/tick/influxdb_out_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,6 @@ func TestInfluxDBOut(t *testing.T) {
influx.FlushInterval = time.Second
influx.Create()

got, err := PipelineTick(pipe)
if err != nil {
t.Fatalf("Unexpected error building pipeline %v", err)
}

want := `stream
|from()
|influxDBOut()
Expand All @@ -38,8 +33,5 @@ func TestInfluxDBOut(t *testing.T) {
.tag('kapacitor', 'true')
.tag('version', '0.2')
`
if got != want {
t.Errorf("TestInfluxDBOut = %v, want %v", got, want)
t.Log(got) // print is helpful to get the correct format.
}
PipelineTickTestHelper(t, pipe, want)
}
9 changes: 1 addition & 8 deletions pipeline/tick/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@ func TestJoin(t *testing.T) {
join.Tolerance = time.Second
join.StreamName = "kwh"

got, err := PipelineTick(pipe)
if err != nil {
t.Fatalf("Unexpected error building pipeline %v", err)
}
want := `var from3 = stream
|from()
.measurement('floor_power')
Expand All @@ -45,8 +41,5 @@ stream
.streamName('kwh')
.tolerance(1s)
`
if got != want {
t.Errorf("TestJoin = %v, want %v", got, want)
t.Log(got) // print is helpful to get the correct format.
}
PipelineTickTestHelper(t, pipe, want)
}
10 changes: 1 addition & 9 deletions pipeline/tick/k8s_autoscale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,6 @@ func TestK8sAutoscale(t *testing.T) {
},
}

got, err := PipelineTick(pipe)
if err != nil {
t.Fatalf("Unexpected error building pipeline %v", err)
}

want := `stream
|from()
|k8sAutoscale()
Expand All @@ -81,8 +76,5 @@ func TestK8sAutoscale(t *testing.T) {
.kindTag('deployment')
.resourceTag('dock')
`
if got != want {
t.Errorf("TestK8sAutoscale = %v, want %v", got, want)
t.Log(got) // print is helpful to get the correct format.
}
PipelineTickTestHelper(t, pipe, want)
}
41 changes: 41 additions & 0 deletions pipeline/tick/kapacitor_loopback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package tick

import (
"sort"

"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/tick/ast"
)

// KapacitorLoopbackNode converts the KapacitorLoopbackNode pipeline node into the TICKScript AST
type KapacitorLoopbackNode struct {
Function
}

// NewKapacitorLoopbackNode creates a KapacitorLoopbackNode function builder
func NewKapacitorLoopbackNode(parents []ast.Node) *KapacitorLoopbackNode {
return &KapacitorLoopbackNode{
Function{
Parents: parents,
},
}
}

// Build creates a KapacitorLoopbackNode ast.Node
func (n *KapacitorLoopbackNode) Build(k *pipeline.KapacitorLoopbackNode) (ast.Node, error) {
n.Pipe("kapacitorLoopback").
Dot("database", k.Database).
Dot("retentionPolicy", k.RetentionPolicy).
Dot("measurement", k.Measurement)

var tagKeys []string
for key := range k.Tags {
tagKeys = append(tagKeys, key)
}
sort.Strings(tagKeys)
for _, key := range tagKeys {
n.Dot("tag", key, k.Tags[key])
}

return n.prev, n.err
}
26 changes: 26 additions & 0 deletions pipeline/tick/kapacitor_loopback_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package tick_test

import (
"testing"
)

func TestKapacitorLoopback(t *testing.T) {
pipe, _, from := StreamFrom()
loop := from.KapacitorLoopback()
loop.Database = "mydb"
loop.RetentionPolicy = "myrp"
loop.Measurement = "meas"
loop.Tag("vocabulary", "volcano")
loop.Tag("season", "winter")

want := `stream
|from()
|kapacitorLoopback()
.database('mydb')
.retentionPolicy('myrp')
.measurement('meas')
.tag('season', 'winter')
.tag('vocabulary', 'volcano')
`
PipelineTickTestHelper(t, pipe, want)
}
9 changes: 1 addition & 8 deletions pipeline/tick/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@ func TestLog(t *testing.T) {
logger.Prefix = "oh no"
logger.Log() // default options

got, err := PipelineTick(pipe)
if err != nil {
t.Fatalf("Unexpected error building pipeline %v", err)
}
want := `batch
|query('select cpu_usage from cpu')
|log()
Expand All @@ -23,8 +19,5 @@ func TestLog(t *testing.T) {
|log()
.level('INFO')
`
if got != want {
t.Errorf("TestLog = %v, want %v", got, want)
t.Log(got) // print is helpful to get the correct format.
}
PipelineTickTestHelper(t, pipe, want)
}
Loading

0 comments on commit 20025dc

Please sign in to comment.