Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
roseduan committed Jun 5, 2022
1 parent fc702fc commit 1794f62
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 161 deletions.
39 changes: 17 additions & 22 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,30 +98,26 @@ type (
}

listIndex struct {
mu *sync.RWMutex
trees map[string]*art.AdaptiveRadixTree
idxTree *art.AdaptiveRadixTree
mu *sync.RWMutex
trees map[string]*art.AdaptiveRadixTree
}

hashIndex struct {
mu *sync.RWMutex
trees map[string]*art.AdaptiveRadixTree
idxTree *art.AdaptiveRadixTree
mu *sync.RWMutex
trees map[string]*art.AdaptiveRadixTree
}

setIndex struct {
mu *sync.RWMutex
murhash *util.Murmur128
trees map[string]*art.AdaptiveRadixTree
idxTree *art.AdaptiveRadixTree
}

zsetIndex struct {
mu *sync.RWMutex
indexes *zset.SortedSet
murhash *util.Murmur128
trees map[string]*art.AdaptiveRadixTree
idxTree *art.AdaptiveRadixTree
}
)

Expand All @@ -139,7 +135,6 @@ func newHashIdx() *hashIndex {

func newSetIdx() *setIndex {
return &setIndex{
idxTree: art.NewART(),
murhash: util.NewMurmur128(),
trees: make(map[string]*art.AdaptiveRadixTree),
mu: new(sync.RWMutex),
Expand Down Expand Up @@ -512,7 +507,7 @@ func (db *RoseDB) doRunGC(dataType DataType, specifiedFid int, gcRatio float64)
return err
}
// update index
if err = db.updateIndexTree(ent, valuePos, false, String); err != nil {
if err = db.updateIndexTree(db.strIndex.idxTree, ent, valuePos, false, String); err != nil {
return err
}
}
Expand All @@ -529,8 +524,8 @@ func (db *RoseDB) doRunGC(dataType DataType, specifiedFid int, gcRatio float64)
if db.listIndex.trees[string(listKey)] == nil {
return nil
}
db.listIndex.idxTree = db.listIndex.trees[string(listKey)]
indexVal := db.listIndex.idxTree.Get(listKey)
idxTree := db.listIndex.trees[string(listKey)]
indexVal := idxTree.Get(listKey)
if indexVal == nil {
return nil
}
Expand All @@ -541,7 +536,7 @@ func (db *RoseDB) doRunGC(dataType DataType, specifiedFid int, gcRatio float64)
if err != nil {
return err
}
if err = db.updateIndexTree(ent, valuePos, false, List); err != nil {
if err = db.updateIndexTree(idxTree, ent, valuePos, false, List); err != nil {
return err
}
}
Expand All @@ -555,8 +550,8 @@ func (db *RoseDB) doRunGC(dataType DataType, specifiedFid int, gcRatio float64)
if db.hashIndex.trees[string(key)] == nil {
return nil
}
db.hashIndex.idxTree = db.hashIndex.trees[string(key)]
indexVal := db.hashIndex.idxTree.Get(field)
idxTree := db.hashIndex.trees[string(key)]
indexVal := idxTree.Get(field)
if indexVal == nil {
return nil
}
Expand All @@ -572,7 +567,7 @@ func (db *RoseDB) doRunGC(dataType DataType, specifiedFid int, gcRatio float64)
entry := &logfile.LogEntry{Key: field, Value: ent.Value}
_, size := logfile.EncodeEntry(ent)
valuePos.entrySize = size
if err = db.updateIndexTree(entry, valuePos, false, Hash); err != nil {
if err = db.updateIndexTree(idxTree, entry, valuePos, false, Hash); err != nil {
return err
}
}
Expand All @@ -585,14 +580,14 @@ func (db *RoseDB) doRunGC(dataType DataType, specifiedFid int, gcRatio float64)
if db.setIndex.trees[string(ent.Key)] == nil {
return nil
}
db.setIndex.idxTree = db.setIndex.trees[string(ent.Key)]
idxTree := db.setIndex.trees[string(ent.Key)]
if err := db.setIndex.murhash.Write(ent.Value); err != nil {
logger.Fatalf("fail to write murmur hash: %v", err)
}
sum := db.setIndex.murhash.EncodeSum128()
db.setIndex.murhash.Reset()

indexVal := db.setIndex.idxTree.Get(sum)
indexVal := idxTree.Get(sum)
if indexVal == nil {
return nil
}
Expand All @@ -607,7 +602,7 @@ func (db *RoseDB) doRunGC(dataType DataType, specifiedFid int, gcRatio float64)
entry := &logfile.LogEntry{Key: sum, Value: ent.Value}
_, size := logfile.EncodeEntry(ent)
valuePos.entrySize = size
if err = db.updateIndexTree(entry, valuePos, false, Set); err != nil {
if err = db.updateIndexTree(idxTree, entry, valuePos, false, Set); err != nil {
return err
}
}
Expand All @@ -621,14 +616,14 @@ func (db *RoseDB) doRunGC(dataType DataType, specifiedFid int, gcRatio float64)
if db.zsetIndex.trees[string(key)] == nil {
return nil
}
db.zsetIndex.idxTree = db.zsetIndex.trees[string(key)]
idxTree := db.zsetIndex.trees[string(key)]
if err := db.zsetIndex.murhash.Write(ent.Value); err != nil {
logger.Fatalf("fail to write murmur hash: %v", err)
}
sum := db.zsetIndex.murhash.EncodeSum128()
db.zsetIndex.murhash.Reset()

indexVal := db.zsetIndex.idxTree.Get(sum)
indexVal := idxTree.Get(sum)
if indexVal == nil {
return nil
}
Expand All @@ -641,7 +636,7 @@ func (db *RoseDB) doRunGC(dataType DataType, specifiedFid int, gcRatio float64)
entry := &logfile.LogEntry{Key: sum, Value: ent.Value}
_, size := logfile.EncodeEntry(ent)
valuePos.entrySize = size
if err = db.updateIndexTree(entry, valuePos, false, ZSet); err != nil {
if err = db.updateIndexTree(idxTree, entry, valuePos, false, ZSet); err != nil {
return err
}
}
Expand Down
62 changes: 29 additions & 33 deletions hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ func (db *RoseDB) HSet(key, field, value []byte) error {
if db.hashIndex.trees[string(key)] == nil {
db.hashIndex.trees[string(key)] = art.NewART()
}
db.hashIndex.idxTree = db.hashIndex.trees[string(key)]
idxTree := db.hashIndex.trees[string(key)]
entry := &logfile.LogEntry{Key: field, Value: value}
_, size := logfile.EncodeEntry(ent)
valuePos.entrySize = size
return db.updateIndexTree(entry, valuePos, true, Hash)
return db.updateIndexTree(idxTree, entry, valuePos, true, Hash)
}

// HSetNX sets the given value only if the field doesn't exist.
Expand All @@ -46,8 +46,8 @@ func (db *RoseDB) HSetNX(key, field, value []byte) (bool, error) {
if db.hashIndex.trees[string(key)] == nil {
db.hashIndex.trees[string(key)] = art.NewART()
}
db.hashIndex.idxTree = db.hashIndex.trees[string(key)]
val, err := db.getVal(field, Hash)
idxTree := db.hashIndex.trees[string(key)]
val, err := db.getVal(idxTree, field, Hash)
// field exists in db
if val != nil {
return false, nil
Expand All @@ -59,11 +59,10 @@ func (db *RoseDB) HSetNX(key, field, value []byte) (bool, error) {
return false, err
}

db.hashIndex.idxTree = db.hashIndex.trees[string(key)]
entry := &logfile.LogEntry{Key: field, Value: value}
_, size := logfile.EncodeEntry(ent)
valuePos.entrySize = size
err = db.updateIndexTree(entry, valuePos, true, Hash)
err = db.updateIndexTree(idxTree, entry, valuePos, true, Hash)
if err != nil {
return false, err
}
Expand All @@ -78,8 +77,8 @@ func (db *RoseDB) HGet(key, field []byte) ([]byte, error) {
if db.hashIndex.trees[string(key)] == nil {
return nil, nil
}
db.hashIndex.idxTree = db.hashIndex.trees[string(key)]
val, err := db.getVal(field, Hash)
idxTree := db.hashIndex.trees[string(key)]
val, err := db.getVal(idxTree, field, Hash)
if err == ErrKeyNotFound {
return nil, nil
}
Expand All @@ -90,11 +89,11 @@ func (db *RoseDB) HGet(key, field []byte) ([]byte, error) {
// For every field that does not exist in the hash, a nil value is returned.
// Because non-existing keys are treated as empty hashes,
// running HMGET against a non-existing key will return a list of nil values.
func (db *RoseDB) HMGet(key []byte, field ...[]byte) (vals [][]byte, err error) {
func (db *RoseDB) HMGet(key []byte, fields ...[]byte) (vals [][]byte, err error) {
db.hashIndex.mu.RLock()
defer db.hashIndex.mu.RUnlock()

length := len(field)
length := len(fields)
// key not exist
if db.hashIndex.trees[string(key)] == nil {
for i := 0; i < length; i++ {
Expand All @@ -103,10 +102,10 @@ func (db *RoseDB) HMGet(key []byte, field ...[]byte) (vals [][]byte, err error)
return vals, nil
}
// key exist
db.hashIndex.idxTree = db.hashIndex.trees[string(key)]
idxTree := db.hashIndex.trees[string(key)]

for _, v := range field {
val, err := db.getVal(v, Hash)
for _, field := range fields {
val, err := db.getVal(idxTree, field, Hash)
if err == ErrKeyNotFound {
vals = append(vals, nil)
} else {
Expand All @@ -126,7 +125,7 @@ func (db *RoseDB) HDel(key []byte, fields ...[]byte) (int, error) {
if db.hashIndex.trees[string(key)] == nil {
return 0, nil
}
db.hashIndex.idxTree = db.hashIndex.trees[string(key)]
idxTree := db.hashIndex.trees[string(key)]

var count int
for _, field := range fields {
Expand All @@ -137,7 +136,7 @@ func (db *RoseDB) HDel(key []byte, fields ...[]byte) (int, error) {
return 0, err
}

val, updated := db.hashIndex.idxTree.Delete(field)
val, updated := idxTree.Delete(field)
if updated {
count++
}
Expand All @@ -164,8 +163,8 @@ func (db *RoseDB) HExists(key, field []byte) (bool, error) {
if db.hashIndex.trees[string(key)] == nil {
return false, nil
}
db.hashIndex.idxTree = db.hashIndex.trees[string(key)]
val, err := db.getVal(field, Hash)
idxTree := db.hashIndex.trees[string(key)]
val, err := db.getVal(idxTree, field, Hash)
if err != nil && err != ErrKeyNotFound {
return false, err
}
Expand All @@ -180,8 +179,8 @@ func (db *RoseDB) HLen(key []byte) int {
if db.hashIndex.trees[string(key)] == nil {
return 0
}
db.hashIndex.idxTree = db.hashIndex.trees[string(key)]
return db.hashIndex.idxTree.Size()
idxTree := db.hashIndex.trees[string(key)]
return idxTree.Size()
}

// HKeys returns all field names in the hash stored at key.
Expand Down Expand Up @@ -215,15 +214,14 @@ func (db *RoseDB) HVals(key []byte) ([][]byte, error) {
if !ok {
return values, nil
}
db.hashIndex.idxTree = tree

iter := tree.Iterator()
for iter.HasNext() {
node, err := iter.Next()
if err != nil {
return nil, err
}
val, err := db.getVal(node.Key(), Hash)
val, err := db.getVal(tree, node.Key(), Hash)
if err != nil && err != ErrKeyNotFound {
return nil, err
}
Expand All @@ -241,7 +239,6 @@ func (db *RoseDB) HGetAll(key []byte) ([][]byte, error) {
if !ok {
return [][]byte{}, nil
}
db.hashIndex.idxTree = tree

var index int
pairs := make([][]byte, tree.Size()*2)
Expand All @@ -252,12 +249,11 @@ func (db *RoseDB) HGetAll(key []byte) ([][]byte, error) {
return nil, err
}
field := node.Key()
val, err := db.getVal(field, Hash)
val, err := db.getVal(tree, field, Hash)
if err != nil && err != ErrKeyNotFound {
return nil, err
}
pairs[index] = field
pairs[index+1] = val
pairs[index], pairs[index+1] = field, val
index += 2
}
return pairs[:index], nil
Expand All @@ -272,8 +268,8 @@ func (db *RoseDB) HStrLen(key, field []byte) int {
if db.hashIndex.trees[string(key)] == nil {
return 0
}
db.hashIndex.idxTree = db.hashIndex.trees[string(key)]
val, err := db.getVal(field, Hash)
idxTree := db.hashIndex.trees[string(key)]
val, err := db.getVal(idxTree, field, Hash)
if err == ErrKeyNotFound {
return 0
}
Expand All @@ -290,8 +286,8 @@ func (db *RoseDB) HScan(key []byte, prefix []byte, pattern string, count int) ([
if db.hashIndex.trees[string(key)] == nil {
return nil, nil
}
db.hashIndex.idxTree = db.hashIndex.trees[string(key)]
fields := db.hashIndex.idxTree.PrefixScan(prefix, count)
idxTree := db.hashIndex.trees[string(key)]
fields := idxTree.PrefixScan(prefix, count)
if len(fields) == 0 {
return nil, nil
}
Expand All @@ -310,7 +306,7 @@ func (db *RoseDB) HScan(key []byte, prefix []byte, pattern string, count int) ([
if reg != nil && !reg.Match(field) {
continue
}
val, err := db.getVal(field, Hash)
val, err := db.getVal(idxTree, field, Hash)
if err != nil && err != ErrKeyNotFound {
return nil, err
}
Expand All @@ -332,8 +328,8 @@ func (db *RoseDB) HIncrBy(key, field []byte, incr int64) (int64, error) {
db.hashIndex.trees[string(key)] = art.NewART()
}

db.hashIndex.idxTree = db.hashIndex.trees[string(key)]
val, err := db.getVal(field, Hash)
idxTree := db.hashIndex.trees[string(key)]
val, err := db.getVal(idxTree, field, Hash)
if err != nil && !errors.Is(err, ErrKeyNotFound) {
return 0, err
}
Expand Down Expand Up @@ -363,7 +359,7 @@ func (db *RoseDB) HIncrBy(key, field []byte, incr int64) (int64, error) {
entry := &logfile.LogEntry{Key: field, Value: val}
_, size := logfile.EncodeEntry(ent)
valuePos.entrySize = size
err = db.updateIndexTree(entry, valuePos, true, Hash)
err = db.updateIndexTree(idxTree, entry, valuePos, true, Hash)
if err != nil {
return 0, err
}
Expand Down
Loading

0 comments on commit 1794f62

Please sign in to comment.