Skip to content

Commit

Permalink
*: add unique key info into schema (pingcap#2376)
Browse files Browse the repository at this point in the history
  • Loading branch information
winoros authored and shenli committed Jan 6, 2017
1 parent 1b72b11 commit 1dd0945
Show file tree
Hide file tree
Showing 7 changed files with 449 additions and 10 deletions.
37 changes: 37 additions & 0 deletions expression/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ func EvaluateExprWithNull(ctx context.Context, schema Schema, expr Expression) (
// TableInfo2Schema converts table info to schema.
// Besides the columns, the returned schema carries unique key information:
// one key for every public unique index whose columns all have the NOT NULL
// flag, plus the primary key column when tbl.PKIsHandle is set.
func TableInfo2Schema(tbl *model.TableInfo) Schema {
schema := NewSchema(make([]*Column, 0, len(tbl.Columns)))
// Capacity covers every index plus one extra slot for the handle primary key.
keys := make([]KeyInfo, 0, len(tbl.Indices)+1)
for i, col := range tbl.Columns {
newCol := &Column{
ColName: col.Name,
Expand All @@ -269,6 +270,42 @@ func TableInfo2Schema(tbl *model.TableInfo) Schema {
}
schema.Append(newCol)
}
for _, idx := range tbl.Indices {
// Only unique indices in the public state contribute a key.
if !idx.Unique || idx.State != model.StatePublic {
continue
}
// ok stays true only if every index column is matched to a NOT NULL table column.
ok := true
newKey := make([]*Column, 0, len(idx.Columns))
for _, idxCol := range idx.Columns {
find := false
for i, col := range tbl.Columns {
if idxCol.Name.L == col.Name.L {
// A column without the NOT NULL flag disqualifies the index: breaking
// with find still false makes the check below drop the whole key.
if !mysql.HasNotNullFlag(col.Flag) {
break
}
// NOTE(review): assumes schema.Columns is index-aligned with tbl.Columns;
// part of the first loop is elided in this view, so confirm.
newKey = append(newKey, schema.Columns[i])
find = true
break
}
}
if !find {
ok = false
break
}
}
if ok {
keys = append(keys, newKey)
}
}
// When tbl.PKIsHandle is set, the first column carrying the primary key flag
// forms a single-column key of its own.
if tbl.PKIsHandle {
for i, col := range tbl.Columns {
if mysql.HasPriKeyFlag(col.Flag) {
keys = append(keys, []*Column{schema.Columns[i]})
break
}
}
}
schema.SetUniqueKeys(keys)
return schema
}

Expand Down
57 changes: 54 additions & 3 deletions expression/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,23 @@ import (
"github.com/pingcap/tidb/ast"
)

// Schema stands for the row schema get from input.
// KeyInfo stores the columns of one unique key or primary key.
// It is a slice of column pointers, so assigning a KeyInfo shares the
// underlying columns; use Clone to obtain an independent copy.
type KeyInfo []*Column

// Clone returns a deep copy of the key. Every column is duplicated, so the
// result shares no *Column with the receiver.
func (ki KeyInfo) Clone() KeyInfo {
	cloned := make(KeyInfo, len(ki))
	for i := range ki {
		dup := *ki[i]
		cloned[i] = &dup
	}
	return cloned
}

// Schema stands for the row schema and the unique key information obtained from input.
type Schema struct {
// Columns holds the columns of the schema.
Columns []*Column
// Keys holds the unique keys; each KeyInfo is the column list of one key.
Keys []KeyInfo
}

// String implements fmt.Stringer interface.
Expand All @@ -31,16 +45,29 @@ func (s Schema) String() string {
for _, col := range s.Columns {
colStrs = append(colStrs, col.String())
}
// NOTE(review): this return comes before the unique-key formatting below and
// makes it unreachable. It looks like the pre-change line left over from the
// diff view; confirm and remove it.
return "[" + strings.Join(colStrs, ",") + "]"
// Render every unique key as a bracketed, comma-separated list of its columns.
ukStrs := make([]string, 0, len(s.Keys))
for _, key := range s.Keys {
ukColStrs := make([]string, 0, len(key))
for _, col := range key {
ukColStrs = append(ukColStrs, col.String())
}
ukStrs = append(ukStrs, "["+strings.Join(ukColStrs, ",")+"]")
}
// Resulting shape: Column: [c1,c2] Unique key: [[k1],[k2,k3]]
return "Column: [" + strings.Join(colStrs, ",") + "] Unique key: [" + strings.Join(ukStrs, ",") + "]"
}

// Clone returns a deep copy of the schema. The columns are duplicated one by
// one, and every unique key is duplicated through KeyInfo.Clone, so the key
// columns of the copy are separate objects from the copy's Columns entries.
func (s Schema) Clone() Schema {
	cloned := NewSchema(make([]*Column, 0, s.Len()))
	for i := range s.Columns {
		dup := *s.Columns[i]
		cloned.Append(&dup)
	}

	clonedKeys := make([]KeyInfo, len(s.Keys))
	for i, key := range s.Keys {
		clonedKeys[i] = key.Clone()
	}
	cloned.SetUniqueKeys(clonedKeys)
	return cloned
}

Expand Down Expand Up @@ -102,9 +129,33 @@ func (s *Schema) Append(col *Column) {
// Add col at the end of the column list; the pointer receiver makes the
// change visible to the caller.
s.Columns = append(s.Columns, col)
}

// SetUniqueKeys will set the value of Schema.Keys.
// The slice is stored as-is (not copied) and replaces any keys set before.
func (s *Schema) SetUniqueKeys(keys []KeyInfo) {
s.Keys = keys
}

// GetColumnsIndices returns the position in the schema of every column in
// cols, in the same order as cols. As soon as one column cannot be found
// (GetColumnIndex reports -1) nil is returned instead. An empty cols yields
// an empty, non-nil slice.
func (s Schema) GetColumnsIndices(cols []*Column) (ret []int) {
	positions := make([]int, 0, len(cols))
	for _, c := range cols {
		idx := s.GetColumnIndex(c)
		if idx == -1 {
			return nil
		}
		positions = append(positions, idx)
	}
	return positions
}

// MergeSchema will merge two schema into one schema.
// Both inputs are cloned first, so the result shares no column with lSchema
// or rSchema. The merged schema holds the left columns followed by the right
// columns, and the left unique keys followed by the right unique keys.
func MergeSchema(lSchema, rSchema Schema) Schema {
	// The clones are private to this call, so appending onto their slices
	// cannot disturb the callers' schemas.
	left := lSchema.Clone()
	right := rSchema.Clone()
	merged := NewSchema(append(left.Columns, right.Columns...))
	merged.SetUniqueKeys(append(left.Keys, right.Keys...))
	return merged
}

// NewSchema returns a schema made by its parameter.
Expand Down
210 changes: 210 additions & 0 deletions plan/build_key_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package plan

import (
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/mysql"
)

// buildSchemaByAggFuncs builds a helper schema with one entry per aggregate
// function, in the order of p.AggFuncs. Because the aggregation's output
// columns and its AggFuncs correspond one to one, this schema lets a column of
// the child's schema be translated to a position in the aggregation's schema.
// An entry is the function's first argument when that argument is a column and
// the function is first_row; otherwise it is a placeholder column with
// Position -1.
func (p *Aggregation) buildSchemaByAggFuncs() expression.Schema {
	schema := expression.NewSchema(make([]*expression.Column, 0, p.schema.Len()))
	for _, aggFunc := range p.AggFuncs {
		col, isCol := aggFunc.GetArgs()[0].(*expression.Column)
		if !isCol || aggFunc.GetName() != ast.AggFuncFirstRow {
			// Not a plain first_row(column): keep the slot occupied with a placeholder.
			col = &expression.Column{Position: -1}
		}
		schema.Append(col)
	}
	return schema
}

// buildKeyInfo derives unique keys for the aggregation's schema from the
// unique keys of its first child.
func (p *Aggregation) buildKeyInfo() {
	p.baseLogicalPlan.buildKeyInfo()

	// Part 1: keys passed through the aggregate functions. A child key is kept
	// when each of its columns is found in the schema built from p.AggFuncs;
	// it is then expressed with the aggregation's own output columns.
	funcSchema := p.buildSchemaByAggFuncs()
	for _, childKey := range p.GetChildren()[0].GetSchema().Keys {
		positions := funcSchema.GetColumnsIndices(childKey)
		if positions == nil {
			continue
		}
		mapped := make([]*expression.Column, len(positions))
		for i, pos := range positions {
			mapped[i] = p.schema.Columns[pos]
		}
		p.schema.Keys = append(p.schema.Keys, mapped)
	}

	// Part 2: keys covered by the group-by columns. This is only used for
	// optimization and does not need to be pushed up, so the first matching
	// key is enough. It is expressed with the group-by columns themselves.
	groupBySchema := expression.NewSchema(p.groupByCols)
	for _, childKey := range p.GetChildren()[0].GetSchema().Keys {
		positions := groupBySchema.GetColumnsIndices(childKey)
		if positions == nil {
			continue
		}
		mapped := make([]*expression.Column, len(positions))
		for i, pos := range positions {
			mapped[i] = groupBySchema.Columns[pos]
		}
		p.schema.Keys = append(p.schema.Keys, mapped)
		break
	}
}

// buildSchemaByExprs builds a helper schema with one entry per projection
// expression, in the order of p.Exprs. Because the projection's output columns
// and its Exprs correspond one to one, this schema lets a column of the
// child's schema be translated to a position in the projection's schema.
// An expression that is a column is used directly; any other expression is
// represented by a placeholder column with Position -1.
func (p *Projection) buildSchemaByExprs() expression.Schema {
	schema := expression.NewSchema(make([]*expression.Column, 0, p.schema.Len()))
	for _, expr := range p.Exprs {
		col, isCol := expr.(*expression.Column)
		if !isCol {
			// Keep the slot occupied so positions stay aligned with p.Exprs.
			col = &expression.Column{Position: -1}
		}
		schema.Append(col)
	}
	return schema
}

// buildKeyInfo carries over each unique key of the first child whose columns
// are all projected as plain columns, expressing it with the projection's own
// output columns.
func (p *Projection) buildKeyInfo() {
	p.baseLogicalPlan.buildKeyInfo()
	exprSchema := p.buildSchemaByExprs()
	for _, childKey := range p.GetChildren()[0].GetSchema().Keys {
		positions := exprSchema.GetColumnsIndices(childKey)
		if positions == nil {
			// At least one key column is not projected; the key is lost.
			continue
		}
		mapped := make([]*expression.Column, len(positions))
		for i, pos := range positions {
			mapped[i] = p.schema.Columns[pos]
		}
		p.schema.Keys = append(p.schema.Keys, mapped)
	}
}

// buildKeyInfo keeps each unique key of the first child whose columns are all
// still present in the trimmed schema, expressing it with the trim's own
// columns.
func (p *Trim) buildKeyInfo() {
	p.baseLogicalPlan.buildKeyInfo()
nextKey:
	for _, childKey := range p.children[0].GetSchema().Keys {
		kept := make([]*expression.Column, 0, len(childKey))
		for _, keyCol := range childKey {
			idx := p.schema.GetColumnIndex(keyCol)
			if idx == -1 {
				// A key column was trimmed away, so the key no longer holds.
				continue nextKey
			}
			kept = append(kept, p.schema.Columns[idx])
		}
		p.schema.Keys = append(p.schema.Keys, kept)
	}
}

// buildKeyInfo decides which unique keys of the children remain unique in the
// join result.
//
// Semi joins take a clone of the left child's keys. For inner and outer joins
// the keys only survive when an equal condition joins on a single-column
// unique key. Multi-column keys, e.g. (a, b) in
// 'select * from t1 join t2 where t1.a = t2.a and t1.b = t2.b', are not
// considered yet.
func (p *Join) buildKeyInfo() {
	p.baseLogicalPlan.buildKeyInfo()
	switch p.JoinType {
	case SemiJoin, SemiJoinWithAux:
		p.schema.Keys = p.children[0].GetSchema().Clone().Keys
		return
	case InnerJoin, LeftOuterJoin, RightOuterJoin:
		// Handled below.
	default:
		return
	}

	// Without equal conditions a cartesian product cannot be ruled out, which
	// destroys the unique key information.
	if len(p.EqualConditions) == 0 {
		return
	}

	// isSingleColumnKey reports whether col alone forms one of the given keys.
	isSingleColumnKey := func(keys []expression.KeyInfo, col *expression.Column) bool {
		for _, key := range keys {
			if len(key) == 1 && key[0].Equal(col, p.ctx) {
				return true
			}
		}
		return false
	}

	leftUnique, rightUnique := false, false
	for _, cond := range p.EqualConditions {
		args := cond.GetArgs()
		leftCol := args[0].(*expression.Column)
		rightCol := args[1].(*expression.Column)
		if isSingleColumnKey(p.children[0].GetSchema().Keys, leftCol) {
			leftUnique = true
		}
		if isSingleColumnKey(p.children[1].GetSchema().Keys, rightCol) {
			rightUnique = true
		}
	}

	// If one side of an equal condition is a unique key, all unique keys of the
	// other side are preserved. The exception is an outer join that pads that
	// other side with NULL values, which destroys its unique key information.
	if leftUnique && p.JoinType != LeftOuterJoin {
		p.schema.Keys = append(p.schema.Keys, p.children[1].GetSchema().Keys...)
	}
	if rightUnique && p.JoinType != RightOuterJoin {
		p.schema.Keys = append(p.schema.Keys, p.children[0].GetSchema().Keys...)
	}
}

// buildKeyInfo records the unique keys a data source provides: one key for
// every available unique index whose columns all appear in the schema with the
// NOT NULL flag, plus the primary key column when the table's PKIsHandle is
// set.
func (p *DataSource) buildKeyInfo() {
	p.baseLogicalPlan.buildKeyInfo()
	// Only the index list is needed; the second result is deliberately ignored.
	candidates, _ := availableIndices(p.indexHints, p.tableInfo)
nextIndex:
	for _, idx := range candidates {
		if !idx.Unique {
			continue
		}
		keyCols := make([]*expression.Column, 0, len(idx.Columns))
		for _, idxCol := range idx.Columns {
			// Locate the index column in the schema by lower-cased name.
			pos := -1
			for i, col := range p.schema.Columns {
				if idxCol.Name.L == col.ColName.L {
					pos = i
					break
				}
			}
			// Every index column must occur in the schema, and since NULL may
			// be duplicated in a unique index it must also be NOT NULL.
			if pos == -1 || !mysql.HasNotNullFlag(p.Columns[pos].Flag) {
				continue nextIndex
			}
			keyCols = append(keyCols, p.schema.Columns[pos])
		}
		p.schema.Keys = append(p.schema.Keys, keyCols)
	}

	if !p.tableInfo.PKIsHandle {
		return
	}
	// The first column carrying the primary key flag is a key on its own.
	for i, col := range p.Columns {
		if mysql.HasPriKeyFlag(col.Flag) {
			p.schema.Keys = append(p.schema.Keys, []*expression.Column{p.schema.Columns[i]})
			return
		}
	}
}

// buildKeyInfo sets the apply's unique keys to cloned copies of the first
// child's keys followed by cloned copies of the second child's keys.
func (p *Apply) buildKeyInfo() {
	p.baseLogicalPlan.buildKeyInfo()
	firstKeys := p.children[0].GetSchema().Clone().Keys
	secondKeys := p.children[1].GetSchema().Clone().Keys
	p.schema.Keys = append(firstKeys, secondKeys...)
}
3 changes: 1 addition & 2 deletions plan/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,7 @@ func (b *planBuilder) buildResultSetNode(node ast.ResultSetNode) LogicalPlan {
v.TableAsName = &x.AsName
}
if x.AsName.L != "" {
schema := p.GetSchema()
for _, col := range schema.Columns {
for _, col := range p.GetSchema().Columns {
col.TblName = x.AsName
col.DBName = model.NewCIStr("")
}
Expand Down
Loading

0 comments on commit 1dd0945

Please sign in to comment.