Skip to content

Commit

Permalink
gc_worker: Add logs about adjusting gc safepoint by global minimum st…
Browse files Browse the repository at this point in the history
…art ts (pingcap#17892)

Co-authored-by: MyonKeminta <[email protected]>
Co-authored-by: pingcap-github-bot <[email protected]>
  • Loading branch information
3 people authored Jun 10, 2020
1 parent f12cdc4 commit 792d958
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 13 deletions.
25 changes: 16 additions & 9 deletions store/tikv/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,26 +330,33 @@ func (w *GCWorker) checkPrepare(ctx context.Context) (bool, uint64, error) {
}

// calculateNewSafePoint uses the current global transaction min start timestamp to calculate the new safe point.
func (w *GCWorker) calSafePointByMinStartTS(safePoint time.Time) time.Time {
func (w *GCWorker) calSafePointByMinStartTS(ctx context.Context, safePoint time.Time) time.Time {
kvs, err := w.store.GetSafePointKV().GetWithPrefix(infosync.ServerMinStartTSPath)
if err != nil {
logutil.BgLogger().Warn("get all minStartTS failed", zap.Error(err))
logutil.Logger(ctx).Warn("get all minStartTS failed", zap.Error(err))
return safePoint
}

safePointTS := variable.GoTimeToTS(safePoint)
var globalMinStartTS uint64 = math.MaxUint64
for _, v := range kvs {
minStartTS, err := strconv.ParseUint(string(v.Value), 10, 64)
if err != nil {
logutil.BgLogger().Warn("parse minStartTS failed", zap.Error(err))
logutil.Logger(ctx).Warn("parse minStartTS failed", zap.Error(err))
continue
}
if minStartTS < safePointTS {
safePointTS = minStartTS
if minStartTS < globalMinStartTS {
globalMinStartTS = minStartTS
}
}
safePoint = time.Unix(0, oracle.ExtractPhysical(safePointTS)*1e6)
logutil.BgLogger().Debug("calSafePointByMinStartTS", zap.Time("safePoint", safePoint))

safePointTS := variable.GoTimeToTS(safePoint)
if globalMinStartTS < safePointTS {
safePoint = time.Unix(0, oracle.ExtractPhysical(globalMinStartTS)*1e6)
logutil.Logger(ctx).Info("[gc worker] gc safepoint blocked by a running session",
zap.String("uuid", w.uuid),
zap.Uint64("globalMinStartTS", globalMinStartTS),
zap.Time("safePoint", safePoint))
}
return safePoint
}

Expand Down Expand Up @@ -478,7 +485,7 @@ func (w *GCWorker) calculateNewSafePoint(ctx context.Context, now time.Time) (*t
if err != nil {
return nil, 0, errors.Trace(err)
}
safePoint := w.calSafePointByMinStartTS(now.Add(-*lifeTime))
safePoint := w.calSafePointByMinStartTS(ctx, now.Add(-*lifeTime))

safePointValue := oracle.ComposeTS(oracle.GetPhysical(safePoint), 0)
safePointValue, err = w.setGCWorkerServiceSafePoint(ctx, safePointValue)
Expand Down
9 changes: 5 additions & 4 deletions store/tikv/gcworker/gc_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,23 +231,24 @@ func (s *testGCWorkerSuite) TestGetOracleTime(c *C) {
}

func (s *testGCWorkerSuite) TestMinStartTS(c *C) {
ctx := context.Background()
spkv := s.store.GetSafePointKV()
err := spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), strconv.FormatUint(math.MaxUint64, 10))
c.Assert(err, IsNil)
now := time.Now()
sp := s.gcWorker.calSafePointByMinStartTS(now)
sp := s.gcWorker.calSafePointByMinStartTS(ctx, now)
c.Assert(sp.Second(), Equals, now.Second())
err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), "0")
c.Assert(err, IsNil)
sp = s.gcWorker.calSafePointByMinStartTS(now)
sp = s.gcWorker.calSafePointByMinStartTS(ctx, now)
zeroTime := time.Unix(0, oracle.ExtractPhysical(0)*1e6)
c.Assert(sp, Equals, zeroTime)

err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), "0")
c.Assert(err, IsNil)
err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "b"), "1")
c.Assert(err, IsNil)
sp = s.gcWorker.calSafePointByMinStartTS(now)
sp = s.gcWorker.calSafePointByMinStartTS(ctx, now)
c.Assert(sp, Equals, zeroTime)

err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"),
Expand All @@ -256,7 +257,7 @@ func (s *testGCWorkerSuite) TestMinStartTS(c *C) {
err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "b"),
strconv.FormatUint(variable.GoTimeToTS(now.Add(-20*time.Second)), 10))
c.Assert(err, IsNil)
sp = s.gcWorker.calSafePointByMinStartTS(now.Add(-10 * time.Second))
sp = s.gcWorker.calSafePointByMinStartTS(ctx, now.Add(-10*time.Second))
c.Assert(sp.Second(), Equals, now.Add(-20*time.Second).Second())
}

Expand Down

0 comments on commit 792d958

Please sign in to comment.