Skip to content

Commit

Permalink
executor: fix CTE race case by avoiding reopen iterInTbl (pingcap#33731)
Browse files Browse the repository at this point in the history
  • Loading branch information
guo-shaoge authored Apr 15, 2022
1 parent 2810c1d commit 4844c83
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 15 deletions.
13 changes: 5 additions & 8 deletions executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
e.resTbl.Lock()
defer e.resTbl.Unlock()
if !e.resTbl.Done() {
if e.resTbl.Error() != nil {
return e.resTbl.Error()
}
resAction := setupCTEStorageTracker(e.resTbl, e.ctx, e.memTracker, e.diskTracker)
iterInAction := setupCTEStorageTracker(e.iterInTbl, e.ctx, e.memTracker, e.diskTracker)
var iterOutAction *chunk.SpillDiskAction
Expand All @@ -167,17 +170,11 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
})

if err = e.computeSeedPart(ctx); err != nil {
// Don't put it in defer.
// Because it should be called only when the filling process is not completed.
if err1 := e.reopenTbls(); err1 != nil {
return err1
}
e.resTbl.SetError(err)
return err
}
if err = e.computeRecursivePart(ctx); err != nil {
if err1 := e.reopenTbls(); err1 != nil {
return err1
}
e.resTbl.SetError(err)
return err
}
e.resTbl.SetDone()
Expand Down
2 changes: 1 addition & 1 deletion executor/cte_table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (e *CTETableReaderExec) Next(ctx context.Context, req *chunk.Chunk) (err er

// We should read `iterInTbl` from the beginning when the next iteration starts.
// Can not directly judge whether to start the next iteration based on e.chkIdx,
// because some operators(Selection) may use forloop to read all data in `iterInTbl`.
// because some operators(Selection) may use for loop to read all data in `iterInTbl`.
if e.curIter != e.iterInTbl.GetIter() {
if e.curIter > e.iterInTbl.GetIter() {
return errors.Errorf("invalid iteration for CTETableReaderExec (e.curIter: %d, e.iterInTbl.GetIter(): %d)",
Expand Down
25 changes: 19 additions & 6 deletions util/cteutil/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ type Storage interface {
Done() bool
SetDone()

// Store error message, so we can return directly.
Error() error
SetError(err error)

// Readers use iter information to determine
// whether they need to read data from the beginning.
SetIter(iter int)
Expand All @@ -94,9 +98,9 @@ type StorageRC struct {
tp []*types.FieldType
chkSize int

begCh chan struct{}
done bool
iter int
done bool
iter int
err error

rc *chunk.RowContainer
}
Expand All @@ -111,7 +115,6 @@ func (s *StorageRC) OpenAndRef() (err error) {
if !s.valid() {
s.rc = chunk.NewRowContainer(s.tp, s.chkSize)
s.refCnt = 1
s.begCh = make(chan struct{})
s.iter = 0
} else {
s.refCnt += 1
Expand Down Expand Up @@ -158,8 +161,8 @@ func (s *StorageRC) Reopen() (err error) {
return err
}
s.iter = 0
s.begCh = make(chan struct{})
s.done = false
s.err = nil
// Create a new RowContainer.
// Because some meta infos in old RowContainer are not resetted.
// Such as memTracker/actionSpill etc. So we just use a new one.
Expand Down Expand Up @@ -224,6 +227,16 @@ func (s *StorageRC) SetDone() {
s.done = true
}

// Error impls Storage Error interface.
func (s *StorageRC) Error() error {
return s.err
}

// SetError impls Storage SetError interface.
func (s *StorageRC) SetError(err error) {
s.err = err
}

// SetIter impls Storage SetIter interface.
func (s *StorageRC) SetIter(iter int) {
s.iter = iter
Expand Down Expand Up @@ -256,8 +269,8 @@ func (s *StorageRC) ActionSpillForTest() *chunk.SpillDiskAction {

func (s *StorageRC) resetAll() error {
s.refCnt = -1
s.begCh = nil
s.done = false
s.err = nil
s.iter = 0
if err := s.rc.Reset(); err != nil {
return err
Expand Down

0 comments on commit 4844c83

Please sign in to comment.