Skip to content

Commit

Permalink
txn: support read consistency read with ts checking (pingcap#32922)
Browse files Browse the repository at this point in the history
  • Loading branch information
cfzjywxk authored Mar 16, 2022
1 parent 44aae22 commit d4d43ba
Show file tree
Hide file tree
Showing 25 changed files with 446 additions and 71 deletions.
6 changes: 5 additions & 1 deletion distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,15 +250,19 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
// Concurrency may be set to 1 by SetDAGRequest
builder.Request.Concurrency = sv.DistSQLScanConcurrency()
}
replicaReadType := sv.GetReplicaRead()
if sv.StmtCtx.WeakConsistency {
builder.Request.IsolationLevel = kv.RC
} else if sv.StmtCtx.RCCheckTS {
builder.Request.IsolationLevel = kv.RCCheckTS
replicaReadType = kv.ReplicaReadLeader
} else {
builder.Request.IsolationLevel = builder.getIsolationLevel()
}
builder.Request.NotFillCache = sv.StmtCtx.NotFillCache
builder.Request.TaskID = sv.StmtCtx.TaskID
builder.Request.Priority = builder.getKVPriority(sv)
builder.Request.ReplicaRead = sv.GetReplicaRead()
builder.Request.ReplicaRead = replicaReadType
builder.SetResourceGroupTagger(sv.StmtCtx)
return builder
}
Expand Down
1 change: 1 addition & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,7 @@ func UpdateForUpdateTS(seCtx sessionctx.Context, newForUpdateTS uint64) error {
}
seCtx.GetSessionVars().TxnCtx.SetForUpdateTS(newForUpdateTS)
txn.SetOption(kv.SnapshotTS, seCtx.GetSessionVars().TxnCtx.GetForUpdateTS())
seCtx.GetSessionVars().TxnCtx.LastRcReadTs = newForUpdateTS
return nil
}

Expand Down
5 changes: 4 additions & 1 deletion executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ func (e *BatchPointGetExec) Open(context.Context) error {
} else {
snapshot = e.ctx.GetSnapshotWithTS(e.snapshotTS)
}
if e.ctx.GetSessionVars().StmtCtx.RCCheckTS {
snapshot.SetOption(kv.IsolationLevel, kv.RCCheckTS)
}
if e.cacheTable != nil {
snapshot = cacheTableSnapshot{snapshot, e.cacheTable}
}
Expand All @@ -129,7 +132,7 @@ func (e *BatchPointGetExec) Open(context.Context) error {
stmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
replicaReadType := e.ctx.GetSessionVars().GetReplicaRead()
if replicaReadType.IsFollowerRead() {
if replicaReadType.IsFollowerRead() && !e.ctx.GetSessionVars().StmtCtx.RCCheckTS {
snapshot.SetOption(kv.ReplicaRead, replicaReadType)
}
snapshot.SetOption(kv.TaskID, stmtCtx.TaskID)
Expand Down
9 changes: 9 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2168,6 +2168,15 @@ func (b *executorBuilder) refreshForUpdateTSForRC() error {
defer func() {
b.snapshotTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
}()
// The first time read-consistency read is executed and `RcReadCheckTS` is enabled, try to use
// the last valid ts as the for update read ts.
if b.ctx.GetSessionVars().StmtCtx.RCCheckTS {
rcReadTS := b.ctx.GetSessionVars().TxnCtx.LastRcReadTs
if rcReadTS == 0 {
rcReadTS = b.ctx.GetSessionVars().TxnCtx.StartTS
}
return UpdateForUpdateTS(b.ctx, rcReadTS)
}
future := b.ctx.GetSessionVars().TxnCtx.GetStmtFutureForRC()
if future == nil {
return nil
Expand Down
15 changes: 15 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1881,6 +1881,11 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
sc.NotFillCache = !opts.SQLCache
}
sc.WeakConsistency = isWeakConsistencyRead(ctx, stmt)
// Try to mark the `RCCheckTS` flag for the first time execution of in-transaction read requests
// using read-consistency isolation level.
if NeedSetRCCheckTSFlag(ctx, stmt) {
sc.RCCheckTS = true
}
case *ast.SetOprStmt:
sc.InSelectStmt = true
sc.OverflowAsWarning = true
Expand Down Expand Up @@ -2006,3 +2011,13 @@ func isWeakConsistencyRead(ctx sessionctx.Context, node ast.Node) bool {
return sessionVars.ConnectionID > 0 && sessionVars.ReadConsistency.IsWeak() &&
plannercore.IsAutoCommitTxn(ctx) && plannercore.IsReadOnly(node, sessionVars)
}

// NeedSetRCCheckTSFlag checks whether it's needed to set `RCCheckTS` flag in current stmtctx.
func NeedSetRCCheckTSFlag(ctx sessionctx.Context, node ast.Node) bool {
sessionVars := ctx.GetSessionVars()
if sessionVars.ConnectionID > 0 && sessionVars.RcReadCheckTS && sessionVars.InTxn() &&
sessionVars.IsPessimisticReadConsistency() && !sessionVars.RetryInfo.Retrying && plannercore.IsReadOnly(node, sessionVars) {
return true
}
return false
}
5 changes: 4 additions & 1 deletion executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ func (e *PointGetExecutor) Open(context.Context) error {
} else {
e.snapshot = e.ctx.GetSnapshotWithTS(snapshotTS)
}
if e.ctx.GetSessionVars().StmtCtx.RCCheckTS {
e.snapshot.SetOption(kv.IsolationLevel, kv.RCCheckTS)
}
if e.cacheTable != nil {
e.snapshot = cacheTableSnapshot{e.snapshot, e.cacheTable}
}
Expand All @@ -171,7 +174,7 @@ func (e *PointGetExecutor) Open(context.Context) error {
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
readReplicaType := e.ctx.GetSessionVars().GetReplicaRead()
if readReplicaType.IsFollowerRead() {
if readReplicaType.IsFollowerRead() && !e.ctx.GetSessionVars().StmtCtx.RCCheckTS {
e.snapshot.SetOption(kv.ReplicaRead, readReplicaType)
}
e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
Expand Down
2 changes: 2 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,4 +513,6 @@ const (
SI IsoLevel = iota
// RC stands for 'read committed'.
RC
// RCCheckTS stands for 'read consistency read with ts check'.
RCCheckTS
)
11 changes: 11 additions & 0 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1849,6 +1849,17 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {
}
retryable, err = cc.handleStmt(ctx, stmt, parserWarns, i == len(stmts)-1)
if err != nil {
if retryable && cc.ctx.GetSessionVars().IsRcCheckTsRetryable(err) {
cc.ctx.GetSessionVars().RetryInfo.Retrying = true
logutil.Logger(ctx).Info("RC read with ts checking has failed, retry RC read",
zap.String("sql", cc.ctx.GetSessionVars().StmtCtx.OriginalSQL))
_, err = cc.handleStmt(ctx, stmt, parserWarns, i == len(stmts)-1)
cc.ctx.GetSessionVars().RetryInfo.Retrying = false
if err != nil {
break
}
continue
}
if !retryable || !errors.ErrorEqual(err, storeerr.ErrTiFlashServerTimeout) {
break
}
Expand Down
10 changes: 10 additions & 0 deletions server/conn_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/topsql"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)

func (cc *clientConn) handleStmtPrepare(ctx context.Context, sql string) error {
Expand Down Expand Up @@ -205,6 +207,14 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e
ctx = context.WithValue(ctx, execdetails.StmtExecDetailKey, &execdetails.StmtExecDetails{})
ctx = context.WithValue(ctx, util.ExecDetailsKey, &util.ExecDetails{})
retryable, err := cc.executePreparedStmtAndWriteResult(ctx, stmt, args, useCursor)
if retryable && err != nil && cc.ctx.GetSessionVars().IsRcCheckTsRetryable(err) {
logutil.Logger(ctx).Info("RC read using start_ts has failed, retry RC read",
zap.String("sql", cc.ctx.GetSessionVars().StmtCtx.OriginalSQL))
cc.ctx.GetSessionVars().RetryInfo.Retrying = true
_, err = cc.executePreparedStmtAndWriteResult(ctx, stmt, args, useCursor)
cc.ctx.GetSessionVars().RetryInfo.Retrying = false
return err
}
_, allowTiFlashFallback := cc.ctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]
if allowTiFlashFallback && err != nil && errors.ErrorEqual(err, storeerr.ErrTiFlashServerTimeout) && retryable {
// When the TiFlash server seems down, we append a warning to remind the user to check the status of the TiFlash
Expand Down
61 changes: 61 additions & 0 deletions server/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2316,3 +2316,64 @@ func TestLocalhostClientMapping(t *testing.T) {
err = dbSocket.Ping()
require.Errorf(t, err, "Connection successful without matching host for unix domain socket!")
}

func TestRcReadCheckTS(t *testing.T) {
ts, cleanup := createTidbTestSuite(t)
defer cleanup()

db, err := sql.Open("mysql", ts.getDSN())
require.NoError(t, err)
defer func() {
err := db.Close()
require.NoError(t, err)
}()

db2, err := sql.Open("mysql", ts.getDSN())
require.NoError(t, err)
defer func() {
err := db2.Close()
require.NoError(t, err)
}()
tk2 := testkit.NewDBTestKit(t, db2)
tk2.MustExec("set @@tidb_enable_async_commit = 0")
tk2.MustExec("set @@tidb_enable_1pc = 0")

cli := newTestServerClient()

tk := testkit.NewDBTestKit(t, db)
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(c1 int key, c2 int)")
tk.MustExec("insert into t1 values(1, 10), (2, 20), (3, 30)")

tk.MustExec(`set tidb_rc_read_check_ts = 'on';`)
tk.MustExec(`set tx_isolation = 'READ-COMMITTED';`)
tk.MustExec("begin pessimistic")
// Test point get retry.
rows := tk.MustQuery("select * from t1 where c1 = 1")
cli.checkRows(t, rows, "1 10")
tk2.MustExec("update t1 set c2 = c2 + 1")
rows = tk.MustQuery("select * from t1 where c1 = 1")
cli.checkRows(t, rows, "1 11")
// Test batch point get retry.
rows = tk.MustQuery("select * from t1 where c1 in (1, 3)")
cli.checkRows(t, rows, "1 11", "3 31")
tk2.MustExec("update t1 set c2 = c2 + 1")
rows = tk.MustQuery("select * from t1 where c1 in (1, 3)")
cli.checkRows(t, rows, "1 12", "3 32")
// Test scan retry.
rows = tk.MustQuery("select * from t1")
cli.checkRows(t, rows, "1 12", "2 22", "3 32")
tk2.MustExec("update t1 set c2 = c2 + 1")
rows = tk.MustQuery("select * from t1")
cli.checkRows(t, rows, "1 13", "2 23", "3 33")
// Test reverse scan retry.
rows = tk.MustQuery("select * from t1 order by c1 desc")
cli.checkRows(t, rows, "3 33", "2 23", "1 13")
tk2.MustExec("update t1 set c2 = c2 + 1")
rows = tk.MustQuery("select * from t1 order by c1 desc")
cli.checkRows(t, rows, "3 34", "2 24", "1 14")

// Test retry caused by ongoing prewrite lock.
// As the `defaultLockTTL` is 3s and it's difficult to change it here, the lock
// test is implemented in the uft test cases.
}
7 changes: 6 additions & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3154,7 +3154,12 @@ func (s *session) PrepareTSFuture(ctx context.Context) {
s.txn.changeInvalidToPending(txnFuture)
} else if s.txn.Valid() && s.GetSessionVars().IsPessimisticReadConsistency() {
// Prepare the statement future if the transaction is valid in RC transactions.
s.GetSessionVars().TxnCtx.SetStmtFutureForRC(s.getTxnFuture(ctx).future)
// If the `RCCheckTS` is used, try to use the last valid ts to read.
if s.GetSessionVars().StmtCtx.RCCheckTS {
s.GetSessionVars().TxnCtx.SetStmtFutureForRC(nil)
} else {
s.GetSessionVars().TxnCtx.SetStmtFutureForRC(s.getTxnFuture(ctx).future)
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,9 @@ type StatementContext struct {

// SysdateIsNow indicates whether sysdate() is an alias of now() in this statement
SysdateIsNow bool

// RCCheckTS indicates the current read-consistency read select statement will use `RCCheckTS` path.
RCCheckTS bool
}

// StmtHints are SessionVars related sql hints.
Expand Down
15 changes: 15 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type RetryInfo struct {
DroppedPreparedStmtIDs []uint32
autoIncrementIDs retryInfoAutoIDs
autoRandomIDs retryInfoAutoIDs
LastRcReadTS uint64
}

// Clean does some clean work.
Expand Down Expand Up @@ -182,6 +183,9 @@ type TransactionContext struct {

// CachedTables is not nil if the transaction write on cached table.
CachedTables map[int64]interface{}

// Last ts used by read-consistency read.
LastRcReadTs uint64
}

// GetShard returns the shard prefix for the next `count` rowids.
Expand Down Expand Up @@ -1012,6 +1016,8 @@ type SessionVars struct {
IgnorePreparedCacheCloseStmt bool
// BatchPendingTiFlashCount shows the threshold of pending TiFlash tables when batch adding.
BatchPendingTiFlashCount int
// RcReadCheckTS indicates if ts check optimization is enabled for current session.
RcReadCheckTS bool
}

// InitStatementContext initializes a StatementContext, the object is reused to reduce allocation.
Expand Down Expand Up @@ -2352,3 +2358,12 @@ func (s *SessionVars) GetSeekFactor(tbl *model.TableInfo) float64 {
}
return s.seekFactor
}

// IsRcCheckTsRetryable checks if the current error is retryable for `RcReadCheckTS` path.
func (s *SessionVars) IsRcCheckTsRetryable(err error) bool {
if err == nil {
return false
}
// The `RCCheckTS` flag of `stmtCtx` is set.
return s.RcReadCheckTS && s.StmtCtx.RCCheckTS && errors.ErrorEqual(err, kv.ErrWriteConflict)
}
4 changes: 4 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -1361,6 +1361,10 @@ var defaultSysVars = []*SysVar{
return nil
},
},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBRCReadCheckTS, Type: TypeBool, Value: BoolToOnOff(DefRCReadCheckTS), SetSession: func(s *SessionVars, val string) error {
s.RcReadCheckTS = TiDBOptOn(val)
return nil
}},
}

// FeedbackProbability points to the FeedbackProbability in statistics package.
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,8 @@ const (
TiDBStatsLoadPseudoTimeout = "tidb_stats_load_pseudo_timeout"
// TiDBMemQuotaBindingCache indicates the memory quota for the bind cache.
TiDBMemQuotaBindingCache = "tidb_mem_quota_binding_cache"
// TiDBRCReadCheckTS indicates the tso optimization for read-consistency read is enabled.
TiDBRCReadCheckTS = "tidb_rc_read_check_ts"
)

// TiDB intentional limits
Expand Down Expand Up @@ -809,6 +811,7 @@ const (
DefTiDBTxnAssertionLevel = AssertionOffStr
DefTiDBIgnorePreparedCacheCloseStmt = false
DefTiDBBatchPendingTiFlashCount = 4000
DefRCReadCheckTS = false
)

// Process global variables.
Expand Down
5 changes: 5 additions & 0 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,6 +983,9 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
zap.Uint64("regionID", task.region.GetID()),
zap.String("storeAddr", task.storeAddr),
zap.Error(err))
if strings.Contains(err.Error(), "write conflict") {
return nil, kv.ErrWriteConflict
}
return nil, errors.Trace(err)
}
// When the request is using streaming API, the `Range` is not nil.
Expand Down Expand Up @@ -1342,6 +1345,8 @@ func isolationLevelToPB(level kv.IsoLevel) kvrpcpb.IsolationLevel {
return kvrpcpb.IsolationLevel_RC
case kv.SI:
return kvrpcpb.IsolationLevel_SI
case kv.RCCheckTS:
return kvrpcpb.IsolationLevel_RCCheckTS
default:
return kvrpcpb.IsolationLevel_SI
}
Expand Down
2 changes: 2 additions & 0 deletions store/driver/txn/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ func getTiKVIsolationLevel(level kv.IsoLevel) txnsnapshot.IsoLevel {
return txnsnapshot.SI
case kv.RC:
return txnsnapshot.RC
case kv.RCCheckTS:
return txnsnapshot.RCCheckTS
default:
return txnsnapshot.SI
}
Expand Down
12 changes: 9 additions & 3 deletions store/mockstore/unistore/cophandler/cop_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/store/mockstore/unistore/client"
"github.com/pingcap/tidb/store/mockstore/unistore/lockstore"
"github.com/pingcap/tidb/store/mockstore/unistore/tikv/dbreader"
"github.com/pingcap/tidb/store/mockstore/unistore/tikv/kverrors"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
Expand Down Expand Up @@ -436,12 +437,17 @@ func buildRespWithMPPExec(chunks []tipb.Chunk, counts, ndvs []int64, exec mppExe
resp.ExecDetailsV2 = &kvrpcpb.ExecDetailsV2{
TimeDetail: resp.ExecDetails.TimeDetail,
}
data, err := proto.Marshal(selResp)
if err != nil {
resp.OtherError = err.Error()
data, mErr := proto.Marshal(selResp)
if mErr != nil {
resp.OtherError = mErr.Error()
return resp
}
resp.Data = data
if err != nil {
if conflictErr, ok := errors.Cause(err).(*kverrors.ErrConflict); ok {
resp.OtherError = conflictErr.Error()
}
}
return resp
}

Expand Down
Loading

0 comments on commit d4d43ba

Please sign in to comment.