Skip to content

Commit

Permalink
plan: move some initialization out of convert2PhysicalPlan process (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
winoros authored Dec 23, 2016
1 parent 26e5c0c commit b7a7685
Show file tree
Hide file tree
Showing 9 changed files with 296 additions and 105 deletions.
5 changes: 0 additions & 5 deletions plan/aggregation_push_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ func (a *aggPushDownSolver) tryToPushDownAgg(aggFuncs []expression.AggregationFu
agg := a.makeNewAgg(aggFuncs, gbyCols)
child.SetParents(agg)
agg.SetChildren(child)
agg.correlated = agg.correlated || child.IsCorrelated()
// If agg has no group-by item, it will return a default value, which may cause some bugs.
// So here we add a group-by item forcely.
if len(agg.GroupByItems) == 0 {
Expand Down Expand Up @@ -259,9 +258,6 @@ func (a *aggPushDownSolver) makeNewAgg(aggFuncs []expression.AggregationFunction
var newFuncs []expression.AggregationFunction
newFuncs, schema = a.decompose(aggFunc, schema, agg.GetID())
newAggFuncs = append(newAggFuncs, newFuncs...)
for _, arg := range aggFunc.GetArgs() {
agg.correlated = agg.correlated || arg.IsCorrelated()
}
}
for _, gbyCol := range gbyCols {
firstRow := expression.NewAggFunction(ast.AggFuncFirstRow, []expression.Expression{gbyCol.Clone()}, false)
Expand All @@ -280,7 +276,6 @@ func (a *aggPushDownSolver) pushAggCrossUnion(agg *Aggregation, unionSchema expr
baseLogicalPlan: newBaseLogicalPlan(Agg, a.alloc),
}
newAgg.SetSchema(agg.schema.Clone())
newAgg.correlated = agg.correlated
newAgg.initIDAndContext(a.ctx)
for _, aggFunc := range agg.AggFuncs {
newAggFunc := aggFunc.Clone()
Expand Down
42 changes: 13 additions & 29 deletions plan/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ func (b *planBuilder) buildAggregation(p LogicalPlan, aggFuncList []*ast.Aggrega
baseLogicalPlan: newBaseLogicalPlan(Agg, b.allocator)}
agg.self = agg
agg.initIDAndContext(b.ctx)
agg.correlated = p.IsCorrelated()
for _, item := range gbyItems {
agg.correlated = agg.correlated || item.IsCorrelated()
}
addChild(agg, p)
schema := make(expression.Schema, 0, len(aggFuncList))
// aggIdxMap maps the old index to new index after applying common aggregation functions elimination.
Expand All @@ -66,7 +62,6 @@ func (b *planBuilder) buildAggregation(p LogicalPlan, aggFuncList []*ast.Aggrega
return nil, nil
}
p = np
agg.correlated = agg.correlated || newArg.IsCorrelated()
newArgList = append(newArgList, newArg)
}
newFunc := expression.NewAggFunction(aggFunc.F, newArgList, aggFunc.Distinct)
Expand All @@ -93,6 +88,7 @@ func (b *planBuilder) buildAggregation(p LogicalPlan, aggFuncList []*ast.Aggrega
agg.GroupByItems = gbyItems
agg.SetSchema(schema)
agg.collectGroupByColumns()
agg.SetCorrelated()
return agg, aggIndexMap
}

Expand Down Expand Up @@ -201,7 +197,6 @@ func (b *planBuilder) buildJoin(join *ast.Join) LogicalPlan {
joinPlan.self = joinPlan
joinPlan.initIDAndContext(b.ctx)
joinPlan.SetSchema(newSchema)
joinPlan.correlated = leftPlan.IsCorrelated() || rightPlan.IsCorrelated()
if join.On != nil {
onExpr, _, err := b.rewrite(join.On.Expr, joinPlan, nil, false)
if err != nil {
Expand Down Expand Up @@ -231,6 +226,7 @@ func (b *planBuilder) buildJoin(join *ast.Join) LogicalPlan {
}
addChild(joinPlan, leftPlan)
addChild(joinPlan, rightPlan)
joinPlan.SetCorrelated()
return joinPlan
}

Expand All @@ -240,7 +236,6 @@ func (b *planBuilder) buildSelection(p LogicalPlan, where ast.ExprNode, AggMappe
selection := &Selection{baseLogicalPlan: newBaseLogicalPlan(Sel, b.allocator)}
selection.self = selection
selection.initIDAndContext(b.ctx)
selection.correlated = p.IsCorrelated()
for _, cond := range conditions {
expr, np, err := b.rewrite(cond, p, AggMapper, false)
if err != nil {
Expand All @@ -252,14 +247,14 @@ func (b *planBuilder) buildSelection(p LogicalPlan, where ast.ExprNode, AggMappe
continue
}
expressions = append(expressions, expression.SplitCNFItems(expr)...)
selection.correlated = selection.correlated || expr.IsCorrelated()
}
if len(expressions) == 0 {
return p
}
selection.Conditions = expressions
selection.SetSchema(p.GetSchema().Clone())
addChild(selection, p)
selection.SetCorrelated()
return selection
}

Expand All @@ -271,7 +266,6 @@ func (b *planBuilder) buildProjection(p LogicalPlan, fields []*ast.SelectField,
}
proj.self = proj
proj.initIDAndContext(b.ctx)
proj.correlated = p.IsCorrelated()
schema := make(expression.Schema, 0, len(fields))
oldLen := 0
for _, field := range fields {
Expand All @@ -281,7 +275,6 @@ func (b *planBuilder) buildProjection(p LogicalPlan, fields []*ast.SelectField,
return nil, oldLen
}
p = np
proj.correlated = proj.correlated || newExpr.IsCorrelated()
proj.Exprs = append(proj.Exprs, newExpr)
var tblName, colName model.CIStr
if field.AsName.L != "" {
Expand Down Expand Up @@ -323,6 +316,7 @@ func (b *planBuilder) buildProjection(p LogicalPlan, fields []*ast.SelectField,
}
proj.SetSchema(schema)
addChild(proj, p)
proj.SetCorrelated()
return proj, oldLen
}

Expand All @@ -332,7 +326,7 @@ func (b *planBuilder) buildDistinct(src LogicalPlan) LogicalPlan {
d.initIDAndContext(b.ctx)
addChild(d, src)
d.SetSchema(src.GetSchema())
d.correlated = src.IsCorrelated()
d.SetCorrelated()
return d
}

Expand All @@ -343,7 +337,6 @@ func (b *planBuilder) buildUnion(union *ast.UnionStmt) LogicalPlan {
u.children = make([]Plan, len(union.SelectList.Selects))
for i, sel := range union.SelectList.Selects {
u.children[i] = b.buildSelect(sel)
u.correlated = u.correlated || u.children[i].IsCorrelated()
}
firstSchema := u.children[0].GetSchema().Clone()
for _, sel := range u.children {
Expand Down Expand Up @@ -379,6 +372,7 @@ func (b *planBuilder) buildUnion(union *ast.UnionStmt) LogicalPlan {
}

u.SetSchema(firstSchema)
u.SetCorrelated()
var p LogicalPlan
p = u
if union.Distinct {
Expand Down Expand Up @@ -409,20 +403,19 @@ func (b *planBuilder) buildSort(p LogicalPlan, byItems []*ast.ByItem, aggMapper
sort := &Sort{baseLogicalPlan: newBaseLogicalPlan(Srt, b.allocator)}
sort.self = sort
sort.initIDAndContext(b.ctx)
sort.correlated = p.IsCorrelated()
for _, item := range byItems {
it, np, err := b.rewrite(item.Expr, p, aggMapper, true)
if err != nil {
b.err = err
return nil
}
p = np
sort.correlated = sort.correlated || it.IsCorrelated()
exprs = append(exprs, &ByItems{Expr: it, Desc: item.Desc})
}
sort.ByItems = exprs
addChild(sort, p)
sort.SetSchema(p.GetSchema().Clone())
sort.SetCorrelated()
return sort
}

Expand All @@ -434,9 +427,9 @@ func (b *planBuilder) buildLimit(src LogicalPlan, limit *ast.Limit) LogicalPlan
}
li.self = li
li.initIDAndContext(b.ctx)
li.correlated = src.IsCorrelated()
addChild(li, src)
li.SetSchema(src.GetSchema().Clone())
li.SetCorrelated()
return li
}

Expand Down Expand Up @@ -866,7 +859,7 @@ func (b *planBuilder) buildTrim(p LogicalPlan, len int) LogicalPlan {
trim.initIDAndContext(b.ctx)
addChild(trim, p)
trim.SetSchema(p.GetSchema().Clone()[:len])
trim.correlated = p.IsCorrelated()
trim.SetCorrelated()
return trim
}

Expand Down Expand Up @@ -950,15 +943,6 @@ func (b *planBuilder) buildApply(outerPlan, innerPlan LogicalPlan, checker *Appl
ap.initIDAndContext(b.ctx)
addChild(ap, outerPlan)
addChild(ap, innerPlan)
corColumns := innerPlan.extractCorrelatedCols()
ap.correlated = outerPlan.IsCorrelated()
for _, corCol := range corColumns {
// If the outer column can't be resolved from this outer schema, it should be resolved by outer schema.
if idx := outerPlan.GetSchema().GetIndex(&corCol.Column); idx == -1 {
ap.correlated = true
break
}
}
innerSchema := innerPlan.GetSchema().Clone()
if checker == nil {
for _, col := range innerSchema {
Expand All @@ -973,6 +957,7 @@ func (b *planBuilder) buildApply(outerPlan, innerPlan LogicalPlan, checker *Appl
IsAggOrSubq: true,
}))
}
ap.SetCorrelated()
return ap
}

Expand All @@ -998,7 +983,7 @@ out:
RetType: types.NewFieldType(mysql.TypeTiny),
ColName: model.NewCIStr("exists_col")}
exists.SetSchema(expression.Schema{newCol})
exists.correlated = p.IsCorrelated()
exists.SetCorrelated()
return exists
}

Expand All @@ -1008,18 +993,16 @@ func (b *planBuilder) buildMaxOneRow(p LogicalPlan) LogicalPlan {
maxOneRow.initIDAndContext(b.ctx)
addChild(maxOneRow, p)
maxOneRow.SetSchema(p.GetSchema().Clone())
maxOneRow.correlated = p.IsCorrelated()
maxOneRow.SetCorrelated()
return maxOneRow
}

func (b *planBuilder) buildSemiJoin(outerPlan, innerPlan LogicalPlan, onCondition []expression.Expression, asScalar bool, not bool) LogicalPlan {
joinPlan := &Join{baseLogicalPlan: newBaseLogicalPlan(Jn, b.allocator)}
joinPlan.self = joinPlan
joinPlan.initIDAndContext(b.ctx)
joinPlan.correlated = outerPlan.IsCorrelated() || innerPlan.IsCorrelated()
for i, expr := range onCondition {
onCondition[i] = expr.Decorrelate(outerPlan.GetSchema())
joinPlan.correlated = joinPlan.correlated || onCondition[i].IsCorrelated()
}
eqCond, leftCond, rightCond, otherCond := extractOnCondition(onCondition, outerPlan, innerPlan)
joinPlan.EqualConditions = eqCond
Expand All @@ -1042,6 +1025,7 @@ func (b *planBuilder) buildSemiJoin(outerPlan, innerPlan LogicalPlan, onConditio
joinPlan.SetChildren(outerPlan, innerPlan)
outerPlan.SetParents(joinPlan)
innerPlan.SetParents(joinPlan)
joinPlan.SetCorrelated()
return joinPlan
}

Expand Down
Loading

0 comments on commit b7a7685

Please sign in to comment.