Skip to content

Commit

Permalink
plan: remove unionscan schema. (pingcap#5119)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanfei1991 authored Nov 18, 2017
1 parent 393da08 commit fccc540
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 106 deletions.
24 changes: 9 additions & 15 deletions plan/column_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,22 @@ func (p *Union) PruneColumns(parentUsedCols []*expression.Column) {
}
}

// PruneColumns implements LogicalPlan interface.
func (p *LogicalUnionScan) PruneColumns(parentUsedCols []*expression.Column) {
for _, col := range p.schema.TblID2Handle {
parentUsedCols = append(parentUsedCols, col[0])
}
p.children[0].(LogicalPlan).PruneColumns(parentUsedCols)
p.SetSchema(p.children[0].Schema())
}

// PruneColumns implements LogicalPlan interface.
func (p *DataSource) PruneColumns(parentUsedCols []*expression.Column) {
used := getUsedList(parentUsedCols, p.schema)
p.pruneUnionScanSchema(used)
handleIdx := -1 // -1 for not found.
for _, col := range p.schema.TblID2Handle {
handleIdx = col[0].Index
}
if p.unionScanSchema != nil {
used[handleIdx] = true
}
firstCol, firstColInfo := p.schema.Columns[0], p.Columns[0]
for i := len(used) - 1; i >= 0; i-- {
if !used[i] {
Expand All @@ -186,17 +191,6 @@ func (p *DataSource) PruneColumns(parentUsedCols []*expression.Column) {
}
}

func (p *DataSource) pruneUnionScanSchema(usedMask []bool) {
if p.unionScanSchema == nil {
return
}
for i := p.unionScanSchema.Len() - 1; i >= 0; i-- {
if !usedMask[i] {
p.unionScanSchema.Columns = append(p.unionScanSchema.Columns[:i], p.unionScanSchema.Columns[i+1:]...)
}
}
}

// PruneColumns implements LogicalPlan interface.
func (p *TableDual) PruneColumns(_ []*expression.Column) {
}
Expand Down
2 changes: 1 addition & 1 deletion plan/dag_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderUnionScan(c *C) {
},
{
sql: "select c from t where c = 1",
best: "IndexReader(Index(t.c_d_e)[[1,1]])->UnionScan([eq(test.t.c, 1)])",
best: "IndexReader(Index(t.c_d_e)[[1,1]])->UnionScan([eq(test.t.c, 1)])->Projection",
},
}
for _, tt := range tests {
Expand Down
6 changes: 6 additions & 0 deletions plan/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ func (p Selection) init(ctx context.Context) *Selection {
return &p
}

func (p LogicalUnionScan) init(ctx context.Context) *LogicalUnionScan {
p.basePlan = newBasePlan(TypeUnionScan, ctx, &p)
p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan)
return &p
}

func (p Projection) init(ctx context.Context) *Projection {
p.basePlan = newBasePlan(TypeProj, ctx, &p)
p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan)
Expand Down
81 changes: 33 additions & 48 deletions plan/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,7 @@ func (b *planBuilder) buildResultSetNode(node ast.ResultSetNode) LogicalPlan {
case *ast.TableName:
p = b.buildDataSource(v)
default:
b.err = ErrUnsupportedType.Gen("unsupported table source type %T", v)
return nil
b.err = ErrUnsupportedType.GenByArgs(v)
}
if b.err != nil {
return nil
Expand Down Expand Up @@ -218,15 +217,8 @@ func extractOnCondition(conditions []expression.Expression, left LogicalPlan, ri
}

func extractTableAlias(p LogicalPlan) *model.CIStr {
if dataSource, ok := p.(*DataSource); ok {
if dataSource.TableAsName.L != "" {
return dataSource.TableAsName
}
return &dataSource.tableInfo.Name
} else if len(p.Schema().Columns) > 0 {
if p.Schema().Columns[0].TblName.L != "" {
return &(p.Schema().Columns[0].TblName)
}
if p.Schema().Len() > 0 && p.Schema().Columns[0].TblName.L != "" {
return &(p.Schema().Columns[0].TblName)
}
return nil
}
Expand Down Expand Up @@ -1566,7 +1558,7 @@ func (b *planBuilder) buildDataSource(tn *ast.TableName) LogicalPlan {
statisticTable = handle.GetTableStats(tableInfo.ID)
}

p := DataSource{
ds := DataSource{
indexHints: tn.IndexHints,
tableInfo: tableInfo,
statisticTable: statisticTable,
Expand All @@ -1582,37 +1574,26 @@ func (b *planBuilder) buildDataSource(tn *ast.TableName) LogicalPlan {
} else {
columns = tbl.Cols()
}
var pkCol *expression.Column
p.Columns = make([]*model.ColumnInfo, 0, len(columns))
var handleCol *expression.Column
ds.Columns = make([]*model.ColumnInfo, 0, len(columns))
schema := expression.NewSchema(make([]*expression.Column, 0, len(columns))...)
for i, col := range columns {
p.Columns = append(p.Columns, col.ToInfo())
ds.Columns = append(ds.Columns, col.ToInfo())
schema.Append(&expression.Column{
FromID: p.id,
FromID: ds.id,
ColName: col.Name,
TblName: tableInfo.Name,
DBName: schemaName,
RetType: &col.FieldType,
Position: i,
ID: col.ID})
if tableInfo.PKIsHandle && mysql.HasPriKeyFlag(col.Flag) {
pkCol = schema.Columns[schema.Len()-1]
}
}
needUnionScan := b.ctx.Txn() != nil && !b.ctx.Txn().IsReadOnly()
if b.needColHandle == 0 && !needUnionScan {
p.SetSchema(schema)
return b.projectVirtualColumns(p, columns)
}
if needUnionScan {
p.unionScanSchema = expression.NewSchema(make([]*expression.Column, 0, len(tableInfo.Columns))...)
for _, col := range schema.Columns {
p.unionScanSchema.Append(col)
handleCol = schema.Columns[schema.Len()-1]
}
}
if pkCol == nil {
idCol := &expression.Column{
FromID: p.id,
if handleCol == nil {
handleCol = &expression.Column{
FromID: ds.id,
DBName: schemaName,
TblName: tableInfo.Name,
ColName: model.NewCIStr("_rowid"),
Expand All @@ -1621,30 +1602,35 @@ func (b *planBuilder) buildDataSource(tn *ast.TableName) LogicalPlan {
Index: schema.Len(),
ID: model.ExtraHandleID,
}
if needUnionScan && b.needColHandle > 0 {
p.unionScanSchema.Columns = append(p.unionScanSchema.Columns, idCol)
p.unionScanSchema.TblID2Handle[tableInfo.ID] = []*expression.Column{idCol}
}
p.Columns = append(p.Columns, &model.ColumnInfo{
ds.Columns = append(ds.Columns, &model.ColumnInfo{
ID: model.ExtraHandleID,
Name: model.NewCIStr("_rowid"),
})
schema.Append(idCol)
schema.TblID2Handle[tableInfo.ID] = []*expression.Column{idCol}
} else {
if needUnionScan && b.needColHandle > 0 {
p.unionScanSchema.TblID2Handle[tableInfo.ID] = []*expression.Column{pkCol}
}
schema.TblID2Handle[tableInfo.ID] = []*expression.Column{pkCol}
schema.Append(handleCol)
}
p.SetSchema(schema)
return b.projectVirtualColumns(p, columns)
schema.TblID2Handle[tableInfo.ID] = []*expression.Column{handleCol}
ds.SetSchema(schema)
// make plan as DS -> US -> Proj
var result LogicalPlan = ds
if b.ctx.Txn() != nil && !b.ctx.Txn().IsReadOnly() {
ds.NeedColHandle = true
us := LogicalUnionScan{}.init(b.ctx)
us.SetSchema(ds.Schema().Clone())
setParentAndChildren(us, result)
result = us
}
proj := b.projectVirtualColumns(ds, columns)
if proj != nil {
setParentAndChildren(proj, result)
result = proj
}
return result
}

// projectVirtualColumns is only for DataSource. If some table has virtual generated columns,
// we add a projection on the original DataSource, and calculate those columns in the projection
// so that plans above it can reference generated columns by their name.
func (b *planBuilder) projectVirtualColumns(ds *DataSource, columns []*table.Column) LogicalPlan {
func (b *planBuilder) projectVirtualColumns(ds *DataSource, columns []*table.Column) *Projection {
var hasVirtualGeneratedColumn = false
for _, column := range columns {
if column.IsGenerated() && !column.GeneratedStored {
Expand All @@ -1653,7 +1639,7 @@ func (b *planBuilder) projectVirtualColumns(ds *DataSource, columns []*table.Col
}
}
if !hasVirtualGeneratedColumn {
return ds
return nil
}
var proj = Projection{
Exprs: make([]expression.Expression, 0, len(columns)),
Expand Down Expand Up @@ -1683,7 +1669,6 @@ func (b *planBuilder) projectVirtualColumns(ds *DataSource, columns []*table.Col
proj.Exprs = append(proj.Exprs, expr)
}
proj.SetSchema(ds.Schema().Clone())
setParentAndChildren(proj, ds)
return proj
}

Expand Down
2 changes: 1 addition & 1 deletion plan/logical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1544,8 +1544,8 @@ func (s *testPlanSuite) TestTopNPushDown(c *C) {
is: s.is,
colMapper: make(map[*ast.ColumnNameExpr]int),
}
p := builder.build(stmt).(LogicalPlan)
c.Assert(builder.err, IsNil)
p := builder.build(stmt).(LogicalPlan)
p, err = logicalOptimize(builder.optFlag, p.(LogicalPlan), builder.ctx)
c.Assert(err, IsNil)
c.Assert(ToString(p), Equals, tt.best, comment)
Expand Down
19 changes: 8 additions & 11 deletions plan/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,14 @@ type TableDual struct {
RowCount int
}

// LogicalUnionScan is only used in non read-only txn.
type LogicalUnionScan struct {
*basePlan
baseLogicalPlan

conditions []expression.Expression
}

// DataSource represents a tablescan without condition push down.
type DataSource struct {
*basePlan
Expand All @@ -298,9 +306,6 @@ type DataSource struct {

// NeedColHandle is used in execution phase.
NeedColHandle bool

// This is schema the PhysicalUnionScan should be.
unionScanSchema *expression.Schema
}

func (p *DataSource) getPKIsHandleCol() *expression.Column {
Expand All @@ -320,14 +325,6 @@ func (p *DataSource) TableInfo() *model.TableInfo {
return p.tableInfo
}

// Schema implements the plan interface.
func (p *DataSource) Schema() *expression.Schema {
if p.unionScanSchema != nil {
return p.unionScanSchema
}
return p.schema
}

// Union represents Union plan.
type Union struct {
*basePlan
Expand Down
39 changes: 13 additions & 26 deletions plan/physical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ func (p *requiredProp) enforceProperty(task task, ctx context.Context) task {
return sort.attach2Task(task)
}

func (p *PhysicalUnionScan) getChildrenPossibleProps(prop *requiredProp) [][]*requiredProp {
return [][]*requiredProp{{prop}}
}

func (p *LogicalUnionScan) generatePhysicalPlans() []PhysicalPlan {
us := PhysicalUnionScan{Conditions: p.conditions}.init(p.ctx)
us.SetSchema(p.schema)
return []PhysicalPlan{us}
}

// getChildrenPossibleProps will check if this sort property can be pushed or not.
// When a sort column will be replaced by scalar function, we refuse it.
// When a sort column will be replaced by a constant, we just remove it.
Expand Down Expand Up @@ -177,7 +187,7 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(outerIdx int) []PhysicalPlan {
outerJoinKeys = p.RightJoinKeys
}
x, ok := innerChild.(*DataSource)
if !ok || x.unionScanSchema != nil {
if !ok {
return nil
}
indices, includeTableScan := availableIndices(x.indexHints, x.tableInfo)
Expand Down Expand Up @@ -567,17 +577,6 @@ func (p *baseLogicalPlan) getBestTask(bestTask task, prop *requiredProp, pp Phys
return bestTask, nil
}

func addUnionScan(cop *copTask, ds *DataSource) task {
t := finishCopTask(cop, ds.ctx)
us := PhysicalUnionScan{
Conditions: ds.pushedDownConds,
NeedColHandle: ds.NeedColHandle,
}.init(ds.ctx)
us.SetSchema(ds.unionScanSchema)
us.profile = t.plan().statsProfile()
return us.attach2Task(t)
}

// tryToGetMemTask will check if this table is a mem table. If it is, it will produce a task and store it.
func (p *DataSource) tryToGetMemTask(prop *requiredProp) (task task, err error) {
client := p.ctx.GetClient()
Expand Down Expand Up @@ -741,7 +740,7 @@ func (p *DataSource) convertToIndexScan(prop *requiredProp, idx *model.IndexInfo
Columns: p.Columns,
Index: idx,
dataSourceSchema: p.schema,
physicalTableSource: physicalTableSource{NeedColHandle: p.NeedColHandle || p.unionScanSchema != nil},
physicalTableSource: physicalTableSource{NeedColHandle: p.NeedColHandle},
}.init(p.ctx)
statsTbl := p.statisticTable
rowCount := float64(statsTbl.Count)
Expand Down Expand Up @@ -827,9 +826,6 @@ func (p *DataSource) convertToIndexScan(prop *requiredProp, idx *model.IndexInfo
}
cop.keepOrder = true
is.addPushedDownSelection(cop, p, prop.expectedCnt)
if p.unionScanSchema != nil {
task = addUnionScan(cop, p)
}
} else {
is.OutOfOrder = true
expectedCnt := math.MaxFloat64
Expand All @@ -839,9 +835,6 @@ func (p *DataSource) convertToIndexScan(prop *requiredProp, idx *model.IndexInfo
return invalidTask, nil
}
is.addPushedDownSelection(cop, p, expectedCnt)
if p.unionScanSchema != nil {
task = addUnionScan(cop, p)
}
}
if prop.taskTp == rootTaskType {
task = finishCopTask(task, p.ctx)
Expand Down Expand Up @@ -994,7 +987,7 @@ func (p *DataSource) convertToTableScan(prop *requiredProp) (task task, err erro
Columns: p.Columns,
TableAsName: p.TableAsName,
DBName: p.DBName,
physicalTableSource: physicalTableSource{NeedColHandle: p.NeedColHandle || p.unionScanSchema != nil},
physicalTableSource: physicalTableSource{NeedColHandle: p.NeedColHandle},
}.init(p.ctx)
ts.SetSchema(p.schema)
sc := p.ctx.GetSessionVars().StmtCtx
Expand Down Expand Up @@ -1056,9 +1049,6 @@ func (p *DataSource) convertToTableScan(prop *requiredProp) (task task, err erro
ts.KeepOrder = true
copTask.keepOrder = true
ts.addPushedDownSelection(copTask, p.profile, prop.expectedCnt)
if p.unionScanSchema != nil {
task = addUnionScan(copTask, p)
}
} else {
expectedCnt := math.MaxFloat64
if prop.isEmpty() {
Expand All @@ -1067,9 +1057,6 @@ func (p *DataSource) convertToTableScan(prop *requiredProp) (task task, err erro
return invalidTask, nil
}
ts.addPushedDownSelection(copTask, p.profile, expectedCnt)
if p.unionScanSchema != nil {
task = addUnionScan(copTask, p)
}
}
if prop.taskTp == rootTaskType {
task = finishCopTask(task, p.ctx)
Expand Down
3 changes: 0 additions & 3 deletions plan/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,6 @@ type physicalTableSource struct {

// NeedColHandle is used in execution phase.
NeedColHandle bool

// TODO: This should be removed after old planner was removed.
unionScanSchema *expression.Schema
}

func needCount(af aggregation.Aggregation) bool {
Expand Down
2 changes: 1 addition & 1 deletion plan/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (

// Error instances.
var (
ErrUnsupportedType = terror.ClassOptimizerPlan.New(CodeUnsupportedType, "Unsupported type")
ErrUnsupportedType = terror.ClassOptimizerPlan.New(CodeUnsupportedType, "Unsupported type %T")
SystemInternalErrorType = terror.ClassOptimizerPlan.New(SystemInternalError, "System internal error")
ErrUnknownColumn = terror.ClassOptimizerPlan.New(CodeUnknownColumn, mysql.MySQLErrName[mysql.ErrBadField])
ErrUnknownTable = terror.ClassOptimizerPlan.New(CodeUnknownTable, mysql.MySQLErrName[mysql.ErrUnknownTable])
Expand Down
Loading

0 comments on commit fccc540

Please sign in to comment.