Skip to content

Commit

Permalink
executor: rename txnScope as readReplicaScope instead (pingcap#27987)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yisaer authored Sep 14, 2021
1 parent b52f3ef commit 193b74e
Show file tree
Hide file tree
Showing 19 changed files with 316 additions and 311 deletions.
29 changes: 14 additions & 15 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/oracle"
)

// RequestBuilder is used to build a "kv.Request".
Expand All @@ -48,21 +47,21 @@ type RequestBuilder struct {

// Build builds a "kv.Request".
func (builder *RequestBuilder) Build() (*kv.Request, error) {
if builder.TxnScope == "" {
builder.TxnScope = oracle.GlobalTxnScope
if builder.ReadReplicaScope == "" {
builder.ReadReplicaScope = kv.GlobalReplicaScope
}
if builder.IsStaleness && builder.TxnScope != kv.GlobalTxnScope {
if builder.IsStaleness && builder.ReadReplicaScope != kv.GlobalReplicaScope {
builder.MatchStoreLabels = []*metapb.StoreLabel{
{
Key: placement.DCLabelKey,
Value: builder.TxnScope,
Value: builder.ReadReplicaScope,
},
}
}
failpoint.Inject("assertRequestBuilderStalenessOption", func(val failpoint.Value) {
assertScope := val.(string)
if len(assertScope) > 0 {
if builder.IsStaleness && assertScope != builder.TxnScope {
if builder.IsStaleness && assertScope != builder.ReadReplicaScope {
panic("request builder get staleness option fail")
}
}
Expand Down Expand Up @@ -294,10 +293,10 @@ func (builder *RequestBuilder) SetResourceGroupTag(sc *stmtctx.StatementContext)
}

func (builder *RequestBuilder) verifyTxnScope() error {
if builder.TxnScope == "" {
builder.TxnScope = kv.GlobalTxnScope
if builder.ReadReplicaScope == "" {
builder.ReadReplicaScope = kv.GlobalReplicaScope
}
if builder.TxnScope == kv.GlobalTxnScope || builder.is == nil {
if builder.ReadReplicaScope == kv.GlobalReplicaScope || builder.is == nil {
return nil
}
visitPhysicalTableID := make(map[int64]struct{})
Expand All @@ -311,7 +310,7 @@ func (builder *RequestBuilder) verifyTxnScope() error {
}

for phyTableID := range visitPhysicalTableID {
valid := VerifyTxnScope(builder.TxnScope, phyTableID, builder.is)
valid := VerifyTxnScope(builder.ReadReplicaScope, phyTableID, builder.is)
if !valid {
var tblName string
var partName string
Expand All @@ -323,20 +322,20 @@ func (builder *RequestBuilder) verifyTxnScope() error {
tblInfo, _ = builder.is.TableByID(phyTableID)
tblName = tblInfo.Meta().Name.String()
}
err := fmt.Errorf("table %v can not be read by %v txn_scope", tblName, builder.TxnScope)
err := fmt.Errorf("table %v can not be read by %v txn_scope", tblName, builder.ReadReplicaScope)
if len(partName) > 0 {
err = fmt.Errorf("table %v's partition %v can not be read by %v txn_scope",
tblName, partName, builder.TxnScope)
tblName, partName, builder.ReadReplicaScope)
}
return err
}
}
return nil
}

// SetTxnScope sets request TxnScope
func (builder *RequestBuilder) SetTxnScope(scope string) *RequestBuilder {
builder.TxnScope = scope
// SetReadReplicaScope sets request readReplicaScope
func (builder *RequestBuilder) SetReadReplicaScope(scope string) *RequestBuilder {
builder.ReadReplicaScope = scope
return builder
}

Expand Down
191 changes: 95 additions & 96 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tipb/go-tipb"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

type handleRange struct {
Expand Down Expand Up @@ -252,17 +251,17 @@ func TestRequestBuilder1(t *testing.T) {
EndKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xc, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x23},
},
},
Cacheable: true,
KeepOrder: false,
Desc: false,
Concurrency: variable.DefDistSQLScanConcurrency,
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
TxnScope: oracle.GlobalTxnScope,
Cacheable: true,
KeepOrder: false,
Desc: false,
Concurrency: variable.DefDistSQLScanConcurrency,
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
ReadReplicaScope: kv.GlobalReplicaScope,
}
require.Equal(t, expect, actual)
}
Expand Down Expand Up @@ -330,17 +329,17 @@ func TestRequestBuilder2(t *testing.T) {
EndKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xc, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x3, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x23},
},
},
Cacheable: true,
KeepOrder: false,
Desc: false,
Concurrency: variable.DefDistSQLScanConcurrency,
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
TxnScope: oracle.GlobalTxnScope,
Cacheable: true,
KeepOrder: false,
Desc: false,
Concurrency: variable.DefDistSQLScanConcurrency,
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
ReadReplicaScope: kv.GlobalReplicaScope,
}
require.Equal(t, expect, actual)
}
Expand Down Expand Up @@ -379,17 +378,17 @@ func TestRequestBuilder3(t *testing.T) {
EndKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x65},
},
},
Cacheable: true,
KeepOrder: false,
Desc: false,
Concurrency: variable.DefDistSQLScanConcurrency,
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
TxnScope: oracle.GlobalTxnScope,
Cacheable: true,
KeepOrder: false,
Desc: false,
Concurrency: variable.DefDistSQLScanConcurrency,
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
ReadReplicaScope: kv.GlobalReplicaScope,
}
require.Equal(t, expect, actual)
}
Expand Down Expand Up @@ -424,21 +423,21 @@ func TestRequestBuilder4(t *testing.T) {
Build()
require.NoError(t, err)
expect := &kv.Request{
Tp: 103,
StartTs: 0x0,
Data: []uint8{0x18, 0x0, 0x20, 0x0, 0x40, 0x0, 0x5a, 0x0},
KeyRanges: keyRanges,
Cacheable: true,
KeepOrder: false,
Desc: false,
Concurrency: variable.DefDistSQLScanConcurrency,
IsolationLevel: 0,
Priority: 0,
Streaming: true,
NotFillCache: false,
SyncLog: false,
ReplicaRead: kv.ReplicaReadLeader,
TxnScope: oracle.GlobalTxnScope,
Tp: 103,
StartTs: 0x0,
Data: []uint8{0x18, 0x0, 0x20, 0x0, 0x40, 0x0, 0x5a, 0x0},
KeyRanges: keyRanges,
Cacheable: true,
KeepOrder: false,
Desc: false,
Concurrency: variable.DefDistSQLScanConcurrency,
IsolationLevel: 0,
Priority: 0,
Streaming: true,
NotFillCache: false,
SyncLog: false,
ReplicaRead: kv.ReplicaReadLeader,
ReadReplicaScope: kv.GlobalReplicaScope,
}
require.Equal(t, expect, actual)
}
Expand Down Expand Up @@ -471,19 +470,19 @@ func TestRequestBuilder5(t *testing.T) {
Build()
require.NoError(t, err)
expect := &kv.Request{
Tp: 104,
StartTs: 0x0,
Data: []uint8{0x8, 0x0, 0x18, 0x0, 0x20, 0x0},
KeyRanges: keyRanges,
KeepOrder: true,
Desc: false,
Concurrency: 15,
IsolationLevel: kv.RC,
Priority: 1,
NotFillCache: true,
SyncLog: false,
Streaming: false,
TxnScope: oracle.GlobalTxnScope,
Tp: 104,
StartTs: 0x0,
Data: []uint8{0x8, 0x0, 0x18, 0x0, 0x20, 0x0},
KeyRanges: keyRanges,
KeepOrder: true,
Desc: false,
Concurrency: 15,
IsolationLevel: kv.RC,
Priority: 1,
NotFillCache: true,
SyncLog: false,
Streaming: false,
ReadReplicaScope: kv.GlobalReplicaScope,
}
require.Equal(t, expect, actual)
}
Expand All @@ -503,19 +502,19 @@ func TestRequestBuilder6(t *testing.T) {
Build()
require.NoError(t, err)
expect := &kv.Request{
Tp: 105,
StartTs: 0x0,
Data: []uint8{0x10, 0x0, 0x18, 0x0},
KeyRanges: keyRanges,
KeepOrder: false,
Desc: false,
Concurrency: concurrency,
IsolationLevel: 0,
Priority: 0,
NotFillCache: true,
SyncLog: false,
Streaming: false,
TxnScope: oracle.GlobalTxnScope,
Tp: 105,
StartTs: 0x0,
Data: []uint8{0x10, 0x0, 0x18, 0x0},
KeyRanges: keyRanges,
KeepOrder: false,
Desc: false,
Concurrency: concurrency,
IsolationLevel: 0,
Priority: 0,
NotFillCache: true,
SyncLog: false,
Streaming: false,
ReadReplicaScope: kv.GlobalReplicaScope,
}
require.Equal(t, expect, actual)
}
Expand All @@ -541,18 +540,18 @@ func TestRequestBuilder7(t *testing.T) {
Build()
require.NoError(t, err)
expect := &kv.Request{
Tp: 0,
StartTs: 0x0,
KeepOrder: false,
Desc: false,
Concurrency: concurrency,
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: replicaRead.replicaReadType,
TxnScope: oracle.GlobalTxnScope,
Tp: 0,
StartTs: 0x0,
KeepOrder: false,
Desc: false,
Concurrency: concurrency,
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: replicaRead.replicaReadType,
ReadReplicaScope: kv.GlobalReplicaScope,
}
require.Equal(t, expect, actual)
})
Expand All @@ -567,15 +566,15 @@ func TestRequestBuilder8(t *testing.T) {
Build()
require.NoError(t, err)
expect := &kv.Request{
Tp: 0,
StartTs: 0x0,
Data: []uint8(nil),
Concurrency: variable.DefDistSQLScanConcurrency,
IsolationLevel: 0,
Priority: 0,
MemTracker: (*memory.Tracker)(nil),
SchemaVar: 0,
TxnScope: oracle.GlobalTxnScope,
Tp: 0,
StartTs: 0x0,
Data: []uint8(nil),
Concurrency: variable.DefDistSQLScanConcurrency,
IsolationLevel: 0,
Priority: 0,
MemTracker: (*memory.Tracker)(nil),
SchemaVar: 0,
ReadReplicaScope: kv.GlobalReplicaScope,
}
require.Equal(t, expect, actual)
}
Expand Down
10 changes: 5 additions & 5 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ type ExecStmt struct {
SnapshotTS uint64
// IsStaleness means whether this statement use stale read.
IsStaleness bool
// TxnScope indicates the scope the store selector scope the request visited
TxnScope string
// ReplicaReadScope indicates the scope the store selector scope the request visited
ReplicaReadScope string
// InfoSchema stores a reference to the schema information.
InfoSchema infoschema.InfoSchema
// Plan stores a reference to the final physical plan.
Expand Down Expand Up @@ -248,7 +248,7 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
}
}
if a.PsStmt.Executor == nil {
b := newExecutorBuilder(a.Ctx, is, a.Ti, a.SnapshotTS, a.IsStaleness, a.TxnScope)
b := newExecutorBuilder(a.Ctx, is, a.Ti, a.SnapshotTS, a.IsStaleness, a.ReplicaReadScope)
newExecutor := b.build(a.Plan)
if b.err != nil {
return nil, b.err
Expand Down Expand Up @@ -294,7 +294,7 @@ func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) {
a.InfoSchema = ret.InfoSchema
a.SnapshotTS = ret.LastSnapshotTS
a.IsStaleness = ret.IsStaleness
a.TxnScope = ret.TxnScope
a.ReplicaReadScope = ret.ReadReplicaScope
p, names, err := planner.Optimize(ctx, a.Ctx, a.StmtNode, a.InfoSchema)
if err != nil {
return 0, err
Expand Down Expand Up @@ -787,7 +787,7 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
}

b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti, a.SnapshotTS, a.IsStaleness, a.TxnScope)
b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti, a.SnapshotTS, a.IsStaleness, a.ReplicaReadScope)
e := b.build(a.Plan)
if b.err != nil {
return nil, errors.Trace(b.err)
Expand Down
Loading

0 comments on commit 193b74e

Please sign in to comment.