forked from huichen/wukong
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpersistent_storage_worker.go
84 lines (71 loc) · 1.74 KB
/
persistent_storage_worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package engine
import (
"bytes"
"encoding/binary"
"encoding/gob"
"github.com/huichen/wukong/types"
"io"
"log"
"sync/atomic"
)
// persistentStorageIndexDocumentRequest carries one document to be persisted:
// its ID together with its index data. These are the two fields that
// persistentStorageIndexDocumentWorker reads from every request it receives.
type persistentStorageIndexDocumentRequest struct {
// docId is the document ID; the worker varint-encodes it to form the database key.
docId uint64
// data is the document's index data; the worker gob-encodes it to form the database value.
data types.DocumentIndexData
}
// persistentStorageIndexDocumentWorker loops forever, receiving index requests
// from the channel of the given shard and writing each document into that
// shard's database.
//
// The database key is the document ID encoded as an unsigned varint and the
// value is the gob encoding of the request's index data.
//
// engine.numDocumentsStored is incremented atomically exactly once per
// received request, including requests whose data could not be encoded.
func (engine *Engine) persistentStorageIndexDocumentWorker(shard int) {
	for {
		request := <-engine.persistentStorageIndexDocumentChannels[shard]

		// Build the key. A uint64 may need up to binary.MaxVarintLen64 (10)
		// bytes as a varint, and PutUvarint panics when the buffer is too
		// small, so an 8-byte buffer is not enough for IDs >= 1<<56.
		key := make([]byte, binary.MaxVarintLen64)
		length := binary.PutUvarint(key, request.docId)

		// Build the value.
		var buf bytes.Buffer
		if err := gob.NewEncoder(&buf).Encode(request.data); err != nil {
			// Nothing is stored for this document, but the request is still
			// counted so the counter advances once per request on every path.
			log.Printf("persistent storage: cannot encode document %d: %v", request.docId, err)
			atomic.AddUint64(&engine.numDocumentsStored, 1)
			continue
		}

		// Write the key-value pair to the database.
		engine.dbs[shard].Set(key[:length], buf.Bytes())
		atomic.AddUint64(&engine.numDocumentsStored, 1)
	}
}
// persistentStorageRemoveDocumentWorker deletes the record of document docId
// from the database of the given shard. The key is the document ID encoded as
// an unsigned varint, the same encoding used when the document was stored.
func (engine *Engine) persistentStorageRemoveDocumentWorker(docId uint64, shard uint32) {
	// Build the key. A uint64 may need up to binary.MaxVarintLen64 (10) bytes
	// as a varint, and PutUvarint panics when the buffer is too small, so an
	// 8-byte buffer is not enough for IDs >= 1<<56.
	key := make([]byte, binary.MaxVarintLen64)
	length := binary.PutUvarint(key, docId)

	// Delete the key from the database.
	engine.dbs[shard].Delete(key[:length])
}
// persistentStorageInitWorker replays every record stored in the database of
// the given shard into the in-memory index, then sends true on
// engine.persistentStorageInitChannel to signal that this shard is done.
//
// Each record's key is decoded as an unsigned varint document ID and its
// value is gob-decoded into a types.DocumentIndexData, which is passed to
// engine.internalIndexDocument. Records whose key or value cannot be decoded
// are logged and skipped. If the database cannot be iterated at all, the
// process is terminated via log.Fatal.
func (engine *Engine) persistentStorageInitWorker(shard int) {
	iter, err := engine.dbs[shard].SeekFirst()
	if err == io.EOF {
		// Empty database: nothing to restore.
		engine.persistentStorageInitChannel <- true
		return
	}
	if err != nil {
		// The completion signal is sent before exiting, as the original did.
		engine.persistentStorageInitChannel <- true
		log.Fatal("无法遍历数据库: ", err)
	}

	for {
		key, value, err := iter.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			// NOTE(review): if the iterator keeps returning the same non-EOF
			// error, this loop never terminates — confirm iterator semantics.
			continue
		}

		// Decode the document ID. Uvarint reports n <= 0 when the key is
		// empty, truncated, or overflows 64 bits; such a key must not be
		// indexed under whatever value happened to be returned.
		docId, n := binary.Uvarint(key)
		if n <= 0 {
			log.Printf("persistent storage: skipping record with malformed key %v", key)
			continue
		}

		// Decode the index data.
		var data types.DocumentIndexData
		if err := gob.NewDecoder(bytes.NewReader(value)).Decode(&data); err != nil {
			log.Printf("persistent storage: cannot decode document %d: %v", docId, err)
			continue
		}

		// Add the document to the index.
		engine.internalIndexDocument(docId, data)
	}
	engine.persistentStorageInitChannel <- true
}