Skip to content

Commit

Permalink
store/tikv: refine code, backoffer is-a go context (pingcap#5276)
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored and coocood committed Dec 1, 2017
1 parent c6e8f36 commit 23f5268
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 19 deletions.
31 changes: 18 additions & 13 deletions store/tikv/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ const (
// NewBackoffFn creates a backoff func which implements exponential backoff with
// optional jitters.
// See http://www.awsarchitectureblog.com/2015/03/backoff.html
func NewBackoffFn(base, cap, jitter int) func() int {
func NewBackoffFn(base, cap, jitter int) func(goCtx goctx.Context) int {
attempts := 0
lastSleep := base
return func() int {
return func(goCtx goctx.Context) int {
var sleep int
switch jitter {
case NoJitter:
Expand All @@ -57,7 +57,11 @@ func NewBackoffFn(base, cap, jitter int) func() int {
case DecorrJitter:
sleep = int(math.Min(float64(cap), float64(base+rand.Intn(lastSleep*3-base))))
}
time.Sleep(time.Duration(sleep) * time.Millisecond)

select {
case <-time.After(time.Duration(sleep) * time.Millisecond):
case <-goCtx.Done():
}

attempts++
lastSleep = sleep
Expand All @@ -81,7 +85,7 @@ const (
boServerBusy
)

func (t backoffType) createFn() func() int {
func (t backoffType) createFn() func(goctx.Context) int {
switch t {
case boTiKVRPC:
return NewBackoffFn(100, 2000, EqualJitter)
Expand Down Expand Up @@ -154,43 +158,44 @@ var commitMaxBackoff = 20000

// Backoffer is a utility for retrying queries.
type Backoffer struct {
fn map[backoffType]func() int
goctx.Context

fn map[backoffType]func(goctx.Context) int
maxSleep int
totalSleep int
errors []error
ctx goctx.Context
types []backoffType
}

// NewBackoffer creates a Backoffer with maximum sleep time(in ms).
func NewBackoffer(maxSleep int, ctx goctx.Context) *Backoffer {
return &Backoffer{
Context: ctx,
maxSleep: maxSleep,
ctx: ctx,
}
}

// Backoff sleeps a while base on the backoffType and records the error message.
// It returns a retryable error if total sleep time exceeds maxSleep.
func (b *Backoffer) Backoff(typ backoffType, err error) error {
select {
case <-b.ctx.Done():
case <-b.Context.Done():
return errors.Trace(err)
default:
}

backoffCounter.WithLabelValues(typ.String()).Inc()
// Lazy initialize.
if b.fn == nil {
b.fn = make(map[backoffType]func() int)
b.fn = make(map[backoffType]func(goctx.Context) int)
}
f, ok := b.fn[typ]
if !ok {
f = typ.createFn()
b.fn[typ] = f
}

b.totalSleep += f()
b.totalSleep += f(b)
b.types = append(b.types, typ)

log.Debugf("%v, retry later(totalSleep %dms, maxSleep %dms)", err, b.totalSleep, b.maxSleep)
Expand Down Expand Up @@ -221,21 +226,21 @@ func (b *Backoffer) String() string {
// current Backoffer's context.
func (b *Backoffer) Clone() *Backoffer {
return &Backoffer{
Context: b.Context,
maxSleep: b.maxSleep,
totalSleep: b.totalSleep,
errors: b.errors,
ctx: b.ctx,
}
}

// Fork creates a new Backoffer which keeps current Backoffer's sleep time and errors, and holds
// a child context of current Backoffer's context.
func (b *Backoffer) Fork() (*Backoffer, goctx.CancelFunc) {
ctx, cancel := goctx.WithCancel(b.ctx)
ctx, cancel := goctx.WithCancel(b.Context)
return &Backoffer{
Context: ctx,
maxSleep: b.maxSleep,
totalSleep: b.totalSleep,
errors: b.errors,
ctx: ctx,
}, cancel
}
2 changes: 1 addition & 1 deletion store/tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ func (s *tikvStore) CurrentVersion() (kv.Version, error) {

func (s *tikvStore) getTimestampWithRetry(bo *Backoffer) (uint64, error) {
for {
startTS, err := s.oracle.GetTimestamp(bo.ctx)
startTS, err := s.oracle.GetTimestamp(bo)
if err == nil {
return startTS, nil
}
Expand Down
6 changes: 3 additions & 3 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func (c *RegionCache) loadRegion(bo *Backoffer, key []byte) (*Region, error) {
}
}

meta, leader, err := c.pdClient.GetRegion(bo.ctx, key)
meta, leader, err := c.pdClient.GetRegion(bo, key)
if err != nil {
backoffErr = errors.Errorf("loadRegion from PD failed, key: %q, err: %v", key, err)
continue
Expand Down Expand Up @@ -367,7 +367,7 @@ func (c *RegionCache) loadRegionByID(bo *Backoffer, regionID uint64) (*Region, e
}
}

meta, leader, err := c.pdClient.GetRegionByID(bo.ctx, regionID)
meta, leader, err := c.pdClient.GetRegionByID(bo, regionID)
if err != nil {
backoffErr = errors.Errorf("loadRegion from PD failed, regionID: %v, err: %v", regionID, err)
continue
Expand Down Expand Up @@ -427,7 +427,7 @@ func (c *RegionCache) ClearStoreByID(id uint64) {

func (c *RegionCache) loadStoreAddr(bo *Backoffer, id uint64) (string, error) {
for {
store, err := c.pdClient.GetStore(bo.ctx, id)
store, err := c.pdClient.GetStore(bo, id)
if err != nil {
if errors.Cause(err) == goctx.Canceled {
return "", errors.Trace(err)
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, ctx *RPCContext, re
if e := tikvrpc.SetContext(req, ctx.Meta, ctx.Peer); e != nil {
return nil, false, errors.Trace(e)
}
context, cancel := goctx.WithTimeout(bo.ctx, timeout)
context, cancel := goctx.WithTimeout(bo, timeout)
defer cancel()
resp, err = s.client.SendReq(context, ctx.Addr, req)
if err != nil {
Expand All @@ -124,7 +124,7 @@ func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err err
}
if grpc.Code(errors.Cause(err)) == codes.Canceled {
select {
case <-bo.ctx.Done():
case <-bo.Done():
return errors.Trace(err)
default:
// If we don't cancel, but the error code is Canceled, it must be from grpc remote.
Expand Down

0 comments on commit 23f5268

Please sign in to comment.