Skip to content

Commit

Permalink
planner: Mpp outer join build side (pingcap#25130)
Browse files Browse the repository at this point in the history
  • Loading branch information
windtalker authored Jun 4, 2021
1 parent 99b1fa5 commit c59b3bc
Show file tree
Hide file tree
Showing 7 changed files with 290 additions and 2 deletions.
16 changes: 15 additions & 1 deletion planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1808,9 +1808,23 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC
if p.children[0].statsInfo().Count() > p.children[1].statsInfo().Count() {
preferredBuildIndex = 1
}
} else if p.JoinType == SemiJoin || p.JoinType == AntiSemiJoin || p.JoinType == LeftOuterJoin {
} else if p.JoinType == SemiJoin || p.JoinType == AntiSemiJoin {
preferredBuildIndex = 1
}
if p.JoinType == LeftOuterJoin || p.JoinType == RightOuterJoin {
// TiFlash does not requires that the build side must be the inner table for outer join
// so we can choose the build side based on the row count, except that
// 1. it is a broadcast join(for broadcast join, it make sense to use the broadcast side as the build side)
// 2. or session variable MPPOuterJoinFixedBuildSide is set to true
// 3. or there are otherConditions for this join
if useBCJ || p.ctx.GetSessionVars().MPPOuterJoinFixedBuildSide || len(p.OtherConditions) > 0 {
if p.JoinType == LeftOuterJoin {
preferredBuildIndex = 1
}
} else if p.children[0].statsInfo().Count() > p.children[1].statsInfo().Count() {
preferredBuildIndex = 1
}
}
baseJoin.InnerChildIdx = preferredBuildIndex
childrenProps := make([]*property.PhysicalProperty, 2)
if useBCJ {
Expand Down
132 changes: 132 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,138 @@ func (s *testIntegrationSerialSuite) TestMPPJoin(c *C) {
}
}

func (s *testIntegrationSerialSuite) TestMPPOuterJoinBuildSideForBroadcastJoin(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists a")
tk.MustExec("create table a(id int, value int)")
tk.MustExec("insert into a values(1,2),(2,3)")
tk.MustExec("analyze table a")
tk.MustExec("drop table if exists b")
tk.MustExec("create table b(id int, value int)")
tk.MustExec("insert into b values(1,2),(2,3),(3,4)")
tk.MustExec("analyze table b")
// Create virtual tiflash replica info.
dom := domain.GetDomain(tk.Se)
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
c.Assert(exists, IsTrue)
for _, tblInfo := range db.Tables {
if tblInfo.Name.L == "a" || tblInfo.Name.L == "b" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}
tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'")
tk.MustExec("set @@session.tidb_opt_mpp_outer_join_fixed_build_side = 0")
tk.MustExec("set @@session.tidb_broadcast_join_threshold_size = 10000")
tk.MustExec("set @@session.tidb_broadcast_join_threshold_count = 10000")
var input []string
var output []struct {
SQL string
Plan []string
}
s.testData.GetTestCases(c, &input, &output)
for i, tt := range input {
s.testData.OnRecord(func() {
output[i].SQL = tt
output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows())
})
res := tk.MustQuery(tt)
res.Check(testkit.Rows(output[i].Plan...))
}
}

func (s *testIntegrationSerialSuite) TestMPPOuterJoinBuildSideForShuffleJoinWithFixedBuildSide(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists a")
tk.MustExec("create table a(id int, value int)")
tk.MustExec("insert into a values(1,2),(2,3)")
tk.MustExec("analyze table a")
tk.MustExec("drop table if exists b")
tk.MustExec("create table b(id int, value int)")
tk.MustExec("insert into b values(1,2),(2,3),(3,4)")
tk.MustExec("analyze table b")
// Create virtual tiflash replica info.
dom := domain.GetDomain(tk.Se)
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
c.Assert(exists, IsTrue)
for _, tblInfo := range db.Tables {
if tblInfo.Name.L == "a" || tblInfo.Name.L == "b" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}
tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'")
tk.MustExec("set @@session.tidb_opt_mpp_outer_join_fixed_build_side = 1")
tk.MustExec("set @@session.tidb_broadcast_join_threshold_size = 0")
tk.MustExec("set @@session.tidb_broadcast_join_threshold_count = 0")
var input []string
var output []struct {
SQL string
Plan []string
}
s.testData.GetTestCases(c, &input, &output)
for i, tt := range input {
s.testData.OnRecord(func() {
output[i].SQL = tt
output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows())
})
res := tk.MustQuery(tt)
res.Check(testkit.Rows(output[i].Plan...))
}
}

func (s *testIntegrationSerialSuite) TestMPPOuterJoinBuildSideForShuffleJoin(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists a")
tk.MustExec("create table a(id int, value int)")
tk.MustExec("insert into a values(1,2),(2,3)")
tk.MustExec("analyze table a")
tk.MustExec("drop table if exists b")
tk.MustExec("create table b(id int, value int)")
tk.MustExec("insert into b values(1,2),(2,3),(3,4)")
tk.MustExec("analyze table b")
// Create virtual tiflash replica info.
dom := domain.GetDomain(tk.Se)
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
c.Assert(exists, IsTrue)
for _, tblInfo := range db.Tables {
if tblInfo.Name.L == "a" || tblInfo.Name.L == "b" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}
tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'")
tk.MustExec("set @@session.tidb_opt_mpp_outer_join_fixed_build_side = 0")
tk.MustExec("set @@session.tidb_broadcast_join_threshold_size = 0")
tk.MustExec("set @@session.tidb_broadcast_join_threshold_count = 0")
var input []string
var output []struct {
SQL string
Plan []string
}
s.testData.GetTestCases(c, &input, &output)
for i, tt := range input {
s.testData.OnRecord(func() {
output[i].SQL = tt
output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows())
})
res := tk.MustQuery(tt)
res.Check(testkit.Rows(output[i].Plan...))
}
}

func (s *testIntegrationSerialSuite) TestMPPShuffledJoin(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
21 changes: 21 additions & 0 deletions planner/core/testdata/integration_serial_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,27 @@
"explain format = 'brief' select count(*) from fact_t where d1_k not in (select d1_k from d1_t)"
]
},
{
"name": "TestMPPOuterJoinBuildSideForBroadcastJoin",
"cases": [
"explain format = 'brief' select count(*) from a left join b on a.id = b.id",
"explain format = 'brief' select count(*) from b right join a on a.id = b.id"
]
},
{
"name": "TestMPPOuterJoinBuildSideForShuffleJoinWithFixedBuildSide",
"cases": [
"explain format = 'brief' select count(*) from a left join b on a.id = b.id",
"explain format = 'brief' select count(*) from b right join a on a.id = b.id"
]
},
{
"name": "TestMPPOuterJoinBuildSideForShuffleJoin",
"cases": [
"explain format = 'brief' select count(*) from a left join b on a.id = b.id",
"explain format = 'brief' select count(*) from b right join a on a.id = b.id"
]
},
{
"name": "TestMPPShuffledJoin",
"cases": [
Expand Down
107 changes: 107 additions & 0 deletions planner/core/testdata/integration_serial_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,113 @@
}
]
},
{
"Name": "TestMPPOuterJoinBuildSideForBroadcastJoin",
"Cases": [
{
"SQL": "explain format = 'brief' select count(*) from a left join b on a.id = b.id",
"Plan": [
"StreamAgg 1.00 root funcs:count(1)->Column#7",
"└─TableReader 2.00 root data:ExchangeSender",
" └─ExchangeSender 2.00 cop[tiflash] ExchangeType: PassThrough",
" └─HashJoin 2.00 cop[tiflash] left outer join, equal:[eq(test.a.id, test.b.id)]",
" ├─ExchangeReceiver(Build) 3.00 cop[tiflash] ",
" │ └─ExchangeSender 3.00 cop[tiflash] ExchangeType: Broadcast",
" │ └─Selection 3.00 cop[tiflash] not(isnull(test.b.id))",
" │ └─TableFullScan 3.00 cop[tiflash] table:b keep order:false",
" └─TableFullScan(Probe) 2.00 cop[tiflash] table:a keep order:false"
]
},
{
"SQL": "explain format = 'brief' select count(*) from b right join a on a.id = b.id",
"Plan": [
"StreamAgg 1.00 root funcs:count(1)->Column#7",
"└─TableReader 2.00 root data:ExchangeSender",
" └─ExchangeSender 2.00 cop[tiflash] ExchangeType: PassThrough",
" └─HashJoin 2.00 cop[tiflash] right outer join, equal:[eq(test.b.id, test.a.id)]",
" ├─ExchangeReceiver(Build) 3.00 cop[tiflash] ",
" │ └─ExchangeSender 3.00 cop[tiflash] ExchangeType: Broadcast",
" │ └─Selection 3.00 cop[tiflash] not(isnull(test.b.id))",
" │ └─TableFullScan 3.00 cop[tiflash] table:b keep order:false",
" └─TableFullScan(Probe) 2.00 cop[tiflash] table:a keep order:false"
]
}
]
},
{
"Name": "TestMPPOuterJoinBuildSideForShuffleJoinWithFixedBuildSide",
"Cases": [
{
"SQL": "explain format = 'brief' select count(*) from a left join b on a.id = b.id",
"Plan": [
"StreamAgg 1.00 root funcs:count(1)->Column#7",
"└─TableReader 2.00 root data:ExchangeSender",
" └─ExchangeSender 2.00 cop[tiflash] ExchangeType: PassThrough",
" └─HashJoin 2.00 cop[tiflash] left outer join, equal:[eq(test.a.id, test.b.id)]",
" ├─ExchangeReceiver(Build) 3.00 cop[tiflash] ",
" │ └─ExchangeSender 3.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.b.id",
" │ └─Selection 3.00 cop[tiflash] not(isnull(test.b.id))",
" │ └─TableFullScan 3.00 cop[tiflash] table:b keep order:false",
" └─ExchangeReceiver(Probe) 2.00 cop[tiflash] ",
" └─ExchangeSender 2.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.a.id",
" └─TableFullScan 2.00 cop[tiflash] table:a keep order:false"
]
},
{
"SQL": "explain format = 'brief' select count(*) from b right join a on a.id = b.id",
"Plan": [
"StreamAgg 1.00 root funcs:count(1)->Column#7",
"└─TableReader 2.00 root data:ExchangeSender",
" └─ExchangeSender 2.00 cop[tiflash] ExchangeType: PassThrough",
" └─HashJoin 2.00 cop[tiflash] right outer join, equal:[eq(test.b.id, test.a.id)]",
" ├─ExchangeReceiver(Build) 3.00 cop[tiflash] ",
" │ └─ExchangeSender 3.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.b.id",
" │ └─Selection 3.00 cop[tiflash] not(isnull(test.b.id))",
" │ └─TableFullScan 3.00 cop[tiflash] table:b keep order:false",
" └─ExchangeReceiver(Probe) 2.00 cop[tiflash] ",
" └─ExchangeSender 2.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.a.id",
" └─TableFullScan 2.00 cop[tiflash] table:a keep order:false"
]
}
]
},
{
"Name": "TestMPPOuterJoinBuildSideForShuffleJoin",
"Cases": [
{
"SQL": "explain format = 'brief' select count(*) from a left join b on a.id = b.id",
"Plan": [
"StreamAgg 1.00 root funcs:count(1)->Column#7",
"└─TableReader 2.00 root data:ExchangeSender",
" └─ExchangeSender 2.00 cop[tiflash] ExchangeType: PassThrough",
" └─HashJoin 2.00 cop[tiflash] left outer join, equal:[eq(test.a.id, test.b.id)]",
" ├─ExchangeReceiver(Build) 2.00 cop[tiflash] ",
" │ └─ExchangeSender 2.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.a.id",
" │ └─TableFullScan 2.00 cop[tiflash] table:a keep order:false",
" └─ExchangeReceiver(Probe) 3.00 cop[tiflash] ",
" └─ExchangeSender 3.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.b.id",
" └─Selection 3.00 cop[tiflash] not(isnull(test.b.id))",
" └─TableFullScan 3.00 cop[tiflash] table:b keep order:false"
]
},
{
"SQL": "explain format = 'brief' select count(*) from b right join a on a.id = b.id",
"Plan": [
"StreamAgg 1.00 root funcs:count(1)->Column#7",
"└─TableReader 2.00 root data:ExchangeSender",
" └─ExchangeSender 2.00 cop[tiflash] ExchangeType: PassThrough",
" └─HashJoin 2.00 cop[tiflash] right outer join, equal:[eq(test.b.id, test.a.id)]",
" ├─ExchangeReceiver(Build) 2.00 cop[tiflash] ",
" │ └─ExchangeSender 2.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.a.id",
" │ └─TableFullScan 2.00 cop[tiflash] table:a keep order:false",
" └─ExchangeReceiver(Probe) 3.00 cop[tiflash] ",
" └─ExchangeSender 3.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: test.b.id",
" └─Selection 3.00 cop[tiflash] not(isnull(test.b.id))",
" └─TableFullScan 3.00 cop[tiflash] table:b keep order:false"
]
}
]
},
{
"Name": "TestMPPShuffledJoin",
"Cases": [
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,9 @@ type SessionVars struct {
// size exceeds the broadcast threshold
AllowCartesianBCJ int

// MPPOuterJoinFixedBuildSide means in MPP plan, always use right(left) table as build side for left(right) out join
MPPOuterJoinFixedBuildSide bool

// AllowDistinctAggPushDown can be set true to allow agg with distinct push down to tikv/tiflash.
AllowDistinctAggPushDown bool

Expand Down Expand Up @@ -978,6 +981,7 @@ func NewSessionVars() *SessionVars {
AllowAggPushDown: false,
AllowBCJ: false,
AllowCartesianBCJ: DefOptCartesianBCJ,
MPPOuterJoinFixedBuildSide: DefOptMPPOuterJoinFixedBuildSide,
BroadcastJoinThresholdSize: DefBroadcastJoinThresholdSize,
BroadcastJoinThresholdCount: DefBroadcastJoinThresholdSize,
OptimizerSelectivityLevel: DefTiDBOptimizerSelectivityLevel,
Expand Down
9 changes: 8 additions & 1 deletion sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,14 @@ var defaultSysVars = []*SysVar{
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBBuildStatsConcurrency, skipInit: true, Value: strconv.Itoa(DefBuildStatsConcurrency)},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptCartesianBCJ, Value: strconv.Itoa(DefOptCartesianBCJ), Type: TypeInt, MinValue: 0, MaxValue: 2},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptCartesianBCJ, Value: strconv.Itoa(DefOptCartesianBCJ), Type: TypeInt, MinValue: 0, MaxValue: 2, SetSession: func(s *SessionVars, val string) error {
s.AllowCartesianBCJ = tidbOptInt(val, DefOptCartesianBCJ)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptMPPOuterJoinFixedBuildSide, Value: BoolToOnOff(DefOptMPPOuterJoinFixedBuildSide), Type: TypeBool, SetSession: func(s *SessionVars, val string) error {
s.MPPOuterJoinFixedBuildSide = TiDBOptOn(val)
return nil
}},
{Scope: ScopeGlobal, Name: TiDBAutoAnalyzeRatio, Value: strconv.FormatFloat(DefAutoAnalyzeRatio, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64},
{Scope: ScopeGlobal, Name: TiDBAutoAnalyzeStartTime, Value: DefAutoAnalyzeStartTime, Type: TypeTime},
{Scope: ScopeGlobal, Name: TiDBAutoAnalyzeEndTime, Value: DefAutoAnalyzeEndTime, Type: TypeTime},
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ const (
// TiDBOptCartesianBCJ is used to disable/enable broadcast cartesian join in MPP mode
TiDBOptCartesianBCJ = "tidb_opt_broadcast_cartesian_join"

TiDBOptMPPOuterJoinFixedBuildSide = "tidb_opt_mpp_outer_join_fixed_build_side"

// tidb_opt_distinct_agg_push_down is used to decide whether agg with distinct should be pushed to tikv/tiflash.
TiDBOptDistinctAggPushDown = "tidb_opt_distinct_agg_push_down"

Expand Down Expand Up @@ -594,6 +596,7 @@ const (
DefOptAggPushDown = false
DefOptBCJ = false
DefOptCartesianBCJ = 1
DefOptMPPOuterJoinFixedBuildSide = true
DefOptWriteRowID = false
DefOptCorrelationThreshold = 0.9
DefOptCorrelationExpFactor = 1
Expand Down

0 comments on commit c59b3bc

Please sign in to comment.