Skip to content

Commit

Permalink
executor, planner, util: optimize query statements summary table by c…
Browse files Browse the repository at this point in the history
…onditional pushdown (pingcap#27563)
  • Loading branch information
TszKitLo40 authored Sep 28, 2021
1 parent 58e1c05 commit fd73942
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 12 deletions.
5 changes: 3 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1643,8 +1643,9 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
table: v.Table,
retriever: &stmtSummaryTableRetriever{
table: v.Table,
columns: v.Columns,
table: v.Table,
columns: v.Columns,
extractor: v.Extractor.(*plannercore.StatementsSummaryExtractor),
},
}
case strings.ToLower(infoschema.TableColumns):
Expand Down
13 changes: 13 additions & 0 deletions executor/explain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testutil"
)

func (s *testSuite1) TestExplainPrivileges(c *C) {
Expand Down Expand Up @@ -352,3 +353,15 @@ func (s *testSuite2) TestExplainAnalyzeCTEMemoryAndDiskInfo(c *C) {
c.Assert(rows[4][7].(string), Not(Equals), "N/A")
c.Assert(rows[4][8].(string), Not(Equals), "N/A")
}

func (s *testSuite) TestExplainStatementsSummary(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustQuery("desc select * from information_schema.statements_summary").Check(testkit.Rows(
`MemTableScan_4 10000.00 root table:STATEMENTS_SUMMARY `))
tk.MustQuery("desc select * from information_schema.statements_summary where digest is null").Check(testutil.RowsWithSep("|",
`Selection_5|8000.00|root| isnull(Column#5)`, `└─MemTableScan_6|10000.00|root|table:STATEMENTS_SUMMARY|`))
tk.MustQuery("desc select * from information_schema.statements_summary where digest = 'abcdefg'").Check(testutil.RowsWithSep(" ",
`MemTableScan_5 10000.00 root table:STATEMENTS_SUMMARY digests: ["abcdefg"]`))
tk.MustQuery("desc select * from information_schema.statements_summary where digest in ('a','b','c')").Check(testutil.RowsWithSep(" ",
`MemTableScan_5 10000.00 root table:STATEMENTS_SUMMARY digests: ["a","b","c"]`))
}
7 changes: 6 additions & 1 deletion executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2094,11 +2094,12 @@ type stmtSummaryTableRetriever struct {
table *model.TableInfo
columns []*model.ColumnInfo
retrieved bool
extractor *plannercore.StatementsSummaryExtractor
}

// retrieve implements the infoschemaRetriever interface
func (e *stmtSummaryTableRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
if e.retrieved {
if e.extractor.SkipRequest || e.retrieved {
return nil, nil
}
e.retrieved = true
Expand All @@ -2115,6 +2116,10 @@ func (e *stmtSummaryTableRetriever) retrieve(ctx context.Context, sctx sessionct
}
user := sctx.GetSessionVars().User
reader := stmtsummary.NewStmtSummaryReader(user, hasPriv(sctx, mysql.ProcessPriv), e.columns, instanceAddr)
if e.extractor.Enable {
checker := stmtsummary.NewStmtSummaryChecker(e.extractor.Digests)
reader.SetChecker(checker)
}
var rows [][]types.Datum
switch e.table.Name.O {
case infoschema.TableStatementsSummary,
Expand Down
53 changes: 53 additions & 0 deletions infoschema/cluster_tables_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func TestClusterTables(t *testing.T) {
t.Run("SelectClusterTablePrivilege", SubTestSelectClusterTablePrivilege(s))
t.Run("StmtSummaryEvictedCountTable", SubTestStmtSummaryEvictedCountTable(s))
t.Run("StmtSummaryHistoryTable", SubTestStmtSummaryHistoryTable(s))
t.Run("Issue26379", SubTestIssue26379(s))
}

func SubTestForClusterServerInfo(s *clusterTablesSuite) func(*testing.T) {
Expand Down Expand Up @@ -403,6 +404,58 @@ func SubTestStmtSummaryHistoryTable(s *clusterTablesSuite) func(*testing.T) {
}
}

func SubTestIssue26379(s *clusterTablesSuite) func(*testing.T) {
return func(t *testing.T) {
tk := s.newTestKitWithRoot(t)

// Clear all statements.
tk.MustExec("set session tidb_enable_stmt_summary = 0")
tk.MustExec("set session tidb_enable_stmt_summary = ''")
tk.MustExec("set @@global.tidb_stmt_summary_max_stmt_count=10")

tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b varchar(10), c int, d int, key k(a))")

_, digest1 := parser.NormalizeDigest("select * from t where a = 3")
_, digest2 := parser.NormalizeDigest("select * from t where b = 'b'")
_, digest3 := parser.NormalizeDigest("select * from t where c = 6")
_, digest4 := parser.NormalizeDigest("select * from t where d = 5")
fillStatementCache := func() {
tk.MustQuery("select * from t where a = 3")
tk.MustQuery("select * from t where b = 'b'")
tk.MustQuery("select * from t where c = 6")
tk.MustQuery("select * from t where d = 5")
}
fillStatementCache()
tk.MustQuery(fmt.Sprintf("select digest from information_schema.statements_summary where digest = '%s'", digest1.String())).Check(testkit.Rows(digest1.String()))
tk.MustQuery(fmt.Sprintf("select digest from information_schema.cluster_statements_summary where digest = '%s'", digest1.String())).Check(testkit.Rows(digest1.String()))
fillStatementCache()
tk.MustQuery(fmt.Sprintf("select digest from information_schema.statements_summary where digest = '%s'", digest2.String())).Check(testkit.Rows(digest2.String()))
tk.MustQuery(fmt.Sprintf("select digest from information_schema.cluster_statements_summary where digest = '%s'", digest2.String())).Check(testkit.Rows(digest2.String()))
fillStatementCache()
tk.MustQuery(fmt.Sprintf("select digest from information_schema.statements_summary where digest = '%s'", digest3.String())).Check(testkit.Rows(digest3.String()))
tk.MustQuery(fmt.Sprintf("select digest from information_schema.cluster_statements_summary where digest = '%s'", digest3.String())).Check(testkit.Rows(digest3.String()))
fillStatementCache()
tk.MustQuery(fmt.Sprintf("select digest from information_schema.statements_summary where digest = '%s'", digest4.String())).Check(testkit.Rows(digest4.String()))
tk.MustQuery(fmt.Sprintf("select digest from information_schema.cluster_statements_summary where digest = '%s'", digest4.String())).Check(testkit.Rows(digest4.String()))
fillStatementCache()
tk.MustQuery(fmt.Sprintf("select digest from information_schema.statements_summary where digest = '%s' or digest = '%s'", digest1.String(), digest2.String())).Sort().Check(testkit.Rows(digest1.String(), digest2.String()))
tk.MustQuery(fmt.Sprintf("select digest from information_schema.cluster_statements_summary where digest = '%s' or digest = '%s'", digest1.String(), digest2.String())).Sort().Check(testkit.Rows(digest1.String(), digest2.String()))
re := tk.MustQuery(fmt.Sprintf("select digest from information_schema.cluster_statements_summary where digest = '%s' and digest = '%s'", digest1.String(), digest2.String()))
require.Equal(t, 0, len(re.Rows()))
re = tk.MustQuery(fmt.Sprintf("select digest from information_schema.cluster_statements_summary where digest = '%s' and digest = '%s'", digest1.String(), digest2.String()))
require.Equal(t, 0, len(re.Rows()))
fillStatementCache()
tk.MustQuery(fmt.Sprintf("select digest from information_schema.statements_summary where digest in ('%s', '%s', '%s', '%s')", digest1.String(), digest2.String(), digest3.String(), digest4.String())).Sort().Check(testkit.Rows(digest1.String(), digest4.String(), digest2.String(), digest3.String()))
tk.MustQuery(fmt.Sprintf("select digest from information_schema.cluster_statements_summary where digest in ('%s', '%s', '%s', '%s')", digest1.String(), digest2.String(), digest3.String(), digest4.String())).Sort().Check(testkit.Rows(digest1.String(), digest4.String(), digest2.String(), digest3.String()))
fillStatementCache()
tk.MustQuery("select count(*) from information_schema.statements_summary where digest=''").Check(testkit.Rows("0"))
tk.MustQuery("select count(*) from information_schema.statements_summary where digest is null").Check(testkit.Rows("1"))
tk.MustQuery("select count(*) from information_schema.cluster_statements_summary where digest=''").Check(testkit.Rows("0"))
tk.MustQuery("select count(*) from information_schema.cluster_statements_summary where digest is null").Check(testkit.Rows("1"))
}
}

func (s *clusterTablesSuite) setUpRPCService(t *testing.T, addr string) (*grpc.Server, string) {
lis, err := net.Listen("tcp", addr)
require.NoError(t, err)
Expand Down
2 changes: 2 additions & 0 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4258,6 +4258,8 @@ func (b *PlanBuilder) buildMemTable(_ context.Context, dbName model.CIStr, table
p.Extractor = &TableStorageStatsExtractor{}
case infoschema.TableTiFlashTables, infoschema.TableTiFlashSegments:
p.Extractor = &TiFlashSystemTableExtractor{}
case infoschema.TableStatementsSummary, infoschema.TableStatementsSummaryHistory:
p.Extractor = &StatementsSummaryExtractor{}
}
}
return p, nil
Expand Down
69 changes: 67 additions & 2 deletions planner/core/memtable_predicate_extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (helper extractHelper) extractColOrExpr(extractCols map[int64]*types.FieldN
}
// Define an inner function to avoid populate the outer scope
var extract = func(extractCols map[int64]*types.FieldName, fn *expression.ScalarFunction) (string, []types.Datum) {
switch fn.FuncName.L {
switch helper.getStringFunctionName(fn) {
case ast.EQ:
return helper.extractColBinaryOpConsExpr(extractCols, fn)
case ast.LogicOr:
Expand Down Expand Up @@ -204,7 +204,7 @@ func (helper extractHelper) extractCol(
}
var colName string
var datums []types.Datum // the memory of datums should not be reused, they will be put into result.
switch fn.FuncName.L {
switch helper.getStringFunctionName(fn) {
case ast.EQ:
colName, datums = helper.extractColBinaryOpConsExpr(extractCols, fn)
case ast.In:
Expand Down Expand Up @@ -366,6 +366,27 @@ func (helper extractHelper) getTimeFunctionName(fn *expression.ScalarFunction) s
}
}

// getStringFunctionName is used to get the (string) function name.
// For the expression that push down to the coprocessor, the function name is different with normal compare function,
// Then getStringFunctionName will do a sample function name convert.
// Currently, this is used to support query `CLUSTER_STMT_SUMMARY` at any string.
func (helper extractHelper) getStringFunctionName(fn *expression.ScalarFunction) string {
switch fn.Function.PbCode() {
case tipb.ScalarFuncSig_GTString:
return ast.GT
case tipb.ScalarFuncSig_GEString:
return ast.GE
case tipb.ScalarFuncSig_LTString:
return ast.LT
case tipb.ScalarFuncSig_LEString:
return ast.LE
case tipb.ScalarFuncSig_EQString:
return ast.EQ
default:
return fn.FuncName.L
}
}

// extracts the time range column, e.g:
// SELECT * FROM t WHERE time='2019-10-10 10:10:10'
// SELECT * FROM t WHERE time>'2019-10-10 10:10:10' AND time<'2019-10-11 10:10:10'
Expand Down Expand Up @@ -1316,3 +1337,47 @@ func (e *TiFlashSystemTableExtractor) explainInfo(p *PhysicalMemTable) string {
}
return s
}

// StatementsSummaryExtractor is used to extract some predicates of statements summary table.
type StatementsSummaryExtractor struct {
extractHelper

// SkipRequest means the where clause always false, we don't need to request any component
SkipRequest bool
// Digests represents digest applied to, and we should apply all digest if there is no digest specified.
// e.g: SELECT * FROM STATEMENTS_SUMMARY WHERE digest='8019af26debae8aa7642c501dbc43212417b3fb14e6aec779f709976b7e521be'
Digests set.StringSet
// Enable is true means the executor should use digest to locate statement summary.
// Enable is false, means the executor should keep the behavior compatible with before.
Enable bool
}

// Extract implements the MemTablePredicateExtractor Extract interface
func (e *StatementsSummaryExtractor) Extract(
_ sessionctx.Context,
schema *expression.Schema,
names []*types.FieldName,
predicates []expression.Expression,
) (remained []expression.Expression) {
// Extract the `digest` column
remained, skip, digests := e.extractCol(schema, names, predicates, "digest", false)
e.SkipRequest = skip
if e.SkipRequest {
return nil
}
if digests.Count() > 0 {
e.Enable = true
e.Digests = digests
}
return remained
}

func (e *StatementsSummaryExtractor) explainInfo(p *PhysicalMemTable) string {
if e.SkipRequest {
return "skip_request: true"
}
if !e.Enable {
return ""
}
return fmt.Sprintf("digests: [%s]", extractStringFromStringSet(e.Digests))
}
5 changes: 4 additions & 1 deletion planner/core/pb_to_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ func (b *PBPlanBuilder) pbToTableScan(e *tipb.Executor) (PhysicalPlan, error) {
Columns: columns,
}.Init(b.sctx, &property.StatsInfo{}, 0)
p.SetSchema(schema)
if strings.ToUpper(p.Table.Name.O) == infoschema.ClusterTableSlowLog {
switch strings.ToUpper(p.Table.Name.O) {
case infoschema.ClusterTableSlowLog:
extractor := &SlowQueryExtractor{}
extractor.Desc = tblScan.Desc
if b.ranges != nil {
Expand All @@ -119,6 +120,8 @@ func (b *PBPlanBuilder) pbToTableScan(e *tipb.Executor) (PhysicalPlan, error) {
}
}
p.Extractor = extractor
case infoschema.ClusterTableStatementsSummary, infoschema.ClusterTableStatementsSummaryHistory:
p.Extractor = &StatementsSummaryExtractor{}
}
return p, nil
}
Expand Down
1 change: 1 addition & 0 deletions planner/core/rule_column_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ func (p *LogicalMemTable) PruneColumns(parentUsedCols []*expression.Column) erro
for i := len(used) - 1; i >= 0; i-- {
if !used[i] && p.schema.Len() > 1 {
p.schema.Columns = append(p.schema.Columns[:i], p.schema.Columns[i+1:]...)
p.names = append(p.names[:i], p.names[i+1:]...)
p.Columns = append(p.Columns[:i], p.Columns[i+1:]...)
}
}
Expand Down
45 changes: 39 additions & 6 deletions util/stmtsummary/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/plancodec"
"github.com/pingcap/tidb/util/set"
"go.uber.org/zap"
)

Expand All @@ -38,6 +39,7 @@ type stmtSummaryReader struct {
instanceAddr string
ssMap *stmtSummaryByDigestMap
columnValueFactories []columnValueFactory
checker *stmtSummaryChecker
}

// NewStmtSummaryReader return a new statement summaries reader.
Expand Down Expand Up @@ -78,13 +80,19 @@ func (ssr *stmtSummaryReader) GetStmtSummaryCurrentRows() [][]types.Datum {

rows := make([][]types.Datum, 0, len(values))
for _, value := range values {
record := ssr.getStmtByDigestRow(value.(*stmtSummaryByDigest), beginTime)
ssbd := value.(*stmtSummaryByDigest)
if ssr.checker != nil && !ssr.checker.isDigestValid(ssbd.digest) {
continue
}
record := ssr.getStmtByDigestRow(ssbd, beginTime)
if record != nil {
rows = append(rows, record)
}
}
if otherDatum := ssr.getStmtEvictedOtherRow(other); otherDatum != nil {
rows = append(rows, otherDatum)
if ssr.checker == nil {
if otherDatum := ssr.getStmtEvictedOtherRow(other); otherDatum != nil {
rows = append(rows, otherDatum)
}
}
return rows
}
Expand All @@ -100,15 +108,25 @@ func (ssr *stmtSummaryReader) GetStmtSummaryHistoryRows() [][]types.Datum {
historySize := ssMap.historySize()
rows := make([][]types.Datum, 0, len(values)*historySize)
for _, value := range values {
records := ssr.getStmtByDigestHistoryRow(value.(*stmtSummaryByDigest), historySize)
ssbd := value.(*stmtSummaryByDigest)
if ssr.checker != nil && !ssr.checker.isDigestValid(ssbd.digest) {
continue
}
records := ssr.getStmtByDigestHistoryRow(ssbd, historySize)
rows = append(rows, records...)
}

otherDatum := ssr.getStmtEvictedOtherHistoryRow(other, historySize)
rows = append(rows, otherDatum...)
if ssr.checker == nil {
otherDatum := ssr.getStmtEvictedOtherHistoryRow(other, historySize)
rows = append(rows, otherDatum...)
}
return rows
}

func (ssr *stmtSummaryReader) SetChecker(checker *stmtSummaryChecker) {
ssr.checker = checker
}

func (ssr *stmtSummaryReader) getStmtByDigestRow(ssbd *stmtSummaryByDigest, beginTimeForCurInterval int64) []types.Datum {
var ssElement *stmtSummaryByDigestElement

Expand Down Expand Up @@ -187,6 +205,21 @@ func (ssr *stmtSummaryReader) getStmtEvictedOtherHistoryRow(ssbde *stmtSummaryBy
return rows
}

type stmtSummaryChecker struct {
digests set.StringSet
}

// NewStmtSummaryChecker return a new statement summaries checker.
func NewStmtSummaryChecker(digests set.StringSet) *stmtSummaryChecker {
return &stmtSummaryChecker{
digests: digests,
}
}

func (ssc *stmtSummaryChecker) isDigestValid(digest string) bool {
return ssc.digests.Exist(digest)
}

// Statements summary table column name.
const (
SummaryBeginTimeStr = "SUMMARY_BEGIN_TIME"
Expand Down

0 comments on commit fd73942

Please sign in to comment.