Skip to content

Commit

Permalink
planner/cascades: implement PreparePossibleProperties in cascades pla…
Browse files Browse the repository at this point in the history
…nner (pingcap#13910)
  • Loading branch information
francis0407 authored and sre-bot committed Dec 6, 2019
1 parent 01a7d00 commit 64da4b5
Show file tree
Hide file tree
Showing 12 changed files with 244 additions and 87 deletions.
14 changes: 7 additions & 7 deletions cmd/explaintest/r/index_join.result
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ create table t2(a int not null, b int not null, key a(a));
explain select /*+ TIDB_INLJ(t1) */ * from t1 where t1.a in (select t2.a from t2);
id count task operator info
Projection_8 10000.00 root test.t1.a, test.t1.b
└─IndexJoin_12 10000.00 root inner join, inner:IndexLookUp_11, outer key:test.t2.a, inner key:test.t1.a
├─IndexLookUp_11 1.25 root
│ ├─IndexScan_9 1.25 cop[tikv] table:t1, index:a, range: decided by [eq(test.t1.a, test.t2.a)], keep order:false, stats:pseudo
│ └─TableScan_10 1.25 cop[tikv] table:t1, keep order:false, stats:pseudo
└─StreamAgg_25 8000.00 root group by:test.t2.a, funcs:firstrow(test.t2.a)->test.t2.a
└─IndexReader_38 10000.00 root index:IndexScan_37
└─IndexScan_37 10000.00 cop[tikv] table:t2, index:a, range:[NULL,+inf], keep order:true, stats:pseudo
└─IndexJoin_13 10000.00 root inner join, inner:IndexLookUp_12, outer key:test.t2.a, inner key:test.t1.a
├─IndexLookUp_12 1.25 root
│ ├─IndexScan_10 1.25 cop[tikv] table:t1, index:a, range: decided by [eq(test.t1.a, test.t2.a)], keep order:false, stats:pseudo
│ └─TableScan_11 1.25 cop[tikv] table:t1, keep order:false, stats:pseudo
└─StreamAgg_26 8000.00 root group by:test.t2.a, funcs:firstrow(test.t2.a)->test.t2.a
└─IndexReader_39 10000.00 root index:IndexScan_38
└─IndexScan_38 10000.00 cop[tikv] table:t2, index:a, range:[NULL,+inf], keep order:true, stats:pseudo
90 changes: 45 additions & 45 deletions cmd/explaintest/r/tpch.result
Original file line number Diff line number Diff line change
Expand Up @@ -184,37 +184,37 @@ limit 100;
id count task operator info
Projection_37 100.00 root tpch.supplier.s_acctbal, tpch.supplier.s_name, tpch.nation.n_name, tpch.part.p_partkey, tpch.part.p_mfgr, tpch.supplier.s_address, tpch.supplier.s_phone, tpch.supplier.s_comment
└─TopN_40 100.00 root tpch.supplier.s_acctbal:desc, tpch.nation.n_name:asc, tpch.supplier.s_name:asc, tpch.part.p_partkey:asc, offset:0, count:100
└─HashRightJoin_45 155496.00 root inner join, inner:IndexMergeJoin_55, equal:[eq(tpch.part.p_partkey, tpch.partsupp.ps_partkey) eq(tpch.partsupp.ps_supplycost, Column#50)]
├─IndexMergeJoin_55 155496.00 root inner join, inner:TableReader_53, outer key:tpch.partsupp.ps_partkey, inner key:tpch.part.p_partkey
│ ├─HashRightJoin_61 8155010.44 root inner join, inner:HashRightJoin_63, equal:[eq(tpch.supplier.s_suppkey, tpch.partsupp.ps_suppkey)]
│ │ ├─HashRightJoin_63 100000.00 root inner join, inner:HashRightJoin_76, equal:[eq(tpch.nation.n_nationkey, tpch.supplier.s_nationkey)]
│ │ │ ├─HashRightJoin_76 5.00 root inner join, inner:TableReader_81, equal:[eq(tpch.region.r_regionkey, tpch.nation.n_regionkey)]
│ │ │ │ ├─TableReader_81 1.00 root data:Selection_80
│ │ │ │ │ └─Selection_80 1.00 cop[tikv] eq(tpch.region.r_name, "ASIA")
│ │ │ │ │ └─TableScan_79 5.00 cop[tikv] table:region, range:[-inf,+inf], keep order:false
│ │ │ │ └─TableReader_78 25.00 root data:TableScan_77
│ │ │ │ └─TableScan_77 25.00 cop[tikv] table:nation, range:[-inf,+inf], keep order:false
│ │ │ └─TableReader_83 500000.00 root data:TableScan_82
│ │ │ └─TableScan_82 500000.00 cop[tikv] table:supplier, range:[-inf,+inf], keep order:false
│ │ └─TableReader_85 40000000.00 root data:TableScan_84
│ │ └─TableScan_84 40000000.00 cop[tikv] table:partsupp, range:[-inf,+inf], keep order:false
│ └─TableReader_53 0.02 root data:Selection_52
│ └─Selection_52 0.02 cop[tikv] eq(tpch.part.p_size, 30), like(tpch.part.p_type, "%STEEL", 92)
│ └─TableScan_51 0.02 cop[tikv] table:part, range: decided by [tpch.partsupp.ps_partkey], keep order:true
└─Selection_89 6524008.35 root not(isnull(Column#50))
└─HashAgg_92 8155010.44 root group by:tpch.partsupp.ps_partkey, funcs:min(tpch.partsupp.ps_supplycost)->Column#50, funcs:firstrow(tpch.partsupp.ps_partkey)->tpch.partsupp.ps_partkey
└─HashRightJoin_96 8155010.44 root inner join, inner:HashRightJoin_98, equal:[eq(tpch.supplier.s_suppkey, tpch.partsupp.ps_suppkey)]
├─HashRightJoin_98 100000.00 root inner join, inner:HashRightJoin_111, equal:[eq(tpch.nation.n_nationkey, tpch.supplier.s_nationkey)]
│ ├─HashRightJoin_111 5.00 root inner join, inner:TableReader_116, equal:[eq(tpch.region.r_regionkey, tpch.nation.n_regionkey)]
│ │ ├─TableReader_116 1.00 root data:Selection_115
│ │ │ └─Selection_115 1.00 cop[tikv] eq(tpch.region.r_name, "ASIA")
│ │ │ └─TableScan_114 5.00 cop[tikv] table:region, range:[-inf,+inf], keep order:false
│ │ └─TableReader_113 25.00 root data:TableScan_112
│ │ └─TableScan_112 25.00 cop[tikv] table:nation, range:[-inf,+inf], keep order:false
│ └─TableReader_118 500000.00 root data:TableScan_117
│ └─TableScan_117 500000.00 cop[tikv] table:supplier, range:[-inf,+inf], keep order:false
└─TableReader_120 40000000.00 root data:TableScan_119
└─TableScan_119 40000000.00 cop[tikv] table:partsupp, range:[-inf,+inf], keep order:false
└─HashRightJoin_46 155496.00 root inner join, inner:IndexMergeJoin_57, equal:[eq(tpch.part.p_partkey, tpch.partsupp.ps_partkey) eq(tpch.partsupp.ps_supplycost, Column#50)]
├─IndexMergeJoin_57 155496.00 root inner join, inner:TableReader_55, outer key:tpch.partsupp.ps_partkey, inner key:tpch.part.p_partkey
│ ├─HashRightJoin_63 8155010.44 root inner join, inner:HashRightJoin_65, equal:[eq(tpch.supplier.s_suppkey, tpch.partsupp.ps_suppkey)]
│ │ ├─HashRightJoin_65 100000.00 root inner join, inner:HashRightJoin_78, equal:[eq(tpch.nation.n_nationkey, tpch.supplier.s_nationkey)]
│ │ │ ├─HashRightJoin_78 5.00 root inner join, inner:TableReader_83, equal:[eq(tpch.region.r_regionkey, tpch.nation.n_regionkey)]
│ │ │ │ ├─TableReader_83 1.00 root data:Selection_82
│ │ │ │ │ └─Selection_82 1.00 cop[tikv] eq(tpch.region.r_name, "ASIA")
│ │ │ │ │ └─TableScan_81 5.00 cop[tikv] table:region, range:[-inf,+inf], keep order:false
│ │ │ │ └─TableReader_80 25.00 root data:TableScan_79
│ │ │ │ └─TableScan_79 25.00 cop[tikv] table:nation, range:[-inf,+inf], keep order:false
│ │ │ └─TableReader_85 500000.00 root data:TableScan_84
│ │ │ └─TableScan_84 500000.00 cop[tikv] table:supplier, range:[-inf,+inf], keep order:false
│ │ └─TableReader_87 40000000.00 root data:TableScan_86
│ │ └─TableScan_86 40000000.00 cop[tikv] table:partsupp, range:[-inf,+inf], keep order:false
│ └─TableReader_55 0.02 root data:Selection_54
│ └─Selection_54 0.02 cop[tikv] eq(tpch.part.p_size, 30), like(tpch.part.p_type, "%STEEL", 92)
│ └─TableScan_53 0.02 cop[tikv] table:part, range: decided by [tpch.partsupp.ps_partkey], keep order:true
└─Selection_91 6524008.35 root not(isnull(Column#50))
└─HashAgg_94 8155010.44 root group by:tpch.partsupp.ps_partkey, funcs:min(tpch.partsupp.ps_supplycost)->Column#50, funcs:firstrow(tpch.partsupp.ps_partkey)->tpch.partsupp.ps_partkey
└─HashRightJoin_98 8155010.44 root inner join, inner:HashRightJoin_100, equal:[eq(tpch.supplier.s_suppkey, tpch.partsupp.ps_suppkey)]
├─HashRightJoin_100 100000.00 root inner join, inner:HashRightJoin_113, equal:[eq(tpch.nation.n_nationkey, tpch.supplier.s_nationkey)]
│ ├─HashRightJoin_113 5.00 root inner join, inner:TableReader_118, equal:[eq(tpch.region.r_regionkey, tpch.nation.n_regionkey)]
│ │ ├─TableReader_118 1.00 root data:Selection_117
│ │ │ └─Selection_117 1.00 cop[tikv] eq(tpch.region.r_name, "ASIA")
│ │ │ └─TableScan_116 5.00 cop[tikv] table:region, range:[-inf,+inf], keep order:false
│ │ └─TableReader_115 25.00 root data:TableScan_114
│ │ └─TableScan_114 25.00 cop[tikv] table:nation, range:[-inf,+inf], keep order:false
│ └─TableReader_120 500000.00 root data:TableScan_119
│ └─TableScan_119 500000.00 cop[tikv] table:supplier, range:[-inf,+inf], keep order:false
└─TableReader_122 40000000.00 root data:TableScan_121
└─TableScan_121 40000000.00 cop[tikv] table:partsupp, range:[-inf,+inf], keep order:false
/*
Q3 Shipping Priority Query
This query retrieves the 10 unshipped orders with the highest value.
Expand Down Expand Up @@ -1030,20 +1030,20 @@ id count task operator info
Projection_24 100.00 root tpch.customer.c_name, tpch.customer.c_custkey, tpch.orders.o_orderkey, tpch.orders.o_orderdate, tpch.orders.o_totalprice, Column#54
└─TopN_27 100.00 root tpch.orders.o_totalprice:desc, tpch.orders.o_orderdate:asc, offset:0, count:100
└─HashAgg_33 59251097.60 root group by:tpch.customer.c_custkey, tpch.customer.c_name, tpch.orders.o_orderdate, tpch.orders.o_orderkey, tpch.orders.o_totalprice, funcs:sum(tpch.lineitem.l_quantity)->Column#54, funcs:firstrow(tpch.customer.c_custkey)->tpch.customer.c_custkey, funcs:firstrow(tpch.customer.c_name)->tpch.customer.c_name, funcs:firstrow(tpch.orders.o_orderkey)->tpch.orders.o_orderkey, funcs:firstrow(tpch.orders.o_totalprice)->tpch.orders.o_totalprice, funcs:firstrow(tpch.orders.o_orderdate)->tpch.orders.o_orderdate
└─HashRightJoin_48 240004648.80 root inner join, inner:HashLeftJoin_49, equal:[eq(tpch.orders.o_orderkey, tpch.lineitem.l_orderkey)]
├─HashLeftJoin_49 59251097.60 root inner join, inner:Selection_66, equal:[eq(tpch.orders.o_orderkey, tpch.lineitem.l_orderkey)]
│ ├─HashRightJoin_61 75000000.00 root inner join, inner:TableReader_65, equal:[eq(tpch.customer.c_custkey, tpch.orders.o_custkey)]
│ │ ├─TableReader_65 7500000.00 root data:TableScan_64
│ │ │ └─TableScan_64 7500000.00 cop[tikv] table:customer, range:[-inf,+inf], keep order:false
│ │ └─TableReader_63 75000000.00 root data:TableScan_62
│ │ └─TableScan_62 75000000.00 cop[tikv] table:orders, range:[-inf,+inf], keep order:false
│ └─Selection_66 59251097.60 root gt(Column#52, 314)
│ └─HashAgg_73 74063872.00 root group by:tpch.lineitem.l_orderkey, funcs:sum(Column#60)->Column#52, funcs:firstrow(tpch.lineitem.l_orderkey)->tpch.lineitem.l_orderkey
│ └─TableReader_74 74063872.00 root data:HashAgg_67
│ └─HashAgg_67 74063872.00 cop[tikv] group by:tpch.lineitem.l_orderkey, funcs:sum(tpch.lineitem.l_quantity)->Column#60
│ └─TableScan_72 300005811.00 cop[tikv] table:lineitem, range:[-inf,+inf], keep order:false
└─TableReader_82 300005811.00 root data:TableScan_81
└─TableScan_81 300005811.00 cop[tikv] table:lineitem, range:[-inf,+inf], keep order:false
└─HashRightJoin_48 240004648.80 root inner join, inner:HashLeftJoin_72, equal:[eq(tpch.orders.o_orderkey, tpch.lineitem.l_orderkey)]
├─HashLeftJoin_72 59251097.60 root inner join, inner:Selection_89, equal:[eq(tpch.orders.o_orderkey, tpch.lineitem.l_orderkey)]
│ ├─HashRightJoin_84 75000000.00 root inner join, inner:TableReader_88, equal:[eq(tpch.customer.c_custkey, tpch.orders.o_custkey)]
│ │ ├─TableReader_88 7500000.00 root data:TableScan_87
│ │ │ └─TableScan_87 7500000.00 cop[tikv] table:customer, range:[-inf,+inf], keep order:false
│ │ └─TableReader_86 75000000.00 root data:TableScan_85
│ │ └─TableScan_85 75000000.00 cop[tikv] table:orders, range:[-inf,+inf], keep order:false
│ └─Selection_89 59251097.60 root gt(Column#52, 314)
│ └─HashAgg_96 74063872.00 root group by:tpch.lineitem.l_orderkey, funcs:sum(Column#66)->Column#52, funcs:firstrow(tpch.lineitem.l_orderkey)->tpch.lineitem.l_orderkey
│ └─TableReader_97 74063872.00 root data:HashAgg_90
│ └─HashAgg_90 74063872.00 cop[tikv] group by:tpch.lineitem.l_orderkey, funcs:sum(tpch.lineitem.l_quantity)->Column#66
│ └─TableScan_95 300005811.00 cop[tikv] table:lineitem, range:[-inf,+inf], keep order:false
└─TableReader_101 300005811.00 root data:TableScan_100
└─TableScan_100 300005811.00 cop[tikv] table:lineitem, range:[-inf,+inf], keep order:false
/*
Q19 Discounted Revenue Query
The Discounted Revenue Query reports the gross discounted revenue attributed to the sale of selected parts handled
Expand Down
33 changes: 33 additions & 0 deletions planner/cascades/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ func (opt *Optimizer) onPhaseImplementation(sctx sessionctx.Context, g *memo.Gro
prop := &property.PhysicalProperty{
ExpectedCnt: math.MaxFloat64,
}
preparePossibleProperties(g, make(map[*memo.Group][][]*expression.Column))
// TODO replace MaxFloat64 costLimit by variable from sctx, or other sources.
impl, err := opt.implGroup(g, prop, math.MaxFloat64)
if err != nil {
Expand Down Expand Up @@ -355,3 +356,35 @@ func (opt *Optimizer) implGroupExpr(cur *memo.GroupExpr, reqPhysProp *property.P
}
return impls, nil
}

// preparePossibleProperties recursively calls the LogicalPlan
// PreparePossibleProperties interface on every GroupExpr of g and of the
// groups below it. It will fulfill the possible properties fields of
// LogicalAggregation and LogicalJoin.
//
// propertyMap caches the result per group: a group that is already present is
// returned as-is, and the result computed for g is stored before returning.
//
// The returned properties are de-duplicated by the hash code of the
// corresponding PhysicalProperty and are listed in the order they are first
// produced while walking g.Equivalents, so the result is deterministic.
func preparePossibleProperties(g *memo.Group, propertyMap map[*memo.Group][][]*expression.Column) [][]*expression.Column {
	if prop, ok := propertyMap[g]; ok {
		return prop
	}
	// seen records the hash codes already collected for this group. The
	// properties themselves are appended to resultProps instead of being read
	// back out of a map, because map iteration order is unspecified in Go and
	// would make the order of the result vary between runs.
	seen := make(map[string]struct{})
	resultProps := [][]*expression.Column{}
	for elem := g.Equivalents.Front(); elem != nil; elem = elem.Next() {
		expr := elem.Value.(*memo.GroupExpr)
		// Resolve the children first; their properties are the input of this
		// expression's PreparePossibleProperties.
		childrenProperties := make([][][]*expression.Column, len(expr.Children))
		for i, child := range expr.Children {
			childrenProperties[i] = preparePossibleProperties(child, propertyMap)
		}
		exprProperties := expr.ExprNode.PreparePossibleProperties(expr.Schema(), childrenProperties...)
		for _, newPropCols := range exprProperties {
			// Skip the prop if an equal one has already been collected; the
			// first occurrence wins.
			newProp := property.PhysicalProperty{Items: property.ItemsFromCols(newPropCols, true)}
			key := string(newProp.HashCode())
			if _, ok := seen[key]; ok {
				continue
			}
			seen[key] = struct{}{}
			resultProps = append(resultProps, newPropCols)
		}
	}
	propertyMap[g] = resultProps
	return resultProps
}
67 changes: 67 additions & 0 deletions planner/cascades/optimize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/parser"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/planner/memo"
Expand Down Expand Up @@ -97,3 +98,69 @@ func (s *testCascadesSuite) TestFillGroupStats(c *C) {
c.Assert(err, IsNil)
c.Assert(rootGroup.Prop.Stats, NotNil)
}

// TestPreparePossibleProperties builds the memo for
// "select f, sum(a) from t group by f" after resetting the transformation
// rules to NewRuleEnumeratePaths on OperandDataSource, and checks the
// properties that preparePossibleProperties reports for the aggregation group
// and for its child (gather) group.
func (s *testCascadesSuite) TestPreparePossibleProperties(c *C) {
	enumeratePathsOnly := map[memo.Operand][]Transformation{
		memo.OperandDataSource: {
			NewRuleEnumeratePaths(),
		},
	}
	s.optimizer.ResetTransformationRules(enumeratePathsOnly)
	// Put the default rules back once the test is done.
	defer func() {
		s.optimizer.ResetTransformationRules(defaultTransformationMap)
	}()

	stmt, err := s.ParseOneStmt("select f, sum(a) from t group by f", "", "")
	c.Assert(err, IsNil)
	built, _, err := plannercore.BuildLogicalPlan(context.Background(), s.sctx, stmt, s.is)
	c.Assert(err, IsNil)
	root, isLogical := built.(plannercore.LogicalPlan)
	c.Assert(isLogical, IsTrue)
	root, err = s.optimizer.onPhasePreprocessing(s.sctx, root)
	c.Assert(err, IsNil)

	// Pick the schema columns of f and a from the DataSource two levels below
	// the root.
	aggPlan := root.Children()[0]
	dataSource, isDataSource := aggPlan.Children()[0].(*plannercore.DataSource)
	c.Assert(isDataSource, IsTrue)
	var columnF, columnA *expression.Column
	for idx, info := range dataSource.Columns {
		switch info.Name.L {
		case "f":
			columnF = dataSource.Schema().Columns[idx]
		case "a":
			columnA = dataSource.Schema().Columns[idx]
		}
	}
	c.Assert(columnF, NotNil)
	c.Assert(columnA, NotNil)

	agg, isAgg := root.Children()[0].(*plannercore.LogicalAggregation)
	c.Assert(isAgg, IsTrue)
	group := memo.Convert2Group(agg)
	err = s.optimizer.onPhaseExploration(s.sctx, group)
	c.Assert(err, IsNil)
	// The memo looks like this:
	// Group#0 Schema:[Column#13,test.t.f]
	//   Aggregation_2 input:[Group#1], group by:test.t.f, funcs:sum(test.t.a), firstrow(test.t.f)
	// Group#1 Schema:[test.t.a,test.t.f]
	//   TiKVSingleGather_5 input:[Group#2], table:t
	//   TiKVSingleGather_9 input:[Group#3], table:t, index:f_g
	//   TiKVSingleGather_7 input:[Group#4], table:t, index:f
	// Group#2 Schema:[test.t.a,test.t.f]
	//   TableScan_4 table:t, pk col:test.t.a
	// Group#3 Schema:[test.t.a,test.t.f]
	//   IndexScan_8 table:t, index:f, g
	// Group#4 Schema:[test.t.a,test.t.f]
	//   IndexScan_6 table:t, index:f
	cache := make(map[*memo.Group][][]*expression.Column)
	aggProp := preparePossibleProperties(group, cache)
	// Group0 is expected to expose exactly one prop: f.
	c.Assert(len(aggProp), Equals, 1)
	c.Assert(aggProp[0][0].Equal(nil, columnF), IsTrue)

	// The child group's props must have been cached during the recursion.
	aggExpr := group.Equivalents.Front().Value.(*memo.GroupExpr)
	gatherGroup := aggExpr.Children[0]
	gatherProp, cached := cache[gatherGroup]
	c.Assert(cached, IsTrue)
	// Group1 is expected to expose 2 props: [f], [a].
	c.Assert(len(gatherProp), Equals, 2)
	for _, cols := range gatherProp {
		c.Assert(len(cols), Equals, 1)
		matched := cols[0].Equal(nil, columnA) || cols[0].Equal(nil, columnF)
		c.Assert(matched, IsTrue)
	}
}
8 changes: 1 addition & 7 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1513,16 +1513,10 @@ func (la *LogicalAggregation) getStreamAggs(prop *property.PhysicalProperty) []P
}

for _, possibleChildProperty := range la.possibleProperties {
sortColOffsets := getMaxSortPrefix(possibleChildProperty, la.groupByCols)
if len(sortColOffsets) != len(la.groupByCols) {
continue
}

childProp.Items = property.ItemsFromCols(possibleChildProperty[:len(sortColOffsets)], desc)
childProp.Items = property.ItemsFromCols(possibleChildProperty[:len(la.groupByCols)], desc)
if !prop.IsPrefix(childProp) {
continue
}

// The table read of "CopDoubleReadTaskType" can't promise the sort
// property that the stream aggregation requires, no need to consider.
taskTypes := []property.TaskType{property.CopSingleReadTaskType}
Expand Down
1 change: 1 addition & 0 deletions planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,7 @@ func (ds *DataSource) buildIndexGather(path *util.AccessPath) LogicalPlan {
is.Columns = make([]*model.ColumnInfo, len(ds.Columns))
copy(is.Columns, ds.Columns)
is.SetSchema(ds.Schema())
is.idxCols, is.idxColLens = expression.IndexInfo2PrefixCols(is.Columns, is.schema.Columns, is.Index)

sg := TiKVSingleGather{
Source: ds,
Expand Down
2 changes: 1 addition & 1 deletion planner/core/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func newDateType() types.FieldType {
func MockSignedTable() *model.TableInfo {
// column: a, b, c, d, e, c_str, d_str, e_str, f, g
// PK: a
// indeices: c_d_e, e, f, g, f_g, c_d_e_str, c_d_e_str_prefix
// indices: c_d_e, e, f, g, f_g, c_d_e_str, c_d_e_str_prefix
indices := []*model.IndexInfo{
{
Name: model.NewCIStr("c_d_e"),
Expand Down
2 changes: 1 addition & 1 deletion planner/core/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func physicalOptimize(logic LogicalPlan) (PhysicalPlan, float64, error) {
return nil, 0, err
}

logic.preparePossibleProperties()
preparePossibleProperties(logic)

prop := &property.PhysicalProperty{
TaskTp: property.RootTaskType,
Expand Down
Loading

0 comments on commit 64da4b5

Please sign in to comment.