combine.go (forked from influxdata/kapacitor)

package pipeline

import (
	"encoding/json"
	"fmt"
	"strings"
	"time"

	"github.com/influxdata/influxdb/influxql"
	"github.com/influxdata/kapacitor/tick/ast"
)

const (
	defaultCombineDelimiter = "."
	defaultMaxCombinations  = 1e6
)

// Combine the data from a single node with itself.
// Points with the same time are grouped and then combinations are created.
// The size of the combinations is defined by how many expressions are given.
// Combinations are order independent and will never include the same point more than once.
//
// Example:
//    stream
//        |from()
//            .measurement('request_latency')
//        |combine(lambda: "service" == 'login', lambda: TRUE)
//            .as('login', 'other')
//            // points that are within 1 second are considered the same time.
//            .tolerance(1s)
//            // delimiter for new field and tag names
//            .delimiter('.')
//        // Change group by to be the new other.service tag
//        |groupBy('other.service')
//        // Both the "value" fields from each data point have been prefixed
//        // with the respective names 'login' and 'other'.
//        |eval(lambda: "login.value" / "other.value")
//            .as('ratio')
//        ...
//
// In the above example the data points for the `login` service are combined with the data points from all other services.
//
// Example:
//    |combine(lambda: TRUE, lambda: TRUE)
//        .as('login', 'other')
//
// In the above example all combination pairs are created.
//
// Example:
//    |combine(lambda: TRUE, lambda: TRUE, lambda: TRUE)
//        .as('login', 'other', 'another')
//
// In the above example all combination triples are created.
type CombineNode struct {
	chainnode

	// The list of expressions used to select the points to combine.
	// tick:ignore
	Lambdas []*ast.LambdaNode `json:"lambdas"`

	// The alias names used to prefix the fields and tags of the matched points.
	// Names[i] corresponds to Lambdas[i].
	// tick:ignore
	Names []string `tick:"As" json:"as"`

	// The delimiter between the As names and existing field and tag keys.
	// Can be the empty string, but you are responsible for ensuring conflicts are not possible if you use the empty string.
	Delimiter string `json:"delimiter"`

	// The maximum duration of time that two incoming points
	// can be apart and still be considered to be equal in time.
	// The combined data point's time will be rounded to the nearest
	// multiple of the tolerance duration.
	Tolerance time.Duration `json:"-"`

	// Maximum number of possible combinations.
	// Since the number of possible combinations can grow very rapidly
	// you can set a maximum number of combinations allowed.
	// If the max is crossed, an error is logged and the combinations are not calculated.
	// Default: 1,000,000
	Max int64 `json:"max"`
}

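// Illustrative sketch only (the helper below is hypothetical, not part of the
// node's API): it shows why Max exists. With n points sharing a timestamp and
// k expressions, the node considers up to "n choose k" order-independent
// combinations, which grows quickly with n. For example, 200 points with
// three expressions yield 1,313,400 combinations, already more than the
// default maximum of 1,000,000.
func exampleCombinationCount(n, k int64) int64 {
	// Compute C(n, k) = n! / (k! * (n-k)!) iteratively to avoid huge factorials.
	if k < 0 || k > n {
		return 0
	}
	if k > n-k {
		k = n - k
	}
	count := int64(1)
	for i := int64(1); i <= k; i++ {
		// The running product stays an exact integer at every step.
		count = count * (n - k + i) / i
	}
	return count
}
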
func newCombineNode(e EdgeType, lambdas []*ast.LambdaNode) *CombineNode {
	c := &CombineNode{
		chainnode: newBasicChainNode("combine", e, StreamEdge),
		Lambdas:   lambdas,
		Delimiter: defaultCombineDelimiter,
		Max:       defaultMaxCombinations,
	}
	return c
}

// MarshalJSON converts CombineNode to JSON
// tick:ignore
func (n *CombineNode) MarshalJSON() ([]byte, error) {
	type Alias CombineNode
	var raw = &struct {
		TypeOf
		*Alias
		Tolerance string `json:"tolerance"`
	}{
		TypeOf: TypeOf{
			Type: "combine",
			ID:   n.ID(),
		},
		Alias:     (*Alias)(n),
		Tolerance: influxql.FormatDuration(n.Tolerance),
	}
	return json.Marshal(raw)
}

// UnmarshalJSON converts JSON to a CombineNode
// tick:ignore
func (n *CombineNode) UnmarshalJSON(data []byte) error {
	type Alias CombineNode
	var raw = &struct {
		TypeOf
		*Alias
		Tolerance string `json:"tolerance"`
	}{
		Alias: (*Alias)(n),
	}
	err := json.Unmarshal(data, raw)
	if err != nil {
		return err
	}
	if raw.Type != "combine" {
		return fmt.Errorf("error unmarshaling node %d of type %s as CombineNode", raw.ID, raw.Type)
	}
	n.Tolerance, err = influxql.ParseDuration(raw.Tolerance)
	if err != nil {
		return err
	}
	n.setID(raw.ID)
	return nil
}

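// Illustrative sketch only (the helper name is made up): it shows the intended
// round trip through the two marshalers above. Note that Tolerance travels as
// an influxql duration string (e.g. "1s") rather than as nanoseconds.
func exampleCombineJSONRoundTrip(n *CombineNode) (*CombineNode, error) {
	// json.Marshal delegates to MarshalJSON, which adds the type/id envelope
	// and formats Tolerance as a duration string.
	data, err := json.Marshal(n)
	if err != nil {
		return nil, err
	}
	// UnmarshalJSON checks the "combine" type, parses the duration string back
	// into a time.Duration and restores the node ID.
	out := new(CombineNode)
	if err := out.UnmarshalJSON(data); err != nil {
		return nil, err
	}
	return out, nil
}
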
// Prefix names for all fields matched by the respective expressions.
// Each field from the parent node will be prefixed with the provided name and a '.'.
// See the example above.
//
// The names cannot contain the dot '.' character.
//
// tick:property
func (n *CombineNode) As(names ...string) *CombineNode {
	n.Names = names
	return n
}

// Validate that the as() specification is consistent with the number of combine expressions.
func (n *CombineNode) validate() error {
	if len(n.Names) == 0 {
		return fmt.Errorf("a call to combine.as() is required to specify the output stream prefixes")
	}
	if len(n.Names) != len(n.Lambdas) {
		return fmt.Errorf("number of prefixes specified by combine.as() must match the number of combine expressions")
	}
	for _, name := range n.Names {
		if len(name) == 0 {
			return fmt.Errorf("must provide a prefix name for the combine node, see .as() property method")
		}
		if strings.Contains(name, n.Delimiter) {
			return fmt.Errorf("cannot use name %s as field prefix, it contains the delimiter character %s", name, n.Delimiter)
		}
	}
	names := make(map[string]bool, len(n.Names))
	for _, name := range n.Names {
		if names[name] {
			return fmt.Errorf("cannot use the same prefix name more than once, see .as() property method")
		}
		names[name] = true
	}
	return nil
}
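
// Illustrative sketch only (names are hypothetical): it demonstrates the
// contract validate enforces between the combine expressions and the prefixes
// supplied through As.
func exampleValidCombineNode(lambdas []*ast.LambdaNode) error {
	// Two distinct, non-empty, delimiter-free prefixes are required here,
	// so validate returns an error unless exactly two lambdas were passed in.
	c := newCombineNode(StreamEdge, lambdas)
	c.As("login", "other")
	return c.validate()
}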