Skip to content

Commit

Permalink
fix as rename confusion
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Oct 27, 2015
1 parent ee9a1c9 commit cf092cc
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 7 deletions.
2 changes: 1 addition & 1 deletion integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ var viewCounts = stream
errorCounts.join(viewCounts)
.as('errors', 'views')
.rename('error_view')
.streamName('error_view')
.eval(lambda: "errors.sum" / "views.sum")
.as('error_percent')
.httpOut('error_rate')
Expand Down
16 changes: 15 additions & 1 deletion join.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package kapacitor

import (
"fmt"
"strings"

"github.com/influxdb/kapacitor/models"
"github.com/influxdb/kapacitor/pipeline"
)
Expand All @@ -12,6 +15,17 @@ type JoinNode struct {

// Create a new JoinNode, which takes pairs from parent streams combines them into a single point.
func newJoinNode(et *ExecutingTask, n *pipeline.JoinNode) (*JoinNode, error) {
for _, name := range n.Names {
if len(name) == 0 {
return nil, fmt.Errorf("must provide a prefix name for the join node, see .as() property method")
}
if strings.ContainsRune(name, '.') {
return nil, fmt.Errorf("cannot use name %s as field prefix, it contains a '.' character", name)
}
}
if n.Names[0] == n.Names[1] {
return nil, fmt.Errorf("cannot use the same prefix name see .as() property method")
}
jn := &JoinNode{
j: n,
node: node{Node: n, et: et},
Expand All @@ -22,7 +36,7 @@ func newJoinNode(et *ExecutingTask, n *pipeline.JoinNode) (*JoinNode, error) {

func (j *JoinNode) runJoin() error {

rename := j.j.Rename
rename := j.j.StreamName
if rename == "" {
rename = j.parents[1].Name()
}
Expand Down
17 changes: 12 additions & 5 deletions pipeline/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ package pipeline
// Example:
// var errors = stream.fork().from('errors')
// var requests = stream.fork().from('requests')
// // Join the errors and requests stream
// // Join the errors and requests streams
// errors.join(requests)
// // Provide prefix names for the fields of the data points.
// .as('errors', 'requests')
// .rename('error_rate')
// .streamName('error_rate')
// // Both the "value" fields from each parent have been prefixed
// // with the respective names 'errors' and 'requests'.
// .eval(lambda: "errors.value" / "requests.value"))
// .as('rate')
// ...
Expand All @@ -28,7 +31,7 @@ type JoinNode struct {
Names []string
// The name of this new joined data stream.
// If empty the name of the left parent is used.
Rename string
StreamName string
}

func newJoinNode(e EdgeType, n Node) *JoinNode {
Expand All @@ -39,10 +42,14 @@ func newJoinNode(e EdgeType, n Node) *JoinNode {
return j
}

// The alias names to be used as a dot prefix for all field names for each
// data point respectively.
// Prefix names for all fields from the respective nodes.
// Each field from the parent nodes will be prefixed with the provided name and a '.'.
// See the example above.
//
// The name `nLeft` corresponds to the node on the left (i.e `leftNode.join(rightNode)`).
// The name `nRight` corresponds to the node on the right.
// The names cannot have a dot '.' character.
//
// tick:property
func (j *JoinNode) As(nLeft, nRight string) *JoinNode {
//NOTE: the order is reversed because the second node is linked first
Expand Down

0 comments on commit cf092cc

Please sign in to comment.