This repository has been archived by the owner on Jan 18, 2020. It is now read-only.

性能测试 (Performance testing)
gqcn committed Feb 8, 2018
1 parent ac85992 commit 01bb32b
Showing 5 changed files with 62 additions and 48 deletions.
1 change: 0 additions & 1 deletion gkvdb/gkvdb.go
@@ -37,7 +37,6 @@ const (
      gDATA_BUCKET_SIZE        = 32     // data chunk size (bytes; the larger the value, the more space is used as data grows)
      gFILE_POOL_CACHE_TIMEOUT = 60     // file pointer pool cache time (seconds)
      gCACHE_DEFAULT_TIMEOUT   = 10000  // default gcache cache time (milliseconds)
-     gBINLOG_AUTO_SYNCING     = 100    // interval for auto-syncing the binlog to disk (milliseconds)
      gAUTO_COMPACTING_MINSIZE = 512    // compact a free block once its size >= this value
      gAUTO_COMPACTING_TIMEOUT = 100    // auto-compaction interval (milliseconds)
      gBINLOG_MAX_LENGTH       = 200000 // max length of the binlog staging queue; beyond it, syncing to the data file is forcibly blocking
8 changes: 5 additions & 3 deletions gkvdb/gkvdb_auto.go
@@ -30,9 +30,11 @@ func (table *Table) startAutoCompactingLoop() {
  // Start the auto-sync goroutine
  func (db *DB) startAutoSyncingLoop() {
      go func() {
-         for !db.isClosed() {
-             db.binlog.sync(0)
-             time.Sleep(gBINLOG_AUTO_SYNCING*time.Millisecond)
+         select {
+             case <- db.binlog.syncEvents:
+                 db.binlog.sync()
+             case <- db.binlog.closeEvents:
+                 return
          }
      }()
  }
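
The old loop woke up every gBINLOG_AUTO_SYNCING milliseconds and called sync unconditionally; the new one blocks until writeByTx signals pending binlog data on syncEvents, or close() signals shutdown on closeEvents. A minimal, self-contained model of that notify/close pattern follows — the syncer type, channel capacities, and loop body are illustrative stand-ins, not the gkvdb implementation:

```go
package main

import "fmt"

// syncer models the notify/close pattern: producers signal pending work on
// syncEvents, and shutdown is signalled on closeEvents.
type syncer struct {
	syncEvents  chan struct{}
	closeEvents chan struct{}
	done        chan struct{}
}

func (s *syncer) loop() {
	defer close(s.done)
	for {
		select {
		case <-s.syncEvents:
			fmt.Println("sync triggered") // stands in for db.binlog.sync()
		case <-s.closeEvents:
			return
		}
	}
}

func main() {
	s := &syncer{
		syncEvents:  make(chan struct{}, 1024), // buffered: notifying never blocks the producer
		closeEvents: make(chan struct{}, 1),
		done:        make(chan struct{}),
	}
	go s.loop()
	s.syncEvents <- struct{}{}  // what writeByTx does after appending a record
	s.closeEvents <- struct{}{} // what close() does on shutdown
	<-s.done
}
```

One detail worth keeping in mind with this shape: when several cases are ready at the same time, select picks between them pseudo-randomly, so a shutdown signal can be taken ahead of an already-queued sync notification.
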
Expand Down
92 changes: 51 additions & 41 deletions gkvdb/gkvdb_binlog.go
@@ -12,16 +12,22 @@ import (
      "gitee.com/johng/gf/g/os/gfilepool"
      "gitee.com/johng/gf/g/container/glist"
      "gitee.com/johng/gf/g/encoding/gbinary"
+     "math"
+     "gitee.com/johng/gf/g/os/grpool"
  )

  // binlog operation object
  type BinLog struct {
-     sync.RWMutex           // binlog file mutex
-     smu    sync.RWMutex    // binlog sync mutex
-     db     *DB             // owning database
-     fp     *gfilepool.Pool // file pointer pool
-     queue  *glist.SafeList // queue of packed data awaiting sync
-     length int32           // number of data items in the queue (note: the queue holds packed items)
+     sync.RWMutex                      // binlog file mutex
+     smu             sync.RWMutex      // binlog sync mutex
+     db              *DB               // owning database
+     fp              *gfilepool.Pool   // file pointer pool
+     queue           *glist.SafeList   // queue of packed data awaiting sync
+     length          int32             // number of data items in the queue (note: the queue holds packed items)
+     syncEvents      chan struct{}     // notification: data is ready to be synced
+     closeEvents     chan struct{}     // notification: the database is closing
+     limitFreeEvents chan struct{}     // notification: the queue-length-limit block has been released
+
  }

  // binlog write item
@@ -33,8 +39,11 @@ type BinLogItem struct {
  // Create a binlog object
  func newBinLog(db *DB) (*BinLog, error) {
      binlog := &BinLog{
-         db    : db,
-         queue : glist.NewSafeList(),
+         db              : db,
+         queue           : glist.NewSafeList(),
+         syncEvents      : make(chan struct{}, math.MaxUint32),
+         closeEvents     : make(chan struct{}, 2),
+         limitFreeEvents : make(chan struct{}, 0),
      }
      path := db.getBinLogFilePath()
      if gfile.Exists(path) && (!gfile.IsWritable(path) || !gfile.IsReadable(path)){
@@ -47,6 +56,7 @@ func newBinLog(db *DB) (*BinLog, error) {
  // Close the binlog
  func (binlog *BinLog) close() {
      binlog.fp.Close()
+     binlog.closeEvents <- struct{}{}
  }

  // Restore unsynced data from the binlog file into the memtables
@@ -64,7 +74,6 @@ func (binlog *BinLog) initFromFile() {
          if blsize < 0 ||
              i + 13 + blsize + 8 > len(blbuffer) ||
              bytes.Compare(blbuffer[i + 5 : i + 13], blbuffer[i + 13 + blsize : i + 13 + blsize + 8]) != 0 {
-             //fmt.Println("invalid:", i)
              i++
              continue
          } else {
@@ -74,20 +83,21 @@ func (binlog *BinLog) initFromFile() {
                  for n, m := range datamap {
                      binlog.length += int32(len(m))
                      if table, err := binlog.db.Table(n); err == nil {
-                         if err := table.memt.set(m); err != nil {
-                             glog.Error(err)
-                         }
+                         table.memt.set(m)
                      } else {
                          glog.Error(err)
                      }
                  }
-                 if binlog.queue.PushFront(BinLogItem{int64(i), datamap}) == nil {
-                     glog.Error("push binlog to sync queue failed")
-                 }
+                 binlog.queue.PushFront(BinLogItem{int64(i), datamap})
              }
              i += 13 + blsize + 8
          }
      }
+
+     // Determine whether syncing should continue
+     if binlog.queue.Len() > 0 {
+         binlog.syncEvents <- struct{}{}
+     }
  }
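
The recovery scan above treats the binlog as a stream of self-delimiting records: a 13-byte header, a blsize-byte payload, and a trailing 8-byte field that must match the 8 bytes at header offsets 5–12; when the check fails, the scanner resynchronizes by advancing a single byte. The header decoding itself sits in the collapsed lines, so the sketch below guesses a 1-byte flag plus a little-endian 4-byte length — treat that layout as an assumption; the acceptance test is the part taken from the diff:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// scanRecords applies the same acceptance test as initFromFile: a record at
// offset i is valid only if its payload fits inside the buffer and the 8-byte
// field at [i+5, i+13) is repeated immediately after the payload. The header
// layout (1-byte flag + 4-byte little-endian length) is assumed, not taken
// from the diff.
func scanRecords(buf []byte) (payloads [][]byte) {
	for i := 0; i+13 <= len(buf); {
		blsize := int(int32(binary.LittleEndian.Uint32(buf[i+1 : i+5])))
		if blsize < 0 ||
			i+13+blsize+8 > len(buf) ||
			!bytes.Equal(buf[i+5:i+13], buf[i+13+blsize:i+13+blsize+8]) {
			i++ // resynchronize byte by byte on corrupt or truncated data
			continue
		}
		payloads = append(payloads, buf[i+13:i+13+blsize])
		i += 13 + blsize + 8
	}
	return
}

func main() {
	// Build one well-formed record by hand: flag, length, marker, payload, marker again.
	payload := []byte("hello")
	marker := []byte{1, 2, 3, 4, 5, 6, 7, 8}
	size := make([]byte, 4)
	binary.LittleEndian.PutUint32(size, uint32(len(payload)))

	rec := append([]byte{0}, size...)
	rec = append(rec, marker...)
	rec = append(rec, payload...)
	rec = append(rec, marker...)

	// Prepend garbage so the byte-by-byte resynchronization is exercised.
	buf := append([]byte("xx"), rec...)
	fmt.Printf("%q\n", scanRecords(buf)) // ["hello"]
}
```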

  // Convert binary data into a transaction object
@@ -110,13 +120,16 @@ func (binlog *BinLog) binlogBufferToDataMap(buffer []byte) map[string]map[string
      return datamap
  }

+ // Whether the binlog length has reached its limit
+ func (binlog *BinLog) reachLengthLimit() bool {
+     return atomic.LoadInt32(&binlog.length) >= gBINLOG_MAX_LENGTH
+ }
+
  // Append binlog data to the file; batch appends are supported.
  // Returns the start position of the write in the file, and any error.
  func (binlog *BinLog) writeByTx(tx *Transaction) error {
      // First check the queue length and force a sync if necessary
-     if atomic.LoadInt32(&binlog.length) >= gBINLOG_MAX_LENGTH {
-         //fmt.Println("force binlog to sync, queue length:", atomic.LoadInt32(&binlog.length))
-         binlog.sync(1)
+     if binlog.reachLengthLimit() {
+         <- binlog.limitFreeEvents
      }
      // Serialized content
      buffer := make([]byte, 0)
@@ -172,23 +185,20 @@ func (binlog *BinLog) writeByTx(tx *Transaction) error {
      // Then write the memtables (each table's data goes into that table's memtable)
      for n, m := range tx.tables {
          if table, err := tx.db.Table(n); err == nil {
-             if err := table.memt.set(m); err != nil {
-                 os.Truncate(binlog.db.getBinLogFilePath(), start)
-                 return err
-             }
+             table.memt.set(m)
          } else {
              os.Truncate(binlog.db.getBinLogFilePath(), start)
              return err
          }
      }

      // Add to the disk-sync queue
-     if binlog.queue.PushFront(BinLogItem{start, tx.tables}) == nil {
-         return errors.New("push binlog to sync queue failed")
-     }

+     binlog.queue.PushFront(BinLogItem{start, tx.tables})
      // Update the recorded queue length
      atomic.AddInt32(&binlog.length, length)

+     // Send a sync notification event
+     binlog.syncEvents <- struct{}{}
      return nil
  }
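
Instead of forcing a synchronous sync when the queue is full, writeByTx now applies backpressure: once the length reaches gBINLOG_MAX_LENGTH it parks on a receive from limitFreeEvents, and the sync loop releases it once the backlog drops back under the limit (see the close-and-recreate of limitFreeEvents in the sync() hunk further down). Below is a toy end-to-end model of that handshake with the file and memtable writes reduced to a counter; it re-checks the limit under a mutex, which is stricter than the diff's one-shot check, purely so the toy is race-free on its own. All names and sizes are illustrative:

```go
package main

import (
	"fmt"
	"sync"
)

const maxPending = 4 // stands in for gBINLOG_MAX_LENGTH

type binlog struct {
	mu              sync.Mutex
	pending         int
	syncEvents      chan struct{} // "data ready" notifications for the sync loop
	limitFreeEvents chan struct{} // gate that parked writers wait on
}

// write parks while the backlog is at the limit, then records one pending
// item and notifies the sync loop.
func (b *binlog) write() {
	b.mu.Lock()
	for b.pending >= maxPending {
		gate := b.limitFreeEvents // capture the current gate under the lock
		b.mu.Unlock()
		<-gate // released when syncLoop closes the gate
		b.mu.Lock()
	}
	b.pending++
	b.mu.Unlock()
	b.syncEvents <- struct{}{}
}

// syncLoop drains one pending item per notification; when the backlog drops
// back below the limit it closes the gate (waking every parked writer) and
// installs a fresh one, mirroring the close-and-recreate in sync().
func (b *binlog) syncLoop() {
	for range b.syncEvents {
		b.mu.Lock()
		b.pending--
		if b.pending == maxPending-1 {
			close(b.limitFreeEvents)
			b.limitFreeEvents = make(chan struct{})
		}
		b.mu.Unlock()
	}
}

func main() {
	b := &binlog{
		syncEvents:      make(chan struct{}, 1024),
		limitFreeEvents: make(chan struct{}),
	}
	go b.syncLoop()

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			b.write()
		}()
	}
	wg.Wait()
	fmt.Println("100 writes accepted; pending never exceeded", maxPending)
}
```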

@@ -210,13 +220,10 @@ func (binlog *BinLog) markSynced(start int64) error {
  }

  // Perform binlog syncing
- func (binlog *BinLog) sync(from int) {
-     // A forced sync coming from a transaction commit needs to check the size of the pending content
-     if (from == 1 && atomic.LoadInt32(&binlog.length) < gBINLOG_MAX_LENGTH) || binlog.queue.Len() == 0 {
-         //fmt.Println("no reaching binlog max length, no sync")
+ func (binlog *BinLog) sync() {
+     if binlog.queue.Len() == 0 {
          return
      }
-
      // The binlog sync mutex guarantees only one goroutine runs the sync at a time
      binlog.smu.Lock()
      defer binlog.smu.Unlock()
@@ -234,20 +241,18 @@ func (binlog *BinLog) sync(from int) {
              wg.Add(1)
              length += int32(len(m))
              // Save each table's data asynchronously
-             go func(n string, m map[string][]byte) {
+             name := n
+             data := m
+             grpool.Add(func() {
                  defer wg.Done()
                  // Get the table object
-                 table, err := binlog.db.Table(n)
+                 table, err := binlog.db.Table(name)
                  if err != nil {
                      atomic.StoreInt32(&done, -1)
                      glog.Error(err)
                      return
                  }
-                 // Perform the save operations
-                 for k, v := range m {
-                     if atomic.LoadInt32(&done) < 0 {
-                         return
-                     }
+                 for k, v := range data {
                      if len(v) == 0 {
                          if err := table.remove([]byte(k)); err != nil {
                              atomic.StoreInt32(&done, -1)
@@ -262,7 +267,7 @@ func (binlog *BinLog) sync(from int) {
                          }
                      }
                  }
-             }(n, m)
+             })
          }
          wg.Wait()
          // Sync failed: push the item back into the queue
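
Two things change around this goroutine: it now runs on the shared grpool worker pool (grpool.Add, as used here, takes a plain func(), which is why it can no longer receive n and m as arguments the way go func(n, m) did), and the loop variables are therefore copied into name and data before the closure is built. With the Go versions contemporary to this commit (before Go 1.22), a range loop reuses its iteration variables, so a closure scheduled for later execution that captured n or m directly could observe a later iteration's values. A standalone illustration of the per-iteration copy, using a plain goroutine plus WaitGroup in place of grpool:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

func main() {
	tables := map[string][]string{"user": {"u1"}, "order": {"o1"}, "log": {"l1"}}

	var (
		mu   sync.Mutex
		wg   sync.WaitGroup
		seen []string
	)
	for n := range tables {
		name := n // per-iteration copy; needed because the closure takes no arguments
		wg.Add(1)
		go func() { // stands in for grpool.Add(func() { ... })
			defer wg.Done()
			mu.Lock()
			seen = append(seen, name) // capturing n directly could repeat one table (pre-Go 1.22)
			mu.Unlock()
		}()
	}
	wg.Wait()
	sort.Strings(seen)
	fmt.Println(seen) // [log order user] — each table handled exactly once
}
```
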
@@ -271,14 +276,19 @@ func (binlog *BinLog) sync(from int) {
              time.Sleep(time.Second)
          } else {
              binlog.markSynced(item.txstart)
+             limited := binlog.reachLengthLimit()
              atomic.AddInt32(&binlog.length, -length)
+             if limited && !binlog.reachLengthLimit() {
+                 close(binlog.limitFreeEvents)
+                 binlog.limitFreeEvents = make(chan struct{}, 0)
+             }
          }
      } else {
          // Lock the binlog file
          // to prevent its contents from changing while the file size is being corrected
          binlog.Lock()
          // Make sure every binlog entry has been synced before performing the cleanup
-         if atomic.LoadInt32(&binlog.length) > 0 && binlog.queue.Len() == 0 {
+         if atomic.LoadInt32(&binlog.length) <= 0 && binlog.queue.Len() == 0 {
              // Clear the caches of all tables in the database; since this runs inside the binlog write lock
              // and the memtable is only written after the binlog write finishes, data cannot be written into a memtable while it is being cleared
              binlog.db.tables.Iterator(func(k string, v interface{}){
3 changes: 1 addition & 2 deletions gkvdb/gkvdb_memtable.go
@@ -20,14 +20,13 @@ func (table *Table) newMemTable() *MemTable {
  }

  // Save a transaction
- func (mtable *MemTable) set(datamap map[string][]byte) error {
+ func (mtable *MemTable) set(datamap map[string][]byte) {
      mtable.mu.Lock()
      defer mtable.mu.Unlock()

      for k, v := range datamap {
          mtable.datamap[k] = v
      }
-     return nil
  }

  // Query a key-value pair
6 changes: 5 additions & 1 deletion gkvdb_test/performance_test/gkvdb/gkvdb.go
@@ -121,8 +121,12 @@ func TestRemove(count int) {


  func main() {
-     count := 300000
+     count := 10000000
      TestSet(count)
      TestGet(count)
      TestRemove(count)
+
+     select {
+
+     }
  }
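
The test driver now runs ten million operations per phase and then parks on an empty select{}, which blocks main forever and keeps the process alive after the measurements. The TestSet/TestGet/TestRemove bodies are outside this diff; a generic shape for that kind of throughput measurement, with the actual gkvdb calls left out and every name below being hypothetical, might look like this:

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// measure runs op count times and reports wall time and throughput.
func measure(name string, count int, op func(i int)) {
	start := time.Now()
	for i := 0; i < count; i++ {
		op(i)
	}
	elapsed := time.Since(start)
	fmt.Printf("%s: %d ops in %v (%.0f ops/s)\n",
		name, count, elapsed, float64(count)/elapsed.Seconds())
}

func main() {
	count := 10000000
	store := make(map[string][]byte, count) // stand-in for the database handle

	measure("set", count, func(i int) {
		k := strconv.Itoa(i)
		store[k] = []byte(k)
	})
	measure("get", count, func(i int) {
		_ = store[strconv.Itoa(i)]
	})
	measure("remove", count, func(i int) {
		delete(store, strconv.Itoa(i))
	})
	// The diff additionally ends main with an empty select{}, which blocks
	// forever; it is omitted here so the sketch exits after printing its numbers.
}
```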
