forked from influxdata/kapacitor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
influxdb_out.go
144 lines (134 loc) · 3.78 KB
/
influxdb_out.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package pipeline
import (
"encoding/json"
"fmt"
"time"
"github.com/influxdata/influxdb/influxql"
)
const DefaultBufferSize = 1000
const DefaultFlushInterval = time.Second * 10
// Writes the data to InfluxDB as it is received.
//
// Example:
// stream
// |from()
// .measurement('requests')
// |eval(lambda: "errors" / "total")
// .as('error_percent')
// // Write the transformed data to InfluxDB
// |influxDBOut()
// .database('mydb')
// .retentionPolicy('myrp')
// .measurement('errors')
// .tag('kapacitor', 'true')
// .tag('version', '0.2')
//
// Available Statistics:
//
// * points_written -- number of points written to InfluxDB
// * write_errors -- number of errors attempting to write to InfluxDB
//
type InfluxDBOutNode struct {
node `json:"-"`
// The name of the InfluxDB instance to connect to.
// If empty the configured default will be used.
Cluster string `json:"cluster"`
// The name of the database.
Database string `json:"database"`
// The name of the retention policy.
RetentionPolicy string `json:"retentionPolicy"`
// The name of the measurement.
Measurement string `json:"measurement"`
// The write consistency to use when writing the data.
WriteConsistency string `json:"writeConsistency"`
// The precision to use when writing the data.
Precision string `json:"precision"`
// Number of points to buffer when writing to InfluxDB.
// Default: 1000
Buffer int64 `json:"buffer"`
// Write points to InfluxDB after interval even if buffer is not full.
// Default: 10s
FlushInterval time.Duration `json:"flushInterval"`
// Static set of tags to add to all data points before writing them.
// tick:ignore
Tags map[string]string `tick:"Tag" json:"tags"`
// Create the specified database and retention policy
// tick:ignore
CreateFlag bool `tick:"Create" json:"create"`
}
func newInfluxDBOutNode(wants EdgeType) *InfluxDBOutNode {
return &InfluxDBOutNode{
node: node{
desc: "influxdb_out",
wants: wants,
provides: NoEdge,
},
Tags: make(map[string]string),
Buffer: DefaultBufferSize,
FlushInterval: DefaultFlushInterval,
}
}
// MarshalJSON converts InfluxDBOutNode to JSON
// tick:ignore
func (n *InfluxDBOutNode) MarshalJSON() ([]byte, error) {
type Alias InfluxDBOutNode
var raw = &struct {
TypeOf
*Alias
FlushInterval string `json:"flushInterval"`
}{
TypeOf: TypeOf{
Type: "influxdbOut",
ID: n.ID(),
},
Alias: (*Alias)(n),
FlushInterval: influxql.FormatDuration(n.FlushInterval),
}
return json.Marshal(raw)
}
// UnmarshalJSON converts JSON to an InfluxDBOutNode
// tick:ignore
func (n *InfluxDBOutNode) UnmarshalJSON(data []byte) error {
type Alias InfluxDBOutNode
var raw = &struct {
TypeOf
*Alias
FlushInterval string `json:"flushInterval"`
}{
Alias: (*Alias)(n),
}
err := json.Unmarshal(data, raw)
if err != nil {
return err
}
if raw.Type != "influxdbOut" {
return fmt.Errorf("error unmarshaling node %d of type %s as InfluxDBOutNode", raw.ID, raw.Type)
}
n.FlushInterval, err = influxql.ParseDuration(raw.FlushInterval)
if err != nil {
return err
}
n.setID(raw.ID)
return nil
}
// Add a static tag to all data points.
// Tag can be called more then once.
//
// tick:property
func (i *InfluxDBOutNode) Tag(key, value string) *InfluxDBOutNode {
i.Tags[key] = value
return i
}
// Create indicates that both the database and retention policy
// will be created, when the task is started.
// If the retention policy name is empty then no
// retention policy will be specified and
// the default retention policy name will be created.
//
// If the database already exists nothing happens.
//
// tick:property
func (i *InfluxDBOutNode) Create() *InfluxDBOutNode {
i.CreateFlag = true
return i
}