forked from influxdata/kapacitor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathudf.go
153 lines (140 loc) · 2.88 KB
/
udf.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package kapacitor
import (
"errors"
"log"
"sync"
"github.com/influxdata/kapacitor/pipeline"
)
// User defined function
type UDFNode struct {
node
u *pipeline.UDFNode
process *UDFProcess
aborted chan struct{}
wg sync.WaitGroup
mu sync.Mutex
stopped bool
}
// Create a new UDFNode that sends incoming data to child process
func newUDFNode(et *ExecutingTask, n *pipeline.UDFNode, l *log.Logger) (*UDFNode, error) {
un := &UDFNode{
node: node{Node: n, et: et, logger: l},
u: n,
aborted: make(chan struct{}),
}
un.process = NewUDFProcess(
n.Commander,
l,
n.Timeout,
un.abortedCallback,
)
un.node.runF = un.runUDF
un.node.stopF = un.stopUDF
return un, nil
}
var errNodeAborted = errors.New("node aborted")
func (u *UDFNode) stopUDF() {
u.mu.Lock()
defer u.mu.Unlock()
if !u.stopped {
u.stopped = true
u.process.Abort(errNodeAborted)
}
}
func (u *UDFNode) runUDF(snapshot []byte) (err error) {
defer func() {
u.mu.Lock()
defer u.mu.Unlock()
//Ignore stopped errors if the process was stopped externally
if u.stopped && (err == ErrUDFProcessStopped || err == errNodeAborted) {
err = nil
}
u.stopped = true
}()
err = u.process.Start()
if err != nil {
return
}
err = u.process.Init(u.u.Options)
if err != nil {
return
}
if snapshot != nil {
err = u.process.Restore(snapshot)
if err != nil {
return
}
}
forwardErr := make(chan error, 1)
go func() {
switch u.Provides() {
case pipeline.StreamEdge:
for p := range u.process.PointOut {
for _, out := range u.outs {
err := out.CollectPoint(p)
if err != nil {
forwardErr <- err
return
}
}
}
case pipeline.BatchEdge:
for b := range u.process.BatchOut {
for _, out := range u.outs {
err := out.CollectBatch(b)
if err != nil {
forwardErr <- err
return
}
}
}
}
forwardErr <- nil
}()
// The abort callback needs to know when we are done writing
// so we wrap in a wait group.
u.wg.Add(1)
go func() {
defer u.wg.Done()
switch u.Wants() {
case pipeline.StreamEdge:
for p, ok := u.ins[0].NextPoint(); ok; p, ok = u.ins[0].NextPoint() {
u.timer.Start()
select {
case u.process.PointIn <- p:
case <-u.aborted:
return
}
u.timer.Stop()
}
case pipeline.BatchEdge:
for b, ok := u.ins[0].NextBatch(); ok; b, ok = u.ins[0].NextBatch() {
u.timer.Start()
select {
case u.process.BatchIn <- b:
case <-u.aborted:
return
}
u.timer.Stop()
}
}
}()
// wait till we are done writing
u.wg.Wait()
// Stop the process
err = u.process.Stop()
if err != nil {
return
}
// Wait/Return any error from the forwarding goroutine
err = <-forwardErr
return
}
func (u *UDFNode) abortedCallback() {
close(u.aborted)
// wait till we are done writing
u.wg.Wait()
}
func (u *UDFNode) snapshot() ([]byte, error) {
return u.process.Snapshot()
}