Skip to content

Commit

Permalink
executor, plan: abandon the selection controller (pingcap#4528)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanfei1991 authored Sep 21, 2017
1 parent 7c3cc87 commit 67fa005
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 182 deletions.
5 changes: 2 additions & 3 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,9 +601,8 @@ func (b *executorBuilder) buildAggregation(v *plan.PhysicalAggregation) Executor

func (b *executorBuilder) buildSelection(v *plan.Selection) Executor {
exec := &SelectionExec{
baseExecutor: newBaseExecutor(v.Schema(), b.ctx, b.build(v.Children()[0])),
scanController: v.ScanController,
Conditions: v.Conditions,
baseExecutor: newBaseExecutor(v.Schema(), b.ctx, b.build(v.Children()[0])),
Conditions: v.Conditions,
}
return exec
}
Expand Down
66 changes: 1 addition & 65 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/types"
)

Expand Down Expand Up @@ -421,66 +420,11 @@ func (e *TableDualExec) Next() (Row, error) {
type SelectionExec struct {
baseExecutor

// scanController will tell whether this selection need to
// control the condition of below scan executor.
scanController bool
controllerInit bool
Conditions []expression.Expression
}

// initController will init the conditions of the below scan executor.
// It will first substitute the correlated column to constant, then build range and filter by new conditions.
func (e *SelectionExec) initController() error {
sc := e.ctx.GetSessionVars().StmtCtx
client := e.ctx.GetClient()
newConds := make([]expression.Expression, 0, len(e.Conditions))
for _, cond := range e.Conditions {
newCond, err := expression.SubstituteCorCol2Constant(cond.Clone())
if err != nil {
return errors.Trace(err)
}
newConds = append(newConds, newCond)
}

switch x := e.children[0].(type) {
case *XSelectTableExec:
accessCondition, restCondtion := ranger.DetachColumnConditions(newConds, x.tableInfo.GetPkName())
x.where, _, _ = expression.ExpressionsToPB(sc, restCondtion, client)
ranges, err := ranger.BuildTableRange(accessCondition, sc)
if err != nil {
return errors.Trace(err)
}
x.ranges = ranges
case *XSelectIndexExec:
var (
accessCondition []expression.Expression
accessInAndEqCount int
)
accessCondition, newConds, _, accessInAndEqCount = ranger.DetachIndexScanConditions(newConds, x.index)
idxConds, tblConds := ranger.DetachIndexFilterConditions(newConds, x.index.Columns, x.tableInfo)
x.indexConditionPBExpr, _, _ = expression.ExpressionsToPB(sc, idxConds, client)
tableConditionPBExpr, _, _ := expression.ExpressionsToPB(sc, tblConds, client)
var err error
x.ranges, err = ranger.BuildIndexRange(sc, x.tableInfo, x.index, accessInAndEqCount, accessCondition)
if err != nil {
return errors.Trace(err)
}
x.where = tableConditionPBExpr
default:
return errors.Errorf("Error type of Executor: %T", x)
}
return nil
Conditions []expression.Expression
}

// Next implements the Executor Next interface.
func (e *SelectionExec) Next() (Row, error) {
if e.scanController && !e.controllerInit {
err := e.initController()
if err != nil {
return nil, errors.Trace(err)
}
e.controllerInit = true
}
for {
srcRow, err := e.children[0].Next()
if err != nil {
Expand All @@ -499,14 +443,6 @@ func (e *SelectionExec) Next() (Row, error) {
}
}

// Open implements the Executor Open interface.
func (e *SelectionExec) Open() error {
if e.scanController {
e.controllerInit = false
}
return e.children[0].Open()
}

// TableScanExec is a table scan executor without result fields.
type TableScanExec struct {
baseExecutor
Expand Down
13 changes: 13 additions & 0 deletions executor/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,19 @@ func (s *testSuite) TestSubquery(c *C) {
tk.MustExec("insert into s values(2)")
result = tk.MustQuery("select (select id from s where s.id = t.id order by s.id) from t")
result.Check(testkit.Rows("2", "2"))

tk.MustExec("drop table if exists t")
tk.MustExec("create table t(dt datetime)")
result = tk.MustQuery("select (select 1 from t where DATE_FORMAT(o.dt,'%Y-%m')) from t o")
result.Check(testkit.Rows())

tk.MustExec("drop table if exists t1, t2")
tk.MustExec("create table t1(f1 int, f2 int)")
tk.MustExec("create table t2(fa int, fb int)")
tk.MustExec("insert into t1 values (1,1),(1,1),(1,2),(1,2),(1,2),(1,3)")
tk.MustExec("insert into t2 values (1,1),(1,2),(1,3)")
result = tk.MustQuery("select f1,f2 from t1 group by f1,f2 having count(1) >= all (select fb from t2 where fa = f1)")
result.Check(testkit.Rows("1 2"))
}

func (s *testSuite) TestInSubquery(c *C) {
Expand Down
55 changes: 0 additions & 55 deletions plan/decorrelate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/types"
)

Expand Down Expand Up @@ -162,11 +160,6 @@ func (s *decorrelateSolver) optimize(p LogicalPlan, _ context.Context, _ *idAllo
}
}
}
if sel, ok := p.(*Selection); ok && len(sel.extractCorrelatedCols()) > 0 {
if _, ok := p.Children()[0].(*DataSource); ok {
sel.controllerStatus = sel.checkScanController()
}
}
newChildren := make([]Plan, 0, len(p.Children()))
for _, child := range p.Children() {
np, _ := s.optimize(child.(LogicalPlan), nil, nil)
Expand All @@ -176,51 +169,3 @@ func (s *decorrelateSolver) optimize(p LogicalPlan, _ context.Context, _ *idAllo
p.SetChildren(newChildren...)
return p, nil
}

func (p *Selection) checkScanController() int {
var (
corColConds []expression.Expression
pkCol *expression.Column
)
ds := p.children[0].(*DataSource)
indices, includeTableScan := availableIndices(ds.indexHints, ds.tableInfo)
for _, expr := range p.Conditions {
if !expr.IsCorrelated() {
continue
}
cond := expression.PushDownNot(expr, false, nil)
corCols := extractCorColumns(cond)
for _, col := range corCols {
*col.Data = expression.One.Value
}
newCond, _ := expression.SubstituteCorCol2Constant(cond)
corColConds = append(corColConds, newCond)
}
if ds.tableInfo.PKIsHandle && includeTableScan {
for i, col := range ds.Columns {
if mysql.HasPriKeyFlag(col.Flag) {
pkCol = ds.schema.Columns[i]
break
}
}
}
if pkCol != nil {
access, _ := ranger.DetachColumnConditions(corColConds, pkCol.ColName)
for _, cond := range access {
if sf, ok := cond.(*expression.ScalarFunction); ok && sf.FuncName.L == ast.EQ {
return controlTableScan
}
}
}
for _, idx := range indices {
condsBackUp := make([]expression.Expression, 0, len(corColConds))
for _, cond := range corColConds {
condsBackUp = append(condsBackUp, cond.Clone())
}
_, _, eqCount, _ := ranger.DetachIndexScanConditions(condsBackUp, idx)
if eqCount > 0 {
return controlIndexScan
}
}
return notController
}
2 changes: 2 additions & 0 deletions plan/eliminate_projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ func resolveExprAndReplace(origin expression.Expression, replace map[string]*exp
switch expr := origin.(type) {
case *expression.Column:
resolveColumnAndReplace(expr, replace)
case *expression.CorrelatedColumn:
resolveColumnAndReplace(&expr.Column, replace)
case *expression.ScalarFunction:
for _, arg := range expr.GetArgs() {
resolveExprAndReplace(arg, replace)
Expand Down
6 changes: 0 additions & 6 deletions plan/physical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,12 +610,6 @@ func (p *LogicalJoin) buildSelectionWithConds(leftAsOuter bool) (*Selection, []*
conds = append(conds, newCond)
}
selection.Conditions = conds
// Currently only eq conds will be considered when we call checkScanController, and innerConds from the below sel may contain correlated column,
// which will have side effect when we do check. So we do check before append other conditions into selection.
selection.controllerStatus = selection.checkScanController()
if selection.controllerStatus == notController {
return nil, nil
}
conds = append(conds, innerConditions...)
for _, cond := range p.OtherConditions {
newCond := expression.ConvertCol2CorCol(cond, corCols, outerSchema)
Expand Down
55 changes: 2 additions & 53 deletions plan/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,60 +846,8 @@ func (s *testPlanSuite) TestAddCache(c *C) {
}
}

func (s *testPlanSuite) TestScanController(c *C) {
defer testleak.AfterTest(c)()
tests := []struct {
sql string
ans string
}{
{
sql: "select (select count(1) k from t s where s.a = t.a having k != 0) from t",
ans: "Apply{Table(t)->Table(t)->Selection->StreamAgg}->Projection",
},
{
sql: "select (select count(1) k from t s where s.b = t.b having k != 0) from t",
ans: "Apply{Table(t)->Table(t)->Cache->Selection->StreamAgg}->Projection",
},
{
sql: "select (select count(1) k from t s where s.f = t.f having k != 0) from t",
ans: "Apply{Table(t)->Index(t.f)[]->Selection->StreamAgg}->Projection",
},
}
for _, tt := range tests {
comment := Commentf("for %s", tt.sql)
stmt, err := s.ParseOneStmt(tt.sql, "", "")
c.Assert(err, IsNil, comment)
ast.SetFlag(stmt)

is, err := MockResolve(stmt)
c.Assert(err, IsNil)

builder := &planBuilder{
allocator: new(idAllocator),
ctx: mockContext(),
colMapper: make(map[*ast.ColumnNameExpr]int),
is: is,
}
p := builder.build(stmt)
c.Assert(builder.err, IsNil)
lp := p.(LogicalPlan)
_, lp, err = lp.PredicatePushDown(nil)
c.Assert(err, IsNil)
lp.PruneColumns(lp.Schema().Columns)
dSolver := &decorrelateSolver{}
lp, err = dSolver.optimize(lp, mockContext(), new(idAllocator))
c.Assert(err, IsNil)
lp.ResolveIndices()
lp, err = (&projectionEliminater{}).optimize(lp, nil, nil)
c.Assert(err, IsNil)
info, err := lp.convert2PhysicalPlan(&requiredProperty{})
pp := info.p
addCachePlan(pp, builder.allocator)
c.Assert(ToString(pp), Equals, tt.ans, Commentf("for %s", tt.sql))
}
}

func (s *testPlanSuite) TestJoinAlgorithm(c *C) {
c.Skip("Move to new plan test.")
defer testleak.AfterTest(c)()
tests := []struct {
sql string
Expand Down Expand Up @@ -994,6 +942,7 @@ func (s *testPlanSuite) TestJoinAlgorithm(c *C) {
}

func (s *testPlanSuite) TestAutoJoinChosen(c *C) {
c.Skip("TODO: move to new plan test")
defer testleak.AfterTest(c)()
cases := []struct {
sql string
Expand Down

0 comments on commit 67fa005

Please sign in to comment.