-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbeats_unique_queue.go
157 lines (130 loc) · 2.88 KB
/
beats_unique_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package heartfelt
import "sync"
// Compile-time assertion that *beatsUniqueQueue implements beatsRepository.
var _ beatsRepository = (*beatsUniqueQueue)(nil)
// beatsUniqueQueue maintains a queue of beats in which every beat key appears
// at most once, for fixed-timeout heartbeats.
type beatsUniqueQueue struct {
lastBeatsMap map[string]*linkNode // lastBeatsMap indexes the queue: key => beat.key, value => the node of link holding that beat.
link doublyLink // link is the doubly linked list that keeps the queued beats in order.
nodePool sync.Pool // nodePool recycles *linkNode values released by pop and remove for reuse by push.
}
// newBeatsUniqueQueue builds an empty beatsUniqueQueue and returns it as a
// beatsRepository. The key index starts out empty, the list has no nodes, and
// the node pool is configured to allocate a fresh linkNode on demand.
func newBeatsUniqueQueue() beatsRepository {
	queue := new(beatsUniqueQueue)
	queue.lastBeatsMap = map[string]*linkNode{}
	queue.nodePool = sync.Pool{
		New: func() interface{} { return new(linkNode) },
	}
	return queue
}
// isEmpty reports whether the queue holds no beats, i.e. whether the
// underlying list has no head node.
func (queue *beatsUniqueQueue) isEmpty() bool {
	first := queue.link.head
	return first == nil
}
// peek returns the beat stored in the head node of the queue without removing
// it. It returns nil when the queue is empty.
func (queue *beatsUniqueQueue) peek() *beat {
	if front := queue.link.head; front != nil {
		return front.data
	}
	return nil
}
// pop removes the head node of the queue and returns its beat. The beat's key
// is dropped from lastBeatsMap and the node is handed back to nodePool for
// reuse. It returns nil when the queue is empty.
func (queue *beatsUniqueQueue) pop() *beat {
	n := queue.link.pop() // nil when the list has no head
	if n == nil {
		return nil
	}

	b := n.data
	delete(queue.lastBeatsMap, b.key)

	// Clear the payload before pooling so the idle node does not keep the
	// popped beat reachable for as long as it sits in the pool.
	n.data = nil
	queue.nodePool.Put(n) // Reuse the *linkNode.
	return b
}
// push places b at the tail of the queue, keeping at most one node per key.
//
// If b.key is not queued yet, a node is taken from nodePool, reset, filled
// with b and indexed in lastBeatsMap. If the key is already queued, its node
// is unlinked and re-appended at the tail; when the queued beat is a different
// value than b, b inherits the queued beat's firstTime and takes its place in
// the node.
//
// It returns the beat that b replaced, or nil when the key was new or b is the
// very beat that was already queued.
func (queue *beatsUniqueQueue) push(b *beat) *beat {
	var replaced *beat

	node, known := queue.lastBeatsMap[b.key]
	if !known {
		node = queue.nodePool.Get().(*linkNode) // Reuse the *linkNode.
		node.prev, node.next = nil, nil
		node.data = b
		queue.lastBeatsMap[b.key] = node
	} else {
		// Detach first; the node is re-appended at the tail below.
		queue.link.remove(node)
		if current := node.data; current != b {
			replaced = current
			b.firstTime = current.firstTime
			node.data = b
		}
	}

	queue.link.push(node)
	return replaced
}
// remove deletes the beat queued under key and returns it. The key is dropped
// from lastBeatsMap, its node is unlinked from the list and handed back to
// nodePool for reuse. It returns nil when no beat is queued under key.
func (queue *beatsUniqueQueue) remove(key string) *beat {
	n, ok := queue.lastBeatsMap[key]
	if !ok {
		return nil
	}

	delete(queue.lastBeatsMap, key) // Remove the beat from the key index.
	queue.link.remove(n)            // Unlink the beat's node from the list.

	d := n.data
	// Clear the payload before pooling so the idle node does not keep the
	// removed beat reachable for as long as it sits in the pool.
	n.data = nil
	queue.nodePool.Put(n) // Reuse the *linkNode.
	return d
}
// doublyLink is a doubly linked list of heartbeat nodes. head is the front
// node (the next one to be popped) and tail is the back node (the most
// recently pushed one); both are nil while the list is empty.
type doublyLink struct {
	head, tail *linkNode
}
// linkNode is a single element of a doublyLink: it carries one beat in data
// and points at its neighbours through prev and next (nil at either end of
// the list or while the node is detached).
type linkNode struct {
	data       *beat
	prev, next *linkNode
}
// peek returns the head node of the list without unlinking it, or nil when
// the list is empty.
func (link *doublyLink) peek() *linkNode {
	front := link.head
	return front
}
// pop unlinks the head node from the list and returns it with its prev and
// next pointers cleared. It returns nil when the list is empty.
func (link *doublyLink) pop() *linkNode {
	front := link.head
	if front == nil {
		return nil
	}

	if front == link.tail {
		// Single-element list: it becomes empty.
		link.head, link.tail = nil, nil
	} else {
		// Promote the second node to head.
		second := front.next
		second.prev = front.prev
		link.head = second
	}

	front.prev, front.next = nil, nil
	return front
}
// push appends node at the tail of the list and reports whether anything was
// appended. A nil node is ignored and yields false. The node's next pointer is
// left as the caller supplied it.
func (link *doublyLink) push(node *linkNode) bool {
	if node == nil {
		return false
	}

	if last := link.tail; last != nil {
		// Hook the node behind the current tail.
		last.next = node
		node.prev = last
	} else {
		// Empty list: the node is also the head.
		link.head = node
	}
	link.tail = node
	return true
}
// remove unlinks node from the list, clears its prev and next pointers and
// reports whether it was unlinked.
//
// It returns false, leaving the list untouched, when node is nil or when node
// is visibly not linked into this list: it is not the head yet has no
// predecessor, or it is not the tail yet has no successor. That covers a node
// that was never pushed and a node that has already been removed or popped.
// A node that is linked into some other list cannot be told apart here and
// must not be passed in.
func (link *doublyLink) remove(node *linkNode) bool {
	if node == nil {
		return false
	}

	isHead, isTail := node == link.head, node == link.tail
	// Without this check the neighbour updates below would dereference a nil
	// prev/next, possibly after head had already been overwritten.
	if (!isHead && node.prev == nil) || (!isTail && node.next == nil) {
		return false
	}

	if isHead {
		link.head = node.next
	} else {
		node.prev.next = node.next
	}
	if isTail {
		link.tail = node.prev
	} else {
		node.next.prev = node.prev
	}

	node.prev, node.next = nil, nil
	return true
}