Skip to content

Commit

Permalink
Tae layout (matrixorigin#2534)
Browse files Browse the repository at this point in the history
* Add tae layout

* Add mutex when NewBlockFile

* Fix bitmap free bug

* Fix log write bug

* Add file Destory

* Update dataio block

* Create segment to disable sync by default

* Optimize the Destroy interface of the block

* Add unmount segment of layout

* Add layout segment Destroy

* Add Allocator interface

* Add free space and delete segment file

* Disable file free
  • Loading branch information
LeftHandCold authored May 15, 2022
1 parent 76f0511 commit 11df252
Show file tree
Hide file tree
Showing 23 changed files with 1,968 additions and 9 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ endif

.PHONY: fmt
fmt:
gofmt -l -s .
gofmt -l -s -w .


.PHONY: install-static-check-tools
Expand Down
5 changes: 5 additions & 0 deletions pkg/vm/engine/tae/dataio/mockio/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/file"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/layout/segment"
)

var SegmentFileMockFactory = func(name string, id uint64) file.Segment {
Expand All @@ -36,6 +37,10 @@ type segmentFile struct {
name string
}

func (sf *segmentFile) GetSegmentFile() *segment.Segment {
panic(any("implement me"))
}

func newSegmentFile(name string, id uint64) *segmentFile {
sf := &segmentFile{
blocks: make(map[uint64]*blockFile),
Expand Down
305 changes: 305 additions & 0 deletions pkg/vm/engine/tae/dataio/segmentio/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,305 @@
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package segmentio

import (
"bytes"
"github.com/RoaringBitmap/roaring"
gbat "github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
gvec "github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/container/batch"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/container/compute"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/container/vector"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/file"
idxCommon "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index/common"
"sync"
)

type blockFile struct {
common.RefHelper
seg file.Segment
rows uint32
id uint64
ts uint64
columns []*columnBlock
deletes *deletesFile
indexMeta *dataFile
destroy sync.Mutex
}

func newBlock(id uint64, seg file.Segment, colCnt int, indexCnt map[int]int) *blockFile {
bf := &blockFile{
seg: seg,
id: id,
columns: make([]*columnBlock, colCnt),
}
bf.deletes = newDeletes(bf)
bf.indexMeta = newIndex(&columnBlock{block: bf}).dataFile
bf.OnZeroCB = bf.close
for i := range bf.columns {
cnt := 0
if indexCnt != nil {
cnt = indexCnt[i]
}
bf.columns[i] = newColumnBlock(bf, cnt, i)
}
bf.Ref()
return bf
}

func (bf *blockFile) Fingerprint() *common.ID {
return &common.ID{
BlockID: bf.id,
}
}

func (bf *blockFile) close() {
bf.Close()
err := bf.Destroy()
if err != nil {
panic(any("Destroy error"))
}
}

func (bf *blockFile) WriteRows(rows uint32) (err error) {
bf.rows = rows
return nil
}

func (bf *blockFile) ReadRows() uint32 {
return bf.rows
}

func (bf *blockFile) WriteTS(ts uint64) (err error) {
bf.ts = ts
return
}

func (bf *blockFile) ReadTS() (ts uint64, err error) {
ts = bf.ts
return
}

func (bf *blockFile) WriteDeletes(buf []byte) (err error) {
_, err = bf.deletes.Write(buf)
return
}

func (bf *blockFile) ReadDeletes(buf []byte) (err error) {
_, err = bf.deletes.Read(buf)
return
}

func (bf *blockFile) WriteIndexMeta(buf []byte) (err error) {
_, err = bf.indexMeta.Write(buf)
return
}

func (bf *blockFile) LoadIndexMeta() (*idxCommon.IndicesMeta, error) {
size := bf.indexMeta.Stat().Size()
buf := make([]byte, size)
_, err := bf.indexMeta.Read(buf)
if err != nil {
return nil, err
}
indices := idxCommon.NewEmptyIndicesMeta()
if err = indices.Unmarshal(buf); err != nil {
return nil, err
}
return indices, nil
}

func (bf *blockFile) OpenColumn(colIdx int) (colBlk file.ColumnBlock, err error) {
if colIdx >= len(bf.columns) {
err = file.ErrInvalidParam
return
}
bf.columns[colIdx].Ref()
colBlk = bf.columns[colIdx]
return
}

func (bf *blockFile) Close() error {
return nil
}

func (bf *blockFile) removeData(data *dataFile) {
if data.file != nil {
for _, file := range data.file {
bf.seg.GetSegmentFile().ReleaseFile(file)
}
}
}

func (bf *blockFile) Destroy() error {
bf.destroy.Lock()
defer bf.destroy.Unlock()
if bf.columns == nil {
return nil
}
for _, cb := range bf.columns {
cb.Unref()
}
bf.removeData(bf.deletes.dataFile)
bf.removeData(bf.indexMeta)
bf.columns = nil
bf.deletes = nil
bf.indexMeta = nil
if bf.seg != nil {
bf.seg.RemoveBlock(bf.id)
}
return nil
}

func (bf *blockFile) Sync() error { return bf.seg.GetSegmentFile().Sync() }

func (bf *blockFile) LoadIBatch(colTypes []types.Type, maxRow uint32) (bat batch.IBatch, err error) {
attrs := make([]int, len(bf.columns))
vecs := make([]vector.IVector, len(attrs))
var f common.IRWFile
for i, colBlk := range bf.columns {
if f, err = colBlk.OpenDataFile(); err != nil {
return
}
defer f.Unref()
size := f.Stat().Size()
buf := make([]byte, size)
if _, err = f.Read(buf); err != nil {
return
}
vec := vector.NewVector(colTypes[i], uint64(maxRow))
if err = vec.Unmarshal(buf); err != nil {
return
}
vecs[i] = vec
attrs[i] = i
}
bat, err = batch.NewBatch(attrs, vecs)
return
}

func (bf *blockFile) LoadBatch(attrs []string, colTypes []types.Type) (bat *gbat.Batch, err error) {
bat = gbat.New(true, attrs)
var f common.IRWFile
for i, colBlk := range bf.columns {
if f, err = colBlk.OpenDataFile(); err != nil {
return
}
defer f.Unref()
size := f.Stat().Size()
buf := make([]byte, size)
if _, err = f.Read(buf); err != nil {
return
}
vec := gvec.New(colTypes[i])
if err = vec.Read(buf); err != nil {
return
}
bat.Vecs[i] = vec
}
return
}

func (bf *blockFile) WriteColumnVec(ts uint64, colIdx int, vec *gvec.Vector) (err error) {
cb, err := bf.OpenColumn(colIdx)
if err != nil {
return err
}
defer cb.Close()
err = cb.WriteTS(ts)
buf, err := vec.Show()
if err != nil {
return err
}
err = cb.WriteData(buf)
return
}

func (bf *blockFile) WriteBatch(bat *gbat.Batch, ts uint64) (err error) {
if err = bf.WriteTS(ts); err != nil {
return
}
if err = bf.WriteRows(uint32(gvec.Length(bat.Vecs[0]))); err != nil {
return
}
for colIdx := range bat.Attrs {
if err = bf.WriteColumnVec(ts, colIdx, bat.Vecs[colIdx]); err != nil {
return
}
}
return
}

func (bf *blockFile) WriteIBatch(bat batch.IBatch, ts uint64, masks map[uint16]*roaring.Bitmap, vals map[uint16]map[uint32]interface{}, deletes *roaring.Bitmap) (err error) {
attrs := bat.GetAttrs()
var w bytes.Buffer
if deletes != nil {
if _, err = deletes.WriteTo(&w); err != nil {
return
}
}
if err = bf.WriteTS(ts); err != nil {
return err
}
for _, colIdx := range attrs {
cb, err := bf.OpenColumn(colIdx)
if err != nil {
return err
}
defer cb.Close()
err = cb.WriteTS(ts)
if err != nil {
return err
}
vec, err := bat.GetVectorByAttr(colIdx)
if err != nil {
return err
}
updates := vals[uint16(colIdx)]
if updates != nil {
w.Reset()
mask := masks[uint16(colIdx)]
if _, err = mask.WriteTo(&w); err != nil {
return err
}
col := gvec.New(vec.GetDataType())
it := mask.Iterator()
for it.HasNext() {
row := it.Next()
v := updates[row]
compute.AppendValue(col, v)
}
buf, err := col.Show()
if err != nil {
return err
}
w.Write(buf)
if err = cb.WriteUpdates(w.Bytes()); err != nil {
return err
}
}
w.Reset()
buf, err := vec.Marshal()
if err != nil {
return err
}
if err = cb.WriteData(buf); err != nil {
return err
}
}
return
}
Loading

0 comments on commit 11df252

Please sign in to comment.