Skip to content

Commit

Permalink
infoschema, executor: Add SQL text column to DATA_LOCK_WAITS table (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
MyonKeminta authored Aug 3, 2021
1 parent b780396 commit de3bc62
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 95 deletions.
12 changes: 10 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1593,8 +1593,7 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
strings.ToLower(infoschema.TableClientErrorsSummaryByUser),
strings.ToLower(infoschema.TableClientErrorsSummaryByHost),
strings.ToLower(infoschema.TableDeadlocks),
strings.ToLower(infoschema.ClusterTableDeadlocks),
strings.ToLower(infoschema.TableDataLockWaits):
strings.ToLower(infoschema.ClusterTableDeadlocks):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
table: v.Table,
Expand All @@ -1613,6 +1612,15 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
columns: v.Columns,
},
}
case strings.ToLower(infoschema.TableDataLockWaits):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
table: v.Table,
retriever: &dataLockWaitsTableRetriever{
table: v.Table,
columns: v.Columns,
},
}
case strings.ToLower(infoschema.TableStatementsSummary),
strings.ToLower(infoschema.TableStatementsSummaryHistory),
strings.ToLower(infoschema.ClusterTableStatementsSummaryHistory),
Expand Down
180 changes: 137 additions & 43 deletions executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cznic/mathutil"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/parser/charset"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
Expand Down Expand Up @@ -161,8 +162,6 @@ func (e *memtableRetriever) retrieve(ctx context.Context, sctx sessionctx.Contex
err = e.setDataForDeadlock(sctx)
case infoschema.ClusterTableDeadlocks:
err = e.setDataForClusterDeadlock(sctx)
case infoschema.TableDataLockWaits:
err = e.setDataForTableDataLockWaits(sctx)
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -2066,47 +2065,6 @@ func (e *memtableRetriever) setDataForClusterDeadlock(ctx sessionctx.Context) er
return nil
}

func (e *memtableRetriever) setDataForTableDataLockWaits(ctx sessionctx.Context) error {
if !hasPriv(ctx, mysql.ProcessPriv) {
return plannercore.ErrSpecificAccessDenied.GenWithStackByArgs("PROCESS")
}
waits, err := ctx.GetStore().GetLockWaits()
if err != nil {
return err
}
for _, wait := range waits {
var digestStr interface{}
digest, err := resourcegrouptag.DecodeResourceGroupTag(wait.ResourceGroupTag)
if err != nil {
logutil.BgLogger().Warn("failed to decode resource group tag", zap.Error(err))
digestStr = nil
} else {
digestStr = hex.EncodeToString(digest)
}
infoSchema := ctx.GetInfoSchema().(infoschema.InfoSchema)
var decodedKeyStr interface{} = nil
decodedKey, err := keydecoder.DecodeKey(wait.Key, infoSchema)
if err == nil {
decodedKeyBytes, err := json.Marshal(decodedKey)
if err != nil {
logutil.BgLogger().Warn("marshal decoded key info to JSON failed", zap.Error(err))
} else {
decodedKeyStr = string(decodedKeyBytes)
}
} else {
logutil.BgLogger().Warn("decode key failed", zap.Error(err))
}
e.rows = append(e.rows, types.MakeDatums(
strings.ToUpper(hex.EncodeToString(wait.Key)),
decodedKeyStr,
wait.Txn,
wait.WaitForTxn,
digestStr,
))
}
return nil
}

type stmtSummaryTableRetriever struct {
dummyCloser
table *model.TableInfo
Expand Down Expand Up @@ -2256,6 +2214,142 @@ func (e *tidbTrxTableRetriever) retrieve(ctx context.Context, sctx sessionctx.Co
return res, nil
}

// dataLockWaitsTableRetriever is the memtable retriever for the DATA_LOCK_WAITS table.
type dataLockWaitsTableRetriever struct {
dummyCloser
batchRetrieverHelper
table *model.TableInfo
columns []*model.ColumnInfo
lockWaits []*deadlock.WaitForEntry
initialized bool
}

func (r *dataLockWaitsTableRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
if r.retrieved {
return nil, nil
}

if !r.initialized {
if !hasPriv(sctx, mysql.ProcessPriv) {
return nil, plannercore.ErrSpecificAccessDenied.GenWithStackByArgs("PROCESS")
}

r.initialized = true
var err error
r.lockWaits, err = sctx.GetStore().GetLockWaits()
if err != nil {
r.retrieved = true
return nil, err
}

r.batchRetrieverHelper.totalRows = len(r.lockWaits)
r.batchRetrieverHelper.batchSize = 1024
}

var res [][]types.Datum

err := r.nextBatch(func(start, end int) error {
// Before getting rows, collect the SQL digests that needs to be retrieved first.
var needDigest bool
var needSQLText bool
for _, c := range r.columns {
if c.Name.O == infoschema.DataLockWaitsColumnSQLDigestText {
needSQLText = true
} else if c.Name.O == infoschema.DataLockWaitsColumnSQLDigest {
needDigest = true
}
}

var digests []string
if needDigest || needSQLText {
digests = make([]string, end-start)
for i, lockWait := range r.lockWaits {
digest, err := resourcegrouptag.DecodeResourceGroupTag(lockWait.ResourceGroupTag)
if err != nil {
// Ignore the error if failed to decode the digest from resource_group_tag. We still want to show
// as much information as possible even we can't retrieve some of them.
logutil.Logger(ctx).Warn("failed to decode resource group tag", zap.Error(err))
} else {
digests[i] = hex.EncodeToString(digest)
}
}
}

// Fetch the SQL Texts of the digests above if necessary.
var sqlRetriever *SQLDigestTextRetriever
if needSQLText {
sqlRetriever = NewSQLDigestTextRetriever()
for _, digest := range digests {
if len(digest) > 0 {
sqlRetriever.SQLDigestsMap[digest] = ""
}
}
err := sqlRetriever.RetrieveGlobal(ctx, sctx)
if err != nil {
return errors.Trace(err)
}
}

// Calculate rows.
res = make([][]types.Datum, 0, end-start)
for rowIdx, lockWait := range r.lockWaits[start:end] {
row := make([]types.Datum, 0, len(r.columns))

for _, col := range r.columns {
switch col.Name.O {
case infoschema.DataLockWaitsColumnKey:
row = append(row, types.NewDatum(strings.ToUpper(hex.EncodeToString(lockWait.Key))))
case infoschema.DataLockWaitsColumnKeyInfo:
infoSchema := sctx.GetInfoSchema().(infoschema.InfoSchema)
var decodedKeyStr interface{} = nil
decodedKey, err := keydecoder.DecodeKey(lockWait.Key, infoSchema)
if err == nil {
decodedKeyBytes, err := json.Marshal(decodedKey)
if err != nil {
logutil.BgLogger().Warn("marshal decoded key info to JSON failed", zap.Error(err))
} else {
decodedKeyStr = string(decodedKeyBytes)
}
} else {
logutil.BgLogger().Warn("decode key failed", zap.Error(err))
}
row = append(row, types.NewDatum(decodedKeyStr))
case infoschema.DataLockWaitsColumnTrxID:
row = append(row, types.NewDatum(lockWait.Txn))
case infoschema.DataLockWaitsColumnCurrentHoldingTrxID:
row = append(row, types.NewDatum(lockWait.WaitForTxn))
case infoschema.DataLockWaitsColumnSQLDigest:
digest := digests[rowIdx]
if len(digest) == 0 {
row = append(row, types.NewDatum(nil))
} else {
row = append(row, types.NewDatum(digest))
}
case infoschema.DataLockWaitsColumnSQLDigestText:
text := sqlRetriever.SQLDigestsMap[digests[rowIdx]]
if len(text) > 0 {
row = append(row, types.NewDatum(text))
} else {
row = append(row, types.NewDatum(nil))
}
default:
row = append(row, types.NewDatum(nil))
}
}

res = append(res, row)
}

return nil
})

if err != nil {
return nil, err
}

return res, nil
}

type hugeMemTableRetriever struct {
dummyCloser
table *model.TableInfo
Expand Down
28 changes: 22 additions & 6 deletions infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,21 @@ const (
TableDataLockWaits = "DATA_LOCK_WAITS"
)

const (
// DataLockWaitsColumnKey is the name of the KEY column of the DATA_LOCK_WAITS table.
DataLockWaitsColumnKey = "KEY"
// DataLockWaitsColumnKeyInfo is the name of the KEY_INFO column of the DATA_LOCK_WAITS table.
DataLockWaitsColumnKeyInfo = "KEY_INFO"
// DataLockWaitsColumnTrxID is the name of the TRX_ID column of the DATA_LOCK_WAITS table.
DataLockWaitsColumnTrxID = "TRX_ID"
// DataLockWaitsColumnCurrentHoldingTrxID is the name of the CURRENT_HOLDING_TRX_ID column of the DATA_LOCK_WAITS table.
DataLockWaitsColumnCurrentHoldingTrxID = "CURRENT_HOLDING_TRX_ID"
// DataLockWaitsColumnSQLDigest is the name of the SQL_DIGEST column of the DATA_LOCK_WAITS table.
DataLockWaitsColumnSQLDigest = "SQL_DIGEST"
// DataLockWaitsColumnSQLDigestText is the name of the SQL_DIGEST_TEXT column of the DATA_LOCK_WAITS table.
DataLockWaitsColumnSQLDigestText = "SQL_DIGEST_TEXT"
)

var tableIDMap = map[string]int64{
TableSchemata: autoid.InformationSchemaDBID + 1,
TableTables: autoid.InformationSchemaDBID + 2,
Expand Down Expand Up @@ -1380,16 +1395,17 @@ var tableDeadlocksCols = []columnInfo{
{name: "TRY_LOCK_TRX_ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag | mysql.UnsignedFlag, comment: "The transaction ID (start ts) of the transaction that's trying to acquire the lock"},
{name: "CURRENT_SQL_DIGEST", tp: mysql.TypeVarchar, size: 64, comment: "The digest of the SQL that's being blocked"},
{name: "KEY", tp: mysql.TypeBlob, size: types.UnspecifiedLength, comment: "The key on which a transaction is waiting for another"},
{name: "KEY_INFO", tp: mysql.TypeVarchar, size: 64, flag: mysql.NotNullFlag, comment: "Information of the key"},
{name: "KEY_INFO", tp: mysql.TypeBlob, size: types.UnspecifiedLength, flag: mysql.NotNullFlag, comment: "Information of the key"},
{name: "TRX_HOLDING_LOCK", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag | mysql.UnsignedFlag, comment: "The transaction ID (start ts) of the transaction that's currently holding the lock"},
}

var tableDataLockWaitsCols = []columnInfo{
{name: "KEY", tp: mysql.TypeVarchar, size: 64, flag: mysql.NotNullFlag, comment: "The key that's being waiting on"},
{name: "KEY_INFO", tp: mysql.TypeVarchar, size: 64, flag: mysql.NotNullFlag, comment: "Information of the key"},
{name: "TRX_ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag | mysql.UnsignedFlag, comment: "Current transaction that's waiting for the lock"},
{name: "CURRENT_HOLDING_TRX_ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag | mysql.UnsignedFlag, comment: "The transaction that's holding the lock and blocks the current transaction"},
{name: "SQL_DIGEST", tp: mysql.TypeVarchar, size: 64, comment: "Digest of the SQL that's trying to acquire the lock"},
{name: DataLockWaitsColumnKey, tp: mysql.TypeBlob, size: types.UnspecifiedLength, flag: mysql.NotNullFlag, comment: "The key that's being waiting on"},
{name: DataLockWaitsColumnKeyInfo, tp: mysql.TypeBlob, size: types.UnspecifiedLength, flag: mysql.NotNullFlag, comment: "Information of the key"},
{name: DataLockWaitsColumnTrxID, tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag | mysql.UnsignedFlag, comment: "Current transaction that's waiting for the lock"},
{name: DataLockWaitsColumnCurrentHoldingTrxID, tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag | mysql.UnsignedFlag, comment: "The transaction that's holding the lock and blocks the current transaction"},
{name: DataLockWaitsColumnSQLDigest, tp: mysql.TypeVarchar, size: 64, comment: "Digest of the SQL that's trying to acquire the lock"},
{name: DataLockWaitsColumnSQLDigestText, tp: mysql.TypeBlob, size: types.UnspecifiedLength, comment: "Digest of the SQL that's trying to acquire the lock"},
}

var tableStatementsSummaryEvictedCols = []columnInfo{
Expand Down
58 changes: 24 additions & 34 deletions infoschema/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package infoschema_test

import (
"crypto/tls"
"encoding/hex"
"fmt"
"math"
"net"
Expand Down Expand Up @@ -49,7 +48,6 @@ import (
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/mockstorage"
"github.com/pingcap/tidb/store/mockstore/unistore"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/kvcache"
"github.com/pingcap/tidb/util/pdapi"
Expand All @@ -58,22 +56,16 @@ import (
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/testutil"
"github.com/tikv/client-go/v2/tikv"
"google.golang.org/grpc"
)

var _ = Suite(&testTableSuite{&testTableSuiteBase{}})
var _ = Suite(&testDataLockWaitSuite{&testTableSuiteBase{}})
var _ = SerialSuites(&testClusterTableSuite{testTableSuiteBase: &testTableSuiteBase{}})

type testTableSuite struct {
*testTableSuiteBase
}

type testDataLockWaitSuite struct {
*testTableSuiteBase
}

type testTableSuiteBase struct {
store kv.Storage
dom *domain.Domain
Expand Down Expand Up @@ -1810,39 +1802,36 @@ func (s *testTableSuite) TestInfoschemaDeadlockPrivilege(c *C) {
_ = tk.MustQuery("select * from information_schema.deadlocks")
}

func (s *testDataLockWaitSuite) SetUpSuite(c *C) {
testleak.BeforeTest()

client, pdClient, cluster, err := unistore.New("")
c.Assert(err, IsNil)
unistore.BootstrapWithSingleStore(cluster)
kvstore, err := tikv.NewTestTiKVStore(client, pdClient, nil, nil, 0)
c.Assert(err, IsNil)
_, digest1 := parser.NormalizeDigest("select * from t1 for update;")
_, digest2 := parser.NormalizeDigest("update t1 set f1=1 where id=2;")
s.store, err = mockstorage.NewMockStorageWithLockWaits(kvstore, []*deadlock.WaitForEntry{
{Txn: 1, WaitForTxn: 2, KeyHash: 3, Key: []byte("a"), ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(digest1, nil)},
{Txn: 4, WaitForTxn: 5, KeyHash: 6, Key: []byte("b"), ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(digest2, nil)},
func (s *testClusterTableSuite) TestDataLockWaits(c *C) {
_, digest1 := parser.NormalizeDigest("select * from test_data_lock_waits for update")
_, digest2 := parser.NormalizeDigest("update test_data_lock_waits set f1=1 where id=2")
s.store.(mockstorage.MockLockWaitSetter).SetMockLockWaits([]*deadlock.WaitForEntry{
{Txn: 1, WaitForTxn: 2, Key: []byte("key1"), ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(digest1, nil)},
{Txn: 3, WaitForTxn: 4, Key: []byte("key2"), ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(digest2, nil)},
// Invalid digests
{Txn: 5, WaitForTxn: 6, Key: []byte("key3"), ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(nil, nil)},
{Txn: 7, WaitForTxn: 8, Key: []byte("key4"), ResourceGroupTag: []byte("asdfghjkl")},
})
c.Assert(err, IsNil)
session.DisableStats4Test()
s.dom, err = session.BootstrapSession(s.store)
c.Assert(err, IsNil)
}

func (s *testDataLockWaitSuite) TestDataLockWait(c *C) {
_, digest1 := parser.NormalizeDigest("select * from t1 for update;")
_, digest2 := parser.NormalizeDigest("update t1 set f1=1 where id=2;")
keyHex1 := hex.EncodeToString([]byte("a"))
keyHex2 := hex.EncodeToString([]byte("b"))
tk := s.newTestKitWithRoot(c)
tk.MustQuery("select * from information_schema.DATA_LOCK_WAITS;").
Check(testkit.Rows(keyHex1+" <nil> 1 2 "+digest1.String(), keyHex2+" <nil> 4 5 "+digest2.String()))

// Execute one of the query once so it's stored into statements_summary.
tk.MustExec("create table test_data_lock_waits (id int primary key, f1 int)")
tk.MustExec("select * from test_data_lock_waits for update")

tk.MustQuery("select * from information_schema.DATA_LOCK_WAITS").Check(testkit.Rows(
"6B657931 <nil> 1 2 "+digest1.String()+" select * from `test_data_lock_waits` for update",
"6B657932 <nil> 3 4 "+digest2.String()+" <nil>",
"6B657933 <nil> 5 6 <nil> <nil>",
"6B657934 <nil> 7 8 <nil> <nil>"))
}

func (s *testDataLockWaitSuite) TestDataLockPrivilege(c *C) {
func (s *testClusterTableSuite) TestDataLockWaitsPrivilege(c *C) {
dropUserTk := s.newTestKitWithRoot(c)

tk := s.newTestKitWithRoot(c)
tk.MustExec("create user 'testuser'@'localhost'")
defer dropUserTk.MustExec("drop user 'testuser'@'localhost'")
c.Assert(tk.Se.Auth(&auth.UserIdentity{
Username: "testuser",
Hostname: "localhost",
Expand All @@ -1853,6 +1842,7 @@ func (s *testDataLockWaitSuite) TestDataLockPrivilege(c *C) {

tk = s.newTestKitWithRoot(c)
tk.MustExec("create user 'testuser2'@'localhost'")
defer dropUserTk.MustExec("drop user 'testuser2'@'localhost'")
tk.MustExec("grant process on *.* to 'testuser2'@'localhost'")
c.Assert(tk.Se.Auth(&auth.UserIdentity{
Username: "testuser2",
Expand Down
3 changes: 2 additions & 1 deletion planner/core/rule_column_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,8 @@ func (p *LogicalMemTable) PruneColumns(parentUsedCols []*expression.Column) erro
infoschema.ClusterTableStatementsSummaryHistory,
infoschema.ClusterTableSlowLog,
infoschema.TableTiDBTrx,
infoschema.ClusterTableTiDBTrx:
infoschema.ClusterTableTiDBTrx,
infoschema.TableDataLockWaits:
default:
return nil
}
Expand Down
Loading

0 comments on commit de3bc62

Please sign in to comment.