Skip to content

Commit

Permalink
ddl: fix the 'max-index-length' check result in non-restricted sql mod (
Browse files Browse the repository at this point in the history
  • Loading branch information
e1ijah1 authored Jun 6, 2022
1 parent 173dd00 commit 45a6758
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 17 deletions.
121 changes: 121 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"testing"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain"
Expand All @@ -32,8 +34,10 @@ import (
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/auth"
"github.com/pingcap/tidb/parser/charset"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
parsertypes "github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/session"
Expand Down Expand Up @@ -1573,3 +1577,120 @@ func TestReportingMinStartTimestamp(t *testing.T) {
infoSyncer.ReportMinStartTS(dom.Store())
require.Equal(t, validTS, infoSyncer.GetMinStartTS())
}

// for issue #34931
func TestBuildMaxLengthIndexWithNonRestrictedSqlMode(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

maxIndexLength := config.GetGlobalConfig().MaxIndexLength

tt := []struct {
ColType string
SpecifiedColLen bool
SpecifiedIndexLen bool
}{
{
"text",
false,
true,
},
{
"blob",
false,
true,
},
{
"varchar",
true,
false,
},
{
"varbinary",
true,
false,
},
}

sqlTemplate := "create table %s (id int, name %s, age int, %s index(name%s%s)) charset=%s;"
// test character strings for varchar and text
for _, tc := range tt {

for _, cs := range charset.CharacterSetInfos {
tableName := fmt.Sprintf("t_%s", cs.Name)
tk.MustExec(fmt.Sprintf("drop table if exists %s", tableName))
tk.MustExec("set @@sql_mode=default")

// test in strict sql mode
maxLen := cs.Maxlen
if tc.ColType == "varbinary" || tc.ColType == "blob" {
maxLen = 1
}
expectKeyLength := maxIndexLength / maxLen
length := 2 * expectKeyLength

indexLen := ""
// specify index length for text type
if tc.SpecifiedIndexLen {
indexLen = fmt.Sprintf("(%d)", length)
}

col := tc.ColType
// specify column length for varchar type
if tc.SpecifiedColLen {
col += fmt.Sprintf("(%d)", length)
}
sql := fmt.Sprintf(sqlTemplate,
tableName, col, "", indexLen, "", cs.Name)
tk.MustGetErrCode(sql, errno.ErrTooLongKey)

tk.MustExec("set @@sql_mode=''")

err := tk.ExecToErr(sql)
require.NoErrorf(t, err, "exec sql '%s' failed", sql)

require.Equal(t, uint16(1), tk.Session().GetSessionVars().StmtCtx.WarningCount())

warnErr := tk.Session().GetSessionVars().StmtCtx.GetWarnings()[0].Err
tErr := errors.Cause(warnErr).(*terror.Error)
sqlErr := terror.ToSQLError(tErr)
require.Equal(t, errno.ErrTooLongKey, int(sqlErr.Code))

if cs.Name == charset.CharsetBin {
if tc.ColType == "varchar" || tc.ColType == "varbinary" {
col = fmt.Sprintf("varbinary(%d)", length)
} else {
col = "blob"
}
}
rows := fmt.Sprintf("%s CREATE TABLE `%s` (\n `id` int(11) DEFAULT NULL,\n `name` %s DEFAULT NULL,\n `age` int(11) DEFAULT NULL,\n KEY `name` (`name`(%d))\n) ENGINE=InnoDB DEFAULT CHARSET=%s",
tableName, tableName, col, expectKeyLength, cs.Name)
// add collation for binary charset
if cs.Name != charset.CharsetBin {
rows += fmt.Sprintf(" COLLATE=%s", cs.DefaultCollation)
}

tk.MustQuery(fmt.Sprintf("show create table %s", tableName)).Check(testkit.Rows(rows))

ukTable := fmt.Sprintf("t_%s_uk", cs.Name)
mkTable := fmt.Sprintf("t_%s_mk", cs.Name)
tk.MustExec(fmt.Sprintf("drop table if exists %s", ukTable))
tk.MustExec(fmt.Sprintf("drop table if exists %s", mkTable))

// For a unique index, an error occurs regardless of SQL mode because reducing
//the index length might enable insertion of non-unique entries that do not meet
//the specified uniqueness requirement.
sql = fmt.Sprintf(sqlTemplate, ukTable, col, "unique", indexLen, "", cs.Name)
tk.MustGetErrCode(sql, errno.ErrTooLongKey)

// The multiple column index in which the length sum exceeds the maximum size
// will return an error instead produce a warning in strict sql mode.
indexLen = fmt.Sprintf("(%d)", expectKeyLength)
sql = fmt.Sprintf(sqlTemplate, mkTable, col, "", indexLen, ", age", cs.Name)
tk.MustGetErrCode(sql, errno.ErrTooLongKey)
}
}
}
8 changes: 4 additions & 4 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1821,7 +1821,7 @@ func buildTableInfo(
continue
}
// build index info.
idxInfo, err := buildIndexInfo(tbInfo, model.NewCIStr(constr.Name), constr.Keys, model.StatePublic)
idxInfo, err := buildIndexInfo(ctx, tbInfo, model.NewCIStr(constr.Name), constr.Keys, model.StatePublic)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -4629,7 +4629,7 @@ func checkIndexInModifiableColumns(columns []*model.ColumnInfo, idxColumns []*mo
// if the type is still prefixable and larger than old prefix length.
prefixLength = ic.Length
}
if err := checkIndexColumn(col, prefixLength); err != nil {
if err := checkIndexColumn(nil, col, prefixLength); err != nil {
return err
}
}
Expand Down Expand Up @@ -5611,7 +5611,7 @@ func (d *ddl) CreatePrimaryKey(ctx sessionctx.Context, ti ast.Ident, indexName m
// After DDL job is put to the queue, and if the check fail, TiDB will run the DDL cancel logic.
// The recover step causes DDL wait a few seconds, makes the unit test painfully slow.
// For same reason, decide whether index is global here.
indexColumns, err := buildIndexColumns(tblInfo.Columns, indexPartSpecifications)
indexColumns, err := buildIndexColumns(ctx, tblInfo.Columns, indexPartSpecifications)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -5809,7 +5809,7 @@ func (d *ddl) CreateIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast.Inde
// After DDL job is put to the queue, and if the check fail, TiDB will run the DDL cancel logic.
// The recover step causes DDL wait a few seconds, makes the unit test painfully slow.
// For same reason, decide whether index is global here.
indexColumns, err := buildIndexColumns(finalColumns, indexPartSpecifications)
indexColumns, err := buildIndexColumns(ctx, finalColumns, indexPartSpecifications)
if err != nil {
return errors.Trace(err)
}
Expand Down
40 changes: 27 additions & 13 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ const (
MaxCommentLength = 1024
)

func buildIndexColumns(columns []*model.ColumnInfo, indexPartSpecifications []*ast.IndexPartSpecification) ([]*model.IndexColumn, error) {
func buildIndexColumns(ctx sessionctx.Context, columns []*model.ColumnInfo, indexPartSpecifications []*ast.IndexPartSpecification) ([]*model.IndexColumn, error) {
// Build offsets.
idxParts := make([]*model.IndexColumn, 0, len(indexPartSpecifications))
var col *model.ColumnInfo

maxIndexLength := config.GetGlobalConfig().MaxIndexLength
// The sum of length of all index columns.
sumLength := 0
for _, ip := range indexPartSpecifications {
Expand All @@ -66,25 +66,37 @@ func buildIndexColumns(columns []*model.ColumnInfo, indexPartSpecifications []*a
return nil, dbterror.ErrKeyColumnDoesNotExits.GenWithStack("column does not exist: %s", ip.Column.Name)
}

if err := checkIndexColumn(col, ip.Length); err != nil {
if err := checkIndexColumn(ctx, col, ip.Length); err != nil {
return nil, err
}

indexColLen := ip.Length
indexColumnLength, err := getIndexColumnLength(col, ip.Length)
if err != nil {
return nil, err
}
sumLength += indexColumnLength

// The sum of all lengths must be shorter than the max length for prefix.
if sumLength > config.GetGlobalConfig().MaxIndexLength {
return nil, dbterror.ErrTooLongKey.GenWithStackByArgs(config.GetGlobalConfig().MaxIndexLength)
if sumLength > maxIndexLength {
// The multiple column index and the unique index in which the length sum exceeds the maximum size
// will return an error instead produce a warning.
if ctx == nil || ctx.GetSessionVars().StrictSQLMode || mysql.HasUniKeyFlag(col.GetFlag()) || len(indexPartSpecifications) > 1 {
return nil, dbterror.ErrTooLongKey.GenWithStackByArgs(maxIndexLength)
}
// truncate index length and produce warning message in non-restrict sql mode.
colLenPerUint, err := getIndexColumnLength(col, 1)
if err != nil {
return nil, err
}
indexColLen = maxIndexLength / colLenPerUint
// produce warning message
ctx.GetSessionVars().StmtCtx.AppendWarning(dbterror.ErrTooLongKey.FastGenByArgs(maxIndexLength))
}

idxParts = append(idxParts, &model.IndexColumn{
Name: col.Name,
Offset: col.Offset,
Length: ip.Length,
Length: indexColLen,
})
}

Expand Down Expand Up @@ -121,7 +133,7 @@ func checkIndexPrefixLength(columns []*model.ColumnInfo, idxColumns []*model.Ind
return nil
}

func checkIndexColumn(col *model.ColumnInfo, indexColumnLen int) error {
func checkIndexColumn(ctx sessionctx.Context, col *model.ColumnInfo, indexColumnLen int) error {
if col.GetFlen() == 0 && (types.IsTypeChar(col.FieldType.GetType()) || types.IsTypeVarchar(col.FieldType.GetType())) {
if col.Hidden {
return errors.Trace(dbterror.ErrWrongKeyColumnFunctionalIndex.GenWithStackByArgs(col.GeneratedExprString))
Expand Down Expand Up @@ -175,8 +187,10 @@ func checkIndexColumn(col *model.ColumnInfo, indexColumnLen int) error {
indexColumnLen *= desc.Maxlen
}
// Specified length must be shorter than the max length for prefix.
if indexColumnLen > config.GetGlobalConfig().MaxIndexLength {
return dbterror.ErrTooLongKey.GenWithStackByArgs(config.GetGlobalConfig().MaxIndexLength)
maxIndexLength := config.GetGlobalConfig().MaxIndexLength
if indexColumnLen > maxIndexLength && (ctx == nil || ctx.GetSessionVars().StrictSQLMode) {
// return error in strict sql mode
return dbterror.ErrTooLongKey.GenWithStackByArgs(maxIndexLength)
}
return nil
}
Expand Down Expand Up @@ -221,12 +235,12 @@ func calcBytesLengthForDecimal(m int) int {
return (m / 9 * 4) + ((m%9)+1)/2
}

func buildIndexInfo(tblInfo *model.TableInfo, indexName model.CIStr, indexPartSpecifications []*ast.IndexPartSpecification, state model.SchemaState) (*model.IndexInfo, error) {
func buildIndexInfo(ctx sessionctx.Context, tblInfo *model.TableInfo, indexName model.CIStr, indexPartSpecifications []*ast.IndexPartSpecification, state model.SchemaState) (*model.IndexInfo, error) {
if err := checkTooLongIndex(indexName); err != nil {
return nil, errors.Trace(err)
}

idxColumns, err := buildIndexColumns(tblInfo.Columns, indexPartSpecifications)
idxColumns, err := buildIndexColumns(ctx, tblInfo.Columns, indexPartSpecifications)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -467,7 +481,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
indexInfo, err = buildIndexInfo(tblInfo, indexName, indexPartSpecifications, model.StateNone)
indexInfo, err = buildIndexInfo(nil, tblInfo, indexName, indexPartSpecifications, model.StateNone)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand Down

0 comments on commit 45a6758

Please sign in to comment.