Skip to content

Commit

Permalink
store/tikv: remove dependency on tidb from store/tikv. (pingcap#4932
Browse files Browse the repository at this point in the history
)
  • Loading branch information
coocood authored and hanfei1991 committed Oct 31, 2017
1 parent 873e951 commit 473363a
Show file tree
Hide file tree
Showing 25 changed files with 691 additions and 402 deletions.
8 changes: 4 additions & 4 deletions cmd/benchdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (

log "github.com/Sirupsen/logrus"
"github.com/pingcap/tidb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/gcworker"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/logutil"
"golang.org/x/net/context"
Expand Down Expand Up @@ -88,7 +88,7 @@ func main() {
}

type benchDB struct {
store kv.Storage
store tikv.Storage
session tidb.Session
}

Expand All @@ -104,7 +104,7 @@ func newBenchDB() *benchDB {
terror.MustNil(err)

return &benchDB{
store: store,
store: store.(tikv.Storage),
session: session,
}
}
Expand Down Expand Up @@ -280,7 +280,7 @@ func (ut *benchDB) manualGC(done chan bool) {
if err != nil {
log.Fatal(err)
}
err = tikv.RunGCJob(context.Background(), ut.store, ver.Ver, "benchDB")
err = gcworker.RunGCJob(context.Background(), ut.store, ver.Ver, "benchDB")
if err != nil {
log.Fatal(err)
}
Expand Down
12 changes: 6 additions & 6 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
return errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(boRegionMiss, errors.New(regionErr.String()))
err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
}
Expand All @@ -354,7 +354,7 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
}
prewriteResp := resp.Prewrite
if prewriteResp == nil {
return errors.Trace(errBodyMissing)
return errors.Trace(ErrBodyMissing)
}
keyErrs := prewriteResp.GetErrors()
if len(keyErrs) == 0 {
Expand Down Expand Up @@ -386,7 +386,7 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
return errors.Trace(err)
}
if !ok {
err = bo.Backoff(boTxnLock, errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
err = bo.Backoff(BoTxnLock, errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -453,7 +453,7 @@ func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) er
return errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(boRegionMiss, errors.New(regionErr.String()))
err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
}
Expand All @@ -463,7 +463,7 @@ func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) er
}
commitResp := resp.Commit
if commitResp == nil {
return errors.Trace(errBodyMissing)
return errors.Trace(ErrBodyMissing)
}
if keyErr := commitResp.GetError(); keyErr != nil {
c.mu.RLock()
Expand Down Expand Up @@ -509,7 +509,7 @@ func (c *twoPhaseCommitter) cleanupSingleBatch(bo *Backoffer, batch batchKeys) e
return errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(boRegionMiss, errors.New(regionErr.String()))
err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
}
Expand Down
23 changes: 12 additions & 11 deletions store/tikv/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,26 +71,27 @@ func expo(base, cap, n int) int {

type backoffType int

// Back off types.
const (
boTiKVRPC backoffType = iota
boTxnLock
BoTxnLock
boTxnLockFast
boPDRPC
boRegionMiss
BoRegionMiss
boServerBusy
)

func (t backoffType) createFn() func() int {
switch t {
case boTiKVRPC:
return NewBackoffFn(100, 2000, EqualJitter)
case boTxnLock:
case BoTxnLock:
return NewBackoffFn(200, 3000, EqualJitter)
case boTxnLockFast:
return NewBackoffFn(100, 3000, EqualJitter)
case boPDRPC:
return NewBackoffFn(500, 3000, EqualJitter)
case boRegionMiss:
case BoRegionMiss:
return NewBackoffFn(100, 500, NoJitter)
case boServerBusy:
return NewBackoffFn(2000, 10000, EqualJitter)
Expand All @@ -102,13 +103,13 @@ func (t backoffType) String() string {
switch t {
case boTiKVRPC:
return "tikvRPC"
case boTxnLock:
case BoTxnLock:
return "txnLock"
case boTxnLockFast:
return "txnLockFast"
case boPDRPC:
return "pdRPC"
case boRegionMiss:
case BoRegionMiss:
return "regionMiss"
case boServerBusy:
return "serverBusy"
Expand All @@ -120,11 +121,11 @@ func (t backoffType) TError() *terror.Error {
switch t {
case boTiKVRPC:
return ErrTiKVServerTimeout
case boTxnLock, boTxnLockFast:
case BoTxnLock, boTxnLockFast:
return ErrResolveLockTimeout
case boPDRPC:
return ErrPDServerTimeout.GenByArgs(txnRetryableMark)
case boRegionMiss:
case BoRegionMiss:
return ErrRegionUnavailable
case boServerBusy:
return ErrTiKVServerBusy
Expand All @@ -142,9 +143,9 @@ const (
getMaxBackoff = 20000
prewriteMaxBackoff = 20000
cleanupMaxBackoff = 20000
gcMaxBackoff = 100000
gcResolveLockMaxBackoff = 100000
gcDeleteRangeMaxBackoff = 100000
GcMaxBackoff = 100000
GcResolveLockMaxBackoff = 100000
GcDeleteRangeMaxBackoff = 100000
rawkvMaxBackoff = 20000
splitRegionBackoff = 20000
)
Expand Down
5 changes: 3 additions & 2 deletions store/tikv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ import (
"google.golang.org/grpc"
)

// Timeout durations.
const (
maxConnectionNumber = 16
dialTimeout = 5 * time.Second
readTimeoutShort = 20 * time.Second // For requests that read/write several key-values.
readTimeoutMedium = 60 * time.Second // For requests that may need scan region.
readTimeoutLong = 150 * time.Second // For requests that may need scan region multiple times.
ReadTimeoutMedium = 60 * time.Second // For requests that may need scan region.
ReadTimeoutLong = 150 * time.Second // For requests that may need scan region multiple times.

grpcInitialWindowSize = 1 << 30
grpcInitialConnWindowSize = 1 << 30
Expand Down
6 changes: 3 additions & 3 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,20 +476,20 @@ func (it *copIterator) handleTask(bo *Backoffer, task *copTask) []copResponse {
NotFillCache: it.req.NotFillCache,
},
}
resp, err := sender.SendReq(bo, req, task.region, readTimeoutMedium)
resp, err := sender.SendReq(bo, req, task.region, ReadTimeoutMedium)
if err != nil {
return []copResponse{{err: errors.Trace(err)}}
}
if regionErr := resp.Cop.GetRegionError(); regionErr != nil {
err = bo.Backoff(boRegionMiss, errors.New(regionErr.String()))
err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return []copResponse{{err: errors.Trace(err)}}
}
return it.handleRegionErrorTask(bo, task)
}
if e := resp.Cop.GetLocked(); e != nil {
log.Debugf("coprocessor encounters lock: %v", e)
ok, err1 := it.store.lockResolver.ResolveLocks(bo, []*Lock{newLock(e)})
ok, err1 := it.store.lockResolver.ResolveLocks(bo, []*Lock{NewLock(e)})
if err1 != nil {
return []copResponse{{err: errors.Trace(err1)}}
}
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
)

var (
// errBodyMissing response body is missing error
errBodyMissing = errors.New("response body is missing")
// ErrBodyMissing response body is missing error
ErrBodyMissing = errors.New("response body is missing")
)

// TiDB decides whether to retry transaction by checking if error message contains
Expand Down
Loading

0 comments on commit 473363a

Please sign in to comment.