forked from influxdata/kapacitor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
udf.proto
225 lines (194 loc) · 6.41 KB
/
udf.proto
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
syntax = "proto3";
package udf;
//------------------------------------------------------
// RPC Messages for Kapacitor to communicate with
// a child process or socket for data processing.
//
// Messages are streamed by writing a varint header
// that contains the length of the following message.
//
// To decode the stream read a varint, then read
// the determined size and decode as a protobuf message.
// There is no footer, so the next varint, if any, begins
// right after the previous message.
//
//------------------------------------------------------
// Management messages
//
// *Request messages are sent to the UDF from Kapacitor.
// *Response messages are sent to Kapacitor from the UDF.
//
// While there is an obvious request/response structure for communicating,
// there is only a loose coupling between request and response,
// meaning that ordering or synchronizing STDIN and STDOUT in any way
// is not necessary.
// For example, if Kapacitor requests a snapshot while the
// UDF is in the middle of writing a previous response or
// data points, those writes can continue. Eventually Kapacitor will receive
// the snapshot response and act accordingly.
//
// A KeepaliveRequest/KeepaliveResponse system is used to ensure that
// the process is responsive. Every time that a KeepaliveRequest is sent
// a KeepaliveResponse must be returned within a timeout.
// If the timeout is reached then the process is considered dead and will be terminated/restarted.
//
// It is recommended to disable buffering on the input and output sockets.
// Some languages, like Python, will automatically buffer the STDIN and STDOUT sockets.
// To disable this behavior use the -u flag on the Python interpreter.
// Asks the process to report information about the Options it makes
// available. The message carries no fields.
message InfoRequest {}
// Kind of edge; referenced by InfoResponse.wants and InfoResponse.provides.
// NOTE(review): STREAM vs. BATCH presumably distinguishes individual points
// from BeginBatch/EndBatch-delimited groups of points -- confirm.
enum EdgeType {
  // Zero value, so this is also what an unset EdgeType field reads as.
  STREAM = 0;
  BATCH = 1;
}
// Answer to an InfoRequest, sent from the process to Kapacitor.
message InfoResponse {
  // NOTE(review): presumably the edge type the process expects to
  // receive -- confirm.
  EdgeType wants = 1;

  // NOTE(review): presumably the edge type the process sends back -- confirm.
  EdgeType provides = 2;

  // Description of each available option, keyed by a string
  // (presumably the option name -- confirm).
  map<string, OptionInfo> options = 3;
}
// Types an option value can have; referenced by OptionInfo.valueTypes
// and OptionValue.type.
enum ValueType {
  // Zero value, so this is also what an unset ValueType field reads as.
  BOOL = 0;
  INT = 1;
  DOUBLE = 2;
  STRING = 3;
  DURATION = 4;
}
// Describes one option offered by the process; used as the value type of
// InfoResponse.options.
message OptionInfo {
  // NOTE(review): presumably the expected type of each value of the
  // option, in positional order -- confirm.
  repeated ValueType valueTypes = 1;
}
// Asks the process to initialize itself using the supplied options.
message InitRequest {
  // The options to initialize with.
  repeated Option options = 1;
}
// A single named option and its values, as carried by InitRequest.options.
message Option {
  string name = 1;
  repeated OptionValue values = 2;
}
// One value of an Option: a type tag plus the value itself.
message OptionValue {
  // NOTE(review): presumably indicates which member of `value` is
  // populated -- confirm.
  ValueType type = 1;

  // The value; at most one member is set.
  oneof value {
    bool boolValue = 2;
    int64 intValue = 3;
    double doubleValue = 4;
    string stringValue = 5;
    // NOTE(review): the unit is not stated in this file -- confirm.
    int64 durationValue = 6;
  }
}
// Tells Kapacitor whether initialization succeeded.
message InitResponse {
  bool success = 1;

  // NOTE(review): presumably describes the failure when success is
  // false -- confirm.
  string error = 2;
}
// Asks the process for a snapshot of its state.
// The message carries no fields.
message SnapshotRequest {}
// Returns a serialized snapshot of the running state to Kapacitor.
message SnapshotResponse {
  // The serialized state.
  bytes snapshot = 1;
}
// Asks the process to restore its state from a snapshot.
message RestoreRequest {
  // The snapshot to restore from.
  bytes snapshot = 1;
}
// Reports the success or failure of a RestoreRequest.
message RestoreResponse {
  bool success = 1;

  // NOTE(review): presumably describes the failure when success is
  // false -- confirm.
  string error = 2;
}
// Asks the process to answer with a Keepalive, proving that it is
// still responding.
message KeepaliveRequest {
  // Nanoseconds since the epoch.
  // Only used when debugging keepalive requests.
  int64 time = 1;
}
// Answer to a KeepaliveRequest.
message KeepaliveResponse {
  // Nanoseconds since the epoch.
  // Only used when debugging keepalive requests.
  int64 time = 1;
}
// Sent by the process to tell Kapacitor that an error has occurred.
// Kapacitor terminates the process when it receives an ErrorResponse.
message ErrorResponse {
  // The error being reported.
  string error = 1;
}
//------------------------------------------------------
// Data flow messages
//
// Sent and received by both the process and Kapacitor
// Marks the start of a batch. Every point that follows belongs to the
// batch until an EndBatch arrives. This includes grouping: batches of
// differing groups may not be interleaved.
//
// All of the batch meta data except tmax is provided here, since tmax
// may not be known at the beginning of a batch.
message BeginBatch {
  string name = 1;
  string group = 2;
  map<string, string> tags = 3;
}
// Information about a single data point.
// May be sent on its own or bookended by BeginBatch and EndBatch messages.
message Point {
  // NOTE(review): unit and epoch are not stated here -- presumably
  // nanoseconds since the epoch, as in the Keepalive messages; confirm.
  int64 time = 1;
  string name = 2;
  string database = 3;
  string retentionPolicy = 4;
  string group = 5;
  repeated string dimensions = 6;
  map<string, string> tags = 7;

  // Fields of the point, held in one map per value type
  // (double, int64, string).
  map<string, double> fieldsDouble = 8;
  map<string, int64> fieldsInt = 9;
  map<string, string> fieldsString = 10;
}
// Marks the end of a batch and carries all meta data associated with it.
// The meta data from BeginBatch is repeated for ease of use, with the
// addition of tmax, since tmax may not be known at BeginBatch.
message EndBatch {
  string name = 1;
  string group = 2;

  // NOTE(review): exact meaning and unit are not stated in this file --
  // confirm.
  int64 tmax = 3;
  map<string, string> tags = 4;
}
//-----------------------------------------------------------
// Wrapper messages
//
// All messages sent over STDIN will be Request messages.
// All messages sent over STDOUT must be Response messages.
// Wrapper for every message sent from Kapacitor to the process.
message Request {
  // The wrapped request; at most one member is set.
  oneof message {
    // Management requests
    InfoRequest info = 1;
    InitRequest init = 2;
    KeepaliveRequest keepalive = 3;
    SnapshotRequest snapshot = 4;
    RestoreRequest restore = 5;

    // Data flow requests
    BeginBatch begin = 16;
    Point point = 17;
    EndBatch end = 18;
  }
}
// Wrapper for every message sent from the process to Kapacitor.
message Response {
  // The wrapped response; at most one member is set.
  oneof message {
    // Management responses
    InfoResponse info = 1;
    InitResponse init = 2;
    KeepaliveResponse keepalive = 3;
    SnapshotResponse snapshot = 4;
    RestoreResponse restore = 5;
    ErrorResponse error = 6;

    // Data flow responses
    BeginBatch begin = 16;
    Point point = 17;
    EndBatch end = 18;
  }
}