Skip to content

Commit

Permalink
planner: push avg & distinct functions across union (pingcap#16344)
Browse files Browse the repository at this point in the history
  • Loading branch information
SunRunAway authored Apr 15, 2020
1 parent d26875d commit 8f000fc
Show file tree
Hide file tree
Showing 10 changed files with 120 additions and 65 deletions.
18 changes: 9 additions & 9 deletions cmd/explaintest/r/explain_easy.result
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,10 @@ id estRows task access object operator info
Union_17 26000.00 root
├─HashAgg_21 16000.00 root group by:Column#10, funcs:firstrow(Column#12)->Column#10
│ └─Union_22 16000.00 root
│ ├─StreamAgg_27 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#12, funcs:firstrow(test.t2.c1)->Column#10
│ ├─StreamAgg_27 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#12
│ │ └─IndexReader_40 10000.00 root index:IndexFullScan_39
│ │ └─IndexFullScan_39 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:true, stats:pseudo
│ └─StreamAgg_45 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#12, funcs:firstrow(test.t2.c1)->Column#10
│ └─StreamAgg_45 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#12
│ └─IndexReader_58 10000.00 root index:IndexFullScan_57
│ └─IndexFullScan_57 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:true, stats:pseudo
└─IndexReader_63 10000.00 root index:IndexFullScan_62
Expand All @@ -176,13 +176,13 @@ explain select c1 from t2 union all select c1 from t2 union select c1 from t2;
id estRows task access object operator info
HashAgg_18 24000.00 root group by:Column#10, funcs:firstrow(Column#11)->Column#10
└─Union_19 24000.00 root
├─StreamAgg_24 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#11, funcs:firstrow(test.t2.c1)->Column#10
├─StreamAgg_24 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#11
│ └─IndexReader_37 10000.00 root index:IndexFullScan_36
│ └─IndexFullScan_36 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:true, stats:pseudo
├─StreamAgg_42 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#11, funcs:firstrow(test.t2.c1)->Column#10
├─StreamAgg_42 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#11
│ └─IndexReader_55 10000.00 root index:IndexFullScan_54
│ └─IndexFullScan_54 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:true, stats:pseudo
└─StreamAgg_60 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#11, funcs:firstrow(test.t2.c1)->Column#10
└─StreamAgg_60 8000.00 root group by:test.t2.c1, funcs:firstrow(test.t2.c1)->Column#11
└─IndexReader_73 10000.00 root index:IndexFullScan_72
└─IndexFullScan_72 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:true, stats:pseudo
select * from information_schema.tidb_indexes where table_name='t4';
Expand Down Expand Up @@ -669,17 +669,17 @@ id estRows task access object operator info
Sort_13 2.00 root Column#3:asc
└─HashAgg_17 2.00 root group by:Column#3, funcs:firstrow(Column#6)->Column#3
└─Union_18 2.00 root
├─HashAgg_19 1.00 root group by:1, funcs:firstrow(0)->Column#6, funcs:firstrow(0)->Column#3
├─HashAgg_19 1.00 root group by:1, funcs:firstrow(0)->Column#6
│ └─TableDual_22 1.00 root rows:1
└─HashAgg_25 1.00 root group by:1, funcs:firstrow(1)->Column#6, funcs:firstrow(1)->Column#3
└─HashAgg_25 1.00 root group by:1, funcs:firstrow(1)->Column#6
└─TableDual_28 1.00 root rows:1
explain SELECT 0 AS a FROM dual UNION (SELECT 1 AS a FROM dual ORDER BY a);
id estRows task access object operator info
HashAgg_15 2.00 root group by:Column#3, funcs:firstrow(Column#6)->Column#3
└─Union_16 2.00 root
├─HashAgg_17 1.00 root group by:1, funcs:firstrow(0)->Column#6, funcs:firstrow(0)->Column#3
├─HashAgg_17 1.00 root group by:1, funcs:firstrow(0)->Column#6
│ └─TableDual_20 1.00 root rows:1
└─StreamAgg_27 1.00 root group by:Column#1, funcs:firstrow(Column#1)->Column#6, funcs:firstrow(Column#1)->Column#3
└─StreamAgg_27 1.00 root group by:Column#1, funcs:firstrow(Column#1)->Column#6
└─Projection_32 1.00 root 1->Column#1
└─TableDual_33 1.00 root rows:1
create table t (i int key, j int, unique key (i, j));
Expand Down
18 changes: 18 additions & 0 deletions expression/aggregation/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package aggregation

import (
"bytes"
"fmt"
"math"
"strconv"
Expand Down Expand Up @@ -44,6 +45,23 @@ func NewAggFuncDesc(ctx sessionctx.Context, name string, args []expression.Expre
return &AggFuncDesc{baseFuncDesc: b, HasDistinct: hasDistinct}, nil
}

// String implements the fmt.Stringer interface.
func (a *AggFuncDesc) String() string {
buffer := bytes.NewBufferString(a.Name)
buffer.WriteString("(")
if a.HasDistinct {
buffer.WriteString("distinct ")
}
for i, arg := range a.Args {
buffer.WriteString(arg.String())
if i+1 != len(a.Args) {
buffer.WriteString(", ")
}
}
buffer.WriteString(")")
return buffer.String()
}

// Equal checks whether two aggregation function signatures are equal.
func (a *AggFuncDesc) Equal(ctx sessionctx.Context, other *AggFuncDesc) bool {
if a.HasDistinct != other.HasDistinct {
Expand Down
4 changes: 2 additions & 2 deletions planner/cascades/testdata/integration_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@
"Plan": [
"HashAgg_14 1.00 root funcs:avg(Column#6, Column#7)->Column#5",
"└─TableReader_15 1.00 root data:HashAgg_16",
" └─HashAgg_16 1.00 cop[tikv] funcs:avg(test.t.b)->Column#6",
" └─HashAgg_16 1.00 cop[tikv] funcs:count(test.t.b)->Column#6, funcs:sum(test.t.b)->Column#7",
" └─TableFullScan_12 10000.00 cop[tikv] table:a keep order:false, stats:pseudo"
],
"Result": [
Expand Down Expand Up @@ -489,7 +489,7 @@
"Projection_8 8000.00 root Column#5, test.t.c, Column#5, Column#6, Column#7, Column#8, Column#9",
"└─HashAgg_13 8000.00 root group by:test.t.c, funcs:avg(Column#11, Column#12)->Column#5, funcs:count(distinct test.t.a, test.t.b)->Column#6, funcs:count(distinct test.t.a)->Column#7, funcs:count(distinct test.t.c)->Column#8, funcs:sum(Column#13)->Column#9, funcs:firstrow(test.t.c)->test.t.c",
" └─TableReader_14 8000.00 root data:HashAgg_15",
" └─HashAgg_15 8000.00 cop[tikv] group by:test.t.a, test.t.b, test.t.c, funcs:avg(test.t.b)->Column#11, funcs:sum(test.t.b)->Column#12",
" └─HashAgg_15 8000.00 cop[tikv] group by:test.t.a, test.t.b, test.t.c, funcs:count(test.t.b)->Column#11, funcs:sum(test.t.b)->Column#12, funcs:sum(test.t.b)->Column#13",
" └─TableFullScan_11 10000.00 cop[tikv] table:t keep order:false, stats:pseudo"
],
"Result": [
Expand Down
2 changes: 1 addition & 1 deletion planner/cascades/transformation_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ func (r *PushAggDownGather) OnTransform(old *memo.ExprIter) (newExprs []*memo.Gr
AggFuncs: aggFuncs,
GroupByItems: gbyItems,
Schema: aggSchema,
})
}, true)
// Remove unnecessary FirstRow.
partialPref.AggFuncs =
plannercore.RemoveUnnecessaryFirstRow(agg.SCtx(), finalPref.AggFuncs, finalPref.GroupByItems, partialPref.AggFuncs, partialPref.GroupByItems, partialPref.Schema, funcMap)
Expand Down
19 changes: 19 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package core_test

import (
"fmt"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -823,3 +825,20 @@ func (s *testIntegrationSuite) TestIssue15846(c *C) {
tk.MustExec("INSERT INTO t0(t0) VALUES (NULL), (NULL);")
tk.MustQuery("SELECT t1.c0 FROM t1 LEFT JOIN t0 ON 1;").Check(testkit.Rows("0", "0"))
}

func (s *testIntegrationSuite) TestIssue16290And16292(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t(a int, b int, primary key(a));")
tk.MustExec("insert into t values(1, 1);")

for i := 0; i <= 1; i++ {
tk.MustExec(fmt.Sprintf("set session tidb_opt_agg_push_down = %v", i))

tk.MustQuery("select avg(a) from (select * from t ta union all select * from t tb) t;").Check(testkit.Rows("1.0000"))
tk.MustQuery("select avg(b) from (select * from t ta union all select * from t tb) t;").Check(testkit.Rows("1.0000"))
tk.MustQuery("select count(distinct a) from (select * from t ta union all select * from t tb) t;").Check(testkit.Rows("1"))
tk.MustQuery("select count(distinct b) from (select * from t ta union all select * from t tb) t;").Check(testkit.Rows("1"))
}
}
31 changes: 25 additions & 6 deletions planner/core/rule_aggregation_push_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ func (a *aggregationPushDownSolver) checkAnyCountAndSum(aggFuncs []*aggregation.
return false
}

// TODO:
// 1. https://github.com/pingcap/tidb/issues/16355, push avg & distinct functions across join
// 2. remove this method and use splitPartialAgg instead for clean code.
func (a *aggregationPushDownSolver) makeNewAgg(ctx sessionctx.Context, aggFuncs []*aggregation.AggFuncDesc, gbyCols []*expression.Column, aggHints aggHintInfo, blockOffset int) (*LogicalAggregation, error) {
agg := LogicalAggregation{
GroupByItems: expression.Column2Exprs(gbyCols),
Expand Down Expand Up @@ -275,6 +278,27 @@ func (a *aggregationPushDownSolver) makeNewAgg(ctx sessionctx.Context, aggFuncs
return agg, nil
}

func (a *aggregationPushDownSolver) splitPartialAgg(agg *LogicalAggregation) (pushedAgg *LogicalAggregation) {
partial, final, _ := BuildFinalModeAggregation(agg.ctx, &AggInfo{
AggFuncs: agg.AggFuncs,
GroupByItems: agg.GroupByItems,
Schema: agg.schema,
}, false)
agg.SetSchema(final.Schema)
agg.AggFuncs = final.AggFuncs
agg.GroupByItems = final.GroupByItems
agg.collectGroupByColumns()

pushedAgg = LogicalAggregation{
AggFuncs: partial.AggFuncs,
GroupByItems: partial.GroupByItems,
aggHints: agg.aggHints,
}.Init(agg.ctx, agg.blockOffset)
pushedAgg.SetSchema(partial.Schema)
pushedAgg.collectGroupByColumns()
return
}

// pushAggCrossUnion will try to push the agg down to the union. If the new aggregation's group-by columns doesn't contain unique key.
// We will return the new aggregation. Otherwise we will transform the aggregation to projection.
func (a *aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, unionSchema *expression.Schema, unionChild LogicalPlan) LogicalPlan {
Expand Down Expand Up @@ -379,12 +403,7 @@ func (a *aggregationPushDownSolver) aggPushDown(p LogicalPlan) (_ LogicalPlan, e
projChild := proj.children[0]
agg.SetChildren(projChild)
} else if union, ok1 := child.(*LogicalUnionAll); ok1 {
var gbyCols []*expression.Column
gbyCols = expression.ExtractColumnsFromExpressions(gbyCols, agg.GroupByItems, nil)
pushedAgg, err := a.makeNewAgg(agg.ctx, agg.AggFuncs, gbyCols, agg.aggHints, agg.blockOffset)
if err != nil {
return nil, err
}
pushedAgg := a.splitPartialAgg(agg)
newChildren := make([]LogicalPlan, 0, len(union.children))
for _, child := range union.children {
newChild := a.pushAggCrossUnion(pushedAgg, union.Schema(), child)
Expand Down
71 changes: 32 additions & 39 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,38 +580,6 @@ func (p *PhysicalMergeJoin) attach2Task(tasks ...task) task {
}
}

// splitCopAvg2CountAndSum splits the cop avg function to count and sum.
// Now it's only used for TableReader.
func splitCopAvg2CountAndSum(p PhysicalPlan) {
var baseAgg *basePhysicalAgg
if agg, ok := p.(*PhysicalStreamAgg); ok {
baseAgg = &agg.basePhysicalAgg
}
if agg, ok := p.(*PhysicalHashAgg); ok {
baseAgg = &agg.basePhysicalAgg
}
if baseAgg == nil {
return
}

schemaCursor := len(baseAgg.Schema().Columns) - len(baseAgg.GroupByItems)
for i := len(baseAgg.AggFuncs) - 1; i >= 0; i-- {
f := baseAgg.AggFuncs[i]
schemaCursor--
if f.Name == ast.AggFuncAvg {
schemaCursor--
sumAgg := *f
sumAgg.Name = ast.AggFuncSum
sumAgg.RetTp = baseAgg.Schema().Columns[schemaCursor+1].RetType
cntAgg := *f
cntAgg.Name = ast.AggFuncCount
cntAgg.RetTp = baseAgg.Schema().Columns[schemaCursor].RetType
cntAgg.RetTp.Flag = f.RetTp.Flag
baseAgg.AggFuncs = append(baseAgg.AggFuncs[:i], append([]*aggregation.AggFuncDesc{&cntAgg, &sumAgg}, baseAgg.AggFuncs[i+1:]...)...)
}
}
}

func buildIndexLookUpTask(ctx sessionctx.Context, t *copTask) *rootTask {
newTask := &rootTask{cst: t.cst}
sessVars := ctx.GetSessionVars()
Expand Down Expand Up @@ -697,7 +665,6 @@ func finishCopTask(ctx sessionctx.Context, task task) task {
newTask.p = p
} else {
tp := t.tablePlan
splitCopAvg2CountAndSum(tp)
for len(tp.Children()) > 0 {
tp = tp.Children()[0]
}
Expand Down Expand Up @@ -1023,9 +990,10 @@ type AggInfo struct {
}

// BuildFinalModeAggregation splits either LogicalAggregation or PhysicalAggregation to finalAgg and partial1Agg,
// returns the body of finalAgg and the schema of partialAgg.
// returns the information of partial and final agg.
// partialIsCop means whether partial agg is a cop task.
func BuildFinalModeAggregation(
sctx sessionctx.Context, original *AggInfo) (partial, final *AggInfo, funcMap map[*aggregation.AggFuncDesc]*aggregation.AggFuncDesc) {
sctx sessionctx.Context, original *AggInfo, partialIsCop bool) (partial, final *AggInfo, funcMap map[*aggregation.AggFuncDesc]*aggregation.AggFuncDesc) {

funcMap = make(map[*aggregation.AggFuncDesc]*aggregation.AggFuncDesc, len(original.AggFuncs))
partial = &AggInfo{
Expand Down Expand Up @@ -1095,11 +1063,25 @@ func BuildFinalModeAggregation(
}
}
partialGbySchema.Append(gbyCol)
if !partialIsCop {
// if partial is a cop task, firstrow function is redundant since group by items are outputted
// by group by schema, and final functions use group by schema as their arguments.
// if partial agg is not cop, we must append firstrow function & schema, to output the group by
// items.
// maybe we can unify them sometime.
firstRow, err := aggregation.NewAggFuncDesc(sctx, ast.AggFuncFirstRow, []expression.Expression{gbyCol}, false)
if err != nil {
panic("NewAggFuncDesc FirstRow meets error: " + err.Error())
}
partial.AggFuncs = append(partial.AggFuncs, firstRow)
newCol, _ := gbyCol.Clone().(*expression.Column)
newCol.RetType = firstRow.RetTp
partial.Schema.Append(newCol)
partialCursor++
}
args = append(args, gbyCol)
}
}
// Just use groupBy items in Schema of partialAgg as arguments,
// no need to spawn FirstRow function.

finalAggFunc.HasDistinct = true
finalAggFunc.Mode = aggregation.CompleteMode
Expand All @@ -1122,7 +1104,18 @@ func BuildFinalModeAggregation(
args = append(args, partial.Schema.Columns[partialCursor])
partialCursor++
}
partial.AggFuncs = append(partial.AggFuncs, aggFunc)
if aggFunc.Name == ast.AggFuncAvg {
cntAgg := *aggFunc
cntAgg.Name = ast.AggFuncCount
cntAgg.RetTp = partial.Schema.Columns[partialCursor-2].GetType()
cntAgg.RetTp.Flag = aggFunc.RetTp.Flag
sumAgg := *aggFunc
sumAgg.Name = ast.AggFuncSum
sumAgg.RetTp = partial.Schema.Columns[partialCursor-1].GetType()
partial.AggFuncs = append(partial.AggFuncs, &cntAgg, &sumAgg)
} else {
partial.AggFuncs = append(partial.AggFuncs, aggFunc)
}

finalAggFunc.Mode = aggregation.FinalMode
funcMap[aggFunc] = finalAggFunc
Expand All @@ -1145,7 +1138,7 @@ func (p *basePhysicalAgg) newPartialAggregate(copTaskType kv.StoreType) (partial
AggFuncs: p.AggFuncs,
GroupByItems: p.GroupByItems,
Schema: p.Schema().Clone(),
})
}, true)
if p.tp == plancodec.TypeStreamAgg && len(partialPref.GroupByItems) != len(finalPref.GroupByItems) {
return nil, p.self
}
Expand Down
6 changes: 3 additions & 3 deletions planner/core/testdata/integration_serial_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@
"Plan": [
"StreamAgg_20 1.00 root funcs:avg(Column#7, Column#8)->Column#4",
"└─IndexReader_21 1.00 root index:StreamAgg_8",
" └─StreamAgg_8 1.00 cop[tikv] funcs:avg(test.t.a)->Column#7",
" └─StreamAgg_8 1.00 cop[tikv] funcs:count(test.t.a)->Column#7, funcs:sum(test.t.a)->Column#8",
" └─IndexFullScan_19 10000.00 cop[tikv] table:t, index:ia(a) keep order:false, stats:pseudo"
],
"Warn": [
Expand All @@ -182,7 +182,7 @@
"Plan": [
"StreamAgg_20 1.00 root funcs:avg(Column#7, Column#8)->Column#4",
"└─IndexReader_21 1.00 root index:StreamAgg_8",
" └─StreamAgg_8 1.00 cop[tikv] funcs:avg(test.t.a)->Column#7",
" └─StreamAgg_8 1.00 cop[tikv] funcs:count(test.t.a)->Column#7, funcs:sum(test.t.a)->Column#8",
" └─IndexFullScan_19 10000.00 cop[tikv] table:t, index:ia(a) keep order:false, stats:pseudo"
],
"Warn": null
Expand All @@ -192,7 +192,7 @@
"Plan": [
"StreamAgg_20 1.00 root funcs:avg(Column#7, Column#8)->Column#4",
"└─IndexReader_21 1.00 root index:StreamAgg_8",
" └─StreamAgg_8 1.00 cop[tikv] funcs:avg(test.t.a)->Column#7",
" └─StreamAgg_8 1.00 cop[tikv] funcs:count(test.t.a)->Column#7, funcs:sum(test.t.a)->Column#8",
" └─IndexFullScan_19 10000.00 cop[tikv] table:t, index:ia(a) keep order:false, stats:pseudo"
],
"Warn": [
Expand Down
5 changes: 4 additions & 1 deletion planner/core/testdata/plan_suite_unexported_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
"select max(a.b), max(b.b) from t a join t b on a.a = b.a group by a.c",
"select max(c.b) from (select * from t a union all select * from t b) c group by c.a",
"select max(a.c) from t a join t b on a.a=b.a and a.b=b.b group by a.b",
"select t1.a, count(t2.b) from t t1, t t2 where t1.a = t2.a group by t1.a"
"select t1.a, count(t2.b) from t t1, t t2 where t1.a = t2.a group by t1.a",
"select avg(a) from (select * from t t1 union all select * from t t2) t",
"select count(distinct a) from (select * from t t1 union all select * from t t2) t",
"select count(distinct b) from (select * from t t1 union all select * from t t2) t"
]
},
{
Expand Down
11 changes: 7 additions & 4 deletions planner/core/testdata/plan_suite_unexported_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
"Join{DataScan(a)->Aggr(sum(test.t.a),firstrow(test.t.c))->DataScan(b)}(test.t.c,test.t.c)->Aggr(sum(Column#26))->Projection",
"Join{DataScan(a)->Aggr(sum(test.t.a),firstrow(test.t.c))->DataScan(b)}(test.t.c,test.t.c)->Aggr(sum(Column#26))->Projection",
"DataScan(t)->Aggr(sum(test.t.a))->Projection",
"UnionAll{DataScan(a)->Projection->Aggr(sum(test.t.c),firstrow(test.t.d))->DataScan(b)->Projection->Aggr(sum(test.t.a),firstrow(test.t.b))->DataScan(c)->Projection->Aggr(sum(test.t.b),firstrow(test.t.e))}->Aggr(sum(Column#40))->Projection",
"UnionAll{DataScan(a)->Projection->Aggr(sum(test.t.c))->DataScan(b)->Projection->Aggr(sum(test.t.a))->DataScan(c)->Projection->Aggr(sum(test.t.b))}->Aggr(sum(Column#40))->Projection",
"Join{DataScan(a)->DataScan(b)->Aggr(max(test.t.b),firstrow(test.t.c))}(test.t.c,test.t.c)->Projection->Projection",
"Join{DataScan(a)->DataScan(b)}(test.t.a,test.t.a)->Aggr(max(test.t.b),max(test.t.b))->Projection",
"UnionAll{DataScan(a)->Projection->Projection->Projection->DataScan(b)->Projection->Projection->Projection}->Aggr(max(Column#38))->Projection",
"Join{DataScan(a)->DataScan(b)}(test.t.a,test.t.a)(test.t.b,test.t.b)->Aggr(max(test.t.c))->Projection",
"Join{DataScan(t1)->DataScan(t2)}(test.t.a,test.t.a)->Projection->Projection"
"Join{DataScan(t1)->DataScan(t2)}(test.t.a,test.t.a)->Projection->Projection",
"UnionAll{DataScan(t1)->Projection->Aggr(count(test.t.a),sum(test.t.a))->DataScan(t2)->Projection->Aggr(count(test.t.a),sum(test.t.a))}->Aggr(avg(Column#38, Column#39))->Projection",
"UnionAll{DataScan(t1)->Projection->Projection->Projection->DataScan(t2)->Projection->Projection->Projection}->Aggr(count(distinct Column#25))->Projection",
"UnionAll{DataScan(t1)->Projection->Aggr(firstrow(test.t.b))->DataScan(t2)->Projection->Aggr(firstrow(test.t.b))}->Aggr(count(distinct Column#26))->Projection"
]
},
{
Expand Down Expand Up @@ -897,8 +900,8 @@
"DataScan(t1)->Aggr(firstrow(test.t.a),firstrow(test.t.b))",
"DataScan(t2)->Aggr(firstrow(test.t.a),firstrow(test.t.b))",
"DataScan(t1)->Aggr(max(test.t.a),min(test.t.b))->Projection",
"DataScan(t1)->Aggr(sum(test.t.a))->Projection",
"DataScan(t1)->Aggr(count(test.t.a, test.t.b))->Projection",
"DataScan(t1)->Aggr(sum(distinct test.t.a))->Projection",
"DataScan(t1)->Aggr(count(distinct test.t.a, test.t.b))->Projection",
"DataScan(t1)->Projection",
"DataScan(t2)->Projection",
"Join{Join{DataScan(t1)->DataScan(t2)}(test.t.a,test.t.a)->DataScan(t3)->TopN([test.t.b true],0,1)}(test.t.b,test.t.b)->TopN([test.t.b true],0,1)->Aggr(max(test.t.b))->Projection",
Expand Down

0 comments on commit 8f000fc

Please sign in to comment.