Update dataio (matrixorigin#2660)
* Add update & delete files

* Update replay data file

* Fix code check
LeftHandCold authored May 26, 2022
1 parent 2775d86 commit 71c3f57
Showing 5 changed files with 111 additions and 26 deletions.
19 changes: 11 additions & 8 deletions pkg/vm/engine/tae/dataio/segmentio/block.go
@@ -17,6 +17,7 @@ package segmentio
import (
"bytes"
"fmt"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/layout/segment"
"sync"

"github.com/RoaringBitmap/roaring"
@@ -51,6 +52,9 @@ func newBlock(id uint64, seg file.Segment, colCnt int, indexCnt map[int]int) *bl
columns: make([]*columnBlock, colCnt),
}
bf.deletes = newDeletes(bf)
bf.deletes.file = make([]*segment.BlockFile, 1)
bf.deletes.file[0] = bf.seg.GetSegmentFile().NewBlockFile(
fmt.Sprintf("%d_%d.del", colCnt, bf.id))
bf.indexMeta = newIndex(&columnBlock{block: bf}).dataFile
bf.OnZeroCB = bf.close
for i := range bf.columns {
@@ -71,6 +75,7 @@ func replayBlock(id uint64, seg file.Segment, colCnt int, indexCnt map[int]int)
columns: make([]*columnBlock, colCnt),
}
bf.deletes = newDeletes(bf)
bf.deletes.file = make([]*segment.BlockFile, 1)
bf.indexMeta = newIndex(&columnBlock{block: bf}).dataFile
bf.OnZeroCB = bf.close
for i := range bf.columns {
@@ -109,6 +114,12 @@ func (bf *blockFile) ReadRows() uint32 {

func (bf *blockFile) WriteTS(ts uint64) (err error) {
bf.ts = ts
if bf.deletes.file != nil {
bf.deletes.mutex.Lock()
defer bf.deletes.mutex.Unlock()
bf.deletes.file = append(bf.deletes.file,
bf.seg.GetSegmentFile().NewBlockFile(fmt.Sprintf("%d_%d_%d.del", len(bf.columns), bf.id, ts)))
}
return
}

@@ -160,14 +171,6 @@ func (bf *blockFile) Close() error {
return nil
}

func (bf *blockFile) removeData(data *dataFile) {
if data.file != nil {
for _, file := range data.file {
bf.seg.GetSegmentFile().ReleaseFile(file)
}
}
}

func (bf *blockFile) Destroy() error {
bf.destroy.Lock()
defer bf.destroy.Unlock()
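Note on the block.go changes above: every block now registers a dedicated delete file alongside its column data, and each WriteTS appends a timestamped version so replay can pick the newest delete set. Below is a minimal, self-contained sketch of the naming scheme; deleteFileName is a hypothetical helper for illustration, not part of the package.

package main

import "fmt"

// deleteFileName shows the convention used above: a block starts with
// "<colCnt>_<blockID>.del", and each WriteTS adds "<colCnt>_<blockID>_<ts>.del".
func deleteFileName(colCnt int, blockID, ts uint64) string {
	if ts == 0 {
		return fmt.Sprintf("%d_%d.del", colCnt, blockID)
	}
	return fmt.Sprintf("%d_%d_%d.del", colCnt, blockID, ts)
}

func main() {
	fmt.Println(deleteFileName(4, 17, 0))   // 4_17.del
	fmt.Println(deleteFileName(4, 17, 102)) // 4_17_102.del
}

Replay (segment.go below) parses these names back and keeps only the file with the largest ts.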
26 changes: 19 additions & 7 deletions pkg/vm/engine/tae/dataio/segmentio/colblk.go
@@ -20,11 +20,9 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/file"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/layout/segment"
"sync"
)

type columnBlock struct {
mutex sync.RWMutex
common.RefHelper
block *blockFile
ts uint64
@@ -44,6 +42,9 @@ func newColumnBlock(block *blockFile, indexCnt int, col int) *columnBlock {
cb.indexes[i] = newIndex(cb)
}
cb.updates = newUpdates(cb)
cb.updates.file = make([]*segment.BlockFile, 1)
cb.updates.file[0] = cb.block.seg.GetSegmentFile().NewBlockFile(
fmt.Sprintf("%d_%d.update", cb.col, cb.block.id))
cb.data = newData(cb)
cb.data.file = make([]*segment.BlockFile, 1)
cb.data.file[0] = cb.block.seg.GetSegmentFile().NewBlockFile(
@@ -63,8 +64,9 @@ func openColumnBlock(block *blockFile, indexCnt int, col int) *columnBlock {
cb.indexes[i] = newIndex(cb)
}
cb.updates = newUpdates(cb)
cb.updates.file = make([]*segment.BlockFile, 1)
cb.data = newData(cb)
cb.data.file = make([]*segment.BlockFile, 0)
cb.data.file = make([]*segment.BlockFile, 1)
cb.OnZeroCB = cb.close
cb.Ref()
return cb
@@ -73,11 +75,17 @@ func openColumnBlock(block *blockFile, indexCnt int, col int) *columnBlock {
func (cb *columnBlock) WriteTS(ts uint64) (err error) {
cb.ts = ts
if cb.data.file != nil {
cb.mutex.Lock()
defer cb.mutex.Unlock()
cb.data.mutex.Lock()
defer cb.data.mutex.Unlock()
cb.data.file = append(cb.data.file,
cb.block.seg.GetSegmentFile().NewBlockFile(fmt.Sprintf("%d_%d_%d.blk", cb.col, cb.block.id, ts)))
}
if cb.updates.file != nil {
cb.updates.mutex.Lock()
defer cb.updates.mutex.Unlock()
cb.updates.file = append(cb.updates.file,
cb.block.seg.GetSegmentFile().NewBlockFile(fmt.Sprintf("%d_%d_%d.update", cb.col, cb.block.id, ts)))
}
return
}

@@ -165,11 +173,15 @@ func (cb *columnBlock) close() {

func (cb *columnBlock) Destroy() {
logutil.Infof("Destroying Block %d Col @ TS %d", cb.block.id, cb.ts)
cb.mutex.RLock()
cb.data.mutex.Lock()
files := cb.data.file
cb.mutex.RUnlock()
cb.data.file = nil
cb.data.mutex.Unlock()
if files != nil {
for _, file := range files {
if file == nil {
continue
}
cb.block.seg.GetSegmentFile().ReleaseFile(file)
}
}
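Note on the colblk.go changes above: update files now get the same per-timestamp versioning as column data, and locking moves from a columnBlock-wide mutex onto each dataFile. A minimal sketch of that pattern, assuming a hypothetical versionedFiles type that stands in for a dataFile's file slice plus its new mutex:

package main

import (
	"fmt"
	"sync"
)

type versionedFiles struct {
	mutex sync.RWMutex
	names []string
}

// appendVersion adds a timestamped name under the write lock, as WriteTS does above.
func (v *versionedFiles) appendVersion(col int, blk, ts uint64, ext string) {
	v.mutex.Lock()
	defer v.mutex.Unlock()
	v.names = append(v.names, fmt.Sprintf("%d_%d_%d.%s", col, blk, ts, ext))
}

// latest returns the newest version under the read lock, as Write/Read do above.
func (v *versionedFiles) latest() string {
	v.mutex.RLock()
	defer v.mutex.RUnlock()
	return v.names[len(v.names)-1]
}

func main() {
	v := &versionedFiles{names: []string{"0_7.update"}}
	v.appendVersion(0, 7, 101, "update")
	fmt.Println(v.latest()) // 0_7_101.update
}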
28 changes: 24 additions & 4 deletions pkg/vm/engine/tae/dataio/segmentio/data.go
@@ -17,9 +17,13 @@ package segmentio
import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/layout/segment"
"sync"
)

const UPGRADE_FILE_NUM = 10

type dataFile struct {
mutex sync.RWMutex
colBlk *columnBlock
file []*segment.BlockFile
buf []byte
@@ -85,13 +89,14 @@ func (df *dataFile) Write(buf []byte) (n int, err error) {
df.stat.originSize = int64(len(df.buf))
return
}
df.colBlk.mutex.RLock()
df.mutex.RLock()
file := df.file[len(df.file)-1]
df.colBlk.mutex.RUnlock()
df.mutex.RUnlock()
err = file.GetSegement().Append(file, buf)
df.stat.algo = file.GetAlgo()
df.stat.originSize = file.GetOriginSize()
df.stat.size = file.GetFileSize()
df.upgradeFile()
return
}

@@ -105,13 +110,28 @@ func (df *dataFile) Read(buf []byte) (n int, err error) {
if bufLen == 0 {
return 0, nil
}
df.colBlk.mutex.RLock()
df.mutex.RLock()
file := df.file[len(df.file)-1]
df.colBlk.mutex.RUnlock()
df.mutex.RUnlock()
n, err = file.Read(buf)
return n, nil
}

func (df *dataFile) upgradeFile() {
if len(df.file) < UPGRADE_FILE_NUM {
return
}
go func() {
df.mutex.Lock()
releaseFile := df.file[:len(df.file)-1]
df.file = df.file[len(df.file)-1 : len(df.file)]
df.mutex.Unlock()
for _, file := range releaseFile {
df.colBlk.block.seg.GetSegmentFile().ReleaseFile(file)
}
}()
}

func (df *dataFile) GetFileType() common.FileType {
return common.DiskFile
}
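Note on the data.go changes above: upgradeFile caps how many versions a dataFile accumulates. Once the slice reaches UPGRADE_FILE_NUM (10), only the newest file is kept and the older ones are released on a background goroutine, keeping cleanup off the write path. A simplified sketch of the rule, using a hypothetical compact helper:

package main

import "fmt"

const upgradeFileNum = 10 // mirrors UPGRADE_FILE_NUM above

// compact returns the files to keep (the newest one) and the files to release,
// applying the same threshold check as upgradeFile.
func compact(files []string) (keep, release []string) {
	if len(files) < upgradeFileNum {
		return files, nil
	}
	return files[len(files)-1:], files[:len(files)-1]
}

func main() {
	files := make([]string, 0, upgradeFileNum)
	for ts := 0; ts < upgradeFileNum; ts++ {
		files = append(files, fmt.Sprintf("0_7_%d.blk", ts))
	}
	keep, release := compact(files)
	fmt.Println(len(keep), len(release)) // 1 9
}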
51 changes: 44 additions & 7 deletions pkg/vm/engine/tae/dataio/segmentio/segment.go
@@ -72,6 +72,13 @@ func (sf *segmentFile) RemoveBlock(id uint64) {
delete(sf.blocks, id)
}

func (sf *segmentFile) replayInfo(stat *fileStat, file *segment.BlockFile) {
stat.size = file.GetFileSize()
stat.originSize = file.GetOriginSize()
stat.algo = file.GetAlgo()
stat.name = file.GetName()
}

func (sf *segmentFile) Replay(colCnt int, indexCnt map[int]int, cache *bytes.Buffer) error {
err := sf.seg.Replay(cache)
if err != nil {
@@ -81,7 +88,7 @@ func (sf *segmentFile) Replay(colCnt int, indexCnt map[int]int, cache *bytes.Buf
sf.Lock()
defer sf.Unlock()
for name, file := range nodes {
tmpName := strings.Split(name, ".blk")
tmpName := strings.Split(name, ".")
fileName := strings.Split(tmpName[0], "_")
if len(fileName) < 2 {
continue
@@ -99,19 +106,49 @@
if err != nil {
return err
}
bf.columns[col].data.file = append(bf.columns[col].data.file, file)
bf.columns[col].data.stat.size = file.GetFileSize()
bf.columns[col].data.stat.originSize = file.GetOriginSize()
bf.columns[col].data.stat.algo = file.GetAlgo()
bf.columns[col].data.stat.name = file.GetName()
var ts uint64 = 0
if len(fileName) > 2 {
ts, err := strconv.ParseUint(fileName[2], 10, 64)
ts, err = strconv.ParseUint(fileName[2], 10, 64)
if err != nil {
return err
}
}
switch tmpName[1] {
case "blk":
sf.replayInfo(bf.columns[col].data.stat, file)
if ts == 0 {
bf.columns[col].ts = 0
bf.columns[col].data.file[0] = file
break
}
if bf.columns[col].ts < ts {
bf.columns[col].ts = ts
bf.columns[col].data.file[0] = file
}
case "update":
sf.replayInfo(bf.columns[col].updates.stat, file)
if ts == 0 {
bf.columns[col].ts = 0
bf.columns[col].updates.file[0] = file
break
}
if bf.columns[col].ts < ts {
bf.columns[col].ts = ts
bf.columns[col].updates.file[0] = file
}
case "del":
sf.replayInfo(bf.deletes.stat, file)
if ts == 0 {
bf.ts = 0
bf.deletes.file[0] = file
break
}
if bf.ts < ts {
bf.ts = ts
bf.deletes.file[0] = file
}
default:
panic(any("No Support"))
}

}
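Note on the segment.go changes above: Replay no longer assumes every name ends in ".blk". It splits on ".", reads "<id>_<blockID>[_<ts>]" from the stem, and dispatches on the blk/update/del suffix, keeping the file with the largest ts for each target. A self-contained sketch of that parsing; parseBlockFileName is a hypothetical helper for illustration:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

type blockFileName struct {
	id      uint64 // column index for .blk/.update, column count for .del
	blockID uint64
	ts      uint64 // 0 when the name carries no timestamp
	kind    string // "blk", "update" or "del"
}

func parseBlockFileName(name string) (blockFileName, error) {
	var out blockFileName
	parts := strings.Split(name, ".")
	if len(parts) < 2 {
		return out, fmt.Errorf("unexpected name: %s", name)
	}
	out.kind = parts[1]
	fields := strings.Split(parts[0], "_")
	if len(fields) < 2 {
		return out, fmt.Errorf("unexpected name: %s", name)
	}
	var err error
	if out.id, err = strconv.ParseUint(fields[0], 10, 64); err != nil {
		return out, err
	}
	if out.blockID, err = strconv.ParseUint(fields[1], 10, 64); err != nil {
		return out, err
	}
	if len(fields) > 2 {
		if out.ts, err = strconv.ParseUint(fields[2], 10, 64); err != nil {
			return out, err
		}
	}
	return out, nil
}

func main() {
	n, _ := parseBlockFileName("0_7_102.update")
	fmt.Printf("%+v\n", n) // {id:0 blockID:7 ts:102 kind:update}
}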
13 changes: 13 additions & 0 deletions pkg/vm/engine/tae/dataio/segmentio/segment_test.go
@@ -16,6 +16,7 @@ package segmentio

import (
"bytes"
roaring "github.com/RoaringBitmap/roaring/roaring64"
"github.com/matrixorigin/matrixone/pkg/compress"
"path"
"testing"
@@ -67,10 +68,22 @@ func TestSegmentFile_Replay(t *testing.T) {
var w bytes.Buffer
dataStr := "hello tae"
w.WriteString(dataStr)
deletes := roaring.New()
deletes.Add(10)
deletes.Add(20)
deletesBuf, _ := deletes.ToBytes()
for i := 0; i < 20; i++ {
blkId1 := common.NextGlobalSeqNum()
block, err := seg.OpenBlock(blkId1, colCnt, indexCnt)
assert.Nil(t, err)
blockTs := common.NextGlobalSeqNum()
err = block.WriteTS(blockTs)
assert.Nil(t, err)
readTs, _ := block.ReadTS()
assert.Equal(t, blockTs, readTs)

err = block.WriteDeletes(deletesBuf)
assert.Nil(t, err)
ids = append(ids, blkId1)

colBlk0, err := block.OpenColumn(0)
