Skip to content

Commit

Permalink
expression/builtin: Add tidb_decode_sql_digests function (pingcap#26787)
Browse files Browse the repository at this point in the history
  • Loading branch information
MyonKeminta authored Aug 5, 2021
1 parent cc1f990 commit cdaf996
Show file tree
Hide file tree
Showing 13 changed files with 458 additions and 263 deletions.
61 changes: 58 additions & 3 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6690,7 +6690,7 @@ func (s *testClusterTableSuite) TestSQLDigestTextRetriever(c *C) {

insertNormalized, insertDigest := parser.NormalizeDigest("insert into test_sql_digest_text_retriever values (1, 1)")
_, updateDigest := parser.NormalizeDigest("update test_sql_digest_text_retriever set v = v + 1 where id = 1")
r := &executor.SQLDigestTextRetriever{
r := &expression.SQLDigestTextRetriever{
SQLDigestsMap: map[string]string{
insertDigest.String(): "",
updateDigest.String(): "",
Expand All @@ -6702,6 +6702,61 @@ func (s *testClusterTableSuite) TestSQLDigestTextRetriever(c *C) {
c.Assert(r.SQLDigestsMap[updateDigest.String()], Equals, "")
}

// TestFunctionDecodeSQLDigests exercises the tidb_decode_sql_digests SQL function: normal decoding, truncation of
// the decoded statements, empty and NULL input, arrays with unusable elements, non-array input and wrong argument
// counts.
func (s *testClusterTableSuite) TestFunctionDecodeSQLDigests(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
c.Assert(tk.Se.Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil), IsTrue)
// Turn the statement summary on and verify the setting took effect before running the statements below.
tk.MustExec("set global tidb_enable_stmt_summary = 1")
tk.MustQuery("select @@global.tidb_enable_stmt_summary").Check(testkit.Rows("1"))
tk.MustExec("drop table if exists test_func_decode_sql_digests")
tk.MustExec("create table test_func_decode_sql_digests(id int primary key, v int)")

// Three statements together with their normalized texts and digests; the expected results below are built from them.
q1 := "begin"
norm1, digest1 := parser.NormalizeDigest(q1)
q2 := "select @@tidb_current_ts"
norm2, digest2 := parser.NormalizeDigest(q2)
q3 := "select id, v from test_func_decode_sql_digests where id = 1 for update"
norm3, digest3 := parser.NormalizeDigest(q3)

// TIDB_DECODE_SQL_DIGESTS function doesn't actually do "decoding", instead it queries `statements_summary` and its
// variations for the corresponding statements.
// Execute the statements so that the queries will be saved into statements_summary table.
tk.MustExec(q1)
// Read the ts of the current transaction; within this test it is only checked to be a valid non-zero integer.
ts, err := strconv.ParseUint(tk.MustQuery(q2).Rows()[0][0].(string), 10, 64)
c.Assert(err, IsNil)
c.Assert(ts, Greater, uint64(0))
tk.MustExec(q3)
tk.MustExec("rollback")

// Test statements truncating.
decoded := fmt.Sprintf(`["%s","%s","%s"]`, norm1, norm2, norm3)
digests := fmt.Sprintf(`["%s","%s","%s"]`, digest1, digest2, digest3)
// With 0 as the second argument the complete normalized texts are expected back.
tk.MustQuery("select tidb_decode_sql_digests(?, 0)", digests).Check(testkit.Rows(decoded))
// The three queries are shorter than truncate length, equal to truncate length and longer than truncate length respectively.
tk.MustQuery("select tidb_decode_sql_digests(?, ?)", digests, len(norm2)).Check(testkit.Rows(
"[\"begin\",\"select @@tidb_current_ts\",\"select `id` , `v` from `...\"]"))

// Empty array.
tk.MustQuery("select tidb_decode_sql_digests('[]')").Check(testkit.Rows("[]"))

// NULL
tk.MustQuery("select tidb_decode_sql_digests(null)").Check(testkit.Rows("<nil>"))

// Array containing wrong types and not-existing digests (maps to null).
tk.MustQuery("select tidb_decode_sql_digests(?)", fmt.Sprintf(`["%s",1,null,"%s",{"a":1},[2],"%s","","abcde"]`, digest1, digest2, digest3)).
Check(testkit.Rows(fmt.Sprintf(`["%s",null,null,"%s",null,null,"%s",null,null]`, norm1, norm2, norm3)))

// Not JSON array (throws warnings). Each `show warnings` must directly follow the query whose warning it checks.
tk.MustQuery(`select tidb_decode_sql_digests('{"a":1}')`).Check(testkit.Rows("<nil>"))
tk.MustQuery(`show warnings`).Check(testkit.Rows(`Warning 1210 The argument can't be unmarshalled as JSON array: '{"a":1}'`))
tk.MustQuery(`select tidb_decode_sql_digests('aabbccdd')`).Check(testkit.Rows("<nil>"))
tk.MustQuery(`show warnings`).Check(testkit.Rows(`Warning 1210 The argument can't be unmarshalled as JSON array: 'aabbccdd'`))

// Invalid argument count: both too many and too few arguments must fail with error code 1582.
tk.MustGetErrCode("select tidb_decode_sql_digests('a', 1, 2)", 1582)
tk.MustGetErrCode("select tidb_decode_sql_digests()", 1582)
}

func prepareLogs(c *C, logData []string, fileNames []string) {
writeFile := func(file string, data string) {
f, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
Expand Down Expand Up @@ -8338,9 +8393,9 @@ func (s *testSerialSuite) TestDeadlocksTable(c *C) {
id1 := strconv.FormatUint(rec.ID, 10)
id2 := strconv.FormatUint(rec2.ID, 10)

c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/sqlDigestRetrieverSkipRetrieveGlobal", "return"), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/sqlDigestRetrieverSkipRetrieveGlobal", "return"), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/sqlDigestRetrieverSkipRetrieveGlobal"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/expression/sqlDigestRetrieverSkipRetrieveGlobal"), IsNil)
}()

tk := testkit.NewTestKit(c, s.store)
Expand Down
13 changes: 7 additions & 6 deletions executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
Expand Down Expand Up @@ -2137,11 +2138,11 @@ func (e *tidbTrxTableRetriever) retrieve(ctx context.Context, sctx sessionctx.Co
var res [][]types.Datum
err = e.nextBatch(func(start, end int) error {
// Before getting rows, collect the SQL digests that need to be retrieved first.
var sqlRetriever *SQLDigestTextRetriever
var sqlRetriever *expression.SQLDigestTextRetriever
for _, c := range e.columns {
if c.Name.O == txninfo.CurrentSQLDigestTextStr {
if sqlRetriever == nil {
sqlRetriever = NewSQLDigestTextRetriever()
sqlRetriever = expression.NewSQLDigestTextRetriever()
}

for i := start; i < end; i++ {
Expand Down Expand Up @@ -2250,9 +2251,9 @@ func (r *dataLockWaitsTableRetriever) retrieve(ctx context.Context, sctx session
}

// Fetch the SQL Texts of the digests above if necessary.
var sqlRetriever *SQLDigestTextRetriever
var sqlRetriever *expression.SQLDigestTextRetriever
if needSQLText {
sqlRetriever = NewSQLDigestTextRetriever()
sqlRetriever = expression.NewSQLDigestTextRetriever()
for _, digest := range digests {
if len(digest) > 0 {
sqlRetriever.SQLDigestsMap[digest] = ""
Expand Down Expand Up @@ -2390,11 +2391,11 @@ func (r *deadlocksTableRetriever) retrieve(ctx context.Context, sctx sessionctx.

err = r.nextBatch(func(start, end int) error {
// Before getting rows, collect the SQL digests that need to be retrieved first.
var sqlRetriever *SQLDigestTextRetriever
var sqlRetriever *expression.SQLDigestTextRetriever
for _, c := range r.columns {
if c.Name.O == deadlockhistory.ColCurrentSQLDigestTextStr {
if sqlRetriever == nil {
sqlRetriever = NewSQLDigestTextRetriever()
sqlRetriever = expression.NewSQLDigestTextRetriever()
}

idx, waitChainIdx := r.currentIdx, r.currentWaitChainIdx
Expand Down
2 changes: 1 addition & 1 deletion executor/show_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1130,7 +1130,7 @@ func (s *testSuite5) TestShowBuiltin(c *C) {
res := tk.MustQuery("show builtins;")
c.Assert(res, NotNil)
rows := res.Rows()
const builtinFuncNum = 272
const builtinFuncNum = 273
c.Assert(builtinFuncNum, Equals, len(rows))
c.Assert("abs", Equals, rows[0][0].(string))
c.Assert("yearweek", Equals, rows[builtinFuncNum-1][0].(string))
Expand Down
178 changes: 0 additions & 178 deletions executor/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,7 @@
package executor

import (
"context"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/sqlexec"
)

// SetFromString constructs a slice of strings from a comma separated string.
Expand Down Expand Up @@ -60,178 +54,6 @@ func deleteFromSet(set []string, value string) []string {
return set
}

// SQLDigestTextRetriever is used to find the normalized SQL statement text by SQL digests in the statements_summary
// tables. Callers put the digests they are interested in into SQLDigestsMap as keys, call one of the Retrieve
// methods, and then read the texts back from the same map.
// It's exported for test purposes.
type SQLDigestTextRetriever struct {
// SQLDigestsMap is both the input and the output: its keys are the digests whose SQL text is requested, and the
// values are filled in with the query result. An empty string value means the text is not known (yet).
SQLDigestsMap map[string]string

// When non-nil, these maps (digest -> text) replace the real local / cluster-wide queries, for test purposes.
mockLocalData map[string]string
mockGlobalData map[string]string
// There are two ways of querying the information: 1) query the specified digests with a WHERE IN query, or
// 2) query all information to avoid a too long WHERE IN clause. If more than `fetchAllLimit` digests are
// requested, the second way is chosen; otherwise, the first way is chosen.
fetchAllLimit int
}

// NewSQLDigestTextRetriever builds a ready-to-use SQLDigestTextRetriever: the digest map is allocated and empty,
// and the threshold for switching from WHERE IN queries to fetching everything is set to 512 digests.
func NewSQLDigestTextRetriever() *SQLDigestTextRetriever {
	retriever := new(SQLDigestTextRetriever)
	retriever.SQLDigestsMap = map[string]string{}
	retriever.fetchAllLimit = 512
	return retriever
}

// runMockQuery answers a digest query from the in-memory map `data` instead of a real table.
// With no `inValues` it returns `data` itself (the "fetch everything" case); otherwise it returns a new map holding
// only those entries of `data` whose digest is listed in `inValues`. Every element of `inValues` must be a string.
// The returned error is always nil.
func (r *SQLDigestTextRetriever) runMockQuery(data map[string]string, inValues []interface{}) (map[string]string, error) {
	if len(inValues) == 0 {
		return data, nil
	}

	matched := make(map[string]string, len(inValues))
	for _, v := range inValues {
		key := v.(string)
		text, found := data[key]
		if !found {
			// Digests missing from the mock data are simply absent from the result.
			continue
		}
		matched[key] = text
	}
	return matched, nil
}

// runFetchDigestQuery queries the system tables for the mapping from SQL digest to normalized SQL text.
//
// If `inValues` is non-empty, only the listed digests are looked up (via a WHERE IN filter); otherwise all mappings
// are fetched. If `queryGlobal` is false, it queries information_schema.statements_summary and
// information_schema.statements_summary_history; otherwise, it queries the cluster versions of these two tables.
//
// When the corresponding mock data is set, the mock data is consulted instead and no SQL is executed. It returns an
// error if `sctx` can't execute restricted SQL, or if parsing or executing the statement fails.
func (r *SQLDigestTextRetriever) runFetchDigestQuery(ctx context.Context, sctx sessionctx.Context, queryGlobal bool, inValues []interface{}) (map[string]string, error) {
	// If mock data is set, query the mock data instead of the real statements_summary tables.
	if !queryGlobal && r.mockLocalData != nil {
		return r.runMockQuery(r.mockLocalData, inValues)
	} else if queryGlobal && r.mockGlobalData != nil {
		return r.runMockQuery(r.mockGlobalData, inValues)
	}

	exec, ok := sctx.(sqlexec.RestrictedSQLExecutor)
	if !ok {
		return nil, errors.New("restricted sql can't be executed in this context")
	}

	summaryTable := "information_schema.statements_summary"
	historyTable := "information_schema.statements_summary_history"
	if queryGlobal {
		summaryTable = "information_schema.cluster_statements_summary"
		historyTable = "information_schema.cluster_statements_summary_history"
	}

	// Build the filter if `inValues` is specified. A WHERE clause belongs to a single SELECT, so appending it once
	// after the whole UNION would only filter the second branch and leave the first one returning every row.
	// Therefore the clause is attached to both branches, and the parameters are supplied once per branch.
	var whereClause string
	params := inValues
	if len(inValues) > 0 {
		whereClause = " where digest in (" + strings.Repeat("%?,", len(inValues)-1) + "%?)"
		params = make([]interface{}, 0, 2*len(inValues))
		params = append(params, inValues...)
		params = append(params, inValues...)
	}

	// Information in statements_summary will be periodically moved to statements_summary_history. Union them together
	// to avoid missing information when statements_summary is just cleared.
	stmt := "select digest, digest_text from " + summaryTable + whereClause +
		" union distinct " +
		"select digest, digest_text from " + historyTable + whereClause

	stmtNode, err := exec.ParseWithParams(ctx, stmt, params...)
	if err != nil {
		return nil, err
	}
	rows, _, err := exec.ExecRestrictedStmt(ctx, stmtNode)
	if err != nil {
		return nil, err
	}

	// Column 0 is the digest, column 1 is the normalized SQL text.
	res := make(map[string]string, len(rows))
	for _, row := range rows {
		res[row.GetString(0)] = row.GetString(1)
	}
	return res, nil
}

// updateDigestInfo merges `queryResult` (digest -> text) into SQLDigestsMap. Only digests already present in
// SQLDigestsMap with an empty text are filled in; digests with a known text are left untouched, and entries of
// `queryResult` that were never requested are ignored.
func (r *SQLDigestTextRetriever) updateDigestInfo(queryResult map[string]string) {
	for digest, known := range r.SQLDigestsMap {
		if known != "" {
			// Already resolved earlier; keep the existing text.
			continue
		}
		if found, ok := queryResult[digest]; ok {
			r.SQLDigestsMap[digest] = found
		}
	}
}

// RetrieveLocal looks up the SQL texts of the digests in SQLDigestsMap using local (non-cluster) information and
// stores whatever it finds back into SQLDigestsMap. It is a no-op when no digest is requested. Any query error is
// returned wrapped with errors.Trace.
func (r *SQLDigestTextRetriever) RetrieveLocal(ctx context.Context, sctx sessionctx.Context) error {
	total := len(r.SQLDigestsMap)
	if total == 0 {
		return nil
	}

	// Too many digests for a WHERE IN list: fetch every mapping and pick out the requested ones.
	if total > r.fetchAllLimit {
		all, err := r.runFetchDigestQuery(ctx, sctx, false, nil)
		if err != nil {
			return errors.Trace(err)
		}
		r.updateDigestInfo(all)
		return nil
	}

	// Otherwise ask only for the requested digests.
	digests := make([]interface{}, 0, total)
	for digest := range r.SQLDigestsMap {
		digests = append(digests, digest)
	}
	found, err := r.runFetchDigestQuery(ctx, sctx, false, digests)
	if err != nil {
		return errors.Trace(err)
	}

	// When the result has exactly as many entries as were requested, it is adopted wholesale as the new map
	// instead of being merged entry by entry.
	if len(found) == total {
		r.SQLDigestsMap = found
		return nil
	}

	r.updateDigestInfo(found)
	return nil
}

// RetrieveGlobal looks up the SQL texts of the digests in SQLDigestsMap using the information of the whole cluster.
// It first runs RetrieveLocal, then queries the cluster tables only for the digests whose text is still empty.
// Any query error is returned wrapped with errors.Trace.
func (r *SQLDigestTextRetriever) RetrieveGlobal(ctx context.Context, sctx sessionctx.Context) error {
	if err := r.RetrieveLocal(ctx, sctx); err != nil {
		return errors.Trace(err)
	}

	// In some unit test environments it's unable to retrieve global info, and this function blocks it for tens of
	// seconds, which wastes much time during unit test. In this case, enable this failpoint to bypass retrieving
	// globally.
	failpoint.Inject("sqlDigestRetrieverSkipRetrieveGlobal", func() {
		failpoint.Return(nil)
	})

	// Collect the digests that the local lookup could not resolve.
	var pending []interface{}
	for digest, text := range r.SQLDigestsMap {
		if text == "" {
			pending = append(pending, digest)
		}
	}
	if len(pending) == 0 {
		return nil
	}

	// A nil value list makes the query fetch every mapping instead of using a WHERE IN filter.
	// NOTE(review): the threshold is compared against the total number of requested digests, not the number that
	// is still unresolved - confirm this is intended.
	inValues := pending
	if len(r.SQLDigestsMap) > r.fetchAllLimit {
		inValues = nil
	}

	found, err := r.runFetchDigestQuery(ctx, sctx, true, inValues)
	if err != nil {
		return errors.Trace(err)
	}

	r.updateDigestInfo(found)
	return nil
}

// batchRetrieverHelper is a helper for batch returning data with known total rows. This helps implementing memtable
// retrievers of some information_schema tables. Initialize `batchSize` and `totalRows` fields to use it.
type batchRetrieverHelper struct {
Expand Down
Loading

0 comments on commit cdaf996

Please sign in to comment.