Skip to content

Commit

Permalink
support table metadata (arana-db#116)
Browse files Browse the repository at this point in the history
Co-authored-by: Dong Jianhui <[email protected]>
  • Loading branch information
Mulavar and Dong Jianhui authored Apr 19, 2022
1 parent 49bfcc0 commit 81dc89a
Show file tree
Hide file tree
Showing 9 changed files with 437 additions and 30 deletions.
68 changes: 68 additions & 0 deletions pkg/proto/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package proto

import (
"strings"
)

type TableMetadata struct {
Name string
Columns map[string]*ColumnMetadata
Indexes map[string]*IndexMetadata
ColumnNames []string
PrimaryKeyColumns []string
}

func NewTableMetadata(name string, columnMetadataList []*ColumnMetadata, indexMetadataList []*IndexMetadata) *TableMetadata {
tma := &TableMetadata{
Name: name,
Columns: make(map[string]*ColumnMetadata, 0),
Indexes: make(map[string]*IndexMetadata, 0),
ColumnNames: make([]string, len(columnMetadataList)),
PrimaryKeyColumns: make([]string, 0),
}
for i, columnMetadata := range columnMetadataList {
columnName := strings.ToLower(columnMetadata.Name)
tma.ColumnNames[i] = columnName
tma.Columns[columnName] = columnMetadata
if columnMetadata.PrimaryKey {
tma.PrimaryKeyColumns = append(tma.PrimaryKeyColumns, columnName)
}
}
for _, indexMetadata := range indexMetadataList {
indexName := strings.ToLower(indexMetadata.Name)
tma.Indexes[indexName] = indexMetadata
}

return tma
}

type ColumnMetadata struct {
Name string
// TODO int32
DataType string
Ordinal string
PrimaryKey bool
Generated bool
CaseSensitive bool
}

type IndexMetadata struct {
Name string
}
8 changes: 6 additions & 2 deletions pkg/proto/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

//go:generate mockgen -destination=../../testdata/mock_runtime.go -package=testdata . VConn,Plan,Optimizer,DB
//go:generate mockgen -destination=../../testdata/mock_runtime.go -package=testdata . VConn,Plan,Optimizer,DB,SchemaLoader
package proto

import (
Expand Down Expand Up @@ -56,7 +56,7 @@ type (
// Optimizer represents a sql statement optimizer which can be used to create QueryPlan or ExecPlan.
Optimizer interface {
// Optimize optimizes the sql with arguments then returns a Plan.
Optimize(ctx context.Context, stmt ast.StmtNode, args ...interface{}) (Plan, error)
Optimize(ctx context.Context, conn VConn, stmt ast.StmtNode, args ...interface{}) (Plan, error)
}

// Weight represents the read/write weight info.
Expand Down Expand Up @@ -113,4 +113,8 @@ type (
// Rollback rollbacks current transaction.
Rollback(ctx context.Context) (Result, uint16, error)
}

SchemaLoader interface {
Load(ctx context.Context, conn VConn, tables []string) map[string]*TableMetadata
}
)
171 changes: 171 additions & 0 deletions pkg/proto/schema_manager/loader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package schema_manager

import (
"context"
"fmt"
"strings"
)

import (
"github.com/arana-db/arana/pkg/mysql"
"github.com/arana-db/arana/pkg/proto"
rcontext "github.com/arana-db/arana/pkg/runtime/context"
"github.com/arana-db/arana/pkg/util/log"
)

const (
orderByOrdinalPosition = " ORDER BY ORDINAL_POSITION"
// TODO add table schema filter
tableMetadataNoOrder = "SELECT TABLE_NAME, COLUMN_NAME, DATA_TYPE, COLUMN_KEY, EXTRA, COLLATION_NAME, ORDINAL_POSITION FROM information_schema.columns WHERE"
tableMetadataSQL = tableMetadataNoOrder + orderByOrdinalPosition
tableMetadataSQLInTables = tableMetadataNoOrder + " TABLE_NAME IN (%s)" + orderByOrdinalPosition
indexMetadataSQL = "SELECT TABLE_NAME, INDEX_NAME FROM information_schema.statistics WHERE TABLE_NAME IN (%s)"
)

type SimpleSchemaLoader struct {
Schema string
}

func (l *SimpleSchemaLoader) Load(ctx context.Context, conn proto.VConn, tables []string) map[string]*proto.TableMetadata {
ctx = rcontext.WithRead(rcontext.WithDirect(ctx))
var (
tableMetadataMap = make(map[string]*proto.TableMetadata, len(tables))
indexMetadataMap map[string][]*proto.IndexMetadata
columnMetadataMap map[string][]*proto.ColumnMetadata
)
columnMetadataMap = l.LoadColumnMetadataMap(ctx, conn, tables)
if columnMetadataMap != nil {
indexMetadataMap = l.LoadIndexMetadata(ctx, conn, tables)
}

for tableName, columns := range columnMetadataMap {
tableMetadataMap[tableName] = proto.NewTableMetadata(tableName, columns, indexMetadataMap[tableName])
}

return tableMetadataMap
}

func (l *SimpleSchemaLoader) LoadColumnMetadataMap(ctx context.Context, conn proto.VConn, tables []string) map[string][]*proto.ColumnMetadata {
resultSet, err := conn.Query(ctx, l.Schema, getColumnMetadataSQL(tables))
if err != nil {
return nil
}
result := make(map[string][]*proto.ColumnMetadata, 0)
if err != nil {
log.Errorf("Load ColumnMetadata error when call db: %v", err)
return nil
}
if resultSet == nil {
log.Error("Load ColumnMetadata error because the result is nil")
return nil
}

for _, row := range resultSet.GetRows() {
var innerRow mysql.Row
switch r := row.(type) {
case *mysql.BinaryRow:
innerRow = r.Row
case *mysql.Row:
innerRow = *r
case *mysql.TextRow:
innerRow = r.Row
}
textRow := mysql.TextRow{Row: innerRow}
rowValues, err := textRow.Decode()
if err != nil {
//logger.Errorf("Load ColumnMetadata error when decode text row: %v", err)
return nil
}
tableName := convertInterfaceToStrNullable(rowValues[0].Val)
columnName := convertInterfaceToStrNullable(rowValues[1].Val)
dataType := convertInterfaceToStrNullable(rowValues[2].Val)
columnKey := convertInterfaceToStrNullable(rowValues[3].Val)
extra := convertInterfaceToStrNullable(rowValues[4].Val)
collationName := convertInterfaceToStrNullable(rowValues[5].Val)
ordinalPosition := convertInterfaceToStrNullable(rowValues[6].Val)
result[tableName] = append(result[tableName], &proto.ColumnMetadata{
Name: columnName,
DataType: dataType,
Ordinal: ordinalPosition,
PrimaryKey: strings.EqualFold("PRI", columnKey),
Generated: strings.EqualFold("auto_increment", extra),
CaseSensitive: columnKey != "" && !strings.HasSuffix(collationName, "_ci"),
})
}
return result
}

func convertInterfaceToStrNullable(value interface{}) string {
if value != nil {
return string(value.([]byte))
}
return ""
}

func (l *SimpleSchemaLoader) LoadIndexMetadata(ctx context.Context, conn proto.VConn, tables []string) map[string][]*proto.IndexMetadata {
resultSet, err := conn.Query(ctx, l.Schema, getIndexMetadataSQL(tables))
if err != nil {
return nil
}
result := make(map[string][]*proto.IndexMetadata, 0)

for _, row := range resultSet.GetRows() {
var innerRow mysql.Row
switch r := row.(type) {
case *mysql.BinaryRow:
innerRow = r.Row
case *mysql.Row:
innerRow = *r
case *mysql.TextRow:
innerRow = r.Row
}
textRow := mysql.TextRow{Row: innerRow}
rowValues, err := textRow.Decode()
if err != nil {
log.Errorf("Load ColumnMetadata error when decode text row: %v", err)
return nil
}
tableName := convertInterfaceToStrNullable(rowValues[0].Val)
indexName := convertInterfaceToStrNullable(rowValues[1].Val)
result[tableName] = append(result[tableName], &proto.IndexMetadata{Name: indexName})
}

return result
}

func getIndexMetadataSQL(tables []string) string {
tableParamList := make([]string, 0, len(tables))
for _, table := range tables {
tableParamList = append(tableParamList, "'"+table+"'")
}
return fmt.Sprintf(indexMetadataSQL, strings.Join(tableParamList, ","))
}

func getColumnMetadataSQL(tables []string) string {
if len(tables) == 0 {
return tableMetadataSQL
}
tableParamList := make([]string, len(tables))
for i, table := range tables {
tableParamList[i] = "'" + table + "'"
}
// TODO use strings.Builder in the future
return fmt.Sprintf(tableMetadataSQLInTables, strings.Join(tableParamList, ","))
}
62 changes: 62 additions & 0 deletions pkg/proto/schema_manager/loader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package schema_manager

import (
"context"
"testing"
)

import (
"github.com/arana-db/arana/pkg/config"
"github.com/arana-db/arana/pkg/proto"
"github.com/arana-db/arana/pkg/runtime"
"github.com/arana-db/arana/pkg/runtime/namespace"
)

func TestLoader(t *testing.T) {
t.Skip()
node := &config.Node{
Name: "arana-node-1",
Host: "arana-mysql",
Port: 3306,
Username: "root",
Password: "123456",
Database: "employees",
ConnProps: nil,
Weight: "r10w10",
Labels: nil,
}
groupName := "employees_0000"
cmds := make([]namespace.Command, 0)
cmds = append(cmds, namespace.UpsertDB(groupName, runtime.NewAtomDB(node)))
namespaceName := "dongjianhui"
ns := namespace.New(namespaceName, nil, cmds...)
namespace.Register(ns)
rt, err := runtime.Load(namespaceName)
if err != nil {
panic(err)
}
schemeName := "employees"
tableName := "employees"
s := &SimpleSchemaLoader{
Schema: schemeName,
}

s.Load(context.Background(), rt.(proto.VConn), []string{tableName})
}
25 changes: 20 additions & 5 deletions pkg/runtime/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ const (
)

type (
keyFlag struct{}
keyRule struct{}
keySequence struct{}
keySql struct{}
keyNodeLabel struct{}
keyFlag struct{}
keyRule struct{}
keySequence struct{}
keySql struct{}
keyNodeLabel struct{}
keyDefaultDBGroup struct{}
)

type cFlag uint8
Expand All @@ -58,6 +59,11 @@ func WithSQL(ctx context.Context, sql string) context.Context {
return context.WithValue(ctx, keySql{}, sql)
}

// WithDBGroup binds the default db.
func WithDBGroup(ctx context.Context, group string) context.Context {
return context.WithValue(ctx, keyDefaultDBGroup{}, group)
}

// WithRule binds a rule.
func WithRule(ctx context.Context, ru *rule.Rule) context.Context {
return context.WithValue(ctx, keyRule{}, ru)
Expand Down Expand Up @@ -87,6 +93,15 @@ func Sequencer(ctx context.Context) proto.Sequencer {
return s
}

// DBGroup extracts the db.
func DBGroup(ctx context.Context) string {
db, ok := ctx.Value(keyDefaultDBGroup{}).(string)
if !ok {
return ""
}
return db
}

// Rule extracts the rule.
func Rule(ctx context.Context) *rule.Rule {
ru, ok := ctx.Value(keyRule{}).(*rule.Rule)
Expand Down
Loading

0 comments on commit 81dc89a

Please sign in to comment.