infoschema: remove the inspection_schema (pingcap#15296)
Signed-off-by: Lonng <[email protected]>
lonng authored Mar 12, 2020
1 parent 107b071 commit 45d83d1
Showing 14 changed files with 143 additions and 274 deletions.
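In plain terms, this commit removes the `inspection_schema` virtual database: diagnosis rules (and users) now read the cluster tables through `information_schema`, and the consistent-snapshot behaviour that `inspection_schema` used to provide is kept by the session-level `InspectionTableCache`, which is installed while `information_schema.inspection_result` is being retrieved (see the executor/memtable_reader.go hunk below). A hedged sketch of the rule-side change follows; the surrounding variable names are illustrative, and the SQL strings are taken from the diff below.

// Before this commit, a diagnosis rule had to read the frozen snapshot through the
// dedicated virtual database:
//   select type, count(distinct git_hash) as c from inspection_schema.cluster_info group by type having c > 1
// After this commit, the same rule targets information_schema directly; while an
// inspection is running, the rows are served from the session's TableSnapshot cache.
sql := "select type, count(distinct git_hash) as c from information_schema.cluster_info group by type having c > 1"
rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)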
13 changes: 13 additions & 0 deletions executor/builder.go
@@ -1320,6 +1320,7 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
case util.MetricSchemaName.L:
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &MetricRetriever{
table: v.Table,
extractor: v.Extractor.(*plannercore.MetricTableExtractor),
@@ -1330,13 +1331,15 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
case strings.ToLower(infoschema.TableClusterConfig):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &clusterConfigRetriever{
extractor: v.Extractor.(*plannercore.ClusterTableExtractor),
},
}
case strings.ToLower(infoschema.TableClusterLoad):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &clusterServerInfoRetriever{
extractor: v.Extractor.(*plannercore.ClusterTableExtractor),
serverInfoType: diagnosticspb.ServerInfoType_LoadInfo,
@@ -1345,6 +1348,7 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
case strings.ToLower(infoschema.TableClusterHardware):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &clusterServerInfoRetriever{
extractor: v.Extractor.(*plannercore.ClusterTableExtractor),
serverInfoType: diagnosticspb.ServerInfoType_HardwareInfo,
@@ -1353,6 +1357,7 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
case strings.ToLower(infoschema.TableClusterSystemInfo):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &clusterServerInfoRetriever{
extractor: v.Extractor.(*plannercore.ClusterTableExtractor),
serverInfoType: diagnosticspb.ServerInfoType_SystemInfo,
@@ -1361,13 +1366,15 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
case strings.ToLower(infoschema.TableClusterLog):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &clusterLogRetriever{
extractor: v.Extractor.(*plannercore.ClusterLogTableExtractor),
},
}
case strings.ToLower(infoschema.TableInspectionResult):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &inspectionResultRetriever{
extractor: v.Extractor.(*plannercore.InspectionResultTableExtractor),
timeRange: v.QueryTimeRange,
@@ -1376,6 +1383,7 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
case strings.ToLower(infoschema.TableInspectionSummary):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &inspectionSummaryRetriever{
table: v.Table,
extractor: v.Extractor.(*plannercore.InspectionSummaryTableExtractor),
@@ -1385,13 +1393,15 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
case strings.ToLower(infoschema.TableInspectionRules):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &inspectionRuleRetriever{
extractor: v.Extractor.(*plannercore.InspectionRuleTableExtractor),
},
}
case strings.ToLower(infoschema.TableMetricSummary):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &MetricsSummaryRetriever{
table: v.Table,
extractor: v.Extractor.(*plannercore.MetricSummaryTableExtractor),
@@ -1401,6 +1411,7 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
case strings.ToLower(infoschema.TableMetricSummaryByLabel):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &MetricsSummaryByLabelRetriever{
table: v.Table,
extractor: v.Extractor.(*plannercore.MetricSummaryTableExtractor),
@@ -1427,6 +1438,7 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
strings.ToLower(infoschema.TableConstraints):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &memtableRetriever{
table: v.Table,
columns: v.Columns,
@@ -1435,6 +1447,7 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
case strings.ToLower(infoschema.TableSlowQuery), strings.ToLower(infoschema.ClusterTableSlowLog):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
table: v.Table,
retriever: &slowQueryRetriever{
table: v.Table,
outputCols: v.Columns,
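Every hunk above makes the same one-line addition: the planner's table metadata is now threaded into the reader executor. The sketch below is illustrative rather than a literal excerpt from the diff; it shows the shape of one of these constructions and why the extra field is needed.

// The executor now carries the table info so that Next() can decide, by table name
// (e.table.Name.L), whether the read should be served from the session's inspection
// snapshot cache (see the executor/memtable_reader.go hunk later in this commit).
return &MemTableReaderExec{
	baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
	table:        v.Table, // newly threaded through in this commit
	retriever: &clusterConfigRetriever{
		extractor: v.Extractor.(*plannercore.ClusterTableExtractor),
	},
}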
2 changes: 1 addition & 1 deletion executor/infoschema_reader.go
@@ -426,7 +426,7 @@ func (e *memtableRetriever) setDataFromTables(ctx sessionctx.Context, schemas []
var tableType string
switch schema.Name.L {
case util.InformationSchemaName.L, util.PerformanceSchemaName.L,
util.MetricSchemaName.L, util.InspectionSchemaName.L:
util.MetricSchemaName.L:
tableType = "SYSTEM VIEW"
default:
tableType = "BASE TABLE"
67 changes: 67 additions & 0 deletions executor/infoschema_reader_test.go
@@ -33,13 +33,17 @@ import (
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/server"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/pdapi"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/testutil"
"google.golang.org/grpc"
)
@@ -50,6 +54,8 @@ var _ = Suite(&testInfoschemaTableSuite{})
// If your test does not change the TableStatsCacheExpiry variable, please use testInfoschemaTableSuite.
var _ = SerialSuites(&testInfoschemaTableSerialSuite{})

var _ = SerialSuites(&inspectionSuite{})

type testInfoschemaTableSuite struct {
store kv.Storage
dom *domain.Domain
@@ -60,6 +66,11 @@ type testInfoschemaTableSerialSuite struct {
dom *domain.Domain
}

type inspectionSuite struct {
store kv.Storage
dom *domain.Domain
}

func (s *testInfoschemaTableSerialSuite) SetUpSuite(c *C) {
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
@@ -92,6 +103,62 @@ func (s *testInfoschemaTableSuite) TearDownSuite(c *C) {
s.store.Close()
}

func (s *inspectionSuite) SetUpSuite(c *C) {
testleak.BeforeTest()

var err error
s.store, err = mockstore.NewMockTikvStore()
c.Assert(err, IsNil)
session.DisableStats4Test()
s.dom, err = session.BootstrapSession(s.store)
c.Assert(err, IsNil)
}

func (s *inspectionSuite) TearDownSuite(c *C) {
s.dom.Close()
s.store.Close()
testleak.AfterTest(c)()
}

func (s *inspectionSuite) TestInspectionTables(c *C) {
tk := testkit.NewTestKit(c, s.store)
instances := []string{
"pd,127.0.0.1:11080,127.0.0.1:10080,mock-version,mock-githash",
"tidb,127.0.0.1:11080,127.0.0.1:10080,mock-version,mock-githash",
"tikv,127.0.0.1:11080,127.0.0.1:10080,mock-version,mock-githash",
}
fpName := "github.com/pingcap/tidb/infoschema/mockClusterInfo"
fpExpr := `return("` + strings.Join(instances, ";") + `")`
c.Assert(failpoint.Enable(fpName, fpExpr), IsNil)
defer func() { c.Assert(failpoint.Disable(fpName), IsNil) }()

tk.MustQuery("select type, instance, status_address, version, git_hash from information_schema.cluster_info").Check(testkit.Rows(
"pd 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash",
"tidb 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash",
"tikv 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash",
))

// enable inspection mode
inspectionTableCache := map[string]variable.TableSnapshot{}
tk.Se.GetSessionVars().InspectionTableCache = inspectionTableCache
tk.MustQuery("select type, instance, status_address, version, git_hash from information_schema.cluster_info").Check(testkit.Rows(
"pd 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash",
"tidb 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash",
"tikv 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash",
))
c.Assert(inspectionTableCache["cluster_info"].Err, IsNil)
c.Assert(len(inspectionTableCache["cluster_info"].Rows), DeepEquals, 3)

// Check that the data is obtained from the cache on the next read.
inspectionTableCache["cluster_info"].Rows[0][0].SetString("modified-pd", mysql.DefaultCollationName)
tk.MustQuery("select type, instance, status_address, version, git_hash from information_schema.cluster_info").Check(testkit.Rows(
"modified-pd 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash",
"tidb 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash",
"tikv 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash",
))
tk.Se.GetSessionVars().InspectionTableCache = nil
}

func (s *testInfoschemaTableSuite) TestSchemataTables(c *C) {
tk := testkit.NewTestKit(c, s.store)

25 changes: 10 additions & 15 deletions executor/inspection_result.go
@@ -118,12 +118,7 @@ func (e *inspectionResultRetriever) retrieve(ctx context.Context, sctx sessionct
// Some data of cluster-level memory tables will be retrieved many times in different inspection rules,
// and the cost of retrieving some data is expensive. We use the `TableSnapshot` to cache those data
// and obtain them lazily, providing a consistent view of inspection tables to every inspection rule.
// All cached snapshots should be released at the end of retrieving. So all diagnosis rules should query
// `cluster_config/cluster_hardware/cluster_load/cluster_info` in `inspection_schema`.
// e.g:
// SELECT * FROM inspection_schema.cluster_config
// instead of:
// SELECT * FROM information_schema.cluster_config
// All cached snapshots should be released at the end of retrieving.
sctx.GetSessionVars().InspectionTableCache = map[string]variable.TableSnapshot{}
defer func() { sctx.GetSessionVars().InspectionTableCache = nil }()

@@ -206,7 +201,7 @@ func (configInspection) inspectDiffConfig(_ context.Context, sctx sessionctx.Con
"raftstore.raftdb-path",
"storage.data-dir",
}
sql := fmt.Sprintf("select type, `key`, count(distinct value) as c from inspection_schema.cluster_config where `key` not in ('%s') group by type, `key` having c > 1",
sql := fmt.Sprintf("select type, `key`, count(distinct value) as c from information_schema.cluster_config where `key` not in ('%s') group by type, `key` having c > 1",
strings.Join(ignoreConfigKey, "','"))
rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
if err != nil {
@@ -253,7 +248,7 @@ func (configInspection) inspectCheckConfig(_ context.Context, sctx sessionctx.Co
if !filter.enable(cas.key) {
continue
}
sql := fmt.Sprintf("select instance from inspection_schema.cluster_config where type = '%s' and `key` = '%s' and value = '%s'",
sql := fmt.Sprintf("select instance from information_schema.cluster_config where type = '%s' and `key` = '%s' and value = '%s'",
cas.tp, cas.key, cas.value)
rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
if err != nil {

func (versionInspection) inspect(_ context.Context, sctx sessionctx.Context, filter inspectionFilter) []inspectionResult {
// check that each component type runs a consistent version (same git_hash across instances)
sql := "select type, count(distinct git_hash) as c from inspection_schema.cluster_info group by type having c > 1;"
sql := "select type, count(distinct git_hash) as c from information_schema.cluster_info group by type having c > 1;"
rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
if err != nil {
sctx.GetSessionVars().StmtCtx.AppendWarning(fmt.Errorf("check version consistency failed: %v", err))
@@ -332,19 +327,19 @@ func (c currentLoadInspection) inspect(_ context.Context, sctx sessionctx.Contex
}{
{
"virtual-memory-usage",
"select type, instance, value from inspection_schema.cluster_load where device_type='memory' and device_name='virtual' and name='used-percent' and value > 0.7",
"select type, instance, value from information_schema.cluster_load where device_type='memory' and device_name='virtual' and name='used-percent' and value > 0.7",
"< 0.7",
commonResult,
},
{
"swap-memory-usage",
"select type, instance, value from inspection_schema.cluster_load where device_type='memory' and device_name='swap' and name='used-percent' and value > 0",
"select type, instance, value from information_schema.cluster_load where device_type='memory' and device_name='swap' and name='used-percent' and value > 0",
"0",
commonResult,
},
{
"disk-usage",
"select type, instance, device_name, value from inspection_schema.cluster_hardware where device_type='disk' and name='used-percent' and value > 70",
"select type, instance, device_name, value from information_schema.cluster_hardware where device_type='disk' and name='used-percent' and value > 70",
"< 70",
diskResult,
},
@@ -374,8 +369,8 @@ func (currentLoadInspection) inspectCPULoad(sctx sessionctx.Context, filter insp
continue
}
sql := fmt.Sprintf(`select t1.*, 0.7 * t2.cpu_core from
(select type, instance, value from inspection_schema.cluster_load where device_type='cpu' and device_name='cpu' and name='%s') as t1 join
(select type,instance, max(value) as cpu_core from inspection_schema.CLUSTER_HARDWARE where DEVICE_TYPE='cpu' and name='cpu-logical-cores' group by type,instance) as t2
(select type, instance, value from information_schema.cluster_load where device_type='cpu' and device_name='cpu' and name='%s') as t1 join
(select type,instance, max(value) as cpu_core from information_schema.CLUSTER_HARDWARE where DEVICE_TYPE='cpu' and name='cpu-logical-cores' group by type,instance) as t2
where t2.instance = t1.instance and t1.type=t2.type and t1.value > 0.7 * t2.cpu_core;`, item)
rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
if err != nil {
@@ -562,7 +557,7 @@ func (thresholdCheckInspection) inspectThreshold1(ctx context.Context, sctx sess
if len(rule.configKey) > 0 {
sql = fmt.Sprintf("select t1.instance, t1.cpu, t2.threshold, t2.value from "+
"(select instance, max(value) as cpu from metrics_schema.tikv_thread_cpu %[4]s and name like '%[1]s' group by instance) as t1,"+
"(select value * %[2]f as threshold, value from inspection_schema.cluster_config where type='tikv' and `key` = '%[3]s' limit 1) as t2 "+
"(select value * %[2]f as threshold, value from information_schema.cluster_config where type='tikv' and `key` = '%[3]s' limit 1) as t2 "+
"where t1.cpu > t2.threshold;", rule.component, rule.threshold, rule.configKey, condition)
} else {
sql = fmt.Sprintf("select t1.instance, t1.cpu, %[2]f from "+
44 changes: 43 additions & 1 deletion executor/memtable_reader.go
@@ -31,12 +31,14 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/diagnosticspb"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/terror"
"github.com/pingcap/sysutil"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/infoschema"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
@@ -61,12 +63,52 @@ type memTableRetriever interface {
// MemTableReaderExec executes memTable information retrieving from the MemTable components
type MemTableReaderExec struct {
baseExecutor
table *model.TableInfo
retriever memTableRetriever
// cacheRetrieved indicates whether the parent executor has already retrieved rows
// from the inspection cache while in inspection mode.
cacheRetrieved bool
}

func (e *MemTableReaderExec) isInspectionCacheableTable(tblName string) bool {
switch tblName {
case strings.ToLower(infoschema.TableClusterConfig),
strings.ToLower(infoschema.TableClusterInfo),
strings.ToLower(infoschema.TableClusterSystemInfo),
strings.ToLower(infoschema.TableClusterLoad),
strings.ToLower(infoschema.TableClusterHardware):
return true
default:
return false
}
}

// Next implements the Executor Next interface.
func (e *MemTableReaderExec) Next(ctx context.Context, req *chunk.Chunk) error {
rows, err := e.retriever.retrieve(ctx, e.ctx)
var (
rows [][]types.Datum
err error
)

// The `InspectionTableCache` is assigned at the beginning of retrieving and cleaned up
// at the end of retrieving, so a nil cache means the session is not in inspection mode.
if cache, tbl := e.ctx.GetSessionVars().InspectionTableCache, e.table.Name.L; cache != nil &&
e.isInspectionCacheableTable(tbl) {
// TODO: cached rows are returned all at once; this part should be refactored.
if !e.cacheRetrieved {
// Obtain data from cache first.
cached, found := cache[tbl]
if !found {
rows, err := e.retriever.retrieve(ctx, e.ctx)
cached = variable.TableSnapshot{Rows: rows, Err: err}
cache[tbl] = cached
}
e.cacheRetrieved = true
rows, err = cached.Rows, cached.Err
}
} else {
rows, err = e.retriever.retrieve(ctx, e.ctx)
}
if err != nil {
return err
}
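For readers skimming the hunk above, this is the new read path in a nutshell. The helper name below is purely illustrative; the cacheRetrieved bookkeeping and chunk handling are trimmed, so treat it as a sketch of the logic added in this commit, not a drop-in replacement.

// When inspection mode is active (the session carries an InspectionTableCache) and the
// table is one of the cacheable cluster tables, the first read fills a per-table
// snapshot and every later read in the same inspection reuses it unchanged.
func (e *MemTableReaderExec) nextRows(ctx context.Context) ([][]types.Datum, error) {
	cache, tbl := e.ctx.GetSessionVars().InspectionTableCache, e.table.Name.L
	if cache == nil || !e.isInspectionCacheableTable(tbl) {
		// Non-inspection mode, or a table that is never cached: plain retrieval.
		return e.retriever.retrieve(ctx, e.ctx)
	}
	snap, found := cache[tbl]
	if !found {
		rows, err := e.retriever.retrieve(ctx, e.ctx)
		snap = variable.TableSnapshot{Rows: rows, Err: err}
		cache[tbl] = snap // the first touch of this table populates the snapshot
	}
	return snap.Rows, snap.Err
}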
8 changes: 4 additions & 4 deletions infoschema/infoschema_test.go
@@ -123,13 +123,13 @@ func (*testSuite) TestT(c *C) {
is := handle.Get()

schemaNames := is.AllSchemaNames()
c.Assert(schemaNames, HasLen, 5)
c.Assert(testutil.CompareUnorderedStringSlice(schemaNames, []string{util.InformationSchemaName.O, util.MetricSchemaName.O, util.PerformanceSchemaName.O, "Test", util.InspectionSchemaName.O}), IsTrue)
c.Assert(schemaNames, HasLen, 4)
c.Assert(testutil.CompareUnorderedStringSlice(schemaNames, []string{util.InformationSchemaName.O, util.MetricSchemaName.O, util.PerformanceSchemaName.O, "Test"}), IsTrue)

schemas := is.AllSchemas()
c.Assert(schemas, HasLen, 5)
c.Assert(schemas, HasLen, 4)
schemas = is.Clone()
c.Assert(schemas, HasLen, 5)
c.Assert(schemas, HasLen, 4)

c.Assert(is.SchemaExists(dbName), IsTrue)
c.Assert(is.SchemaExists(noexist), IsFalse)