Skip to content

Commit

Permalink
executor: fix a bug that cte.iterOutTbl did not close correctly (ping…
Browse files Browse the repository at this point in the history
  • Loading branch information
guo-shaoge authored Jul 12, 2021
1 parent d070f70 commit 7ee78d1
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 14 deletions.
20 changes: 13 additions & 7 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4356,17 +4356,23 @@ func (b *executorBuilder) buildCTE(v *plannercore.PhysicalCTE) Executor {
return nil
}

// 2. Build iterInTbl.
// 2. Build tables to store intermediate results.
chkSize := b.ctx.GetSessionVars().MaxChunkSize
tps := seedExec.base().retFieldTypes
iterOutTbl := cteutil.NewStorageRowContainer(tps, chkSize)
if err := iterOutTbl.OpenAndRef(); err != nil {
b.err = err
return nil
}

var resTbl cteutil.Storage
var iterInTbl cteutil.Storage
var iterOutTbl cteutil.Storage

if v.RecurPlan != nil {
// For non-recursive CTE, the result will be put into resTbl directly.
// So no need to build iterOutTbl.
iterOutTbl := cteutil.NewStorageRowContainer(tps, chkSize)
if err := iterOutTbl.OpenAndRef(); err != nil {
b.err = err
return nil
}
}

storageMap, ok := b.ctx.GetSessionVars().StmtCtx.CTEStorageMap.(map[int]*CTEStorages)
if !ok {
b.err = errors.New("type assertion for CTEStorageMap failed")
Expand Down
19 changes: 12 additions & 7 deletions executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,18 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
if !e.resTbl.Done() {
resAction := setupCTEStorageTracker(e.resTbl, e.ctx, e.memTracker, e.diskTracker)
iterInAction := setupCTEStorageTracker(e.iterInTbl, e.ctx, e.memTracker, e.diskTracker)
iterOutAction := setupCTEStorageTracker(e.iterOutTbl, e.ctx, e.memTracker, e.diskTracker)
var iterOutAction *chunk.SpillDiskAction
if e.iterOutTbl != nil {
iterOutAction = setupCTEStorageTracker(e.iterOutTbl, e.ctx, e.memTracker, e.diskTracker)
}

failpoint.Inject("testCTEStorageSpill", func(val failpoint.Value) {
if val.(bool) && config.GetGlobalConfig().OOMUseTmpStorage {
defer resAction.WaitForTest()
defer iterInAction.WaitForTest()
defer iterOutAction.WaitForTest()
if iterOutAction != nil {
defer iterOutAction.WaitForTest()
}
}
})

Expand Down Expand Up @@ -196,13 +201,13 @@ func (e *CTEExec) Close() (err error) {
if err = e.recursiveExec.Close(); err != nil {
return err
}
// `iterInTbl` and `resTbl` are shared by multiple operators,
// so will be closed when the SQL finishes.
if err = e.iterOutTbl.DerefAndClose(); err != nil {
return err
}
}

// `iterInTbl` and `resTbl` are shared by multiple operators,
// so will be closed when the SQL finishes.
if err = e.iterOutTbl.DerefAndClose(); err != nil {
return err
}
return e.baseExecutor.Close()
}

Expand Down
3 changes: 3 additions & 0 deletions executor/cte_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ func (test *CTETestSuite) TestBasicCTE(c *check.C) {
rows.Check(testkit.Rows("1"))
rows = tk.MustQuery("SELECT * FROM t1 dt WHERE EXISTS( WITH RECURSIVE qn AS (SELECT a*0 AS b UNION ALL SELECT b+1 FROM qn WHERE b=0 or b = 1) SELECT * FROM qn WHERE b=a );")
rows.Check(testkit.Rows("1", "2"))

rows = tk.MustQuery("with recursive c(p) as (select 1), cte(a, b) as (select 1, 1 union select a+1, 1 from cte, c where a < 5) select * from cte order by 1, 2;")
rows.Check(testkit.Rows("1 1", "2 1", "3 1", "4 1", "5 1"))
}

func (test *CTESerialTestSuite) TestSpillToDisk(c *check.C) {
Expand Down

0 comments on commit 7ee78d1

Please sign in to comment.