Skip to content

Commit

Permalink
plan: first step of join auto-select. (pingcap#3623)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanfei1991 authored Jul 10, 2017
1 parent cb7b400 commit bd64339
Show file tree
Hide file tree
Showing 9 changed files with 249 additions and 231 deletions.
6 changes: 4 additions & 2 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,10 @@ func indexValuesToKVRanges(tid, idxID int64, values [][]types.Datum) ([]kv.KeyRa
if err != nil {
return nil, errors.Trace(err)
}
rangeKey := tablecodec.EncodeIndexSeekKey(tid, idxID, valKey)
krs = append(krs, kv.KeyRange{StartKey: rangeKey, EndKey: rangeKey})
valKeyNext := []byte(kv.Key(valKey).PrefixNext())
rangeBeginKey := tablecodec.EncodeIndexSeekKey(tid, idxID, valKey)
rangeEndKey := tablecodec.EncodeIndexSeekKey(tid, idxID, valKeyNext)
krs = append(krs, kv.KeyRange{StartKey: rangeBeginKey, EndKey: rangeEndKey})
}
return krs, nil
}
Expand Down
2 changes: 1 addition & 1 deletion executor/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ func (s *testSuite) TestSubquery(c *C) {
tk.MustExec("create table t(id int primary key, v int)")
tk.MustExec("insert into t values(1, 1), (2, 2), (3, 3)")
result = tk.MustQuery("select (select t.id from t where s.id < 2 and t.id = s.id) from t s")
result.Check(testkit.Rows("1", "<nil>", "<nil>"))
result.Sort().Check(testkit.Rows("1", "<nil>", "<nil>"))
rs, err := tk.Exec("select (select t.id from t where t.id = t.v and t.v != s.id) from t s")
c.Check(err, IsNil)
_, err = tidb.GetRows(rs)
Expand Down
2 changes: 2 additions & 0 deletions executor/new_distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ func (e *IndexLookUpExecutor) doRequestForDatums(values [][]types.Datum, goCtx g
return errors.Trace(err)
}
e.result.Fetch(goCtx)
e.taskChan = make(chan *lookupTableTask, LookupTableTaskChannelSize)
go e.fetchHandlesAndStartWorkers()
return nil
}

Expand Down
30 changes: 20 additions & 10 deletions plan/dag_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,20 +183,25 @@ func (s *testPlanSuite) TestDAGPlanBuilderJoin(c *C) {
}{
{
sql: "select * from t t1 join t t2 on t1.a = t2.a join t t3 on t1.a = t3.a",
best: "LeftHashJoin{LeftHashJoin{TableReader(Table(t))->TableReader(Table(t))}(t1.a,t2.a)->TableReader(Table(t))}(t1.a,t3.a)",
best: "IndexJoin{IndexJoin{TableReader(Table(t))->TableReader(Table(t))}(t1.a,t2.a)->TableReader(Table(t))}(t1.a,t3.a)",
},
{
sql: "select * from t t1 join t t2 on t1.a = t2.a order by t1.a",
best: "LeftHashJoin{TableReader(Table(t))->TableReader(Table(t))}(t1.a,t2.a)->Sort",
sql: "select * from t t1 join t t2 on t1.b = t2.b join t t3 on t1.b = t3.b",
best: "LeftHashJoin{LeftHashJoin{TableReader(Table(t))->TableReader(Table(t))}(t1.b,t2.b)->TableReader(Table(t))}(t1.b,t3.b)",
},
{
sql: "select * from t t1 left outer join t t2 on t1.a = t2.a right outer join t t3 on t1.a = t3.a",
best: "RightHashJoin{LeftHashJoin{TableReader(Table(t))->TableReader(Table(t))}(t1.a,t2.a)->TableReader(Table(t))}(t1.a,t3.a)",
sql: "select * from t t1 join t t2 on t1.a = t2.a order by t1.a",
best: "MergeJoin{TableReader(Table(t))->TableReader(Table(t))}(t1.a,t2.a)",
},
{
sql: "select * from t t1 join t t2 on t1.a = t2.a join t t3 on t1.a = t3.a and t1.b = 1 and t3.c = 1",
best: "LeftHashJoin{RightHashJoin{TableReader(Table(t)->Sel([eq(t1.b, 1)]))->TableReader(Table(t))}(t1.a,t2.a)->IndexLookUp(Index(t.c_d_e)[[1,1]], Table(t))}(t1.a,t3.a)",
sql: "select * from t t1 left outer join t t2 on t1.a = t2.a right outer join t t3 on t1.a = t3.a",
best: "RightHashJoin{IndexJoin{TableReader(Table(t))->TableReader(Table(t))}(t1.a,t2.a)->TableReader(Table(t))}(t1.a,t3.a)",
},
// TODO: Still have some problems, fix it in next pr.
//{
// sql: "select * from t t1 join t t2 on t1.a = t2.a join t t3 on t1.a = t3.a and t1.b = 1 and t3.c = 1",
// best: "LeftHashJoin{RightHashJoin{TableReader(Table(t)->Sel([eq(t1.b, 1)]))->TableReader(Table(t))}(t1.a,t2.a)->IndexLookUp(Index(t.c_d_e)[[1,1]], Table(t))}(t1.a,t3.a)",
//},
{
sql: "select * from t where t.c in (select b from t s where s.a = t.a)",
best: "SemiJoin{TableReader(Table(t))->TableReader(Table(t))}",
Expand Down Expand Up @@ -232,12 +237,17 @@ func (s *testPlanSuite) TestDAGPlanBuilderJoin(c *C) {
// Test Apply.
{
sql: "select t.c in (select count(*) from t s , t t1 where s.a = t.a and s.a = t1.a) from t",
best: "Apply{TableReader(Table(t))->RightHashJoin{TableReader(Table(t))->Sel([eq(s.a, test.t.a)])->TableReader(Table(t))}(s.a,t1.a)->HashAgg}->Projection",
best: "Apply{TableReader(Table(t))->IndexJoin{TableReader(Table(t))->Sel([eq(s.a, test.t.a)])->TableReader(Table(t))}(s.a,t1.a)->HashAgg}->Projection",
},
{
sql: "select (select count(*) from t s , t t1 where s.a = t.a and s.a = t1.a) from t",
best: "Apply{TableReader(Table(t))->RightHashJoin{TableReader(Table(t))->Sel([eq(s.a, test.t.a)])->TableReader(Table(t))}(s.a,t1.a)->HashAgg}->Projection",
best: "Apply{TableReader(Table(t))->IndexJoin{TableReader(Table(t))->Sel([eq(s.a, test.t.a)])->TableReader(Table(t))}(s.a,t1.a)->HashAgg}->Projection",
},
// TODO: Still have some problems. Reopen it in the future.
//{
// sql: "select (select count(*) from t s , t t1 where s.a = t.a and s.a = t1.a) from t order by t.a",
// best: "Apply{TableReader(Table(t))->IndexJoin{TableReader(Table(t))->Sel([eq(s.a, test.t.a)])->TableReader(Table(t))}(s.a,t1.a)->HashAgg}->Projection",
//},
// Test Index Join + TableScan.
{
sql: "select /*+ TIDB_INLJ(t1, t2) */ * from t t1, t t2 where t1.a = t2.a",
Expand Down Expand Up @@ -281,7 +291,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderJoin(c *C) {
// Test Index Join failed.
{
sql: "select /*+ TIDB_INLJ(t1) */ * from t t1 right outer join t t2 on t1.a = t2.b",
best: "RightHashJoin{TableReader(Table(t))->TableReader(Table(t))}(t1.a,t2.b)",
best: "IndexJoin{TableReader(Table(t))->TableReader(Table(t))}(t2.b,t1.a)->Projection",
},
}
for _, tt := range tests {
Expand Down
4 changes: 3 additions & 1 deletion plan/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1148,7 +1148,9 @@ func (b *planBuilder) buildApplyWithJoinType(outerPlan, innerPlan LogicalPlan, t
b.optFlag = b.optFlag | flagBuildKeyInfo
b.optFlag = b.optFlag | flagDecorrelate
ap := LogicalApply{LogicalJoin: LogicalJoin{JoinType: tp}}.init(b.allocator, b.ctx)

if tp == LeftOuterJoin {
ap.DefaultValues = make([]types.Datum, innerPlan.Schema().Len())
}
addChild(ap, outerPlan)
addChild(ap, innerPlan)
ap.SetSchema(expression.MergeSchema(outerPlan.Schema(), innerPlan.Schema()))
Expand Down
2 changes: 1 addition & 1 deletion plan/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func (p *DataSource) getPKIsHandleCol() *expression.Column {
if !p.tableInfo.PKIsHandle {
return nil
}
for i, col := range p.tableInfo.Columns {
for i, col := range p.Columns {
if mysql.HasPriKeyFlag(col.Flag) {
return p.schema.Columns[i]
}
Expand Down
Loading

0 comments on commit bd64339

Please sign in to comment.