-
Notifications
You must be signed in to change notification settings - Fork 0
/
batch.go
163 lines (144 loc) · 4.02 KB
/
batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package bitcask
import (
"bitcask/data"
"encoding/binary"
"sync"
"sync/atomic"
)
// 非事务操作
const nonTransactionSeqNo uint64 = 0
var txnFinKey = []byte("txn-fin")
// WriteBatch 原子写
type WriteBatch struct {
cfg WriteBatchConfig
mu *sync.Mutex
db *DB
pendingWrites map[string]*data.LogRecord // 暂存写入的信息
}
// NewWriteBatch 初始化WriteBatch方法
// NewWriteBatch creates a new WriteBatch object with the given WriteBatchConfig.
func (db *DB) NewWriteBatch(cfg WriteBatchConfig) *WriteBatch {
// b+树时且事务序列号文件不存在且不是第一次加载
if db.cfg.IndexType == BPTree && !db.seqNoFileExist && !db.isInitiated {
panic("cannot use write batch,seq NO file not exist")
}
return &WriteBatch{
cfg: cfg,
mu: new(sync.Mutex),
db: db,
pendingWrites: make(map[string]*data.LogRecord),
}
}
// Put adds a key-value pair to the write batch.
// It returns an error if the key is empty.
func (wb *WriteBatch) Put(key, value []byte) error {
if len(key) == 0 {
return ErrKeyIsEmpty
}
// Create a log record with the given key and value.
logRecord := &data.LogRecord{
Key: key,
Value: value,
}
// Add the log record to the pending writes map with a lock.
wb.mu.Lock()
wb.pendingWrites[string(key)] = logRecord
wb.mu.Unlock()
return nil
}
// Delete 从 WriteBatch 中删除指定的键。
// 如果键为空,则返回 ErrKeyIsEmpty 错误。
// 键对应的值存在时,会创建一个 LogRecord 并将其添加到 pendingWrites 映射中。
func (wb *WriteBatch) Delete(key []byte) error {
if len(key) == 0 {
return ErrKeyIsEmpty
}
wb.mu.Lock()
defer wb.mu.Unlock()
// 获取键对应的值
get := wb.db.index.Get(key)
if get == nil {
// 数据不存在,直接返回
if wb.pendingWrites[string(key)] != nil {
delete(wb.pendingWrites, string(key))
}
return nil
}
// 创建LogRecord并添加到pendingWrites映射中
wb.pendingWrites[string(key)] = &data.LogRecord{
Key: key,
Type: data.LogRecordDeleted,
}
return nil
}
// Commit 提交事务,将暂存的数据写入到数据文件,更新内存索引
func (wb *WriteBatch) Commit() error {
wb.mu.Lock()
defer wb.mu.Unlock()
if len(wb.pendingWrites) == 0 {
return nil
}
if uint(len(wb.pendingWrites)) > wb.cfg.MaxBatchNum {
return ErrExceedMaxBatchNum
}
wb.db.mu.Lock()
defer wb.db.mu.Unlock()
//更新事务的序列号
seqNo := atomic.AddUint64(&wb.db.seqNo, 1)
// 开始写数据到数据文件中
position := make(map[string]*data.LogRecordPos)
for _, record := range wb.pendingWrites {
pos, err := wb.db.appendLogRecord(&data.LogRecord{
Key: logRecordKeyWriteWithSeq(record.Key, seqNo),
Value: record.Value,
Type: record.Type,
})
if err != nil {
return err
}
position[string(record.Key)] = pos
}
// 事务完成标识
finished := &data.LogRecord{
Key: logRecordKeyWriteWithSeq(txnFinKey, seqNo),
Type: data.LogRecordTxnFinished,
}
_, err := wb.db.appendLogRecord(finished)
if err != nil {
return err
}
// 进行持久化
if wb.cfg.SyncWrites && wb.db.activeFile != nil {
if err := wb.db.activeFile.Sync(); err != nil {
return err
}
}
//更新内存索引
for _, record := range wb.pendingWrites {
pos := position[string(record.Key)]
if record.Type == data.LogRecordNormal {
wb.db.index.Put(record.Key, pos)
}
if record.Type == data.LogRecordDeleted {
wb.db.index.Delete(record.Key)
}
}
//将暂存数据清空
wb.pendingWrites = make(map[string]*data.LogRecord)
return nil
}
// logRecordKeyWriteWithSeq key + seq num 编码
func logRecordKeyWriteWithSeq(key []byte, seqNo uint64) []byte {
seq := make([]byte, binary.MaxVarintLen64)
n := binary.PutUvarint(seq[:], seqNo)
encKey := make([]byte, len(key)+n)
copy(encKey[:n], seq[:n])
copy(encKey[n:], key)
return encKey
}
// 解析 LogRecord 的 key,获取实际的 key 和事务序列号
func parseLogRecordKey(key []byte) ([]byte, uint64) {
seqNo, n := binary.Uvarint(key)
realKey := key[n:]
return realKey, seqNo
}