Skip to content

Commit

Permalink
planner, sessionctx: turn on the mpp by default (pingcap#23401)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanfei1991 authored Mar 23, 2021
1 parent 5243049 commit 1cebae2
Show file tree
Hide file tree
Showing 14 changed files with 275 additions and 265 deletions.
4 changes: 2 additions & 2 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1265,10 +1265,10 @@ func (s *testSuiteWithData) TestIndexScanWithYearCol(c *C) {
for i, tt := range input {
s.testData.OnRecord(func() {
output[i].SQL = tt
output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery("explain " + tt).Rows())
output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery("explain format = 'brief' " + tt).Rows())
output[i].Res = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Sort().Rows())
})
tk.MustQuery("explain " + tt).Check(testkit.Rows(output[i].Plan...))
tk.MustQuery("explain format = 'brief' " + tt).Check(testkit.Rows(output[i].Plan...))
tk.MustQuery(tt).Sort().Check(testkit.Rows(output[i].Res...))
}
}
Expand Down
66 changes: 31 additions & 35 deletions executor/testdata/executor_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -525,30 +525,28 @@
{
"SQL": "select t1.c1, t2.c1 from t as t1 inner join t as t2 on t1.c1 = t2.c1 where t1.c1 != NULL",
"Plan": [
"MergeJoin_9 0.00 root inner join, left key:test.t.c1, right key:test.t.c1",
"├─TableDual_35(Build) 0.00 root rows:0",
"└─TableDual_34(Probe) 0.00 root rows:0"
"MergeJoin 0.00 root inner join, left key:test.t.c1, right key:test.t.c1",
"├─TableDual(Build) 0.00 root rows:0",
"└─TableDual(Probe) 0.00 root rows:0"
],
"Res": [
]
"Res": null
},
{
"SQL": "select * from t as t1 inner join t as t2 on t1.c1 = t2.c1 where t1.c1 != NULL",
"Plan": [
"MergeJoin_9 0.00 root inner join, left key:test.t.c1, right key:test.t.c1",
"├─TableDual_41(Build) 0.00 root rows:0",
"└─TableDual_40(Probe) 0.00 root rows:0"
"MergeJoin 0.00 root inner join, left key:test.t.c1, right key:test.t.c1",
"├─TableDual(Build) 0.00 root rows:0",
"└─TableDual(Probe) 0.00 root rows:0"
],
"Res": [
]
"Res": null
},
{
"SQL": "select count(*) from t as t1 inner join t as t2 on t1.c1 = t2.c1 where t1.c1 != NULL",
"Plan": [
"StreamAgg_11 1.00 root funcs:count(1)->Column#7",
"└─MergeJoin_12 0.00 root inner join, left key:test.t.c1, right key:test.t.c1",
" ├─TableDual_38(Build) 0.00 root rows:0",
" └─TableDual_37(Probe) 0.00 root rows:0"
"StreamAgg 1.00 root funcs:count(1)->Column#7",
"└─MergeJoin 0.00 root inner join, left key:test.t.c1, right key:test.t.c1",
" ├─TableDual(Build) 0.00 root rows:0",
" └─TableDual(Probe) 0.00 root rows:0"
],
"Res": [
"0"
Expand All @@ -557,30 +555,28 @@
{
"SQL": "select t1.c1, t2.c1 from t as t1 left join t as t2 on t1.c1 = t2.c1 where t1.c1 != NULL",
"Plan": [
"MergeJoin_7 0.00 root left outer join, left key:test.t.c1, right key:test.t.c1",
"├─TableDual_22(Build) 0.00 root rows:0",
"└─TableDual_21(Probe) 0.00 root rows:0"
"MergeJoin 0.00 root left outer join, left key:test.t.c1, right key:test.t.c1",
"├─TableDual(Build) 0.00 root rows:0",
"└─TableDual(Probe) 0.00 root rows:0"
],
"Res": [
]
"Res": null
},
{
"SQL": "select * from t as t1 left join t as t2 on t1.c1 = t2.c1 where t1.c1 != NULL",
"Plan": [
"MergeJoin_7 0.00 root left outer join, left key:test.t.c1, right key:test.t.c1",
"├─TableDual_25(Build) 0.00 root rows:0",
"└─TableDual_24(Probe) 0.00 root rows:0"
"MergeJoin 0.00 root left outer join, left key:test.t.c1, right key:test.t.c1",
"├─TableDual(Build) 0.00 root rows:0",
"└─TableDual(Probe) 0.00 root rows:0"
],
"Res": [
]
"Res": null
},
{
"SQL": "select count(*) from t as t1 left join t as t2 on t1.c1 = t2.c1 where t1.c1 != NULL",
"Plan": [
"StreamAgg_9 1.00 root funcs:count(1)->Column#7",
"└─MergeJoin_10 0.00 root left outer join, left key:test.t.c1, right key:test.t.c1",
" ├─TableDual_25(Build) 0.00 root rows:0",
" └─TableDual_24(Probe) 0.00 root rows:0"
"StreamAgg 1.00 root funcs:count(1)->Column#7",
"└─MergeJoin 0.00 root left outer join, left key:test.t.c1, right key:test.t.c1",
" ├─TableDual(Build) 0.00 root rows:0",
" └─TableDual(Probe) 0.00 root rows:0"
],
"Res": [
"0"
Expand All @@ -589,13 +585,13 @@
{
"SQL": "select * from t as t1 left join t as t2 on t1.c1 = t2.c1 where t1.c1 is not NULL",
"Plan": [
"HashJoin_22 12487.50 root left outer join, equal:[eq(test.t.c1, test.t.c1)]",
"├─TableReader_40(Build) 9990.00 root data:Selection_39",
"│ └─Selection_39 9990.00 cop[tikv] not(isnull(test.t.c1))",
"│ └─TableFullScan_38 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo",
"└─TableReader_34(Probe) 9990.00 root data:Selection_33",
" └─Selection_33 9990.00 cop[tikv] not(isnull(test.t.c1))",
" └─TableFullScan_32 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo"
"HashJoin 12487.50 root left outer join, equal:[eq(test.t.c1, test.t.c1)]",
"├─TableReader(Build) 9990.00 root data:Selection",
"│ └─Selection 9990.00 cop[tikv] not(isnull(test.t.c1))",
"│ └─TableFullScan 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo",
"└─TableReader(Probe) 9990.00 root data:Selection",
" └─Selection 9990.00 cop[tikv] not(isnull(test.t.c1))",
" └─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo"
],
"Res": [
"2001 1 2001 1"
Expand Down
93 changes: 49 additions & 44 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1667,7 +1667,8 @@ func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P
return nil, false
}
joins := make([]PhysicalPlan, 0, 8)
if p.ctx.GetSessionVars().AllowMPPExecution && !collate.NewCollationEnabled() {
canPushToTiFlash := p.canPushToCop(kv.TiFlash)
if p.ctx.GetSessionVars().AllowMPPExecution && !collate.NewCollationEnabled() && canPushToTiFlash {
if p.shouldUseMPPBCJ() {
mppJoins := p.tryToGetMppHashJoin(prop, true)
if (p.preferJoinType & preferBCJoin) > 0 {
Expand All @@ -1678,7 +1679,7 @@ func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P
mppJoins := p.tryToGetMppHashJoin(prop, false)
joins = append(joins, mppJoins...)
}
} else if p.ctx.GetSessionVars().AllowBCJ {
} else if p.ctx.GetSessionVars().AllowBCJ && canPushToTiFlash {
broadCastJoins := p.tryToGetBroadCastJoin(prop)
if (p.preferJoinType & preferBCJoin) > 0 {
return broadCastJoins, true
Expand Down Expand Up @@ -1956,13 +1957,9 @@ func (p *LogicalProjection) exhaustPhysicalPlans(prop *property.PhysicalProperty
return []PhysicalPlan{proj}, true
}

func (lt *LogicalTopN) canPushToCop() bool {
return lt.canChildPushDown()
}

func (lt *LogicalTopN) getPhysTopN(prop *property.PhysicalProperty) []PhysicalPlan {
if lt.limitHints.preferLimitToCop {
if !lt.canPushToCop() {
if !lt.canPushToCop(kv.TiKV) {
errMsg := "Optimizer Hint LIMIT_TO_COP is inapplicable"
warning := ErrInternal.GenWithStack(errMsg)
lt.ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
Expand Down Expand Up @@ -1993,7 +1990,7 @@ func (lt *LogicalTopN) getPhysLimits(prop *property.PhysicalProperty) []Physical
}

if lt.limitHints.preferLimitToCop {
if !lt.canPushToCop() {
if !lt.canPushToCop(kv.TiKV) {
errMsg := "Optimizer Hint LIMIT_TO_COP is inapplicable"
warning := ErrInternal.GenWithStack(errMsg)
lt.ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
Expand Down Expand Up @@ -2115,25 +2112,36 @@ func (p *LogicalWindow) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([
func (p *baseLogicalPlan) exhaustPhysicalPlans(_ *property.PhysicalProperty) ([]PhysicalPlan, bool) {
panic("baseLogicalPlan.exhaustPhysicalPlans() should never be called.")
}
func (p *baseLogicalPlan) canChildPushDown() bool {
// At present for TiKV, only Aggregation, Limit, TopN can be pushed to cop task, and Projection will be supported in the future.
// When we push task to coprocessor, convertToRootTask will close the cop task and create a root task in the current implementation.
// Thus, we can't push two different tasks to coprocessor now, and can only push task to coprocessor when the child is Datasource.
// But for TiFlash, Projection, Join and DataSource can also be pushed down in most cases. Other operators will be
// supported in the future, such as Aggregation, Limit, TopN.
switch p.children[0].(type) {
case *DataSource:
return true
case *LogicalJoin, *LogicalProjection:
// TiFlash supports pushing down more operators
return p.SCtx().GetSessionVars().AllowBCJ || (p.SCtx().GetSessionVars().AllowMPPExecution && !collate.NewCollationEnabled())
default:
return false

// canPushToCop checks if it can be pushed to some stores. For TiKV, it only checks datasource.
// For TiFlash, it will check whether the operator is supported, but note that the check might be inaccrute.
func (p *baseLogicalPlan) canPushToCop(storeTp kv.StoreType) bool {
ret := true
for _, ch := range p.children {
switch c := ch.(type) {
case *DataSource:
validDs := false
for _, path := range c.possibleAccessPaths {
if path.StoreType == storeTp {
validDs = true
}
}
ret = ret && validDs
case *LogicalAggregation, *LogicalProjection, *LogicalSelection, *LogicalJoin:
if storeTp == kv.TiFlash {
ret = ret && c.canPushToCop(storeTp)
} else {
return false
}
default:
return false
}
}
return ret
}

func (la *LogicalAggregation) canPushToCop() bool {
return la.canChildPushDown() && !la.noCopPushDown
func (la *LogicalAggregation) canPushToCop(storeTp kv.StoreType) bool {
return la.baseLogicalPlan.canPushToCop(storeTp) && !la.noCopPushDown
}

func (la *LogicalAggregation) getEnforcedStreamAggs(prop *property.PhysicalProperty) []PhysicalPlan {
Expand All @@ -2155,7 +2163,7 @@ func (la *LogicalAggregation) getEnforcedStreamAggs(prop *property.PhysicalPrope
if la.HasDistinct() {
// TODO: remove AllowDistinctAggPushDown after the cost estimation of distinct pushdown is implemented.
// If AllowDistinctAggPushDown is set to true, we should not consider RootTask.
if !la.canPushToCop() || !la.ctx.GetSessionVars().AllowDistinctAggPushDown {
if !la.canPushToCop(kv.TiKV) || !la.ctx.GetSessionVars().AllowDistinctAggPushDown {
taskTypes = []property.TaskType{property.RootTaskType}
}
} else if !la.aggHints.preferAggToCop {
Expand Down Expand Up @@ -2235,7 +2243,7 @@ func (la *LogicalAggregation) getStreamAggs(prop *property.PhysicalProperty) []P
} else if !la.aggHints.preferAggToCop {
taskTypes = append(taskTypes, property.RootTaskType)
}
if !la.canPushToCop() {
if !la.canPushToCop(kv.TiKV) {
taskTypes = []property.TaskType{property.RootTaskType}
}
for _, taskTp := range taskTypes {
Expand Down Expand Up @@ -2318,7 +2326,7 @@ func (la *LogicalAggregation) tryToGetMppHashAggs(prop *property.PhysicalPropert

// agg runs on TiDB with a partial agg on TiFlash if possible
if prop.TaskTp == property.RootTaskType {
childProp := &property.PhysicalProperty{TaskTp: property.RootTaskType, ExpectedCnt: math.MaxFloat64}
childProp := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64}
agg := NewPhysicalHashAgg(la, la.stats.ScaleByExpectCnt(prop.ExpectedCnt), childProp)
agg.SetSchema(la.schema.Clone())
agg.MppRunMode = MppTiDB
Expand All @@ -2339,9 +2347,6 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy
if !prop.IsEmpty() {
return nil
}
if prop.IsFlashProp() && !la.canPushToCop() {
return nil
}
if prop.TaskTp == property.MppTaskType && !la.checkCanPushDownToMPP() {
return nil
}
Expand All @@ -2350,24 +2355,28 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy
if la.ctx.GetSessionVars().AllowBCJ {
taskTypes = append(taskTypes, property.CopTiFlashLocalReadTaskType)
}
canPushDownToMPP := la.ctx.GetSessionVars().AllowMPPExecution && !collate.NewCollationEnabled() && la.checkCanPushDownToMPP()
if canPushDownToMPP {
taskTypes = append(taskTypes, property.MppTaskType)
}
canPushDownToTiFlash := la.canPushToCop(kv.TiFlash)
canPushDownToMPP := la.ctx.GetSessionVars().AllowMPPExecution && !collate.NewCollationEnabled() && la.checkCanPushDownToMPP() && canPushDownToTiFlash
if la.HasDistinct() {
// TODO: remove after the cost estimation of distinct pushdown is implemented.
if !la.ctx.GetSessionVars().AllowDistinctAggPushDown && !canPushDownToMPP {
if !la.ctx.GetSessionVars().AllowDistinctAggPushDown {
taskTypes = []property.TaskType{property.RootTaskType}
}
} else if !la.aggHints.preferAggToCop {
taskTypes = append(taskTypes, property.RootTaskType)
}
if !la.canPushToCop(kv.TiKV) {
taskTypes = []property.TaskType{property.RootTaskType}
if canPushDownToTiFlash {
taskTypes = append(taskTypes, property.CopTiFlashLocalReadTaskType)
}
}
if canPushDownToMPP {
taskTypes = append(taskTypes, property.MppTaskType)
}
if prop.IsFlashProp() {
taskTypes = []property.TaskType{prop.TaskTp}
}
if !la.canPushToCop() {
taskTypes = []property.TaskType{property.RootTaskType}
}
for _, taskTp := range taskTypes {
if taskTp == property.MppTaskType {
mppAggs := la.tryToGetMppHashAggs(prop)
Expand Down Expand Up @@ -2400,7 +2409,7 @@ func (la *LogicalAggregation) ResetHintIfConflicted() (preferHash bool, preferSt

func (la *LogicalAggregation) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool) {
if la.aggHints.preferAggToCop {
if !la.canPushToCop() {
if !la.canPushToCop(kv.TiKV) {
errMsg := "Optimizer Hint AGG_TO_COP is inapplicable"
warning := ErrInternal.GenWithStack(errMsg)
la.ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
Expand Down Expand Up @@ -2439,17 +2448,13 @@ func (p *LogicalSelection) exhaustPhysicalPlans(prop *property.PhysicalProperty)
return []PhysicalPlan{sel}, true
}

func (p *LogicalLimit) canPushToCop() bool {
return p.canChildPushDown()
}

func (p *LogicalLimit) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool) {
if !prop.IsEmpty() {
return nil, true
}

if p.limitHints.preferLimitToCop {
if !p.canPushToCop() {
if !p.canPushToCop(kv.TiKV) {
errMsg := "Optimizer Hint LIMIT_TO_COP is inapplicable"
warning := ErrInternal.GenWithStack(errMsg)
p.ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
Expand Down
2 changes: 1 addition & 1 deletion planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ func (ds *DataSource) skylinePruning(prop *property.PhysicalProperty) []*candida
if len(path.Ranges) == 0 {
return []*candidatePath{{path: path}}
}
if path.StoreType != kv.TiFlash && (prop.TaskTp == property.CopTiFlashLocalReadTaskType || prop.TaskTp == property.CopTiFlashGlobalReadTaskType) {
if path.StoreType != kv.TiFlash && prop.IsFlashProp() {
continue
}
var currentCandidate *candidatePath
Expand Down
Loading

0 comments on commit 1cebae2

Please sign in to comment.