Skip to content

Commit

Permalink
Handling insertion issue when the dropping column's flag is not null …
Browse files Browse the repository at this point in the history
…and its state is 'WriteOnly' (pingcap#8791)
  • Loading branch information
zimulala authored and ngaut committed Jan 5, 2019
1 parent 0c75c43 commit cfff965
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 24 deletions.
31 changes: 31 additions & 0 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ package ddl

import (
"fmt"
"strings"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
Expand All @@ -25,6 +27,8 @@ import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/sqlexec"
log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -234,6 +238,15 @@ func onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
colInfo.State = model.StateWriteOnly
// Set this column's offset to the last and reset all following columns' offsets.
adjustColumnInfoInDropColumn(tblInfo, colInfo.Offset)
// When the dropping column has not-null flag and it hasn't the default value, we can backfill the column value like "add column".
// NOTE: If the state of StateWriteOnly can be rollbacked, we'd better reconsider the original default value.
// And we need consider the column without not-null flag.
if colInfo.OriginDefaultValue == nil && mysql.HasNotNullFlag(colInfo.Flag) {
colInfo.OriginDefaultValue, err = generateOriginDefaultValue(colInfo)
if err != nil {
return ver, errors.Trace(err)
}
}
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != colInfo.State)
case model.StateWriteOnly:
// write only -> delete only
Expand Down Expand Up @@ -545,3 +558,21 @@ func modifyColumnFromNull2NotNull(w *worker, t *meta.Meta, dbInfo *model.DBInfo,
ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
return ver, errors.Trace(err)
}

func generateOriginDefaultValue(col *model.ColumnInfo) (interface{}, error) {
var err error
odValue := col.GetDefaultValue()
if odValue == nil && mysql.HasNotNullFlag(col.Flag) {
zeroVal := table.GetZeroValue(col)
odValue, err = zeroVal.ToString()
if err != nil {
return nil, errors.Trace(err)
}
}

if odValue == strings.ToUpper(ast.CurrentTimestamp) &&
(col.Tp == mysql.TypeTimestamp || col.Tp == mysql.TypeDatetime) {
odValue = time.Now().Format(types.TimeFormat)
}
return odValue, nil
}
78 changes: 67 additions & 11 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,70 @@ func (s *testStateChangeSuite) TestShowCreateTable(c *C) {
}
}
d := s.dom.DDL()
originalCallback := d.GetHook()
defer d.(ddl.DDLForTest).SetHook(originalCallback)
d.(ddl.DDLForTest).SetHook(callback)
tk.MustExec("alter table t add index idx1(id)")
c.Assert(checkErr, IsNil)
tk.MustExec("alter table t add column c int")
c.Assert(checkErr, IsNil)
}

// TestDropNotNullColumn is used to test issue #8654.
func (s *testStateChangeSuite) TestDropNotNullColumn(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("create table t (id int, a int not null default 11)")
tk.MustExec("insert into t values(1, 1)")
tk.MustExec("create table t1 (id int, b varchar(255) not null)")
tk.MustExec("insert into t1 values(2, '')")
tk.MustExec("create table t2 (id int, c time not null)")
tk.MustExec("insert into t2 values(3, '11:22:33')")
tk.MustExec("create table t3 (id int, d json not null)")
tk.MustExec("insert into t3 values(4, d)")
tk1 := testkit.NewTestKit(c, s.store)
tk1.MustExec("use test")

var checkErr error
d := s.dom.DDL()
originalCallback := d.GetHook()
callback := &ddl.TestDDLCallback{}
sqlNum := 0
callback.OnJobUpdatedExported = func(job *model.Job) {
if checkErr != nil {
return
}
originalCallback.OnChanged(nil)
if job.SchemaState == model.StateWriteOnly {
switch sqlNum {
case 0:
_, checkErr = tk1.Exec("insert into t set id = 1")
case 1:
_, checkErr = tk1.Exec("insert into t1 set id = 2")
case 2:
_, checkErr = tk1.Exec("insert into t2 set id = 3")
case 3:
_, checkErr = tk1.Exec("insert into t3 set id = 4")
}
}
}

d.(ddl.DDLForTest).SetHook(callback)
tk.MustExec("alter table t drop column a")
c.Assert(checkErr, IsNil)
sqlNum++
tk.MustExec("alter table t1 drop column b")
c.Assert(checkErr, IsNil)
sqlNum++
tk.MustExec("alter table t2 drop column c")
c.Assert(checkErr, IsNil)
sqlNum++
tk.MustExec("alter table t3 drop column d")
c.Assert(checkErr, IsNil)
d.(ddl.DDLForTest).SetHook(originalCallback)
tk.MustExec("drop table t, t1, t2, t3")
}

func (s *testStateChangeSuite) TestTwoStates(c *C) {
cnt := 5
// New the testExecInfo.
Expand Down Expand Up @@ -212,6 +269,8 @@ func (s *testStateChangeSuite) test(c *C, tableName, alterTableSQL string, testI
}
}
d := s.dom.DDL()
originalCallback := d.GetHook()
defer d.(ddl.DDLForTest).SetHook(originalCallback)
d.(ddl.DDLForTest).SetHook(callback)
_, err = s.se.Execute(context.Background(), alterTableSQL)
c.Assert(err, IsNil)
Expand All @@ -223,8 +282,6 @@ func (s *testStateChangeSuite) test(c *C, tableName, alterTableSQL string, testI
err = testInfo.execSQL(3)
c.Assert(err, IsNil)
c.Assert(errors.ErrorStack(checkErr), Equals, "")
callback = &ddl.TestDDLCallback{}
d.(ddl.DDLForTest).SetHook(callback)
}

type stateCase struct {
Expand Down Expand Up @@ -481,12 +538,12 @@ func (s *testStateChangeSuite) runTestInSchemaState(c *C, state model.SchemaStat
}
}
d := s.dom.DDL()
originalCallback := d.GetHook()
d.(ddl.DDLForTest).SetHook(callback)
_, err = s.se.Execute(context.Background(), alterTableSQL)
c.Assert(err, IsNil)
c.Assert(errors.ErrorStack(checkErr), Equals, "")
callback = &ddl.TestDDLCallback{}
d.(ddl.DDLForTest).SetHook(callback)
d.(ddl.DDLForTest).SetHook(originalCallback)

if expectQuery != nil {
tk := testkit.NewTestKit(c, s.store)
Expand Down Expand Up @@ -554,6 +611,7 @@ func (s *testStateChangeSuite) TestShowIndex(c *C) {
}

d := s.dom.DDL()
originalCallback := d.GetHook()
d.(ddl.DDLForTest).SetHook(callback)
alterTableSQL := `alter table t add index c2(c2)`
_, err = s.se.Execute(context.Background(), alterTableSQL)
Expand All @@ -564,8 +622,7 @@ func (s *testStateChangeSuite) TestShowIndex(c *C) {
c.Assert(err, IsNil)
err = checkResult(result, testkit.Rows("t 0 PRIMARY 1 c1 A 0 <nil> <nil> BTREE ", "t 1 c2 1 c2 A 0 <nil> <nil> YES BTREE "))
c.Assert(err, IsNil)
callback = &ddl.TestDDLCallback{}
d.(ddl.DDLForTest).SetHook(callback)
d.(ddl.DDLForTest).SetHook(originalCallback)

c.Assert(err, IsNil)

Expand Down Expand Up @@ -716,6 +773,8 @@ func (s *testStateChangeSuite) testControlParallelExecSQL(c *C, sql1, sql2 strin
times++
}
d := s.dom.DDL()
originalCallback := d.GetHook()
defer d.(ddl.DDLForTest).SetHook(originalCallback)
d.(ddl.DDLForTest).SetHook(callback)

wg := sync.WaitGroup{}
Expand Down Expand Up @@ -763,9 +822,6 @@ func (s *testStateChangeSuite) testControlParallelExecSQL(c *C, sql1, sql2 strin

wg.Wait()
f(c, err1, err2)

callback = &ddl.TestDDLCallback{}
d.(ddl.DDLForTest).SetHook(callback)
}

func (s *testStateChangeSuite) testParallelExecSQL(c *C, sql string) {
Expand All @@ -792,6 +848,8 @@ func (s *testStateChangeSuite) testParallelExecSQL(c *C, sql string) {
}

d := s.dom.DDL()
originalCallback := d.GetHook()
defer d.(ddl.DDLForTest).SetHook(originalCallback)
d.(ddl.DDLForTest).SetHook(callback)

wg.Add(2)
Expand All @@ -807,8 +865,6 @@ func (s *testStateChangeSuite) testParallelExecSQL(c *C, sql string) {
wg.Wait()
c.Assert(err2, IsNil)
c.Assert(err3, IsNil)
callback = &ddl.TestDDLCallback{}
d.(ddl.DDLForTest).SetHook(callback)
}

// TestCreateTableIfNotExists parallel exec create table if not exists xxx. No error returns is expected.
Expand Down
16 changes: 3 additions & 13 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"
"strings"
"sync/atomic"
"time"

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
Expand Down Expand Up @@ -1495,18 +1494,9 @@ func (d *ddl) AddColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTab
if err != nil {
return errors.Trace(err)
}
col.OriginDefaultValue = col.GetDefaultValue()
if col.OriginDefaultValue == nil && mysql.HasNotNullFlag(col.Flag) {
zeroVal := table.GetZeroValue(col.ToInfo())
col.OriginDefaultValue, err = zeroVal.ToString()
if err != nil {
return errors.Trace(err)
}
}

if col.OriginDefaultValue == strings.ToUpper(ast.CurrentTimestamp) &&
(col.Tp == mysql.TypeTimestamp || col.Tp == mysql.TypeDatetime) {
col.OriginDefaultValue = time.Now().Format(types.TimeFormat)
col.OriginDefaultValue, err = generateOriginDefaultValue(col.ToInfo())
if err != nil {
return errors.Trace(err)
}

job := &model.Job{
Expand Down

0 comments on commit cfff965

Please sign in to comment.