Skip to content

Commit

Permalink
Update TPE multi-node handling (matrixorigin#2187)
Browse files Browse the repository at this point in the history
  • Loading branch information
daviszhen authored Apr 11, 2022
1 parent 7d7c310 commit e1da54f
Show file tree
Hide file tree
Showing 16 changed files with 747 additions and 305 deletions.
1 change: 1 addition & 0 deletions cmd/db-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ func main() {
tpeConf.PBKV = kvs
tpeConf.KVLimit = uint64(config.GlobalSystemVariables.GetTpeKVLimit())
tpeConf.ParallelReader = config.GlobalSystemVariables.GetTpeParallelReader()
tpeConf.MultiNode = config.GlobalSystemVariables.GetTpeMultiNode()
tpeConf.TpeDedupSetBatchTimeout = time.Duration(config.GlobalSystemVariables.GetTpeDedupSetBatchTimeout())
tpeConf.TpeDedupSetBatchTrycount = int(config.GlobalSystemVariables.GetTpeDedupSetBatchTryCount())
tpeConf.ValueLayoutSerializerType = config.GlobalSystemVariables.GetTpeValueLayoutSerializer()
Expand Down
10 changes: 10 additions & 0 deletions cmd/generate-config/system_vars_def.toml
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,16 @@ values = []
comment = "default is false. Enable transactional processing engine."
update-mode = "dynamic"

[[parameter]]
name = "tpeMultiNode"
scope = ["global"]
access = ["file"]
type = "bool"
domain-type = "set"
values = []
comment = "default is false. true : tpe works well in multi node environment."
update-mode = "dynamic"

[[parameter]]
name = "tpeKVType"
scope = ["global"]
Expand Down
4 changes: 3 additions & 1 deletion pkg/vm/engine/tpe/computation/computationHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,6 @@ type ComputationHandler interface {
GetNodesHoldTheTable(dbId uint64, desc *descriptor.RelationDesc) (engine.Nodes, interface{}, error)

ParallelReader() bool
}

MultiNode() bool
}
106 changes: 67 additions & 39 deletions pkg/vm/engine/tpe/engine/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package engine
import (
"errors"
"fmt"
"github.com/matrixorigin/matrixone/pkg/logutil"
"time"

"github.com/matrixorigin/matrixone/pkg/vm/engine"
Expand All @@ -26,59 +27,86 @@ import (
)

var (
errorUnsupportedTableDef = errors.New("unsupported tableDef")
errorUnsupportedTableDef = errors.New("unsupported tableDef")
errorDuplicatePrimaryKeyName = errors.New("duplicate primary key name")
errorDuplicateAttributeName = errors.New("duplicate attribute name")
errorDuplicateAttributeName = errors.New("duplicate attribute name")
)

func (td * TpeDatabase) Relations() []string {
func (td *TpeDatabase) Relations() []string {
var names []string
tableDescs, err := td.computeHandler.ListTables(td.id)
if err != nil {
return names
}
for _, desc := range tableDescs {
names = append(names,desc.Name)
names = append(names, desc.Name)
}
return names
}

func (td * TpeDatabase) Relation(name string) (engine.Relation, error) {
tableDesc, err := td.computeHandler.GetTable(td.id,name)
func (td *TpeDatabase) Relation(name string) (engine.Relation, error) {
tableDesc, err := td.computeHandler.GetTable(td.id, name)
if err != nil {
return nil, err
}

//load nodes for the table
nodes, shardsHandler, err := td.computeHandler.GetNodesHoldTheTable(td.id,tableDesc)
nodes, shardsHandler, err := td.computeHandler.GetNodesHoldTheTable(td.id, tableDesc)
if err != nil {
return nil, err
}

shards,ok := shardsHandler.(*tuplecodec.Shards)
shards, ok := shardsHandler.(*tuplecodec.Shards)
if !ok {
return nil,tuplecodec.ErrorIsNotShards
return nil, tuplecodec.ErrorIsNotShards
}

var shardsInfosInThisNode []tuplecodec.ShardInfo
//only records this node and the shards that this node holds.
for _, info := range shards.GetShardInfos() {
if info.GetShardNode().StoreID != td.storeID {
continue
}
shardsInfosInThisNode = append(shardsInfosInThisNode, info)
}
shardsInThisNode := &tuplecodec.Shards{}
shardsInThisNode.SetShardInfos(shardsInfosInThisNode)

logutil.Infof("cube_store_id %d", td.storeID)

var thisNodes engine.Nodes
for _, node := range shards.GetShardNodes() {
if node.StoreID == td.storeID {
thisNodes = append(thisNodes, engine.Node{
Id: node.StoreIDbytes,
Addr: node.Addr,
})
break
}
}

return &TpeRelation{
id: uint64(tableDesc.ID),
dbDesc: td.desc,
desc: tableDesc,
computeHandler: td.computeHandler,
nodes: nodes,
shards: shards,
},nil
id: uint64(tableDesc.ID),
dbDesc: td.desc,
desc: tableDesc,
computeHandler: td.computeHandler,
nodes: nodes,
shards: shards,
thisNodes: thisNodes,
shardsInThisNode: shardsInThisNode,
storeID: td.storeID,
}, nil
}

func (td * TpeDatabase) Delete(epoch uint64, name string) error {
_, err := td.computeHandler.DropTable(epoch,td.id,name)
func (td *TpeDatabase) Delete(epoch uint64, name string) error {
_, err := td.computeHandler.DropTable(epoch, td.id, name)
if err != nil {
return err
}
return nil
}

func (td * TpeDatabase) Create(epoch uint64,name string, defs []engine.TableDef) error {
func (td *TpeDatabase) Create(epoch uint64, name string, defs []engine.TableDef) error {
//convert defs into desc
tableDesc := &descriptor.RelationDesc{}

Expand All @@ -90,15 +118,15 @@ func (td * TpeDatabase) Create(epoch uint64,name string, defs []engine.TableDef)
for _, def := range defs {
if pk, ok := def.(*engine.PrimaryIndexDef); ok {
for _, pkName := range pk.Names {
if _,exist := dedupPKs[pkName]; exist{
if _, exist := dedupPKs[pkName]; exist {
return errorDuplicatePrimaryKeyName
}else{
} else {
dedupPKs[pkName] = 1
}
}
primaryKeys = append(primaryKeys, pk.Names...)
}
if cmt,ok := def.(*engine.CommentDef); ok {
if cmt, ok := def.(*engine.CommentDef); ok {
tableDesc.Comment = cmt.Comment
}
}
Expand All @@ -113,12 +141,12 @@ func (td * TpeDatabase) Create(epoch uint64,name string, defs []engine.TableDef)
//tpe must need primary key
if len(primaryKeys) == 0 {
//add implicit primary key
pkFieldName := "rowid"+fmt.Sprintf("%d",time.Now().Unix())
pkFieldName := "rowid" + fmt.Sprintf("%d", time.Now().Unix())
pkAttrDesc := descriptor.AttributeDesc{
ID: uint32(columnIdx),
ID: uint32(columnIdx),
Name: pkFieldName,
Ttype: orderedcodec.VALUE_TYPE_UINT64,
TypesType: tuplecodec.TpeTypeToEngineType(orderedcodec.VALUE_TYPE_UINT64),
TypesType: tuplecodec.TpeTypeToEngineType(orderedcodec.VALUE_TYPE_UINT64),
Is_null: false,
Default_value: "",
Is_hidden: true,
Expand All @@ -130,43 +158,43 @@ func (td * TpeDatabase) Create(epoch uint64,name string, defs []engine.TableDef)
Constrains: nil,
}

tableDesc.Attributes = append(tableDesc.Attributes,pkAttrDesc)
tableDesc.Attributes = append(tableDesc.Attributes, pkAttrDesc)

indexDesc := descriptor.IndexDesc_Attribute{
Name: pkAttrDesc.Name,
Direction: 0,
ID: pkAttrDesc.ID,
ID: pkAttrDesc.ID,
Type: orderedcodec.VALUE_TYPE_UINT64,
TypesType: tuplecodec.TpeTypeToEngineType(orderedcodec.VALUE_TYPE_UINT64),
TypesType: tuplecodec.TpeTypeToEngineType(orderedcodec.VALUE_TYPE_UINT64),
}

pkDesc.Attributes = append(pkDesc.Attributes,indexDesc)
pkDesc.Attributes = append(pkDesc.Attributes, indexDesc)

columnIdx++
}

dedupAttrNames := make(map[string]int8)
for _, def := range defs {
if attr,ok := def.(*engine.AttributeDef); ok {
if attr, ok := def.(*engine.AttributeDef); ok {
//attribute has exists?
if _,exist := dedupAttrNames[attr.Attr.Name]; exist {
if _, exist := dedupAttrNames[attr.Attr.Name]; exist {
return errorDuplicateAttributeName
}else{
} else {
dedupAttrNames[attr.Attr.Name] = 1
}

var isPrimaryKey bool = false
//the attribute is the primary key
if _,exist := dedupPKs[attr.Attr.Name]; exist {
if _, exist := dedupPKs[attr.Attr.Name]; exist {
isPrimaryKey = true
}

attrDesc := descriptor.AttributeDesc{
ID: uint32(columnIdx),
Name: attr.Attr.Name,
Ttype: tuplecodec.EngineTypeToTpeType(&attr.Attr.Type),
TypesType: attr.Attr.Type,
Default: attr.Attr.Default,
TypesType: attr.Attr.Type,
Default: attr.Attr.Default,
Is_null: !isPrimaryKey,
Default_value: "",
Is_hidden: false,
Expand All @@ -178,25 +206,25 @@ func (td * TpeDatabase) Create(epoch uint64,name string, defs []engine.TableDef)
Constrains: nil,
}

tableDesc.Attributes = append(tableDesc.Attributes,attrDesc)
tableDesc.Attributes = append(tableDesc.Attributes, attrDesc)

if isPrimaryKey {
indexDesc :=descriptor.IndexDesc_Attribute{
indexDesc := descriptor.IndexDesc_Attribute{
Name: attr.Attr.Name,
Direction: 0,
ID: uint32(columnIdx),
Type: tuplecodec.EngineTypeToTpeType(&attr.Attr.Type),
TypesType: attr.Attr.Type,
}
pkDesc.Attributes = append(pkDesc.Attributes,indexDesc)
pkDesc.Attributes = append(pkDesc.Attributes, indexDesc)
}

columnIdx++
}
}

//create table
_, err := td.computeHandler.CreateTable(epoch,td.id,tableDesc)
_, err := td.computeHandler.CreateTable(epoch, td.id, tableDesc)
if err != nil {
return err
}
Expand Down
33 changes: 30 additions & 3 deletions pkg/vm/engine/tpe/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ package engine

import (
"errors"
"github.com/matrixorigin/matrixcube/pb/metapb"
"github.com/matrixorigin/matrixone/pkg/vm/driver/pb"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tpe/tuplecodec"
"strings"
)

var (
Expand Down Expand Up @@ -76,7 +79,7 @@ func NewTpeEngine(tc *TpeConfig) (*TpeEngine, error) {
ihi := tuplecodec.NewIndexHandlerImpl(tch, nil, kv, uint64(kvLimit), serial, valueLayout, rcc)
ihi.PBKV = tc.PBKV
epoch := tuplecodec.NewEpochHandler(tch, dh, kv)
ch := tuplecodec.NewComputationHandlerImpl(dh, kv, tch, serial, ihi, epoch, tc.ParallelReader)
ch := tuplecodec.NewComputationHandlerImpl(dh, kv, tch, serial, ihi, epoch, tc.ParallelReader, tc.MultiNode)
te.computeHandler = ch
te.dh = dh
return te, nil
Expand Down Expand Up @@ -113,16 +116,40 @@ func (te *TpeEngine) Database(name string) (engine.Database, error) {
if err != nil {
return nil, err
}
storeID := uint64(0)
if te.tpeConfig.Cube != nil {
storeID = te.tpeConfig.Cube.RaftStore().Meta().ID
}
return &TpeDatabase{
id: uint64(dbDesc.ID),
desc: dbDesc,
computeHandler: te.computeHandler,
storeID: storeID,
},
nil
}

func (te *TpeEngine) Node(s string) *engine.NodeInfo {
return &engine.NodeInfo{Mcpu: 1}
func (te *TpeEngine) Node(ip string) *engine.NodeInfo {
var ni *engine.NodeInfo
if te.tpeConfig.Cube != nil {
te.tpeConfig.Cube.RaftStore().GetRouter().Every(uint64(pb.KVGroup), true, func(shard metapb.Shard, store metapb.Store) bool {
if ni != nil {
return false
}
if strings.HasPrefix(store.ClientAddress, ip) {
stats := te.tpeConfig.Cube.RaftStore().GetRouter().GetStoreStats(store.ID)
ni = &engine.NodeInfo{
Mcpu: len(stats.GetCpuUsages()),
}
}
return true
})
} else {
return &engine.NodeInfo{
Mcpu: 1,
}
}
return ni
}

func (te *TpeEngine) RemoveDeletedTable(epoch uint64) error {
Expand Down
Loading

0 comments on commit e1da54f

Please sign in to comment.