Skip to content

Commit

Permalink
*: fix batch insert test data race (pingcap#2968)
Browse files Browse the repository at this point in the history
Use atomic functions to access limit variable.
  • Loading branch information
coocood authored Mar 30, 2017
1 parent aa8298e commit 992f367
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 8 deletions.
7 changes: 4 additions & 3 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor_test
import (
"errors"
"fmt"
"sync/atomic"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/ast"
Expand Down Expand Up @@ -853,16 +854,16 @@ func makeLoadDataInfo(column int, ctx context.Context, c *C) (ld *executor.LoadD
}

func (s *testSuite) TestBatchInsert(c *C) {
originLimit := kv.TxnEntryCountLimit
originLimit := atomic.LoadUint64(&kv.TxnEntryCountLimit)
originBatch := executor.BatchInsertSize
defer func() {
s.cleanEnv(c)
testleak.AfterTest(c)()
kv.TxnEntryCountLimit = originLimit
atomic.StoreUint64(&kv.TxnEntryCountLimit, originLimit)
executor.BatchInsertSize = originBatch
}()
// Set the limitation to a small value, make it easier to reach the limitation.
kv.TxnEntryCountLimit = 100
atomic.StoreUint64(&kv.TxnEntryCountLimit, 100)
executor.BatchInsertSize = 50
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
2 changes: 1 addition & 1 deletion kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var (
// The limit of single entry size (len(key) + len(value)).
TxnEntrySizeLimit = 6 * 1024 * 1024
// The limit of number of entries in the MemBuffer.
TxnEntryCountLimit = 300 * 1000
TxnEntryCountLimit uint64 = 300 * 1000
// The limit of the sum of all entry size.
TxnTotalSizeLimit = 100 * 1024 * 1024
)
Expand Down
8 changes: 5 additions & 3 deletions kv/memdb_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package kv

import (
"sync/atomic"

"github.com/juju/errors"
"github.com/pingcap/goleveldb/leveldb"
"github.com/pingcap/goleveldb/leveldb/comparer"
Expand All @@ -28,7 +30,7 @@ import (
type memDbBuffer struct {
db *memdb.DB
entrySizeLimit int
bufferLenLimit int
bufferLenLimit uint64
bufferSizeLimit int
}

Expand All @@ -42,7 +44,7 @@ func NewMemDbBuffer() MemBuffer {
return &memDbBuffer{
db: memdb.New(comparer.DefaultComparer, 4*1024),
entrySizeLimit: TxnEntrySizeLimit,
bufferLenLimit: TxnEntryCountLimit,
bufferLenLimit: atomic.LoadUint64(&TxnEntryCountLimit),
bufferSizeLimit: TxnTotalSizeLimit,
}
}
Expand Down Expand Up @@ -92,7 +94,7 @@ func (m *memDbBuffer) Set(k Key, v []byte) error {
if m.Size() > m.bufferSizeLimit {
return ErrTxnTooLarge.Gen("transaction too large, size:%d", m.Size())
}
if m.Len() > m.bufferLenLimit {
if m.Len() > int(m.bufferLenLimit) {
return ErrTxnTooLarge.Gen("transaction too large, len:%d", m.Len())
}
return errors.Trace(err)
Expand Down
4 changes: 3 additions & 1 deletion store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"bytes"
"math"
"sync"
"sync/atomic"
"time"

"github.com/coreos/etcd/pkg/monotime"
Expand Down Expand Up @@ -123,7 +124,8 @@ func newTwoPhaseCommitter(txn *tikvTxn) (*twoPhaseCommitter, error) {
size += len(lockKey)
}
}
if len(keys) > kv.TxnEntryCountLimit || size > kv.TxnTotalSizeLimit {
entrylimit := atomic.LoadUint64(&kv.TxnEntryCountLimit)
if len(keys) > int(entrylimit) || size > kv.TxnTotalSizeLimit {
return nil, kv.ErrTxnTooLarge
}
const logEntryCount = 10000
Expand Down

0 comments on commit 992f367

Please sign in to comment.