forked from JustinTimperio/gpq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpq.go
135 lines (113 loc) · 3.59 KB
/
pq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package gpq
import (
"errors"
"sync"
"sync/atomic"
"github.com/JustinTimperio/gpq/gheap"
"github.com/JustinTimperio/gpq/schema"
)
// CorePriorityQueue is a slice-backed queue of schema.Item pointers. It
// provides Len, Less and Swap together with EnQueue/DeQueue so that the gheap
// package can order it (UpdatePriority and Remove hand the queue to gheap).
// NOTE(review): the exact interface gheap expects is declared outside this
// file - confirm against gheap.
type CorePriorityQueue[T any] struct {
// items is the backing slice; Swap keeps every item's Index field equal to
// its position in this slice.
items []*schema.Item[T]
// mutex guards items in the locking methods. It is stored as a pointer, so
// copies of the struct made by value-receiver methods share the same lock.
mutex *sync.RWMutex
// bpq is the owning bucket queue: EnQueue registers the item's priority with
// it, the dequeue paths unregister the priority once this queue is empty,
// and its ObjectsInQueue counter is adjusted atomically on every add/remove.
bpq *BucketPriorityQueue
}
// NewCorePriorityQueue builds an empty CorePriorityQueue that reports to the
// given BucketPriorityQueue.
//
// The returned value starts with a non-nil, zero-length item slice and a
// freshly allocated RWMutex. The queue is returned by value; because the
// mutex is held through a pointer, copies of the value share one lock.
func NewCorePriorityQueue[T any](bpq *BucketPriorityQueue) CorePriorityQueue[T] {
	var queue CorePriorityQueue[T]
	queue.bpq = bpq
	queue.mutex = new(sync.RWMutex)
	queue.items = []*schema.Item[T]{}
	return queue
}
// Len reports how many items are currently stored in the queue.
//
// It does not take the mutex: UpdatePriority and Remove pass the queue to
// gheap while already holding the write lock, so any locking must be done by
// the caller.
func (pq CorePriorityQueue[T]) Len() int {
	count := len(pq.items)
	return count
}
// Less reports whether the item at index i has a strictly greater Priority
// than the item at index j, so larger priority values order first.
//
// It does not take the mutex and panics if either index is out of range.
func (pq CorePriorityQueue[T]) Less(i, j int) bool {
	left, right := pq.items[i], pq.items[j]
	return right.Priority < left.Priority
}
// Swap exchanges the items at positions i and j and then rewrites both items'
// Index fields so that each one records its new position in the slice.
//
// It does not take the mutex and panics if either index is out of range.
func (pq CorePriorityQueue[T]) Swap(i, j int) {
	slots := pq.items
	slots[i], slots[j] = slots[j], slots[i]

	// Record the new positions on the items themselves.
	slots[i].Index = i
	slots[j].Index = j
}
// EnQueue appends a copy of data to the end of the backing slice while holding
// the write lock.
//
// The stored copy's Index is set to the position it is appended at. The
// bucket queue's ObjectsInQueue counter is incremented atomically, and the
// item's priority is added to the bucket queue when it is not already present.
// No re-ordering of the slice is performed here.
func (pq *CorePriorityQueue[T]) EnQueue(data schema.Item[T]) {
	pq.mutex.Lock()
	defer pq.mutex.Unlock()

	// data is already a private copy (passed by value), so it can be stored
	// directly once its slot number is recorded.
	data.Index = len(pq.items)
	pq.items = append(pq.items, &data)

	atomic.AddUint64(&pq.bpq.ObjectsInQueue, 1)

	if pq.bpq.Contains(data.Priority) {
		return
	}
	pq.bpq.Add(data.Priority)
}
// DeQueue removes the item stored in the last slot of the backing slice while
// holding the write lock and returns its fields.
//
// Returns the item's WasRestored flag, BatchNumber, DiskUUID, Priority and
// Data. When the queue is empty it returns (false, 0, nil, -1, zero T) and a
// non-nil error instead.
//
// Side effects: the removed item's Index is set to -1, the vacated slot is
// cleared, the bucket queue's ObjectsInQueue counter is decremented
// atomically, and the item's priority is removed from the bucket queue when
// this queue becomes empty.
func (pq *CorePriorityQueue[T]) DeQueue() (wasRecoverd bool, batchNumber uint64, diskUUID []byte, priority int64, data T, err error) {
	pq.mutex.Lock()
	defer pq.mutex.Unlock()

	last := len(pq.items) - 1
	if last < 0 {
		err = errors.New("Core Priority Queue Error: No items found in the queue")
		return false, 0, nil, -1, data, err
	}

	popped := pq.items[last]
	pq.items[last] = nil // drop the reference so the slot does not pin the item
	popped.Index = -1    // mark the item as no longer queued
	pq.items = pq.items[:last]

	// The bucket for this priority is empty once the last item is gone.
	if last == 0 {
		pq.bpq.Remove(popped.Priority)
	}

	// Adding the all-ones value is the atomic way to subtract one from a uint64.
	atomic.AddUint64(&pq.bpq.ObjectsInQueue, ^uint64(0))

	return popped.WasRestored, popped.BatchNumber, popped.DiskUUID, popped.Priority, popped.Data, nil
}
// Peek returns the Data of the item at position 0 of the backing slice without
// removing it.
//
// The queue is only read here, so the shared RWMutex is taken in read mode:
// concurrent Peek calls can proceed together while EnQueue, DeQueue,
// UpdatePriority and Remove, which hold the write lock, are still excluded.
//
// When the queue is empty the zero value of T is returned together with a
// non-nil error.
func (pq CorePriorityQueue[T]) Peek() (data T, err error) {
	pq.mutex.RLock()
	defer pq.mutex.RUnlock()

	if len(pq.items) == 0 {
		return data, errors.New("No items in the queue")
	}
	return pq.items[0].Data, nil
}
// ReadPointers exposes the raw item pointers held by the queue so that callers
// can reprioritize them.
//
// The returned slice is the queue's own backing slice, not a copy, and it is
// read without taking the mutex.
func (pq CorePriorityQueue[T]) ReadPointers() []*schema.Item[T] {
	pointers := pq.items
	return pointers
}
// UpdatePriority assigns a new priority to item and then asks gheap to
// re-position the entry at item.Index, all while holding the write lock.
//
// item is expected to be one of the pointers stored in this queue (see
// ReadPointers), since its Index field is what is handed to gheap.
func (pq *CorePriorityQueue[T]) UpdatePriority(item *schema.Item[T], priority int64) {
	pq.mutex.Lock()
	defer pq.mutex.Unlock()

	item.Priority = priority

	// The slot to fix up is wherever the item currently sits.
	slot := item.Index
	gheap.Prioritize[T](pq, slot)
}
// Remove deletes item from the queue by handing its Index to gheap.Remove
// while holding the write lock.
//
// The call is a no-op when item is nil or when item.Index does not refer to a
// slot of this queue. Dequeued items have their Index set to -1, so removing
// an item that has already left the queue is ignored rather than passing an
// invalid index on to gheap.
// NOTE(review): how gheap.Remove reacts to an out-of-range index is not
// visible from this file; the guard assumes such an index is never valid.
func (pq *CorePriorityQueue[T]) Remove(item *schema.Item[T]) {
	pq.mutex.Lock()
	defer pq.mutex.Unlock()

	if item == nil || item.Index < 0 || item.Index >= len(pq.items) {
		return
	}
	gheap.Remove[T](pq, item.Index)
}
// NoLockDeQueue removes the item stored in the last slot of the backing slice
// without taking the mutex. It exists for nested calls made while the write
// lock is already held, where locking again would deadlock.
//
// Calling it on an empty queue is a no-op: nothing is removed and the
// ObjectsInQueue counter is left untouched, instead of indexing past the
// start of the slice.
//
// Side effects on a non-empty queue: the removed item's Index is set to -1,
// the vacated slot is cleared, the bucket queue's ObjectsInQueue counter is
// decremented atomically, and the item's priority is removed from the bucket
// queue when this queue becomes empty.
func (pq *CorePriorityQueue[T]) NoLockDeQueue() {
	last := len(pq.items) - 1
	if last < 0 {
		// Nothing queued; mirror the empty-queue guard in DeQueue.
		return
	}

	item := pq.items[last]
	pq.items[last] = nil // avoid memory leak
	item.Index = -1      // for safety
	pq.items = pq.items[:last]

	// Check if the bucket is now empty
	if last == 0 {
		pq.bpq.Remove(item.Priority)
	}

	// Adding the all-ones value is the atomic way to subtract one from a uint64.
	atomic.AddUint64(&pq.bpq.ObjectsInQueue, ^uint64(0))
}