Skip to content

Commit

Permalink
variable: add a varaible to read current timestamp (pingcap#3400)
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood authored and siddontang committed Jun 5, 2017
1 parent 0cbc6c8 commit 7b6e447
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 7 deletions.
20 changes: 20 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ import (
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/plan"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/testutil"
Expand Down Expand Up @@ -1420,3 +1422,21 @@ func (s *testSuite) TestTimestampTimeZone(c *C) {
tk.MustQuery(fmt.Sprintf("select * from t where ts = '%s'", tt.expect)).Check(testkit.Rows(tt.expect))
}
}

func (s *testSuite) TestTiDBCurrentTS(c *C) {
defer func() {
s.cleanEnv(c)
testleak.AfterTest(c)()
}()
tk := testkit.NewTestKit(c, s.store)
tk.MustQuery("select @@tidb_current_ts").Check(testkit.Rows("0"))
tk.MustExec("begin")
rows := tk.MustQuery("select @@tidb_current_ts").Rows()
tsStr := rows[0][0].(string)
c.Assert(tsStr, Equals, fmt.Sprintf("%d", tk.Se.Txn().StartTS()))
tk.MustExec("commit")
tk.MustQuery("select @@tidb_current_ts").Check(testkit.Rows("0"))

_, err := tk.Exec("set @@tidb_current_ts = '1'")
c.Assert(terror.ErrorEqual(err, variable.ErrReadOnly), IsTrue)
}
16 changes: 9 additions & 7 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ type Session interface {
String() string // For debug
CommitTxn() error
RollbackTxn() error
// For execute prepare statement in binary protocol.
// PrepareStmt executes prepare statement in binary protocol.
PrepareStmt(sql string) (stmtID uint32, paramCount int, fields []*ast.ResultField, err error)
// Execute a prepared statement.
// ExecutePreparedStmt executes a prepared statement.
ExecutePreparedStmt(stmtID uint32, param ...interface{}) (ast.RecordSet, error)
DropPreparedStmt(stmtID uint32) error
SetClientCapability(uint32) // Set client capability flags.
Expand Down Expand Up @@ -105,12 +105,12 @@ func (h *stmtHistory) add(stmtID uint32, st ast.Statement, stmtCtx *variable.Sta
}

type session struct {
// It's used by ShowProcess(), and should be modified atomically.
// processInfo is used by ShowProcess(), and should be modified atomically.
processInfo atomic.Value
txn kv.Transaction // current transaction
txnFuture *txnFuture
txnFutureCh chan *txnFuture
// For cancel the execution of current transaction.
// goCtx is used for cancelling the execution of current transaction.
goCtx goctx.Context
cancelFunc goctx.CancelFunc

Expand All @@ -121,7 +121,7 @@ type session struct {

store kv.Storage

// Used for test only.
// unlimitedRetryCount is used for test only.
unlimitedRetryCount bool

// For performance_schema only.
Expand Down Expand Up @@ -632,7 +632,7 @@ func (s *session) Execute(sql string) ([]ast.RecordSet, error) {
return rs, nil
}

// For execute prepare statement in binary protocol
// PrepareStmt is used for executing prepare statement in binary protocol
func (s *session) PrepareStmt(sql string) (stmtID uint32, paramCount int, fields []*ast.ResultField, err error) {
if s.sessionVars.TxnCtx.InfoSchema == nil {
// We don't need to create a transaction for prepare statement, just get information schema will do.
Expand Down Expand Up @@ -715,6 +715,7 @@ func (s *session) DropPreparedStmt(stmtID uint32) error {
return nil
}

// GetTxn gets the current transaction or creates a new transaction.
// If forceNew is true, GetTxn() must return a new transaction.
// In this situation, if current transaction is still in progress,
// there will be an implicit commit and create a new transaction.
Expand Down Expand Up @@ -1025,7 +1026,7 @@ const loadCommonGlobalVarsSQL = "select * from mysql.global_variables where vari
variable.TiDBMaxRowCountForINLJ + quoteCommaQuote +
variable.TiDBDistSQLScanConcurrency + "')"

// LoadCommonGlobalVariableIfNeeded loads and applies commonly used global variables for the session.
// loadCommonGlobalVariablesIfNeeded loads and applies commonly used global variables for the session.
func (s *session) loadCommonGlobalVariablesIfNeeded() error {
vars := s.sessionVars
if vars.CommonGlobalLoaded {
Expand Down Expand Up @@ -1128,6 +1129,7 @@ func (s *session) ActivePendingTxn() error {
return errors.Trace(future.err)
}
s.txn = future.txn
s.sessionVars.TxnCtx.StartTS = s.txn.StartTS()
err := s.loadCommonGlobalVariablesIfNeeded()
if err != nil {
return errors.Trace(err)
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type TransactionContext struct {
InfoSchema interface{}
Histroy interface{}
SchemaVersion int64
StartTS uint64
TableDeltaMap map[int64]TableDelta
}

Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ const (
CodeUnknownSystemVar terror.ErrCode = 1193
CodeIncorrectScope terror.ErrCode = 1238
CodeUnknownTimeZone terror.ErrCode = 1298
CodeReadOnly terror.ErrCode = 1621
)

// Variable errors
Expand All @@ -68,6 +69,7 @@ var (
UnknownSystemVar = terror.ClassVariable.New(CodeUnknownSystemVar, "unknown system variable '%s'")
ErrIncorrectScope = terror.ClassVariable.New(CodeIncorrectScope, "Incorrect variable scope")
ErrUnknownTimeZone = terror.ClassVariable.New(CodeUnknownTimeZone, "unknown or incorrect time zone: %s")
ErrReadOnly = terror.ClassVariable.New(CodeReadOnly, "variable is read only")
)

func init() {
Expand All @@ -81,6 +83,7 @@ func init() {
CodeUnknownSystemVar: mysql.ErrUnknownSystemVariable,
CodeIncorrectScope: mysql.ErrIncorrectGlobalLocalVar,
CodeUnknownTimeZone: mysql.ErrUnknownTimeZone,
CodeReadOnly: mysql.ErrVariableIsReadonly,
}
terror.ErrClassToMySQLCodes[terror.ClassVariable] = mySQLErrCodes
}
Expand Down Expand Up @@ -606,6 +609,7 @@ var defaultSysVars = []*SysVar{
{ScopeGlobal | ScopeSession, TiDBMaxRowCountForINLJ, strconv.Itoa(DefMaxRowCountForINLJ)},
{ScopeGlobal | ScopeSession, TiDBSkipUTF8Check, boolToIntStr(DefSkipUTF8Check)},
{ScopeSession, TiDBBatchInsert, boolToIntStr(DefBatchInsert)},
{ScopeSession, TiDBCurrentTS, strconv.Itoa(DefCurretTS)},
}

// SetNamesVariables is the system variable names related to set names statements.
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ const (
// those indices can be scanned concurrently, with the cost of higher system performance impact.
TiDBBuildStatsConcurrency = "tidb_build_stats_concurrency"

// TiDBCurrentTS is used to get the current transaction timestamp.
// It is read-only.
TiDBCurrentTS = "tidb_current_ts"

/* Session and global */

// tidb_distsql_scan_concurrency is used to set the concurrency of a distsql scan task.
Expand Down Expand Up @@ -106,4 +110,5 @@ const (
DefOptAggPushDown = true
DefOptInSubqUnfolding = false
DefBatchInsert = false
DefCurretTS = 0
)
9 changes: 9 additions & 0 deletions sessionctx/varsutil/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"strings"
"time"

"fmt"
"github.com/juju/errors"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx/variable"
Expand All @@ -33,6 +34,12 @@ func GetSessionSystemVar(s *variable.SessionVars, key string) (string, error) {
if sysVar == nil {
return "", variable.UnknownSystemVar.GenByArgs(key)
}
// For virtual system varaibles:
switch sysVar.Name {
case variable.TiDBCurrentTS:
return fmt.Sprintf("%d", s.TxnCtx.StartTS), nil
}

sVal, ok := s.Systems[key]
if ok {
return sVal, nil
Expand Down Expand Up @@ -137,6 +144,8 @@ func SetSessionSystemVar(vars *variable.SessionVars, name string, value types.Da
vars.BatchInsert = tidbOptOn(sVal)
case variable.TiDBMaxRowCountForINLJ:
vars.MaxRowCountForINLJ = tidbOptPositiveInt(sVal, variable.DefMaxRowCountForINLJ)
case variable.TiDBCurrentTS:
return variable.ErrReadOnly
}
vars.Systems[name] = sVal
return nil
Expand Down

0 comments on commit 7b6e447

Please sign in to comment.