Skip to content

Commit

Permalink
add source stream node
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Feb 18, 2016
1 parent f741d46 commit 49ae886
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
The `event` property has been removed from the Alerta node and is now set as the value of the alert ID.
- [#232](https://github.com/influxdata/kapacitor/issues/232): Better error message for alert integrations. Better error message for VictorOps 404 response.
- [#231](https://github.com/influxdata/kapacitor/issues/231): Fix window logic when there were gaps in the data stream longer than window every value.
- [#213](https://github.com/influxdata/kapacitor/issues/231): Add SourceStreamNode so that yuou must always first call `.from` on the `stream` object before filtering it, so as to not create confusing to understand TICKscripts.

## v0.10.1 [2016-02-08]

Expand Down
6 changes: 3 additions & 3 deletions cmd/kapacitord/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestServer_DefineTask(t *testing.T) {
if ti.TICKscript != tick {
t.Fatalf("unexpected TICKscript got %s exp %s", ti.TICKscript, tick)
}
dot := "digraph testTaskName {\nstream0 -> stream1;\n}"
dot := "digraph testTaskName {\nsrcstream0 -> stream1;\n}"
if ti.Dot != dot {
t.Fatalf("unexpected dot got %s exp %s", ti.Dot, dot)
}
Expand Down Expand Up @@ -162,7 +162,7 @@ func TestServer_EnableTask(t *testing.T) {
if ti.TICKscript != tick {
t.Fatalf("unexpected TICKscript got %s exp %s", ti.TICKscript, tick)
}
dot := "digraph testTaskName {\nstream0 -> stream1 [label=\"0\"];\n}"
dot := "digraph testTaskName {\nsrcstream0 -> stream1 [label=\"0\"];\n}"
if ti.Dot != dot {
t.Fatalf("unexpected dot got %s exp %s", ti.Dot, dot)
}
Expand Down Expand Up @@ -232,7 +232,7 @@ func TestServer_DisableTask(t *testing.T) {
if ti.TICKscript != tick {
t.Fatalf("unexpected TICKscript got %s exp %s", ti.TICKscript, tick)
}
dot := "digraph testTaskName {\nstream0 -> stream1;\n}"
dot := "digraph testTaskName {\nsrcstream0 -> stream1;\n}"
if ti.Dot != dot {
t.Fatalf("unexpected dot got %s exp %s", ti.Dot, dot)
}
Expand Down
4 changes: 2 additions & 2 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func CreatePipeline(script string, sourceEdge EdgeType, scope *tick.Scope, deadm
var src Node
switch sourceEdge {
case StreamEdge:
src = newStreamNode()
src = newSourceStreamNode()
scope.Set("stream", src)
case BatchEdge:
src = newSourceBatchNode()
Expand All @@ -51,7 +51,7 @@ func CreatePipeline(script string, sourceEdge EdgeType, scope *tick.Scope, deadm
return nil, err
}
if sourceEdge == StreamEdge && deadman.Global() {
src.(*StreamNode).Deadman(deadman.Threshold(), deadman.Interval())
src.(*SourceStreamNode).Deadman(deadman.Threshold(), deadman.Interval())
}
return p, nil

Expand Down
36 changes: 26 additions & 10 deletions pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ func (d deadman) Message() string { return d.message }
func (d deadman) Global() bool { return d.global }

func TestTICK_To_Pipeline_MultiLine(t *testing.T) {
assert := assert.New(t)

var tickScript = `
var w = stream.window()
var w = stream.from().window()
w.period(10s)
w.every(1s)
`
Expand All @@ -35,15 +33,33 @@ w.every(1s)

scope := tick.NewScope()
p, err := CreatePipeline(tickScript, StreamEdge, scope, d)
assert.Nil(err)
assert.NotNil(p)
assert.Equal(1, len(p.sources[0].Children()))
w, ok := p.sources[0].Children()[0].(*WindowNode)
if assert.True(ok) {
assert.Equal(time.Duration(10)*time.Second, w.Period)
assert.Equal(time.Duration(1)*time.Second, w.Every)
if err != nil {
t.Fatal(err)
}
if p == nil {
t.Fatal("unexpected pipeline, got nil")
}
if exp, got := 1, len(p.sources); exp != got {
t.Errorf("unexpected number of pipeline sources: exp %d got %d", exp, got)
}
if exp, got := 1, len(p.sources[0].Children()); exp != got {
t.Errorf("unexpected number of source0 children: exp %d got %d", exp, got)
}
sn, ok := p.sources[0].Children()[0].(*StreamNode)
if !ok {
t.Fatalf("unexpected node type: exp StreamNode got %T", p.sources[0].Children()[0])
}
w, ok := sn.Children()[0].(*WindowNode)
if !ok {
t.Fatalf("unexpected node type: exp WindowNode got %T", sn.Children()[0])
}

if exp, got := time.Duration(10)*time.Second, w.Period; exp != got {
t.Errorf("unexpected window period exp %v got %v", exp, got)
}
if exp, got := time.Duration(1)*time.Second, w.Every; exp != got {
t.Errorf("unexpected window every exp %v got %v", exp, got)
}
}

func TestPipelineSort(t *testing.T) {
Expand Down
49 changes: 45 additions & 4 deletions pipeline/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,53 @@ import (
"github.com/influxdata/kapacitor/tick"
)

// A StreamNode represents the source of data being
// A SourceStreamNode represents the source of data being
// streamed to Kapacitor via any of its inputs.
// The stream node allows you to select which portion of the stream
// you want to process.
// The `stream` variable in stream tasks is an instance of
// a StreamNode.
// a SourceStreamNode.
// SourceStreamNode.From is the method/property of this node.
type SourceStreamNode struct {
node
}

func newSourceStreamNode() *SourceStreamNode {
return &SourceStreamNode{
node: node{
desc: "srcstream",
wants: StreamEdge,
provides: StreamEdge,
},
}
}

// Creates a new StreamNode that can be further
// filtered using the Database, RetentionPolicy, Measurement and Where properties.
// From can be called multiple times to create multiple
// independent forks of the data stream.
//
// Example:
// // Select the 'cpu' measurement from just the database 'mydb'
// // and retention policy 'myrp'.
// var cpu = stream.from()
// .database('mydb')
// .retentionPolicy('myrp')
// .measurement('cpu')
// // Select the 'load' measurement from any database and retention policy.
// var load = stream.from()
// .measurement('load')
// // Join cpu and load streams and do further processing.
// cpu.join(load)
// .as('cpu', 'load')
// ...
//
func (s *SourceStreamNode) From() *StreamNode {
f := newStreamNode()
s.linkChild(f)
return f
}

// A StreamNode selects a subset of the data flowing through a SourceStreamNode.
// The stream node allows you to select which portion of the stream you want to process.
//
// Example:
// stream
Expand Down
27 changes: 27 additions & 0 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,33 @@ import (
"github.com/influxdata/kapacitor/tick"
)

type SourceStreamNode struct {
node
s *pipeline.SourceStreamNode
}

// Create a new SourceStreamNode which copies all data to children
func newSourceStreamNode(et *ExecutingTask, n *pipeline.SourceStreamNode, l *log.Logger) (*SourceStreamNode, error) {
sn := &SourceStreamNode{
node: node{Node: n, et: et, logger: l},
s: n,
}
sn.node.runF = sn.runSourceStream
return sn, nil
}

func (s *SourceStreamNode) runSourceStream([]byte) error {
for pt, ok := s.ins[0].NextPoint(); ok; pt, ok = s.ins[0].NextPoint() {
for _, child := range s.outs {
err := child.CollectPoint(pt)
if err != nil {
return err
}
}
}
return nil
}

type StreamNode struct {
node
s *pipeline.StreamNode
Expand Down
2 changes: 2 additions & 0 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,8 @@ func (et *ExecutingTask) createNode(p pipeline.Node, l *log.Logger) (Node, error
switch t := p.(type) {
case *pipeline.StreamNode:
return newStreamNode(et, t, l)
case *pipeline.SourceStreamNode:
return newSourceStreamNode(et, t, l)
case *pipeline.SourceBatchNode:
return newSourceBatchNode(et, t, l)
case *pipeline.BatchNode:
Expand Down

0 comments on commit 49ae886

Please sign in to comment.