forked from pingcap/tidb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
data_feature_usage.go
187 lines (162 loc) · 6.07 KB
/
data_feature_usage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package telemetry
import (
"context"
"errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/infoschema"
m "github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/tikv/client-go/v2/metrics"
)
type featureUsage struct {
// transaction usage information
Txn *TxnUsage `json:"txn"`
// cluster index usage information
// key is the first 6 characters of sha2(TABLE_NAME, 256)
ClusterIndex *ClusterIndexUsage `json:"clusterIndex"`
TemporaryTable bool `json:"temporaryTable"`
CTE *m.CTEUsageCounter `json:"cte"`
}
func getFeatureUsage(ctx sessionctx.Context) (*featureUsage, error) {
clusterIdxUsage, err := getClusterIndexUsageInfo(ctx)
if err != nil {
logutil.BgLogger().Info(err.Error())
return nil, err
}
// transaction related feature
txnUsage := getTxnUsageInfo(ctx)
// Avoid the circle dependency.
temporaryTable := ctx.(TemporaryTableFeatureChecker).TemporaryTableExists()
cteUsage := getCTEUsageInfo()
return &featureUsage{txnUsage, clusterIdxUsage, temporaryTable, cteUsage}, nil
}
// ClusterIndexUsage records the usage info of all the tables, no more than 10k tables
type ClusterIndexUsage map[string]TableClusteredInfo
// TableClusteredInfo records the usage info of clusterindex of each table
// CLUSTERED, NON_CLUSTERED, NA
type TableClusteredInfo struct {
IsClustered bool `json:"isClustered"` // True means CLUSTERED, False means NON_CLUSTERED
ClusterPKType string `json:"clusterPKType"` // INT means clustered PK type is int
// NON_INT means clustered PK type is not int
// NA means this field is no meaningful information
}
// getClusterIndexUsageInfo gets the ClusterIndex usage information. It's exported for future test.
func getClusterIndexUsageInfo(ctx sessionctx.Context) (cu *ClusterIndexUsage, err error) {
usage := make(ClusterIndexUsage)
exec := ctx.(sqlexec.RestrictedSQLExecutor)
// query INFORMATION_SCHEMA.tables to get the latest table information about ClusterIndex
stmt, err := exec.ParseWithParams(context.TODO(), `
SELECT left(sha2(TABLE_NAME, 256), 6) table_name_hash, TIDB_PK_TYPE, TABLE_SCHEMA, TABLE_NAME
FROM information_schema.tables
WHERE table_schema not in ('INFORMATION_SCHEMA', 'METRICS_SCHEMA', 'PERFORMANCE_SCHEMA', 'mysql')
ORDER BY table_name_hash
limit 10000`)
if err != nil {
return nil, err
}
rows, _, err := exec.ExecRestrictedStmt(context.TODO(), stmt)
if err != nil {
return nil, err
}
defer func() {
if r := recover(); r != nil {
switch x := r.(type) {
case string:
err = errors.New(x)
case error:
err = x
default:
err = errors.New("unknown failure")
}
}
}()
err = ctx.RefreshTxnCtx(context.TODO())
if err != nil {
return nil, err
}
infoSchema := ctx.GetSessionVars().TxnCtx.InfoSchema.(infoschema.InfoSchema)
// check ClusterIndex information for each table
// row: 0 = table_name_hash, 1 = TIDB_PK_TYPE, 2 = TABLE_SCHEMA (db), 3 = TABLE_NAME
for _, row := range rows {
if row.Len() < 4 {
continue
}
tblClusteredInfo := TableClusteredInfo{false, "NA"}
if row.GetString(1) == "CLUSTERED" {
tblClusteredInfo.IsClustered = true
table, err := infoSchema.TableByName(model.NewCIStr(row.GetString(2)), model.NewCIStr(row.GetString(3)))
if err != nil {
continue
}
tableInfo := table.Meta()
if tableInfo.PKIsHandle {
tblClusteredInfo.ClusterPKType = "INT"
} else if tableInfo.IsCommonHandle {
tblClusteredInfo.ClusterPKType = "NON_INT"
} else {
// if both CLUSTERED IS TURE and CLUSTERPKTYPE IS NA met, this else is hit
// it means the status of INFORMATION_SCHEMA.tables if not consistent with session.Context
// WE SHOULD treat this issue SERIOUSLY
}
}
usage[row.GetString(0)] = tblClusteredInfo
}
return &usage, nil
}
// TemporaryTableFeatureChecker is defined to avoid package circle dependency.
// The session struct implements this interface.
type TemporaryTableFeatureChecker interface {
TemporaryTableExists() bool
}
// TxnUsage records the usage info of transaction related features, including
// async-commit, 1PC and counters of transactions committed with different protocols.
type TxnUsage struct {
AsyncCommitUsed bool `json:"asyncCommitUsed"`
OnePCUsed bool `json:"onePCUsed"`
TxnCommitCounter metrics.TxnCommitCounter `json:"txnCommitCounter"`
}
var initialTxnCommitCounter metrics.TxnCommitCounter
var initialCTECounter m.CTEUsageCounter
// getTxnUsageInfo gets the usage info of transaction related features. It's exported for tests.
func getTxnUsageInfo(ctx sessionctx.Context) *TxnUsage {
asyncCommitUsed := false
if val, err := variable.GetGlobalSystemVar(ctx.GetSessionVars(), variable.TiDBEnableAsyncCommit); err == nil {
asyncCommitUsed = val == variable.On
}
onePCUsed := false
if val, err := variable.GetGlobalSystemVar(ctx.GetSessionVars(), variable.TiDBEnable1PC); err == nil {
onePCUsed = val == variable.On
}
curr := metrics.GetTxnCommitCounter()
diff := curr.Sub(initialTxnCommitCounter)
return &TxnUsage{asyncCommitUsed, onePCUsed, diff}
}
func postReportTxnUsage() {
initialTxnCommitCounter = metrics.GetTxnCommitCounter()
}
// ResetCTEUsage resets CTE usages.
func postReportCTEUsage() {
initialCTECounter = m.GetCTECounter()
}
// getCTEUsageInfo gets the CTE usages.
func getCTEUsageInfo() *m.CTEUsageCounter {
curr := m.GetCTECounter()
diff := curr.Sub(initialCTECounter)
return &diff
}