Skip to content

Commit

Permalink
executor: Check if the query is killed when joining rows in HashJoin (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
TszKitLo40 authored Oct 28, 2020
1 parent ccd48ee commit 2e4390b
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 0 deletions.
20 changes: 20 additions & 0 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,16 @@ func (e *HashJoinExec) join2Chunk(workerID uint, probeSideChk *chunk.Chunk, hCtx
}

for i := range selected {
killed := atomic.LoadUint32(&e.ctx.GetSessionVars().Killed) == 1
failpoint.Inject("killedInJoin2Chunk", func(val failpoint.Value) {
if val.(bool) {
killed = true
}
})
if killed {
joinResult.err = ErrQueryInterrupted
return false, joinResult
}
if !selected[i] || hCtx.hasNull[i] { // process unmatched probe side rows
e.joiners[workerID].onMissMatch(false, probeSideChk.GetRow(i), joinResult.chk)
} else { // process matched probe side rows
Expand Down Expand Up @@ -610,6 +620,16 @@ func (e *HashJoinExec) join2ChunkForOuterHashJoin(workerID uint, probeSideChk *c
}
}
for i := 0; i < probeSideChk.NumRows(); i++ {
killed := atomic.LoadUint32(&e.ctx.GetSessionVars().Killed) == 1
failpoint.Inject("killedInJoin2ChunkForOuterHashJoin", func(val failpoint.Value) {
if val.(bool) {
killed = true
}
})
if killed {
joinResult.err = ErrQueryInterrupted
return false, joinResult
}
probeKey, probeRow := hCtx.hashVals[i].Sum64(), probeSideChk.GetRow(i)
ok, joinResult = e.joinMatchedProbeSideRow2ChunkForOuterHashJoin(workerID, probeKey, probeRow, hCtx, joinResult)
if !ok {
Expand Down
24 changes: 24 additions & 0 deletions executor/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/executor"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -2321,3 +2322,26 @@ func (s *testSuiteJoinSerial) TestExplainAnalyzeJoin(c *C) {
c.Assert(rows[0][0], Matches, "HashJoin.*")
c.Assert(rows[0][5], Matches, "time:.*, loops:.*, build_hash_table:{total:.*, fetch:.*, build:.*}, probe:{concurrency:5, total:.*, max:.*, probe:.*, fetch:.*}")
}

func (s *testSuiteJoinSerial) TestIssue20270(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
failpoint.Enable("github.com/pingcap/tidb/executor/killedInJoin2Chunk", "return(true)")
tk.MustExec("drop table if exists t;")
tk.MustExec("drop table if exists t1;")
tk.MustExec("create table t(c1 int, c2 int)")
tk.MustExec("create table t1(c1 int, c2 int)")
tk.MustExec("insert into t values(1,1),(2,2)")
tk.MustExec("insert into t1 values(2,3),(4,4)")
err := tk.QueryToErr("select /*+ TIDB_HJ(t, t1) */ * from t left join t1 on t.c1 = t1.c1 where t.c1 = 1 or t1.c2 > 20")
c.Assert(err, Equals, executor.ErrQueryInterrupted)
failpoint.Disable("github.com/pingcap/tidb/executor/killedInJoin2Chunk")
plannercore.ForceUseOuterBuild4Test = true
defer func() {
plannercore.ForceUseOuterBuild4Test = false
}()
failpoint.Enable("github.com/pingcap/tidb/executor/killedInJoin2ChunkForOuterHashJoin", "return(true)")
tk.MustExec("insert into t1 values(1,30),(2,40)")
err = tk.QueryToErr("select /*+ TIDB_HJ(t, t1) */ * from t left outer join t1 on t.c1 = t1.c1 where t.c1 = 1 or t1.c2 > 20")
c.Assert(err, Equals, executor.ErrQueryInterrupted)
failpoint.Disable("github.com/pingcap/tidb/executor/killedInJoin2ChunkForOuterHashJoin")
}

0 comments on commit 2e4390b

Please sign in to comment.