From 6dd04abf1e2f7458518c8fa963ce2e1f783d51df Mon Sep 17 00:00:00 2001 From: Shirly Date: Fri, 5 May 2017 17:25:08 +0800 Subject: [PATCH] store/tikv: prewrite primary and secondary in parallel (#3148) --- store/tikv/2pc.go | 3 +- store/tikv/2pc_test.go | 74 +++++++++++++++++++++++++++++ store/tikv/mock-tikv/cop_handler.go | 2 +- store/tikv/mock-tikv/mvcc.go | 33 +++++++++---- 4 files changed, 101 insertions(+), 11 deletions(-) diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 2c14a8421631f..242ba6a3c2e2d 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -206,7 +206,8 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA } firstIsPrimary := bytes.Equal(keys[0], c.primary()) - if firstIsPrimary { + if firstIsPrimary && action == actionCommit { + // primary should be committed first. err = c.doActionOnBatches(bo, action, batches[:1]) if err != nil { return errors.Trace(err) diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index ea78fabc331ce..44c551aba4c13 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -303,3 +303,77 @@ func (s *testCommitterSuite) TestIllegalTso(c *C) { err := txn.Commit() c.Assert(err, NotNil) } + +func errMsgMustContain(c *C, err error, msg string) { + c.Assert(strings.Contains(err.Error(), msg), IsTrue) +} + +func (s *testCommitterSuite) TestCommitBeforePrewrite(c *C) { + txn := s.begin(c) + err := txn.Set([]byte("a"), []byte("a1")) + c.Assert(err, IsNil) + commiter, err := newTwoPhaseCommitter(txn) + ctx := goctx.Background() + err = commiter.cleanupKeys(NewBackoffer(cleanupMaxBackoff, ctx), commiter.keys) + c.Assert(err, IsNil) + err = commiter.prewriteKeys(NewBackoffer(prewriteMaxBackoff, ctx), commiter.keys) + c.Assert(err, NotNil) + errMsgMustContain(c, err, "write conflict") +} + +func (s *testCommitterSuite) TestPrewritePrimaryKeyFailed(c *C) { + // commit (a,a1) + txn1 := s.begin(c) + err := txn1.Set([]byte("a"), []byte("a1")) + c.Assert(err, IsNil) + err = txn1.Commit() + c.Assert(err, IsNil) + + // check a + txn := s.begin(c) + v, err := txn.Get([]byte("a")) + c.Assert(err, IsNil) + c.Assert(v, BytesEquals, []byte("a1")) + + // set txn2's startTs before txn1's + txn2 := s.begin(c) + txn2.startTS = txn1.startTS - 1 + err = txn2.Set([]byte("a"), []byte("a2")) + c.Assert(err, IsNil) + err = txn2.Set([]byte("b"), []byte("b2")) + c.Assert(err, IsNil) + // prewrite:primary a failed, b success + err = txn2.Commit() + c.Assert(err, NotNil) + + // txn2 failed with a rollback for record a. + txn = s.begin(c) + v, err = txn.Get([]byte("a")) + c.Assert(err, IsNil) + c.Assert(v, BytesEquals, []byte("a1")) + v, err = txn.Get([]byte("b")) + errMsgMustContain(c, err, "key not exist") + + // clean again, shouldn't be failed when a rollback already exist. + ctx := goctx.Background() + commiter, err := newTwoPhaseCommitter(txn2) + err = commiter.cleanupKeys(NewBackoffer(cleanupMaxBackoff, ctx), commiter.keys) + c.Assert(err, IsNil) + + // check the data after rollback twice. + txn = s.begin(c) + v, err = txn.Get([]byte("a")) + c.Assert(err, IsNil) + c.Assert(v, BytesEquals, []byte("a1")) + + // update data in a new txn, should be success. + err = txn.Set([]byte("a"), []byte("a3")) + c.Assert(err, IsNil) + err = txn.Commit() + c.Assert(err, IsNil) + // check value + txn = s.begin(c) + v, err = txn.Get([]byte("a")) + c.Assert(err, IsNil) + c.Assert(v, BytesEquals, []byte("a3")) +} diff --git a/store/tikv/mock-tikv/cop_handler.go b/store/tikv/mock-tikv/cop_handler.go index be659c59eea4e..514166de13f9d 100644 --- a/store/tikv/mock-tikv/cop_handler.go +++ b/store/tikv/mock-tikv/cop_handler.go @@ -178,7 +178,7 @@ func (h *rpcHandler) setTopNDataForCtx(ctx *selectContext) []tipb.Chunk { sort.Sort(&ctx.topnHeap.topnSorter) chunks := make([]tipb.Chunk, 0, len(ctx.topnHeap.rows)/rowsPerChunk) for _, row := range ctx.topnHeap.rows { - data := make([]byte, 0) + var data []byte for _, d := range row.data { data = append(data, d...) } diff --git a/store/tikv/mock-tikv/mvcc.go b/store/tikv/mock-tikv/mvcc.go index 99fa0cf2bcae4..dc8f5075313ba 100644 --- a/store/tikv/mock-tikv/mvcc.go +++ b/store/tikv/mock-tikv/mvcc.go @@ -130,18 +130,18 @@ func (e *mvccEntry) Prewrite(mutation *kvrpcpb.Mutation, startTS uint64, primary return nil } -func (e *mvccEntry) checkTxnCommitted(startTS uint64) (uint64, bool) { +func (e *mvccEntry) getTxnCommitInfo(startTS uint64) *mvccValue { for _, v := range e.values { - if v.startTS == startTS && v.valueType != typeRollback { - return v.commitTS, true + if v.startTS == startTS { + return &v } } - return 0, false + return nil } func (e *mvccEntry) Commit(startTS, commitTS uint64) error { if e.lock == nil || e.lock.startTS != startTS { - if _, ok := e.checkTxnCommitted(startTS); ok { + if c := e.getTxnCommitInfo(startTS); c != nil && c.valueType != typeRollback { return nil } return ErrRetryable("txn not found") @@ -165,18 +165,33 @@ func (e *mvccEntry) Commit(startTS, commitTS uint64) error { } func (e *mvccEntry) Rollback(startTS uint64) error { - if e.lock == nil || e.lock.startTS != startTS { - if commitTS, ok := e.checkTxnCommitted(startTS); ok { - return ErrAlreadyCommitted(commitTS) + // If current transaction's lock exist. + if e.lock != nil && e.lock.startTS == startTS { + e.lock = nil + e.values = append([]mvccValue{{ + valueType: typeRollback, + startTS: startTS, + commitTS: startTS, + }}, e.values...) + return nil + } + + // If current transaction's lock not exist. + // If commit info of current transaction exist. + if c := e.getTxnCommitInfo(startTS); c != nil { + // If current transaction is already committed. + if c.valueType != typeRollback { + return ErrAlreadyCommitted(c.commitTS) } + // If current transaction is already rollback. return nil } + // If current transaction is not prewritted before. e.values = append([]mvccValue{{ valueType: typeRollback, startTS: startTS, commitTS: startTS, }}, e.values...) - e.lock = nil return nil }