Skip to content

Commit

Permalink
planner: fix can not found column bug (pingcap#28067)
Browse files Browse the repository at this point in the history
  • Loading branch information
windtalker authored Sep 17, 2021
1 parent 2169483 commit d1a3279
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 2 deletions.
16 changes: 15 additions & 1 deletion planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1886,8 +1886,22 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC
lPartitionKeys, rPartitionKeys := p.GetPotentialPartitionKeys()
if prop.MPPPartitionTp == property.HashType {
var matches []int
if matches = prop.IsSubsetOf(lPartitionKeys); len(matches) == 0 {
if p.JoinType == InnerJoin {
if matches = prop.IsSubsetOf(lPartitionKeys); len(matches) == 0 {
matches = prop.IsSubsetOf(rPartitionKeys)
}
} else if p.JoinType == RightOuterJoin {
// for right out join, only the right partition keys can possibly matches the prop, because
// the left partition keys will generate NULL values randomly
// todo maybe we can add a null-sensitive flag in the MPPPartitionColumn to indicate whether the partition column is
// null-sensitive(used in aggregation) or null-insensitive(used in join)
matches = prop.IsSubsetOf(rPartitionKeys)
} else {
// for left out join, only the left partition keys can possibly matches the prop, because
// the right partition keys will generate NULL values randomly
// for semi/anti semi/left out semi/anti left out semi join, only left partition keys are returned,
// so just check the left partition keys
matches = prop.IsSubsetOf(lPartitionKeys)
}
if len(matches) == 0 {
return nil
Expand Down
49 changes: 49 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,55 @@ func (s *testIntegrationSerialSuite) TestMPPShuffledJoin(c *C) {
}
}

func (s *testIntegrationSerialSuite) TestMPPJoinWithCanNotFoundColumnInSchemaColumnsError(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(id int, v1 decimal(20,2), v2 decimal(20,2))")
tk.MustExec("create table t2(id int, v1 decimal(10,2), v2 decimal(10,2))")
tk.MustExec("create table t3(id int, v1 decimal(10,2), v2 decimal(10,2))")
tk.MustExec("insert into t1 values(1,1,1),(2,2,2)")
tk.MustExec("insert into t2 values(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),(6,6,6),(7,7,7),(8,8,8)")
tk.MustExec("insert into t3 values(1,1,1)")
tk.MustExec("analyze table t1")
tk.MustExec("analyze table t2")
tk.MustExec("analyze table t3")

dom := domain.GetDomain(tk.Se)
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
c.Assert(exists, IsTrue)
for _, tblInfo := range db.Tables {
if tblInfo.Name.L == "t1" || tblInfo.Name.L == "t2" || tblInfo.Name.L == "t3" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}

tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'")
tk.MustExec("set @@session.tidb_enforce_mpp = 1")
tk.MustExec("set @@session.tidb_broadcast_join_threshold_size = 0")
tk.MustExec("set @@session.tidb_broadcast_join_threshold_count = 0")
tk.MustExec("set @@session.tidb_opt_mpp_outer_join_fixed_build_side = 0")

var input []string
var output []struct {
SQL string
Plan []string
}
s.testData.GetTestCases(c, &input, &output)
for i, tt := range input {
s.testData.OnRecord(func() {
output[i].SQL = tt
output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows())
})
res := tk.MustQuery(tt)
res.Check(testkit.Rows(output[i].Plan...))
}
}

func (s *testIntegrationSerialSuite) TestJoinNotSupportedByTiFlash(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
20 changes: 19 additions & 1 deletion planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,25 @@ func (p *PhysicalHashJoin) attach2TaskForMpp(tasks ...task) task {
lCost := lTask.cost()
rCost := rTask.cost()

outerTask := tasks[1-p.InnerChildIdx].(*mppTask)
// outer task is the task that will pass its MPPPartitionType to the join result
// for broadcast inner join, it should be the non-broadcast side, since broadcast side is always the build side, so
// just use the probe side is ok.
// for hash inner join, both side is ok, by default, we use the probe side
// for outer join, it should always be the outer side of the join
// for semi join, it should be the left side(the same as left out join)
outerTaskIndex := 1 - p.InnerChildIdx
if p.JoinType != InnerJoin {
if p.JoinType == RightOuterJoin {
outerTaskIndex = 1
} else {
outerTaskIndex = 0
}
}
// can not use the task from tasks because it maybe updated.
outerTask := lTask
if outerTaskIndex == 1 {
outerTask = rTask
}
task := &mppTask{
cst: lCost + rCost + p.GetCost(lTask.count(), rTask.count()),
p: p,
Expand Down
8 changes: 8 additions & 0 deletions planner/core/testdata/integration_serial_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@
"explain format = 'brief' select count(*) from fact_t where not exists (select 1 from d1_t where d1_k = fact_t.d1_k and value > fact_t.col1)"
]
},
{
"name": "TestMPPJoinWithCanNotFoundColumnInSchemaColumnsError",
"cases": [
"explain format = 'brief' select v from t3 as a left join (select t1.v1, t1.v2, t1.v1 + t1.v2 as v from t1 left join t2 on t1.v1 = t2.v1 and t1.v2 = t2.v2) b on a.v1 = b.v1 and a.v2 = b.v2",
"explain format = 'brief' select count(*), t2.v1, t2.v2 from t1 left join t2 on t1.v1 = t2.v1 and t1.v2 = t2.v2 group by t2.v1, t2.v2",
"explain format = 'brief' select count(*), t2.v1, t2.v2 from t3 left join t2 on t3.v1 = t2.v1 and t3.v2 = t2.v2 group by t2.v1, t2.v2"
]
},
{
"name": "TestJoinNotSupportedByTiFlash",
"cases": [
Expand Down
70 changes: 70 additions & 0 deletions planner/core/testdata/integration_serial_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -944,6 +944,76 @@
}
]
},
{
"Name": "TestMPPJoinWithCanNotFoundColumnInSchemaColumnsError",
"Cases": [
{
"SQL": "explain format = 'brief' select v from t3 as a left join (select t1.v1, t1.v2, t1.v1 + t1.v2 as v from t1 left join t2 on t1.v1 = t2.v1 and t1.v2 = t2.v2) b on a.v1 = b.v1 and a.v2 = b.v2",
"Plan": [
"TableReader 1.00 root data:ExchangeSender",
"└─ExchangeSender 1.00 cop[tiflash] ExchangeType: PassThrough",
" └─Projection 1.00 cop[tiflash] Column#13",
" └─HashJoin 1.00 cop[tiflash] left outer join, equal:[eq(test.t3.v1, test.t1.v1) eq(test.t3.v2, test.t1.v2)]",
" ├─ExchangeReceiver(Build) 1.00 cop[tiflash] ",
" │ └─ExchangeSender 1.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: [name: Column#23, collate: N/A], [name: Column#24, collate: N/A]",
" │ └─Projection 1.00 cop[tiflash] test.t3.v1, test.t3.v2, cast(test.t3.v1, decimal(20,2))->Column#23, cast(test.t3.v2, decimal(20,2))->Column#24",
" │ └─TableFullScan 1.00 cop[tiflash] table:a keep order:false",
" └─Projection(Probe) 2.00 cop[tiflash] test.t1.v1, test.t1.v2, plus(test.t1.v1, test.t1.v2)->Column#13",
" └─HashJoin 2.00 cop[tiflash] left outer join, equal:[eq(test.t1.v1, test.t2.v1) eq(test.t1.v2, test.t2.v2)]",
" ├─ExchangeReceiver(Build) 2.00 cop[tiflash] ",
" │ └─ExchangeSender 2.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t1.v1, collate: N/A], [name: test.t1.v2, collate: N/A]",
" │ └─Selection 2.00 cop[tiflash] not(isnull(test.t1.v1)), not(isnull(test.t1.v2))",
" │ └─TableFullScan 2.00 cop[tiflash] table:t1 keep order:false",
" └─ExchangeReceiver(Probe) 8.00 cop[tiflash] ",
" └─ExchangeSender 8.00 cop[tiflash] ExchangeType: HashPartition, Hash Cols: [name: Column#15, collate: N/A], [name: Column#16, collate: N/A]",
" └─Projection 8.00 cop[tiflash] test.t2.v1, test.t2.v2, cast(test.t2.v1, decimal(20,2))->Column#15, cast(test.t2.v2, decimal(20,2))->Column#16",
" └─Selection 8.00 cop[tiflash] not(isnull(test.t2.v1)), not(isnull(test.t2.v2))",
" └─TableFullScan 8.00 cop[tiflash] table:t2 keep order:false"
]
},
{
"SQL": "explain format = 'brief' select count(*), t2.v1, t2.v2 from t1 left join t2 on t1.v1 = t2.v1 and t1.v2 = t2.v2 group by t2.v1, t2.v2",
"Plan": [
"TableReader 2.00 root data:ExchangeSender",
"└─ExchangeSender 2.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─Projection 2.00 batchCop[tiflash] Column#9, test.t2.v1, test.t2.v2",
" └─HashAgg 2.00 batchCop[tiflash] group by:test.t2.v1, test.t2.v2, funcs:sum(Column#22)->Column#9, funcs:firstrow(test.t2.v1)->test.t2.v1, funcs:firstrow(test.t2.v2)->test.t2.v2",
" └─ExchangeReceiver 2.00 batchCop[tiflash] ",
" └─ExchangeSender 2.00 batchCop[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t2.v1, collate: N/A], [name: test.t2.v2, collate: N/A]",
" └─HashAgg 2.00 batchCop[tiflash] group by:test.t2.v1, test.t2.v2, funcs:count(1)->Column#22",
" └─HashJoin 2.00 batchCop[tiflash] left outer join, equal:[eq(test.t1.v1, test.t2.v1) eq(test.t1.v2, test.t2.v2)]",
" ├─ExchangeReceiver(Build) 2.00 batchCop[tiflash] ",
" │ └─ExchangeSender 2.00 batchCop[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t1.v1, collate: N/A], [name: test.t1.v2, collate: N/A]",
" │ └─TableFullScan 2.00 batchCop[tiflash] table:t1 keep order:false",
" └─ExchangeReceiver(Probe) 8.00 batchCop[tiflash] ",
" └─ExchangeSender 8.00 batchCop[tiflash] ExchangeType: HashPartition, Hash Cols: [name: Column#14, collate: N/A], [name: Column#15, collate: N/A]",
" └─Projection 8.00 batchCop[tiflash] test.t2.v1, test.t2.v2, cast(test.t2.v1, decimal(20,2))->Column#14, cast(test.t2.v2, decimal(20,2))->Column#15",
" └─Selection 8.00 batchCop[tiflash] not(isnull(test.t2.v1)), not(isnull(test.t2.v2))",
" └─TableFullScan 8.00 batchCop[tiflash] table:t2 keep order:false"
]
},
{
"SQL": "explain format = 'brief' select count(*), t2.v1, t2.v2 from t3 left join t2 on t3.v1 = t2.v1 and t3.v2 = t2.v2 group by t2.v1, t2.v2",
"Plan": [
"TableReader 1.00 root data:ExchangeSender",
"└─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─Projection 1.00 batchCop[tiflash] Column#9, test.t2.v1, test.t2.v2",
" └─HashAgg 1.00 batchCop[tiflash] group by:test.t2.v1, test.t2.v2, funcs:sum(Column#16)->Column#9, funcs:firstrow(test.t2.v1)->test.t2.v1, funcs:firstrow(test.t2.v2)->test.t2.v2",
" └─ExchangeReceiver 1.00 batchCop[tiflash] ",
" └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t2.v1, collate: N/A], [name: test.t2.v2, collate: N/A]",
" └─HashAgg 1.00 batchCop[tiflash] group by:test.t2.v1, test.t2.v2, funcs:count(1)->Column#16",
" └─HashJoin 1.00 batchCop[tiflash] left outer join, equal:[eq(test.t3.v1, test.t2.v1) eq(test.t3.v2, test.t2.v2)]",
" ├─ExchangeReceiver(Build) 1.00 batchCop[tiflash] ",
" │ └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t3.v1, collate: N/A], [name: test.t3.v2, collate: N/A]",
" │ └─TableFullScan 1.00 batchCop[tiflash] table:t3 keep order:false",
" └─ExchangeReceiver(Probe) 8.00 batchCop[tiflash] ",
" └─ExchangeSender 8.00 batchCop[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t2.v1, collate: N/A], [name: test.t2.v2, collate: N/A]",
" └─Selection 8.00 batchCop[tiflash] not(isnull(test.t2.v1)), not(isnull(test.t2.v2))",
" └─TableFullScan 8.00 batchCop[tiflash] table:t2 keep order:false"
]
}
]
},
{
"Name": "TestJoinNotSupportedByTiFlash",
"Cases": [
Expand Down

0 comments on commit d1a3279

Please sign in to comment.