Skip to content

Commit

Permalink
*: fix data race when defer cancel() is called (pingcap#3068)
Browse files Browse the repository at this point in the history
* *: fix data race when defer cancel() is called

defer cancel() was introduced in this PR
pingcap#3010
  • Loading branch information
tiancaiamao authored and IANTHEREAL committed Apr 17, 2017
1 parent 195155a commit b698bb5
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 42 deletions.
29 changes: 4 additions & 25 deletions context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ package context

import (
"fmt"
"time"

"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util"
goctx "golang.org/x/net/context"
)

// Context is an interface for transaction and executive args environment.
Expand All @@ -32,6 +32,9 @@ type Context interface {
// Txn returns the current transaction which is created before executing a statement.
Txn() kv.Transaction

// GoCtx returns the standard context.Context which is bound with current transaction.
GoCtx() goctx.Context

// GetClient gets a kv.Client.
GetClient() kv.Client

Expand All @@ -55,30 +58,6 @@ type Context interface {
// InitTxnWithStartTS initializes a transaction with startTS.
// It should be called right before we builds an executor.
InitTxnWithStartTS(startTS uint64) error

// Done returns a channel for cancellation, the same as standard context.Context.
// See https://godoc.org/context for more examples of how to use it.
Done() <-chan struct{}
}

// CtxForCancel implements the standard Go context.Context interface.
type CtxForCancel struct {
Context
}

// Value implements the standard Go context.Context interface.
func (ctx CtxForCancel) Value(interface{}) interface{} {
return nil
}

// Deadline implements the standard Go context.Context interface.
func (ctx CtxForCancel) Deadline() (deadline time.Time, ok bool) {
return
}

// Err implements the standard Go context.Context interface.
func (ctx CtxForCancel) Err() error {
return nil
}

type basicCtxType int
Expand Down
2 changes: 1 addition & 1 deletion domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ const privilegeKey = "/tidb/privilege"
func (do *Domain) NotifyUpdatePrivilege(ctx context.Context) {
if do.etcdClient != nil {
kv := do.etcdClient.KV
_, err := kv.Put(context.CtxForCancel{ctx}, privilegeKey, "")
_, err := kv.Put(goctx.Background(), privilegeKey, "")
if err != nil {
log.Warn("notify update privilege failed:", err)
}
Expand Down
13 changes: 7 additions & 6 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func (e *XSelectIndexExec) nextForSingleRead() (*Row, error) {
if err != nil {
return nil, errors.Trace(err)
}
e.result.Fetch(context.CtxForCancel{e.ctx})
e.result.Fetch(e.ctx.GoCtx())
}
for {
// Get partial result.
Expand Down Expand Up @@ -510,7 +510,7 @@ func (e *XSelectIndexExec) nextForDoubleRead() (*Row, error) {
if err != nil {
return nil, errors.Trace(err)
}
idxResult.Fetch(context.CtxForCancel{e.ctx})
idxResult.Fetch(e.ctx.GoCtx())

// Use a background goroutine to fetch index and put the result in e.taskChan.
// e.taskChan serves as a pipeline, so fetching index and getting table data can
Expand Down Expand Up @@ -569,6 +569,7 @@ func (e *XSelectIndexExec) fetchHandles(idxResult distsql.SelectResult, ch chan<
var concurrency int
e.addWorker(workCh, &concurrency, lookupConcurrencyLimit)

txnCtx := e.ctx.GoCtx()
for {
handles, finish, err := extractHandlesFromIndexResult(idxResult)
if err != nil || finish {
Expand All @@ -583,7 +584,7 @@ func (e *XSelectIndexExec) fetchHandles(idxResult distsql.SelectResult, ch chan<
}

select {
case <-e.ctx.Done():
case <-txnCtx.Done():
return
case workCh <- task:
default:
Expand Down Expand Up @@ -633,7 +634,7 @@ func (e *XSelectIndexExec) doIndexRequest() (distsql.SelectResult, error) {
if err != nil {
return nil, errors.Trace(err)
}
return distsql.Select(e.ctx.GetClient(), context.CtxForCancel{e.ctx}, selIdxReq, keyRanges, e.scanConcurrency, !e.indexPlan.OutOfOrder)
return distsql.Select(e.ctx.GetClient(), e.ctx.GoCtx(), selIdxReq, keyRanges, e.scanConcurrency, !e.indexPlan.OutOfOrder)
}

func (e *XSelectIndexExec) buildTableTasks(handles []int64) []*lookupTableTask {
Expand Down Expand Up @@ -781,7 +782,7 @@ func (e *XSelectIndexExec) doTableRequest(handles []int64) (distsql.SelectResult
if err != nil {
return nil, errors.Trace(err)
}
resp.Fetch(context.CtxForCancel{e.ctx})
resp.Fetch(e.ctx.GoCtx())
return resp, nil
}

Expand Down Expand Up @@ -861,7 +862,7 @@ func (e *XSelectTableExec) doRequest() error {
if err != nil {
return errors.Trace(err)
}
e.result.Fetch(context.CtxForCancel{e.ctx})
e.result.Fetch(e.ctx.GoCtx())
return nil
}

Expand Down
11 changes: 7 additions & 4 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func (e *HashJoinExec) fetchBigExec() {
}()
curBatchSize := 1
result := &execResult{rows: make([]*Row, 0, batchSize)}
txnCtx := e.ctx.GoCtx()
for {
done := false
idx := cnt % e.concurrency
Expand All @@ -175,7 +176,7 @@ func (e *HashJoinExec) fetchBigExec() {
result.rows = append(result.rows, row)
if len(result.rows) >= batchSize {
select {
case <-e.ctx.Done():
case <-txnCtx.Done():
return
case e.bigTableResultCh[idx] <- result:
result = &execResult{rows: make([]*Row, 0, batchSize)}
Expand All @@ -186,7 +187,7 @@ func (e *HashJoinExec) fetchBigExec() {
if done {
if len(result.rows) > 0 && len(result.rows) < batchSize {
select {
case <-e.ctx.Done():
case <-txnCtx.Done():
return
case e.bigTableResultCh[idx] <- result:
}
Expand Down Expand Up @@ -323,11 +324,12 @@ func (e *HashJoinExec) waitJoinWorkersAndCloseResultChan() {
func (e *HashJoinExec) runJoinWorker(idx int) {
maxRowsCnt := 1000
result := &execResult{rows: make([]*Row, 0, maxRowsCnt)}
txnCtx := e.ctx.GoCtx()
for {
var bigTableResult *execResult
var exit bool
select {
case <-e.ctx.Done():
case <-txnCtx.Done():
exit = true
case tmp, ok := <-e.bigTableResultCh[idx]:
if !ok {
Expand Down Expand Up @@ -457,6 +459,7 @@ func (e *HashJoinExec) Next() (*Row, error) {
return nil, errors.Trace(err)
}
}
txnCtx := e.ctx.GoCtx()
if e.cursor >= len(e.rows) {
var result *execResult
select {
Expand All @@ -469,7 +472,7 @@ func (e *HashJoinExec) Next() (*Row, error) {
e.finished.Store(true)
return nil, errors.Trace(result.err)
}
case <-e.ctx.Done():
case <-txnCtx.Done():
return nil, nil
}
if len(result.rows) == 0 {
Expand Down
6 changes: 3 additions & 3 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ func (s *session) Cancel() {
s.cancelFunc()
}

// Canceled implements context.Context interface.
func (s *session) Done() <-chan struct{} {
return s.goCtx.Done()
// GoCtx returns the standard context.Context that bind with current transaction.
func (s *session) GoCtx() goctx.Context {
return s.goCtx
}

func (s *session) cleanRetryInfo() {
Expand Down
7 changes: 4 additions & 3 deletions util/mock/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util"
goctx "golang.org/x/net/context"
)

var _ context.Context = (*Context)(nil)
Expand Down Expand Up @@ -149,9 +150,9 @@ func (c *Context) GetSessionManager() util.SessionManager {
func (c *Context) Cancel() {
}

// Done implements the context.Context interface.
func (c *Context) Done() <-chan struct{} {
return nil
// GoCtx returns standard context.Context that bind with current transaction.
func (c *Context) GoCtx() goctx.Context {
return goctx.Background()
}

// NewContext creates a new mocked context.Context.
Expand Down

0 comments on commit b698bb5

Please sign in to comment.