Skip to content

Commit

Permalink
kv: move TxnScope into kv (pingcap#24715)
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing authored May 19, 2021
1 parent 424a5a8 commit 8fb29eb
Show file tree
Hide file tree
Showing 28 changed files with 134 additions and 123 deletions.
3 changes: 1 addition & 2 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/tikv/mockstore/cluster"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -1343,7 +1342,7 @@ func getMaxTableHandle(ctx *testMaxTableRowIDContext, store kv.Storage) (kv.Hand
c := ctx.c
d := ctx.d
tbl := ctx.tbl
curVer, err := store.CurrentVersion(oracle.GlobalTxnScope)
curVer, err := store.CurrentVersion(kv.GlobalTxnScope)
c.Assert(err, IsNil)
maxHandle, emptyTable, err := d.GetTableMaxHandle(curVer.Ver, tbl.(table.PhysicalTable))
c.Assert(err, IsNil)
Expand Down
3 changes: 1 addition & 2 deletions ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
Expand Down Expand Up @@ -451,7 +450,7 @@ func doBatchInsert(s sqlexec.SQLExecutor, jobID int64, tableIDs []int64, ts uint

// getNowTS gets the current timestamp, in TSO.
func getNowTSO(ctx sessionctx.Context) (uint64, error) {
currVer, err := ctx.GetStore().CurrentVersion(oracle.GlobalTxnScope)
currVer, err := ctx.GetStore().CurrentVersion(kv.GlobalTxnScope)
if err != nil {
return 0, errors.Trace(err)
}
Expand Down
3 changes: 1 addition & 2 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -534,7 +533,7 @@ func getTableRange(d *ddlCtx, tbl table.PhysicalTable, snapshotVer uint64, prior
}

func getValidCurrentVersion(store kv.Storage) (ver kv.Version, err error) {
ver, err = store.CurrentVersion(oracle.GlobalTxnScope)
ver, err = store.CurrentVersion(kv.GlobalTxnScope)
if err != nil {
return ver, errors.Trace(err)
} else if ver.Ver <= 0 {
Expand Down
9 changes: 4 additions & 5 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
Expand Down Expand Up @@ -236,7 +235,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
}
builder.txnScope = sv.TxnCtx.TxnScope
builder.IsStaleness = sv.TxnCtx.IsStaleness
if builder.IsStaleness && builder.txnScope != oracle.GlobalTxnScope {
if builder.IsStaleness && builder.txnScope != kv.GlobalTxnScope {
builder.MatchStoreLabels = []*metapb.StoreLabel{
{
Key: placement.DCLabelKey,
Expand Down Expand Up @@ -279,9 +278,9 @@ func (builder *RequestBuilder) SetFromInfoSchema(is infoschema.InfoSchema) *Requ

func (builder *RequestBuilder) verifyTxnScope() error {
if builder.txnScope == "" {
builder.txnScope = oracle.GlobalTxnScope
builder.txnScope = kv.GlobalTxnScope
}
if builder.txnScope == oracle.GlobalTxnScope || builder.is == nil {
if builder.txnScope == kv.GlobalTxnScope || builder.is == nil {
return nil
}
visitPhysicalTableID := make(map[int64]struct{})
Expand Down Expand Up @@ -600,7 +599,7 @@ func CommonHandleRangesToKVRanges(sc *stmtctx.StatementContext, tids []int64, ra

// VerifyTxnScope verify whether the txnScope and visited physical table break the leader rule's dcLocation.
func VerifyTxnScope(txnScope string, physicalTableID int64, is infoschema.InfoSchema) bool {
if txnScope == "" || txnScope == oracle.GlobalTxnScope {
if txnScope == "" || txnScope == kv.GlobalTxnScope {
return true
}
bundle, ok := is.BundleByName(placement.GroupID(physicalTableID))
Expand Down
3 changes: 1 addition & 2 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/telemetry"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
Expand Down Expand Up @@ -336,7 +335,7 @@ func (do *Domain) Reload() error {
defer do.m.Unlock()

startTime := time.Now()
ver, err := do.store.CurrentVersion(oracle.GlobalTxnScope)
ver, err := do.store.CurrentVersion(kv.GlobalTxnScope)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func (*testSuite) TestT(c *C) {

// for schemaValidator
schemaVer := dom.SchemaValidator.(*schemaValidator).LatestSchemaVersion()
ver, err := store.CurrentVersion(oracle.GlobalTxnScope)
ver, err := store.CurrentVersion(kv.GlobalTxnScope)
c.Assert(err, IsNil)
ts := ver.Ver

Expand All @@ -360,7 +360,7 @@ func (*testSuite) TestT(c *C) {
c.Assert(succ, Equals, ResultSucc)
time.Sleep(ddlLease)

ver, err = store.CurrentVersion(oracle.GlobalTxnScope)
ver, err = store.CurrentVersion(kv.GlobalTxnScope)
c.Assert(err, IsNil)
ts = ver.Ver
_, succ = dom.SchemaValidator.Check(ts, schemaVer, nil)
Expand Down
2 changes: 1 addition & 1 deletion domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ func (is *InfoSyncer) ReportMinStartTS(store kv.Storage) {
pl := is.manager.ShowProcessList()

// Calculate the lower limit of the start timestamp to avoid extremely old transaction delaying GC.
currentVer, err := store.CurrentVersion(oracle.GlobalTxnScope)
currentVer, err := store.CurrentVersion(kv.GlobalTxnScope)
if err != nil {
logutil.BgLogger().Error("update minStartTS failed", zap.Error(err))
return
Expand Down
3 changes: 1 addition & 2 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
driver "github.com/pingcap/tidb/store/driver/txn"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -122,7 +121,7 @@ func (e *BatchPointGetExec) Open(context.Context) error {
snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness
snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness)
if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != oracle.GlobalTxnScope {
if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != kv.GlobalTxnScope {
snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{
{
Key: placement.DCLabelKey,
Expand Down
4 changes: 2 additions & 2 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2693,11 +2693,11 @@ func (s *testSuiteP2) TestHistoryRead(c *C) {
// SnapshotTS Is not updated if check failed.
c.Assert(tk.Se.GetSessionVars().SnapshotTS, Equals, uint64(0))

curVer1, _ := s.store.CurrentVersion(oracle.GlobalTxnScope)
curVer1, _ := s.store.CurrentVersion(kv.GlobalTxnScope)
time.Sleep(time.Millisecond)
snapshotTime := time.Now()
time.Sleep(time.Millisecond)
curVer2, _ := s.store.CurrentVersion(oracle.GlobalTxnScope)
curVer2, _ := s.store.CurrentVersion(kv.GlobalTxnScope)
tk.MustExec("insert history_read values (2)")
tk.MustQuery("select * from history_read").Check(testkit.Rows("1", "2"))
tk.MustExec("set @@tidb_snapshot = '" + snapshotTime.Format("2006-01-02 15:04:05.999999") + "'")
Expand Down
5 changes: 2 additions & 3 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -153,7 +152,7 @@ func (e *PointGetExecutor) Open(context.Context) error {
e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness
e.snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness)
if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != oracle.GlobalTxnScope {
if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != kv.GlobalTxnScope {
e.snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{
{
Key: placement.DCLabelKey,
Expand Down Expand Up @@ -392,7 +391,7 @@ func (e *PointGetExecutor) get(ctx context.Context, key kv.Key) ([]byte, error)

func (e *PointGetExecutor) verifyTxnScope() error {
txnScope := e.txn.GetOption(kv.TxnScope).(string)
if txnScope == "" || txnScope == oracle.GlobalTxnScope {
if txnScope == "" || txnScope == kv.GlobalTxnScope {
return nil
}
var tblID int64
Expand Down
3 changes: 2 additions & 1 deletion executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util/testkit"
)
Expand Down Expand Up @@ -76,7 +77,7 @@ func (s *testStaleTxnSerialSuite) TestExactStalenessTransaction(c *C) {
preSQL: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '2020-09-06 00:00:00';`,
sql: "begin",
IsStaleness: false,
txnScope: oracle.GlobalTxnScope,
txnScope: kv.GlobalTxnScope,
zone: "",
},
}
Expand Down
3 changes: 1 addition & 2 deletions kv/fault_injection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
)

type testFaultInjectionSuite struct{}
Expand All @@ -36,7 +35,7 @@ func (s testFaultInjectionSuite) TestFaultInjectionBasic(c *C) {
storage := NewInjectedStore(newMockStorage(), &cfg)
txn, err := storage.Begin()
c.Assert(err, IsNil)
_, err = storage.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(oracle.GlobalTxnScope).SetStartTs(0))
_, err = storage.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(GlobalTxnScope).SetStartTs(0))
c.Assert(err, IsNil)
ver := Version{Ver: 1}
snap := storage.GetSnapshot(ver)
Expand Down
3 changes: 1 addition & 2 deletions kv/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"context"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/store/tikv/oracle"
)

var _ = Suite(testMockSuite{})
Expand All @@ -29,7 +28,7 @@ func (s testMockSuite) TestInterface(c *C) {
storage := newMockStorage()
storage.GetClient()
storage.UUID()
version, err := storage.CurrentVersion(oracle.GlobalTxnScope)
version, err := storage.CurrentVersion(GlobalTxnScope)
c.Check(err, IsNil)
snapshot := storage.GetSnapshot(version)
_, err = snapshot.BatchGet(context.Background(), []Key{Key("abc"), Key("def")})
Expand Down
73 changes: 73 additions & 0 deletions kv/txn_scope_var.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package kv

import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/store/tikv/oracle"
)

// TxnScopeVar indicates the used txnScope for oracle
type TxnScopeVar struct {
// varValue indicates the value of @@txn_scope, which can only be `global` or `local`
varValue string
// txnScope indicates the value which the tidb-server holds to request tso to pd
txnScope string
}

// GetTxnScopeVar gets TxnScopeVar from config
func GetTxnScopeVar() TxnScopeVar {
isGlobal, location := config.GetTxnScopeFromConfig()
if isGlobal {
return NewGlobalTxnScopeVar()
}
return NewLocalTxnScopeVar(location)
}

// NewGlobalTxnScopeVar creates a Global TxnScopeVar
func NewGlobalTxnScopeVar() TxnScopeVar {
return newTxnScopeVar(GlobalTxnScope, GlobalTxnScope)
}

// NewLocalTxnScopeVar creates a Local TxnScopeVar with given real txnScope value.
func NewLocalTxnScopeVar(txnScope string) TxnScopeVar {
return newTxnScopeVar(LocalTxnScope, txnScope)
}

// GetVarValue returns the value of @@txn_scope which can only be `global` or `local`
func (t TxnScopeVar) GetVarValue() string {
return t.varValue
}

// GetTxnScope returns the value of the tidb-server holds to request tso to pd.
func (t TxnScopeVar) GetTxnScope() string {
return t.txnScope
}

func newTxnScopeVar(varValue string, txnScope string) TxnScopeVar {
return TxnScopeVar{
varValue: varValue,
txnScope: txnScope,
}
}

// Transaction scopes constants.
const (
// GlobalTxnScope is synced with PD's define of global scope.
// If we want to remove the dependency on store/tikv here, we need to map
// the two GlobalTxnScopes in the driver layer.
GlobalTxnScope = oracle.GlobalTxnScope
// LocalTxnScope indicates the transaction should use local ts.
LocalTxnScope = "local"
)
3 changes: 1 addition & 2 deletions meta/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util/testleak"
. "github.com/pingcap/tidb/util/testutil"
)
Expand Down Expand Up @@ -291,7 +290,7 @@ func (s *testSuite) TestSnapshot(c *C) {
err = txn.Commit(context.Background())
c.Assert(err, IsNil)

ver1, _ := store.CurrentVersion(oracle.GlobalTxnScope)
ver1, _ := store.CurrentVersion(kv.GlobalTxnScope)
time.Sleep(time.Millisecond)
txn, _ = store.Begin()
m = meta.NewMeta(txn)
Expand Down
6 changes: 3 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/session/txninfo"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/fastrand"
Expand Down Expand Up @@ -311,9 +311,9 @@ func setSSLVariable(ca, key, cert string) {
func setTxnScope() {
variable.SetSysVar("txn_scope", func() string {
if isGlobal, _ := config.GetTxnScopeFromConfig(); isGlobal {
return oracle.GlobalTxnScope
return kv.GlobalTxnScope
}
return oracle.LocalTxnScope
return kv.LocalTxnScope
}())
}

Expand Down
4 changes: 2 additions & 2 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2029,14 +2029,14 @@ func (s *testPessimisticSuite) TestSelectForUpdateConflictRetry(c *C) {
tsCh := make(chan uint64)
go func() {
tk3.MustExec("update tk set c2 = c2 + 1 where c1 = 1")
lastTS, err := s.store.GetOracle().GetLowResolutionTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
lastTS, err := s.store.GetOracle().GetLowResolutionTimestamp(context.Background(), &oracle.Option{TxnScope: kv.GlobalTxnScope})
c.Assert(err, IsNil)
tsCh <- lastTS
tk3.MustExec("commit")
tsCh <- lastTS
}()
// tk2LastTS should be its forUpdateTS
tk2LastTS, err := s.store.GetOracle().GetLowResolutionTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
tk2LastTS, err := s.store.GetOracle().GetLowResolutionTimestamp(context.Background(), &oracle.Option{TxnScope: kv.GlobalTxnScope})
c.Assert(err, IsNil)
tk2.MustExec("commit")

Expand Down
3 changes: 1 addition & 2 deletions session/schema_amender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -426,7 +425,7 @@ func (s *testSchemaAmenderSuite) TestAmendCollectAndGenMutations(c *C) {
}
c.Assert(err, IsNil)
}
curVer, err := se.store.CurrentVersion(oracle.GlobalTxnScope)
curVer, err := se.store.CurrentVersion(kv.GlobalTxnScope)
c.Assert(err, IsNil)
se.sessionVars.TxnCtx.SetForUpdateTS(curVer.Ver + 1)
mutationVals, err := txn.BatchGet(ctx, checkKeys)
Expand Down
Loading

0 comments on commit 8fb29eb

Please sign in to comment.