Skip to content

Commit

Permalink
dumpling: fix tidb lock consistency and check tikv problem (pingcap#2…
Browse files Browse the repository at this point in the history
  • Loading branch information
lichunzhu authored Nov 5, 2021
1 parent b74c13a commit 651e910
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 7 deletions.
11 changes: 11 additions & 0 deletions dumpling/export/consistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ const (
consistencyTypeNone = "none"
)

var tiDBDisableTableLockErr = errors.New("try to apply lock consistency on TiDB but it doesn't enable table lock. please set enable-table-lock=true in tidb server config")

// NewConsistencyController returns a new consistency controller
func NewConsistencyController(ctx context.Context, conf *Config, session *sql.DB) (ConsistencyController, error) {
conn, err := session.Conn(ctx)
Expand Down Expand Up @@ -116,6 +118,15 @@ type ConsistencyLockDumpingTables struct {

// Setup implements ConsistencyController.Setup
func (c *ConsistencyLockDumpingTables) Setup(tctx *tcontext.Context) error {
if c.conf.ServerInfo.ServerType == ServerTypeTiDB {
if enableTableLock, err := CheckTiDBEnableTableLock(c.conn); err != nil || !enableTableLock {
if err != nil {
return err
} else {
return tiDBDisableTableLockErr
}
}
}
blockList := make(map[string]map[string]interface{})
return utils.WithRetry(tctx, func() error {
lockTablesSQL := buildLockTablesSQL(c.conf.Tables, blockList)
Expand Down
62 changes: 60 additions & 2 deletions dumpling/export/consistency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ package export

import (
"context"
"encoding/json"
"errors"
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/go-sql-driver/mysql"
"github.com/stretchr/testify/require"

dbconfig "github.com/pingcap/tidb/config"
tcontext "github.com/pingcap/tidb/dumpling/context"
)

Expand All @@ -26,7 +28,7 @@ func TestConsistencyController(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

tctx := tcontext.Background().WithContext(ctx)
tctx := tcontext.Background().WithContext(ctx).WithLogger(appLogger)
conf := defaultConfigForTest(t)
resultOk := sqlmock.NewResult(0, 1)

Expand Down Expand Up @@ -55,6 +57,7 @@ func TestConsistencyController(t *testing.T) {
require.NoError(t, ctrl.Setup(tctx))
require.NoError(t, ctrl.TearDown(tctx))

conf.ServerInfo.ServerType = ServerTypeMySQL
conf.Consistency = consistencyTypeLock
conf.Tables = NewDatabaseTables().
AppendTables("db1", []string{"t1", "t2", "t3"}, []uint64{1, 2, 3}).
Expand Down Expand Up @@ -84,6 +87,7 @@ func TestConsistencyLockControllerRetry(t *testing.T) {
conf := defaultConfigForTest(t)
resultOk := sqlmock.NewResult(0, 1)

conf.ServerInfo.ServerType = ServerTypeMySQL
conf.Consistency = consistencyTypeLock
conf.Tables = NewDatabaseTables().
AppendTables("db1", []string{"t1", "t2", "t3"}, []uint64{1, 2, 3}).
Expand Down Expand Up @@ -140,7 +144,7 @@ func TestConsistencyControllerError(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tctx := tcontext.Background().WithContext(ctx)
tctx := tcontext.Background().WithContext(ctx).WithLogger(appLogger)
conf := defaultConfigForTest(t)

conf.Consistency = "invalid_str"
Expand Down Expand Up @@ -169,3 +173,57 @@ func TestConsistencyControllerError(t *testing.T) {
err = ctrl.Setup(tctx)
require.Error(t, err)
}

func TestConsistencyLockTiDBCheck(t *testing.T) {
t.Parallel()

db, mock, err := sqlmock.New()
require.NoError(t, err)
defer func() {
_ = db.Close()
}()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tctx := tcontext.Background().WithContext(ctx).WithLogger(appLogger)
conf := defaultConfigForTest(t)
resultOk := sqlmock.NewResult(0, 1)

conf.ServerInfo.ServerType = ServerTypeTiDB
conf.Consistency = consistencyTypeLock
conf.Tables = NewDatabaseTables().
AppendTables("db1", []string{"t1"}, []uint64{1})
ctrl, err := NewConsistencyController(ctx, conf, db)
require.NoError(t, err)

// no tidb_config found, don't allow to lock tables
unknownSysVarErr := errors.New("ERROR 1193 (HY000): Unknown system variable 'tidb_config'")
mock.ExpectQuery("SELECT @@tidb_config").WillReturnError(unknownSysVarErr)
err = ctrl.Setup(tctx)
require.ErrorIs(t, err, unknownSysVarErr)
require.NoError(t, mock.ExpectationsWereMet())

// enable-table-lock is false, don't allow to lock tables
tidbConf := dbconfig.NewConfig()
tidbConf.EnableTableLock = false
tidbConfBytes, err := json.Marshal(tidbConf)
require.NoError(t, err)
mock.ExpectQuery("SELECT @@tidb_config").WillReturnRows(
sqlmock.NewRows([]string{"@@tidb_config"}).AddRow(string(tidbConfBytes)))
err = ctrl.Setup(tctx)
require.ErrorIs(t, err, tiDBDisableTableLockErr)
require.NoError(t, mock.ExpectationsWereMet())

// enable-table-lock is true, allow to lock tables
tidbConf.EnableTableLock = true
tidbConfBytes, err = json.Marshal(tidbConf)
require.NoError(t, err)
mock.ExpectQuery("SELECT @@tidb_config").WillReturnRows(
sqlmock.NewRows([]string{"@@tidb_config"}).AddRow(string(tidbConfBytes)))
mock.ExpectExec("LOCK TABLES `db1`.`t1` READ").WillReturnResult(resultOk)
mock.ExpectExec("UNLOCK TABLES").WillReturnResult(resultOk)
err = ctrl.Setup(tctx)
require.NoError(t, err)
require.NoError(t, ctrl.TearDown(tctx))
require.NoError(t, mock.ExpectationsWereMet())
}
47 changes: 42 additions & 5 deletions dumpling/export/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"bytes"
"context"
"database/sql"
"encoding/json"
"fmt"
"io"
"math"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/pingcap/errors"
"go.uber.org/zap"

dbconfig "github.com/pingcap/tidb/config"
tcontext "github.com/pingcap/tidb/dumpling/context"
"github.com/pingcap/tidb/dumpling/log"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -234,7 +236,7 @@ func ListAllDatabasesTables(tctx *tcontext.Context, db *sql.Conn, databaseNames
}
}
default:
queryTemplate := "SHOW TABLE STATUS FROM `%s`"
const queryTemplate = "SHOW TABLE STATUS FROM `%s`"
selectedTableType := make(map[TableType]struct{})
for _, tableType = range tableTypes {
selectedTableType[tableType] = struct{}{}
Expand Down Expand Up @@ -629,7 +631,7 @@ func GetSpecifiedColumnValuesAndClose(rows *sql.Rows, columnName ...string) ([][

// GetPdAddrs gets PD address from TiDB
func GetPdAddrs(tctx *tcontext.Context, db *sql.DB) ([]string, error) {
query := "SELECT * FROM information_schema.cluster_info where type = 'pd';"
const query = "SELECT * FROM information_schema.cluster_info where type = 'pd';"
rows, err := db.QueryContext(tctx, query)
if err != nil {
return []string{}, errors.Annotatef(err, "sql: %s", query)
Expand All @@ -640,7 +642,7 @@ func GetPdAddrs(tctx *tcontext.Context, db *sql.DB) ([]string, error) {

// GetTiDBDDLIDs gets DDL IDs from TiDB
func GetTiDBDDLIDs(tctx *tcontext.Context, db *sql.DB) ([]string, error) {
query := "SELECT * FROM information_schema.tidb_servers_info;"
const query = "SELECT * FROM information_schema.tidb_servers_info;"
rows, err := db.QueryContext(tctx, query)
if err != nil {
return []string{}, errors.Annotatef(err, "sql: %s", query)
Expand All @@ -649,12 +651,38 @@ func GetTiDBDDLIDs(tctx *tcontext.Context, db *sql.DB) ([]string, error) {
return ddlIDs, errors.Annotatef(err, "sql: %s", query)
}

// getTiDBConfig gets tidb config from TiDB server
// @@tidb_config details doc https://docs.pingcap.com/tidb/stable/system-variables#tidb_config
// this variable exists at least from v2.0.0, so this works in most existing tidb instances
func getTiDBConfig(db *sql.Conn) (dbconfig.Config, error) {
const query = "SELECT @@tidb_config;"
var (
tidbConfig dbconfig.Config
tidbConfigBytes []byte
)
row := db.QueryRowContext(context.Background(), query)
err := row.Scan(&tidbConfigBytes)
if err != nil {
return tidbConfig, errors.Annotatef(err, "sql: %s", query)
}
err = json.Unmarshal(tidbConfigBytes, &tidbConfig)
return tidbConfig, errors.Annotatef(err, "sql: %s", query)
}

// CheckTiDBWithTiKV use sql to check whether current TiDB has TiKV
func CheckTiDBWithTiKV(db *sql.DB) (bool, error) {
conn, err := db.Conn(context.Background())
if err == nil {
defer conn.Close()
tidbConfig, err := getTiDBConfig(conn)
if err == nil {
return tidbConfig.Store == "tikv", nil
}
}
var count int
query := "SELECT COUNT(1) as c FROM MYSQL.TiDB WHERE VARIABLE_NAME='tikv_gc_safe_point'"
const query = "SELECT COUNT(1) as c FROM MYSQL.TiDB WHERE VARIABLE_NAME='tikv_gc_safe_point'"
row := db.QueryRow(query)
err := row.Scan(&count)
err = row.Scan(&count)
if err != nil {
// still return true here. Because sometimes users may not have privileges for MySQL.TiDB database
// In most production cases TiDB has TiKV
Expand All @@ -663,6 +691,15 @@ func CheckTiDBWithTiKV(db *sql.DB) (bool, error) {
return count > 0, nil
}

// CheckTiDBEnableTableLock use sql variable to check whether current TiDB has TiKV
func CheckTiDBEnableTableLock(db *sql.Conn) (bool, error) {
tidbConfig, err := getTiDBConfig(db)
if err != nil {
return false, err
}
return tidbConfig.EnableTableLock, nil
}

func getSnapshot(db *sql.Conn) (string, error) {
str, err := ShowMasterStatus(db)
if err != nil {
Expand Down
51 changes: 51 additions & 0 deletions dumpling/export/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"database/sql"
"database/sql/driver"
"encoding/csv"
"encoding/json"
"fmt"
"io"
"os"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/pingcap/errors"
"github.com/stretchr/testify/require"

dbconfig "github.com/pingcap/tidb/config"
tcontext "github.com/pingcap/tidb/dumpling/context"
)

Expand Down Expand Up @@ -1540,6 +1542,55 @@ func TestBuildVersion3RegionQueries(t *testing.T) {
}
}

func TestCheckTiDBWithTiKV(t *testing.T) {
t.Parallel()

db, mock, err := sqlmock.New()
require.NoError(t, err)
defer func() {
_ = db.Close()
}()

tidbConf := dbconfig.NewConfig()
stores := []string{"unistore", "mocktikv", "tikv"}
for _, store := range stores {
tidbConf.Store = store
tidbConfBytes, err := json.Marshal(tidbConf)
require.NoError(t, err)
mock.ExpectQuery("SELECT @@tidb_config").WillReturnRows(
sqlmock.NewRows([]string{"@@tidb_config"}).AddRow(string(tidbConfBytes)))
hasTiKV, err := CheckTiDBWithTiKV(db)
require.NoError(t, err)
if store == "tikv" {
require.True(t, hasTiKV)
} else {
require.False(t, hasTiKV)
}
require.NoError(t, mock.ExpectationsWereMet())
}

errLackPrivilege := errors.New("ERROR 1142 (42000): SELECT command denied to user 'test'@'%' for table 'tidb'")
expectedResults := []interface{}{errLackPrivilege, 1, 0}
for i, res := range expectedResults {
t.Logf("case #%d", i)
mock.ExpectQuery("SELECT @@tidb_config").WillReturnError(errLackPrivilege)
expectedErr, ok := res.(error)
if ok {
mock.ExpectQuery("SELECT COUNT").WillReturnError(expectedErr)
hasTiKV, err := CheckTiDBWithTiKV(db)
require.ErrorIs(t, err, expectedErr)
require.True(t, hasTiKV)
} else if cnt, ok := res.(int); ok {
mock.ExpectQuery("SELECT COUNT").WillReturnRows(
sqlmock.NewRows([]string{"c"}).AddRow(cnt))
hasTiKV, err := CheckTiDBWithTiKV(db)
require.NoError(t, err)
require.Equal(t, cnt > 0, hasTiKV)
}
require.NoError(t, mock.ExpectationsWereMet())
}
}

func TestPickupPossibleField(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 651e910

Please sign in to comment.