forked from influxdata/kapacitor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
flatten.go
138 lines (125 loc) · 3.29 KB
/
flatten.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package pipeline
import (
"encoding/json"
"fmt"
"time"
"github.com/influxdata/influxdb/influxql"
)
const (
defaultFlattenDelimiter = "."
)
// Flatten a set of points on specific dimensions.
// For example given two points:
//
// m,host=A,port=80 bytes=3512
// m,host=A,port=443 bytes=6723
//
// Flattening the points on `port` would result in a single point:
//
// m,host=A 80.bytes=3512,443.bytes=6723
//
// Example:
// |flatten()
// .on('port')
//
// If flattening on multiple dimensions the order is preserved:
//
// m,host=A,port=80 bytes=3512
// m,host=A,port=443 bytes=6723
// m,host=B,port=443 bytes=7243
//
// Flattening the points on `host` and `port` would result in a single point:
//
// m A.80.bytes=3512,A.443.bytes=6723,B.443.bytes=7243
//
// Example:
// |flatten()
// .on('host', 'port')
//
//
// Since flattening points creates dynamically named fields in general it is expected
// that the resultant data is passed to a UDF or similar for custom processing.
type FlattenNode struct {
chainnode `json:"-"`
// The dimensions on which to join
// tick:ignore
Dimensions []string `tick:"On" json:"on"`
// The delimiter between field name parts
Delimiter string `json:"delimiter"`
// The maximum duration of time that two incoming points
// can be apart and still be considered to be equal in time.
// The joined data point's time will be rounded to the nearest
// multiple of the tolerance duration.
Tolerance time.Duration `json:"tolerance"`
// DropOriginalFieldNameFlag indicates whether the original field name should
// be included in the final field name.
//tick:ignore
DropOriginalFieldNameFlag bool `tick:"DropOriginalFieldName" json:"dropOriginalFieldName"`
}
func newFlattenNode(e EdgeType) *FlattenNode {
f := &FlattenNode{
chainnode: newBasicChainNode("flatten", e, e),
Delimiter: defaultFlattenDelimiter,
}
return f
}
// MarshalJSON converts FlattenNode to JSON
// tick:ignore
func (n *FlattenNode) MarshalJSON() ([]byte, error) {
type Alias FlattenNode
var raw = &struct {
TypeOf
*Alias
Tolerance string `json:"tolerance"`
}{
TypeOf: TypeOf{
Type: "flatten",
ID: n.ID(),
},
Alias: (*Alias)(n),
Tolerance: influxql.FormatDuration(n.Tolerance),
}
return json.Marshal(raw)
}
// UnmarshalJSON converts JSON to an FlattenNode
// tick:ignore
func (n *FlattenNode) UnmarshalJSON(data []byte) error {
type Alias FlattenNode
var raw = &struct {
TypeOf
*Alias
Tolerance string `json:"tolerance"`
}{
Alias: (*Alias)(n),
}
err := json.Unmarshal(data, raw)
if err != nil {
return err
}
if raw.Type != "flatten" {
return fmt.Errorf("error unmarshaling node %d of type %s as FlattenNode", raw.ID, raw.Type)
}
n.Tolerance, err = influxql.ParseDuration(raw.Tolerance)
if err != nil {
return err
}
n.setID(raw.ID)
return nil
}
// Specify the dimensions on which to flatten the points.
// tick:property
func (f *FlattenNode) On(dims ...string) *FlattenNode {
f.Dimensions = dims
return f
}
// DropOriginalFieldName indicates whether the original field name should
// be dropped when constructing the final field name.
// tick:property
func (f *FlattenNode) DropOriginalFieldName(drop ...bool) *FlattenNode {
if len(drop) == 1 {
f.DropOriginalFieldNameFlag = drop[0]
} else {
f.DropOriginalFieldNameFlag = true
}
return f
}