Skip to content

Commit

Permalink
plan, executor: split selection to logical plan and physical plan. (p…
Browse files Browse the repository at this point in the history
…ingcap#5235)

* plan, executor: split selection to logical plan and physical plan.
hanfei1991 authored and ngaut committed Nov 28, 2017

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent f0f94b1 commit 6d5bb7c
Showing 22 changed files with 65 additions and 70 deletions.
4 changes: 2 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
@@ -111,7 +111,7 @@ func (b *executorBuilder) build(p plan.Plan) Executor {
return b.buildSemiJoin(v)
case *plan.PhysicalIndexJoin:
return b.buildIndexLookUpJoin(v)
case *plan.Selection:
case *plan.PhysicalSelection:
return b.buildSelection(v)
case *plan.PhysicalAggregation:
return b.buildAggregation(v)
@@ -622,7 +622,7 @@ func (b *executorBuilder) buildAggregation(v *plan.PhysicalAggregation) Executor
}
}

func (b *executorBuilder) buildSelection(v *plan.Selection) Executor {
func (b *executorBuilder) buildSelection(v *plan.PhysicalSelection) Executor {
exec := &SelectionExec{
baseExecutor: newBaseExecutor(v.Schema(), b.ctx, b.build(v.Children()[0])),
Conditions: v.Conditions,
4 changes: 2 additions & 2 deletions plan/build_key_info.go
Original file line number Diff line number Diff line change
@@ -57,7 +57,7 @@ func (p *LogicalAggregation) buildKeyInfo() {

// If a condition is the form of (uniqueKey = constant) or (uniqueKey = Correlated column), it returns at most one row.
// This function will check it.
func (p *Selection) checkMaxOneRowCond(unique expression.Expression, constOrCorCol expression.Expression) bool {
func (p *LogicalSelection) checkMaxOneRowCond(unique expression.Expression, constOrCorCol expression.Expression) bool {
col, ok := unique.(*expression.Column)
if !ok {
return false
@@ -73,7 +73,7 @@ func (p *Selection) checkMaxOneRowCond(unique expression.Expression, constOrCorC
return okCorCol
}

func (p *Selection) buildKeyInfo() {
func (p *LogicalSelection) buildKeyInfo() {
p.baseLogicalPlan.buildKeyInfo()
p.schema.MaxOneRow = p.children[0].Schema().MaxOneRow
for _, cond := range p.Conditions {
2 changes: 1 addition & 1 deletion plan/column_pruning.go
Original file line number Diff line number Diff line change
@@ -77,7 +77,7 @@ func (p *Projection) PruneColumns(parentUsedCols []*expression.Column) {
}

// PruneColumns implements LogicalPlan interface.
func (p *Selection) PruneColumns(parentUsedCols []*expression.Column) {
func (p *LogicalSelection) PruneColumns(parentUsedCols []*expression.Column) {
child := p.children[0].(LogicalPlan)
for _, cond := range p.Conditions {
parentUsedCols = append(parentUsedCols, expression.ExtractColumns(cond)...)
2 changes: 1 addition & 1 deletion plan/decorrelate.go
Original file line number Diff line number Diff line change
@@ -95,7 +95,7 @@ func (s *decorrelateSolver) optimize(p LogicalPlan, _ context.Context) (LogicalP
outerPlan.SetParents(join)
join.self = join
p = join
} else if sel, ok := innerPlan.(*Selection); ok {
} else if sel, ok := innerPlan.(*LogicalSelection); ok {
// If the inner plan is a selection, we add this condition to join predicates.
// Notice that no matter what kind of join is, it's always right.
newConds := make([]expression.Expression, 0, len(sel.Conditions))
4 changes: 2 additions & 2 deletions plan/eliminate_projection.go
Original file line number Diff line number Diff line change
@@ -129,7 +129,7 @@ func (pe *projectionEliminater) eliminate(p LogicalPlan, replace map[string]*exp
setParentAndChildren(p, children...)

switch p.(type) {
case *Sort, *TopN, *Limit, *Selection, *MaxOneRow, *Update, *SelectLock:
case *Sort, *TopN, *Limit, *LogicalSelection, *MaxOneRow, *Update, *SelectLock:
p.SetSchema(p.Children()[0].Schema())
case *LogicalJoin, *LogicalApply:
var joinTp JoinType
@@ -239,7 +239,7 @@ func (p *LogicalAggregation) replaceExprColumns(replace map[string]*expression.C
p.collectGroupByColumns()
}

func (p *Selection) replaceExprColumns(replace map[string]*expression.Column) {
func (p *LogicalSelection) replaceExprColumns(replace map[string]*expression.Column) {
for _, expr := range p.Conditions {
resolveExprAndReplace(expr, replace)
}
2 changes: 1 addition & 1 deletion plan/explain.go
Original file line number Diff line number Diff line change
@@ -143,7 +143,7 @@ func (p *PhysicalUnionScan) ExplainInfo() string {
}

// ExplainInfo implements PhysicalPlan interface.
func (p *Selection) ExplainInfo() string {
func (p *PhysicalSelection) ExplainInfo() string {
return string(expression.ExplainExpressionList(p.Conditions))
}

9 changes: 9 additions & 0 deletions plan/gen_physical_plans.go
Original file line number Diff line number Diff line change
@@ -267,3 +267,12 @@ func (p *LogicalAggregation) generatePhysicalPlans() []PhysicalPlan {

return aggs
}

func (p *LogicalSelection) generatePhysicalPlans() []PhysicalPlan {
sel := PhysicalSelection{
Conditions: p.Conditions,
}.init(p.ctx)
sel.profile = p.profile
sel.SetSchema(p.Schema())
return []PhysicalPlan{sel}
}
7 changes: 6 additions & 1 deletion plan/initialize.go
Original file line number Diff line number Diff line change
@@ -109,9 +109,14 @@ func (p LogicalApply) init(ctx context.Context) *LogicalApply {
return &p
}

func (p Selection) init(ctx context.Context) *Selection {
func (p LogicalSelection) init(ctx context.Context) *LogicalSelection {
p.basePlan = newBasePlan(TypeSel, ctx, &p)
p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan)
return &p
}

func (p PhysicalSelection) init(ctx context.Context) *PhysicalSelection {
p.basePlan = newBasePlan(TypeSel, ctx, &p)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
return &p
}
2 changes: 1 addition & 1 deletion plan/logical_plan_builder.go
Original file line number Diff line number Diff line change
@@ -405,7 +405,7 @@ func (b *planBuilder) buildSelection(p LogicalPlan, where ast.ExprNode, AggMappe
}
conditions := splitWhere(where)
expressions := make([]expression.Expression, 0, len(conditions))
selection := Selection{}.init(b.ctx)
selection := LogicalSelection{}.init(b.ctx)
for _, cond := range conditions {
expr, np, err := b.rewrite(cond, p, AggMapper, false)
if err != nil {
9 changes: 4 additions & 5 deletions plan/logical_plans.go
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@ var (
_ LogicalPlan = &LogicalJoin{}
_ LogicalPlan = &LogicalAggregation{}
_ LogicalPlan = &Projection{}
_ LogicalPlan = &Selection{}
_ LogicalPlan = &LogicalSelection{}
_ LogicalPlan = &LogicalApply{}
_ LogicalPlan = &Exists{}
_ LogicalPlan = &MaxOneRow{}
@@ -206,19 +206,18 @@ func (p *LogicalAggregation) extractCorrelatedCols() []*expression.CorrelatedCol
return corCols
}

// Selection means a filter.
type Selection struct {
// LogicalSelection represents a where or having predicate.
type LogicalSelection struct {
*basePlan
baseLogicalPlan
basePhysicalPlan

// Originally the WHERE or ON condition is parsed into a single expression,
// but after we converted to CNF(Conjunctive normal form), it can be
// split into a list of AND conditions.
Conditions []expression.Expression
}

func (p *Selection) extractCorrelatedCols() []*expression.CorrelatedColumn {
func (p *LogicalSelection) extractCorrelatedCols() []*expression.CorrelatedColumn {
corCols := p.baseLogicalPlan.extractCorrelatedCols()
for _, cond := range p.Conditions {
corCols = append(corCols, extractCorColumns(cond)...)
13 changes: 9 additions & 4 deletions plan/physical_plan_builder.go
Original file line number Diff line number Diff line change
@@ -384,7 +384,7 @@ func (p *DataSource) tryToGetMemTask(prop *requiredProp) (task task, err error)
memTable.profile = p.profile
var retPlan PhysicalPlan = memTable
if len(p.pushedDownConds) > 0 {
sel := Selection{
sel := PhysicalSelection{
Conditions: p.pushedDownConds,
}.init(p.ctx)
sel.SetSchema(p.schema)
@@ -675,7 +675,7 @@ func (is *PhysicalIndexScan) addPushedDownSelection(copTask *copTask, p *DataSou
for _, cond := range indexConds {
condsClone = append(condsClone, cond.Clone())
}
indexSel := Selection{Conditions: condsClone}.init(is.ctx)
indexSel := PhysicalSelection{Conditions: condsClone}.init(is.ctx)
indexSel.SetSchema(is.schema)
indexSel.SetChildren(is)
indexSel.profile = p.getStatsProfileByFilter(append(is.AccessCondition, indexConds...))
@@ -686,7 +686,7 @@ func (is *PhysicalIndexScan) addPushedDownSelection(copTask *copTask, p *DataSou
}
if tableConds != nil {
copTask.finishIndexPlan()
tableSel := Selection{Conditions: tableConds}.init(is.ctx)
tableSel := PhysicalSelection{Conditions: tableConds}.init(is.ctx)
tableSel.SetSchema(copTask.tablePlan.Schema())
tableSel.SetChildren(copTask.tablePlan)
tableSel.profile = p.profile
@@ -863,7 +863,7 @@ func (p *DataSource) convertToTableScan(prop *requiredProp) (task task, err erro
func (ts *PhysicalTableScan) addPushedDownSelection(copTask *copTask, profile *statsProfile, expectedCnt float64) {
// Add filter condition to table plan now.
if len(ts.filterCondition) > 0 {
sel := Selection{Conditions: ts.filterCondition}.init(ts.ctx)
sel := PhysicalSelection{Conditions: ts.filterCondition}.init(ts.ctx)
sel.SetSchema(ts.schema)
sel.SetChildren(ts)
sel.profile = profile
@@ -884,6 +884,11 @@ func (p *basePhysicalPlan) getChildrenPossibleProps(prop *requiredProp) [][]*req
return [][]*requiredProp{props}
}

func (p *PhysicalSelection) getChildrenPossibleProps(prop *requiredProp) [][]*requiredProp {
p.expectedCnt = prop.expectedCnt
return [][]*requiredProp{{prop}}
}

func (p *PhysicalHashJoin) getChildrenPossibleProps(prop *requiredProp) [][]*requiredProp {
p.expectedCnt = prop.expectedCnt
if !prop.isEmpty() {
15 changes: 11 additions & 4 deletions plan/physical_plans.go
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@ import (
)

var (
_ PhysicalPlan = &Selection{}
_ PhysicalPlan = &PhysicalSelection{}
_ PhysicalPlan = &Projection{}
_ PhysicalPlan = &Exists{}
_ PhysicalPlan = &MaxOneRow{}
@@ -387,11 +387,18 @@ func (p *PhysicalMergeJoin) Copy() PhysicalPlan {
return &np
}

// PhysicalSelection represents a filter.
type PhysicalSelection struct {
*basePlan
basePhysicalPlan

Conditions []expression.Expression
}

// Copy implements the PhysicalPlan Copy interface.
func (p *Selection) Copy() PhysicalPlan {
func (p *PhysicalSelection) Copy() PhysicalPlan {
np := *p
np.basePlan = p.basePlan.copy()
np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan)
np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan)
return &np
}
@@ -543,7 +550,7 @@ func buildJoinSchema(joinType JoinType, join Plan, outerID int) *expression.Sche

func buildSchema(p PhysicalPlan) {
switch x := p.(type) {
case *Limit, *TopN, *Sort, *Selection, *MaxOneRow, *SelectLock:
case *Limit, *TopN, *Sort, *PhysicalSelection, *MaxOneRow, *SelectLock:
p.SetSchema(p.Children()[0].Schema())
case *PhysicalIndexJoin:
p.SetSchema(buildJoinSchema(x.JoinType, p, x.outerIndex))
4 changes: 2 additions & 2 deletions plan/plan_to_pb.go
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@ import (

// ToPB implements PhysicalPlan ToPB interface.
func (p *basePhysicalPlan) ToPB(_ context.Context) (*tipb.Executor, error) {
return nil, errors.Errorf("plan %d fails converts to PB", p.basePlan.id)
return nil, errors.Errorf("plan %s fails converts to PB", p.basePlan.ExplainID())
}

// ToPB implements PhysicalPlan ToPB interface.
@@ -44,7 +44,7 @@ func (p *PhysicalAggregation) ToPB(ctx context.Context) (*tipb.Executor, error)
}

// ToPB implements PhysicalPlan ToPB interface.
func (p *Selection) ToPB(ctx context.Context) (*tipb.Executor, error) {
func (p *PhysicalSelection) ToPB(ctx context.Context) (*tipb.Executor, error) {
sc := ctx.GetSessionVars().StmtCtx
client := ctx.GetClient()
selExec := &tipb.Selection{
2 changes: 1 addition & 1 deletion plan/planbuilder.go
Original file line number Diff line number Diff line change
@@ -623,7 +623,7 @@ func (b *planBuilder) buildShow(show *ast.ShowStmt) Plan {
}
}
if len(conditions) != 0 {
sel := Selection{Conditions: conditions}.init(b.ctx)
sel := LogicalSelection{Conditions: conditions}.init(b.ctx)
setParentAndChildren(sel, p)
sel.SetSchema(p.Schema())
resultPlan = sel
4 changes: 2 additions & 2 deletions plan/predicate_push_down.go
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ func (s *ppdSolver) optimize(lp LogicalPlan, _ context.Context) (LogicalPlan, er

func addSelection(p Plan, child LogicalPlan, conditions []expression.Expression) {
conditions = expression.PropagateConstant(p.context(), conditions)
selection := Selection{Conditions: conditions}.init(p.context())
selection := LogicalSelection{Conditions: conditions}.init(p.context())
selection.SetSchema(child.Schema().Clone())
replaceChild(p, child, selection)
selection.SetChildren(child)
@@ -52,7 +52,7 @@ func (p *baseLogicalPlan) PredicatePushDown(predicates []expression.Expression)
}

// PredicatePushDown implements LogicalPlan PredicatePushDown interface.
func (p *Selection) PredicatePushDown(predicates []expression.Expression) ([]expression.Expression, LogicalPlan) {
func (p *LogicalSelection) PredicatePushDown(predicates []expression.Expression) ([]expression.Expression, LogicalPlan) {
retConditions, child := p.children[0].(LogicalPlan).PredicatePushDown(append(p.Conditions, predicates...))
if len(retConditions) > 0 {
p.Conditions = expression.PropagateConstant(p.ctx, retConditions)
2 changes: 1 addition & 1 deletion plan/property_cols_prune.go
Original file line number Diff line number Diff line change
@@ -35,7 +35,7 @@ func (p *DataSource) preparePossibleProperties() (result [][]*expression.Column)
return
}

func (p *Selection) preparePossibleProperties() (result [][]*expression.Column) {
func (p *LogicalSelection) preparePossibleProperties() (result [][]*expression.Column) {
return p.children[0].(LogicalPlan).preparePossibleProperties()
}

2 changes: 1 addition & 1 deletion plan/resolve_indices.go
Original file line number Diff line number Diff line change
@@ -140,7 +140,7 @@ func (p *PhysicalIndexLookUpReader) ResolveIndices() {
}

// ResolveIndices implements Plan interface.
func (p *Selection) ResolveIndices() {
func (p *PhysicalSelection) ResolveIndices() {
p.basePlan.ResolveIndices()
for _, expr := range p.Conditions {
expr.ResolveIndices(p.children[0].Schema())
2 changes: 1 addition & 1 deletion plan/stats.go
Original file line number Diff line number Diff line change
@@ -100,7 +100,7 @@ func (p *DataSource) prepareStatsProfile() *statsProfile {
return p.profile
}

func (p *Selection) prepareStatsProfile() *statsProfile {
func (p *LogicalSelection) prepareStatsProfile() *statsProfile {
childProfile := p.children[0].(LogicalPlan).prepareStatsProfile()
p.profile = childProfile.collapse(selectionFactor)
return p.profile
4 changes: 3 additions & 1 deletion plan/stringer.go
Original file line number Diff line number Diff line change
@@ -150,7 +150,9 @@ func toString(in Plan, strs []string, idxs []int) ([]string, []int) {
} else {
str = fmt.Sprintf("DataScan(%s)", x.tableInfo.Name)
}
case *Selection:
case *LogicalSelection:
str = fmt.Sprintf("Sel(%s)", x.Conditions)
case *PhysicalSelection:
str = fmt.Sprintf("Sel(%s)", x.Conditions)
case *Projection:
str = "Projection"
2 changes: 1 addition & 1 deletion plan/task.go
Original file line number Diff line number Diff line change
@@ -431,7 +431,7 @@ func (p *Union) attach2Task(tasks ...task) task {
return newTask
}

func (sel *Selection) attach2Task(tasks ...task) task {
func (sel *PhysicalSelection) attach2Task(tasks ...task) task {
if tasks[0].invalid() {
return invalidTask
}
11 changes: 1 addition & 10 deletions statistics/selectivity_test.go
Original file line number Diff line number Diff line change
@@ -169,16 +169,7 @@ func (s *testSelectivitySuite) TestSelectivity(c *C) {
c.Assert(err, IsNil, comment)
p, err := plan.BuildLogicalPlan(ctx, stmts[0], is)
c.Assert(err, IsNil, Commentf("error %v, for building plan, expr %s", err, tt.exprs))
var sel *plan.Selection
for _, child := range p.Children() {
p, ok := child.(*plan.Selection)
if ok {
sel = p
break
}
}
c.Assert(sel, NotNil, comment)
ratio, err := statsTbl.Selectivity(ctx, sel.Conditions)
ratio, err := statsTbl.Selectivity(ctx, p.Children()[0].(*plan.LogicalSelection).Conditions)
c.Assert(err, IsNil, comment)
c.Assert(math.Abs(ratio-tt.selectivity) < eps, IsTrue, comment)
}
29 changes: 3 additions & 26 deletions util/ranger/ranger_test.go
Original file line number Diff line number Diff line change
@@ -308,15 +308,7 @@ func (s *testRangerSuite) TestTableRange(c *C) {
c.Assert(err, IsNil, Commentf("error %v, for resolve name, expr %s", err, tt.exprStr))
p, err := plan.BuildLogicalPlan(ctx, stmts[0], is)
c.Assert(err, IsNil, Commentf("error %v, for build plan, expr %s", err, tt.exprStr))
var selection *plan.Selection
for _, child := range p.Children() {
p, ok := child.(*plan.Selection)
if ok {
selection = p
break
}
}
c.Assert(selection, NotNil, Commentf("expr:%v", tt.exprStr))
selection := p.Children()[0].(*plan.LogicalSelection)
conds := make([]expression.Expression, 0, len(selection.Conditions))
for _, cond := range selection.Conditions {
conds = append(conds, expression.PushDownNot(cond, false, ctx))
@@ -498,14 +490,7 @@ func (s *testRangerSuite) TestIndexRange(c *C) {
c.Assert(err, IsNil, Commentf("error %v, for resolve name, expr %s", err, tt.exprStr))
p, err := plan.BuildLogicalPlan(ctx, stmts[0], is)
c.Assert(err, IsNil, Commentf("error %v, for build plan, expr %s", err, tt.exprStr))
var selection *plan.Selection
for _, child := range p.Children() {
p, ok := child.(*plan.Selection)
if ok {
selection = p
break
}
}
selection := p.Children()[0].(*plan.LogicalSelection)
tbl := selection.Children()[0].(*plan.DataSource).TableInfo()
c.Assert(selection, NotNil, Commentf("expr:%v", tt.exprStr))
conds := make([]expression.Expression, 0, len(selection.Conditions))
@@ -749,15 +734,7 @@ func (s *testRangerSuite) TestColumnRange(c *C) {
c.Assert(err, IsNil, Commentf("error %v, for resolve name, expr %s", err, tt.exprStr))
p, err := plan.BuildLogicalPlan(ctx, stmts[0], is)
c.Assert(err, IsNil, Commentf("error %v, for build plan, expr %s", err, tt.exprStr))
var sel *plan.Selection
for _, child := range p.Children() {
plan, ok := child.(*plan.Selection)
if ok {
sel = plan
break
}
}
c.Assert(sel, NotNil, Commentf("expr:%v", tt.exprStr))
sel := p.Children()[0].(*plan.LogicalSelection)
ds, ok := sel.Children()[0].(*plan.DataSource)
c.Assert(ok, IsTrue, Commentf("expr:%v", tt.exprStr))
conds := make([]expression.Expression, 0, len(sel.Conditions))

0 comments on commit 6d5bb7c

Please sign in to comment.