Skip to content

Commit

Permalink
Merge pull request pingcap#900 from nieyy/master
Browse files Browse the repository at this point in the history
performance_schema: support insert operation
  • Loading branch information
qiuyesuifeng committed Feb 16, 2016
2 parents 8c7d7d2 + 4e90674 commit 194bdc4
Show file tree
Hide file tree
Showing 6 changed files with 355 additions and 5 deletions.
8 changes: 8 additions & 0 deletions perfschema/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"strings"

"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/util/codec"
Expand Down Expand Up @@ -80,6 +81,13 @@ func decodeValue(data []byte, cols []*model.ColumnInfo) ([]interface{}, error) {
return rvalues, nil
}

// dumpValue is used for debugging purposes only.
func dumpValue(funcName string, vals []interface{}) {
for _, val := range vals {
log.Debugf("[%s] %T: %v", funcName, val, val)
}
}

// findCol finds column in cols by name.
func findCol(cols []*model.ColumnInfo, name string) *model.ColumnInfo {
for _, col := range cols {
Expand Down
10 changes: 6 additions & 4 deletions perfschema/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,9 @@ func (ps *perfSchema) buildModel(tbName string, colNames []string, cols []column
var ci *model.ColumnInfo

if col.elems == nil {
ci = buildUsualColumnInfo(colNames[i], col.tp, col.size, col.flag, col.deflt)
ci = buildUsualColumnInfo(i, colNames[i], col.tp, col.size, col.flag, col.deflt)
} else {
ci = buildEnumColumnInfo(colNames[i], col.elems, col.flag, col.deflt)
ci = buildEnumColumnInfo(i, colNames[i], col.elems, col.flag, col.deflt)
}

rcols[i] = ci
Expand All @@ -217,7 +217,7 @@ func (ps *perfSchema) buildModel(tbName string, colNames []string, cols []column
ps.fields[tbName] = fields
}

func buildUsualColumnInfo(name string, tp byte, size int, flag uint, def interface{}) *model.ColumnInfo {
func buildUsualColumnInfo(offset int, name string, tp byte, size int, flag uint, def interface{}) *model.ColumnInfo {
mCharset := charset.CharsetBin
mCollation := charset.CharsetBin
if tp == mysql.TypeString || tp == mysql.TypeVarchar || tp == mysql.TypeBlob || tp == mysql.TypeLongBlob {
Expand All @@ -237,13 +237,14 @@ func buildUsualColumnInfo(name string, tp byte, size int, flag uint, def interfa
}
colInfo := &model.ColumnInfo{
Name: model.NewCIStr(name),
Offset: offset,
FieldType: fieldType,
DefaultValue: def,
}
return colInfo
}

func buildEnumColumnInfo(name string, elems []string, flag uint, def interface{}) *model.ColumnInfo {
func buildEnumColumnInfo(offset int, name string, elems []string, flag uint, def interface{}) *model.ColumnInfo {
mCharset := charset.CharsetBin
mCollation := charset.CharsetBin
if def == nil {
Expand All @@ -258,6 +259,7 @@ func buildEnumColumnInfo(name string, elems []string, flag uint, def interface{}
}
colInfo := &model.ColumnInfo{
Name: model.NewCIStr(name),
Offset: offset,
FieldType: fieldType,
DefaultValue: def,
}
Expand Down
280 changes: 280 additions & 0 deletions perfschema/insert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package perfschema

import (
"strings"
"sync/atomic"

"github.com/juju/errors"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util/codec"
"github.com/syndtr/goleveldb/leveldb"
)

// InsertValues is the rest part of insert/replace into statement.
type InsertValues struct {
ColNames []string
Lists [][]expression.Expression
TableIdent table.Ident
Setlist []*expression.Assignment
}

// Simulate the behavior of an INSERT statement.
func (ps *perfSchema) ExecInsert(insertVals *InsertValues) error {
name := strings.ToUpper(insertVals.TableIdent.Name.O)
t := ps.getTable(name)
if t == nil {
return errors.Errorf("INSERT INTO %s: operation not permitted", insertVals.TableIdent)
}

cols, err := ps.getColumns(insertVals, t.Columns)
if err != nil {
return errors.Trace(err)
}

rows, err := ps.getRows(insertVals, t, cols)
if err != nil {
return errors.Trace(err)
}

return ps.addRecords(name, rows)
}

func (ps *perfSchema) addRecords(tbName string, rows [][]interface{}) error {
var store *leveldb.DB
var lastLsn uint64

// Same as MySQL, we only support INSERT operations for setup_actors & setup_objects.
switch tbName {
case TableSetupActors:
store = ps.stores[TableSetupActors]
lastLsn = atomic.AddUint64(ps.lsns[TableSetupActors], uint64(len(rows)))
case TableSetupObjects:
store = ps.stores[TableSetupObjects]
lastLsn = atomic.AddUint64(ps.lsns[TableSetupObjects], uint64(len(rows)))
default:
return errors.Errorf("INSERT INTO %s.%s: operation not permitted", Name, tbName)
}

batch := pool.Get().(*leveldb.Batch)
defer func() {
batch.Reset()
pool.Put(batch)
}()

for i, row := range rows {
lsn := lastLsn - uint64(len(rows)) + uint64(i)
rawKey := []interface{}{uint64(lsn)}
key, err := codec.EncodeKey(nil, rawKey...)
if err != nil {
return errors.Trace(err)
}
val, err := codec.EncodeValue(nil, row...)
if err != nil {
return errors.Trace(err)
}
batch.Put(key, val)
}

err := store.Write(batch, nil)
return errors.Trace(err)
}

func checkValueCount(insertVals *InsertValues, insertValueCount, valueCount, num int, cols []*model.ColumnInfo) error {
if insertValueCount != valueCount {
// "insert into t values (), ()" is valid.
// "insert into t values (), (1)" is not valid.
// "insert into t values (1), ()" is not valid.
// "insert into t values (1,2), (1)" is not valid.
// So the value count must be same for all insert list.
return errors.Errorf("Column count doesn't match value count at row %d", num+1)
}
if valueCount == 0 && len(insertVals.ColNames) > 0 {
// "insert into t (c1) values ()" is not valid.
return errors.Errorf("INSERT INTO %s: expected %d value(s), have %d", insertVals.TableIdent, len(insertVals.ColNames), 0)
} else if valueCount > 0 && valueCount != len(cols) {
return errors.Errorf("INSERT INTO %s: expected %d value(s), have %d", insertVals.TableIdent, len(cols), valueCount)
}

return nil
}

func fillRowData(t *model.TableInfo, cols []*model.ColumnInfo, vals []interface{}) ([]interface{}, error) {
row := make([]interface{}, len(t.Columns))
marked := make(map[int]struct{}, len(vals))
for i, v := range vals {
offset := cols[i].Offset
row[offset] = v
marked[offset] = struct{}{}
}
err := initDefaultValues(t, row, marked)
if err != nil {
return nil, errors.Trace(err)
}
if err = castValues(row, cols); err != nil {
return nil, errors.Trace(err)
}
if err = checkNotNulls(t.Columns, row); err != nil {
return nil, errors.Trace(err)
}
return row, nil
}

func fillValueList(insertVals *InsertValues) error {
if len(insertVals.Setlist) > 0 {
if len(insertVals.Lists) > 0 {
return errors.Errorf("INSERT INTO %s: set type should not use values", insertVals.TableIdent)
}

var l []expression.Expression
for _, v := range insertVals.Setlist {
l = append(l, v.Expr)
}
insertVals.Lists = append(insertVals.Lists, l)
}

return nil
}

func getColumnDefaultValues(cols []*model.ColumnInfo) (map[interface{}]interface{}, error) {
defaultValMap := map[interface{}]interface{}{}
for _, col := range cols {
if value, ok, err := getColDefaultValue(col); ok {
if err != nil {
return nil, errors.Trace(err)
}

defaultValMap[col.Name.L] = value
}
}

return defaultValMap, nil
}

// There are three types of insert statements:
// 1 insert ... values(...) --> name type column
// 2 insert ... set x=y... --> set type column
// 3 insert ... (select ..) --> name type column
// See: https://dev.mysql.com/doc/refman/5.7/en/insert.html
func (ps *perfSchema) getColumns(insertVals *InsertValues, tableCols []*model.ColumnInfo) ([]*model.ColumnInfo, error) {
var cols []*model.ColumnInfo
var err error

if len(insertVals.Setlist) > 0 {
// Process `set` type column.
columns := make([]string, 0, len(insertVals.Setlist))
for _, v := range insertVals.Setlist {
columns = append(columns, v.ColName)
}

cols, err = findCols(tableCols, columns)
if err != nil {
return nil, errors.Errorf("INSERT INTO %s: %s", insertVals.TableIdent, err)
}

if len(cols) == 0 {
return nil, errors.Errorf("INSERT INTO %s: empty column", insertVals.TableIdent)
}
} else {
// Process `name` type column.
cols, err = findCols(tableCols, insertVals.ColNames)
if err != nil {
return nil, errors.Errorf("INSERT INTO %s: %s", insertVals.TableIdent, err)
}

// If cols are empty, use all columns instead.
if len(cols) == 0 {
cols = tableCols
}
}

return cols, nil
}

func (ps *perfSchema) getRows(insertVals *InsertValues, t *model.TableInfo, cols []*model.ColumnInfo) (rows [][]interface{}, err error) {
// process `insert|replace ... set x=y...`
if err = fillValueList(insertVals); err != nil {
return nil, errors.Trace(err)
}

evalMap, err := getColumnDefaultValues(cols)
if err != nil {
return nil, errors.Trace(err)
}

rows = make([][]interface{}, len(insertVals.Lists))
for i, list := range insertVals.Lists {
if err = checkValueCount(insertVals, len(insertVals.Lists[0]), len(list), i, cols); err != nil {
return nil, errors.Trace(err)
}

vals := make([]interface{}, len(list))
for j, expr := range list {
// For "insert into t values (default)" Default Eval.
evalMap[expression.ExprEvalDefaultName] = cols[j].Name.O

vals[j], err = expr.Eval(nil, evalMap)
if err != nil {
return nil, errors.Trace(err)
}
}

rows[i], err = fillRowData(t, cols, vals)
if err != nil {
return nil, errors.Trace(err)
}
}
return
}

func (ps *perfSchema) getTable(tbName string) *model.TableInfo {
// Same as MySQL, we only support INSERT operations for setup_actors & setup_objects.
switch tbName {
case TableSetupActors:
case TableSetupObjects:
default:
return nil
}
return ps.tables[tbName]
}

func initDefaultValues(t *model.TableInfo, row []interface{}, marked map[int]struct{}) error {
var defaultValueCols []*model.ColumnInfo
for i, c := range t.Columns {
if row[i] != nil {
// Column value is not nil, continue.
continue
}
// If the nil value is evaluated in insert list, we will use nil.
if _, ok := marked[i]; ok {
continue
}

var value interface{}
value, _, err := getColDefaultValue(c)
if err != nil {
return errors.Trace(err)
}
row[i] = value
defaultValueCols = append(defaultValueCols, c)
}

if err := castValues(row, defaultValueCols); err != nil {
return errors.Trace(err)
}
return nil
}
3 changes: 3 additions & 0 deletions perfschema/perfschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import (
type PerfSchema interface {
// For SELECT statement only.
NewPerfSchemaPlan(tableName string) (plan.Plan, error)

// For INSERT statement only.
ExecInsert(insertVals *InsertValues) error
}

type perfSchema struct {
Expand Down
Loading

0 comments on commit 194bdc4

Please sign in to comment.