Skip to content

Commit

Permalink
*: check the schema-validity when the DDL fails (pingcap#6797)
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala authored Jul 9, 2018
1 parent eeeb092 commit e28a818
Show file tree
Hide file tree
Showing 19 changed files with 360 additions and 132 deletions.
18 changes: 17 additions & 1 deletion ddl/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,27 @@
package ddl

import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/sessionctx"
"golang.org/x/net/context"
)

// Callback is the interface supporting callback function when DDL changed.
// Interceptor is used for DDL. It lets a caller observe, and optionally replace,
// the information schema that the DDL component hands out.
type Interceptor interface {
// OnGetInfoSchema is an intercept which is called in the function (*ddl).GetInformationSchema().
// It receives the session context and the information schema that was just read, and the value
// it returns is what GetInformationSchema returns to its caller. It is used in the tests.
OnGetInfoSchema(ctx sessionctx.Context, is infoschema.InfoSchema) infoschema.InfoSchema
}

// BaseInterceptor is the pass-through implementation of Interceptor.
type BaseInterceptor struct{}

// OnGetInfoSchema implements the Interceptor.OnGetInfoSchema interface.
// It hands back the information schema it was given, untouched.
func (*BaseInterceptor) OnGetInfoSchema(ctx sessionctx.Context, is infoschema.InfoSchema) infoschema.InfoSchema {
	return is
}

// Callback is used for DDL.
type Callback interface {
// OnChanged is called after schema is changed.
OnChanged(err error) error
Expand Down
16 changes: 16 additions & 0 deletions ddl/callback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,27 @@ package ddl

import (
. "github.com/pingcap/check"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/sessionctx"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)

// TestInterceptor is an Interceptor whose behavior can be swapped in by a test.
type TestInterceptor struct {
*BaseInterceptor

// OnGetInfoSchemaExported, when non-nil, is called by OnGetInfoSchema instead of
// the embedded BaseInterceptor's implementation.
OnGetInfoSchemaExported func(ctx sessionctx.Context, is infoschema.InfoSchema) infoschema.InfoSchema
}

// OnGetInfoSchema implements the Interceptor.OnGetInfoSchema interface.
// A test-supplied OnGetInfoSchemaExported callback takes precedence; without one
// the call is forwarded to the embedded BaseInterceptor.
func (ti *TestInterceptor) OnGetInfoSchema(ctx sessionctx.Context, is infoschema.InfoSchema) infoschema.InfoSchema {
	if ti.OnGetInfoSchemaExported == nil {
		return ti.BaseInterceptor.OnGetInfoSchema(ctx, is)
	}

	return ti.OnGetInfoSchemaExported(ctx, is)
}

type TestDDLCallback struct {
*BaseCallback

Expand Down
27 changes: 18 additions & 9 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,11 @@ type ddlCtx struct {
binlogCli interface{} // binlogCli is used for Binlog.

// hook may be modified.
hook Callback
hookMu sync.RWMutex
mu struct {
sync.RWMutex
hook Callback
interceptor Interceptor
}
}

func (dc *ddlCtx) isOwner() bool {
Expand Down Expand Up @@ -304,8 +307,9 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
ownerManager: manager,
schemaSyncer: syncer,
binlogCli: binloginfo.GetPumpClient(),
hook: hook,
}
ddlCtx.mu.hook = hook
ddlCtx.mu.interceptor = &BaseInterceptor{}
d := &ddl{
infoHandle: infoHandle,
ddlCtx: ddlCtx,
Expand Down Expand Up @@ -383,9 +387,13 @@ func (d *ddl) GetLease() time.Duration {
return lease
}

// GetInformationSchema get the infoschema binding to d. It's expoted for testing.
func (d *ddl) GetInformationSchema() infoschema.InfoSchema {
return d.infoHandle.Get()
// GetInformationSchema returns the infoschema bound to d after passing it through
// the currently installed interceptor. It's exported for testing.
func (d *ddl) GetInformationSchema(ctx sessionctx.Context) infoschema.InfoSchema {
	// Read the schema before taking the lock, exactly as callers have always observed.
	current := d.infoHandle.Get()

	// The read lock guards d.mu.interceptor and stays held while the interceptor runs.
	d.mu.RLock()
	defer d.mu.RUnlock()

	interceptor := d.mu.interceptor
	return interceptor.OnGetInfoSchema(ctx, current)
}

func (d *ddl) genGlobalID() (int64, error) {
Expand Down Expand Up @@ -438,6 +446,7 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
if err != nil {
return errors.Trace(err)
}
ctx.GetSessionVars().StmtCtx.IsDDLJobInQueue = true

// Notify the worker that we pushed a new job, then wait for the job to be done.
asyncNotify(d.ddlJobCh)
Expand Down Expand Up @@ -485,10 +494,10 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
}

func (d *ddl) callHookOnChanged(err error) error {
d.hookMu.Lock()
defer d.hookMu.Unlock()
d.mu.RLock()
defer d.mu.RUnlock()

err = d.hook.OnChanged(err)
err = d.mu.hook.OnChanged(err)
return errors.Trace(err)
}

Expand Down
26 changes: 13 additions & 13 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
)

func (d *ddl) CreateSchema(ctx sessionctx.Context, schema model.CIStr, charsetInfo *ast.CharsetOpt) (err error) {
is := d.GetInformationSchema()
is := d.GetInformationSchema(ctx)
_, ok := is.SchemaByName(schema)
if ok {
return infoschema.ErrDatabaseExists.GenByArgs(schema)
Expand Down Expand Up @@ -81,7 +81,7 @@ func (d *ddl) CreateSchema(ctx sessionctx.Context, schema model.CIStr, charsetIn
}

func (d *ddl) DropSchema(ctx sessionctx.Context, schema model.CIStr) (err error) {
is := d.GetInformationSchema()
is := d.GetInformationSchema(ctx)
old, ok := is.SchemaByName(schema)
if !ok {
return errors.Trace(infoschema.ErrDatabaseNotExists)
Expand Down Expand Up @@ -743,7 +743,7 @@ func buildTableInfo(ctx sessionctx.Context, d *ddl, tableName model.CIStr, cols
}

func (d *ddl) CreateTableWithLike(ctx sessionctx.Context, ident, referIdent ast.Ident, ifNotExists bool) error {
is := d.GetInformationSchema()
is := d.GetInformationSchema(ctx)
_, ok := is.SchemaByName(referIdent.Schema)
if !ok {
return infoschema.ErrTableNotExists.GenByArgs(referIdent.Schema, referIdent.Name)
Expand Down Expand Up @@ -792,7 +792,7 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
return d.CreateTableWithLike(ctx, ident, referIdent, s.IfNotExists)
}
colDefs := s.Cols
is := d.GetInformationSchema()
is := d.GetInformationSchema(ctx)
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenByArgs(ident.Schema)
Expand Down Expand Up @@ -1072,7 +1072,7 @@ func (d *ddl) AlterTable(ctx sessionctx.Context, ident ast.Ident, specs []*ast.A
}

func (d *ddl) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int64) error {
is := d.GetInformationSchema()
is := d.GetInformationSchema(ctx)
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenByArgs(ident.Schema)
Expand Down Expand Up @@ -1100,7 +1100,7 @@ func (d *ddl) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int6

// ShardRowID shards the implicit row ID by adding shard value to the row ID's first few bits.
func (d *ddl) ShardRowID(ctx sessionctx.Context, tableIdent ast.Ident, uVal uint64) error {
schema, t, err := d.getSchemaAndTableByIdent(tableIdent)
schema, t, err := d.getSchemaAndTableByIdent(ctx, tableIdent)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -1119,8 +1119,8 @@ func (d *ddl) ShardRowID(ctx sessionctx.Context, tableIdent ast.Ident, uVal uint
return errors.Trace(err)
}

func (d *ddl) getSchemaAndTableByIdent(tableIdent ast.Ident) (dbInfo *model.DBInfo, t table.Table, err error) {
is := d.GetInformationSchema()
func (d *ddl) getSchemaAndTableByIdent(ctx sessionctx.Context, tableIdent ast.Ident) (dbInfo *model.DBInfo, t table.Table, err error) {
is := d.GetInformationSchema(ctx)
schema, ok := is.SchemaByName(tableIdent.Schema)
if !ok {
return nil, nil, infoschema.ErrDatabaseNotExists.GenByArgs(tableIdent.Schema)
Expand Down Expand Up @@ -1157,7 +1157,7 @@ func (d *ddl) AddColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTab
return errors.Trace(err)
}

is := d.infoHandle.Get()
is := d.GetInformationSchema(ctx)
schema, ok := is.SchemaByName(ti.Schema)
if !ok {
return errors.Trace(infoschema.ErrDatabaseNotExists)
Expand Down Expand Up @@ -1278,7 +1278,7 @@ func (d *ddl) AddTablePartitions(ctx sessionctx.Context, ident ast.Ident, spec *

// DropColumn will drop a column from the table. Dropping a column that is covered by an index is not supported yet.
func (d *ddl) DropColumn(ctx sessionctx.Context, ti ast.Ident, colName model.CIStr) error {
is := d.infoHandle.Get()
is := d.GetInformationSchema(ctx)
schema, ok := is.SchemaByName(ti.Schema)
if !ok {
return errors.Trace(infoschema.ErrDatabaseNotExists)
Expand Down Expand Up @@ -1709,7 +1709,7 @@ func (d *ddl) RenameIndex(ctx sessionctx.Context, ident ast.Ident, spec *ast.Alt

// DropTable will proceed even if some table in the list does not exist.
func (d *ddl) DropTable(ctx sessionctx.Context, ti ast.Ident) (err error) {
is := d.GetInformationSchema()
is := d.GetInformationSchema(ctx)
schema, ok := is.SchemaByName(ti.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenByArgs(ti.Schema)
Expand All @@ -1733,7 +1733,7 @@ func (d *ddl) DropTable(ctx sessionctx.Context, ti ast.Ident) (err error) {
}

func (d *ddl) TruncateTable(ctx sessionctx.Context, ti ast.Ident) error {
is := d.GetInformationSchema()
is := d.GetInformationSchema(ctx)
schema, ok := is.SchemaByName(ti.Schema)
if !ok {
return infoschema.ErrDatabaseNotExists.GenByArgs(ti.Schema)
Expand All @@ -1759,7 +1759,7 @@ func (d *ddl) TruncateTable(ctx sessionctx.Context, ti ast.Ident) error {
}

func (d *ddl) RenameTable(ctx sessionctx.Context, oldIdent, newIdent ast.Ident) error {
is := d.GetInformationSchema()
is := d.GetInformationSchema(ctx)
oldSchema, ok := is.SchemaByName(oldIdent.Schema)
if !ok {
return errFileNotFound.GenByArgs(oldIdent.Schema, oldIdent.Name)
Expand Down
86 changes: 86 additions & 0 deletions ddl/ddl_db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/juju/errors"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/model"
Expand Down Expand Up @@ -664,3 +666,87 @@ func (s *testStateChangeSuite) TestCreateDBIfNotExists(c *C) {
defer s.se.Execute(context.Background(), "drop database test_not_exists")
s.testParallelExecSQL(c, "create database if not exists test_not_exists;")
}

// TestParallelDDLBeforeRunDDLJob tests a session executing DDL with an outdated information schema.
// This test is used to simulate the following conditions:
// In a cluster, TiDB "a" executes the DDL.
// TiDB "b" fails to load schema, then TiDB "b" executes a DDL statement associated with the DDL statement executed by "a".
//
// Both sessions are held inside the interceptor until each has read the same information schema.
// The first connection then runs its DDL to completion before the second one is released, so the
// second one must fail with "Information schema is changed".
func (s *testStateChangeSuite) TestParallelDDLBeforeRunDDLJob(c *C) {
	defer s.se.Execute(context.Background(), "drop table test_table")
	_, err := s.se.Execute(context.Background(), "use test_db_state")
	c.Assert(err, IsNil)
	_, err = s.se.Execute(context.Background(), "create table test_table (c1 int, c2 int default 1, index (c1))")
	c.Assert(err, IsNil)

	// Create two sessions.
	se, err := session.CreateSession(s.store)
	c.Assert(err, IsNil)
	_, err = se.Execute(context.Background(), "use test_db_state")
	c.Assert(err, IsNil)
	se1, err := session.CreateSession(s.store)
	c.Assert(err, IsNil)
	_, err = se1.Execute(context.Background(), "use test_db_state")
	c.Assert(err, IsNil)

	intercept := &ddl.TestInterceptor{}
	firstConnID := uint64(1)
	finishedCnt := int32(0)
	interval := 5 * time.Millisecond
	var sessionCnt int32 // sessionCnt is the number of sessions that have entered OnGetInfoSchema.
	intercept.OnGetInfoSchemaExported = func(ctx sessionctx.Context, is infoschema.InfoSchema) infoschema.InfoSchema {
		// The following code is for testing.
		// Make sure the two sessions get the same information schema before executing DDL.
		// After the first session executes its DDL, then the second session executes its DDL.
		atomic.AddInt32(&sessionCnt, 1)
		// Wait until both sessions are running here.
		for atomic.LoadInt32(&sessionCnt) != 2 {
			time.Sleep(interval)
		}

		// Both sessions now hold the same information schema. The first session goes on immediately;
		// any other session waits until the first session has finished its SQL, which is signalled
		// by sessionCnt being reset to finishedCnt.
		currID := ctx.GetSessionVars().ConnectionID
		for currID != firstConnID && atomic.LoadInt32(&sessionCnt) != finishedCnt {
			time.Sleep(interval)
		}

		return is
	}
	d := s.dom.DDL()
	d.(ddl.DDLForTest).SetInterceptoror(intercept)

	// Make sure the connection 1 executes a SQL before the connection 2.
	// And the connection 2 executes a SQL with an outdated information schema.
	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {
		defer wg.Done()
		// Release the second session even when the assertion below aborts this goroutine;
		// otherwise the second session would spin in the interceptor forever and the test
		// would hang instead of reporting the failure.
		defer atomic.StoreInt32(&sessionCnt, finishedCnt)

		se.SetConnectionID(firstConnID)
		_, err1 := se.Execute(context.Background(), "alter table test_table drop column c2")
		c.Assert(err1, IsNil)
	}()
	go func() {
		defer wg.Done()

		se1.SetConnectionID(2)
		_, err2 := se1.Execute(context.Background(), "alter table test_table add column c2 int")
		c.Assert(err2, NotNil)
		c.Assert(strings.Contains(err2.Error(), "Information schema is changed"), IsTrue)
	}()

	wg.Wait()

	// Restore a pass-through interceptor so later tests are not affected.
	intercept = &ddl.TestInterceptor{}
	d.(ddl.DDLForTest).SetInterceptoror(intercept)
}
16 changes: 13 additions & 3 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,24 @@ import (
// DDLForTest groups the setters that replace the DDL hook and the interceptor.
type DDLForTest interface {
// SetHook sets the hook.
SetHook(h Callback)
// SetInterceptoror sets the interceptor.
SetInterceptoror(h Interceptor)
}

// SetHook implements DDL.SetHook interface.
func (d *ddl) SetHook(h Callback) {
d.hookMu.Lock()
defer d.hookMu.Unlock()
d.mu.Lock()
defer d.mu.Unlock()

d.hook = h
d.mu.hook = h
}

// SetInterceptoror implements the DDLForTest.SetInterceptoror interface.
// The interceptor is replaced while holding the write lock on d.mu.
func (d *ddl) SetInterceptoror(i Interceptor) {
	d.mu.Lock()
	d.mu.interceptor = i
	d.mu.Unlock()
}

func TestT(t *testing.T) {
Expand Down
12 changes: 6 additions & 6 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,9 +333,9 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx, shouldCleanJobs bool) error {
return errors.Trace(err)
}

d.hookMu.Lock()
d.hook.OnJobRunBefore(job)
d.hookMu.Unlock()
d.mu.RLock()
d.mu.hook.OnJobRunBefore(job)
d.mu.RUnlock()

// If running job meets error, we will save this error in job Error
// and retry later if the job is not cancelled.
Expand Down Expand Up @@ -363,9 +363,9 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx, shouldCleanJobs bool) error {
return nil
}

d.hookMu.Lock()
d.hook.OnJobUpdated(job)
d.hookMu.Unlock()
d.mu.RLock()
d.mu.hook.OnJobUpdated(job)
d.mu.RUnlock()

// Here means the job enters another state (delete only, write only, public, etc...) or is cancelled.
// If the job is done or still running or rolling back, we will wait 2 * lease time to guarantee other servers to update
Expand Down
Loading

0 comments on commit e28a818

Please sign in to comment.