Skip to content

Commit

Permalink
Update parallel reader; add approximate statistics (matrixorigin#2159)
Browse files Browse the repository at this point in the history
  • Loading branch information
daviszhen authored Apr 4, 2022
1 parent d457b9d commit c3b3e35
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 63 deletions.
20 changes: 17 additions & 3 deletions pkg/vm/driver/kv/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ func (ce *kvExecutor) tpeScan(readCtx storage.ReadContext, shard metapb.Shard, r
case executor.GenWithResultLastKey:
nextKey = kv.NextKey(lastKey, nil)
case executor.UseShardEnd:
//the ShardEndKey may be nil too.
nextKey = ce.clone(shard.GetEnd())
}
}
Expand All @@ -224,9 +225,19 @@ func (ce *kvExecutor) tpeScan(readCtx storage.ReadContext, shard metapb.Shard, r
return rep, nil
}

//TODO: remove this check once the adjust function has been fixed.
//the check ensures the endKey is not nil.
r := 0
for _, key := range keys {
if bytes.Compare(key, userReq.GetEnd()) >= 0 {
break
}
r++
}

tsr := pb.TpeScanResponse{
Keys: keys,
Values: values,
Keys: keys[:r],
Values: values[:r],
CompleteInAllShards: completed,
NextScanKey: nextKey,
}
Expand All @@ -236,7 +247,10 @@ func (ce *kvExecutor) tpeScan(readCtx storage.ReadContext, shard metapb.Shard, r
//for test
//print keys
//cnt := 0
//for _, key := range keys {
//for i, key := range keys {
// if i >= r {
// break
// }
// if bytes.Compare(key, userReq.GetEnd()) >= 0 {
// cnt++
// }
Expand Down
31 changes: 22 additions & 9 deletions pkg/vm/engine/tpe/engine/relation.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,21 @@ var (
)

func (trel *TpeRelation) Rows() int64 {
return 1
rows := int64(0)
for _, info := range trel.shards.ShardInfos() {
stats := info.GetStatistics()
rows += int64(stats.GetApproximateKeys())
}
return rows
}

func (trel *TpeRelation) Size(s string) int64 {
return 1
size := int64(0)
for _, info := range trel.shards.ShardInfos() {
stats := info.GetStatistics()
size += int64(stats.GetApproximateSize())
}
return size
}

func (trel *TpeRelation) Close() {
Expand Down Expand Up @@ -199,7 +209,8 @@ func (trel *TpeRelation) parallelReader(cnt int) []engine.Reader {
}

//for test
shardCountPerReader = shardInfosCount
//one reader for all shards
//shardCountPerReader = shardInfosCount

startIndex := 0
for i := 0; i < len(tpeReaders); i++ {
Expand All @@ -212,10 +223,12 @@ func (trel *TpeRelation) parallelReader(cnt int) []engine.Reader {
endKey: info.GetEndKey(),
nextScanKey: nil,
completeInShard: false,
shardID: info.GetShardID(),
node: ShardNode{
Addr: info.GetShardNode().Addr,
ID: info.GetShardNode().ID,
IDbytes: info.GetShardNode().IDbytes,
Addr: info.GetShardNode().Addr,
StoreID: info.GetShardNode().StoreID,
StoreIDbytes: info.GetShardNode().StoreIDbytes,
Statistics: info.GetStatistics(),
},
}
infos = append(infos, newInfo)
Expand Down Expand Up @@ -273,9 +286,9 @@ func (trel *TpeRelation) NewReader(cnt int) []engine.Reader {
nextScanKey: nil,
completeInShard: false,
node: ShardNode{
Addr: info.GetShardNode().Addr,
ID: info.GetShardNode().ID,
IDbytes: info.GetShardNode().IDbytes,
Addr: info.GetShardNode().Addr,
StoreID: info.GetShardNode().StoreID,
StoreIDbytes: info.GetShardNode().StoreIDbytes,
},
}
tr.shardInfos = append(tr.shardInfos, newInfo)
Expand Down
9 changes: 6 additions & 3 deletions pkg/vm/engine/tpe/engine/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package engine

import (
"fmt"
"github.com/matrixorigin/matrixcube/pb/metapb"
"github.com/matrixorigin/matrixcube/storage/kv/pebble"
"github.com/matrixorigin/matrixone/pkg/vm/driver"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
Expand Down Expand Up @@ -73,13 +74,14 @@ type ShardNode struct {
//the address of the store of the leader replica of the shard
Addr string
//the id of the store of the leader replica of the shard
ID uint64
StoreID uint64
//the bytes of the id
IDbytes string
StoreIDbytes string
Statistics metapb.ShardStats
}

func (sn ShardNode) String() string {
return fmt.Sprintf("ShardNode{ Addr %v ID %v IDbytes %v}", sn.Addr, sn.IDbytes, sn.IDbytes)
return fmt.Sprintf("ShardNode{ Addr %v ID %v IDbytes %v}", sn.Addr, sn.StoreID, sn.StoreIDbytes)
}

type ShardInfo struct {
Expand All @@ -90,6 +92,7 @@ type ShardInfo struct {
nextScanKey []byte
//scan shard completely?
completeInShard bool
shardID uint64
node ShardNode
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/tpe/tuplecodec/computationhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func (chi *ComputationHandlerImpl) GetNodesHoldTheTable(dbId uint64, desc *descr
var nodes engine.Nodes
for _, node := range shards.nodes {
nodes = append(nodes, engine.Node{
Id: node.IDbytes,
Id: node.StoreIDbytes,
Addr: node.Addr,
})
}
Expand Down
20 changes: 14 additions & 6 deletions pkg/vm/engine/tpe/tuplecodec/cubekv.go
Original file line number Diff line number Diff line change
Expand Up @@ -996,10 +996,11 @@ func (ck *CubeKV) GetShardsWithRange(startKey TupleKey, endKey TupleKey) (interf
shardInfos = append(shardInfos, ShardInfo{
startKey: shard.GetStart(),
endKey: shard.GetEnd(),
shardID: shard.GetID(),
node: ShardNode{
Addr: store.ClientAddress,
IDbytes: string(codec.Uint642Bytes(store.ID)),
ID: store.ID,
Addr: store.ClientAddress,
StoreIDbytes: string(codec.Uint642Bytes(store.ID)),
StoreID: store.ID,
},
})

Expand All @@ -1015,12 +1016,19 @@ func (ck *CubeKV) GetShardsWithRange(startKey TupleKey, endKey TupleKey) (interf
//TODO: wait cube to fix
ck.Cube.RaftStore().GetRouter().AscendRange(uint64(pb.KVGroup), startKey, endKey, rpcpb.SelectLeader, callback)

//get statistics for every shard
for i := 0; i < len(shardInfos); i++ {
info := shardInfos[i]
stats := ck.Cube.RaftStore().GetRouter().GetShardStats(info.GetShardID())
shardInfos[i].statistics = stats
}

var nodes []ShardNode
for id, addr := range stores {
nodes = append(nodes, ShardNode{
Addr: addr,
IDbytes: string(codec.Uint642Bytes(id)),
ID: id,
Addr: addr,
StoreIDbytes: string(codec.Uint642Bytes(id)),
StoreID: id,
})
}

Expand Down
22 changes: 12 additions & 10 deletions pkg/vm/engine/tpe/tuplecodec/cubekv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,9 @@ func TestCubeKV_GetRangeWithLimit(t *testing.T) {
convey.So(err, convey.ShouldBeNil)
}

_, values1, _, _, err := kv.GetRangeWithLimit(TupleKey(prefix), nil, uint64(cnt))
endKey := SuccessorOfPrefix(TupleKey(prefix))

_, values1, _, _, err := kv.GetRangeWithLimit(TupleKey(prefix), endKey, uint64(cnt))
convey.So(err, convey.ShouldBeNil)

//for i, key := range keys1 {
Expand All @@ -467,7 +469,7 @@ func TestCubeKV_GetRangeWithLimit(t *testing.T) {
last := TupleKey(prefix)
readCnt := 0
for i := 0; i < cnt; i += step {
_, values, complete, nextScanKey, err := kv.GetRangeWithLimit(last, nil, uint64(step))
_, values, complete, nextScanKey, err := kv.GetRangeWithLimit(last, endKey, uint64(step))
convey.So(err, convey.ShouldBeNil)

for j := i; j < i+step; j++ {
Expand Down Expand Up @@ -567,15 +569,15 @@ func TestCubeKV_GetShardsWithRange(t *testing.T) {

for _, node := range shards.nodes {
fmt.Printf("%d %v | %s\n",
node.ID,
node.IDbytes, node.Addr)
node.StoreID,
node.StoreIDbytes, node.Addr)
}

for _, info := range shards.shardInfos {
fmt.Printf("%v %v %d %v | %s \n",
info.startKey, info.endKey,
info.node.ID,
info.node.IDbytes, info.node.Addr)
info.node.StoreID,
info.node.StoreIDbytes, info.node.Addr)
}
})
}
Expand Down Expand Up @@ -604,15 +606,15 @@ func TestCubeKV_GetShardsWithPrefix(t *testing.T) {

for _, node := range shards.nodes {
fmt.Printf("%d %v | %s\n",
node.ID,
node.IDbytes, node.Addr)
node.StoreID,
node.StoreIDbytes, node.Addr)
}

for _, info := range shards.shardInfos {
fmt.Printf("%v %v %d %v | %s \n",
info.startKey, info.endKey,
info.node.ID,
info.node.IDbytes, info.node.Addr)
info.node.StoreID,
info.node.StoreIDbytes, info.node.Addr)
}
})
}
Expand Down
93 changes: 67 additions & 26 deletions pkg/vm/engine/tpe/tuplecodec/indexhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ var (
errorWriteContextIsInvalid = errors.New("the write context is invalid")
errorReadContextIsInvalid = errors.New("the read context is invalid")
errorAttributeDoesNotHaveThePosition = errors.New("the attribute does not have the position")
errorShardNextScanKeyIsNil = errors.New("ShardNextScanKey is nil")
errorShardScanEndKeyIsNil = errors.New("ShardScanEndKey is nil")
)

var _ index.IndexHandler = &IndexHandlerImpl{}
Expand Down Expand Up @@ -121,16 +123,24 @@ func (ihi *IndexHandlerImpl) parallelReader(indexReadCtx *ReadContext) (*batch.B
}
}

if len(indexReadCtx.ShardNextScanKey) == 0 {
return nil, 0, errorShardNextScanKeyIsNil
}

if len(indexReadCtx.ShardScanEndKey) == 0 {
prefix := SuccessorOfPrefix(indexReadCtx.PrefixForScanKey)
prefixEnd := SuccessorOfPrefix(indexReadCtx.PrefixForScanKey)
if len(indexReadCtx.ShardEndKey) == 0 ||
prefix.Less(indexReadCtx.ShardScanEndKey) {
indexReadCtx.ShardScanEndKey = prefix
prefixEnd.Less(indexReadCtx.ShardScanEndKey) {
indexReadCtx.ShardScanEndKey = prefixEnd
} else {
indexReadCtx.ShardScanEndKey = indexReadCtx.ShardEndKey
}
}

if len(indexReadCtx.ShardScanEndKey) == 0 {
return nil, 0, errorShardScanEndKeyIsNil
}

//nextScanKey does not have the prefix of the table
//TODO: may be wrong, fix it
//if bytes.HasPrefix(indexReadCtx.ShardNextScanKey,indexReadCtx.PrefixForScanKey) {
Expand All @@ -148,9 +158,9 @@ func (ihi *IndexHandlerImpl) parallelReader(indexReadCtx *ReadContext) (*batch.B
//get keys with the prefix
for rowRead < int(ihi.kvLimit) {
needRead := int(ihi.kvLimit) - rowRead
logutil.Infof("readCtx before prefix %v %v",
indexReadCtx.PrefixForScanKey,
indexReadCtx.ParallelReaderContext)
//logutil.Infof("readCtx before prefix %v %v",
// indexReadCtx.PrefixForScanKey,
// indexReadCtx.ParallelReaderContext)
keys, values, complete, nextScanKey, err := ihi.kv.GetRangeWithPrefixLimit(
indexReadCtx.ShardNextScanKey,
indexReadCtx.ShardScanEndKey,
Expand All @@ -160,17 +170,17 @@ func (ihi *IndexHandlerImpl) parallelReader(indexReadCtx *ReadContext) (*batch.B
return nil, 0, err
}

//rowRead += len(keys)
//indexReadCtx.addReadCount(len(keys))
rowRead += len(keys)
indexReadCtx.addReadCount(len(keys))

//1.decode index key
//2.get fields wanted
for i := 0; i < len(keys); i++ {
if !keys[i].Less(indexReadCtx.ShardScanEndKey) {
break
}
rowRead++
indexReadCtx.addReadCount(1)
//if !keys[i].Less(indexReadCtx.ShardScanEndKey) {
// break
//}
//rowRead++
//indexReadCtx.addReadCount(1)
indexKey := keys[i][indexReadCtx.LengthOfPrefixForScanKey:]
_, dis, err := tkd.DecodePrimaryIndexKey(indexKey, indexReadCtx.IndexDesc)
if err != nil {
Expand All @@ -190,9 +200,9 @@ func (ihi *IndexHandlerImpl) parallelReader(indexReadCtx *ReadContext) (*batch.B
//need to update prefix
//decode index value
for i := 0; i < len(keys); i++ {
if !keys[i].Less(indexReadCtx.ShardScanEndKey) {
break
}
//if !keys[i].Less(indexReadCtx.ShardScanEndKey) {
// break
//}
//decode the name which is in the value
data := values[i]
if ihi.useLayout {
Expand Down Expand Up @@ -225,23 +235,54 @@ func (ihi *IndexHandlerImpl) parallelReader(indexReadCtx *ReadContext) (*batch.B
}

//get the next prefix
logutil.Infof("readCtx after complete %v prefix %v %v",
complete,
indexReadCtx.PrefixForScanKey,
indexReadCtx.ParallelReaderContext)
//logutil.Infof("readCtx after complete %v prefix %v nextScanKey %v ParallelReaderContext %v",
// complete,
// indexReadCtx.PrefixForScanKey,
// nextScanKey,
// indexReadCtx.ParallelReaderContext)

if complete {
logutil.Infof("parallel reader complete 1 in shard startKey %v endKey %v",
indexReadCtx.ShardStartKey,
indexReadCtx.ShardEndKey,
)
indexReadCtx.CompleteInShard = true
readFinished = true
break
}
//if the nextScanKey is out of the shard, stop scanning
if len(nextScanKey) == 0 || !nextScanKey.Less(indexReadCtx.ShardEndKey) {
indexReadCtx.CompleteInShard = true
readFinished = true
break
//the shardEnd is nil, which means +infinity
if len(indexReadCtx.ShardEndKey) == 0 {
//change it to the successor of the lastKey
if len(nextScanKey) == 0 {
if len(keys) != 0 {
indexReadCtx.ShardNextScanKey = SuccessorOfKey(keys[len(keys)-1])
} else {
logutil.Infof("parallel reader complete 2 in shard startKey %v endKey %v",
indexReadCtx.ShardStartKey,
indexReadCtx.ShardEndKey,
)
//needRead cannot be zero here,
//so there is no more data.
indexReadCtx.CompleteInShard = true
readFinished = true
break
}
} else {
indexReadCtx.ShardNextScanKey = nextScanKey
}
} else {
//if the nextScanKey is out of the shard, stop scanning
if len(nextScanKey) == 0 || !nextScanKey.Less(indexReadCtx.ShardEndKey) {
logutil.Infof("parallel reader complete 3 in shard startKey %v endKey %v nextScanKey %v",
indexReadCtx.ShardStartKey,
indexReadCtx.ShardEndKey,
nextScanKey)
indexReadCtx.CompleteInShard = true
readFinished = true
break
}
indexReadCtx.ShardNextScanKey = nextScanKey
}
indexReadCtx.ShardNextScanKey = nextScanKey
}

TruncateBatch(bat, int(ihi.kvLimit), rowRead)
Expand Down
Loading

0 comments on commit c3b3e35

Please sign in to comment.