store/tikv: export RunGCJob (pingcap#3439)
nolouch authored and coocood committed Jun 13, 2017
1 parent 1ccc71a commit cf96dca
Showing 2 changed files with 49 additions and 42 deletions.
18 changes: 7 additions & 11 deletions cmd/benchdb/main.go
@@ -25,6 +25,7 @@ import (
 	"github.com/pingcap/tidb"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/store/tikv"
+	"golang.org/x/net/context"
 )
 
 var (
@@ -81,9 +82,8 @@ func main() {
 }
 
 type benchDB struct {
-	store    kv.Storage
-	session  tidb.Session
-	gcWorker *tikv.GCWorker
+	store   kv.Storage
+	session tidb.Session
 }
 
 func newBenchDB() *benchDB {
@@ -101,14 +101,10 @@ func newBenchDB() *benchDB {
 	if err != nil {
 		log.Fatal(err)
 	}
-	gcWoker, err := tikv.NewGCWorker(store)
-	if err != nil {
-		log.Fatal(err)
-	}
+
 	return &benchDB{
-		store:    store,
-		session:  session,
-		gcWorker: gcWoker,
+		store:   store,
+		session: session,
 	}
 }
 
@@ -283,7 +279,7 @@ func (ut *benchDB) manualGC(done chan bool) {
 	if err != nil {
 		log.Fatal(err)
 	}
-	err = ut.gcWorker.DoGC(ver.Ver)
+	err = tikv.RunGCJob(context.Background(), ut.store, ver.Ver, "benchDB")
 	if err != nil {
 		log.Fatal(err)
 	}
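
With RunGCJob exported, any tool holding a TiKV-backed kv.Storage can trigger a GC pass directly, as benchdb now does in manualGC above. A minimal sketch of such a caller, assuming the same imports benchdb uses; the package name, the helper runManualGC, and the "my-tool" identifier are illustrative, not part of this commit:

package gcutil // hypothetical caller package

import (
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/store/tikv"
	"golang.org/x/net/context"
)

// runManualGC runs one GC pass at the store's current version,
// mirroring benchDB.manualGC above. Helper name is illustrative.
func runManualGC(store kv.Storage) error {
	// Use the store's current timestamp as the GC safe point.
	ver, err := store.CurrentVersion()
	if err != nil {
		return err
	}
	// RunGCJob rejects non-TiKV stores with "should use tikv driver".
	return tikv.RunGCJob(context.Background(), store, ver.Ver, "my-tool")
}
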
73 changes: 42 additions & 31 deletions store/tikv/gc_worker.go
@@ -40,7 +40,7 @@ type GCWorker struct {
 	session     tidb.Session
 	gcIsRunning bool
 	lastFinish  time.Time
-	quit        chan struct{}
+	cancel      goctx.CancelFunc
 	done        chan error
 }
 
@@ -60,16 +60,17 @@ func NewGCWorker(store kv.Storage) (*GCWorker, error) {
 		store:       store.(*tikvStore),
 		gcIsRunning: false,
 		lastFinish:  time.Now(),
-		quit:        make(chan struct{}),
 		done:        make(chan error),
 	}
-	go worker.start()
+	var ctx goctx.Context
+	ctx, worker.cancel = goctx.WithCancel(goctx.Background())
+	go worker.start(ctx)
 	return worker, nil
 }
 
 // Close stops background goroutines.
 func (w *GCWorker) Close() {
-	close(w.quit)
+	w.cancel()
 }
 
 const (
@@ -101,7 +102,7 @@ var gcVariableComments = map[string]string{
 	gcSafePointKey: "All versions after safe point can be accessed. (DO NOT EDIT)",
 }
 
-func (w *GCWorker) start() {
+func (w *GCWorker) start(ctx goctx.Context) {
 	log.Infof("[gc worker] %s start.", w.uuid)
 	ticker := time.NewTicker(gcWorkerTickInterval)
 	for {
@@ -127,7 +128,7 @@ func (w *GCWorker) start() {
 				break
 			}
 			if isLeader {
-				err = w.leaderTick()
+				err = w.leaderTick(ctx)
 				if err != nil {
 					log.Warnf("[gc worker] leader tick err: %v", err)
 				}
@@ -143,7 +144,7 @@
 				log.Errorf("[gc worker] runGCJob error: %v", err)
 				break
 			}
-		case <-w.quit:
+		case <-ctx.Done():
 			log.Infof("[gc worker] (%s) quit.", w.uuid)
 			return
 		}
@@ -168,7 +169,7 @@ func (w *GCWorker) storeIsBootstrapped() bool {
 }
 
 // Leader of GC worker checks if it should start a GC job every tick.
-func (w *GCWorker) leaderTick() error {
+func (w *GCWorker) leaderTick(ctx goctx.Context) error {
 	if w.gcIsRunning {
 		return nil
 	}
@@ -185,7 +186,7 @@
 
 	w.gcIsRunning = true
 	log.Infof("[gc worker] %s starts GC job, safePoint: %v", w.uuid, safePoint)
-	go w.runGCJob(safePoint)
+	go w.runGCJob(ctx, safePoint)
 	return nil
 }
 
@@ -259,24 +260,35 @@ func (w *GCWorker) calculateNewSafePoint(now time.Time) (*time.Time, error) {
 	return &safePoint, nil
 }
 
-func (w *GCWorker) runGCJob(safePoint uint64) {
-	gcWorkerCounter.WithLabelValues("run_job").Inc()
-
-	err := w.resolveLocks(safePoint)
+// RunGCJob sends the GC command to KV. It is exported for testing purposes; do not use it while a GCWorker is running on the same store.
+func RunGCJob(ctx goctx.Context, store kv.Storage, safePoint uint64, identifier string) error {
+	s, ok := store.(*tikvStore)
+	if !ok {
+		return errors.New("should use tikv driver")
+	}
+	err := resolveLocks(ctx, s, safePoint, identifier)
 	if err != nil {
-		w.done <- errors.Trace(err)
-		return
+		return errors.Trace(err)
 	}
-	err = w.DoGC(safePoint)
+	err = doGC(ctx, s, safePoint, identifier)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	return nil
+}
+
+func (w *GCWorker) runGCJob(ctx goctx.Context, safePoint uint64) {
+	gcWorkerCounter.WithLabelValues("run_job").Inc()
+	err := RunGCJob(ctx, w.store, safePoint, w.uuid)
 	if err != nil {
 		w.done <- errors.Trace(err)
 		return
 	}
 	w.done <- nil
 }
 
-func (w *GCWorker) resolveLocks(safePoint uint64) error {
+func resolveLocks(ctx goctx.Context, store *tikvStore, safePoint uint64, identifier string) error {
 	gcWorkerCounter.WithLabelValues("resolve_locks").Inc()
 	req := &tikvrpc.Request{
 		Type: tikvrpc.CmdScanLock,
 		ScanLock: &kvrpcpb.ScanLockRequest{
@@ -285,23 +297,23 @@ func (w *GCWorker) resolveLocks(safePoint uint64) error {
 	}
 	bo := NewBackoffer(gcResolveLockMaxBackoff, goctx.Background())
 
-	log.Infof("[gc worker] %s start resolve locks, safePoint: %v.", w.uuid, safePoint)
+	log.Infof("[gc worker] %s start resolve locks, safePoint: %v.", identifier, safePoint)
 	startTime := time.Now()
 	regions, totalResolvedLocks := 0, 0
 
 	var key []byte
 	for {
 		select {
-		case <-w.quit:
+		case <-ctx.Done():
 			return errors.New("[gc worker] gc job canceled")
 		default:
 		}
 
-		loc, err := w.store.regionCache.LocateKey(bo, key)
+		loc, err := store.regionCache.LocateKey(bo, key)
 		if err != nil {
 			return errors.Trace(err)
 		}
-		resp, err := w.store.SendReq(bo, req, loc.Region, readTimeoutMedium)
+		resp, err := store.SendReq(bo, req, loc.Region, readTimeoutMedium)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -328,7 +340,7 @@ func (w *GCWorker) resolveLocks(safePoint uint64) error {
 		for i := range locksInfo {
 			locks[i] = newLock(locksInfo[i])
 		}
-		ok, err1 := w.store.lockResolver.ResolveLocks(bo, locks)
+		ok, err1 := store.lockResolver.ResolveLocks(bo, locks)
 		if err1 != nil {
 			return errors.Trace(err1)
 		}
@@ -346,13 +358,12 @@ func (w *GCWorker) resolveLocks(safePoint uint64) error {
 			break
 		}
 	}
-	log.Infof("[gc worker] %s finish resolve locks, safePoint: %v, regions: %v, total resolved: %v, cost time: %s", w.uuid, safePoint, regions, totalResolvedLocks, time.Since(startTime))
+	log.Infof("[gc worker] %s finish resolve locks, safePoint: %v, regions: %v, total resolved: %v, cost time: %s", identifier, safePoint, regions, totalResolvedLocks, time.Since(startTime))
 	gcHistogram.WithLabelValues("resolve_locks").Observe(time.Since(startTime).Seconds())
 	return nil
 }
 
-// DoGC sends GC command to KV, it is exported for testing purpose.
-func (w *GCWorker) DoGC(safePoint uint64) error {
+func doGC(ctx goctx.Context, store *tikvStore, safePoint uint64, identifier string) error {
 	gcWorkerCounter.WithLabelValues("do_gc").Inc()
 
 	req := &tikvrpc.Request{
@@ -363,23 +374,23 @@ func (w *GCWorker) DoGC(safePoint uint64) error {
 	}
 	bo := NewBackoffer(gcMaxBackoff, goctx.Background())
 
-	log.Infof("[gc worker] %s start gc, safePoint: %v.", w.uuid, safePoint)
+	log.Infof("[gc worker] %s start gc, safePoint: %v.", identifier, safePoint)
 	startTime := time.Now()
 	regions := 0
 
 	var key []byte
 	for {
 		select {
-		case <-w.quit:
+		case <-ctx.Done():
 			return errors.New("[gc worker] gc job canceled")
 		default:
 		}
 
-		loc, err := w.store.regionCache.LocateKey(bo, key)
+		loc, err := store.regionCache.LocateKey(bo, key)
 		if err != nil {
 			return errors.Trace(err)
 		}
-		resp, err := w.store.SendReq(bo, req, loc.Region, readTimeoutLong)
+		resp, err := store.SendReq(bo, req, loc.Region, readTimeoutLong)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -407,7 +418,7 @@ func (w *GCWorker) DoGC(safePoint uint64) error {
 			break
 		}
 	}
-	log.Infof("[gc worker] %s finish gc, safePoint: %v, regions: %v, cost time: %s", w.uuid, safePoint, regions, time.Since(startTime))
+	log.Infof("[gc worker] %s finish gc, safePoint: %v, regions: %v, cost time: %s", identifier, safePoint, regions, time.Since(startTime))
 	gcHistogram.WithLabelValues("do_gc").Observe(time.Since(startTime).Seconds())
 	return nil
 }
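
On the gc_worker.go side, the commit replaces GCWorker's quit channel with context cancellation: NewGCWorker derives a cancelable goctx.Context, Close invokes the stored CancelFunc instead of closing a channel, and the region loops in resolveLocks and doGC poll ctx.Done() between requests. A self-contained sketch of that pattern, reduced to the essentials (all names below are illustrative, not TiDB APIs):

package main

import (
	"errors"
	"fmt"
	"time"

	goctx "golang.org/x/net/context"
)

// worker holds a CancelFunc where it previously would have held a quit channel.
type worker struct {
	cancel goctx.CancelFunc
	done   chan error
}

func newWorker() *worker {
	w := &worker{done: make(chan error, 1)}
	var ctx goctx.Context
	ctx, w.cancel = goctx.WithCancel(goctx.Background())
	go w.run(ctx)
	return w
}

// Close replaces close(quit); every goroutine that received ctx
// observes cancellation through ctx.Done().
func (w *worker) Close() { w.cancel() }

func (w *worker) run(ctx goctx.Context) {
	for {
		// Non-blocking cancellation check between units of work,
		// like the per-region check in resolveLocks and doGC.
		select {
		case <-ctx.Done():
			w.done <- errors.New("job canceled")
			return
		default:
		}
		time.Sleep(10 * time.Millisecond) // stand-in for one region's worth of work
	}
}

func main() {
	w := newWorker()
	time.Sleep(50 * time.Millisecond)
	w.Close()
	fmt.Println(<-w.done) // prints "job canceled"
}

One practical difference from the old quit channel: the same context flows down the call chain (start → leaderTick → runGCJob → RunGCJob), so the exported RunGCJob can also be canceled by callers that never constructed a GCWorker.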