forked from influxdata/kapacitor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
group_by.go
139 lines (127 loc) · 3.39 KB
/
group_by.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package pipeline
import (
"encoding/json"
"errors"
"fmt"
"github.com/influxdata/kapacitor/tick/ast"
)
// A GroupByNode will group the incoming data.
// Each group is then processed independently for the rest of the pipeline.
// Only tags that are dimensions in the grouping will be preserved;
// all other tags are dropped.
//
// Example:
// stream
// |groupBy('service', 'datacenter')
// ...
//
// The above example groups the data along two dimensions `service` and `datacenter`.
// Groups are dynamically created as new data arrives and each group is processed
// independently.
type GroupByNode struct {
chainnode
//The dimensions by which to group to the data.
// tick:ignore
Dimensions []interface{} `json:"dimensions"`
// The dimensions to exclude.
// Useful for substractive tags from using *.
// tick:ignore
ExcludedDimensions []string `tick:"Exclude" json:"exclude"`
// Whether to include the measurement in the group ID.
// tick:ignore
ByMeasurementFlag bool `tick:"ByMeasurement" json:"byMeasurement"`
}
func newGroupByNode(wants EdgeType, dims []interface{}) *GroupByNode {
return &GroupByNode{
chainnode: newBasicChainNode("groupby", wants, wants),
Dimensions: dims,
}
}
// MarshalJSON converts GroupByNode to JSON
// tick:ignore
func (n *GroupByNode) MarshalJSON() ([]byte, error) {
type Alias GroupByNode
var raw = &struct {
TypeOf
*Alias
}{
TypeOf: TypeOf{
Type: "groupBy",
ID: n.ID(),
},
Alias: (*Alias)(n),
}
return json.Marshal(raw)
}
// UnmarshalJSON converts JSON to an GroupByNode
// tick:ignore
func (n *GroupByNode) UnmarshalJSON(data []byte) error {
type Alias GroupByNode
var raw = &struct {
TypeOf
*Alias
}{
Alias: (*Alias)(n),
}
err := json.Unmarshal(data, raw)
if err != nil {
return err
}
if raw.Type != "groupBy" {
return fmt.Errorf("error unmarshaling node %d of type %s as GroupByNode", raw.ID, raw.Type)
}
n.setID(raw.ID)
return nil
}
func (n *GroupByNode) validate() error {
return validateDimensions(n.Dimensions, n.ExcludedDimensions)
}
func validateDimensions(dimensions []interface{}, excludedDimensions []string) error {
hasStar := false
for _, d := range dimensions {
switch dim := d.(type) {
case string:
if len(dim) == 0 {
return errors.New("dimensions cannot not be the empty string")
}
case *ast.StarNode:
hasStar = true
default:
return fmt.Errorf("invalid dimension object of type %T", d)
}
}
if hasStar && len(dimensions) > 1 {
return errors.New("cannot group by both '*' and named dimensions.")
}
if !hasStar && len(excludedDimensions) > 0 {
return errors.New("exclude requires '*'")
}
return nil
}
// If set will include the measurement name in the group ID.
// Along with any other group by dimensions.
//
// Example:
// ...
// |groupBy('host')
// .byMeasurement()
//
// The above example groups points by their host tag and measurement name.
//
// If you want to remove the measurement name from the group ID,
// then groupBy all existing dimensions but without specifying 'byMeasurement'.
//
// Example:
// |groupBy(*)
//
// The above removes the group by measurement name if any.
// tick:property
func (n *GroupByNode) ByMeasurement() *GroupByNode {
n.ByMeasurementFlag = true
return n
}
// Exclude removes any tags from the group.
func (n *GroupByNode) Exclude(dims ...string) *GroupByNode {
n.ExcludedDimensions = append(n.ExcludedDimensions, dims...)
return n
}