forked from pingcap/tidb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstatement.go
324 lines (303 loc) · 10.2 KB
/
statement.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package perfschema
import (
"fmt"
"reflect"
"runtime"
"sync/atomic"
"time"
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/util/types"
)
// statementInfo describes one registered statement instrument.
type statementInfo struct {
	key  uint64 // key obtained when the instrument was registered
	name string // full name under which the instrument was registered
}
// StatementState holds the runtime statistics of one statement between
// StartStatement and EndStatement.
//
// TODO:
//  1. support statement digest.
//  2. support prepared statement.
type StatementState struct {
	// Identifier of the connection the statement belongs to.
	connID uint64
	// Instrument registered for this statement's type.
	info *statementInfo
	// reflect.Type of the statement value handed to StartStatement.
	stmtType reflect.Type
	// Source file and line number of the call site.
	source string
	// Timer selected for the statement; decides the unit of timerStart and timerEnd.
	timerName enumTimerName
	// Timer reading at start, timer reading at end, and locked time.
	timerStart, timerEnd, lockTime int64
	// SQL statement string and current schema name.
	sqlText, schemaName string
	// Number of errors and number of warnings.
	errNum, warnNum uint32
	// Rows affected, rows sent and rows examined.
	rowsAffected, rowsSent, rowsExamined uint64
	// Metrics: temporary tables created on disk, temporary tables created.
	createdTmpDiskTables, createdTmpTables uint32
	// Metrics: select full joins, full range joins, ranges, range checks and scans.
	selectFullJoin, selectFullRangeJoin, selectRange, selectRangeCheck, selectScan uint32
	// Metrics: sort merge passes, sort ranges, sort rows and sort scans.
	sortMergePasses, sortRange, sortRows, sortScan uint32
	// Metrics: "no index used" flag and "no good index used" flag.
	noIndexUsed, noGoodIndexUsed uint8
}
// RegisterStatement registers the statement instrument named
// statementInstrumentPrefix + category + "/" + name and associates it with the
// dynamic type of elem, so that StartStatement can find the instrument when it
// is given a value of the same type.
//
// Registration is best-effort: when the instrument cannot be added, the
// failure is logged together with its cause and the type stays unregistered.
func (ps *perfSchema) RegisterStatement(category, name string, elem interface{}) {
	instrumentName := fmt.Sprintf("%s%s/%s", statementInstrumentPrefix, category, name)
	key, err := ps.addInstrument(instrumentName)
	if err != nil {
		// Not fatal by design, but keep the cause in the log so the failure
		// can be diagnosed.
		log.Errorf("Unable to register instrument %s: %v", instrumentName, errors.ErrorStack(err))
		return
	}

	ps.stmtInfos[reflect.TypeOf(elem)] = &statementInfo{
		key:  key,
		name: instrumentName,
	}
}
// StartStatement begins instrumenting one statement and returns the state that
// is later passed to EndStatement.
//
// sql is the statement text, connID the connection identifier, callerName the
// key under which the call site string is cached, and elem a value whose
// dynamic type selects the instrument registered via RegisterStatement.
//
// It returns nil (the statement is then not instrumented) when the performance
// schema is disabled, when no instrument is registered for elem's type, when
// the statement timer cannot be read or is of an unhandled kind, or when the
// call site cannot be resolved.
func (ps *perfSchema) StartStatement(sql string, connID uint64, callerName EnumCallerName, elem interface{}) *StatementState {
	if !enablePerfSchema {
		return nil
	}

	stmtType := reflect.TypeOf(elem)
	info, ok := ps.stmtInfos[stmtType]
	if !ok {
		// just ignore, do nothing else.
		log.Errorf("No instrument registered for statement %s", stmtType)
		return nil
	}

	// check and apply the configuration parameter in table setup_timers.
	timerName, err := ps.getTimerName(flagStatement)
	if err != nil {
		// Still best-effort, but report why the lookup failed.
		log.Errorf("Unable to check setup_timers table %v", errors.ErrorStack(err))
		return nil
	}

	// Take the start reading in the unit selected by the timer.
	var timerStart int64
	switch timerName {
	case timerNameNanosec:
		timerStart = time.Now().UnixNano()
	case timerNameMicrosec:
		timerStart = time.Now().UnixNano() / int64(time.Microsecond)
	case timerNameMillisec:
		timerStart = time.Now().UnixNano() / int64(time.Millisecond)
	default:
		return nil
	}

	// TODO: check and apply the additional configuration parameters in:
	// - table setup_actors
	// - table setup_consumers
	// - table setup_instruments
	// - table setup_objects

	// The "file:line" string is cached per callerName; runtime.Caller is only
	// consulted when the cache has no entry yet.
	var source string
	callerLock.RLock()
	source, ok = callerNames[callerName]
	callerLock.RUnlock()
	if !ok {
		_, fileName, fileLine, found := runtime.Caller(1)
		if !found {
			// just ignore, do nothing else.
			log.Error("Unable to get runtime.Caller(1)")
			return nil
		}
		source = fmt.Sprintf("%s:%d", fileName, fileLine)
		callerLock.Lock()
		callerNames[callerName] = source
		callerLock.Unlock()
	}

	return &StatementState{
		connID:     connID,
		info:       info,
		stmtType:   stmtType,
		source:     source,
		timerName:  timerName,
		timerStart: timerStart,
		sqlText:    sql,
	}
}
// EndStatement finishes instrumenting the statement described by state: it
// takes the end timer reading in the unit given by state.timerName, converts
// the state into a record, and hands that record to updateEventsStmtsCurrent
// and appendEventsStmtsHistory.
//
// It does nothing when the performance schema is disabled, when state is nil,
// or when state.timerName is of an unhandled kind. Table failures are logged
// with their cause and never returned; a failure to update the current table
// does not prevent the append to the history table.
func (ps *perfSchema) EndStatement(state *StatementState) {
	if !enablePerfSchema || state == nil {
		return
	}

	switch state.timerName {
	case timerNameNanosec:
		state.timerEnd = time.Now().UnixNano()
	case timerNameMicrosec:
		state.timerEnd = time.Now().UnixNano() / int64(time.Microsecond)
	case timerNameMillisec:
		state.timerEnd = time.Now().UnixNano() / int64(time.Millisecond)
	default:
		return
	}

	log.Debugf("EndStatement: sql %s, connection id %d, type %s", state.sqlText, state.connID, state.stmtType)

	record := state2Record(state)
	if err := ps.updateEventsStmtsCurrent(state.connID, record); err != nil {
		log.Errorf("Unable to update events_statements_current table %v", errors.ErrorStack(err))
	}
	if err := ps.appendEventsStmtsHistory(record); err != nil {
		log.Errorf("Unable to append to events_statements_history table %v", errors.ErrorStack(err))
	}
}
// state2Record flattens state into one row of datums. The trailing comment on
// each entry names the column it fills; entries that are nil are columns this
// function does not populate. state and state.info must be non-nil.
func state2Record(state *StatementState) []types.Datum {
	instrument := state.info
	row := []interface{}{
		state.connID,                       // THREAD_ID
		instrument.key,                     // EVENT_ID
		nil,                                // END_EVENT_ID
		instrument.name,                    // EVENT_NAME
		state.source,                       // SOURCE
		uint64(state.timerStart),           // TIMER_START
		uint64(state.timerEnd),             // TIMER_END
		nil,                                // TIMER_WAIT
		uint64(state.lockTime),             // LOCK_TIME
		state.sqlText,                      // SQL_TEXT
		nil,                                // DIGEST
		nil,                                // DIGEST_TEXT
		state.schemaName,                   // CURRENT_SCHEMA
		nil,                                // OBJECT_TYPE
		nil,                                // OBJECT_SCHEMA
		nil,                                // OBJECT_NAME
		nil,                                // OBJECT_INSTANCE_BEGIN
		nil,                                // MYSQL_ERRNO,
		nil,                                // RETURNED_SQLSTATE
		nil,                                // MESSAGE_TEXT
		uint64(state.errNum),               // ERRORS
		uint64(state.warnNum),              // WARNINGS
		state.rowsAffected,                 // ROWS_AFFECTED
		state.rowsSent,                     // ROWS_SENT
		state.rowsExamined,                 // ROWS_EXAMINED
		uint64(state.createdTmpDiskTables), // CREATED_TMP_DISK_TABLES
		uint64(state.createdTmpTables),     // CREATED_TMP_TABLES
		uint64(state.selectFullJoin),       // SELECT_FULL_JOIN
		uint64(state.selectFullRangeJoin),  // SELECT_FULL_RANGE_JOIN
		uint64(state.selectRange),          // SELECT_RANGE
		uint64(state.selectRangeCheck),     // SELECT_RANGE_CHECK
		uint64(state.selectScan),           // SELECT_SCAN
		uint64(state.sortMergePasses),      // SORT_MERGE_PASSES
		uint64(state.sortRange),            // SORT_RANGE
		uint64(state.sortRows),             // SORT_ROWS
		uint64(state.sortScan),             // SORT_SCAN
		uint64(state.noIndexUsed),          // NO_INDEX_USED
		uint64(state.noGoodIndexUsed),      // NO_GOOD_INDEX_USED
		nil,                                // NESTING_EVENT_ID
		nil,                                // NESTING_EVENT_TYPE
		nil,                                // NESTING_EVENT_LEVEL
	}
	return types.MakeDatums(row...)
}
// updateEventsStmtsCurrent writes record into the table stored under
// TableStmtsCurrent. The row handle is kept in ps.stmtHandles at slot
// connID % currentElemMax: a zero handle means no row has been added for that
// slot yet, so the record is added and its handle remembered; otherwise the
// existing row is updated in place. It returns nil without doing anything
// when the table is nil.
func (ps *perfSchema) updateEventsStmtsCurrent(connID uint64, record []types.Datum) error {
	currentTbl := ps.mTables[TableStmtsCurrent]
	if currentTbl == nil {
		return nil
	}

	slot := connID % uint64(currentElemMax)
	if rowHandle := atomic.LoadInt64(&ps.stmtHandles[slot]); rowHandle != 0 {
		// A row already exists for this slot: overwrite it.
		if err := currentTbl.UpdateRecord(nil, rowHandle, nil, record, nil); err != nil {
			return errors.Trace(err)
		}
		return nil
	}

	// First record for this slot: add it and remember the handle.
	rowHandle, err := currentTbl.AddRecord(nil, record)
	if err != nil {
		return errors.Trace(err)
	}
	atomic.StoreInt64(&ps.stmtHandles[slot], rowHandle)
	return nil
}
// appendEventsStmtsHistory adds record as a new row to the table stored under
// TableStmtsHistory. It returns nil without doing anything when that table is
// nil, and a traced error when the row cannot be added.
func (ps *perfSchema) appendEventsStmtsHistory(record []types.Datum) error {
	historyTbl := ps.mTables[TableStmtsHistory]
	if historyTbl == nil {
		return nil
	}
	if _, err := historyTbl.AddRecord(nil, record); err != nil {
		return errors.Trace(err)
	}
	return nil
}
// registerStatements resets ps.stmtInfos and registers one "sql" category
// instrument per supported statement type, in the order listed below.
func (ps *perfSchema) registerStatements() {
	ps.stmtInfos = make(map[reflect.Type]*statementInfo)

	// Existing instrument names are the same as MySQL 5.7
	sqlStatements := []struct {
		name string
		elem interface{}
	}{
		{"alter_table", (*ast.AlterTableStmt)(nil)},
		{"begin", (*ast.BeginStmt)(nil)},
		{"commit", (*ast.CommitStmt)(nil)},
		{"create_db", (*ast.CreateDatabaseStmt)(nil)},
		{"create_index", (*ast.CreateIndexStmt)(nil)},
		{"create_table", (*ast.CreateTableStmt)(nil)},
		{"create_user", (*ast.CreateUserStmt)(nil)},
		{"deallocate", (*ast.DeallocateStmt)(nil)},
		{"delete", (*ast.DeleteStmt)(nil)},
		{"do", (*ast.DoStmt)(nil)},
		{"drop_db", (*ast.DropDatabaseStmt)(nil)},
		{"drop_table", (*ast.DropTableStmt)(nil)},
		{"drop_index", (*ast.DropIndexStmt)(nil)},
		{"execute", (*ast.ExecuteStmt)(nil)},
		{"explain", (*ast.ExplainStmt)(nil)},
		{"grant", (*ast.GrantStmt)(nil)},
		{"insert", (*ast.InsertStmt)(nil)},
		{"prepare", (*ast.PrepareStmt)(nil)},
		{"rollback", (*ast.RollbackStmt)(nil)},
		{"select", (*ast.SelectStmt)(nil)},
		{"set", (*ast.SetStmt)(nil)},
		{"set_password", (*ast.SetPwdStmt)(nil)},
		{"show", (*ast.ShowStmt)(nil)},
		{"truncate", (*ast.TruncateTableStmt)(nil)},
		{"union", (*ast.UnionStmt)(nil)},
		{"update", (*ast.UpdateStmt)(nil)},
		{"use", (*ast.UseStmt)(nil)},
		{"analyze", (*ast.AnalyzeTableStmt)(nil)},
	}
	for _, stmt := range sqlStatements {
		ps.RegisterStatement("sql", stmt.name, stmt.elem)
	}
}