Skip to content

Commit

Permalink
executor: open doubleread close test and fix bug. (pingcap#3316)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanfei1991 authored May 23, 2017
1 parent 79ccc13 commit d2557fe
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 13 deletions.
1 change: 0 additions & 1 deletion executor/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
// TestIndexDoubleReadClose checks that when a index double read returns before reading all the rows, the goroutine doesn't
// leak. For testing distsql with multiple regions, we need to manually split a mock TiKV.
func (s *testSuite) TestIndexDoubleReadClose(c *C) {
c.Skip("new dist sql use different executor, reopen this test in the future.")
if _, ok := s.store.GetClient().(*tikv.CopClient); !ok {
// Make sure the store is tikv store.
return
Expand Down
29 changes: 17 additions & 12 deletions executor/new_distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,22 @@ func (e *IndexLookUpExecutor) executeTask(task *lookupTableTask, goCtx goctx.Con
}
}

func (e *IndexLookUpExecutor) pickAndExecTask(workCh <-chan *lookupTableTask, txnCtx goctx.Context) {
childCtx, cancel := goctx.WithCancel(txnCtx)
defer cancel()
for {
select {
case task := <-workCh:
if task == nil {
return
}
e.executeTask(task, childCtx)
case <-childCtx.Done():
return
}
}
}

// fetchHandlesAndStartWorkers fetches a batch of handles from index data and builds the index lookup tasks.
// We initialize some workers to execute this tasks concurrently and put the task to taskCh by order.
func (e *IndexLookUpExecutor) fetchHandlesAndStartWorkers() {
Expand All @@ -309,18 +325,7 @@ func (e *IndexLookUpExecutor) fetchHandlesAndStartWorkers() {
lookupConcurrencyLimit := e.ctx.GetSessionVars().IndexLookupConcurrency
txnCtx := e.ctx.GoCtx()
for i := 0; i < lookupConcurrencyLimit; i++ {
go func() {
childCtx, cancel := goctx.WithCancel(txnCtx)
defer cancel()
select {
case task := <-workCh:
if task == nil {
return
}
e.executeTask(task, childCtx)
case <-childCtx.Done():
}
}()
go e.pickAndExecTask(workCh, txnCtx)
}

for {
Expand Down

0 comments on commit d2557fe

Please sign in to comment.