Skip to content

Commit

Permalink
*: implement index nested loop join (pingcap#2945)
Browse files Browse the repository at this point in the history
  • Loading branch information
winoros authored and coocood committed Apr 1, 2017
1 parent 16cefef commit aaa6184
Show file tree
Hide file tree
Showing 19 changed files with 616 additions and 149 deletions.
2 changes: 1 addition & 1 deletion ast/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (ts *testMiscSuite) TestDMLVistorCover(c *C) {
delete t1, t2 from t1 inner join t2 inner join t3 where t1.id=t2.id and t2.id=t3.id;
select * from t where exists(select * from t k where t.c = k.c having sum(c) = 1);
insert into t_copy select * from t where t.x > 5;
(select a from t1 where a=10 and b=1) union (select /*+ TIDB_SMJ(t2) */ a from t2 where a=11 and b=2) order by a limit 10;
(select /*+ TIDB_INLJ(t1) */ a from t1 where a=10 and b=1) union (select /*+ TIDB_SMJ(t2) */ a from t2 where a=11 and b=2) order by a limit 10;
update t1 set col1 = col1 + 1, col2 = col1;
show create table t;
load data infile '/tmp/t.csv' into table t fields terminated by 'ab' enclosed by 'b';`
Expand Down
39 changes: 28 additions & 11 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,17 +634,34 @@ func (b *executorBuilder) buildSort(v *plan.Sort) Executor {
}

func (b *executorBuilder) buildNestedLoopJoin(v *plan.PhysicalHashJoin) *NestedLoopJoinExec {
bigExec := b.build(v.Children()[0])
smallExec := b.build(v.Children()[1])
for _, cond := range v.EqualConditions {
cond.GetArgs()[0].(*expression.Column).ResolveIndices(v.Schema())
cond.GetArgs()[1].(*expression.Column).ResolveIndices(v.Schema())
}
if v.SmallTable == 1 {
return &NestedLoopJoinExec{
SmallExec: b.build(v.Children()[1]),
BigExec: b.build(v.Children()[0]),
Ctx: b.ctx,
BigFilter: expression.ComposeCNFCondition(b.ctx, v.LeftConditions...),
SmallFilter: expression.ComposeCNFCondition(b.ctx, v.RightConditions...),
OtherFilter: expression.ComposeCNFCondition(b.ctx, append(expression.ScalarFuncs2Exprs(v.EqualConditions), v.OtherConditions...)...),
schema: v.Schema(),
outer: v.JoinType != plan.InnerJoin,
defaultValues: v.DefaultValues,
}
}
return &NestedLoopJoinExec{
SmallExec: smallExec,
BigExec: bigExec,
Ctx: b.ctx,
BigFilter: expression.ComposeCNFCondition(b.ctx, v.LeftConditions...),
SmallFilter: expression.ComposeCNFCondition(b.ctx, v.RightConditions...),
OtherFilter: expression.ComposeCNFCondition(b.ctx, append(expression.ScalarFuncs2Exprs(v.EqualConditions), v.OtherConditions...)...),
schema: v.Schema(),
outer: v.JoinType != plan.InnerJoin,
SmallExec: b.build(v.Children()[0]),
BigExec: b.build(v.Children()[1]),
leftSmall: true,
Ctx: b.ctx,
BigFilter: expression.ComposeCNFCondition(b.ctx, v.RightConditions...),
SmallFilter: expression.ComposeCNFCondition(b.ctx, v.LeftConditions...),
OtherFilter: expression.ComposeCNFCondition(b.ctx, append(expression.ScalarFuncs2Exprs(v.EqualConditions), v.OtherConditions...)...),
schema: v.Schema(),
outer: v.JoinType != plan.InnerJoin,
defaultValues: v.DefaultValues,
}
}

Expand All @@ -654,7 +671,7 @@ func (b *executorBuilder) buildApply(v *plan.PhysicalApply) Executor {
case *plan.PhysicalHashSemiJoin:
join = b.buildSemiJoin(x)
case *plan.PhysicalHashJoin:
if x.JoinType == plan.InnerJoin || x.JoinType == plan.LeftOuterJoin {
if x.JoinType == plan.InnerJoin || x.JoinType == plan.LeftOuterJoin || x.JoinType == plan.RightOuterJoin {
join = b.buildNestedLoopJoin(x)
} else {
b.err = errors.Errorf("Unsupported join type %v in nested loop join", x.JoinType)
Expand Down
2 changes: 1 addition & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ func (e *SelectionExec) initController() error {
}
x.ranges = ranges
case *XSelectIndexExec:
x.indexPlan.AccessCondition, newConds = plan.DetachIndexScanConditions(newConds, x.indexPlan)
x.indexPlan.AccessCondition, newConds, _, _ = plan.DetachIndexScanConditions(newConds, x.indexPlan.Index)
idxConds, tblConds := plan.DetachIndexFilterConditions(newConds, x.indexPlan.Index.Columns, x.indexPlan.Table)
x.indexPlan.IndexConditionPBExpr, _, _ = plan.ExpressionsToPB(sc, idxConds, client)
x.indexPlan.TableConditionPBExpr, _, _ = plan.ExpressionsToPB(sc, tblConds, client)
Expand Down
53 changes: 32 additions & 21 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,18 +498,20 @@ type joinExec interface {

// NestedLoopJoinExec implements nested-loop algorithm for join.
type NestedLoopJoinExec struct {
innerRows []*Row
cursor int
resultRows []*Row
SmallExec Executor
BigExec Executor
prepared bool
Ctx context.Context
SmallFilter expression.Expression
BigFilter expression.Expression
OtherFilter expression.Expression
schema *expression.Schema
outer bool
innerRows []*Row
cursor int
resultRows []*Row
SmallExec Executor
BigExec Executor
leftSmall bool
prepared bool
Ctx context.Context
SmallFilter expression.Expression
BigFilter expression.Expression
OtherFilter expression.Expression
schema *expression.Schema
outer bool
defaultValues []types.Datum
}

// Schema implements Executor interface.
Expand Down Expand Up @@ -589,24 +591,33 @@ func (e *NestedLoopJoinExec) prepare() error {
}
}

func (e *NestedLoopJoinExec) fillRowWithNullValue(row *Row) *Row {
newRow := &Row{
RowKeys: row.RowKeys,
Data: make([]types.Datum, len(row.Data)+e.SmallExec.Schema().Len()),
func (e *NestedLoopJoinExec) fillRowWithDefaultValue(bigRow *Row) (returnRow *Row) {
smallRow := &Row{
Data: make([]types.Datum, e.SmallExec.Schema().Len()),
}
copy(newRow.Data, row.Data)
return newRow
copy(smallRow.Data, e.defaultValues)
if e.leftSmall {
returnRow = makeJoinRow(smallRow, bigRow)
} else {
returnRow = makeJoinRow(bigRow, smallRow)
}
return returnRow
}

func (e *NestedLoopJoinExec) doJoin(bigRow *Row, match bool) ([]*Row, error) {
e.resultRows = e.resultRows[0:0]
if !match && e.outer {
row := e.fillRowWithNullValue(bigRow)
row := e.fillRowWithDefaultValue(bigRow)
e.resultRows = append(e.resultRows, row)
return e.resultRows, nil
}
for _, row := range e.innerRows {
mergedRow := makeJoinRow(bigRow, row)
var mergedRow *Row
if e.leftSmall {
mergedRow = makeJoinRow(row, bigRow)
} else {
mergedRow = makeJoinRow(bigRow, row)
}
if e.OtherFilter != nil {
matched, err := expression.EvalBool(e.OtherFilter, mergedRow.Data, e.Ctx)
if err != nil {
Expand All @@ -619,7 +630,7 @@ func (e *NestedLoopJoinExec) doJoin(bigRow *Row, match bool) ([]*Row, error) {
e.resultRows = append(e.resultRows, mergedRow)
}
if len(e.resultRows) == 0 && e.outer {
e.resultRows = append(e.resultRows, e.fillRowWithNullValue(bigRow))
e.resultRows = append(e.resultRows, e.fillRowWithDefaultValue(bigRow))
}
return e.resultRows, nil
}
Expand Down
15 changes: 15 additions & 0 deletions executor/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,21 @@ func (s *testSuite) TestJoin(c *C) {
tk.MustExec("insert into t values(1),(2),(3)")
result = tk.MustQuery("select * from t1 , t2 where t2.c1 = t1.c1 and t2.c2 = 0 and t1.c2 in (select * from t)")
result.Check(testkit.Rows("1 2 1 0", "2 3 2 0"))
tk.MustExec("drop table if exists t, t1")
tk.MustExec("create table t(a int primary key, b int)")
tk.MustExec("create table t1(a int, b int)")
tk.MustExec("insert into t values(1, 1), (2, 2), (3, 3)")
tk.MustExec("insert into t1 values(1, 2), (1, 3), (3, 4), (4, 5)")
// The physical plans of the two sql are tested at physical_plan_test.go
tk.MustQuery("select /*+ TIDB_INLJ(t, t1) */ * from t join t1 on t.a=t1.a").Check(testkit.Rows("1 1 1 2", "1 1 1 3", "3 3 3 4"))
tk.MustQuery("select /*+ TIDB_INLJ(t1) */ * from t1 join t on t.a=t1.a and t.a < t1.b").Check(testkit.Rows("1 2 1 1", "1 3 1 1", "3 4 3 3"))
tk.MustQuery("select /*+ TIDB_INLJ(t, t1) */ * from t right outer join t1 on t.a=t1.a").Check(testkit.Rows("1 1 1 2", "1 1 1 3", "3 3 3 4", "<nil> <nil> 4 5"))
tk.MustQuery("select /*+ TIDB_INLJ(t, t1) */ avg(t.b) from t right outer join t1 on t.a=t1.a").Check(testkit.Rows("1.6667"))

// Test that two conflict hints will return error
_, err = tk.Exec("select /*+ TIDB_INLJ(t) TIDB_SMJ(t) */ * from t join t1 on t.a=t1.a")
c.Assert(err, NotNil)

}

func (s *testSuite) TestMultiJoin(c *C) {
Expand Down
25 changes: 25 additions & 0 deletions expression/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,28 @@ func SubstituteCorCol2Constant(expr Expression) (Expression, error) {
return x.Clone(), nil
}
}

// ConvertCol2CorCol will convert the column in the condition which can be found in outerSchema to a correlated column whose
// Column is this column. And please make sure the outerSchema.Columns[i].Equal(corCols[i].Column)) holds when you call this.
func ConvertCol2CorCol(cond Expression, corCols []*CorrelatedColumn, outerSchema *Schema) Expression {
switch x := cond.(type) {
case *ScalarFunction:
newArgs := make([]Expression, 0, len(x.GetArgs()))
for _, arg := range x.GetArgs() {
newArg := ConvertCol2CorCol(arg, corCols, outerSchema)
newArgs = append(newArgs, newArg)
}
var newSf Expression
if x.FuncName.L == ast.Cast {
newSf = NewCastFunc(x.RetType, newArgs[0], x.GetCtx())
} else {
newSf, _ = NewFunction(x.GetCtx(), x.FuncName.L, x.GetType(), newArgs...)
}
return newSf
case *Column:
if pos := outerSchema.ColumnIndex(x); pos >= 0 {
return corCols[pos]
}
}
return cond
}
1 change: 1 addition & 0 deletions parser/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ var tokenMap = map[string]int{
"DISABLE": disable,
"DISTINCT": distinct,
"TIDB_SMJ": tidbSMJ,
"TIDB_INLJ": tidbINLJ,
"DIV": div,
"DO": do,
"DROP": drop,
Expand Down
5 changes: 5 additions & 0 deletions parser/parser.y
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ import (
describe "DESCRIBE"
distinct "DISTINCT"
tidbSMJ "TIDB_SMJ"
tidbINLJ "TIDB_INLJ"
div "DIV"
doubleType "DOUBLE"
drop "DROP"
Expand Down Expand Up @@ -4768,6 +4769,10 @@ TableOptimizerHintOpt:
{
$$ = &ast.TableOptimizerHint{HintName: model.NewCIStr($1), Tables: $3.([]model.CIStr)}
}
| tidbINLJ '(' HintTableList ')'
{
$$ = &ast.TableOptimizerHint{HintName: model.NewCIStr($1), Tables: $3.([]model.CIStr)}
}

SelectStmtCalcFoundRows:
%prec lowerThanCalcFoundRows
Expand Down
15 changes: 15 additions & 0 deletions parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1222,6 +1222,21 @@ func (s *testParserSuite) TestOptimizerHints(c *C) {
c.Assert(hints[1].Tables[1].L, Equals, "t4")

c.Assert(len(selectStmt.TableHints), Equals, 2)

stmt, err = parser.Parse("select /*+ TIDB_INLJ(t1, T2) tidb_inlj(t3, t4) */ c1, c2 from t1, t2 where t1.c1 = t2.c1", "", "")
c.Assert(err, IsNil)
selectStmt = stmt[0].(*ast.SelectStmt)

hints = selectStmt.TableHints
c.Assert(len(hints), Equals, 2)
c.Assert(hints[0].HintName.L, Equals, "tidb_inlj")
c.Assert(len(hints[0].Tables), Equals, 2)
c.Assert(hints[0].Tables[0].L, Equals, "t1")
c.Assert(hints[0].Tables[1].L, Equals, "t2")

c.Assert(hints[1].HintName.L, Equals, "tidb_inlj")
c.Assert(hints[1].Tables[0].L, Equals, "t3")
c.Assert(hints[1].Tables[1].L, Equals, "t4")
}

func (s *testParserSuite) TestType(c *C) {
Expand Down
58 changes: 56 additions & 2 deletions plan/decorrelate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/util/types"
)

Expand Down Expand Up @@ -158,9 +159,9 @@ func (s *decorrelateSolver) optimize(p LogicalPlan, _ context.Context, _ *idAllo
}
}
}
if sel, ok := p.(*Selection); ok {
if sel, ok := p.(*Selection); ok && len(sel.extractCorrelatedCols()) > 0 {
if _, ok := p.Children()[0].(*DataSource); ok {
_, sel.canControlScan = sel.makeScanController(true)
sel.controllerStatus = sel.checkScanController()
}
}
newChildren := make([]Plan, 0, len(p.Children()))
Expand All @@ -172,3 +173,56 @@ func (s *decorrelateSolver) optimize(p LogicalPlan, _ context.Context, _ *idAllo
p.SetChildren(newChildren...)
return p, nil
}

func (p *Selection) checkScanController() int {
var (
corColConds []expression.Expression
pkCol *expression.Column
)
ds := p.children[0].(*DataSource)
indices, includeTableScan := availableIndices(ds.indexHints, ds.tableInfo)
for _, expr := range p.Conditions {
if !expr.IsCorrelated() {
continue
}
cond := pushDownNot(expr, false, nil)
corCols := extractCorColumns(cond)
for _, col := range corCols {
*col.Data = expression.One.Value
}
newCond, _ := expression.SubstituteCorCol2Constant(cond)
corColConds = append(corColConds, newCond)
}
if ds.tableInfo.PKIsHandle && includeTableScan {
for i, col := range ds.Columns {
if mysql.HasPriKeyFlag(col.Flag) {
pkCol = ds.schema.Columns[i]
break
}
}
}
if pkCol != nil {
checker := conditionChecker{
pkName: pkCol.ColName,
length: types.UnspecifiedLength,
}
for _, cond := range corColConds {
if sf, ok := cond.(*expression.ScalarFunction); ok {
if sf.FuncName.L == ast.EQ && checker.checkScalarFunction(sf) {
return controlTableScan
}
}
}
}
for _, idx := range indices {
condsBackUp := make([]expression.Expression, 0, len(corColConds))
for _, cond := range corColConds {
condsBackUp = append(condsBackUp, cond.Clone())
}
_, _, eqCount, _ := DetachIndexScanConditions(condsBackUp, idx)
if eqCount > 0 {
return controlIndexScan
}
}
return notController
}
3 changes: 2 additions & 1 deletion plan/join_reorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ func tryToGetJoinGroup(j *Join) ([]LogicalPlan, bool) {
// 1. already reordered
// 2. not inner join
// 3. forced merge join
if j.reordered || !j.cartesianJoin || j.preferMergeJoin {
// 4. forced index nested loop join
if j.reordered || !j.cartesianJoin || j.preferMergeJoin || j.preferINLJ > 0 {
return nil, false
}
lChild := j.children[0].(LogicalPlan)
Expand Down
24 changes: 20 additions & 4 deletions plan/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ import (
)

const (
// TiDBMergeJoin is hint enforce merge join
// TiDBMergeJoin is hint enforce merge join.
TiDBMergeJoin = "tidb_smj"
// TiDBIndexNestedLoopJoin is hint enforce index nested loop join.
TiDBIndexNestedLoopJoin = "tidb_inlj"
)

type idAllocator struct {
Expand Down Expand Up @@ -226,6 +228,15 @@ func (b *planBuilder) buildJoin(join *ast.Join) LogicalPlan {

if b.TableHints() != nil {
joinPlan.preferMergeJoin = b.TableHints().ifPreferMergeJoin(leftAlias, rightAlias)
if b.TableHints().ifPreferINLJ(leftAlias) {
joinPlan.preferINLJ = joinPlan.preferINLJ | preferLeftAsOuter
}
if b.TableHints().ifPreferINLJ(rightAlias) {
joinPlan.preferINLJ = joinPlan.preferINLJ | preferRightAsOuter
}
if joinPlan.preferMergeJoin && joinPlan.preferINLJ > 0 {
b.err = errors.New("Optimizer Hints is conflict")
}
}

if join.On != nil {
Expand Down Expand Up @@ -823,17 +834,22 @@ func (b *planBuilder) unfoldWildStar(p LogicalPlan, selectFields []*ast.SelectFi
}

func (b *planBuilder) pushTableHints(hints []*ast.TableOptimizerHint) bool {
var sortMergeTables []model.CIStr
var sortMergeTables, INLJTables []model.CIStr
for _, hint := range hints {
switch hint.HintName.L {
case TiDBMergeJoin:
sortMergeTables = append(sortMergeTables, hint.Tables...)
case TiDBIndexNestedLoopJoin:
INLJTables = append(INLJTables, hint.Tables...)
default:
// ignore hints that not implemented
}
}
if len(sortMergeTables) != 0 {
b.tableHintInfo = append(b.tableHintInfo, tableHintInfo{sortMergeTables})
if len(sortMergeTables) != 0 || len(INLJTables) != 0 {
b.tableHintInfo = append(b.tableHintInfo, tableHintInfo{
sortMergeJoinTables: sortMergeTables,
INLJTables: INLJTables,
})
return true
}
return false
Expand Down
Loading

0 comments on commit aaa6184

Please sign in to comment.