Skip to content

Commit

Permalink
*: fix Command and Time in show processlist (pingcap#7844)
Browse files Browse the repository at this point in the history
  • Loading branch information
Lingyu Song authored Oct 12, 2018
1 parent dbdd806 commit 48704b8
Show file tree
Hide file tree
Showing 11 changed files with 128 additions and 65 deletions.
38 changes: 17 additions & 21 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"math"
"strings"
"sync/atomic"
"time"

"github.com/pingcap/tidb/ast"
Expand All @@ -41,17 +42,16 @@ import (
)

type processinfoSetter interface {
SetProcessInfo(string)
SetProcessInfo(string, time.Time, byte)
}

// recordSet wraps an executor, implements ast.RecordSet interface
type recordSet struct {
fields []*ast.ResultField
executor Executor
stmt *ExecStmt
processinfo processinfoSetter
lastErr error
txnStartTS uint64
fields []*ast.ResultField
executor Executor
stmt *ExecStmt
lastErr error
txnStartTS uint64
}

func (a *recordSet) Fields() []*ast.ResultField {
Expand Down Expand Up @@ -119,9 +119,6 @@ func (a *recordSet) NewChunk() *chunk.Chunk {
func (a *recordSet) Close() error {
err := a.executor.Close()
a.stmt.logSlowQuery(a.txnStartTS, a.lastErr == nil)
if a.processinfo != nil {
a.processinfo.SetProcessInfo("")
}
return errors.Trace(err)
}

Expand Down Expand Up @@ -216,6 +213,8 @@ func (a *ExecStmt) Exec(ctx context.Context) (ast.RecordSet, error) {
return nil, errors.Trace(err)
}

cmd32 := atomic.LoadUint32(&sctx.GetSessionVars().CommandValue)
cmd := byte(cmd32)
var pi processinfoSetter
if raw, ok := sctx.(processinfoSetter); ok {
pi = raw
Expand All @@ -227,27 +226,27 @@ func (a *ExecStmt) Exec(ctx context.Context) (ast.RecordSet, error) {
}
}
// Update processinfo, ShowProcess() will use it.
pi.SetProcessInfo(sql)
pi.SetProcessInfo(sql, time.Now(), cmd)
}

// If the executor doesn't return any result to the client, we execute it without delay.
if e.Schema().Len() == 0 {
return a.handleNoDelayExecutor(ctx, sctx, e, pi)
return a.handleNoDelayExecutor(ctx, sctx, e)
} else if proj, ok := e.(*ProjectionExec); ok && proj.calculateNoDelay {
// Currently this is only for the "DO" statement. Take "DO 1, @a=2;" as an example:
// the Projection has two expressions and two columns in the schema, but we should
// not return the result of the two expressions.
return a.handleNoDelayExecutor(ctx, sctx, e, pi)
return a.handleNoDelayExecutor(ctx, sctx, e)
}

return &recordSet{
executor: e,
stmt: a,
processinfo: pi,
txnStartTS: sctx.Txn().StartTS(),
executor: e,
stmt: a,
txnStartTS: sctx.Txn().StartTS(),
}, nil
}

func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Context, e Executor, pi processinfoSetter) (ast.RecordSet, error) {
func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Context, e Executor) (ast.RecordSet, error) {
// Check if "tidb_snapshot" is set for the write executors.
// In history read mode, we can not do write operations.
switch e.(type) {
Expand All @@ -260,9 +259,6 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co

var err error
defer func() {
if pi != nil {
pi.SetProcessInfo("")
}
terror.Log(errors.Trace(e.Close()))
txnTS := uint64(0)
if sctx.Txn() != nil {
Expand Down
24 changes: 0 additions & 24 deletions executor/show_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/auth"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testutil"
Expand Down Expand Up @@ -469,33 +468,10 @@ type mockSessionManager struct {
session.Session
}

// ShowProcessList implements the SessionManager.ShowProcessList interface.
func (msm *mockSessionManager) ShowProcessList() map[uint64]util.ProcessInfo {
ps := msm.ShowProcess()
return map[uint64]util.ProcessInfo{ps.ID: ps}
}

// Kill implements the SessionManager.Kill interface.
func (msm *mockSessionManager) Kill(cid uint64, query bool) {
}

func (s *testSuite) TestShowFullProcessList(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("select 1") // for tk.Se init

se := tk.Se
se.SetSessionManager(&mockSessionManager{se})

fullSQL := "show full processlist"
simpSQL := "show processlist"

cols := []int{4, 5, 6, 7} // columns to check: Command, Time, State, Info
tk.MustQuery(fullSQL).CheckAt(cols, testutil.RowsWithSep("|", "Query|0|2|"+fullSQL))
tk.MustQuery(simpSQL).CheckAt(cols, testutil.RowsWithSep("|", "Query|0|2|"+simpSQL[:100]))

se.SetSessionManager(nil) // reset sm so other tests won't use this
}

type stats struct {
}

Expand Down
37 changes: 37 additions & 0 deletions mysql/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ const (
ComDaemon
ComBinlogDumpGtid
ComResetConnection
ComEnd
)

// Client information.
Expand Down Expand Up @@ -278,6 +279,42 @@ var Priv2UserCol = map[PrivilegeType]string{
IndexPriv: "Index_priv",
}

// Command2Str is the command information to command name.
var Command2Str = map[byte]string{
ComSleep: "Sleep",
ComQuit: "Quit",
ComInitDB: "Init DB",
ComQuery: "Query",
ComFieldList: "Field List",
ComCreateDB: "Create DB",
ComDropDB: "Drop DB",
ComRefresh: "Refresh",
ComShutdown: "Shutdown",
ComStatistics: "Statistics",
ComProcessInfo: "Processlist",
ComConnect: "Connect",
ComProcessKill: "Kill",
ComDebug: "Debug",
ComPing: "Ping",
ComTime: "Time",
ComDelayedInsert: "Delayed Insert",
ComChangeUser: "Change User",
ComBinlogDump: "Binlog Dump",
ComTableDump: "Table Dump",
ComConnectOut: "Connect out",
ComRegisterSlave: "Register Slave",
ComStmtPrepare: "Prepare",
ComStmtExecute: "Execute",
ComStmtSendLongData: "Long Data",
ComStmtClose: "Close stmt",
ComStmtReset: "Reset stmt",
ComSetOption: "Set option",
ComStmtFetch: "Fetch",
ComDaemon: "Daemon",
ComBinlogDumpGtid: "Binlog Dump",
ComResetConnection: "Reset connect",
}

// Col2PrivType is the privilege tables column name to privilege type.
var Col2PrivType = map[string]PrivilegeType{
"Create_priv": CreatePriv,
Expand Down
14 changes: 14 additions & 0 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,15 +584,29 @@ func (cc *clientConn) dispatch(data []byte) error {
cc.mu.cancelFunc = cancelFunc
cc.mu.Unlock()

t := time.Now()
cmd := data[0]
data = data[1:]
cc.lastCmd = hack.String(data)
token := cc.server.getToken()
defer func() {
cc.ctx.SetProcessInfo("", t, mysql.ComSleep)
cc.server.releaseToken(token)
span.Finish()
}()

if cmd < mysql.ComEnd {
cc.ctx.SetCommandValue(cmd)
}

switch cmd {
case mysql.ComPing, mysql.ComStmtClose, mysql.ComStmtSendLongData, mysql.ComStmtReset,
mysql.ComSetOption, mysql.ComChangeUser:
cc.ctx.SetProcessInfo("", t, cmd)
case mysql.ComInitDB:
cc.ctx.SetProcessInfo("use "+hack.String(data), t, cmd)
}

switch cmd {
case mysql.ComSleep:
// TODO: According to mysql document, this command is supposed to be used only internally.
Expand Down
6 changes: 6 additions & 0 deletions server/conn_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"fmt"
"math"
"strconv"
"time"

"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -217,6 +218,11 @@ func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err err
return mysql.NewErr(mysql.ErrUnknownStmtHandler,
strconv.FormatUint(uint64(stmtID), 10), "stmt_fetch")
}
sql := ""
if prepared, ok := cc.ctx.GetStatement(int(stmtID)).(*TiDBStatement); ok {
sql = prepared.sql
}
cc.ctx.SetProcessInfo(sql, time.Now(), mysql.ComStmtExecute)
rs := stmt.GetResultSet()
if rs == nil {
return mysql.NewErr(mysql.ErrUnknownStmtHandler,
Expand Down
5 changes: 5 additions & 0 deletions server/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package server
import (
"crypto/tls"
"fmt"
"time"

"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -47,6 +48,8 @@ type QueryCtx interface {
// SetValue saves a value associated with this context for key.
SetValue(key fmt.Stringer, value interface{})

SetProcessInfo(sql string, t time.Time, command byte)

// CommitTxn commits the transaction operations.
CommitTxn(ctx context.Context) error

Expand Down Expand Up @@ -86,6 +89,8 @@ type QueryCtx interface {
// GetSessionVars return SessionVars.
GetSessionVars() *variable.SessionVars

SetCommandValue(command byte)

SetSessionManager(util.SessionManager)
}

Expand Down
13 changes: 13 additions & 0 deletions server/driver_tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package server
import (
"crypto/tls"
"fmt"
"time"

"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -59,6 +60,7 @@ type TiDBStatement struct {
paramsType []byte
ctx *TiDBContext
rs ResultSet
sql string
}

// ID implements PreparedStatement ID method.
Expand Down Expand Up @@ -207,6 +209,11 @@ func (tc *TiDBContext) CommitTxn(ctx context.Context) error {
return tc.session.CommitTxn(ctx)
}

// SetProcessInfo implements QueryCtx SetProcessInfo method.
func (tc *TiDBContext) SetProcessInfo(sql string, t time.Time, command byte) {
tc.session.SetProcessInfo(sql, t, command)
}

// RollbackTxn implements QueryCtx RollbackTxn method.
func (tc *TiDBContext) RollbackTxn() error {
return tc.session.RollbackTxn(context.TODO())
Expand Down Expand Up @@ -300,6 +307,7 @@ func (tc *TiDBContext) Prepare(sql string) (statement PreparedStatement, columns
return
}
stmt := &TiDBStatement{
sql: sql,
id: stmtID,
numParams: paramCount,
boundParams: make([][]byte, paramCount),
Expand All @@ -325,6 +333,11 @@ func (tc *TiDBContext) ShowProcess() util.ProcessInfo {
return tc.session.ShowProcess()
}

// SetCommandValue implements QueryCtx SetCommandValue method.
func (tc *TiDBContext) SetCommandValue(command byte) {
tc.session.SetCommandValue(command)
}

// GetSessionVars return SessionVars.
func (tc *TiDBContext) GetSessionVars() *variable.SessionVars {
return tc.session.GetSessionVars()
Expand Down
23 changes: 23 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,29 @@ func checkErrorCode(c *C, e error, codes ...uint16) {
c.Assert(isMatchCode, IsTrue, Commentf("got err %v, expected err codes %v", me, codes))
}

func runTestShowProcessList(c *C) {
runTests(c, nil, func(dbt *DBTest) {
fullSQL := "show full processlist"
simpSQL := "show processlist"
rows := dbt.mustQuery(fullSQL)
c.Assert(rows.Next(), IsTrue)
var outA, outB, outC, outD, outE, outF, outG, outH, outI string
err := rows.Scan(&outA, &outB, &outC, &outD, &outE, &outF, &outG, &outH, &outI)
c.Assert(err, IsNil)
c.Assert(outE, Equals, "Query")
c.Assert(outF, Equals, "0")
c.Assert(outG, Equals, "2")
c.Assert(outH, Equals, fullSQL)
rows = dbt.mustQuery(simpSQL)
err = rows.Scan(&outA, &outB, &outC, &outD, &outE, &outF, &outG, &outH, &outI)
c.Assert(err, IsNil)
c.Assert(outE, Equals, "Query")
c.Assert(outF, Equals, "0")
c.Assert(outG, Equals, "2")
c.Assert(outH, Equals, simpSQL[:100])
})
}

func runTestAuth(c *C) {
runTests(c, nil, func(dbt *DBTest) {
dbt.mustExec(`CREATE USER 'authtest'@'%' IDENTIFIED BY '123';`)
Expand Down
14 changes: 0 additions & 14 deletions server/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,17 +526,3 @@ func (ts *TidbTestSuite) TestSumAvg(c *C) {
c.Parallel()
runTestSumAvg(c)
}

func (ts *TidbTestSuite) TestShowProcess(c *C) {
qctx, err := ts.tidbdrv.OpenCtx(uint64(0), 0, uint8(tmysql.DefaultCollationID), "test", nil)
c.Assert(err, IsNil)
ctx := context.Background()
results, err := qctx.Execute(ctx, "select 1")
c.Assert(err, IsNil)
pi := qctx.ShowProcess()
c.Assert(pi.Command, Equals, "Query")
results[0].Close()
pi = qctx.ShowProcess()
c.Assert(pi.Command, Equals, "Sleep")
qctx.Close()
}
Loading

0 comments on commit 48704b8

Please sign in to comment.