Skip to content

Commit

Permalink
store/tivk: wait task channel in Close (pingcap#2925)
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood authored Mar 27, 2017
1 parent 6aa45e4 commit 9f23ab7
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 13 deletions.
26 changes: 15 additions & 11 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ func extractHandlesFromIndexResult(idxResult distsql.SelectResult) (handles []in
}

func extractHandlesFromIndexSubResult(subResult distsql.PartialResult) ([]int64, error) {
defer subResult.Close()
var handles []int64
for {
h, data, err := subResult.Next()
Expand Down Expand Up @@ -340,11 +341,10 @@ type XSelectIndexExec struct {
partialResult distsql.PartialResult

// Variables only used for double read.
doubleReadIdxResult distsql.SelectResult
taskChan chan *lookupTableTask
tasksErr error // not nil if tasks closed due to error.
taskCurr *lookupTableTask
handleCount uint64 // returned handle count in double read.
taskChan chan *lookupTableTask
tasksErr error // not nil if tasks closed due to error.
taskCurr *lookupTableTask
handleCount uint64 // returned handle count in double read.

where *tipb.Expr
startTS uint64
Expand Down Expand Up @@ -375,10 +375,9 @@ func (e *XSelectIndexExec) Schema() *expression.Schema {

// Close implements Exec Close interface.
func (e *XSelectIndexExec) Close() error {
err := closeAll(e.result, e.partialResult, e.doubleReadIdxResult)
err := closeAll(e.result, e.partialResult)
e.result = nil
e.partialResult = nil
e.doubleReadIdxResult = nil

e.taskCurr = nil
if e.taskChan != nil {
Expand Down Expand Up @@ -444,6 +443,7 @@ func (e *XSelectIndexExec) nextForSingleRead() (*Row, error) {
}
if rowData == nil {
// Finish current partial result and get the next one.
e.partialResult.Close()
e.partialResult = nil
continue
}
Expand Down Expand Up @@ -483,7 +483,6 @@ func (e *XSelectIndexExec) nextForDoubleRead() (*Row, error) {
if err != nil {
return nil, errors.Trace(err)
}
e.doubleReadIdxResult = idxResult
idxResult.IgnoreData()
idxResult.Fetch(context.CtxForCancel{e.ctx})

Expand Down Expand Up @@ -533,10 +532,12 @@ func (e *XSelectIndexExec) addWorker(workCh chan *lookupTableTask, concurrency *
}

func (e *XSelectIndexExec) fetchHandles(idxResult distsql.SelectResult, ch chan<- *lookupTableTask) {
defer close(ch)

workCh := make(chan *lookupTableTask, 1)
defer close(workCh)
defer func() {
close(ch)
close(workCh)
idxResult.Close()
}()

lookupConcurrencyLimit := e.ctx.GetSessionVars().IndexLookupConcurrency
var concurrency int
Expand Down Expand Up @@ -681,6 +682,7 @@ func (e *XSelectIndexExec) executeTask(task *lookupTableTask) error {
}

func (e *XSelectIndexExec) extractRowsFromTableResult(t table.Table, tblResult distsql.SelectResult) ([]*Row, error) {
defer tblResult.Close()
var rows []*Row
for {
partialResult, err := tblResult.Next()
Expand All @@ -700,6 +702,7 @@ func (e *XSelectIndexExec) extractRowsFromTableResult(t table.Table, tblResult d
}

func (e *XSelectIndexExec) extractRowsFromPartialResult(t table.Table, partialResult distsql.PartialResult) ([]*Row, error) {
defer partialResult.Close()
var rows []*Row
for {
h, rowData, err := partialResult.Next()
Expand Down Expand Up @@ -888,6 +891,7 @@ func (e *XSelectTableExec) Next() (*Row, error) {
}
if rowData == nil {
// Finish the current partial result and get the next one.
e.partialResult.Close()
e.partialResult = nil
continue
}
Expand Down
3 changes: 1 addition & 2 deletions executor/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ func (s *testSuite) TestCopClientSend(c *C) {
_, err = rs.Next()
c.Assert(err, IsNil)
rs.Close()
time.Sleep(time.Millisecond * 10)
keyword := "copIterator"
keyword := "(*copIterator).work"
c.Check(checkGoroutineExists(keyword), IsFalse)
}
1 change: 1 addition & 0 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ func (it *copIterator) handleRegionErrorTask(bo *Backoffer, task *copTask) []cop

func (it *copIterator) Close() error {
close(it.finished)
it.wg.Wait()
return nil
}

Expand Down

0 comments on commit 9f23ab7

Please sign in to comment.