Skip to content

Commit

Permalink
*: merge statement buffer when BatchGetValues (pingcap#9374)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackysp authored Feb 22, 2019
1 parent 38a453d commit 18449d7
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 59 deletions.
2 changes: 1 addition & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ func (w *addIndexWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords []*i
w.distinctCheckFlags = append(w.distinctCheckFlags, distinct)
}

batchVals, err := kv.BatchGetValues(txn, w.batchCheckKeys)
batchVals, err := txn.BatchGet(w.batchCheckKeys)
if err != nil {
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func (e *RecoverIndexExec) batchMarkDup(txn kv.Transaction, rows []recoverRows)
distinctFlags[i] = distinct
}

values, err := kv.BatchGetValues(txn, e.batchKeys)
values, err := txn.BatchGet(e.batchKeys)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -500,7 +500,7 @@ func (e *CleanupIndexExec) batchGetRecord(txn kv.Transaction) (map[string][]byte
for handle := range e.idxValues {
e.batchKeys = append(e.batchKeys, e.table.RecordKey(handle))
}
values, err := kv.BatchGetValues(txn, e.batchKeys)
values, err := txn.BatchGet(e.batchKeys)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (b *batchChecker) batchGetOldValues(ctx sessionctx.Context, batchKeys []kv.
if err != nil {
return errors.Trace(err)
}
values, err := kv.BatchGetValues(txn, batchKeys)
values, err := txn.BatchGet(batchKeys)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -213,7 +213,7 @@ func (b *batchChecker) batchGetInsertKeys(ctx sessionctx.Context, t table.Table,
if err != nil {
return errors.Trace(err)
}
b.dupKVs, err = kv.BatchGetValues(txn, batchKeys)
b.dupKVs, err = txn.BatchGet(batchKeys)
return errors.Trace(err)
}

Expand Down
11 changes: 11 additions & 0 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,17 @@ func (s *testSuite2) TestInsert(c *C) {
tk.MustExec("drop view v")
}

func (s *testSuite) TestMultiBatch(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("create table t0 (i int)")
tk.MustExec("insert into t0 values (1), (1)")
tk.MustExec("create table t (i int unique key)")
tk.MustExec("set @@tidb_dml_batch_size = 1")
tk.MustExec("insert ignore into t select * from t0")
tk.MustExec("admin check table t")
}

func (s *testSuite2) TestInsertAutoInc(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
18 changes: 10 additions & 8 deletions kv/fault_injection.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,16 @@ func (t *InjectedTransaction) Get(k Key) ([]byte, error) {
return t.Transaction.Get(k)
}

// BatchGet returns an error if cfg.getError is set.
func (t *InjectedTransaction) BatchGet(keys []Key) (map[string][]byte, error) {
t.cfg.RLock()
defer t.cfg.RUnlock()
if t.cfg.getError != nil {
return nil, t.cfg.getError
}
return t.Transaction.BatchGet(keys)
}

// Commit returns an error if cfg.commitError is set.
func (t *InjectedTransaction) Commit(ctx context.Context) error {
t.cfg.RLock()
Expand All @@ -107,14 +117,6 @@ func (t *InjectedTransaction) Commit(ctx context.Context) error {
return t.Transaction.Commit(ctx)
}

// GetSnapshot implements Transaction GetSnapshot method.
func (t *InjectedTransaction) GetSnapshot() Snapshot {
return &InjectedSnapshot{
Snapshot: t.Transaction.GetSnapshot(),
cfg: t.cfg,
}
}

// InjectedSnapshot wraps a Snapshot with injections.
type InjectedSnapshot struct {
Snapshot
Expand Down
4 changes: 2 additions & 2 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,10 @@ type Transaction interface {
Valid() bool
// GetMemBuffer return the MemBuffer binding to this transaction.
GetMemBuffer() MemBuffer
// GetSnapshot returns the snapshot of this transaction.
GetSnapshot() Snapshot
// SetVars sets variables to the transaction.
SetVars(vars *Variables)
// BatchGet gets kv from the memory buffer of statement and transaction, and the kv storage.
BatchGet(keys []Key) (map[string][]byte, error)
}

// Client is used to send request to KV layer.
Expand Down
10 changes: 4 additions & 6 deletions kv/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func (t *mockTxn) Get(k Key) ([]byte, error) {
return nil, nil
}

func (t *mockTxn) BatchGet(keys []Key) (map[string][]byte, error) {
return nil, nil
}

func (t *mockTxn) Iter(k Key, upperBound Key) (Iterator, error) {
return nil, nil
}
Expand Down Expand Up @@ -98,12 +102,6 @@ func (t *mockTxn) GetMemBuffer() MemBuffer {
return nil
}

func (t *mockTxn) GetSnapshot() Snapshot {
return &mockSnapshot{
store: NewMemDbBuffer(DefaultTxnMembufCap),
}
}

func (t *mockTxn) SetCap(cap int) {

}
Expand Down
34 changes: 0 additions & 34 deletions kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,40 +91,6 @@ func BackOff(attempts uint) int {
return int(sleep)
}

// BatchGetValues gets values in batch.
// The values from buffer in transaction and the values from the storage node are merged together.
func BatchGetValues(txn Transaction, keys []Key) (map[string][]byte, error) {
if txn.IsReadOnly() {
return txn.GetSnapshot().BatchGet(keys)
}
bufferValues := make([][]byte, len(keys))
shrinkKeys := make([]Key, 0, len(keys))
for i, key := range keys {
val, err := txn.GetMemBuffer().Get(key)
if IsErrNotFound(err) {
shrinkKeys = append(shrinkKeys, key)
continue
}
if err != nil {
return nil, errors.Trace(err)
}
if len(val) != 0 {
bufferValues[i] = val
}
}
storageValues, err := txn.GetSnapshot().BatchGet(shrinkKeys)
if err != nil {
return nil, errors.Trace(err)
}
for i, key := range keys {
if bufferValues[i] == nil {
continue
}
storageValues[string(key)] = bufferValues[i]
}
return storageValues, nil
}

// mockCommitErrorEnable uses to enable `mockCommitError` and only mock error once.
var mockCommitErrorEnable = int64(0)

Expand Down
30 changes: 30 additions & 0 deletions session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,36 @@ func (st *TxnState) Get(k kv.Key) ([]byte, error) {
return val, nil
}

// BatchGet overrides the Transaction interface.
func (st *TxnState) BatchGet(keys []kv.Key) (map[string][]byte, error) {
bufferValues := make([][]byte, len(keys))
shrinkKeys := make([]kv.Key, 0, len(keys))
for i, key := range keys {
val, err := st.buf.Get(key)
if kv.IsErrNotFound(err) {
shrinkKeys = append(shrinkKeys, key)
continue
}
if err != nil {
return nil, errors.Trace(err)
}
if len(val) != 0 {
bufferValues[i] = val
}
}
storageValues, err := st.Transaction.BatchGet(shrinkKeys)
if err != nil {
return nil, errors.Trace(err)
}
for i, key := range keys {
if bufferValues[i] == nil {
continue
}
storageValues[string(key)] = bufferValues[i]
}
return storageValues, nil
}

// Set overrides the Transaction interface.
func (st *TxnState) Set(k kv.Key, v []byte) error {
return st.buf.Set(k, v)
Expand Down
36 changes: 32 additions & 4 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,38 @@ func (txn *tikvTxn) Get(k kv.Key) ([]byte, error) {
return ret, nil
}

func (txn *tikvTxn) BatchGet(keys []kv.Key) (map[string][]byte, error) {
if txn.IsReadOnly() {
return txn.snapshot.BatchGet(keys)
}
bufferValues := make([][]byte, len(keys))
shrinkKeys := make([]kv.Key, 0, len(keys))
for i, key := range keys {
val, err := txn.GetMemBuffer().Get(key)
if kv.IsErrNotFound(err) {
shrinkKeys = append(shrinkKeys, key)
continue
}
if err != nil {
return nil, errors.Trace(err)
}
if len(val) != 0 {
bufferValues[i] = val
}
}
storageValues, err := txn.snapshot.BatchGet(shrinkKeys)
if err != nil {
return nil, errors.Trace(err)
}
for i, key := range keys {
if bufferValues[i] == nil {
continue
}
storageValues[string(key)] = bufferValues[i]
}
return storageValues, nil
}

func (txn *tikvTxn) Set(k kv.Key, v []byte) error {
txn.setCnt++

Expand Down Expand Up @@ -278,7 +310,3 @@ func (txn *tikvTxn) Size() int {
func (txn *tikvTxn) GetMemBuffer() kv.MemBuffer {
return txn.us.GetMemBuffer()
}

func (txn *tikvTxn) GetSnapshot() kv.Snapshot {
return txn.snapshot
}

0 comments on commit 18449d7

Please sign in to comment.