Skip to content

Commit

Permalink
*: Provide staleReadProcessor to process stale read (pingcap#32699)
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao authored Mar 8, 2022
1 parent d8fbad3 commit f79d2a5
Show file tree
Hide file tree
Showing 6 changed files with 587 additions and 145 deletions.
39 changes: 4 additions & 35 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"strings"
"time"

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
Expand All @@ -45,8 +46,10 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/table/temptable"
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
Expand All @@ -60,11 +63,7 @@ import (
"github.com/pingcap/tidb/util/sem"
"github.com/pingcap/tidb/util/set"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"

"github.com/cznic/mathutil"
"github.com/pingcap/tidb/table/tables"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -3121,7 +3120,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan,
case *ast.BeginStmt:
readTS := b.ctx.GetSessionVars().TxnReadTS.PeakTxnReadTS()
if raw.AsOf != nil {
startTS, err := calculateTsExpr(b.ctx, raw.AsOf)
startTS, err := staleread.CalculateAsOfTsExpr(b.ctx, raw.AsOf)
if err != nil {
return nil, err
}
Expand All @@ -3138,36 +3137,6 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan,
return p, nil
}

// calculateTsExpr calculates the TsExpr of AsOfClause to get a StartTS.
func calculateTsExpr(sctx sessionctx.Context, asOfClause *ast.AsOfClause) (uint64, error) {
tsVal, err := evalAstExpr(sctx, asOfClause.TsExpr)
if err != nil {
return 0, err
}
toTypeTimestamp := types.NewFieldType(mysql.TypeTimestamp)
// We need at least the millionsecond here, so set fsp to 3.
toTypeTimestamp.Decimal = 3
tsTimestamp, err := tsVal.ConvertTo(sctx.GetSessionVars().StmtCtx, toTypeTimestamp)
if err != nil {
return 0, err
}
tsTime, err := tsTimestamp.GetMysqlTime().GoTime(sctx.GetSessionVars().Location())
if err != nil {
return 0, err
}
return oracle.GoTimeToTS(tsTime), nil
}

func calculateTsWithReadStaleness(sctx sessionctx.Context, readStaleness time.Duration) (uint64, error) {
nowVal, err := expression.GetStmtTimestamp(sctx)
if err != nil {
return 0, err
}
tsVal := nowVal.Add(readStaleness)
minTsVal := expression.GetMinSafeTime(sctx)
return oracle.GoTimeToTS(expression.CalAppropriateTime(tsVal, nowVal, minTsVal)), nil
}

func collectVisitInfoFromRevokeStmt(sctx sessionctx.Context, vi []visitInfo, stmt *ast.RevokeStmt) ([]visitInfo, error) {
// To use REVOKE, you must have the GRANT OPTION privilege,
// and you must have the privileges that you are granting.
Expand Down
141 changes: 31 additions & 110 deletions planner/core/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package core

import (
"context"
"fmt"
"math"
"strings"
Expand All @@ -26,7 +25,6 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
Expand All @@ -39,6 +37,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/temptable"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -113,7 +112,12 @@ func TryAddExtraLimit(ctx sessionctx.Context, node ast.StmtNode) ast.StmtNode {
// Preprocess resolves table names of the node, and checks some statements' validation.
// preprocessReturn used to extract the infoschema for the tableName and the timestamp from the asof clause.
func Preprocess(ctx sessionctx.Context, node ast.Node, preprocessOpt ...PreprocessOpt) error {
v := preprocessor{ctx: ctx, tableAliasInJoin: make([]map[string]interface{}, 0), withName: make(map[string]interface{})}
v := preprocessor{
ctx: ctx,
tableAliasInJoin: make([]map[string]interface{}, 0),
withName: make(map[string]interface{}),
staleReadProcessor: staleread.NewStaleReadProcessor(ctx),
}
for _, optFn := range preprocessOpt {
optFn(&v)
}
Expand Down Expand Up @@ -184,6 +188,8 @@ type preprocessor struct {
tableAliasInJoin []map[string]interface{}
withName map[string]interface{}

staleReadProcessor staleread.Processor

// values that may be returned
*PreprocessorReturn
*PreprocessExecuteISUpdate
Expand Down Expand Up @@ -1446,7 +1452,7 @@ func (p *preprocessor) handleTableName(tn *ast.TableName) {
return
}

p.handleAsOfAndReadTS(tn.AsOf)
p.handleAsOfAndReadTS(tn)
if p.err != nil {
return
}
Expand Down Expand Up @@ -1608,7 +1614,7 @@ func (p *preprocessor) checkFuncCastExpr(node *ast.FuncCastExpr) {
}

// handleAsOfAndReadTS tries to handle as of closure, or possibly read_ts.
func (p *preprocessor) handleAsOfAndReadTS(node *ast.AsOfClause) {
func (p *preprocessor) handleAsOfAndReadTS(tn *ast.TableName) {
if p.stmtTp != TypeSelect {
return
}
Expand All @@ -1620,117 +1626,32 @@ func (p *preprocessor) handleAsOfAndReadTS(node *ast.AsOfClause) {
p.ctx.GetSessionVars().StmtCtx.IsStaleness = true
}
}()
// When statement is during the Txn, we check whether there exists AsOfClause. If exists, we will return error,
// otherwise we should directly set the return param from TxnCtx.
p.ReadReplicaScope = kv.GlobalReplicaScope
if p.ctx.GetSessionVars().InTxn() {
if node != nil {
p.err = ErrAsOf.FastGenWithCause("as of timestamp can't be set in transaction.")
return
}
txnCtx := p.ctx.GetSessionVars().TxnCtx
p.ReadReplicaScope = txnCtx.TxnScope
// It means we meet following case:
// 1. start transaction read only as of timestamp ts
// 2. select statement
if txnCtx.IsStaleness {
p.LastSnapshotTS = txnCtx.StartTS
p.IsStaleness = txnCtx.IsStaleness
p.initedLastSnapshotTS = true
return
}
}
scope := config.GetTxnScopeFromConfig()
if p.ctx.GetSessionVars().GetReplicaRead().IsClosestRead() && scope != kv.GlobalReplicaScope {
p.ReadReplicaScope = scope
}

// If the statement is in auto-commit mode, we will check whether there exists read_ts, if exists,
// we will directly use it. The txnScope will be defined by the zone label, if it is not set, we will use
// global txnScope directly.
readTS := p.ctx.GetSessionVars().TxnReadTS.UseTxnReadTS()
readStaleness := p.ctx.GetSessionVars().ReadStaleness
var ts uint64
switch {
case readTS > 0:
ts = readTS
if node != nil {
p.err = ErrAsOf.FastGenWithCause("can't use select as of while already set transaction as of")
return
}
if !p.initedLastSnapshotTS {
p.SnapshotTSEvaluator = func(sessionctx.Context) (uint64, error) {
return ts, nil
}
p.LastSnapshotTS = ts
p.IsStaleness = true
}
case readTS == 0 && node != nil:
// If we didn't use read_ts, and node isn't nil, it means we use 'select table as of timestamp ... '
// for stale read
// It means we meet following case:
// select statement with as of timestamp
ts, p.err = calculateTsExpr(p.ctx, node)
if p.err != nil {
return
}
if err := sessionctx.ValidateStaleReadTS(context.Background(), p.ctx, ts); err != nil {
p.err = errors.Trace(err)
return
}
if !p.initedLastSnapshotTS {
p.SnapshotTSEvaluator = func(ctx sessionctx.Context) (uint64, error) {
return calculateTsExpr(ctx, node)
}
p.LastSnapshotTS = ts
p.IsStaleness = true
}
case readTS == 0 && node == nil && readStaleness != 0:
// If both readTS and node is empty while the readStaleness isn't, it means we meet following situation:
// set @@tidb_read_staleness='-5';
// select * from t;
// Then the following select statement should be affected by the tidb_read_staleness in session.
ts, p.err = calculateTsWithReadStaleness(p.ctx, readStaleness)
if p.err != nil {
return
}
if err := sessionctx.ValidateStaleReadTS(context.Background(), p.ctx, ts); err != nil {
p.err = errors.Trace(err)
return
}
if !p.initedLastSnapshotTS {
p.SnapshotTSEvaluator = func(ctx sessionctx.Context) (uint64, error) {
return calculateTsWithReadStaleness(p.ctx, readStaleness)
}
p.LastSnapshotTS = ts
p.IsStaleness = true
}
case readTS == 0 && node == nil && readStaleness == 0:
// If both readTS and node is empty while the readStaleness is empty,
// setting p.ReadReplicaScope is necessary to verify the txn scope later
// because we may be in a local txn without using the Stale Read.
p.ReadReplicaScope = scope
if p.err = p.staleReadProcessor.OnSelectTable(tn); p.err != nil {
return
}

// If the select statement is related to multi tables, we should grantee that all tables use the same timestamp
if p.LastSnapshotTS != ts {
p.err = ErrAsOf.GenWithStack("can not set different time in the as of")
if p.initedLastSnapshotTS {
return
}
if p.LastSnapshotTS != 0 {
dom := domain.GetDomain(p.ctx)
is, err := dom.GetSnapshotInfoSchema(p.LastSnapshotTS)
// if infoschema is empty, LastSnapshotTS init failed
if err != nil {
p.err = err
return
}
if is == nil {
p.err = fmt.Errorf("can not get any information schema based on snapshotTS: %d", p.LastSnapshotTS)
return
}
p.InfoSchema = temptable.AttachLocalTemporaryTableInfoSchema(p.ctx, is)

if p.IsStaleness = p.staleReadProcessor.IsStaleness(); p.IsStaleness {
p.LastSnapshotTS = p.staleReadProcessor.GetStalenessReadTS()
p.SnapshotTSEvaluator = p.staleReadProcessor.GetStalenessTSEvaluatorForPrepare()
p.InfoSchema = p.staleReadProcessor.GetStalenessInfoSchema()
}

// It is a little hacking for the below codes. `ReadReplicaScope` is used both by stale read's closest read and local txn.
// They are different features and the value for `ReadReplicaScope` will be conflicted in some scenes.
// But because local txn is still an experimental feature, we should make stale read work first.
if p.IsStaleness || p.ctx.GetSessionVars().GetReplicaRead().IsClosestRead() {
// When stale read or closet read is set, we read the tidb's locality as the read replica scope
p.ReadReplicaScope = config.GetTxnScopeFromConfig()
} else {
// Otherwise, use the scope from TxnCtx for local txn validation
p.ReadReplicaScope = p.ctx.GetSessionVars().TxnCtx.TxnScope
}

p.initedLastSnapshotTS = true
}

Expand Down
24 changes: 24 additions & 0 deletions sessiontxn/staleread/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package staleread

import (
mysql "github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/util/dbterror"
)

var (
errAsOf = dbterror.ClassOptimizer.NewStd(mysql.ErrAsOf)
)
Loading

0 comments on commit f79d2a5

Please sign in to comment.