Skip to content

Commit

Permalink
store/tikv: prewrite primary and secondary in parallel (pingcap#3148)
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreMouche authored and coocood committed May 5, 2017
1 parent cc3676a commit 6dd04ab
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 11 deletions.
3 changes: 2 additions & 1 deletion store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA
}

firstIsPrimary := bytes.Equal(keys[0], c.primary())
if firstIsPrimary {
if firstIsPrimary && action == actionCommit {
// primary should be committed first.
err = c.doActionOnBatches(bo, action, batches[:1])
if err != nil {
return errors.Trace(err)
Expand Down
74 changes: 74 additions & 0 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,3 +303,77 @@ func (s *testCommitterSuite) TestIllegalTso(c *C) {
err := txn.Commit()
c.Assert(err, NotNil)
}

func errMsgMustContain(c *C, err error, msg string) {
c.Assert(strings.Contains(err.Error(), msg), IsTrue)
}

func (s *testCommitterSuite) TestCommitBeforePrewrite(c *C) {
txn := s.begin(c)
err := txn.Set([]byte("a"), []byte("a1"))
c.Assert(err, IsNil)
commiter, err := newTwoPhaseCommitter(txn)
ctx := goctx.Background()
err = commiter.cleanupKeys(NewBackoffer(cleanupMaxBackoff, ctx), commiter.keys)
c.Assert(err, IsNil)
err = commiter.prewriteKeys(NewBackoffer(prewriteMaxBackoff, ctx), commiter.keys)
c.Assert(err, NotNil)
errMsgMustContain(c, err, "write conflict")
}

func (s *testCommitterSuite) TestPrewritePrimaryKeyFailed(c *C) {
// commit (a,a1)
txn1 := s.begin(c)
err := txn1.Set([]byte("a"), []byte("a1"))
c.Assert(err, IsNil)
err = txn1.Commit()
c.Assert(err, IsNil)

// check a
txn := s.begin(c)
v, err := txn.Get([]byte("a"))
c.Assert(err, IsNil)
c.Assert(v, BytesEquals, []byte("a1"))

// set txn2's startTs before txn1's
txn2 := s.begin(c)
txn2.startTS = txn1.startTS - 1
err = txn2.Set([]byte("a"), []byte("a2"))
c.Assert(err, IsNil)
err = txn2.Set([]byte("b"), []byte("b2"))
c.Assert(err, IsNil)
// prewrite:primary a failed, b success
err = txn2.Commit()
c.Assert(err, NotNil)

// txn2 failed with a rollback for record a.
txn = s.begin(c)
v, err = txn.Get([]byte("a"))
c.Assert(err, IsNil)
c.Assert(v, BytesEquals, []byte("a1"))
v, err = txn.Get([]byte("b"))
errMsgMustContain(c, err, "key not exist")

// clean again, shouldn't be failed when a rollback already exist.
ctx := goctx.Background()
commiter, err := newTwoPhaseCommitter(txn2)
err = commiter.cleanupKeys(NewBackoffer(cleanupMaxBackoff, ctx), commiter.keys)
c.Assert(err, IsNil)

// check the data after rollback twice.
txn = s.begin(c)
v, err = txn.Get([]byte("a"))
c.Assert(err, IsNil)
c.Assert(v, BytesEquals, []byte("a1"))

// update data in a new txn, should be success.
err = txn.Set([]byte("a"), []byte("a3"))
c.Assert(err, IsNil)
err = txn.Commit()
c.Assert(err, IsNil)
// check value
txn = s.begin(c)
v, err = txn.Get([]byte("a"))
c.Assert(err, IsNil)
c.Assert(v, BytesEquals, []byte("a3"))
}
2 changes: 1 addition & 1 deletion store/tikv/mock-tikv/cop_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (h *rpcHandler) setTopNDataForCtx(ctx *selectContext) []tipb.Chunk {
sort.Sort(&ctx.topnHeap.topnSorter)
chunks := make([]tipb.Chunk, 0, len(ctx.topnHeap.rows)/rowsPerChunk)
for _, row := range ctx.topnHeap.rows {
data := make([]byte, 0)
var data []byte
for _, d := range row.data {
data = append(data, d...)
}
Expand Down
33 changes: 24 additions & 9 deletions store/tikv/mock-tikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,18 +130,18 @@ func (e *mvccEntry) Prewrite(mutation *kvrpcpb.Mutation, startTS uint64, primary
return nil
}

func (e *mvccEntry) checkTxnCommitted(startTS uint64) (uint64, bool) {
func (e *mvccEntry) getTxnCommitInfo(startTS uint64) *mvccValue {
for _, v := range e.values {
if v.startTS == startTS && v.valueType != typeRollback {
return v.commitTS, true
if v.startTS == startTS {
return &v
}
}
return 0, false
return nil
}

func (e *mvccEntry) Commit(startTS, commitTS uint64) error {
if e.lock == nil || e.lock.startTS != startTS {
if _, ok := e.checkTxnCommitted(startTS); ok {
if c := e.getTxnCommitInfo(startTS); c != nil && c.valueType != typeRollback {
return nil
}
return ErrRetryable("txn not found")
Expand All @@ -165,18 +165,33 @@ func (e *mvccEntry) Commit(startTS, commitTS uint64) error {
}

func (e *mvccEntry) Rollback(startTS uint64) error {
if e.lock == nil || e.lock.startTS != startTS {
if commitTS, ok := e.checkTxnCommitted(startTS); ok {
return ErrAlreadyCommitted(commitTS)
// If current transaction's lock exist.
if e.lock != nil && e.lock.startTS == startTS {
e.lock = nil
e.values = append([]mvccValue{{
valueType: typeRollback,
startTS: startTS,
commitTS: startTS,
}}, e.values...)
return nil
}

// If current transaction's lock not exist.
// If commit info of current transaction exist.
if c := e.getTxnCommitInfo(startTS); c != nil {
// If current transaction is already committed.
if c.valueType != typeRollback {
return ErrAlreadyCommitted(c.commitTS)
}
// If current transaction is already rollback.
return nil
}
// If current transaction is not prewritted before.
e.values = append([]mvccValue{{
valueType: typeRollback,
startTS: startTS,
commitTS: startTS,
}}, e.values...)
e.lock = nil
return nil
}

Expand Down

0 comments on commit 6dd04ab

Please sign in to comment.