store/tikv: fix a potential goroutine leak problem in copIterator (pingcap#6140)

copIterator.Close() closes the finished channel and waits for the worker goroutines to exit. A worker goroutine may still be sending a response to the response channel, but nobody receives from that channel once the copIterator is closed, so the goroutine blocks forever: it leaks, and Close() hangs waiting for it.

Change the worker goroutines to check both the response channel and the finished channel when sending, so they can exit as soon as the iterator is closed.
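For reference, here is a minimal, self-contained sketch of the pattern the fix relies on: every worker send goes through a select that also watches the finished channel, so Close() can always make progress. The names here (iter, send, worker) are illustrative only, not the actual TiDB types.

package main

import "sync"

type iter struct {
	respCh   chan int      // results produced by worker goroutines
	finished chan struct{} // closed by Close() to signal shutdown
	wg       sync.WaitGroup
}

// send delivers a result unless the iterator has been closed.
// It reports whether the worker should exit.
func (it *iter) send(resp int) (exit bool) {
	select {
	case it.respCh <- resp:
	case <-it.finished: // Close() was called; give up instead of blocking forever
		exit = true
	}
	return
}

func (it *iter) worker() {
	defer it.wg.Done()
	for i := 0; i < 100; i++ {
		if it.send(i) {
			return // exit promptly so Close() can finish
		}
	}
}

// Close signals shutdown and waits for the worker.
func (it *iter) Close() {
	close(it.finished)
	it.wg.Wait()
}

func main() {
	it := &iter{respCh: make(chan int), finished: make(chan struct{})}
	it.wg.Add(1)
	go it.worker()
	<-it.respCh // consume one result, then stop reading
	it.Close()  // returns promptly because the worker watches finished
}

Without the finished case in send, the worker would block on respCh once the consumer stops reading, and Close() would hang on wg.Wait(), which is exactly the leak described above.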
tiancaiamao authored Mar 26, 2018
1 parent 9fe407f commit 3ff51fb
Showing 2 changed files with 28 additions and 9 deletions.
16 changes: 14 additions & 2 deletions executor/executor_test.go
@@ -24,6 +24,7 @@ import (
"testing"
"time"

gofail "github.com/coreos/gofail/runtime"
"github.com/juju/errors"
. "github.com/pingcap/check"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
@@ -2460,14 +2461,25 @@ func (s *testSuite) TestEarlyClose(c *C) {

 	ctx := context.Background()
 	for i := 0; i < 500; i++ {
-		rss, err := tk.Se.Execute(ctx, "select * from earlyclose order by id")
-		c.Assert(err, IsNil)
+		rss, err1 := tk.Se.Execute(ctx, "select * from earlyclose order by id")
+		c.Assert(err1, IsNil)
 		rs := rss[0]
 		chk := rs.NewChunk()
 		err = rs.NextChunk(ctx, chk)
 		c.Assert(err, IsNil)
 		rs.Close()
 	}
+
+	// Goroutine should not leak when error happen.
+	gofail.Enable("github.com/pingcap/tidb/store/tikv/handleTaskOnceError", `return(true)`)
+	defer gofail.Disable("github.com/pingcap/tidb/store/tikv/handleTaskOnceError")
+	rss, err := tk.Se.Execute(ctx, "select * from earlyclose")
+	c.Assert(err, IsNil)
+	rs := rss[0]
+	chk := rs.NewChunk()
+	err = rs.NextChunk(ctx, chk)
+	c.Assert(err, NotNil)
+	rs.Close()
 }
 
 func (s *testSuite) TestIssue5666(c *C) {
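A note on the failpoint used by the new test: gofail (github.com/coreos/gofail) turns specially formatted comments into real code at build time, and gofail.Enable / gofail.Disable toggle that generated failpoint at runtime. The failpoint itself is declared inside handleTaskOnce in store/tikv/coprocessor.go (second file below). With return(true) enabled, handleTaskOnce behaves roughly like the fragment here; failpointEnabled is a hypothetical stand-in for the generated check, not gofail's actual API.

	// Rough conceptual equivalent of the enabled failpoint (illustrative only).
	if failpointEnabled { // toggled by gofail.Enable("github.com/pingcap/tidb/store/tikv/handleTaskOnceError", `return(true)`)
		return nil, errors.New("mock handleTaskOnce error")
	}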
21 changes: 14 additions & 7 deletions store/tikv/coprocessor.go
@@ -473,7 +473,7 @@ func (it *copIterator) sendToTaskCh(t *copTask, taskCh chan<- *copTask) (exit bo
 	return
 }
 
-func (it *copIterator) sendToRespCh(resp copResponse, respCh chan copResponse) (exit bool) {
+func (it *copIterator) sendToRespCh(resp copResponse, respCh chan<- copResponse) (exit bool) {
 	select {
 	case respCh <- resp:
 	case <-it.finished:
@@ -504,19 +504,19 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
 	metrics.TiKVCoprocessorCounter.WithLabelValues("next").Inc()
 
 	var (
-		resp copResponse
-		ok   bool
+		resp   copResponse
+		ok     bool
+		closed bool
 	)
 	// If data order matters, response should be returned in the same order as copTask slice.
 	// Otherwise all responses are returned from a single channel.
 	if !it.req.KeepOrder {
 		// Get next fetched resp from chan
-		resp, ok = <-it.respChan
-		if !ok {
+		resp, ok, closed = it.recvFromRespCh(ctx, it.respChan)
+		if !ok || closed {
 			return nil, nil
 		}
 	} else {
-		var closed bool
 		for {
 			if it.curr >= len(it.tasks) {
 				// Resp will be nil if iterator is finished.
@@ -558,7 +558,8 @@ func (it *copIterator) handleTask(bo *Backoffer, task *copTask, ch chan copRespo
 	for len(remainTasks) > 0 {
 		tasks, err := it.handleTaskOnce(bo, remainTasks[0], ch)
 		if err != nil {
-			ch <- copResponse{err: errors.Trace(err)}
+			resp := copResponse{err: errors.Trace(err)}
+			it.sendToRespCh(resp, ch)
 			return
 		}
 		if len(tasks) > 0 {
@@ -572,6 +573,12 @@ func (it *copIterator) handleTask(bo *Backoffer, task *copTask, ch chan copRespo
 // handleTaskOnce handles single copTask, successful results are send to channel.
 // If error happened, returns error. If region split or meet lock, returns the remain tasks.
 func (it *copIterator) handleTaskOnce(bo *Backoffer, task *copTask, ch chan copResponse) ([]*copTask, error) {
+
+	// gofail: var handleTaskOnceError bool
+	// if handleTaskOnceError {
+	// 	return nil, errors.New("mock handleTaskOnce error")
+	// }
+
 	sender := NewRegionRequestSender(it.store.regionCache, it.store.client)
 	req := &tikvrpc.Request{
 		Type: task.cmdType,
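The new call in Next(), resp, ok, closed = it.recvFromRespCh(ctx, it.respChan), is the consumer-side half of the same pattern: rather than a bare receive on it.respChan, it also watches the finished channel and the caller's context, so Next() stops as soon as the iterator is closed. The body of recvFromRespCh is not part of this diff; a plausible shape, inferred only from its call site above, would be:

	// Sketch inferred from the call site in Next(); not the committed implementation.
	func (it *copIterator) recvFromRespCh(ctx context.Context, respCh <-chan copResponse) (resp copResponse, ok bool, closed bool) {
		select {
		case resp, ok = <-respCh: // ok is false once the channel is closed and drained
		case <-it.finished: // copIterator.Close() was called
			closed = true
		case <-ctx.Done(): // the caller's context was cancelled
			closed = true
		}
		return
	}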
