forked from influxdata/kapacitor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathedge.go
68 lines (58 loc) · 1.4 KB
/
edge.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package kapacitor
import (
"errors"
"fmt"
"log"
"sync"
"github.com/influxdata/kapacitor/edge"
"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/server/vars"
)
const (
statCollected = "collected"
statEmitted = "emitted"
defaultEdgeBufferSize = 1000
)
var ErrAborted = errors.New("edged aborted")
type Edge struct {
edge.StatsEdge
mu sync.Mutex
closed bool
statsKey string
statMap *expvar.Map
logger *log.Logger
}
func newEdge(taskName, parentName, childName string, t pipeline.EdgeType, size int, logService LogService) edge.StatsEdge {
e := edge.NewStatsEdge(edge.NewChannelEdge(t, defaultEdgeBufferSize))
tags := map[string]string{
"task": taskName,
"parent": parentName,
"child": childName,
"type": t.String(),
}
key, sm := vars.NewStatistic("edges", tags)
sm.Set(statCollected, e.CollectedVar())
sm.Set(statEmitted, e.EmittedVar())
name := fmt.Sprintf("%s|%s->%s", taskName, parentName, childName)
return &Edge{
StatsEdge: e,
statsKey: key,
statMap: sm,
logger: logService.NewLogger(fmt.Sprintf("[edge:%s] ", name), log.LstdFlags),
}
}
func (e *Edge) Close() error {
e.mu.Lock()
defer e.mu.Unlock()
if e.closed {
return nil
}
e.closed = true
vars.DeleteStatistic(e.statsKey)
e.logger.Printf("D! closing c: %d e: %d",
e.Collected(),
e.Emitted(),
)
return e.StatsEdge.Close()
}