Skip to content

Commit

Permalink
*: make union execution parallel and fix a union plan bug. (pingcap#2195)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanfei1991 authored Dec 8, 2016
1 parent 8a5a192 commit 292e58c
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 47 deletions.
112 changes: 86 additions & 26 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ func (e *HashJoinExec) fetchBigExec() {
// prepare runs the first time when 'Next' is called, it starts one worker goroutine to fetch rows from the big table,
// and reads all data from the small table to build a hash table, then starts multiple join worker goroutines.
func (e *HashJoinExec) prepare() error {
e.closeCh = make(chan struct{}, 1)
e.closeCh = make(chan struct{})
e.finished.Store(false)
e.bigTableRows = make([]chan []*Row, e.concurrency)
e.wg = sync.WaitGroup{}
Expand Down Expand Up @@ -1907,51 +1907,111 @@ func (e *TrimExec) Next() (*Row, error) {
// UnionExec has multiple source Executors. It executes them concurrently and converts their rows to the same type,
// since the source Executors may have different field types.
type UnionExec struct {
schema expression.Schema
Srcs []Executor
ctx context.Context
cursor int
schema expression.Schema
Srcs []Executor
ctx context.Context
inited bool
finished atomic.Value
rowsCh chan []*Row
rows []*Row
cursor int
wg sync.WaitGroup
closedCh chan struct{}
errCh chan error
}

// Schema implements the Executor Schema interface by returning the schema
// stored on the UnionExec.
func (e *UnionExec) Schema() expression.Schema {
	out := e.schema
	return out
}

// waitAllFinished blocks until every fetchData worker has called wg.Done,
// then closes rowsCh followed by closedCh. Next treats a nil batch received
// from rowsCh as end of data, and Close waits on closedCh before it resets
// the executor.
func (e *UnionExec) waitAllFinished() {
	// Deferred so that closedCh is always the last channel to be closed.
	defer func() {
		close(e.closedCh)
	}()

	e.wg.Wait()
	close(e.rowsCh)
}

// fetchData is the worker loop for the source executor at position idx. It
// pulls rows from e.Srcs[idx], groups them into batches of at most batchSize
// rows and sends each batch on rowsCh. Rows coming from any source other than
// the first are converted, column by column, to the RetType of the matching
// column in e.schema. The worker stops when the source is exhausted, when the
// finished flag is set, or when an error occurs; on error it sets the
// finished flag and reports the error on errCh.
func (e *UnionExec) fetchData(idx int) {
	defer e.wg.Done()

	// abort tells the other workers to stop and hands err to the consumer.
	abort := func(err error) {
		e.finished.Store(true)
		e.errCh <- err
	}
	needCast := idx != 0

	for {
		batch := make([]*Row, 0, batchSize)
		// Exactly one row is appended per iteration, so the batch length
		// doubles as the loop counter.
		for len(batch) < batchSize {
			if e.finished.Load().(bool) {
				return
			}
			row, err := e.Srcs[idx].Next()
			switch {
			case err != nil:
				abort(err)
				return
			case row == nil:
				// Source exhausted: flush whatever was collected so far.
				if len(batch) != 0 {
					e.rowsCh <- batch
				}
				return
			}
			if needCast {
				// TODO: Add cast function in plan building phase.
				for pos := range row.Data {
					column := e.schema[pos]
					converted, convErr := row.Data[pos].ConvertTo(e.ctx.GetSessionVars().StmtCtx, column.RetType)
					if convErr != nil {
						abort(convErr)
						return
					}
					row.Data[pos] = converted
				}
			}
			batch = append(batch, row)
		}
		e.rowsCh <- batch
	}
}

// Next implements the Executor Next interface.
func (e *UnionExec) Next() (*Row, error) {
for {
if e.cursor >= len(e.Srcs) {
return nil, nil
if !e.inited {
e.finished.Store(false)
e.rowsCh = make(chan []*Row, batchSize*len(e.Srcs))
e.errCh = make(chan error, len(e.Srcs))
e.closedCh = make(chan struct{})
for i := range e.Srcs {
e.wg.Add(1)
go e.fetchData(i)
}
go e.waitAllFinished()
e.inited = true
}
if e.cursor >= len(e.rows) {
var rows []*Row
var err error
select {
case rows, _ = <-e.rowsCh:
case err, _ = <-e.errCh:
}
sel := e.Srcs[e.cursor]
row, err := sel.Next()
if err != nil {
return nil, errors.Trace(err)
}
if row == nil {
e.cursor++
continue
}
if e.cursor != 0 {
for i := range row.Data {
// The column value should be casted as the same type of the first select statement in corresponding position.
col := e.schema[i]
var val types.Datum
val, err = row.Data[i].ConvertTo(e.ctx.GetSessionVars().StmtCtx, col.RetType)
if err != nil {
return nil, errors.Trace(err)
}
row.Data[i] = val
}
if rows == nil {
return nil, nil
}
return row, nil
e.rows = rows
e.cursor = 0
}
row := e.rows[e.cursor]
e.cursor++
return row, nil
}

// Close implements the Executor Close interface.
func (e *UnionExec) Close() error {
e.finished.Store(true)
<-e.closedCh
e.cursor = 0
e.inited = false
e.rows = nil
for _, sel := range e.Srcs {
er := sel.Close()
if er != nil {
Expand Down
34 changes: 27 additions & 7 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ func (s *testSuite) TestUnion(c *C) {
r := tk.MustQuery(testSQL)
r.Check(testkit.Rows("1", "2"))

testSQL = `select * from (select id from union_test union select id from union_test) t;`
testSQL = `select * from (select id from union_test union select id from union_test) t order by id;`
tk.MustExec("begin")
r = tk.MustQuery(testSQL)
r.Check(testkit.Rows("1", "2"))
Expand All @@ -465,10 +465,10 @@ func (s *testSuite) TestUnion(c *C) {
r = tk.MustQuery("select 1 union all select 1 union select 1")
r.Check(testkit.Rows("1"))

r = tk.MustQuery("select 1 union (select 2) limit 1")
r = tk.MustQuery("select 1 as a union (select 2) order by a limit 1")
r.Check(testkit.Rows("1"))

r = tk.MustQuery("select 1 union (select 2) limit 1, 1")
r = tk.MustQuery("select 1 as a union (select 2) order by a limit 1, 1")
r.Check(testkit.Rows("2"))

r = tk.MustQuery("select id from union_test union all (select 1) order by id desc")
Expand All @@ -477,12 +477,12 @@ func (s *testSuite) TestUnion(c *C) {
r = tk.MustQuery("select id as a from union_test union (select 1) order by a desc")
r.Check(testkit.Rows("2", "1"))

r = tk.MustQuery(`select null union select "abc"`)
r = tk.MustQuery(`select null as a union (select "abc") order by a`)
rowStr1 := fmt.Sprintf("%v", nil)
r.Check(testkit.Rows(rowStr1, "abc"))

r = tk.MustQuery(`select "abc" union select 1`)
r.Check(testkit.Rows("abc", "1"))
r = tk.MustQuery(`select "abc" as a union (select 1) order by a`)
r.Check(testkit.Rows("1", "abc"))

tk.MustExec("commit")

Expand All @@ -499,8 +499,28 @@ func (s *testSuite) TestUnion(c *C) {
tk.MustExec("create table t3 (c int, d int)")
tk.MustExec("insert t3 values (3, 2)")
tk.MustExec("insert t3 values (4, 3)")
r = tk.MustQuery(`select sum(c1), c2 from (select c c1, d c2 from t1 union all select d c1, c c2 from t2 union all select c c1, d c2 from t3) x group by c2`)
r = tk.MustQuery(`select sum(c1), c2 from (select c c1, d c2 from t1 union all select d c1, c c2 from t2 union all select c c1, d c2 from t3) x group by c2 order by c2`)
r.Check(testkit.Rows("5 1", "4 2", "4 3"))

tk.MustExec("drop table if exists t1, t2, t3")
tk.MustExec("create table t1 (a int primary key)")
tk.MustExec("create table t2 (a int primary key)")
tk.MustExec("create table t3 (a int primary key)")
tk.MustExec("insert t1 values (7), (8)")
tk.MustExec("insert t2 values (1), (9)")
tk.MustExec("insert t3 values (2), (3)")
r = tk.MustQuery("select * from t1 union all select * from t2 union all (select * from t3) order by a limit 2")
r.Check(testkit.Rows("1", "2"))

tk.MustExec("drop table if exists t1, t2")
tk.MustExec("create table t1 (a int)")
tk.MustExec("create table t2 (a int)")
tk.MustExec("insert t1 values (2), (1)")
tk.MustExec("insert t2 values (3), (4)")
r = tk.MustQuery("select * from t1 union all (select * from t2) order by a limit 1")
r.Check(testkit.Rows("1"))
r = tk.MustQuery("select (select * from t1 where a != t.a union all (select * from t2 where a != t.a) order by a limit 1) from t1 t")
r.Check(testkit.Rows("1", "2"))
}

func (s *testSuite) TestIn(c *C) {
Expand Down
12 changes: 12 additions & 0 deletions executor/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/bin/bash
# Repeatedly runs the tests selected by -check.f=Subquery (up to 1000 times)
# to surface intermittent failures. The combined stdout/stderr of run <id> is
# written to ../log1/tmp.log.<id>. On the first failing run the script prints
# that run's id and stops.
log_dir=../log1

# The log directory must exist before redirecting into it; otherwise the
# redirection fails before `go test` starts and run 1 is misreported as a
# test failure.
mkdir -p "$log_dir" || exit 1

for ((id = 1; id <= 1000; id++))
do
	if ! go test -check.f=Subquery > "$log_dir/tmp.log.$id" 2>&1
	then
		echo "$id"
		break
	fi
done
4 changes: 3 additions & 1 deletion plan/match_property.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,14 @@ func (p *Union) matchProperty(_ *requiredProperty, childPlanInfo ...*physicalPla
np := *p
children := make([]Plan, 0, len(childPlanInfo))
cost := float64(0)
count := uint64(0)
for _, res := range childPlanInfo {
children = append(children, res.p)
cost += res.cost
count += res.count
}
np.SetChildren(children...)
return &physicalPlanInfo{p: &np, cost: cost}
return &physicalPlanInfo{p: &np, cost: cost, count: count}
}

// matchProperty implements PhysicalPlan matchProperty interface.
Expand Down
22 changes: 11 additions & 11 deletions plan/physical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,24 +871,24 @@ func (p *Union) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo,
}
limit := prop.limit
childInfos := make([]*physicalPlanInfo, 0, len(p.children))
var count uint64
for _, child := range p.GetChildren() {
newProp := convertLimitOffsetToCount(prop)
newProp.props = make([]*columnProp, 0, len(prop.props))
for _, c := range prop.props {
idx := p.GetSchema().GetIndex(c.col)
newProp.props = append(newProp.props, &columnProp{col: child.GetSchema()[idx], desc: c.desc})
newProp := &requiredProperty{}
if limit != nil {
newProp = convertLimitOffsetToCount(prop)
newProp.props = make([]*columnProp, 0, len(prop.props))
for _, c := range prop.props {
idx := p.GetSchema().GetIndex(c.col)
newProp.props = append(newProp.props, &columnProp{col: child.GetSchema()[idx], desc: c.desc})
}
}
info, err = child.(LogicalPlan).convert2PhysicalPlan(newProp)
count += info.count
childInfo, err := child.(LogicalPlan).convert2PhysicalPlan(newProp)
if err != nil {
return nil, errors.Trace(err)
}
childInfos = append(childInfos, info)
childInfos = append(childInfos, childInfo)
}
info = p.matchProperty(prop, childInfos...)
info = enforceProperty(limitProperty(limit), info)
info.count = count
info = enforceProperty(prop, info)
p.storePlanInfo(prop, info)
return info, nil
}
Expand Down
6 changes: 5 additions & 1 deletion plan/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,11 @@ func (s *testPlanSuite) TestCBO(c *C) {
},
{
sql: "select * from (select t.a from t union select t.d from t where t.c = 1 union select t.c from t) k order by a limit 1",
best: "UnionAll{Table(t)->Index(t.c_d_e)[[1,1]]->Projection->Index(t.c_d_e)[[<nil>,+inf]]}->Distinct->Limit",
best: "UnionAll{Table(t)->Index(t.c_d_e)[[1,1]]->Projection->Table(t)}->Distinct->Sort + Limit(1) + Offset(0)",
},
{
sql: "select * from (select t.a from t union all select t.d from t where t.c = 1 union all select t.c from t) k order by a limit 1",
best: "UnionAll{Table(t)->Limit->Index(t.c_d_e)[[1,1]]->Projection->Index(t.c_d_e)[[<nil>,+inf]]->Limit}->Sort + Limit(1) + Offset(0)",
},
{
sql: "select * from (select t.a from t union select t.d from t union select t.c from t) k order by a limit 1",
Expand Down
2 changes: 1 addition & 1 deletion session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1465,7 +1465,7 @@ func (s *testSessionSuite) TestDefaultFlenBug(c *C) {
mustExecSQL(c, se, "insert into t2 value (930);")
// The data in the second src will be casted as the type of the first src.
// If use flen=0, it will be truncated.
r := mustExecSQL(c, se, "select c from t1 union select c from t2;")
r := mustExecSQL(c, se, "select c from t1 union (select c from t2) order by c;")
rows, err := GetRows(r)
c.Assert(err, IsNil)
c.Assert(rows, HasLen, 2)
Expand Down

0 comments on commit 292e58c

Please sign in to comment.