Skip to content

Commit

Permalink
store/tikv/region_cache: Backoff when region epoch is ahead of TiKV (p…
Browse files Browse the repository at this point in the history
…ingcap#9181)

Signed-off-by: MyonKeminta <[email protected]>
  • Loading branch information
MyonKeminta authored Feb 14, 2019
1 parent 632a8c4 commit de56ea6
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 18 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ require (
github.com/pingcap/errors v0.11.0
github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190110035000-d4fe6b336379
github.com/pingcap/kvproto v0.0.0-20190131052532-7e329e0c9e32
github.com/pingcap/parser v0.0.0-20190212061044-a71b434969f3
github.com/pingcap/pd v2.1.0-rc.4+incompatible
github.com/pingcap/tidb-tools v2.1.3-0.20190116051332-34c808eef588+incompatible
Expand Down Expand Up @@ -81,6 +81,7 @@ require (
golang.org/x/sys v0.0.0-20190109145017-48ac38b7c8cb // indirect
golang.org/x/text v0.3.0
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c // indirect
golang.org/x/tools v0.0.0-20190130214255-bb1329dc71a0 // indirect
google.golang.org/genproto v0.0.0-20190108161440-ae2f86662275 // indirect
google.golang.org/grpc v1.17.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3 h1:04yuCf5NMvLU8rB2
github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3/go.mod h1:DazNTg0PTldtpsQiT9I5tVJwV1onHMKBBgXzmJUlMns=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190110035000-d4fe6b336379 h1:l4KInBOtxjbgQLjCFHzX66vZgNzsH4a+RiuVZGrO0xk=
github.com/pingcap/kvproto v0.0.0-20190110035000-d4fe6b336379/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190131052532-7e329e0c9e32 h1:9uwqk2DvsAKImRKYAjERMuIf5ZiCcNFhaFhgFRXw7X0=
github.com/pingcap/kvproto v0.0.0-20190131052532-7e329e0c9e32/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/parser v0.0.0-20190212061044-a71b434969f3 h1:Wn8ERRenAuN00KT7TAISS86HzVHDyMRR+onWCeb6BjI=
github.com/pingcap/parser v0.0.0-20190212061044-a71b434969f3/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/pd v2.1.0-rc.4+incompatible h1:/buwGk04aHO5odk/+O8ZOXGs4qkUjYTJ2UpCJXna8NE=
Expand Down Expand Up @@ -200,6 +200,8 @@ golang.org/x/time v0.0.0-20181108054448-85acf8d2951c h1:fqgJT0MGcGpPgpWU7VRdRjuA
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52 h1:JG/0uqcGdTNgq7FdU+61l5Pdmb8putNZlXb65bJBROs=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190130214255-bb1329dc71a0 h1:iRpjPej1fPzmfoBhMFkp3HdqzF+ytPmAwiQhJGV0zGw=
golang.org/x/tools v0.0.0-20190130214255-bb1329dc71a0/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
Expand Down
10 changes: 5 additions & 5 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,14 @@ func (h *rpcHandler) checkRequestContext(ctx *kvrpcpb.Context) *errorpb.Error {
// Region epoch does not match.
if !proto.Equal(region.GetRegionEpoch(), ctx.GetRegionEpoch()) {
nextRegion, _ := h.cluster.GetRegionByKey(region.GetEndKey())
newRegions := []*metapb.Region{region}
currentRegions := []*metapb.Region{region}
if nextRegion != nil {
newRegions = append(newRegions, nextRegion)
currentRegions = append(currentRegions, nextRegion)
}
return &errorpb.Error{
Message: *proto.String("stale epoch"),
StaleEpoch: &errorpb.StaleEpoch{
NewRegions: newRegions,
Message: *proto.String("epoch not match"),
EpochNotMatch: &errorpb.EpochNotMatch{
CurrentRegions: currentRegions,
},
}
}
Expand Down
18 changes: 15 additions & 3 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,14 +533,26 @@ func (c *RegionCache) DropStoreOnSendRequestFail(ctx *RPCContext, err error) {
failedStoreID, failedStoreAddr, err)
}

// OnRegionStale removes the old region and inserts new regions into the cache.
func (c *RegionCache) OnRegionStale(ctx *RPCContext, newRegions []*metapb.Region) error {
// OnRegionEpochNotMatch removes the old region and inserts new regions into the cache.
func (c *RegionCache) OnRegionEpochNotMatch(bo *Backoffer, ctx *RPCContext, currentRegions []*metapb.Region) error {
c.mu.Lock()
defer c.mu.Unlock()

c.dropRegionFromCache(ctx.Region)

for _, meta := range newRegions {
// Find whether the region epoch in `ctx` is ahead of TiKV's. If so, backoff.
for _, meta := range currentRegions {
if meta.GetId() == ctx.Region.id &&
(meta.GetRegionEpoch().GetConfVer() < ctx.Region.confVer ||
meta.GetRegionEpoch().GetVersion() < ctx.Region.ver) {
err := errors.Errorf("region epoch is ahead of tikv. rpc ctx: %+v, currentRegions: %+v", ctx, currentRegions)
log.Info(err.Error())
return bo.Backoff(BoRegionMiss, err)
}
}

// If the region epoch is not ahead of TiKV's, replace region meta in region cache.
for _, meta := range currentRegions {
if _, ok := c.pdClient.(*codecPDClient); ok {
if err := decodeRegionMetaKey(meta); err != nil {
return errors.Errorf("newRegion's range key is not encoded: %v, %v", meta, err)
Expand Down
22 changes: 22 additions & 0 deletions store/tikv/region_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,28 @@ func (s *testRegionCacheSuite) TestRequestFail2(c *C) {
s.checkCache(c, 0)
}

func (s *testRegionCacheSuite) TestRegionEpochAheadOfTiKV(c *C) {
// Create a separated region cache to do this test.
pdCli := &codecPDClient{mocktikv.NewPDClient(s.cluster)}
cache := NewRegionCache(pdCli)

region := createSampleRegion([]byte("k1"), []byte("k2"))
region.meta.Id = 1
region.meta.RegionEpoch = &metapb.RegionEpoch{Version: 10, ConfVer: 10}
cache.insertRegionToCache(region)

r1 := metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 9, ConfVer: 10}}
r2 := metapb.Region{Id: 1, RegionEpoch: &metapb.RegionEpoch{Version: 10, ConfVer: 9}}

bo := NewBackoffer(context.Background(), 2000000)

err := cache.OnRegionEpochNotMatch(bo, &RPCContext{Region: region.VerID()}, []*metapb.Region{&r1})
c.Assert(err, IsNil)
err = cache.OnRegionEpochNotMatch(bo, &RPCContext{Region: region.VerID()}, []*metapb.Region{&r2})
c.Assert(err, IsNil)
c.Assert(len(bo.errors), Equals, 2)
}

func (s *testRegionCacheSuite) TestDropStoreOnSendRequestFail(c *C) {
regionCnt := 999
cluster := createClusterWithStoresAndRegions(regionCnt)
Expand Down
14 changes: 7 additions & 7 deletions store/tikv/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ func (s *RegionRequestSender) SendReq(bo *Backoffer, req *tikvrpc.Request, regio
// RPC by returning RegionError directly.

// TODO: Change the returned error to something like "region missing in cache",
// and handle this error like StaleEpoch, which means to re-split the request and retry.
return tikvrpc.GenRegionErrorResp(req, &errorpb.Error{StaleEpoch: &errorpb.StaleEpoch{}})
// and handle this error like EpochNotMatch, which means to re-split the request and retry.
return tikvrpc.GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}})
}

s.storeAddr = ctx.Addr
Expand Down Expand Up @@ -171,8 +171,8 @@ func regionErrorToLabel(e *errorpb.Error) string {
return "region_not_found"
} else if e.GetKeyNotInRegion() != nil {
return "key_not_in_region"
} else if e.GetStaleEpoch() != nil {
return "stale_epoch"
} else if e.GetEpochNotMatch() != nil {
return "epoch_not_match"
} else if e.GetServerIsBusy() != nil {
return "server_is_busy"
} else if e.GetStaleCommand() != nil {
Expand Down Expand Up @@ -211,9 +211,9 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, regi
return true, nil
}

if staleEpoch := regionErr.GetStaleEpoch(); staleEpoch != nil {
log.Debugf("tikv reports `StaleEpoch`, ctx: %v, retry later", ctx)
err = s.regionCache.OnRegionStale(ctx, staleEpoch.NewRegions)
if epochNotMatch := regionErr.GetEpochNotMatch(); epochNotMatch != nil {
log.Debugf("tikv reports `EpochNotMatch`, ctx: %v, retry later", ctx)
err = s.regionCache.OnRegionEpochNotMatch(bo, ctx, epochNotMatch.CurrentRegions)
return false, errors.Trace(err)
}
if regionErr.GetServerIsBusy() != nil {
Expand Down

0 comments on commit de56ea6

Please sign in to comment.