Skip to content

Commit

Permalink
dumpling/sql: fix pick up possible field (pingcap#29399)
Browse files Browse the repository at this point in the history
  • Loading branch information
lichunzhu authored Nov 4, 2021
1 parent 3f7a933 commit 4820f27
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 14 deletions.
2 changes: 1 addition & 1 deletion dumpling/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ func checkTiDBTableRegionPkFields(pkFields, pkColTypes []string) (err error) {
err = errors.Errorf("unsupported primary key for selectTableRegion. pkFields: [%s], pkColTypes: [%s]", strings.Join(pkFields, ", "), strings.Join(pkColTypes, ", "))
return
}
if _, ok := dataTypeNum[pkColTypes[0]]; !ok {
if _, ok := dataTypeInt[pkColTypes[0]]; !ok {
err = errors.Errorf("unsupported primary key type for selectTableRegion. pkFields: [%s], pkColTypes: [%s]", strings.Join(pkFields, ", "), strings.Join(pkColTypes, ", "))
}
return
Expand Down
64 changes: 55 additions & 9 deletions dumpling/export/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"database/sql"
"fmt"
"io"
"math"
"net/url"
"strconv"
"strings"
Expand Down Expand Up @@ -432,6 +433,9 @@ func GetPrimaryKeyColumns(db *sql.Conn, database, table string) ([]string, error
return cols, nil
}

// getNumericIndex picks up indices according to the following priority:
// primary key > unique key with the smallest count > key with the max cardinality
// primary key with multi cols is before unique key with single col because we will sort result by primary keys
func getNumericIndex(db *sql.Conn, meta TableMeta) (string, error) {
database, table := meta.DatabaseName(), meta.TableName()
colName2Type := string2Map(meta.ColumnNames(), meta.ColumnTypes())
Expand All @@ -440,22 +444,64 @@ func getNumericIndex(db *sql.Conn, meta TableMeta) (string, error) {
if err != nil {
return "", errors.Annotatef(err, "sql: %s", keyQuery)
}
results, err := GetSpecifiedColumnValuesAndClose(rows, "NON_UNIQUE", "KEY_NAME", "COLUMN_NAME")
results, err := GetSpecifiedColumnValuesAndClose(rows, "NON_UNIQUE", "SEQ_IN_INDEX", "KEY_NAME", "COLUMN_NAME", "CARDINALITY")
if err != nil {
return "", errors.Annotatef(err, "sql: %s", keyQuery)
}
uniqueColumnName := ""
type keyColumnPair struct {
colName string
count uint64
}
var (
uniqueKeyMap = map[string]keyColumnPair{} // unique key name -> key column name, unique key columns count
keyColumn string
maxCardinality int64 = -1
)

// check primary key first, then unique key
for _, oneRow := range results {
var ok bool
if _, ok = dataTypeNum[colName2Type[oneRow[2]]]; ok && oneRow[1] == "PRIMARY" {
return oneRow[2], nil
nonUnique, seqInIndex, keyName, colName, cardinality := oneRow[0], oneRow[1], oneRow[2], oneRow[3], oneRow[4]
// only try pick the first column, because the second column of pk/uk in where condition will trigger a full table scan
if seqInIndex != "1" {
if pair, ok := uniqueKeyMap[keyName]; ok {
seqInIndexInt, err := strconv.ParseUint(seqInIndex, 10, 64)
if err == nil && seqInIndexInt > pair.count {
uniqueKeyMap[keyName] = keyColumnPair{pair.colName, seqInIndexInt}
}
}
continue
}
if uniqueColumnName != "" && oneRow[0] == "0" && ok {
uniqueColumnName = oneRow[2]
_, numberColumn := dataTypeInt[colName2Type[colName]]
if numberColumn {
switch {
case keyName == "PRIMARY":
return colName, nil
case nonUnique == "0":
uniqueKeyMap[keyName] = keyColumnPair{colName, 1}
// pick index column with max cardinality when there is no unique index
case len(uniqueKeyMap) == 0:
cardinalityInt, err := strconv.ParseInt(cardinality, 10, 64)
if err == nil && cardinalityInt > maxCardinality {
keyColumn = colName
maxCardinality = cardinalityInt
}
}
}
}
if len(uniqueKeyMap) > 0 {
var (
minCols uint64 = math.MaxUint64
uniqueKeyColumn string
)
for _, pair := range uniqueKeyMap {
if pair.count < minCols {
uniqueKeyColumn = pair.colName
minCols = pair.count
}
}
return uniqueKeyColumn, nil
}
return uniqueColumnName, nil
return keyColumn, nil
}

// FlushTableWithReadLock flush tables with read lock
Expand Down Expand Up @@ -948,7 +994,7 @@ func pickupPossibleField(meta TableMeta, db *sql.Conn) (string, error) {
if meta.HasImplicitRowID() {
return "_tidb_rowid", nil
}
// try to use pk
// try to use pk or uk
fieldName, err := getNumericIndex(db, meta)
if err != nil {
return "", err
Expand Down
145 changes: 145 additions & 0 deletions dumpling/export/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1540,6 +1540,151 @@ func TestBuildVersion3RegionQueries(t *testing.T) {
}
}

func TestPickupPossibleField(t *testing.T) {
t.Parallel()

db, mock, err := sqlmock.New()
require.NoError(t, err)
defer func() {
require.NoError(t, db.Close())
}()

conn, err := db.Conn(context.Background())
require.NoError(t, err)

meta := &mockTableIR{
dbName: database,
tblName: table,
colNames: []string{"string1", "int1", "int2", "float1", "bin1", "int3", "bool1", "int4"},
colTypes: []string{"VARCHAR", "INT", "BIGINT", "FLOAT", "BINARY", "MEDIUMINT", "BOOL", "TINYINT"},
specCmt: []string{
"/*!40101 SET NAMES binary*/;",
},
}

testCases := []struct {
expectedErr error
expectedField string
hasImplicitRowID bool
showIndexResults [][]driver.Value
}{
{
errors.New("show index error"),
"",
false,
nil,
}, {
nil,
"_tidb_rowid",
true,
nil,
}, // both primary and unique key columns are integers, use primary key first
{
nil,
"int1",
false,
[][]driver.Value{
{table, 0, "PRIMARY", 1, "int1", "A", 2, nil, nil, "", "BTREE", "", ""},
{table, 0, "PRIMARY", 2, "float1", "A", 2, nil, nil, "", "BTREE", "", ""},
{table, 0, "int2", 1, "int2", "A", 2, nil, nil, "YES", "BTREE", "", ""},
{table, 1, "string1", 1, "string1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
{table, 1, "int3", 1, "int3", "A", 20, nil, nil, "YES", "BTREE", "", ""},
},
}, // primary key doesn't have integer at seq 1, use unique key with integer
{
nil,
"int2",
false,
[][]driver.Value{
{table, 0, "PRIMARY", 1, "float1", "A", 2, nil, nil, "", "BTREE", "", ""},
{table, 0, "PRIMARY", 2, "int1", "A", 2, nil, nil, "", "BTREE", "", ""},
{table, 0, "int2", 1, "int2", "A", 2, nil, nil, "YES", "BTREE", "", ""},
{table, 1, "string1", 1, "string1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
{table, 1, "int3", 1, "int3", "A", 20, nil, nil, "YES", "BTREE", "", ""},
},
}, // several unique keys, use unique key who has a integer in seq 1
{
nil,
"int1",
false,
[][]driver.Value{
{table, 0, "u1", 1, "int1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
{table, 0, "u1", 2, "string1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
{table, 0, "u1", 3, "bin1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
{table, 0, "u2", 1, "float1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
{table, 0, "u2", 2, "int2", "A", 2, nil, nil, "YES", "BTREE", "", ""},
},
}, // several unique keys and ordinary keys, use unique key who has a integer in seq 1
{
nil,
"int1",
false,
[][]driver.Value{
{table, 0, "u1", 1, "float1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
{table, 0, "u1", 2, "int2", "A", 2, nil, nil, "YES", "BTREE", "", ""},
{table, 0, "u2", 1, "int1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
{table, 0, "u2", 2, "string1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
{table, 0, "u2", 3, "bin1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
{table, 1, "int3", 1, "int3", "A", 2, nil, nil, "YES", "BTREE", "", ""},
},
}, // several unique keys and ordinary keys, use unique key who has less columns
{
nil,
"int2",
false,
[][]driver.Value{
{table, 0, "u1", 1, "int1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
{table, 0, "u1", 2, "string1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
{table, 0, "u1", 3, "bin1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
{table, 0, "u2", 1, "int2", "A", 2, nil, nil, "YES", "BTREE", "", ""},
{table, 0, "u2", 2, "string1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
{table, 1, "int3", 1, "int3", "A", 20, nil, nil, "YES", "BTREE", "", ""},
},
}, // several unique keys and ordinary keys, use key who has max cardinality
{
nil,
"int2",
false,
[][]driver.Value{
{table, 0, "PRIMARY", 1, "string1", "A", 2, nil, nil, "", "BTREE", "", ""},
{table, 0, "u1", 1, "float1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
{table, 0, "u1", 2, "int3", "A", 2, nil, nil, "YES", "BTREE", "", ""},
{table, 1, "i1", 1, "int1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
{table, 1, "i2", 1, "int2", "A", 5, nil, nil, "YES", "BTREE", "", ""},
{table, 1, "i2", 2, "bool1", "A", 2, nil, nil, "YES", "BTREE", "", ""},
{table, 1, "i3", 1, "bin1", "A", 10, nil, nil, "YES", "BTREE", "", ""},
{table, 1, "i3", 2, "int4", "A", 10, nil, nil, "YES", "BTREE", "", ""},
},
},
}

query := fmt.Sprintf("SHOW INDEX FROM `%s`.`%s`", database, table)
for i, testCase := range testCases {
t.Logf("case #%d", i)

meta.hasImplicitRowID = testCase.hasImplicitRowID
expectedErr := testCase.expectedErr
if expectedErr != nil {
mock.ExpectQuery(query).WillReturnError(expectedErr)
} else if !testCase.hasImplicitRowID {
rows := sqlmock.NewRows(showIndexHeaders)
for _, showIndexResult := range testCase.showIndexResults {
rows.AddRow(showIndexResult...)
}
mock.ExpectQuery(query).WillReturnRows(rows)
}

field, err := pickupPossibleField(meta, conn)
if expectedErr != nil {
require.ErrorIs(t, err, expectedErr)
} else {
require.NoError(t, err)
require.Equal(t, testCase.expectedField, field)
}
require.NoError(t, mock.ExpectationsWereMet())
}
}

func makeVersion(major, minor, patch int64, preRelease string) *semver.Version {
return &semver.Version{
Major: major,
Expand Down
13 changes: 9 additions & 4 deletions dumpling/export/sql_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@ func initColTypeRowReceiverMap() {
"ENUM", "SET", "JSON", "NULL", "VAR_STRING",
}

dataTypeNumArr := []string{
dataTypeIntArr := []string{
"INTEGER", "BIGINT", "TINYINT", "SMALLINT", "MEDIUMINT",
"INT", "INT1", "INT2", "INT3", "INT8",
}

dataTypeNumArr := append(dataTypeIntArr, []string{
"FLOAT", "REAL", "DOUBLE", "DOUBLE PRECISION",
"DECIMAL", "NUMERIC", "FIXED",
"BOOL", "BOOLEAN",
}
}...)

dataTypeBinArr := []string{
"BLOB", "TINYBLOB", "MEDIUMBLOB", "LONGBLOB", "LONG",
Expand All @@ -49,8 +52,10 @@ func initColTypeRowReceiverMap() {
dataTypeString[s] = struct{}{}
colTypeRowReceiverMap[s] = SQLTypeStringMaker
}
for _, s := range dataTypeIntArr {
dataTypeInt[s] = struct{}{}
}
for _, s := range dataTypeNumArr {
dataTypeNum[s] = struct{}{}
colTypeRowReceiverMap[s] = SQLTypeNumberMaker
}
for _, s := range dataTypeBinArr {
Expand All @@ -59,7 +64,7 @@ func initColTypeRowReceiverMap() {
}
}

var dataTypeString, dataTypeNum, dataTypeBin = make(map[string]struct{}), make(map[string]struct{}), make(map[string]struct{})
var dataTypeString, dataTypeInt, dataTypeBin = make(map[string]struct{}), make(map[string]struct{}), make(map[string]struct{})

func escapeBackslashSQL(s []byte, bf *bytes.Buffer) {
var (
Expand Down

0 comments on commit 4820f27

Please sign in to comment.