store/tikv/gcworker: Make resolveLock concurrent (pingcap#10379)
MyonKeminta authored and zhangjinpeng87 committed May 16, 2019
1 parent f100cdd commit 148c5fa
Showing 9 changed files with 590 additions and 64 deletions.
2 changes: 2 additions & 0 deletions metrics/metrics.go
@@ -139,4 +139,6 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVPendingBatchRequests)
prometheus.MustRegister(TiKVBatchWaitDuration)
prometheus.MustRegister(TiKVBatchClientUnavailable)
prometheus.MustRegister(TiKVRangeTaskStats)
prometheus.MustRegister(TiKVRangeTaskPushDuration)
}
18 changes: 18 additions & 0 deletions metrics/tikvclient.go
@@ -205,4 +205,22 @@ var (
Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), // 1ms ~ 1000s
Help: "batch client unavailable",
})

TiKVRangeTaskStats = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "tidb",
Subsystem: "tikvclient",
Name: "range_task_stats",
Help: "stat of range tasks",
}, []string{LblType, LblResult})

TiKVRangeTaskPushDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "tikvclient",
Name: "range_task_push_duration",
// 1ms ~ 1000s
Buckets: prometheus.ExponentialBuckets(0.001, 2, 20),
Help: "duration to push sub tasks to range task workers",
}, []string{LblType})
)
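
As a rough illustration of how these two collectors might be fed by the range task machinery (the helper, the runner name "resolve-locks-runner", and the result label "completed" are assumptions, not code from this commit):

package example

import (
	"time"

	"github.com/pingcap/tidb/metrics"
)

// recordRangeTask is illustrative only: it observes how long pushing one sub
// task to a worker took, and updates the per-runner gauge of completed regions.
// The label values "resolve-locks-runner" and "completed" are assumed here.
func recordRangeTask(pushStart time.Time, completedRegions int) {
	metrics.TiKVRangeTaskPushDuration.
		WithLabelValues("resolve-locks-runner").
		Observe(time.Since(pushStart).Seconds())
	metrics.TiKVRangeTaskStats.
		WithLabelValues("resolve-locks-runner", "completed").
		Set(float64(completedRegions))
}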
12 changes: 12 additions & 0 deletions store/mockstore/mocktikv/cluster.go
@@ -97,6 +97,18 @@ func (c *Cluster) GetStore(storeID uint64) *metapb.Store {
return nil
}

// GetAllStores returns all Stores' meta.
func (c *Cluster) GetAllStores() []*metapb.Store {
c.RLock()
defer c.RUnlock()

stores := make([]*metapb.Store, 0, len(c.stores))
for _, store := range c.stores {
stores = append(stores, proto.Clone(store.meta).(*metapb.Store))
}
return stores
}

// StopStore stops a store with storeID.
func (c *Cluster) StopStore(storeID uint64) {
c.Lock()
3 changes: 1 addition & 2 deletions store/mockstore/mocktikv/pd.go
@@ -18,7 +18,6 @@ import (
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/pd/client"
@@ -100,7 +99,7 @@ func (c *pdClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store,
}

func (c *pdClient) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) {
panic(errors.New("unimplemented"))
return c.cluster.GetAllStores(), nil
}

func (c *pdClient) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint64, error) {
1 change: 1 addition & 0 deletions store/tikv/backoff.go
@@ -207,6 +207,7 @@ const (
splitRegionBackoff = 20000
scatterRegionBackoff = 20000
waitScatterRegionFinishBackoff = 120000
locateRegionMaxBackoff = 20000
)

// CommitMaxBackoff is max sleep time of the 'commit' command
166 changes: 109 additions & 57 deletions store/tikv/gcworker/gc_worker.go
@@ -140,7 +140,7 @@ var gcVariableComments = map[string]string{
gcRunIntervalKey: "GC run interval, at least 10m, in Go format.",
gcLifeTimeKey: "All versions within life time will not be collected by GC, at least 10m, in Go format.",
gcSafePointKey: "All versions after safe point can be accessed. (DO NOT EDIT)",
gcConcurrencyKey: "How many go routines used to do GC parallel, [1, 128], default 2",
gcConcurrencyKey: "[DEPRECATED] How many goroutines used to do GC parallel, [1, 128], default 2",
gcEnableKey: "Current GC enable status",
gcModeKey: "Mode of GC, \"central\" or \"distributed\"",
}
@@ -258,11 +258,34 @@ func (w *GCWorker) leaderTick(ctx context.Context) error {
return nil
}

stores, err := w.getUpStores(ctx)
concurrency := len(stores)
if err != nil {
logutil.Logger(ctx).Error("[gc worker] failed to get up stores to calculate concurrency.",
zap.String("uuid", w.uuid),
zap.Error(err))

concurrency, err = w.loadGCConcurrencyWithDefault()
if err != nil {
logutil.Logger(ctx).Error("[gc worker] failed to load gc concurrency. use default value.",
zap.String("uuid", w.uuid),
zap.Error(err))
concurrency = gcDefaultConcurrency
}
}

if concurrency == 0 {
logutil.Logger(ctx).Error("[gc worker] no store is up",
zap.String("uuid", w.uuid))
return errors.New("[gc worker] no store is up")
}

w.gcIsRunning = true
logutil.Logger(ctx).Info("[gc worker] starts the whole job",
zap.String("uuid", w.uuid),
zap.Uint64("safePoint", safePoint))
go w.runGCJob(ctx, safePoint)
zap.Uint64("safePoint", safePoint),
zap.Int("concurrency", concurrency))
go w.runGCJob(ctx, safePoint, concurrency)
return nil
}

@@ -397,9 +420,9 @@ func (w *GCWorker) calculateNewSafePoint(now time.Time) (*time.Time, error) {
return &safePoint, nil
}

func (w *GCWorker) runGCJob(ctx context.Context, safePoint uint64) {
func (w *GCWorker) runGCJob(ctx context.Context, safePoint uint64, concurrency int) {
metrics.GCWorkerCounter.WithLabelValues("run_job").Inc()
err := w.resolveLocks(ctx, safePoint)
err := w.resolveLocks(ctx, safePoint, concurrency)
if err != nil {
logutil.Logger(ctx).Error("[gc worker] resolve locks returns an error",
zap.String("uuid", w.uuid),
@@ -448,7 +471,7 @@ func (w *GCWorker) runGCJob(ctx context.Context, safePoint uint64) {
return
}
} else {
err = w.doGC(ctx, safePoint)
err = w.doGC(ctx, safePoint, concurrency)
if err != nil {
logutil.Logger(ctx).Error("[gc worker] do GC returns an error",
zap.String("uuid", w.uuid),
@@ -544,7 +567,7 @@ func (w *GCWorker) redoDeleteRanges(ctx context.Context, safePoint uint64) error

func (w *GCWorker) sendUnsafeDestroyRangeRequest(ctx context.Context, startKey []byte, endKey []byte) error {
// Get all stores every time we delete a region, so the store list is less likely to be stale.
stores, err := w.pdClient.GetAllStores(ctx)
stores, err := w.getUpStores(ctx)
if err != nil {
logutil.Logger(ctx).Error("[gc worker] delete ranges: got an error while trying to get store list from PD",
zap.String("uuid", w.uuid),
@@ -563,10 +586,6 @@ func (w *GCWorker) sendUnsafeDestroyRangeRequest(ctx context.Context, startKey [
var wg sync.WaitGroup

for _, store := range stores {
if store.State != metapb.StoreState_Up {
continue
}

address := store.Address
storeID := store.Id
wg.Add(1)
@@ -588,6 +607,21 @@ func (w *GCWorker) sendUnsafeDestroyRangeRequest(ctx context.Context, startKey [
return errors.Trace(err)
}

func (w *GCWorker) getUpStores(ctx context.Context) ([]*metapb.Store, error) {
stores, err := w.pdClient.GetAllStores(ctx)
if err != nil {
return nil, errors.Trace(err)
}

upStores := make([]*metapb.Store, 0, len(stores))
for _, store := range stores {
if store.State == metapb.StoreState_Up {
upStores = append(upStores, store)
}
}
return upStores, nil
}

func (w *GCWorker) loadGCConcurrencyWithDefault() (int, error) {
str, err := w.loadValueFromSysTable(gcConcurrencyKey)
if err != nil {
Expand Down Expand Up @@ -640,9 +674,43 @@ func (w *GCWorker) checkUseDistributedGC() (bool, error) {
return true, nil
}

func (w *GCWorker) resolveLocks(ctx context.Context, safePoint uint64) error {
func (w *GCWorker) resolveLocks(ctx context.Context, safePoint uint64, concurrency int) error {
metrics.GCWorkerCounter.WithLabelValues("resolve_locks").Inc()
logutil.Logger(ctx).Info("[gc worker] start resolve locks",
zap.String("uuid", w.uuid),
zap.Uint64("safePoint", safePoint),
zap.Int("concurrency", concurrency))
startTime := time.Now()

handler := func(ctx context.Context, r kv.KeyRange) (int, error) {
return w.resolveLocksForRange(ctx, safePoint, r.StartKey, r.EndKey)
}

runner := tikv.NewRangeTaskRunner("resolve-locks-runner", w.store, concurrency, handler)
// Run resolve lock on the whole TiKV cluster. Empty start key and end key mean the range is unbounded.
err := runner.RunOnRange(ctx, []byte(""), []byte(""))
if err != nil {
logutil.Logger(ctx).Error("[gc worker] resolve locks failed",
zap.String("uuid", w.uuid),
zap.Uint64("safePoint", safePoint),
zap.Error(err))
return errors.Trace(err)
}

logutil.Logger(ctx).Info("[gc worker] finish resolve locks",
zap.String("uuid", w.uuid),
zap.Uint64("safePoint", safePoint),
zap.Int32("regions", runner.CompletedRegions()))
metrics.GCHistogram.WithLabelValues("resolve_locks").Observe(time.Since(startTime).Seconds())
return nil
}
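
For reference, a minimal sketch of the RangeTaskRunner pattern used above, applied outside the GC worker; the runner name, handler body, and surrounding function are illustrative assumptions, while the constructor, RunOnRange, and CompletedRegions calls follow the usage shown in this diff:

package example

import (
	"context"

	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/store/tikv"
)

// countRegions shows the general shape of a range task: the handler is invoked
// once per sub range and reports how many regions it processed there.
func countRegions(ctx context.Context, store tikv.Storage, concurrency int) (int32, error) {
	handler := func(ctx context.Context, r kv.KeyRange) (int, error) {
		// A real handler would scan or mutate data in [r.StartKey, r.EndKey).
		return 1, nil
	}
	runner := tikv.NewRangeTaskRunner("example-runner", store, concurrency, handler)
	// Empty start and end keys cover the whole cluster, as resolveLocks does above.
	if err := runner.RunOnRange(ctx, []byte(""), []byte("")); err != nil {
		return 0, err
	}
	return runner.CompletedRegions(), nil
}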

func (w *GCWorker) resolveLocksForRange(
ctx context.Context,
safePoint uint64,
startKey []byte,
endKey []byte,
) (int, error) {
// For the scan lock request, we must return all locks even if they are generated
// by the same transaction, because the GC worker needs to make sure all locks have been
// cleaned.
@@ -654,17 +722,12 @@ func (w *GCWorker) resolveLocks(ctx context.Context, safePoint uint64) error {
},
}

logutil.Logger(ctx).Info("[gc worker] start resolve locks",
zap.String("uuid", w.uuid),
zap.Uint64("safePoint", safePoint))
startTime := time.Now()
regions, totalResolvedLocks := 0, 0

var key []byte
regions := 0
key := startKey
for {
select {
case <-ctx.Done():
return errors.New("[gc worker] gc job canceled")
return regions, errors.New("[gc worker] gc job canceled")
default:
}

@@ -673,29 +736,29 @@ func (w *GCWorker) resolveLocks(ctx context.Context, safePoint uint64) error {
req.ScanLock.StartKey = key
loc, err := w.store.GetRegionCache().LocateKey(bo, key)
if err != nil {
return errors.Trace(err)
return regions, errors.Trace(err)
}
resp, err := w.store.SendReq(bo, req, loc.Region, tikv.ReadTimeoutMedium)
if err != nil {
return errors.Trace(err)
return regions, errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return errors.Trace(err)
return regions, errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(tikv.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
return regions, errors.Trace(err)
}
continue
}
locksResp := resp.ScanLock
if locksResp == nil {
return errors.Trace(tikv.ErrBodyMissing)
return regions, errors.Trace(tikv.ErrBodyMissing)
}
if locksResp.GetError() != nil {
return errors.Errorf("unexpected scanlock error: %s", locksResp)
return regions, errors.Errorf("unexpected scanlock error: %s", locksResp)
}
locksInfo := locksResp.GetLocks()
locks := make([]*tikv.Lock, len(locksInfo))
@@ -705,23 +768,19 @@ func (w *GCWorker) resolveLocks(ctx context.Context, safePoint uint64) error {

ok, err1 := w.store.GetLockResolver().BatchResolveLocks(bo, locks, loc.Region)
if err1 != nil {
return errors.Trace(err1)
return regions, errors.Trace(err1)
}
if !ok {
err = bo.Backoff(tikv.BoTxnLock, errors.Errorf("remain locks: %d", len(locks)))
if err != nil {
return errors.Trace(err)
return regions, errors.Trace(err)
}
continue
}

totalResolvedLocks += len(locks)
if len(locks) < gcScanLockLimit {
regions++
key = loc.EndKey
if len(key) == 0 {
break
}
} else {
logutil.Logger(ctx).Info("[gc worker] region has more than limit locks",
zap.String("uuid", w.uuid),
@@ -730,15 +789,12 @@ func (w *GCWorker) resolveLocks(ctx context.Context, safePoint uint64) error {
metrics.GCRegionTooManyLocksCounter.Inc()
key = locks[len(locks)-1].Key
}

if len(key) == 0 || (len(endKey) != 0 && bytes.Compare(key, endKey) >= 0) {
break
}
}
logutil.Logger(ctx).Info("[gc worker] finish resolve locks",
zap.String("uuid", w.uuid),
zap.Uint64("safePoint", safePoint),
zap.Int("regions", regions),
zap.Int("total resolved", totalResolvedLocks),
zap.Duration("cost time", time.Since(startTime)))
metrics.GCHistogram.WithLabelValues("resolve_locks").Observe(time.Since(startTime).Seconds())
return nil
return regions, nil
}

func (w *GCWorker) uploadSafePointToPD(ctx context.Context, safePoint uint64) error {
@@ -911,19 +967,7 @@ func (w *GCWorker) genNextGCTask(bo *tikv.Backoffer, safePoint uint64, key kv.Ke
return task, nil
}

func (w *GCWorker) doGC(ctx context.Context, safePoint uint64) error {
concurrency, err := w.loadGCConcurrencyWithDefault()
if err != nil {
logutil.Logger(ctx).Error("[gc worker] failed to load gcConcurrency",
zap.String("uuid", w.uuid),
zap.Error(err))
concurrency = gcDefaultConcurrency
}

return w.doGCInternal(ctx, safePoint, concurrency)
}

func (w *GCWorker) doGCInternal(ctx context.Context, safePoint uint64, concurrency int) error {
func (w *GCWorker) doGC(ctx context.Context, safePoint uint64, concurrency int) error {
metrics.GCWorkerCounter.WithLabelValues("do_gc").Inc()

err := w.saveSafePoint(w.store.GetSafePointKV(), tikv.GcSavedSafePoint, safePoint)
@@ -1189,15 +1233,15 @@ func RunGCJob(ctx context.Context, s tikv.Storage, safePoint uint64, identifier
uuid: identifier,
}

err := gcWorker.resolveLocks(ctx, safePoint)
err := gcWorker.resolveLocks(ctx, safePoint, concurrency)
if err != nil {
return errors.Trace(err)
}

if concurrency <= 0 {
return errors.Errorf("[gc worker] gc concurrency should be greater than 0, current concurrency: %v", concurrency)
}
err = gcWorker.doGCInternal(ctx, safePoint, concurrency)
err = gcWorker.doGC(ctx, safePoint, concurrency)
if err != nil {
return errors.Trace(err)
}
@@ -1206,14 +1250,22 @@ func RunGCJob(ctx context.Context, s tikv.Storage, safePoint uint64, identifier

// RunDistributedGCJob notifies TiKVs to do GC. It is exported for the kv API; do not use it together with GCWorker at the same time.
// This function may not finish immediately because it may take some time to do resolveLocks.
func RunDistributedGCJob(ctx context.Context, s tikv.Storage, pd pd.Client, safePoint uint64, identifier string) error {
// Param concurrency specifies the concurrency of the resolveLocks phase.
func RunDistributedGCJob(
ctx context.Context,
s tikv.Storage,
pd pd.Client,
safePoint uint64,
identifier string,
concurrency int,
) error {
gcWorker := &GCWorker{
store: s,
uuid: identifier,
pdClient: pd,
}

err := gcWorker.resolveLocks(ctx, safePoint)
err := gcWorker.resolveLocks(ctx, safePoint, concurrency)
if err != nil {
return errors.Trace(err)
}
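
Because RunDistributedGCJob now takes a concurrency argument, an external caller could derive it from the number of Up stores, mirroring what leaderTick does above. A hedged sketch (the wrapper function, variable names, and the fallback value are illustrative):

package example

import (
	"context"

	"github.com/pingcap/kvproto/pkg/metapb"
	pd "github.com/pingcap/pd/client"
	"github.com/pingcap/tidb/store/tikv"
	"github.com/pingcap/tidb/store/tikv/gcworker"
)

// runGC is a hypothetical caller of the updated API, not part of this commit.
func runGC(ctx context.Context, s tikv.Storage, pdCli pd.Client, safePoint uint64, id string) error {
	stores, err := pdCli.GetAllStores(ctx)
	if err != nil {
		return err
	}
	concurrency := 0
	for _, store := range stores {
		if store.State == metapb.StoreState_Up {
			concurrency++
		}
	}
	if concurrency == 0 {
		// Illustrative fallback; the GC worker's documented default is 2.
		concurrency = 2
	}
	return gcworker.RunDistributedGCJob(ctx, s, pdCli, safePoint, id, concurrency)
}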