Skip to content

Commit

Permalink
executor: support Chunk for UnionExec (pingcap#5229)
Browse files Browse the repository at this point in the history
  • Loading branch information
zz-jason authored Dec 5, 2017
1 parent a1428ac commit 5155361
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 64 deletions.
14 changes: 9 additions & 5 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,14 +812,18 @@ func (b *executorBuilder) buildMaxOneRow(v *plan.MaxOneRow) Executor {
}

func (b *executorBuilder) buildUnion(v *plan.Union) Executor {
srcs := make([]Executor, len(v.Children()))
for i, sel := range v.Children() {
selExec := b.build(sel)
srcs[i] = selExec
childExecs := make([]Executor, len(v.Children()))
for i, child := range v.Children() {
childExecs[i] = b.build(child)
if b.err != nil {
b.err = errors.Trace(b.err)
return nil
}
}
e := &UnionExec{
baseExecutor: newBaseExecutor(v.Schema(), b.ctx, srcs...),
baseExecutor: newBaseExecutor(v.Schema(), b.ctx, childExecs...),
}
e.supportChk = true
return e
}

Expand Down
189 changes: 140 additions & 49 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,29 +884,62 @@ func (e *MaxOneRowExec) NextChunk(chk *chunk.Chunk) error {
return nil
}

// UnionExec represents union executor.
// UnionExec has multiple source Executors, it executes them sequentially, and do conversion to the same type
// as source Executors may has different field type, we need to do conversion.
// UnionExec pulls all it's childrens result and returns to its parent directly.
// A "resultPuller" is started for every child to pull result from that child and push it to the "resultPool", the used
// "Chunk" is obtained from the corresponding "resourcePool". All resultPullers are running concurrently.
// +----------------+
// +---> resourcePool 1 ---> | resultPuller 1 |-----+
// | +----------------+ |
// | |
// | +----------------+ v
// +---> resourcePool 2 ---> | resultPuller 2 |-----> resultPool ---+
// | +----------------+ ^ |
// | ...... | |
// | +----------------+ | |
// +---> resourcePool n ---> | resultPuller n |-----+ |
// | +----------------+ |
// | |
// | +-------------+ |
// |--------------------------| main thread | <---------------------+
// +-------------+
type UnionExec struct {
baseExecutor

finished atomic.Value
resultCh chan *execResult
rows []Row
cursor int
wg sync.WaitGroup
closedCh chan struct{}
stopFetchData atomic.Value
resultCh chan *execResult
rows []Row
cursor int
wg sync.WaitGroup

// For chunk execution.
finished chan struct{}
resourcePools []chan *chunk.Chunk
resultPool chan *unionWorkerResult
initialized bool
}

type execResult struct {
rows []Row
err error
}

func (e *UnionExec) waitAllFinished() {
// unionWorkerResult stores the result for a union worker.
// A "resultPuller" is started for every child to pull result from that child, unionWorkerResult is used to store that pulled result.
// "src" is used for Chunk resuse: after pulling result from "resultPool", main-thread must push a valid unused Chunk to "src" to
// enable the corresponding "resultPuller" continue to work.
type unionWorkerResult struct {
chk *chunk.Chunk
err error
src chan<- *chunk.Chunk
}

func (e *UnionExec) waitAllFinished(forChunk bool) {
e.wg.Wait()
close(e.resultCh)
close(e.closedCh)
if forChunk {
close(e.resultPool)
} else {
close(e.resultCh)
}
}

func (e *UnionExec) fetchData(goCtx goctx.Context, idx int) {
Expand All @@ -918,61 +951,99 @@ func (e *UnionExec) fetchData(goCtx goctx.Context, idx int) {
err: nil,
}
for i := 0; i < batchSize; i++ {
if e.finished.Load().(bool) {
if e.stopFetchData.Load().(bool) {
return
}
row, err := e.children[idx].Next(goCtx)
if err != nil {
e.finished.Store(true)
result.err = err
e.resultCh <- result
return
result.err = errors.Trace(err)
break
}
if row == nil {
if len(result.rows) > 0 {
e.resultCh <- result
}
return
}
// TODO: Add cast function in plan building phase.
for j := range row {
col := e.schema.Columns[j]
val, err := row[j].ConvertTo(e.ctx.GetSessionVars().StmtCtx, col.RetType)
if err != nil {
e.finished.Store(true)
result.err = err
e.resultCh <- result
return
}
row[j] = val
break
}
result.rows = append(result.rows, row)
}
e.resultCh <- result
if len(result.rows) == 0 && result.err == nil {
return
}
if result.err != nil {
e.stopFetchData.Store(true)
}
select {
case e.resultCh <- result:
case <-e.finished:
return
}
}
}

// Open implements the Executor Open interface.
func (e *UnionExec) Open(goCtx goctx.Context) error {
e.finished.Store(false)
e.resultCh = make(chan *execResult, len(e.children))
e.closedCh = make(chan struct{})
e.cursor = 0
var err error
for i, child := range e.children {
err = child.Open(goCtx)
if err != nil {
break
if err := e.baseExecutor.Open(goCtx); err != nil {
return errors.Trace(err)
}
e.stopFetchData.Store(false)
e.initialized = false
e.finished = make(chan struct{})
return nil
}

func (e *UnionExec) initialize(goCtx goctx.Context, forChunk bool) {
if forChunk {
e.resultPool = make(chan *unionWorkerResult, len(e.children))
e.resourcePools = make([]chan *chunk.Chunk, len(e.children))
for i := range e.children {
e.resourcePools[i] = make(chan *chunk.Chunk, 1)
e.resourcePools[i] <- e.childrenResults[i]
e.wg.Add(1)
go e.resultPuller(i)
}
} else {
e.resultCh = make(chan *execResult, len(e.children))
e.cursor = 0
for i := range e.children {
e.wg.Add(1)
go e.fetchData(goCtx, i)
}
}
go e.waitAllFinished(forChunk)
}

func (e *UnionExec) resultPuller(childID int) {
defer e.wg.Done()
result := &unionWorkerResult{
err: nil,
chk: nil,
src: e.resourcePools[childID],
}
for {
if e.stopFetchData.Load().(bool) {
return
}
select {
case <-e.finished:
return
case result.chk = <-e.resourcePools[childID]:
}
result.err = errors.Trace(e.children[childID].NextChunk(result.chk))
if result.err == nil && result.chk.NumRows() == 0 {
return
}
e.resultPool <- result
if result.err != nil {
e.stopFetchData.Store(true)
return
}
e.wg.Add(1)
go e.fetchData(goCtx, i)
}
go e.waitAllFinished()
return errors.Trace(err)
}

// Next implements the Executor Next interface.
func (e *UnionExec) Next(goCtx goctx.Context) (Row, error) {
if !e.initialized {
e.initialize(goCtx, false)
e.initialized = true
}
if e.cursor >= len(e.rows) {
result, ok := <-e.resultCh
if !ok {
Expand All @@ -992,10 +1063,30 @@ func (e *UnionExec) Next(goCtx goctx.Context) (Row, error) {
return row, nil
}

// NextChunk implements the Executor NextChunk interface.
func (e *UnionExec) NextChunk(chk *chunk.Chunk) error {
chk.Reset()
if !e.initialized {
e.initialize(nil, true)
e.initialized = true
}
result, ok := <-e.resultPool
if !ok {
return nil
}
if result.err != nil {
return errors.Trace(result.err)
}

chk.SwapColumns(result.chk)
result.src <- result.chk
return nil
}

// Close implements the Executor Close interface.
func (e *UnionExec) Close() error {
e.finished.Store(true)
<-e.closedCh
e.rows = nil
close(e.finished)
e.resourcePools = nil
return errors.Trace(e.baseExecutor.Close())
}
8 changes: 4 additions & 4 deletions plan/dag_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,22 +594,22 @@ func (s *testPlanSuite) TestDAGPlanBuilderUnion(c *C) {
// Test simple union.
{
sql: "select * from t union all select * from t",
best: "UnionAll{TableReader(Table(t))->TableReader(Table(t))}",
best: "UnionAll{TableReader(Table(t))->Projection->TableReader(Table(t))->Projection}",
},
// Test Order by + Union.
{
sql: "select * from t union all (select * from t) order by a ",
best: "UnionAll{TableReader(Table(t))->TableReader(Table(t))}->Sort",
best: "UnionAll{TableReader(Table(t))->Projection->TableReader(Table(t))->Projection}->Sort",
},
// Test Limit + Union.
{
sql: "select * from t union all (select * from t) limit 1",
best: "UnionAll{TableReader(Table(t)->Limit)->TableReader(Table(t)->Limit)}->Limit",
best: "UnionAll{TableReader(Table(t)->Limit)->Projection->TableReader(Table(t)->Limit)->Projection}->Limit",
},
// Test TopN + Union.
{
sql: "select a from t union all (select c from t) order by a limit 1",
best: "UnionAll{TableReader(Table(t)->Limit)->IndexReader(Index(t.c_d_e)[[<nil>,+inf]]->Limit)}->TopN([t.a],0,1)",
best: "UnionAll{TableReader(Table(t))->Projection->TableReader(Table(t))->Projection}->TopN([t.a],0,1)",
},
}
for _, tt := range tests {
Expand Down
30 changes: 29 additions & 1 deletion plan/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,34 @@ func joinFieldType(a, b *types.FieldType) *types.FieldType {
return resultTp
}

func (b *planBuilder) buildProjection4Union(u *Union) {
schema4Union := u.schema
for childID, child := range u.children {
exprs := make([]expression.Expression, len(child.Schema().Columns))
needProjection := false
for i, srcCol := range child.Schema().Columns {
dstType := schema4Union.Columns[i].RetType
srcType := srcCol.RetType
if !srcType.Equal(dstType) {
exprs[i] = expression.BuildCastFunction(b.ctx, srcCol.Clone(), dstType)
needProjection = true
} else {
exprs[i] = srcCol.Clone()
}
}
if needProjection {
proj := Projection{Exprs: exprs}.init(b.ctx)
proj.schema = schema4Union.Clone()
for _, col := range proj.schema.Columns {
col.FromID = proj.ID()
}
setParentAndChildren(proj, u.children[childID])
u.children[childID] = proj
}
}
setParentAndChildren(u, u.children...)
}

func (b *planBuilder) buildUnion(union *ast.UnionStmt) LogicalPlan {
u := Union{}.init(b.ctx)
u.children = make([]Plan, len(union.SelectList.Selects))
Expand Down Expand Up @@ -627,8 +655,8 @@ func (b *planBuilder) buildUnion(union *ast.UnionStmt) LogicalPlan {
v.FromID = u.id
v.DBName = model.NewCIStr("")
}

u.SetSchema(firstSchema)
b.buildProjection4Union(u)
var p LogicalPlan = u
if union.Distinct {
p = b.buildDistinct(u, u.Schema().Len())
Expand Down
10 changes: 5 additions & 5 deletions plan/logical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ func (s *testPlanSuite) TestPredicatePushDown(c *C) {
},
{
sql: "select a, d from (select * from t union all select * from t union all select * from t) z where a < 10",
best: "UnionAll{DataScan(t)->Projection->DataScan(t)->Projection->DataScan(t)->Projection}->Projection",
best: "UnionAll{DataScan(t)->Sel([lt(cast(test.t.a), 10)])->Projection->Projection->DataScan(t)->Sel([lt(cast(test.t.a), 10)])->Projection->Projection->DataScan(t)->Sel([lt(cast(test.t.a), 10)])->Projection->Projection}->Projection",
},
{
sql: "select (select count(*) from t where t.a = k.a) from t k",
Expand Down Expand Up @@ -767,7 +767,7 @@ func (s *testPlanSuite) TestEagerAggregation(c *C) {
},
{
sql: "select sum(c1) from (select c c1, d c2 from t a union all select a c1, b c2 from t b union all select b c1, e c2 from t c) x group by c2",
best: "UnionAll{DataScan(a)->Aggr(sum(a.c),firstrow(a.d))->DataScan(b)->Aggr(sum(b.a),firstrow(b.b))->DataScan(c)->Aggr(sum(c.b),firstrow(c.e))}->Aggr(sum(join_agg_0))->Projection",
best: "UnionAll{DataScan(a)->Projection->Aggr(sum(cast(a.c1)),firstrow(cast(a.c2)))->DataScan(b)->Projection->Aggr(sum(cast(b.c1)),firstrow(cast(b.c2)))->DataScan(c)->Projection->Aggr(sum(cast(c.c1)),firstrow(c.c2))}->Aggr(sum(join_agg_0))->Projection",
},
{
sql: "select max(a.b), max(b.b) from t a join t b on a.c = b.c group by a.a",
Expand All @@ -779,7 +779,7 @@ func (s *testPlanSuite) TestEagerAggregation(c *C) {
},
{
sql: "select max(c.b) from (select * from t a union all select * from t b) c group by c.a",
best: "UnionAll{DataScan(a)->Projection->Projection->DataScan(b)->Projection->Projection}->Aggr(max(join_agg_0))->Projection",
best: "UnionAll{DataScan(a)->Projection->Aggr(max(cast(a.b)),firstrow(cast(a.a)))->DataScan(b)->Projection->Aggr(max(cast(b.b)),firstrow(cast(b.a)))}->Aggr(max(join_agg_0))->Projection",
},
{
sql: "select max(a.c) from t a join t b on a.a=b.a and a.b=b.b group by a.b",
Expand Down Expand Up @@ -1524,12 +1524,12 @@ func (s *testPlanSuite) TestTopNPushDown(c *C) {
// Test TopN + UA + Proj.
{
sql: "select * from t union all (select * from t s) order by a,b limit 5",
best: "UnionAll{DataScan(t)->TopN([test.t.a test.t.b],0,5)->Projection->DataScan(s)->TopN([s.a s.b],0,5)->Projection}->TopN([t.a t.b],0,5)",
best: "UnionAll{DataScan(t)->TopN([cast(test.t.a) cast(test.t.b)],0,5)->Projection->DataScan(s)->TopN([cast(s.a) cast(s.b)],0,5)->Projection}->TopN([t.a t.b],0,5)",
},
// Test TopN + UA + Proj.
{
sql: "select * from t union all (select * from t s) order by a,b limit 5, 5",
best: "UnionAll{DataScan(t)->TopN([test.t.a test.t.b],0,10)->Projection->DataScan(s)->TopN([s.a s.b],0,10)->Projection}->TopN([t.a t.b],5,5)",
best: "UnionAll{DataScan(t)->TopN([cast(test.t.a) cast(test.t.b)],0,10)->Projection->DataScan(s)->TopN([cast(s.a) cast(s.b)],0,10)->Projection}->TopN([t.a t.b],5,5)",
},
// Test Limit + UA + Proj + Sort.
{
Expand Down
Loading

0 comments on commit 5155361

Please sign in to comment.