Skip to content

Commit

Permalink
Add a controller in Selection to control the conditions of the below …
Browse files Browse the repository at this point in the history
…scan plan. (pingcap#2834)

Prepare for IndexLookupJoin.
  • Loading branch information
winoros authored and shenli committed Mar 28, 2017
1 parent 95fb7d0 commit 0e172ac
Show file tree
Hide file tree
Showing 13 changed files with 445 additions and 33 deletions.
9 changes: 5 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,10 +500,11 @@ func (b *executorBuilder) buildAggregation(v *plan.PhysicalAggregation) Executor

func (b *executorBuilder) buildSelection(v *plan.Selection) Executor {
exec := &SelectionExec{
Src: b.build(v.Children()[0]),
Condition: expression.ComposeCNFCondition(b.ctx, v.Conditions...),
schema: v.Schema(),
ctx: b.ctx,
Src: b.build(v.Children()[0]),
schema: v.Schema(),
ctx: b.ctx,
scanController: v.ScanController,
Conditions: v.Conditions,
}
return exec
}
Expand Down
70 changes: 66 additions & 4 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,15 +473,67 @@ type SelectionExec struct {
Condition expression.Expression
ctx context.Context
schema *expression.Schema

// scanController will tell whether this selection need to
// control the condition of below scan executor.
scanController bool
controllerInit bool
Conditions []expression.Expression
}

// Schema implements the Executor Schema interface.
func (e *SelectionExec) Schema() *expression.Schema {
return e.schema
}

// initController will init the conditions of the below scan executor.
// It will first substitute the correlated column to constant, then build range and filter by new conditions.
func (e *SelectionExec) initController() error {
sc := e.ctx.GetSessionVars().StmtCtx
client := e.ctx.GetClient()
newConds := make([]expression.Expression, 0, len(e.Conditions))
for _, cond := range e.Conditions {
newCond, err := expression.SubstituteCorCol2Constant(cond.Clone())
if err != nil {
return errors.Trace(err)
}
newConds = append(newConds, newCond)
}

switch x := e.Src.(type) {
case *XSelectTableExec:
accessCondition, restCondtion := plan.DetachTableScanConditions(newConds, x.tableInfo)
x.where, _, _ = plan.ExpressionsToPB(sc, restCondtion, client)
ranges, err := plan.BuildTableRange(accessCondition, sc)
if err != nil {
return errors.Trace(err)
}
x.ranges = ranges
case *XSelectIndexExec:
x.indexPlan.AccessCondition, newConds = plan.DetachIndexScanConditions(newConds, x.indexPlan)
idxConds, tblConds := plan.DetachIndexFilterConditions(newConds, x.indexPlan.Index.Columns, x.indexPlan.Table)
x.indexPlan.IndexConditionPBExpr, _, _ = plan.ExpressionsToPB(sc, idxConds, client)
x.indexPlan.TableConditionPBExpr, _, _ = plan.ExpressionsToPB(sc, tblConds, client)
err := plan.BuildIndexRange(sc, x.indexPlan)
if err != nil {
return errors.Trace(err)
}
x.where = x.indexPlan.TableConditionPBExpr
default:
return errors.Errorf("Error type of Executor: %T", x)
}
return nil
}

// Next implements the Executor Next interface.
func (e *SelectionExec) Next() (*Row, error) {
if e.scanController && !e.controllerInit {
err := e.initController()
if err != nil {
return nil, errors.Trace(err)
}
e.controllerInit = true
}
for {
srcRow, err := e.Src.Next()
if err != nil {
Expand All @@ -490,18 +542,28 @@ func (e *SelectionExec) Next() (*Row, error) {
if srcRow == nil {
return nil, nil
}
match, err := expression.EvalBool(e.Condition, srcRow.Data, e.ctx)
if err != nil {
return nil, errors.Trace(err)
allMatch := true
for _, cond := range e.Conditions {
match, err := expression.EvalBool(cond, srcRow.Data, e.ctx)
if err != nil {
return nil, errors.Trace(err)
}
if !match {
allMatch = false
break
}
}
if match {
if allMatch {
return srcRow, nil
}
}
}

// Close implements the Executor Close interface.
func (e *SelectionExec) Close() error {
if e.scanController {
e.controllerInit = false
}
return e.Src.Close()
}

Expand Down
13 changes: 13 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1278,3 +1278,16 @@ func (s *testSuite) TestHistoryRead(c *C) {
tk.MustExec("set @@tidb_snapshot = ''")
tk.MustQuery("select * from history_read order by a").Check(testkit.Rows("2 <nil>", "4 <nil>", "8 8", "9 9"))
}

func (s *testSuite) TestScanControlSelection(c *C) {
defer func() {
s.cleanEnv(c)
testleak.AfterTest(c)()
}()
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int primary key, b int, c int, index idx_b(b))")
tk.MustExec("insert into t values (1, 1, 1), (2, 1, 1), (3, 1, 2), (4, 2, 3)")
tk.MustQuery("select (select count(1) k from t s where s.b = t1.c) from t t1").Check(testkit.Rows("3", "3", "1", "0"))
}
74 changes: 74 additions & 0 deletions executor/explain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,80 @@ func (s *testSuite) TestExplain(c *C) {
"index filter conditions": null,
"table filter conditions": null
}
}`,
},
},
{
"select (select count(1) k from t1 s where s.c1 = t1.c1 having k != 0) from t1",
[]string{
"TableScan_12", "TableScan_13", "Selection_4", "StreamAgg_15", "Selection_10", "Apply_16", "Projection_2",
},
[]string{
"Apply_16", "Selection_4", "StreamAgg_15", "Selection_10", "Apply_16", "Projection_2", "",
},
[]string{
`{
"db": "test",
"table": "t1",
"desc": false,
"keep order": false,
"push down info": {
"limit": 0,
"access conditions": null,
"index filter conditions": null,
"table filter conditions": null
}
}`,
`{
"db": "test",
"table": "t1",
"desc": false,
"keep order": false,
"push down info": {
"limit": 0,
"access conditions": null,
"index filter conditions": null,
"table filter conditions": null
}
}`,
`{
"condition": [
"eq(s.c1, test.t1.c1)"
],
"scanController": true,
"child": "TableScan_13"
}`,
`{
"AggFuncs": [
"count(1)"
],
"GroupByItems": null,
"child": "Selection_4"
}`,
`{
"condition": [
"ne(aggregation_5_col_0, 0)"
],
"scanController": false,
"child": "StreamAgg_15"
}`,
`{
"innerPlan": "Selection_10",
"outerPlan": "TableScan_12",
"join": {
"eqCond": null,
"leftCond": null,
"rightCond": null,
"otherCond": null,
"leftPlan": "TableScan_12",
"rightPlan": "Selection_10"
}
}`,
`{
"exprs": [
"k"
],
"child": "Apply_16"
}`,
},
},
Expand Down
37 changes: 37 additions & 0 deletions expression/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,40 @@ func (d *distinctChecker) Check(values []interface{}) (bool, error) {
d.existingKeys[key] = true
return true, nil
}

// SubstituteCorCol2Constant will substitute correlated column to constant value which it contains.
// If the args of one scalar function are all constant, we will substitute it to constant.
func SubstituteCorCol2Constant(expr Expression) (Expression, error) {
switch x := expr.(type) {
case *ScalarFunction:
allConstant := true
newArgs := make([]Expression, 0, len(x.GetArgs()))
for _, arg := range x.GetArgs() {
newArg, err := SubstituteCorCol2Constant(arg)
if err != nil {
return nil, errors.Trace(err)
}
_, ok := newArg.(*Constant)
newArgs = append(newArgs, newArg)
allConstant = allConstant && ok
}
if allConstant {
val, err := x.Eval(nil)
if err != nil {
return nil, errors.Trace(err)
}
return &Constant{Value: val}, nil
}
var newSf Expression
if x.FuncName.L == ast.Cast {
newSf = NewCastFunc(x.RetType, newArgs[0], x.GetCtx())
} else {
newSf, _ = NewFunction(x.GetCtx(), x.FuncName.L, x.GetType(), newArgs...)
}
return newSf, nil
case *CorrelatedColumn:
return &Constant{Value: *x.Data, RetType: x.GetType()}, nil
default:
return x.Clone(), nil
}
}
21 changes: 21 additions & 0 deletions expression/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ package expression

import (
"github.com/pingcap/check"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/types"
)

var _ = check.Suite(&testUtilSuite{})
Expand Down Expand Up @@ -43,3 +47,20 @@ func (s *testUtilSuite) TestDistinct(c *check.C) {
c.Assert(d, check.Equals, t.expect)
}
}

func (s *testUtilSuite) TestSubstituteCorCol2Constant(c *check.C) {
defer testleak.AfterTest(c)()
ctx := mock.NewContext()
corCol1 := &CorrelatedColumn{Data: &One.Value}
corCol2 := &CorrelatedColumn{Data: &One.Value}
cast := NewCastFunc(types.NewFieldType(mysql.TypeLonglong), corCol1, ctx)
plus := newFunction(ast.Plus, cast, corCol2)
plus2 := newFunction(ast.Plus, plus, One)
ans := &Constant{Value: types.NewIntDatum(3)}
ret, err := SubstituteCorCol2Constant(plus2)
c.Assert(err, check.IsNil)
c.Assert(ret.Equal(ans, ctx), check.IsTrue)
col1 := &Column{Index: 1}
newCol, err := SubstituteCorCol2Constant(col1)
c.Assert(newCol.Equal(col1, ctx), check.IsTrue)
}
5 changes: 5 additions & 0 deletions plan/decorrelate.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ func (s *decorrelateSolver) optimize(p LogicalPlan, _ context.Context, _ *idAllo
}
}
}
if sel, ok := p.(*Selection); ok {
if _, ok := p.Children()[0].(*DataSource); ok {
_, sel.canControlScan = sel.makeScanController(true)
}
}
newChildren := make([]Plan, 0, len(p.Children()))
for _, child := range p.Children() {
np, _ := s.optimize(child.(LogicalPlan), nil, nil)
Expand Down
3 changes: 2 additions & 1 deletion plan/expr_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import (
"github.com/pingcap/tipb/go-tipb"
)

func expressionsToPB(sc *variable.StatementContext, exprs []expression.Expression, client kv.Client) (pbExpr *tipb.Expr, pushed []expression.Expression, remained []expression.Expression) {
// ExpressionsToPB converts expression to tipb.Expr.
func ExpressionsToPB(sc *variable.StatementContext, exprs []expression.Expression, client kv.Client) (pbExpr *tipb.Expr, pushed []expression.Expression, remained []expression.Expression) {
pc := pbConverter{client: client, sc: sc}
for _, expr := range exprs {
v := pc.exprToPB(expr)
Expand Down
8 changes: 8 additions & 0 deletions plan/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,14 @@ type Selection struct {

// onTable means if this selection's child is a table scan or index scan.
onTable bool

// If ScanController is true, then the child of this selection is a scan,
// which use pk or index. we will record the accessConditions, idxConditions,
// and tblConditions to control the below plan.
ScanController bool

// We will check this at decorrelate phase.
canControlScan bool
}

func (p *Selection) extractCorrelatedCols() []*expression.CorrelatedColumn {
Expand Down
Loading

0 comments on commit 0e172ac

Please sign in to comment.