Skip to content

Commit

Permalink
store/tikv: fix cancel prewrite bug. (pingcap#2454)
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing authored and zimulala committed Jan 14, 2017
1 parent afeccd0 commit 1e9131d
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 3 deletions.
7 changes: 4 additions & 3 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,13 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
for i := 0; i < len(batches); i++ {
if e := <-ch; e != nil {
log.Debugf("2PC doActionOnBatches %s failed: %v, tid: %d", action, e, c.startTS)
// Cancel other requests and return the first error.
if cancel != nil {
// Cancel other requests and return the first error.
cancel()
return errors.Trace(e)
}
err = e
if err == nil {
err = e
}
}
}
return errors.Trace(err)
Expand Down
86 changes: 86 additions & 0 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ package tikv
import (
"math/rand"
"strings"
"time"

"github.com/juju/errors"
. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/store/tikv/mock-tikv"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -195,3 +198,86 @@ func (s *testCommitterSuite) TestContextCancelRetryable(c *C) {
c.Assert(err, NotNil)
c.Assert(strings.Contains(err.Error(), txnRetryableMark), IsTrue)
}

func (s *testCommitterSuite) mustGetRegionID(c *C, key []byte) uint64 {
loc, err := s.store.regionCache.LocateKey(NewBackoffer(getMaxBackoff, context.Background()), key)
c.Assert(err, IsNil)
return loc.Region.id
}

func (s *testCommitterSuite) mustNotLocked(c *C, key []byte) {
ver, err := s.store.CurrentVersion()
c.Assert(err, IsNil)
bo := NewBackoffer(getMaxBackoff, context.Background())
req := &kvrpcpb.Request{
Type: kvrpcpb.MessageType_CmdGet,
CmdGetReq: &kvrpcpb.CmdGetRequest{
Key: key,
Version: ver.Ver,
},
}
loc, err := s.store.regionCache.LocateKey(bo, key)
c.Assert(err, IsNil)
resp, err := s.store.SendKVReq(bo, req, loc.Region, readTimeoutShort)
c.Assert(err, IsNil)
cmdGetResp := resp.GetCmdGetResp()
c.Assert(cmdGetResp, NotNil)
keyErr := cmdGetResp.GetError()
c.Assert(keyErr, IsNil)
}

func (s *testCommitterSuite) TestPrewriteCancel(c *C) {
// Setup region delays for key "b" and "c".
delays := map[uint64]time.Duration{
s.mustGetRegionID(c, []byte("b")): time.Millisecond * 10,
s.mustGetRegionID(c, []byte("c")): time.Millisecond * 20,
}
s.store.client = &slowClient{
Client: s.store.client,
regionDelays: delays,
}

txn1, txn2 := s.begin(c), s.begin(c)
// txn2 writes "b"
err := txn2.Set([]byte("b"), []byte("b2"))
c.Assert(err, IsNil)
err = txn2.Commit()
c.Assert(err, IsNil)
// txn1 writes "a"(PK), "b", "c" on different regions.
// "b" will return an error and cancel commit.
err = txn1.Set([]byte("a"), []byte("a1"))
c.Assert(err, IsNil)
err = txn1.Set([]byte("b"), []byte("b1"))
c.Assert(err, IsNil)
err = txn1.Set([]byte("c"), []byte("c1"))
c.Assert(err, IsNil)
err = txn1.Commit()
c.Assert(err, NotNil)
// "c" should be cleaned up.
time.Sleep(time.Millisecond * 10)
s.mustNotLocked(c, []byte("c"))
}

// slowClient wraps rpcClient and makes some regions respond with delay.
type slowClient struct {
Client
regionDelays map[uint64]time.Duration
}

func (c *slowClient) SendKVReq(addr string, req *kvrpcpb.Request, timeout time.Duration) (*kvrpcpb.Response, error) {
for id, delay := range c.regionDelays {
if req.GetContext().GetRegionId() == id {
time.Sleep(delay)
}
}
return c.Client.SendKVReq(addr, req, timeout)
}

func (c *slowClient) SendCopReq(addr string, req *coprocessor.Request, timeout time.Duration) (*coprocessor.Response, error) {
for id, delay := range c.regionDelays {
if req.GetContext().GetRegionId() == id {
time.Sleep(delay)
}
}
return c.Client.SendCopReq(addr, req, timeout)
}
12 changes: 12 additions & 0 deletions store/tikv/mock-tikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ func (h *rpcHandler) handleRequest(req *kvrpcpb.Request) *kvrpcpb.Response {
resp.CmdResolveLockResp = h.onResolveLock(req.CmdResolveLockReq)
case kvrpcpb.MessageType_CmdResolveLock:
resp.CmdResolveLockResp = h.onResolveLock(req.CmdResolveLockReq)
case kvrpcpb.MessageType_CmdBatchRollback:
resp.CmdBatchRollbackResp = h.onBatchRollback(req.CmdBatchRollbackReq)

case kvrpcpb.MessageType_CmdRawGet:
resp.CmdRawGetResp = h.onRawGet(req.CmdRawGetReq)
Expand Down Expand Up @@ -273,6 +275,16 @@ func (h *rpcHandler) onResolveLock(req *kvrpcpb.CmdResolveLockRequest) *kvrpcpb.
return &kvrpcpb.CmdResolveLockResponse{}
}

func (h *rpcHandler) onBatchRollback(req *kvrpcpb.CmdBatchRollbackRequest) *kvrpcpb.CmdBatchRollbackResponse {
err := h.mvccStore.Rollback(req.Keys, req.StartVersion)
if err != nil {
return &kvrpcpb.CmdBatchRollbackResponse{
Error: convertToKeyError(err),
}
}
return &kvrpcpb.CmdBatchRollbackResponse{}
}

func (h *rpcHandler) onRawGet(req *kvrpcpb.CmdRawGetRequest) *kvrpcpb.CmdRawGetResponse {
return &kvrpcpb.CmdRawGetResponse{
Value: h.mvccStore.RawGet(req.GetKey()),
Expand Down

0 comments on commit 1e9131d

Please sign in to comment.