Skip to content

Commit

Permalink
plan: new plan supports merge join. (pingcap#3153)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanfei1991 authored and winoros committed May 2, 2017
1 parent 588bf60 commit 98176c3
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 13 deletions.
24 changes: 24 additions & 0 deletions plan/dag_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,30 @@ func (s *testPlanSuite) TestDAGPlanBuilderJoin(c *C) {
sql: "select t.c in (select b from t s where s.a = t.a) from t",
best: "SemiJoinWithAux{TableReader(Table(t))->TableReader(Table(t))}->Projection",
},
// Test Single Merge Join.
{
sql: "select /*+ TIDB_SMJ(t1,t2)*/ * from t t1, t t2 where t1.a = t2.b",
best: "MergeJoin{TableReader(Table(t))->TableReader(Table(t))->Sort}(t1.a,t2.b)",
},
// Test Single Merge Join + Sort.
{
sql: "select /*+ TIDB_SMJ(t1,t2)*/ * from t t1, t t2 where t1.a = t2.a order by t2.a",
best: "MergeJoin{TableReader(Table(t))->TableReader(Table(t))}(t1.a,t2.a)",
},
// Test Multi Merge Join.
{
sql: "select /*+ TIDB_SMJ(t1,t2,t3)*/ * from t t1, t t2, t t3 where t1.a = t2.a and t2.a = t3.a",
best: "MergeJoin{MergeJoin{TableReader(Table(t))->TableReader(Table(t))}(t1.a,t2.a)->TableReader(Table(t))}(t2.a,t3.a)",
},
// Test Multi Merge Join + Outer Join.
{
sql: "select /*+ TIDB_SMJ(t1,t2,t3)*/ * from t t1 left outer join t t2 on t1.a = t2.a left outer join t t3 on t2.a = t3.a",
best: "MergeJoin{MergeJoin{TableReader(Table(t))->TableReader(Table(t))}(t1.a,t2.a)->Sort->TableReader(Table(t))}(t2.a,t3.a)",
},
{
sql: "select /*+ TIDB_SMJ(t1,t2,t3)*/ * from t t1 left outer join t t2 on t1.a = t2.a left outer join t t3 on t1.a = t3.a",
best: "MergeJoin{MergeJoin{TableReader(Table(t))->TableReader(Table(t))}(t1.a,t2.a)->TableReader(Table(t))}(t1.a,t3.a)",
},
// Test Apply.
{
sql: "select t.c in (select count(*) from t s , t t1 where s.a = t.a and s.a = t1.a) from t",
Expand Down
71 changes: 58 additions & 13 deletions plan/new_physical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,21 +110,61 @@ func (p *LogicalJoin) convert2NewPhysicalPlan(prop *requiredProp) (taskProfile,
}
switch p.JoinType {
case SemiJoin, LeftOuterSemiJoin:
task, err = p.convert2SemiJoin()
task, err = p.convert2SemiJoin(prop)
default:
// TODO: We will consider smj and index look up join in the future.
task, err = p.convert2HashJoin()
if p.preferUseMergeJoin() {
task, err = p.convert2MergeJoin(prop)
} else {
// TODO: We will consider index look up join in the future.
task, err = p.convert2HashJoin(prop)
}
}
if err != nil {
return nil, errors.Trace(err)
}
// Because hash join is executed by multiple goroutines, it will not propagate physical property any more.
// TODO: We will consider the problem of property again for parallel execution.
task = prop.enforceProperty(task, p.ctx, p.allocator)
return task, p.storeTaskProfile(prop, task)
}

func (p *LogicalJoin) convert2SemiJoin() (taskProfile, error) {
// preferUseMergeJoin reports whether this join should be converted to a merge
// join: the preferMergeJoin flag must be set and the join must have exactly
// one equal condition.
func (p *LogicalJoin) preferUseMergeJoin() bool {
	if !p.preferMergeJoin {
		return false
	}
	return len(p.EqualConditions) == 1
}

// convert2MergeJoin builds a PhysicalMergeJoin for this logical join.
//
// Each child is converted under a required property made of its own side of
// the first equal condition (left argument for the left child, right argument
// for the right child). The resulting tasks are attached to the merge join.
//
// If prop equals the left child's property and the join is not a
// RightOuterJoin, or prop equals the right child's property and the join is
// not a LeftOuterJoin, the task is returned as is. Otherwise the task is
// passed through prop.enforceProperty first.
// NOTE(review): presumably the outer-join exclusions exist because the inner
// side's key is not guaranteed to stay ordered in the join output — confirm.
//
// TODO: Now we only process the case that the join has only one equal condition.
func (p *LogicalJoin) convert2MergeJoin(prop *requiredProp) (taskProfile, error) {
	left, right := p.children[0].(LogicalPlan), p.children[1].(LogicalPlan)

	join := PhysicalMergeJoin{
		JoinType:        p.JoinType,
		EqualConditions: p.EqualConditions,
		LeftConditions:  p.LeftConditions,
		RightConditions: p.RightConditions,
		OtherConditions: p.OtherConditions,
	}.init(p.allocator, p.ctx)
	join.SetSchema(p.schema)

	// Left child: require ordering on the left argument of the equal condition.
	leftKey := p.EqualConditions[0].GetArgs()[0].(*expression.Column)
	leftProp := &requiredProp{cols: []*expression.Column{leftKey}}
	leftTask, err := left.convert2NewPhysicalPlan(leftProp)
	if err != nil {
		return nil, errors.Trace(err)
	}

	// Right child: require ordering on the right argument of the equal condition.
	rightKey := p.EqualConditions[0].GetArgs()[1].(*expression.Column)
	rightProp := &requiredProp{cols: []*expression.Column{rightKey}}
	rightTask, err := right.convert2NewPhysicalPlan(rightProp)
	if err != nil {
		return nil, errors.Trace(err)
	}

	joined := join.attach2TaskProfile(leftTask, rightTask)
	switch {
	case prop.equal(leftProp) && p.JoinType != RightOuterJoin:
		// The requested property matches what the left child was asked for.
		return joined, nil
	case prop.equal(rightProp) && p.JoinType != LeftOuterJoin:
		// The requested property matches what the right child was asked for.
		return joined, nil
	}
	return prop.enforceProperty(joined, p.ctx, p.allocator), nil
}

func (p *LogicalJoin) convert2SemiJoin(prop *requiredProp) (taskProfile, error) {
lChild := p.children[0].(LogicalPlan)
rChild := p.children[1].(LogicalPlan)
semiJoin := PhysicalHashSemiJoin{
Expand All @@ -144,10 +184,14 @@ func (p *LogicalJoin) convert2SemiJoin() (taskProfile, error) {
if err != nil {
return nil, errors.Trace(err)
}
return semiJoin.attach2TaskProfile(lTask, rTask), nil
task := semiJoin.attach2TaskProfile(lTask, rTask)
// Because hash join is executed by multiple goroutines, it will not propagate physical property any more.
// TODO: We will consider the problem of property again for parallel execution.
task = prop.enforceProperty(task, p.ctx, p.allocator)
return task, nil
}

func (p *LogicalJoin) convert2HashJoin() (taskProfile, error) {
func (p *LogicalJoin) convert2HashJoin(prop *requiredProp) (taskProfile, error) {
lChild := p.children[0].(LogicalPlan)
rChild := p.children[1].(LogicalPlan)
hashJoin := PhysicalHashJoin{
Expand Down Expand Up @@ -179,8 +223,9 @@ func (p *LogicalJoin) convert2HashJoin() (taskProfile, error) {
hashJoin.SmallTable = 1
}
}
return hashJoin.attach2TaskProfile(lTask, rTask), nil

task := hashJoin.attach2TaskProfile(lTask, rTask)
task = prop.enforceProperty(task, p.ctx, p.allocator)
return task, nil
}

// getPushedProp will check if this sort property can be pushed or not. In order to simplify the problem, we only
Expand Down Expand Up @@ -652,9 +697,9 @@ func (p *LogicalApply) convert2NewPhysicalPlan(prop *requiredProp) (taskProfile,
}
// TODO: refine this code.
if p.JoinType == SemiJoin || p.JoinType == LeftOuterSemiJoin {
task, err = p.convert2SemiJoin()
task, err = p.convert2SemiJoin(&requiredProp{})
} else {
task, err = p.convert2HashJoin()
task, err = p.convert2HashJoin(&requiredProp{})
}
if err != nil {
return nil, errors.Trace(err)
Expand Down
12 changes: 12 additions & 0 deletions plan/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,18 @@ type requiredProp struct {
desc bool
}

// equal reports whether p and prop describe the same required property: the
// same number of columns, the same desc flag, and pairwise-equal columns in
// the same positions.
func (p *requiredProp) equal(prop *requiredProp) bool {
	if len(p.cols) != len(prop.cols) {
		return false
	}
	if p.desc != prop.desc {
		return false
	}
	for i, col := range p.cols {
		if !col.Equal(prop.cols[i], nil) {
			return false
		}
	}
	return true
}

// isEmpty reports whether the property has no columns.
func (p *requiredProp) isEmpty() bool {
	hasCols := len(p.cols) > 0
	return !hasCols
}
Expand Down
13 changes: 13 additions & 0 deletions plan/task_profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,19 @@ func (p *PhysicalHashJoin) attach2TaskProfile(tasks ...taskProfile) taskProfile
}
}

// attach2TaskProfile combines the first two child tasks under a copy of this
// merge join and returns the result as a rootTaskProfile.
//
// Each child task is copied and passed through finishCopTask before its plan
// is set as a child of the copied join. The resulting cost and count are the
// plain sums of the two children's values.
func (p *PhysicalMergeJoin) attach2TaskProfile(tasks ...taskProfile) taskProfile {
	left := finishCopTask(tasks[0].copy(), p.ctx, p.allocator)
	right := finishCopTask(tasks[1].copy(), p.ctx, p.allocator)

	join := p.Copy()
	join.SetChildren(left.plan(), right.plan())

	profile := &rootTaskProfile{p: join}
	// TODO: we will estimate the cost and count more precisely.
	profile.cst = left.cost() + right.cost()
	profile.cnt = left.count() + right.count()
	return profile
}

func (p *PhysicalHashSemiJoin) attach2TaskProfile(tasks ...taskProfile) taskProfile {
lTask := finishCopTask(tasks[0].copy(), p.ctx, p.allocator)
rTask := finishCopTask(tasks[1].copy(), p.ctx, p.allocator)
Expand Down

0 comments on commit 98176c3

Please sign in to comment.