Skip to content

Commit

Permalink
plan: new plan support union. (pingcap#3085)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanfei1991 authored and coocood committed Apr 24, 2017
1 parent 22eef1d commit f9a0abc
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 4 deletions.
133 changes: 133 additions & 0 deletions plan/dag_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,136 @@ func (s *testPlanSuite) TestDAGPlanBuilderSimpleCase(c *C) {
c.Assert(ToString(p), Equals, tt.best, Commentf("for %s", tt.sql))
}
}

func (s *testPlanSuite) TestDAGPlanBuilderBasePhysicalPlan(c *C) {
UseDAGPlanBuilder = true
defer func() {
UseDAGPlanBuilder = false
testleak.AfterTest(c)()
}()
tests := []struct {
sql string
best string
}{
// Test for update.
{
sql: "select * from t order by b limit 1 for update",
// TODO: This is not reasonable. Mysql do like this because the limit of InnoDB, should TiDB keep consistency with MySQL?
best: "TableReader(Table(t))->Lock->Sort + Limit(1) + Offset(0)",
},
// Test complex update.
{
sql: "update t set a = 5 where b < 1 order by d limit 1",
best: "TableReader(Table(t)->Sel([lt(test.t.b, 1)])->Sort + Limit(1) + Offset(0))->Sort + Limit(1) + Offset(0)->*plan.Update",
},
// Test simple update.
{
sql: "update t set a = 5",
best: "TableReader(Table(t))->*plan.Update",
},
// TODO: Test delete/update with join.
// Test complex delete.
{
sql: "delete from t where b < 1 order by d limit 1",
best: "TableReader(Table(t)->Sel([lt(test.t.b, 1)])->Sort + Limit(1) + Offset(0))->Sort + Limit(1) + Offset(0)->*plan.Delete",
},
// Test simple delete.
{
sql: "delete from t",
best: "TableReader(Table(t))->*plan.Delete",
},
// Test complex insert.
{
sql: "insert into t select * from t where b < 1 order by d limit 1",
best: "TableReader(Table(t)->Sel([lt(test.t.b, 1)])->Sort + Limit(1) + Offset(0))->Sort + Limit(1) + Offset(0)->*plan.Insert",
},
// Test simple insert.
{
sql: "insert into t values(0,0,0,0,0,0,0)",
best: "*plan.Insert",
},
// Test dual.
{
sql: "select 1",
best: "Dual->Projection",
},
// Test show.
{
sql: "show tables",
best: "*plan.Show",
},
}
for _, tt := range tests {
comment := Commentf("for %s", tt.sql)
stmt, err := s.ParseOneStmt(tt.sql, "", "")
c.Assert(err, IsNil, comment)

is, err := mockResolve(stmt)
c.Assert(err, IsNil)

builder := &planBuilder{
allocator: new(idAllocator),
ctx: mockContext(),
colMapper: make(map[*ast.ColumnNameExpr]int),
is: is,
}
p := builder.build(stmt)
c.Assert(builder.err, IsNil)
p, err = doOptimize(builder.optFlag, p.(LogicalPlan), builder.ctx, builder.allocator)
c.Assert(err, IsNil)
c.Assert(ToString(p), Equals, tt.best, Commentf("for %s", tt.sql))
}
}

func (s *testPlanSuite) TestDAGPlanBuilderUnion(c *C) {
UseDAGPlanBuilder = true
defer func() {
UseDAGPlanBuilder = false
testleak.AfterTest(c)()
}()
tests := []struct {
sql string
best string
}{
// Test simple union.
{
sql: "select * from t union all select * from t",
best: "UnionAll{TableReader(Table(t))->TableReader(Table(t))}",
},
// Test Order by + Union.
{
sql: "select * from t union all (select * from t) order by a ",
best: "UnionAll{TableReader(Table(t))->TableReader(Table(t))}->Sort",
},
// Test Limit + Union.
{
sql: "select * from t union all (select * from t) limit 1",
best: "UnionAll{TableReader(Table(t)->Limit)->Limit->TableReader(Table(t)->Limit)->Limit}->Limit",
},
// Test TopN + Union.
{
sql: "select a from t union all (select c from t) order by a limit 1",
best: "UnionAll{TableReader(Table(t)->Limit)->Limit->IndexReader(Index(t.c_d_e)[[<nil>,+inf]]->Limit)->Limit}->Sort + Limit(1) + Offset(0)",
},
}
for _, tt := range tests {
comment := Commentf("for %s", tt.sql)
stmt, err := s.ParseOneStmt(tt.sql, "", "")
c.Assert(err, IsNil, comment)

is, err := mockResolve(stmt)
c.Assert(err, IsNil)

builder := &planBuilder{
allocator: new(idAllocator),
ctx: mockContext(),
colMapper: make(map[*ast.ColumnNameExpr]int),
is: is,
}
p := builder.build(stmt)
c.Assert(builder.err, IsNil)
p, err = doOptimize(builder.optFlag, p.(LogicalPlan), builder.ctx, builder.allocator)
c.Assert(err, IsNil)
c.Assert(ToString(p), Equals, tt.best, Commentf("for %s", tt.sql))
}
}
5 changes: 3 additions & 2 deletions plan/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,9 @@ func (a *havingAndOrderbyExprResolver) Leave(n ast.Node) (node ast.Node, ok bool
return n, true
}

// resolveHavingAndOrderBy will process aggregate functions and resolve the columns that don't exist in select fields.
// If we found some columns that are not in select fields, we will append it to select fields and update the colMapper.
// When we rewrite the order by / having expression, we will find column in map at first.
func (b *planBuilder) resolveHavingAndOrderBy(sel *ast.SelectStmt, p LogicalPlan) (
map[*ast.AggregateFuncExpr]int, map[*ast.AggregateFuncExpr]int) {
extractor := &havingAndOrderbyExprResolver{
Expand Down Expand Up @@ -1155,7 +1158,6 @@ func (b *planBuilder) buildUpdate(update *ast.UpdateStmt) LogicalPlan {
b.visitInfo = appendVisitInfo(b.visitInfo, mysql.UpdatePriv, dbName, t.Name.L, "")
}

_, _ = b.resolveHavingAndOrderBy(sel, p)
if sel.Where != nil {
p = b.buildSelection(p, sel.Where, nil)
if b.err != nil {
Expand Down Expand Up @@ -1221,7 +1223,6 @@ func (b *planBuilder) buildDelete(delete *ast.DeleteStmt) LogicalPlan {
return nil
}

_, _ = b.resolveHavingAndOrderBy(sel, p)
if sel.Where != nil {
p = b.buildSelection(p, sel.Where, nil)
if b.err != nil {
Expand Down
25 changes: 24 additions & 1 deletion plan/new_physical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ func (p *baseLogicalPlan) convert2NewPhysicalPlan(prop *requiredProp) (taskProfi
if err != nil {
return nil, errors.Trace(err)
}
task = p.basePlan.self.(PhysicalPlan).attach2TaskProfile(task)
}
task = p.basePlan.self.(PhysicalPlan).attach2TaskProfile(task)
task = prop.enforceProperty(task, p.basePlan.ctx, p.basePlan.allocator)
if !prop.isEmpty() && len(p.basePlan.children) > 0 {
orderedTask, err := p.basePlan.children[0].(LogicalPlan).convert2NewPhysicalPlan(prop)
Expand Down Expand Up @@ -434,6 +434,29 @@ func (p *DataSource) convertToTableScan(prop *requiredProp) (task taskProfile, e
return task, nil
}

func (p *Union) convert2NewPhysicalPlan(prop *requiredProp) (taskProfile, error) {
task, err := p.getTaskProfile(prop)
if err != nil {
return nil, errors.Trace(err)
}
if task != nil {
return task, nil
}
// Union is a sort blocker. We can only enforce it.
tasks := make([]taskProfile, 0, len(p.children))
for _, child := range p.children {
task, err = child.(LogicalPlan).convert2NewPhysicalPlan(&requiredProp{})
if err != nil {
return nil, errors.Trace(err)
}
tasks = append(tasks, task)
}
task = p.attach2TaskProfile(tasks...)
task = prop.enforceProperty(task, p.ctx, p.allocator)

return task, p.storeTaskProfile(prop, task)
}

func (ts *PhysicalTableScan) addPushedDownSelection(copTask *copTaskProfile) {
// Add filter condition to table plan now.
if len(ts.filterCondition) > 0 {
Expand Down
22 changes: 21 additions & 1 deletion plan/task_profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ func (t *copTaskProfile) finishIndexPlan() {
}

func (p *basePhysicalPlan) attach2TaskProfile(tasks ...taskProfile) taskProfile {
return attachPlan2TaskProfile(p.basePlan.self.(PhysicalPlan).Copy(), tasks[0])
profile := tasks[0].copy()
if cop, ok := profile.(*copTaskProfile); ok {
profile = cop.finishTask(p.basePlan.ctx, p.basePlan.allocator)
}
return attachPlan2TaskProfile(p.basePlan.self.(PhysicalPlan).Copy(), profile)
}

// finishTask means we close the coprocessor task and create a root task.
Expand Down Expand Up @@ -266,6 +270,22 @@ func (p *Projection) attach2TaskProfile(profiles ...taskProfile) taskProfile {
return nil
}

func (p *Union) attach2TaskProfile(profiles ...taskProfile) taskProfile {
np := p.Copy()
newTask := &rootTaskProfile{p: np}
newChildren := make([]Plan, 0, len(p.children))
for _, profile := range profiles {
if cop, ok := profile.(*copTaskProfile); ok {
profile = cop.finishTask(p.ctx, p.allocator)
}
newTask.cst += profile.cost()
newTask.cnt += profile.count()
newChildren = append(newChildren, profile.plan())
}
np.SetChildren(newChildren...)
return newTask
}

func (sel *Selection) attach2TaskProfile(profiles ...taskProfile) taskProfile {
profile := profiles[0].copy()
if cop, ok := profile.(*copTaskProfile); ok {
Expand Down

0 comments on commit f9a0abc

Please sign in to comment.