Skip to content

Commit

Permalink
planner: support out join in broadcast join (pingcap#18988)
Browse files Browse the repository at this point in the history
Co-authored-by: ti-srebot <[email protected]>
Co-authored-by: Zhuomin(Charming) Liu <[email protected]>
  • Loading branch information
3 people authored Aug 6, 2020
1 parent 031143a commit e49b4c0
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 16 deletions.
17 changes: 13 additions & 4 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1574,16 +1574,25 @@ func (p *LogicalJoin) tryToGetBroadCastJoin(prop *property.PhysicalProperty) []P
return nil
}

if p.JoinType != InnerJoin || len(p.LeftConditions) != 0 || len(p.RightConditions) != 0 || len(p.OtherConditions) != 0 || len(p.EqualConditions) == 0 {
// for left join the global idx must be 1, and for right join the global idx must be 0
if (p.JoinType != InnerJoin && p.JoinType != LeftOuterJoin && p.JoinType != RightOuterJoin) || len(p.LeftConditions) != 0 || len(p.RightConditions) != 0 || len(p.OtherConditions) != 0 || len(p.EqualConditions) == 0 {
return nil
}

if hasPrefer, idx := p.getPreferredBCJLocalIndex(); hasPrefer {
if (idx == 0 && p.JoinType == RightOuterJoin) || (idx == 1 && p.JoinType == LeftOuterJoin) {
return nil
}
return p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 1-idx)
}
results := p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 0)
results = append(results, p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 1)...)
return results
if p.JoinType == InnerJoin {
results := p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 0)
results = append(results, p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 1)...)
return results
} else if p.JoinType == LeftOuterJoin {
return p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 1)
}
return p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 0)
}

func (p *LogicalJoin) tryToGetBroadCastJoinByPreferGlobalIdx(prop *property.PhysicalProperty, preferredGlobalIndex int) []PhysicalPlan {
Expand Down
26 changes: 26 additions & 0 deletions planner/core/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,32 @@ func (p *PhysicalMergeJoin) ExplainNormalizedInfo() string {
return p.explainInfo(true)
}

// ExplainInfo implements Plan interface.
// The text is produced by explainInfo, which is also what
// ExplainNormalizedInfo returns for this operator.
func (p *PhysicalBroadCastJoin) ExplainInfo() string {
	info := p.explainInfo()
	return info
}

// ExplainNormalizedInfo implements Plan interface.
// It delegates to explainInfo without any extra arguments, so the normalized
// text is identical to the one returned by ExplainInfo.
func (p *PhysicalBroadCastJoin) ExplainNormalizedInfo() string {
	normalized := p.explainInfo()
	return normalized
}

// explainInfo builds the operator description shared by ExplainInfo and
// ExplainNormalizedInfo. The result starts with the join type and is followed
// by ", left key:..." and ", right key:..." sections, each emitted only when
// the corresponding join-key list is non-empty.
func (p *PhysicalBroadCastJoin) explainInfo() string {
	// Seed the buffer with the join type; key sections are appended after it.
	out := bytes.NewBufferString(p.JoinType.String())

	if keys := p.LeftJoinKeys; len(keys) != 0 {
		fmt.Fprintf(out, ", left key:%s", expression.ExplainColumnList(keys))
	}
	if keys := p.RightJoinKeys; len(keys) != 0 {
		fmt.Fprintf(out, ", right key:%s", expression.ExplainColumnList(keys))
	}

	return out.String()
}

// ExplainInfo implements Plan interface.
func (p *PhysicalTopN) ExplainInfo() string {
buffer := bytes.NewBufferString("")
Expand Down
4 changes: 2 additions & 2 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,8 @@ func (s *testIntegrationSerialSuite) TestBroadcastJoin(c *C) {
res.Check(testkit.Rows(output[i].Plan...))
}

// out join not supported
_, err := tk.Exec("explain select /*+ broadcast_join(fact_t, d1_t) */ count(*) from fact_t left join d1_t on fact_t.d1_k = d1_t.d1_k")
// out table of out join should not be global
_, err := tk.Exec("explain select /*+ broadcast_join(fact_t, d1_t), broadcast_join_local(d1_t) */ count(*) from fact_t left join d1_t on fact_t.d1_k = d1_t.d1_k")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[planner:1815]Internal : Can't find a proper physical plan for this query")
// join with non-equal condition not supported
Expand Down
9 changes: 8 additions & 1 deletion planner/core/plan_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,15 @@ func (p *PhysicalBroadCastJoin) ToPB(ctx sessionctx.Context, storeType kv.StoreT
if err != nil {
return nil, err
}
pbJoinType := tipb.JoinType_TypeInnerJoin
switch p.JoinType {
case LeftOuterJoin:
pbJoinType = tipb.JoinType_TypeLeftOuterJoin
case RightOuterJoin:
pbJoinType = tipb.JoinType_TypeRightOuterJoin
}
join := &tipb.Join{
JoinType: tipb.JoinType_TypeInnerJoin,
JoinType: pbJoinType,
JoinExecType: tipb.JoinExecType_TypeHashJoin,
InnerIdx: int64(p.InnerChildIdx),
LeftJoinKeys: left,
Expand Down
4 changes: 3 additions & 1 deletion planner/core/testdata/integration_serial_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
"explain select /*+ broadcast_join(fact_t,d1_t) */ count(*) from fact_t, d1_t where fact_t.d1_k = d1_t.d1_k",
"explain select /*+ broadcast_join(fact_t,d1_t,d2_t,d3_t) */ count(*) from fact_t, d1_t, d2_t, d3_t where fact_t.d1_k = d1_t.d1_k and fact_t.d2_k = d2_t.d2_k and fact_t.d3_k = d3_t.d3_k",
"explain select /*+ broadcast_join(fact_t,d1_t), broadcast_join_local(d1_t) */ count(*) from fact_t, d1_t where fact_t.d1_k = d1_t.d1_k",
"explain select /*+ broadcast_join(fact_t,d1_t,d2_t,d3_t), broadcast_join_local(d2_t) */ count(*) from fact_t, d1_t, d2_t, d3_t where fact_t.d1_k = d1_t.d1_k and fact_t.d2_k = d2_t.d2_k and fact_t.d3_k = d3_t.d3_k"
"explain select /*+ broadcast_join(fact_t,d1_t,d2_t,d3_t), broadcast_join_local(d2_t) */ count(*) from fact_t, d1_t, d2_t, d3_t where fact_t.d1_k = d1_t.d1_k and fact_t.d2_k = d2_t.d2_k and fact_t.d3_k = d3_t.d3_k",
"explain select /*+ broadcast_join(fact_t,d1_t) */ count(*) from fact_t left join d1_t on fact_t.d1_k = d1_t.d1_k",
"explain select /*+ broadcast_join(fact_t,d1_t) */ count(*) from fact_t right join d1_t on fact_t.d1_k = d1_t.d1_k"
]
},
{
Expand Down
40 changes: 32 additions & 8 deletions planner/core/testdata/integration_serial_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
"StreamAgg_32 1.00 root funcs:count(Column#14)->Column#11",
"└─TableReader_33 1.00 root data:StreamAgg_13",
" └─StreamAgg_13 1.00 cop[tiflash] funcs:count(1)->Column#14",
" └─BroadcastJoin_31 8.00 cop[tiflash] ",
" └─BroadcastJoin_31 8.00 cop[tiflash] inner join, left key:test.fact_t.d1_k, right key:test.d1_t.d1_k",
" ├─Selection_23(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))",
" │ └─TableFullScan_22 2.00 cop[tiflash] table:d1_t keep order:false, global read",
" └─Selection_21(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k))",
Expand All @@ -42,13 +42,13 @@
"StreamAgg_52 1.00 root funcs:count(Column#20)->Column#17",
"└─TableReader_53 1.00 root data:StreamAgg_17",
" └─StreamAgg_17 1.00 cop[tiflash] funcs:count(1)->Column#20",
" └─BroadcastJoin_51 8.00 cop[tiflash] ",
" └─BroadcastJoin_51 8.00 cop[tiflash] inner join, left key:test.fact_t.d3_k, right key:test.d3_t.d3_k",
" ├─Selection_43(Build) 2.00 cop[tiflash] not(isnull(test.d3_t.d3_k))",
" │ └─TableFullScan_42 2.00 cop[tiflash] table:d3_t keep order:false, global read",
" └─BroadcastJoin_33(Probe) 8.00 cop[tiflash] ",
" └─BroadcastJoin_33(Probe) 8.00 cop[tiflash] inner join, left key:test.fact_t.d2_k, right key:test.d2_t.d2_k",
" ├─Selection_29(Build) 2.00 cop[tiflash] not(isnull(test.d2_t.d2_k))",
" │ └─TableFullScan_28 2.00 cop[tiflash] table:d2_t keep order:false, global read",
" └─BroadcastJoin_37(Probe) 8.00 cop[tiflash] ",
" └─BroadcastJoin_37(Probe) 8.00 cop[tiflash] inner join, left key:test.fact_t.d1_k, right key:test.d1_t.d1_k",
" ├─Selection_27(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))",
" │ └─TableFullScan_26 2.00 cop[tiflash] table:d1_t keep order:false, global read",
" └─Selection_41(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k)), not(isnull(test.fact_t.d2_k)), not(isnull(test.fact_t.d3_k))",
Expand All @@ -61,7 +61,7 @@
"StreamAgg_25 1.00 root funcs:count(Column#14)->Column#11",
"└─TableReader_26 1.00 root data:StreamAgg_13",
" └─StreamAgg_13 1.00 cop[tiflash] funcs:count(1)->Column#14",
" └─BroadcastJoin_24 8.00 cop[tiflash] ",
" └─BroadcastJoin_24 8.00 cop[tiflash] inner join, left key:test.fact_t.d1_k, right key:test.d1_t.d1_k",
" ├─Selection_18(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))",
" │ └─TableFullScan_17 2.00 cop[tiflash] table:d1_t keep order:false",
" └─Selection_16(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k))",
Expand All @@ -74,18 +74,42 @@
"StreamAgg_36 1.00 root funcs:count(Column#20)->Column#17",
"└─TableReader_37 1.00 root data:StreamAgg_17",
" └─StreamAgg_17 1.00 cop[tiflash] funcs:count(1)->Column#20",
" └─BroadcastJoin_35 8.00 cop[tiflash] ",
" └─BroadcastJoin_35 8.00 cop[tiflash] inner join, left key:test.fact_t.d3_k, right key:test.d3_t.d3_k",
" ├─Selection_29(Build) 2.00 cop[tiflash] not(isnull(test.d3_t.d3_k))",
" │ └─TableFullScan_28 2.00 cop[tiflash] table:d3_t keep order:false, global read",
" └─BroadcastJoin_19(Probe) 8.00 cop[tiflash] ",
" └─BroadcastJoin_19(Probe) 8.00 cop[tiflash] inner join, left key:test.fact_t.d2_k, right key:test.d2_t.d2_k",
" ├─Selection_27(Build) 2.00 cop[tiflash] not(isnull(test.d2_t.d2_k))",
" │ └─TableFullScan_26 2.00 cop[tiflash] table:d2_t keep order:false",
" └─BroadcastJoin_20(Probe) 8.00 cop[tiflash] ",
" └─BroadcastJoin_20(Probe) 8.00 cop[tiflash] inner join, left key:test.fact_t.d1_k, right key:test.d1_t.d1_k",
" ├─Selection_25(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))",
" │ └─TableFullScan_24 2.00 cop[tiflash] table:d1_t keep order:false, global read",
" └─Selection_23(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k)), not(isnull(test.fact_t.d2_k)), not(isnull(test.fact_t.d3_k))",
" └─TableFullScan_22 8.00 cop[tiflash] table:fact_t keep order:false, global read"
]
},
{
"SQL": "explain select /*+ broadcast_join(fact_t,d1_t) */ count(*) from fact_t left join d1_t on fact_t.d1_k = d1_t.d1_k",
"Plan": [
"StreamAgg_23 1.00 root funcs:count(Column#14)->Column#11",
"└─TableReader_24 1.00 root data:StreamAgg_12",
" └─StreamAgg_12 1.00 cop[tiflash] funcs:count(1)->Column#14",
" └─BroadcastJoin_22 8.00 cop[tiflash] left outer join, left key:test.fact_t.d1_k, right key:test.d1_t.d1_k",
" ├─Selection_16(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))",
" │ └─TableFullScan_15 2.00 cop[tiflash] table:d1_t keep order:false, global read",
" └─TableFullScan_14(Probe) 8.00 cop[tiflash] table:fact_t keep order:false"
]
},
{
"SQL": "explain select /*+ broadcast_join(fact_t,d1_t) */ count(*) from fact_t right join d1_t on fact_t.d1_k = d1_t.d1_k",
"Plan": [
"StreamAgg_23 1.00 root funcs:count(Column#14)->Column#11",
"└─TableReader_24 1.00 root data:StreamAgg_12",
" └─StreamAgg_12 1.00 cop[tiflash] funcs:count(1)->Column#14",
" └─BroadcastJoin_22 8.00 cop[tiflash] right outer join, left key:test.fact_t.d1_k, right key:test.d1_t.d1_k",
" ├─TableFullScan_16(Build) 2.00 cop[tiflash] table:d1_t keep order:false",
" └─Selection_15(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k))",
" └─TableFullScan_14 8.00 cop[tiflash] table:fact_t keep order:false, global read"
]
}
]
},
Expand Down

0 comments on commit e49b4c0

Please sign in to comment.