forked from rosedblabs/rosedb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
zset.go
177 lines (154 loc) · 5.35 KB
/
zset.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package rosedb
import (
"github.com/flower-corp/rosedb/ds/art"
"github.com/flower-corp/rosedb/logfile"
"github.com/flower-corp/rosedb/logger"
"github.com/flower-corp/rosedb/util"
)
// ZAdd adds the specified member with the specified score to the sorted set stored at key.
//
// The member is hashed with the index's shared hasher, and the resulting digest
// identifies the member in both the per-key index tree and the score index.
// The log entry is written before the member is inserted into either index.
// It returns any error from hashing, from writing the log entry, or from
// updating the index tree.
func (db *RoseDB) ZAdd(key []byte, score float64, member []byte) error {
	db.zsetIndex.mu.Lock()
	defer db.zsetIndex.mu.Unlock()

	if err := db.zsetIndex.murhash.Write(member); err != nil {
		// Drop whatever was absorbed so the shared hasher does not carry
		// stale state into the next call.
		db.zsetIndex.murhash.Reset()
		return err
	}
	sum := db.zsetIndex.murhash.EncodeSum128()
	db.zsetIndex.murhash.Reset()

	// Lazily create the index tree for this key.
	if db.zsetIndex.trees[string(key)] == nil {
		db.zsetIndex.trees[string(key)] = art.NewART()
	}
	idxTree := db.zsetIndex.trees[string(key)]

	// The on-disk key combines the set key with the textual score; the
	// member itself is stored as the value.
	scoreBuf := []byte(util.Float64ToStr(score))
	zsetKey := db.encodeKey(key, scoreBuf)
	entry := &logfile.LogEntry{Key: zsetKey, Value: member}
	pos, err := db.writeLogEntry(entry, ZSet)
	if err != nil {
		return err
	}

	// Record the encoded size of the entry alongside its position.
	_, size := logfile.EncodeEntry(entry)
	pos.entrySize = size

	// The index tree is keyed by the member digest rather than the on-disk key.
	ent := &logfile.LogEntry{Key: sum, Value: member}
	if err := db.updateIndexTree(idxTree, ent, pos, true, ZSet); err != nil {
		return err
	}
	db.zsetIndex.indexes.ZAdd(string(key), score, string(sum))
	return nil
}
// ZScore returns the score of member in the sorted set at key.
//
// ok is false if the member cannot be hashed; otherwise the result of the
// score index lookup is returned unchanged.
func (db *RoseDB) ZScore(key, member []byte) (ok bool, score float64) {
	// Hashing writes to and resets the hasher shared by all sorted-set
	// operations, so the exclusive lock is required: under a read lock two
	// concurrent callers would interleave their writes into the same hasher.
	db.zsetIndex.mu.Lock()
	defer db.zsetIndex.mu.Unlock()

	if err := db.zsetIndex.murhash.Write(member); err != nil {
		// Drop whatever was absorbed so the next caller starts clean.
		db.zsetIndex.murhash.Reset()
		return false, 0
	}
	sum := db.zsetIndex.murhash.EncodeSum128()
	db.zsetIndex.murhash.Reset()

	return db.zsetIndex.indexes.ZScore(string(key), string(sum))
}
// ZRem removes the specified member from the sorted set stored at key.
// A member that is not present is ignored and nil is returned.
//
// The delete record is written to the log before the in-memory indexes are
// changed, so a failed log write leaves the member in place. It returns any
// error from hashing the member or from writing the log entry.
func (db *RoseDB) ZRem(key, member []byte) error {
	db.zsetIndex.mu.Lock()
	defer db.zsetIndex.mu.Unlock()

	if err := db.zsetIndex.murhash.Write(member); err != nil {
		// Drop whatever was absorbed so the shared hasher does not carry
		// stale state into the next call.
		db.zsetIndex.murhash.Reset()
		return err
	}
	sum := db.zsetIndex.murhash.EncodeSum128()
	db.zsetIndex.murhash.Reset()

	// Only membership matters here; the score itself is not needed.
	if exists, _ := db.zsetIndex.indexes.ZScore(string(key), string(sum)); !exists {
		return nil
	}

	// Persist the delete record first so that memory and log cannot diverge
	// when the write fails.
	entry := &logfile.LogEntry{Key: key, Value: sum, Type: logfile.TypeDelete}
	pos, err := db.writeLogEntry(entry, ZSet)
	if err != nil {
		return err
	}

	db.zsetIndex.indexes.ZRem(string(key), string(sum))

	if db.zsetIndex.trees[string(key)] == nil {
		db.zsetIndex.trees[string(key)] = art.NewART()
	}
	idxTree := db.zsetIndex.trees[string(key)]

	oldVal, deleted := idxTree.Delete(sum)
	db.sendDiscard(oldVal, deleted, ZSet)

	// The deleted entry itself is also invalid.
	_, size := logfile.EncodeEntry(entry)
	node := &indexNode{fid: pos.fid, entrySize: size}
	select {
	case db.discards[ZSet].valChan <- node:
	default:
		// Non-blocking send: if the channel cannot accept the node, only a
		// warning is logged.
		logger.Warn("send to discard chan fail")
	}
	return nil
}
// ZCard reports the cardinality (number of elements) of the sorted set stored
// at key. The lookup runs under the sorted-set index read lock.
func (db *RoseDB) ZCard(key []byte) int {
	db.zsetIndex.mu.RLock()
	defer db.zsetIndex.mu.RUnlock()

	setName := string(key)
	count := db.zsetIndex.indexes.ZCard(setName)
	return count
}
// ZRange returns the elements of the sorted set stored at key that fall in the
// range given by start and stop. It delegates to zRangeInternal with the
// reverse flag off.
func (db *RoseDB) ZRange(key []byte, start, stop int) ([][]byte, error) {
	const reverse = false
	members, err := db.zRangeInternal(key, start, stop, reverse)
	return members, err
}
// ZRevRange returns the elements of the sorted set stored at key that fall in
// the range given by start and stop, with the elements considered in order
// from the highest score to the lowest. It delegates to zRangeInternal with
// the reverse flag on.
func (db *RoseDB) ZRevRange(key []byte, start, stop int) ([][]byte, error) {
	const reverse = true
	members, err := db.zRangeInternal(key, start, stop, reverse)
	return members, err
}
// ZRank returns the rank of member in the sorted set stored at key, with
// scores ordered from low to high. Ranks are 0-based, so the member with the
// lowest score has rank 0. It delegates to zRankInternal with the reverse
// flag off.
func (db *RoseDB) ZRank(key []byte, member []byte) (ok bool, rank int) {
	const reverse = false
	ok, rank = db.zRankInternal(key, member, reverse)
	return ok, rank
}
// ZRevRank returns the rank of member in the sorted set stored at key, with
// scores ordered from high to low. Ranks are 0-based, so the member with the
// highest score has rank 0. It delegates to zRankInternal with the reverse
// flag on.
func (db *RoseDB) ZRevRank(key []byte, member []byte) (ok bool, rank int) {
	const reverse = true
	ok, rank = db.zRankInternal(key, member, reverse)
	return ok, rank
}
// zRangeInternal collects the stored values of the members that the score
// index returns for the range [start, stop] of the sorted set at key. When rev
// is true the score index is queried with ZRevRange, otherwise with ZRange.
//
// It returns the first error produced while fetching a member's value; in that
// case no partial result is returned.
func (db *RoseDB) zRangeInternal(key []byte, start, stop int, rev bool) ([][]byte, error) {
	db.zsetIndex.mu.RLock()
	defer db.zsetIndex.mu.RUnlock()

	// Only the read lock is held here, so the shared trees map must not be
	// written. A missing tree is replaced by a local empty one that is used
	// for the lookups below and then discarded.
	idxTree := db.zsetIndex.trees[string(key)]
	if idxTree == nil {
		idxTree = art.NewART()
	}

	var values []interface{}
	if rev {
		values = db.zsetIndex.indexes.ZRevRange(string(key), start, stop)
	} else {
		values = db.zsetIndex.indexes.ZRange(string(key), start, stop)
	}

	var res [][]byte
	for _, item := range values {
		// The score index hands back the member digests as strings; a value
		// of any other type yields the empty string, as before.
		digest, _ := item.(string)
		val, err := db.getVal(idxTree, []byte(digest), ZSet)
		if err != nil {
			return nil, err
		}
		res = append(res, val)
	}
	return res, nil
}
// zRankInternal looks up the 0-based rank of member in the sorted set at key.
// When rev is true the score index is queried with ZRevRank, otherwise with
// ZRank.
//
// ok is false, and rank is 0, when there is no index tree for key, when the
// member cannot be hashed, or when the score index reports -1 for the member.
func (db *RoseDB) zRankInternal(key []byte, member []byte, rev bool) (ok bool, rank int) {
	// Hashing writes to and resets the hasher shared by all sorted-set
	// operations, so the exclusive lock is required: under a read lock two
	// concurrent callers would interleave their writes into the same hasher.
	db.zsetIndex.mu.Lock()
	defer db.zsetIndex.mu.Unlock()

	if db.zsetIndex.trees[string(key)] == nil {
		return false, 0
	}

	if err := db.zsetIndex.murhash.Write(member); err != nil {
		// Drop whatever was absorbed so the next caller starts clean.
		db.zsetIndex.murhash.Reset()
		return false, 0
	}
	sum := db.zsetIndex.murhash.EncodeSum128()
	db.zsetIndex.murhash.Reset()

	var result int64
	if rev {
		result = db.zsetIndex.indexes.ZRevRank(string(key), string(sum))
	} else {
		result = db.zsetIndex.indexes.ZRank(string(key), string(sum))
	}

	// -1 is the value the score index returns when it has no rank to report.
	if result == -1 {
		return false, 0
	}
	return true, int(result)
}