Skip to content

Commit

Permalink
ddl: avoid assert in hook function. (pingcap#2479)
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood authored and hanfei1991 committed Jan 16, 2017
1 parent fce6256 commit d3f0612
Show file tree
Hide file tree
Showing 12 changed files with 150 additions and 93 deletions.
2 changes: 1 addition & 1 deletion ddl/bg_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (s *testDDLSuite) TestDropTableError(c *C) {
defer d.close()

dbInfo := testSchemaInfo(c, d, "test")
testCreateSchema(c, testNewContext(c, d), d, dbInfo)
testCreateSchema(c, testNewContext(d), d, dbInfo)

job := &model.Job{
SchemaID: dbInfo.ID,
Expand Down
2 changes: 1 addition & 1 deletion ddl/column_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (s *testColumnChangeSuite) TestColumnChange(c *C) {
d := newDDL(s.store, nil, nil, testLease)
// create table t (c1 int, c2 int);
tblInfo := testTableInfo(c, d, "t", 2)
ctx := testNewContext(c, d)
ctx := testNewContext(d)
err := ctx.NewTxn()
c.Assert(err, IsNil)
testCreateTable(c, ctx, d, s.dbInfo, tblInfo)
Expand Down
115 changes: 74 additions & 41 deletions ddl/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ func (s *testColumnSuite) SetUpSuite(c *C) {
s.d = newDDL(s.store, nil, nil, testLease)

s.dbInfo = testSchemaInfo(c, s.d, "test_column")
testCreateSchema(c, testNewContext(c, s.d), s.d, s.dbInfo)
testCreateSchema(c, testNewContext(s.d), s.d, s.dbInfo)
}

func (s *testColumnSuite) TearDownSuite(c *C) {
testDropSchema(c, testNewContext(c, s.d), s.d, s.dbInfo)
testDropSchema(c, testNewContext(s.d), s.d, s.dbInfo)
s.d.close()

err := s.store.Close()
Expand Down Expand Up @@ -105,7 +105,7 @@ func testDropColumn(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo, tbl
func (s *testColumnSuite) TestColumn(c *C) {
defer testleak.AfterTest(c)()
tblInfo := testTableInfo(c, s.d, "t1", 3)
ctx := testNewContext(c, s.d)
ctx := testNewContext(s.d)

testCreateTable(c, ctx, s.d, s.dbInfo, tblInfo)

Expand Down Expand Up @@ -289,9 +289,12 @@ func (s *testColumnSuite) checkColumnKVExist(ctx context.Context, t table.Table,
return nil
}

func (s *testColumnSuite) checkNoneColumn(c *C, ctx context.Context, d *ddl, tblInfo *model.TableInfo, handle int64, col *table.Column, columnValue interface{}) error {
t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)
err := s.checkColumnKVExist(ctx, t, handle, col, columnValue, false)
func (s *testColumnSuite) checkNoneColumn(ctx context.Context, d *ddl, tblInfo *model.TableInfo, handle int64, col *table.Column, columnValue interface{}) error {
t, err := testGetTableWithError(d, s.dbInfo.ID, tblInfo.ID)
if err != nil {
return errors.Trace(err)
}
err = s.checkColumnKVExist(ctx, t, handle, col, columnValue, false)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -302,9 +305,12 @@ func (s *testColumnSuite) checkNoneColumn(c *C, ctx context.Context, d *ddl, tbl
return nil
}

func (s *testColumnSuite) checkDeleteOnlyColumn(c *C, ctx context.Context, d *ddl, tblInfo *model.TableInfo, handle int64, col *table.Column, row []types.Datum, columnValue interface{}) error {
t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)
err := ctx.NewTxn()
func (s *testColumnSuite) checkDeleteOnlyColumn(ctx context.Context, d *ddl, tblInfo *model.TableInfo, handle int64, col *table.Column, row []types.Datum, columnValue interface{}) error {
t, err := testGetTableWithError(d, s.dbInfo.ID, tblInfo.ID)
if err != nil {
return errors.Trace(err)
}
err = ctx.NewTxn()
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -400,10 +406,12 @@ func (s *testColumnSuite) checkDeleteOnlyColumn(c *C, ctx context.Context, d *dd
return nil
}

func (s *testColumnSuite) checkWriteOnlyColumn(c *C, ctx context.Context, d *ddl, tblInfo *model.TableInfo, handle int64, col *table.Column, row []types.Datum, columnValue interface{}) error {
t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)

err := ctx.NewTxn()
func (s *testColumnSuite) checkWriteOnlyColumn(ctx context.Context, d *ddl, tblInfo *model.TableInfo, handle int64, col *table.Column, row []types.Datum, columnValue interface{}) error {
t, err := testGetTableWithError(d, s.dbInfo.ID, tblInfo.ID)
if err != nil {
return errors.Trace(err)
}
err = ctx.NewTxn()
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -503,10 +511,12 @@ func (s *testColumnSuite) checkWriteOnlyColumn(c *C, ctx context.Context, d *ddl
return nil
}

func (s *testColumnSuite) checkReorganizationColumn(c *C, ctx context.Context, d *ddl, tblInfo *model.TableInfo, handle int64, col *table.Column, row []types.Datum, columnValue interface{}) error {
t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)

err := ctx.NewTxn()
func (s *testColumnSuite) checkReorganizationColumn(ctx context.Context, d *ddl, tblInfo *model.TableInfo, handle int64, col *table.Column, row []types.Datum, columnValue interface{}) error {
t, err := testGetTableWithError(d, s.dbInfo.ID, tblInfo.ID)
if err != nil {
return errors.Trace(err)
}
err = ctx.NewTxn()
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -597,10 +607,12 @@ func (s *testColumnSuite) checkReorganizationColumn(c *C, ctx context.Context, d
return nil
}

func (s *testColumnSuite) checkPublicColumn(c *C, ctx context.Context, d *ddl, tblInfo *model.TableInfo, handle int64, newCol *table.Column, oldRow []types.Datum, columnValue interface{}) error {
t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)

err := ctx.NewTxn()
func (s *testColumnSuite) checkPublicColumn(ctx context.Context, d *ddl, tblInfo *model.TableInfo, handle int64, newCol *table.Column, oldRow []types.Datum, columnValue interface{}) error {
t, err := testGetTableWithError(d, s.dbInfo.ID, tblInfo.ID)
if err != nil {
return errors.Trace(err)
}
err = ctx.NewTxn()
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -669,7 +681,9 @@ func (s *testColumnSuite) checkPublicColumn(c *C, ctx context.Context, d *ddl, t

i = int64(0)
err = t.IterRecords(ctx, t.FirstKey(), t.Cols(), func(h int64, data []types.Datum, cols []*table.Column) (bool, error) {
c.Assert(data, DeepEquals, updatedRow)
if !reflect.DeepEqual(data, updatedRow) {
return false, errors.Errorf("%v not equal to %v", data, updatedRow)
}
i++
return true, nil
})
Expand All @@ -687,20 +701,20 @@ func (s *testColumnSuite) checkPublicColumn(c *C, ctx context.Context, d *ddl, t
return nil
}

func (s *testColumnSuite) checkAddColumn(c *C, state model.SchemaState, d *ddl, tblInfo *model.TableInfo, handle int64, newCol *table.Column, oldRow []types.Datum, columnValue interface{}) error {
ctx := testNewContext(c, d)
func (s *testColumnSuite) checkAddColumn(state model.SchemaState, d *ddl, tblInfo *model.TableInfo, handle int64, newCol *table.Column, oldRow []types.Datum, columnValue interface{}) error {
ctx := testNewContext(d)
var err error
switch state {
case model.StateNone:
err = errors.Trace(s.checkNoneColumn(c, ctx, d, tblInfo, handle, newCol, columnValue))
err = errors.Trace(s.checkNoneColumn(ctx, d, tblInfo, handle, newCol, columnValue))
case model.StateDeleteOnly:
err = errors.Trace(s.checkDeleteOnlyColumn(c, ctx, d, tblInfo, handle, newCol, oldRow, columnValue))
err = errors.Trace(s.checkDeleteOnlyColumn(ctx, d, tblInfo, handle, newCol, oldRow, columnValue))
case model.StateWriteOnly:
err = errors.Trace(s.checkWriteOnlyColumn(c, ctx, d, tblInfo, handle, newCol, oldRow, columnValue))
err = errors.Trace(s.checkWriteOnlyColumn(ctx, d, tblInfo, handle, newCol, oldRow, columnValue))
case model.StateWriteReorganization, model.StateDeleteReorganization:
err = errors.Trace(s.checkReorganizationColumn(c, ctx, d, tblInfo, handle, newCol, oldRow, columnValue))
err = errors.Trace(s.checkReorganizationColumn(ctx, d, tblInfo, handle, newCol, oldRow, columnValue))
case model.StatePublic:
err = errors.Trace(s.checkPublicColumn(c, ctx, d, tblInfo, handle, newCol, oldRow, columnValue))
err = errors.Trace(s.checkPublicColumn(ctx, d, tblInfo, handle, newCol, oldRow, columnValue))
}
return err
}
Expand All @@ -723,7 +737,7 @@ func (s *testColumnSuite) TestAddColumn(c *C) {
defer testleak.AfterTest(c)()
d := newDDL(s.store, nil, nil, testLease)
tblInfo := testTableInfo(c, d, "t", 3)
ctx := testNewContext(c, d)
ctx := testNewContext(d)

err := ctx.NewTxn()
c.Assert(err, IsNil)
Expand All @@ -740,24 +754,33 @@ func (s *testColumnSuite) TestAddColumn(c *C) {

newColName := "c4"
defaultColValue := int64(4)

var mu sync.Mutex
var hookErr error
checkOK := false

tc := &testDDLCallback{}
var checkErr error
tc.onJobUpdated = func(job *model.Job) {
mu.Lock()
defer mu.Unlock()
if checkOK {
return
}

t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID).(*tables.Table)
newCol := table.FindCol(t.Columns, newColName)
t, err1 := testGetTableWithError(d, s.dbInfo.ID, tblInfo.ID)
if err1 != nil {
hookErr = errors.Trace(err1)
return
}
newCol := table.FindCol(t.(*tables.Table).Columns, newColName)
if newCol == nil {
return
}

err1 := s.checkAddColumn(c, newCol.State, d, tblInfo, handle, newCol, oldRow, defaultColValue)
err1 = s.checkAddColumn(newCol.State, d, tblInfo, handle, newCol, oldRow, defaultColValue)
if err1 != nil {
checkErr = errors.Trace(err1)
hookErr = errors.Trace(err1)
return
}

if newCol.State == model.StatePublic {
Expand All @@ -774,8 +797,12 @@ func (s *testColumnSuite) TestAddColumn(c *C) {
d.start()

job := testCreateColumn(c, ctx, d, s.dbInfo, tblInfo, newColName, &ast.ColumnPosition{Tp: ast.ColumnPositionNone}, defaultColValue)
c.Assert(errors.ErrorStack(checkErr), Equals, "")

testCheckJobDone(c, d, job, true)
mu.Lock()
c.Assert(errors.ErrorStack(hookErr), Equals, "")
c.Assert(checkOK, IsTrue)
mu.Unlock()

err = ctx.NewTxn()
c.Assert(err, IsNil)
Expand All @@ -794,7 +821,7 @@ func (s *testColumnSuite) TestDropColumn(c *C) {
defer testleak.AfterTest(c)()
d := newDDL(s.store, nil, nil, testLease)
tblInfo := testTableInfo(c, d, "t", 4)
ctx := testNewContext(c, d)
ctx := testNewContext(d)

err := ctx.NewTxn()
c.Assert(err, IsNil)
Expand All @@ -813,19 +840,24 @@ func (s *testColumnSuite) TestDropColumn(c *C) {
c.Assert(err, IsNil)

checkOK := false
var hookErr error
var mu sync.Mutex

tc := &testDDLCallback{}
tc.onJobUpdated = func(job *model.Job) {
mu.Lock()
defer mu.Unlock()
if checkOK {
return
}
t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID).(*tables.Table)
col := table.FindCol(t.Columns, colName)
t, err1 := testGetTableWithError(d, s.dbInfo.ID, tblInfo.ID)
if err1 != nil {
hookErr = errors.Trace(err1)
return
}
col := table.FindCol(t.(*tables.Table).Columns, colName)
if col == nil {
mu.Lock()
checkOK = true
mu.Unlock()
return
}
}
Expand All @@ -841,6 +873,7 @@ func (s *testColumnSuite) TestDropColumn(c *C) {
job := testDropColumn(c, ctx, s.d, s.dbInfo, tblInfo, colName, false)
testCheckJobDone(c, d, job, false)
mu.Lock()
c.Assert(hookErr, IsNil)
c.Assert(checkOK, IsTrue)
mu.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func testCreateStore(c *C, name string) kv.Storage {
return store
}

func testNewContext(c *C, d *ddl) context.Context {
func testNewContext(d *ddl) context.Context {
ctx := mock.NewContext()
ctx.Store = d.store
return ctx
Expand Down
12 changes: 6 additions & 6 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (s *testDDLSuite) TestSchemaError(c *C) {

d := newDDL(store, nil, nil, testLease)
defer d.close()
ctx := testNewContext(c, d)
ctx := testNewContext(d)

doDDLJobErr(c, 1, 0, model.ActionCreateSchema, []interface{}{1}, ctx, d)
}
Expand All @@ -108,13 +108,13 @@ func (s *testDDLSuite) TestTableError(c *C) {

d := newDDL(store, nil, nil, testLease)
defer d.close()
ctx := testNewContext(c, d)
ctx := testNewContext(d)

// Schema ID is wrong, so dropping table is failed.
doDDLJobErr(c, -1, 1, model.ActionDropTable, nil, ctx, d)
// Table ID is wrong, so dropping table is failed.
dbInfo := testSchemaInfo(c, d, "test")
testCreateSchema(c, testNewContext(c, d), d, dbInfo)
testCreateSchema(c, testNewContext(d), d, dbInfo)
job := doDDLJobErr(c, dbInfo.ID, -1, model.ActionDropTable, nil, ctx, d)

// Table ID or schema ID is wrong, so getting table is failed.
Expand Down Expand Up @@ -150,7 +150,7 @@ func (s *testDDLSuite) TestForeignKeyError(c *C) {

d := newDDL(store, nil, nil, testLease)
defer d.close()
ctx := testNewContext(c, d)
ctx := testNewContext(d)

doDDLJobErr(c, -1, 1, model.ActionAddForeignKey, nil, ctx, d)
doDDLJobErr(c, -1, 1, model.ActionDropForeignKey, nil, ctx, d)
Expand All @@ -169,7 +169,7 @@ func (s *testDDLSuite) TestIndexError(c *C) {

d := newDDL(store, nil, nil, testLease)
defer d.close()
ctx := testNewContext(c, d)
ctx := testNewContext(d)

// Schema ID is wrong.
doDDLJobErr(c, -1, 1, model.ActionAddIndex, nil, ctx, d)
Expand Down Expand Up @@ -205,7 +205,7 @@ func (s *testDDLSuite) TestColumnError(c *C) {
defer store.Close()
d := newDDL(store, nil, nil, testLease)
defer d.close()
ctx := testNewContext(c, d)
ctx := testNewContext(d)

dbInfo := testSchemaInfo(c, d, "test")
tblInfo := testTableInfo(c, d, "t", 3)
Expand Down
Loading

0 comments on commit d3f0612

Please sign in to comment.