From 4844c833a9e5f7f8eab66f16ca8e0f8189ca46dc Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Fri, 15 Apr 2022 18:02:36 +0800 Subject: [PATCH] executor: fix CTE race case by avoiding reopen iterInTbl (#33731) close pingcap/tidb#33193 --- executor/cte.go | 13 +++++-------- executor/cte_table_reader.go | 2 +- util/cteutil/storage.go | 25 +++++++++++++++++++------ 3 files changed, 25 insertions(+), 15 deletions(-) diff --git a/executor/cte.go b/executor/cte.go index 7ee4a78dc417b..abce5fb7a4c3f 100644 --- a/executor/cte.go +++ b/executor/cte.go @@ -149,6 +149,9 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { e.resTbl.Lock() defer e.resTbl.Unlock() if !e.resTbl.Done() { + if e.resTbl.Error() != nil { + return e.resTbl.Error() + } resAction := setupCTEStorageTracker(e.resTbl, e.ctx, e.memTracker, e.diskTracker) iterInAction := setupCTEStorageTracker(e.iterInTbl, e.ctx, e.memTracker, e.diskTracker) var iterOutAction *chunk.SpillDiskAction @@ -167,17 +170,11 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { }) if err = e.computeSeedPart(ctx); err != nil { - // Don't put it in defer. - // Because it should be called only when the filling process is not completed. - if err1 := e.reopenTbls(); err1 != nil { - return err1 - } + e.resTbl.SetError(err) return err } if err = e.computeRecursivePart(ctx); err != nil { - if err1 := e.reopenTbls(); err1 != nil { - return err1 - } + e.resTbl.SetError(err) return err } e.resTbl.SetDone() diff --git a/executor/cte_table_reader.go b/executor/cte_table_reader.go index 4afd8aabbb79f..261a99f21f9c5 100644 --- a/executor/cte_table_reader.go +++ b/executor/cte_table_reader.go @@ -43,7 +43,7 @@ func (e *CTETableReaderExec) Next(ctx context.Context, req *chunk.Chunk) (err er // We should read `iterInTbl` from the beginning when the next iteration starts. // Can not directly judge whether to start the next iteration based on e.chkIdx, - // because some operators(Selection) may use forloop to read all data in `iterInTbl`. + // because some operators(Selection) may use for loop to read all data in `iterInTbl`. if e.curIter != e.iterInTbl.GetIter() { if e.curIter > e.iterInTbl.GetIter() { return errors.Errorf("invalid iteration for CTETableReaderExec (e.curIter: %d, e.iterInTbl.GetIter(): %d)", diff --git a/util/cteutil/storage.go b/util/cteutil/storage.go index a629398000898..c84cbc5217ad3 100644 --- a/util/cteutil/storage.go +++ b/util/cteutil/storage.go @@ -77,6 +77,10 @@ type Storage interface { Done() bool SetDone() + // Store error message, so we can return directly. + Error() error + SetError(err error) + // Readers use iter information to determine // whether they need to read data from the beginning. SetIter(iter int) @@ -94,9 +98,9 @@ type StorageRC struct { tp []*types.FieldType chkSize int - begCh chan struct{} - done bool - iter int + done bool + iter int + err error rc *chunk.RowContainer } @@ -111,7 +115,6 @@ func (s *StorageRC) OpenAndRef() (err error) { if !s.valid() { s.rc = chunk.NewRowContainer(s.tp, s.chkSize) s.refCnt = 1 - s.begCh = make(chan struct{}) s.iter = 0 } else { s.refCnt += 1 @@ -158,8 +161,8 @@ func (s *StorageRC) Reopen() (err error) { return err } s.iter = 0 - s.begCh = make(chan struct{}) s.done = false + s.err = nil // Create a new RowContainer. // Because some meta infos in old RowContainer are not resetted. // Such as memTracker/actionSpill etc. So we just use a new one. @@ -224,6 +227,16 @@ func (s *StorageRC) SetDone() { s.done = true } +// Error impls Storage Error interface. +func (s *StorageRC) Error() error { + return s.err +} + +// SetError impls Storage SetError interface. +func (s *StorageRC) SetError(err error) { + s.err = err +} + // SetIter impls Storage SetIter interface. func (s *StorageRC) SetIter(iter int) { s.iter = iter @@ -256,8 +269,8 @@ func (s *StorageRC) ActionSpillForTest() *chunk.SpillDiskAction { func (s *StorageRC) resetAll() error { s.refCnt = -1 - s.begCh = nil s.done = false + s.err = nil s.iter = 0 if err := s.rc.Reset(); err != nil { return err