Skip to content

Commit

Permalink
*: slow log and metrics includes execute prepared statement. (pingcap…
Browse files Browse the repository at this point in the history
…#2512)

Execute prepared statement was ignored in slow query log and query metrics.
  • Loading branch information
coocood authored and shenli committed Jan 21, 2017
1 parent 869dfda commit 0d97442
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 60 deletions.
3 changes: 0 additions & 3 deletions ast/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,6 @@ type Statement interface {
// OriginText gets the origin SQL text.
OriginText() string

// SetText sets the executive SQL text.
SetText(text string)

// Exec executes SQL and gets a Recordset.
Exec(ctx context.Context) (RecordSet, error)
}
Expand Down
60 changes: 43 additions & 17 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
package executor

import (
"fmt"
"time"

"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/plan"
Expand All @@ -27,13 +30,13 @@ import (
type recordSet struct {
fields []*ast.ResultField
executor Executor
schema expression.Schema
ctx context.Context
stmt *statement
err error
}

func (a *recordSet) Fields() ([]*ast.ResultField, error) {
if len(a.fields) == 0 {
for _, col := range a.schema.Columns {
for _, col := range a.executor.Schema().Columns {
rf := &ast.ResultField{
ColumnAsName: col.ColName,
TableAsName: col.TblName,
Expand All @@ -58,31 +61,32 @@ func (a *recordSet) Next() (*ast.Row, error) {
}

func (a *recordSet) Close() error {
return a.executor.Close()
err := a.executor.Close()
a.stmt.logSlowQuery()
return errors.Trace(err)
}

// statement implements the ast.Statement interface, it builds a plan.Plan to an ast.Statement.
type statement struct {
// The InfoSchema cannot change during execution, so we hold a reference to it.
is infoschema.InfoSchema
plan plan.Plan
text string
is infoschema.InfoSchema
ctx context.Context
text string
plan plan.Plan
startTime time.Time
}

func (a *statement) OriginText() string {
return a.text
}

func (a *statement) SetText(text string) {
a.text = text
return
}

// Exec implements the ast.Statement Exec interface.
// This function builds an Executor from a plan. If the Executor doesn't return result,
// like the INSERT, UPDATE statements, it executes in this function, if the Executor returns
// result, execution is done after this function returns, in the returned ast.RecordSet Next method.
func (a *statement) Exec(ctx context.Context) (ast.RecordSet, error) {
a.startTime = time.Now()
a.ctx = ctx
if _, ok := a.plan.(*plan.Execute); !ok {
// Do not sync transaction for Execute statement, because the real optimization work is done in
// "ExecuteExec.Build".
Expand All @@ -104,7 +108,8 @@ func (a *statement) Exec(ctx context.Context) (ast.RecordSet, error) {
if err != nil {
return nil, errors.Trace(err)
}
stmtCount(executorExec.Stmt, executorExec.Plan)
a.text = executorExec.Stmt.Text()
a.plan = executorExec.Plan
e = executorExec.StmtExec
}

Expand All @@ -120,7 +125,10 @@ func (a *statement) Exec(ctx context.Context) (ast.RecordSet, error) {
}
}

defer e.Close()
defer func() {
e.Close()
a.logSlowQuery()
}()
for {
row, err := e.Next()
if err != nil {
Expand All @@ -137,7 +145,25 @@ func (a *statement) Exec(ctx context.Context) (ast.RecordSet, error) {
}
return &recordSet{
executor: e,
schema: e.Schema(),
ctx: ctx,
stmt: a,
}, nil
}

const (
queryLogMaxLen = 2048
slowThreshold = 300 * time.Millisecond
)

func (a *statement) logSlowQuery() {
costTime := time.Since(a.startTime)
sql := a.text
if len(sql) > queryLogMaxLen {
sql = sql[:queryLogMaxLen] + fmt.Sprintf("(len:%d)", len(sql))
}
connID := a.ctx.GetSessionVars().ConnectionID
if costTime < slowThreshold {
log.Debugf("[%d][TIME_QUERY] %v %s", connID, costTime, sql)
} else {
log.Warnf("[%d][TIME_QUERY] %v %s", connID, costTime, sql)
}
}
97 changes: 62 additions & 35 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ import (
"io"
"net"
"runtime"
"strconv"
"strings"
"time"

"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/arena"
Expand Down Expand Up @@ -342,37 +344,85 @@ func (cc *clientConn) Run() {
return
}

if err := cc.dispatch(data); err != nil {
startTime := time.Now()
if err = cc.dispatch(data); err != nil {
if terror.ErrorEqual(err, io.EOF) {
cc.addMetrics(data[0], startTime, nil)
return
}
cmd := string(data[1:])
if len(cmd) > size {
log.Warnf("[%d] dispatch error:\n%s\n%s\n%s (len %d)", cc.connectionID, cc, cmd[:size], errors.ErrorStack(err), len(cmd))
} else {
log.Warnf("[%d] dispatch error:\n%s\n%s\n%s", cc.connectionID, cc, cmd, errors.ErrorStack(err))
}
log.Warnf("[%d] dispatch error:\n%s\n%s\n%s",
cc.connectionID, cc, queryStrForLog(string(data[1:])), errStrForLog(err))
cc.writeError(err)
}

cc.addMetrics(data[0], startTime, err)
cc.pkt.sequence = 0
}
}

func queryStrForLog(query string) string {
const size = 4096
if len(query) > size {
return query[:size] + fmt.Sprintf("(len: %d)", len(query))
}
return query
}

func errStrForLog(err error) string {
if kv.ErrKeyExists.Equal(err) {
// Do not log stack for duplicated entry error.
return err.Error()
}
return errors.ErrorStack(err)
}

func (cc *clientConn) addMetrics(cmd byte, startTime time.Time, err error) {
var label string
switch cmd {
case mysql.ComSleep:
label = "Sleep"
case mysql.ComQuit:
label = "Quit"
case mysql.ComQuery:
label = "Query"
case mysql.ComPing:
label = "Ping"
case mysql.ComInitDB:
label = "InitDB"
case mysql.ComFieldList:
label = "FieldList"
case mysql.ComStmtPrepare:
label = "StmtPrepare"
case mysql.ComStmtExecute:
label = "StmtExecute"
case mysql.ComStmtClose:
label = "StmtClose"
case mysql.ComStmtSendLongData:
label = "StmtSendLongData"
case mysql.ComStmtReset:
label = "StmtReset"
case mysql.ComSetOption:
label = "SetOption"
default:
label = strconv.Itoa(int(cmd))
}
if err != nil {
queryCounter.WithLabelValues(label, "Error").Inc()
} else {
queryCounter.WithLabelValues(label, "OK").Inc()
}
queryHistogram.Observe(time.Since(startTime).Seconds())
}

// dispatch handles client request based on command which is the first byte of the data.
// It also gets a token from server which is used to limit the concurrently handling clients.
// The most frequently used command is ComQuery.
func (cc *clientConn) dispatch(data []byte) error {
cmd := data[0]
data = data[1:]
cc.lastCmd = hack.String(data)

token := cc.server.getToken()

startTS := time.Now()
defer func() {
cc.server.releaseToken(token)
log.Debugf("[TIME_CMD] %v %d", time.Since(startTS), cmd)
}()

switch cmd {
Expand All @@ -395,7 +445,6 @@ func (cc *clientConn) dispatch(data []byte) error {
case mysql.ComPing:
return cc.writeOK()
case mysql.ComInitDB:
log.Debug("init db", hack.String(data))
if err := cc.useDB(hack.String(data)); err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -600,32 +649,10 @@ func (cc *clientConn) handleLoadData(loadDataInfo *executor.LoadDataInfo) error
return errors.Trace(txn.Commit())
}

const queryLogMaxLen = 2048

// handleQuery executes the sql query string and writes result set or result ok to the client.
// As the execution time of this function represents the performance of TiDB, we do time log and metrics here.
// There is a special query `load data` that does not return result, which is handled differently.
func (cc *clientConn) handleQuery(sql string) (err error) {
startTime := time.Now()
defer func() {
costTime := time.Since(startTime)
if len(sql) > queryLogMaxLen {
sql = sql[:queryLogMaxLen] + fmt.Sprintf("(len:%d)", len(sql))
}
if costTime < 300*time.Millisecond {
log.Debugf("[%d][TIME_QUERY] %v %s", cc.connectionID, costTime, sql)
} else {
log.Warnf("[%d][TIME_QUERY] %v %s", cc.connectionID, costTime, sql)
}
// Add metrics
queryHistogram.Observe(costTime.Seconds())
label := querySucc
if err != nil {
label = queryFailed
}
queryCounter.WithLabelValues(label).Inc()
}()

rs, err := cc.ctx.Execute(sql)
if err != nil {
executeErrorCounter.WithLabelValues(executeErrorToLabel(err)).Inc()
Expand Down
6 changes: 1 addition & 5 deletions server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,7 @@ var (
Subsystem: "server",
Name: "query_total",
Help: "Counter of queries.",
}, []string{"type"})

// Query handle result status.
querySucc = "query_succ"
queryFailed = "query_failed"
}, []string{"type", "status"})

connGauge = prometheus.NewGauge(
prometheus.GaugeOpts{
Expand Down

0 comments on commit 0d97442

Please sign in to comment.