From 3dff7cd348666fd31dca0ba50befe748b2a2eb63 Mon Sep 17 00:00:00 2001 From: disksing Date: Fri, 29 Jan 2021 01:52:14 -0600 Subject: [PATCH] store/tikv: resolve variable dependency (#22609) Signed-off-by: disksing --- domain/domain_test.go | 8 ++++---- domain/infosync/info.go | 5 ++--- executor/brie.go | 4 ++-- server/statistics_handler.go | 4 ++-- store/tikv/2pc_test.go | 7 +++---- store/tikv/gcworker/gc_worker.go | 3 +-- store/tikv/gcworker/gc_worker_test.go | 5 ++--- store/tikv/oracle/oracle.go | 6 ++++++ util/gcutil/gcutil.go | 3 ++- 9 files changed, 24 insertions(+), 21 deletions(-) diff --git a/domain/domain_test.go b/domain/domain_test.go index 3440e0116ce94..bdd1cc2962b15 100644 --- a/domain/domain_test.go +++ b/domain/domain_test.go @@ -437,16 +437,16 @@ func (*testSuite) TestT(c *C) { PS: make([]*util.ProcessInfo, 0), } infoSyncer.SetSessionManager(sm) - beforeTS := variable.GoTimeToTS(time.Now()) + beforeTS := oracle.GoTimeToTS(time.Now()) infoSyncer.ReportMinStartTS(dom.Store()) - afterTS := variable.GoTimeToTS(time.Now()) + afterTS := oracle.GoTimeToTS(time.Now()) c.Assert(infoSyncer.GetMinStartTS() > beforeTS && infoSyncer.GetMinStartTS() < afterTS, IsFalse) lowerLimit := time.Now().Add(-time.Duration(kv.MaxTxnTimeUse) * time.Millisecond) - validTS := variable.GoTimeToTS(lowerLimit.Add(time.Minute)) + validTS := oracle.GoTimeToTS(lowerLimit.Add(time.Minute)) sm.PS = []*util.ProcessInfo{ {CurTxnStartTS: 0}, {CurTxnStartTS: math.MaxUint64}, - {CurTxnStartTS: variable.GoTimeToTS(lowerLimit)}, + {CurTxnStartTS: oracle.GoTimeToTS(lowerLimit)}, {CurTxnStartTS: validTS}, } infoSyncer.SetSessionManager(sm) diff --git a/domain/infosync/info.go b/domain/infosync/info.go index 419d9b376fff9..3250009ee7c29 100644 --- a/domain/infosync/info.go +++ b/domain/infosync/info.go @@ -40,7 +40,6 @@ import ( "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/sessionctx/binloginfo" - "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/tikv/oracle" util2 "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/dbterror" @@ -564,9 +563,9 @@ func (is *InfoSyncer) ReportMinStartTS(store kv.Storage) { return } now := time.Unix(0, oracle.ExtractPhysical(currentVer.Ver)*1e6) - startTSLowerLimit := variable.GoTimeToTS(now.Add(-time.Duration(kv.MaxTxnTimeUse) * time.Millisecond)) + startTSLowerLimit := oracle.GoTimeToTS(now.Add(-time.Duration(kv.MaxTxnTimeUse) * time.Millisecond)) - minStartTS := variable.GoTimeToTS(now) + minStartTS := oracle.GoTimeToTS(now) for _, info := range pl { if info.CurTxnStartTS > startTSLowerLimit && info.CurTxnStartTS < minStartTS { minStartTS = info.CurTxnStartTS diff --git a/executor/brie.go b/executor/brie.go index 4dd86aa9fdb1b..f5ec0827697e4 100644 --- a/executor/brie.go +++ b/executor/brie.go @@ -39,7 +39,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" - "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/sqlexec" @@ -164,7 +164,7 @@ func (b *executorBuilder) parseTSString(ts string) (uint64, error) { if err != nil { return 0, err } - return variable.GoTimeToTS(t1), nil + return oracle.GoTimeToTS(t1), nil } func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema) Executor { diff --git a/server/statistics_handler.go b/server/statistics_handler.go index a40e1b19b321f..fc923396ddefa 100644 --- a/server/statistics_handler.go +++ b/server/statistics_handler.go @@ -22,7 +22,7 @@ import ( "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/session" - "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/gcutil" "github.com/pingcap/tidb/util/sqlexec" @@ -104,7 +104,7 @@ func (sh StatsHistoryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request writeError(w, err) return } - snapshot := variable.GoTimeToTS(t1) + snapshot := oracle.GoTimeToTS(t1) err = gcutil.ValidateSnapshot(se, snapshot) if err != nil { writeError(w, err) diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index bbb2ed7a58fcc..b33d7a54a629f 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -31,7 +31,6 @@ import ( pb "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/mockstore/cluster" "github.com/pingcap/tidb/store/mockstore/mocktikv" "github.com/pingcap/tidb/store/tikv/oracle" @@ -957,14 +956,14 @@ func (s *testCommitterSuite) TestPkNotFound(c *C) { LockType: kvrpcpb.Op_PessimisticLock, LockForUpdateTS: txn1.startTS, } - status, err = s.store.lockResolver.getTxnStatusFromLock(bo, lockKey2, variable.GoTimeToTS(time.Now().Add(200*time.Millisecond)), false) + status, err = s.store.lockResolver.getTxnStatusFromLock(bo, lockKey2, oracle.GoTimeToTS(time.Now().Add(200*time.Millisecond)), false) c.Assert(err, IsNil) c.Assert(status.Action(), Equals, kvrpcpb.Action_TTLExpirePessimisticRollback) // Txn2 tries to lock the secondary key k2, there should be no dead loop. // Since the resolving key k2 is a pessimistic lock, no rollback record should be written, and later lock // and the other secondary key k3 should succeed if there is no fail point enabled. - status, err = s.store.lockResolver.getTxnStatusFromLock(bo, lockKey2, variable.GoTimeToTS(time.Now().Add(200*time.Millisecond)), false) + status, err = s.store.lockResolver.getTxnStatusFromLock(bo, lockKey2, oracle.GoTimeToTS(time.Now().Add(200*time.Millisecond)), false) c.Assert(err, IsNil) c.Assert(status.Action(), Equals, kvrpcpb.Action_LockNotExistDoNothing) txn2 := s.begin(c) @@ -997,7 +996,7 @@ func (s *testCommitterSuite) TestPkNotFound(c *C) { lockCtx = &kv.LockCtx{ForUpdateTS: txn3.startTS, WaitStartTime: time.Now(), LockWaitTime: kv.LockNoWait} err = txn3.LockKeys(ctx, lockCtx, k3) c.Assert(err, IsNil) - status, err = s.store.lockResolver.getTxnStatusFromLock(bo, lockKey3, variable.GoTimeToTS(time.Now().Add(200*time.Millisecond)), false) + status, err = s.store.lockResolver.getTxnStatusFromLock(bo, lockKey3, oracle.GoTimeToTS(time.Now().Add(200*time.Millisecond)), false) c.Assert(err, IsNil) c.Assert(status.Action(), Equals, kvrpcpb.Action_LockNotExistDoNothing) } diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go index e0a7591e98184..4cde4b08a0cb6 100644 --- a/store/tikv/gcworker/gc_worker.go +++ b/store/tikv/gcworker/gc_worker.go @@ -42,7 +42,6 @@ import ( "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/privilege" "github.com/pingcap/tidb/session" - "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/logutil" "github.com/pingcap/tidb/store/tikv/oracle" @@ -502,7 +501,7 @@ func (w *GCWorker) calcNewSafePoint(ctx context.Context, now time.Time) (*time.T return nil, 0, errors.Trace(err) } - safePointValue := w.calcSafePointByMinStartTS(ctx, variable.GoTimeToTS(now.Add(-*lifeTime))) + safePointValue := w.calcSafePointByMinStartTS(ctx, oracle.GoTimeToTS(now.Add(-*lifeTime))) safePointValue, err = w.setGCWorkerServiceSafePoint(ctx, safePointValue) if err != nil { return nil, 0, errors.Trace(err) diff --git a/store/tikv/gcworker/gc_worker_test.go b/store/tikv/gcworker/gc_worker_test.go index 5c850ebea3feb..4294f5dc254c2 100644 --- a/store/tikv/gcworker/gc_worker_test.go +++ b/store/tikv/gcworker/gc_worker_test.go @@ -37,7 +37,6 @@ import ( "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/session" - "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/mockoracle" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/store/mockstore/cluster" @@ -241,7 +240,7 @@ func (s *testGCWorkerSuite) TestMinStartTS(c *C) { spkv := s.store.GetSafePointKV() err := spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), strconv.FormatUint(math.MaxUint64, 10)) c.Assert(err, IsNil) - now := variable.GoTimeToTS(time.Now()) + now := oracle.GoTimeToTS(time.Now()) sp := s.gcWorker.calcSafePointByMinStartTS(ctx, now) c.Assert(sp, Equals, now) err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), "0") @@ -380,7 +379,7 @@ func (s *testGCWorkerSuite) TestPrepareGC(c *C) { // Check skipping GC if safe point is not changed. safePointTime, err := s.gcWorker.loadTime(gcSafePointKey) - minStartTS := variable.GoTimeToTS(*safePointTime) + 1 + minStartTS := oracle.GoTimeToTS(*safePointTime) + 1 c.Assert(err, IsNil) spkv := s.store.GetSafePointKV() err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "a"), strconv.FormatUint(minStartTS, 10)) diff --git a/store/tikv/oracle/oracle.go b/store/tikv/oracle/oracle.go index fa471eeb4d1b5..1dd6526b20865 100644 --- a/store/tikv/oracle/oracle.go +++ b/store/tikv/oracle/oracle.go @@ -88,3 +88,9 @@ func GetTimeFromTS(ts uint64) time.Time { ms := ExtractPhysical(ts) return time.Unix(ms/1e3, (ms%1e3)*1e6) } + +// GoTimeToTS converts a Go time to uint64 timestamp. +func GoTimeToTS(t time.Time) uint64 { + ts := (t.UnixNano() / int64(time.Millisecond)) << physicalShiftBits + return uint64(ts) +} diff --git a/util/gcutil/gcutil.go b/util/gcutil/gcutil.go index c04873fda8060..e307d876604e4 100644 --- a/util/gcutil/gcutil.go +++ b/util/gcutil/gcutil.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/store/tikv/util" "github.com/pingcap/tidb/util/sqlexec" ) @@ -86,6 +87,6 @@ func GetGCSafePoint(ctx sessionctx.Context) (uint64, error) { if err != nil { return 0, errors.Trace(err) } - ts := variable.GoTimeToTS(safePointTime) + ts := oracle.GoTimeToTS(safePointTime) return ts, nil }