From ab53786d33efab0d402f4cd81fa50e8a9abb9936 Mon Sep 17 00:00:00 2001 From: Lynn Date: Fri, 4 Aug 2017 17:53:38 +0800 Subject: [PATCH] *: Define the schema change at the table level (#3999) --- domain/domain.go | 63 ++++++++------- domain/schema_validator.go | 134 ++++++++++++++++++++++++++------ domain/schema_validator_test.go | 49 +++++++++--- executor/executor.go | 5 +- executor/executor_test.go | 87 +++++++++++++++++++++ infoschema/builder.go | 30 ++++--- infoschema/infoschema_test.go | 4 +- session.go | 25 +++++- session_test.go | 43 ++++++++++ tidb_test.go | 3 +- 10 files changed, 366 insertions(+), 77 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 92eed556961b2..07d12449afeaf 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -55,19 +55,19 @@ type Domain struct { // loadInfoSchema loads infoschema at startTS into handle, usedSchemaVersion is the currently used // infoschema version, if it is the same as the schema version at startTS, we don't need to reload again. -// It returns the latest schema version and an error. -func (do *Domain) loadInfoSchema(handle *infoschema.Handle, usedSchemaVersion int64, startTS uint64) (int64, error) { +// It returns the latest schema version, the changed table IDs and an error. +func (do *Domain) loadInfoSchema(handle *infoschema.Handle, usedSchemaVersion int64, startTS uint64) (int64, []int64, error) { snapshot, err := do.store.GetSnapshot(kv.NewVersion(startTS)) if err != nil { - return 0, errors.Trace(err) + return 0, nil, errors.Trace(err) } m := meta.NewSnapshotMeta(snapshot) latestSchemaVersion, err := m.GetSchemaVersion() if err != nil { - return 0, errors.Trace(err) + return 0, nil, errors.Trace(err) } if usedSchemaVersion != 0 && usedSchemaVersion == latestSchemaVersion { - return latestSchemaVersion, nil + return latestSchemaVersion, nil, nil } // Update self schema version to etcd. @@ -83,7 +83,7 @@ func (do *Domain) loadInfoSchema(handle *infoschema.Handle, usedSchemaVersion in }() startTime := time.Now() - ok, err := do.tryLoadSchemaDiffs(m, usedSchemaVersion, latestSchemaVersion) + ok, tblIDs, err := do.tryLoadSchemaDiffs(m, usedSchemaVersion, latestSchemaVersion) if err != nil { // We can fall back to full load, don't need to return the error. log.Errorf("[ddl] failed to load schema diff err %v", err) @@ -91,22 +91,22 @@ func (do *Domain) loadInfoSchema(handle *infoschema.Handle, usedSchemaVersion in if ok { log.Infof("[ddl] diff load InfoSchema from version %d to %d, in %v", usedSchemaVersion, latestSchemaVersion, time.Since(startTime)) - return latestSchemaVersion, nil + return latestSchemaVersion, tblIDs, nil } schemas, err := do.fetchAllSchemasWithTables(m) if err != nil { - return 0, errors.Trace(err) + return 0, nil, errors.Trace(err) } newISBuilder, err := infoschema.NewBuilder(handle).InitWithDBInfos(schemas, latestSchemaVersion) if err != nil { - return 0, errors.Trace(err) + return 0, nil, errors.Trace(err) } log.Infof("[ddl] full load InfoSchema from version %d to %d, in %v", usedSchemaVersion, latestSchemaVersion, time.Since(startTime)) newISBuilder.Build() - return latestSchemaVersion, nil + return latestSchemaVersion, nil, nil } func (do *Domain) fetchAllSchemasWithTables(m *meta.Meta) ([]*model.DBInfo, error) { @@ -172,41 +172,51 @@ const ( maxNumberOfDiffsToLoad = 100 ) -// tryLoadSchemaDiffs tries to only load latest schema changes. -// Returns true if the schema is loaded successfully. -// Returns false if the schema can not be loaded by schema diff, then we need to do full load. -func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64) (bool, error) { +func shouldUpdateAllSchema(newVersion, usedVersion int64) bool { if usedVersion == initialVersion || newVersion-usedVersion > maxNumberOfDiffsToLoad { - // If there isn't any used version, or used version is too old, we do full load. - return false, nil + return true + } + return false +} + +// tryLoadSchemaDiffs tries to only load latest schema changes. +// Return true if the schema is loaded successfully. +// Return false if the schema can not be loaded by schema diff, then we need to do full load. +// The second returned value is the delta updated table IDs. +func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64) (bool, []int64, error) { + // If there isn't any used version, or used version is too old, we do full load. + if shouldUpdateAllSchema(newVersion, usedVersion) { + return false, nil, nil } if usedVersion > newVersion { // When user use History Read feature, history schema will be loaded. // usedVersion may be larger than newVersion, full load is needed. - return false, nil + return false, nil, nil } var diffs []*model.SchemaDiff for usedVersion < newVersion { usedVersion++ diff, err := m.GetSchemaDiff(usedVersion) if err != nil { - return false, errors.Trace(err) + return false, nil, errors.Trace(err) } if diff == nil { // If diff is missing for any version between used and new version, we fall back to full reload. - return false, nil + return false, nil, nil } diffs = append(diffs, diff) } builder := infoschema.NewBuilder(do.infoHandle).InitWithOldInfoSchema() + tblIDs := make([]int64, 0, len(diffs)) for _, diff := range diffs { - err := builder.ApplyDiff(m, diff) + ids, err := builder.ApplyDiff(m, diff) if err != nil { - return false, errors.Trace(err) + return false, nil, errors.Trace(err) } + tblIDs = append(tblIDs, ids...) } builder.Build() - return true, nil + return true, tblIDs, nil } // InfoSchema gets information schema from domain. @@ -217,7 +227,7 @@ func (do *Domain) InfoSchema() infoschema.InfoSchema { // GetSnapshotInfoSchema gets a snapshot information schema. func (do *Domain) GetSnapshotInfoSchema(snapshotTS uint64) (infoschema.InfoSchema, error) { snapHandle := do.infoHandle.EmptyClone() - _, err := do.loadInfoSchema(snapHandle, do.infoHandle.Get().SchemaMetaVersion(), snapshotTS) + _, _, err := do.loadInfoSchema(snapHandle, do.infoHandle.Get().SchemaMetaVersion(), snapshotTS) if err != nil { return nil, errors.Trace(err) } @@ -277,7 +287,8 @@ func (do *Domain) Reload() error { schemaVersion = oldInfoSchema.SchemaMetaVersion() } - latestSchemaVersion, err = do.loadInfoSchema(do.infoHandle, schemaVersion, ver.Ver) + var changedTableIDs []int64 + latestSchemaVersion, changedTableIDs, err = do.loadInfoSchema(do.infoHandle, schemaVersion, ver.Ver) loadSchemaDuration.Observe(time.Since(startTime).Seconds()) if err != nil { loadSchemaCounter.WithLabelValues("failed").Inc() @@ -285,7 +296,7 @@ func (do *Domain) Reload() error { } loadSchemaCounter.WithLabelValues("succ").Inc() - do.SchemaValidator.Update(ver.Ver, latestSchemaVersion) + do.SchemaValidator.Update(ver.Ver, schemaVersion, latestSchemaVersion, changedTableIDs) lease := do.DDL().GetLease() sub := time.Since(startTime) @@ -391,7 +402,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio idleTimeout := 3 * time.Minute // sessions in the sysSessionPool will be recycled after idleTimeout d = &Domain{ store: store, - SchemaValidator: newSchemaValidator(ddlLease), + SchemaValidator: NewSchemaValidator(ddlLease), exit: make(chan struct{}), sysSessionPool: pools.NewResourcePool(factory, capacity, capacity, idleTimeout), statsLease: statsLease, diff --git a/domain/schema_validator.go b/domain/schema_validator.go index a3b9af1dd3ab9..240226a557c89 100644 --- a/domain/schema_validator.go +++ b/domain/schema_validator.go @@ -22,30 +22,47 @@ import ( // SchemaValidator is the interface for checking the validity of schema version. type SchemaValidator interface { - // Update the schema validator, add a new item, delete the expired items. + // Update the schema validator, add a new item, delete the expired cacheItemInfos. // The schemaVer is valid within leaseGrantTime plus lease duration. - Update(leaseGrantTime uint64, schemaVer int64) + // Add the changed table IDs to the new schema information, + // which is produced when the oldSchemaVer is updated to the newSchemaVer. + Update(leaseGrantTime uint64, oldSchemaVer, newSchemaVer int64, changedTableIDs []int64) // Check is it valid for a transaction to use schemaVer, at timestamp txnTS. Check(txnTS uint64, schemaVer int64) bool // Latest returns the latest schema version it knows, but not necessary a valid one. Latest() int64 + // IsRelatedTablesChanged returns the result whether relatedTableIDs is changed from usedVer to the latest schema version, + // and an error. + IsRelatedTablesChanged(txnTS uint64, usedVer int64, relatedTableIDs []int64) (bool, error) // Stop stops checking the valid of transaction. Stop() // Restart restarts the schema validator after it is stopped. Restart() } +type deltaSchemaInfo struct { + expire time.Time + nextSchemaVersion int64 + relatedTableIDs []int64 +} + type schemaValidator struct { - mux sync.RWMutex - lease time.Duration - items map[int64]time.Time - latestSchemaVer int64 + mux sync.RWMutex + lease time.Duration + validItems map[int64]time.Time + // cacheItemInfos caches the items' information, and some items may be expired. + // It's used to cache the updated table IDs, which is produced when the previous item's version is updated to current item's version. + cacheItemInfos map[int64]*deltaSchemaInfo + latestSchemaVer int64 + latestSchemaInfo *deltaSchemaInfo } -func newSchemaValidator(lease time.Duration) SchemaValidator { +// NewSchemaValidator returns a SchemaValidator structure. +func NewSchemaValidator(lease time.Duration) SchemaValidator { return &schemaValidator{ - lease: lease, - items: make(map[int64]time.Time), + lease: lease, + validItems: make(map[int64]time.Time), + cacheItemInfos: make(map[int64]*deltaSchemaInfo), } } @@ -53,49 +70,118 @@ func (s *schemaValidator) Stop() { log.Info("the schema validator stops") s.mux.Lock() defer s.mux.Unlock() - s.items = nil + s.cacheItemInfos = nil + s.validItems = nil s.latestSchemaVer = 0 + s.latestSchemaInfo = nil } func (s *schemaValidator) Restart() { log.Info("the schema validator restarts") s.mux.Lock() defer s.mux.Unlock() - s.items = make(map[int64]time.Time) + s.validItems = make(map[int64]time.Time) + s.cacheItemInfos = make(map[int64]*deltaSchemaInfo) } -func (s *schemaValidator) Update(leaseGrantTS uint64, schemaVer int64) { +func (s *schemaValidator) Update(leaseGrantTS uint64, oldVer, currVer int64, changedTableIDs []int64) { s.mux.Lock() - if s.items == nil { + if s.cacheItemInfos == nil { s.mux.Unlock() log.Infof("the schema validator stopped before updating") return } - s.latestSchemaVer = schemaVer + // Renew the lease. leaseGrantTime := extractPhysicalTime(leaseGrantTS) leaseExpire := leaseGrantTime.Add(s.lease - time.Millisecond) + s.validItems[currVer] = leaseExpire + // Update the schema information. + if currVer != oldVer || currVer == 0 { + s.cacheItemInfos[currVer] = &deltaSchemaInfo{ + expire: leaseExpire, + relatedTableIDs: changedTableIDs, + } + oldInfo, ok := s.cacheItemInfos[oldVer] + if ok { + oldInfo.nextSchemaVersion = currVer + } + } else { + s.cacheItemInfos[currVer].expire = leaseExpire + } + s.latestSchemaVer = currVer + s.latestSchemaInfo = s.cacheItemInfos[currVer] + + // Delete expired cacheItemInfos, leaseGrantTime is server current time, actually. + for k, info := range s.cacheItemInfos { + if leaseGrantTime.After(info.expire) { + delete(s.validItems, k) + // We cache some expired schema versions to store recently updated table IDs. + if shouldUpdateAllSchema(currVer, k) { + delete(s.cacheItemInfos, k) + } + } + } - // Renewal lease. - s.items[schemaVer] = leaseExpire + s.mux.Unlock() +} - // Delete expired items, leaseGrantTime is server current time, actually. - for k, expire := range s.items { - if leaseGrantTime.After(expire) { - delete(s.items, k) +func hasRelatedTableID(relatedTableIDs, updateTableIDs []int64) bool { + for _, tblID := range updateTableIDs { + for _, relatedTblID := range relatedTableIDs { + if tblID == relatedTblID { + return true + } } } + return false +} - s.mux.Unlock() +func (s *schemaValidator) isAllExpired(txnTS uint64) bool { + if s.latestSchemaInfo == nil { + return true + } + t := extractPhysicalTime(txnTS) + return t.After(s.latestSchemaInfo.expire) +} + +func (s *schemaValidator) IsRelatedTablesChanged(txnTS uint64, currVer int64, tableIDs []int64) (bool, error) { + s.mux.RLock() + defer s.mux.RUnlock() + + if s.cacheItemInfos == nil { + log.Infof("the schema validator stopped before judging") + return false, ErrInfoSchemaExpired + } + if s.isAllExpired(txnTS) { + log.Infof("the schema validator's latest schema version %d is expired", s.latestSchemaVer) + return false, ErrInfoSchemaExpired + } + + _, isExisting := s.cacheItemInfos[currVer] + if !isExisting { + log.Infof("the schema version %d is much older than the latest version %d", currVer, s.latestSchemaVer) + return false, ErrInfoSchemaChanged + } + + for { + info, ok := s.cacheItemInfos[currVer] + if !ok { + return false, nil + } + if hasRelatedTableID(tableIDs, info.relatedTableIDs) { + return true, nil + } + currVer = info.nextSchemaVersion + } } // Check checks schema validity, returns true if use schemaVer at txnTS is legal. func (s *schemaValidator) Check(txnTS uint64, schemaVer int64) bool { s.mux.RLock() defer s.mux.RUnlock() - - if s.items == nil { + if s.cacheItemInfos == nil { log.Infof("the schema validator stopped before checking") return false } @@ -104,7 +190,7 @@ func (s *schemaValidator) Check(txnTS uint64, schemaVer int64) bool { return true } - expire, ok := s.items[schemaVer] + expire, ok := s.validItems[schemaVer] if !ok { // Can't find schema version means it's already expired. return false diff --git a/domain/schema_validator_test.go b/domain/schema_validator_test.go index 49dcb78f59423..737640b89c673 100644 --- a/domain/schema_validator_test.go +++ b/domain/schema_validator_test.go @@ -18,11 +18,13 @@ import ( "time" . "github.com/pingcap/check" + "github.com/pingcap/tidb/terror" "github.com/pingcap/tidb/util/testleak" ) type leaseGrantItem struct { leaseGrantTS uint64 + oldVer int64 schemaVer int64 } @@ -34,43 +36,69 @@ func (*testSuite) TestSchemaValidator(c *C) { exit := make(chan struct{}) go serverFunc(lease, leaseGrantCh, oracleCh, exit) - validator := newSchemaValidator(lease) + validator := NewSchemaValidator(lease) for i := 0; i < 10; i++ { delay := time.Duration(100+rand.Intn(900)) * time.Microsecond time.Sleep(delay) // Reload can run arbitrarily, at any time. - reload(validator, leaseGrantCh) + reload(validator, leaseGrantCh, 0) } // Take a lease, check it's valid. item := <-leaseGrantCh - validator.Update(item.leaseGrantTS, item.schemaVer) + validator.Update(item.leaseGrantTS, 0, item.schemaVer, nil) valid := validator.Check(item.leaseGrantTS, item.schemaVer) c.Assert(valid, IsTrue) + // Stop the validator, validator's items value is nil. + validator.Stop() + isTablesChanged, err := validator.IsRelatedTablesChanged(item.leaseGrantTS, item.schemaVer, nil) + c.Assert(terror.ErrorEqual(err, ErrInfoSchemaExpired), IsTrue) + c.Assert(isTablesChanged, IsFalse) + valid = validator.Check(item.leaseGrantTS, item.schemaVer) + c.Assert(valid, IsFalse) + validator.Restart() + // Sleep for a long time, check schema is invalid. time.Sleep(lease) ts := <-oracleCh valid = validator.Check(ts, item.schemaVer) c.Assert(valid, IsFalse) - validator.Stop() - validator.Restart() - - reload(validator, leaseGrantCh) + currVer := reload(validator, leaseGrantCh, 0) valid = validator.Check(ts, item.schemaVer) c.Assert(valid, IsFalse) - // Check the latest schema version must changed. c.Assert(item.schemaVer, Less, validator.Latest()) + // Update current schema version to 10 and the delta table IDs is 1, 2, 3. + validator.Update(ts, currVer, 10, []int64{1, 2, 3}) + // Make sure the updated table IDs don't be covered with the same schema version. + validator.Update(ts, 10, 10, nil) + isTablesChanged, err = validator.IsRelatedTablesChanged(ts, currVer, nil) + c.Assert(err, IsNil) + c.Assert(isTablesChanged, IsFalse) + isTablesChanged, err = validator.IsRelatedTablesChanged(ts, currVer, []int64{2}) + c.Assert(err, IsNil) + c.Assert(isTablesChanged, IsTrue) + // The current schema version is older than the oldest schema version. + isTablesChanged, err = validator.IsRelatedTablesChanged(ts, -1, nil) + c.Assert(terror.ErrorEqual(err, ErrInfoSchemaChanged), IsTrue) + c.Assert(isTablesChanged, IsFalse) + // All schema versions is expired. + ts = uint64(time.Now().Add(lease).UnixNano()) + isTablesChanged, err = validator.IsRelatedTablesChanged(ts, currVer, nil) + c.Assert(terror.ErrorEqual(err, ErrInfoSchemaExpired), IsTrue) + c.Assert(isTablesChanged, IsFalse) + exit <- struct{}{} } -func reload(validator SchemaValidator, leaseGrantCh chan leaseGrantItem) { +func reload(validator SchemaValidator, leaseGrantCh chan leaseGrantItem, ids ...int64) int64 { item := <-leaseGrantCh - validator.Update(item.leaseGrantTS, item.schemaVer) + validator.Update(item.leaseGrantTS, item.oldVer, item.schemaVer, ids) + return item.schemaVer } // serverFunc plays the role as a remote server, runs in a separate goroutine. @@ -87,6 +115,7 @@ func serverFunc(lease time.Duration, requireLease chan leaseGrantItem, oracleCh leaseTS = uint64(time.Now().UnixNano()) case requireLease <- leaseGrantItem{ leaseGrantTS: leaseTS, + oldVer: version - 1, schemaVer: version, }: case oracleCh <- uint64(time.Now().UnixNano()): diff --git a/executor/executor.go b/executor/executor.go index 9137e7d38b636..9d2628896982f 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -263,7 +263,8 @@ func (e *SelectLockExec) Next() (*Row, error) { return nil, nil } if len(row.RowKeys) != 0 && e.Lock == ast.SelectLockForUpdate { - e.ctx.GetSessionVars().TxnCtx.ForUpdate = true + txnCtx := e.ctx.GetSessionVars().TxnCtx + txnCtx.ForUpdate = true txn := e.ctx.Txn() for _, k := range row.RowKeys { lockKey := tablecodec.EncodeRowKeyWithHandle(k.Tbl.Meta().ID, k.Handle) @@ -271,6 +272,8 @@ func (e *SelectLockExec) Next() (*Row, error) { if err != nil { return nil, errors.Trace(err) } + // This operation is only for schema validator check. + txnCtx.UpdateDeltaForTable(k.Tbl.Meta().ID, 0, 0) } } return row, nil diff --git a/executor/executor_test.go b/executor/executor_test.go index 2e2f88043a2e9..384f7807e3928 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -17,6 +17,7 @@ import ( "flag" "fmt" "os" + "strings" "sync" "testing" "time" @@ -27,6 +28,7 @@ import ( pb "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb" "github.com/pingcap/tidb/context" + "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/inspectkv" @@ -1601,6 +1603,91 @@ func (s *testSuite) TestEmptyEnum(c *C) { tk.MustQuery("select * from t").Check(testkit.Rows("", "", "")) } +func (s *testSuite) TestMiscellaneousBuiltin(c *C) { + defer func() { + s.cleanEnv(c) + testleak.AfterTest(c)() + }() + + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + // for uuid + r := tk.MustQuery("select uuid(), uuid(), uuid(), uuid(), uuid(), uuid();") + for _, it := range r.Rows() { + for _, item := range it { + uuid, ok := item.(string) + c.Assert(ok, Equals, true) + list := strings.Split(uuid, "-") + c.Assert(len(list), Equals, 5) + c.Assert(len(list[0]), Equals, 8) + c.Assert(len(list[1]), Equals, 4) + c.Assert(len(list[2]), Equals, 4) + c.Assert(len(list[3]), Equals, 4) + c.Assert(len(list[4]), Equals, 12) + } + } +} + +func (s *testSuite) TestSchemaCheckerSQL(c *C) { + defer testleak.AfterTest(c)() + store, err := tikv.NewMockTikvStore() + c.Assert(err, IsNil) + tidb.SetStatsLease(0) + lease := 5 * time.Millisecond + tidb.SetSchemaLease(lease) + dom, err := tidb.BootstrapSession(store) + c.Assert(err, IsNil) + defer dom.Close() + defer store.Close() + + tk := testkit.NewTestKit(c, store) + tk.MustExec(`use test`) + se1, err := tidb.CreateSession(store) + c.Assert(err, IsNil) + defer se1.Close() + _, err = se1.Execute(`use test`) + c.Assert(err, IsNil) + + // create table + tk.MustExec(`create table t (id int, c int);`) + tk.MustExec(`create table t1 (id int, c int);`) + // insert data + tk.MustExec(`insert into t values(1, 1);`) + + // The schema version is out of date in the first transaction, but the SQL can be retried. + tk.MustExec(`begin;`) + _, err = se1.Execute(`alter table t add index idx(c);`) + c.Assert(err, IsNil) + time.Sleep(lease * 2) + tk.MustExec(`insert into t1 values(2, 2);`) + tk.MustExec(`commit;`) + // The schema version is out of date in the first transaction, and the SQL can't be retried. + tidb.SchemaChangedWithoutRetry = true + tk.MustExec(`begin;`) + _, err = se1.Execute(`alter table t add index idx1(c);`) + c.Assert(err, IsNil) + time.Sleep(lease * 2) + tk.MustExec(`insert into t values(3, 3);`) + _, err = tk.Exec(`commit;`) + c.Assert(terror.ErrorEqual(err, domain.ErrInfoSchemaChanged), IsTrue, Commentf("err %v", err)) + // The schema version is out of date in the first transaction, and the SQL can't be retried. + // But the transaction related table IDs aren't in the updated table IDs. + tk.MustExec(`begin;`) + _, err = se1.Execute(`alter table t add index idx2(c);`) + c.Assert(err, IsNil) + time.Sleep(lease * 2) + tk.MustExec(`insert into t1 values(4, 4);`) + tk.MustExec(`commit;`) + // Test for "select for update". + tk.MustExec(`begin;`) + _, err = se1.Execute(`alter table t add index idx3(c);`) + c.Assert(err, IsNil) + time.Sleep(lease * 2) + tk.MustQuery(`select * from t for update`) + _, err = tk.Exec(`commit;`) + c.Assert(terror.ErrorEqual(err, domain.ErrInfoSchemaChanged), IsTrue, Commentf("err %v", err)) +} + type checkRequestClient struct { tikv.Client priority pb.CommandPri diff --git a/infoschema/builder.go b/infoschema/builder.go index 7bfee388d3446..695c7eb85229c 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -31,31 +31,37 @@ type Builder struct { } // ApplyDiff applies SchemaDiff to the new InfoSchema. -func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) error { +// Return the detal updated table IDs that are produced from SchemaDiff and an error. +func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { b.is.schemaMetaVersion = diff.Version if diff.Type == model.ActionCreateSchema { - return b.applyCreateSchema(m, diff) + return nil, b.applyCreateSchema(m, diff) } else if diff.Type == model.ActionDropSchema { - b.applyDropSchema(diff.SchemaID) - return nil + tblIDs := b.applyDropSchema(diff.SchemaID) + return tblIDs, nil } roDBInfo, ok := b.is.SchemaByID(diff.SchemaID) if !ok { - return ErrDatabaseNotExists + return nil, ErrDatabaseNotExists } var oldTableID, newTableID int64 + tblIDs := make([]int64, 0, 2) switch diff.Type { case model.ActionCreateTable: newTableID = diff.TableID + tblIDs = append(tblIDs, newTableID) case model.ActionDropTable: oldTableID = diff.TableID + tblIDs = append(tblIDs, oldTableID) case model.ActionTruncateTable: oldTableID = diff.OldTableID newTableID = diff.TableID + tblIDs = append(tblIDs, oldTableID, newTableID) default: oldTableID = diff.TableID newTableID = diff.TableID + tblIDs = append(tblIDs, oldTableID) } b.copySchemaTables(roDBInfo.Name.L) b.copySortedTables(oldTableID, newTableID) @@ -69,7 +75,7 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) error { if diff.Type == model.ActionRenameTable { oldRoDBInfo, ok := b.is.SchemaByID(diff.OldSchemaID) if !ok { - return ErrDatabaseNotExists + return nil, ErrDatabaseNotExists } b.applyDropTable(oldRoDBInfo, oldTableID) } else { @@ -80,10 +86,10 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) error { // All types except DropTable. err := b.applyCreateTable(m, roDBInfo, newTableID, alloc) if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } } - return nil + return tblIDs, nil } // copySortedTables copies sortedTables for old table and new table for later modification. @@ -118,15 +124,19 @@ func (b *Builder) applyCreateSchema(m *meta.Meta, diff *model.SchemaDiff) error return nil } -func (b *Builder) applyDropSchema(schemaID int64) { +func (b *Builder) applyDropSchema(schemaID int64) []int64 { di, ok := b.is.SchemaByID(schemaID) if !ok { - return + return nil } delete(b.is.schemaMap, di.Name.L) + ids := make([]int64, 0, len(di.Tables)) for _, tbl := range di.Tables { b.applyDropTable(di, tbl.ID) + // TODO: If the table ID doesn't exist. + ids = append(ids, tbl.ID) } + return ids } func (b *Builder) applyCreateTable(m *meta.Meta, roDBInfo *model.DBInfo, tableID int64, alloc autoid.Allocator) error { diff --git a/infoschema/infoschema_test.go b/infoschema/infoschema_test.go index 416595adfe677..eb4cfd5288cd0 100644 --- a/infoschema/infoschema_test.go +++ b/infoschema/infoschema_test.go @@ -186,13 +186,13 @@ func (*testSuite) TestT(c *C) { func checkApplyCreateNonExistsSchemaDoesNotPanic(c *C, txn kv.Transaction, builder *infoschema.Builder) { m := meta.NewMeta(txn) - err := builder.ApplyDiff(m, &model.SchemaDiff{Type: model.ActionCreateSchema, SchemaID: 999}) + _, err := builder.ApplyDiff(m, &model.SchemaDiff{Type: model.ActionCreateSchema, SchemaID: 999}) c.Assert(infoschema.ErrDatabaseNotExists.Equal(err), IsTrue) } func checkApplyCreateNonExistsTableDoesNotPanic(c *C, txn kv.Transaction, builder *infoschema.Builder, dbID int64) { m := meta.NewMeta(txn) - err := builder.ApplyDiff(m, &model.SchemaDiff{Type: model.ActionCreateTable, SchemaID: dbID, TableID: 999}) + _, err := builder.ApplyDiff(m, &model.SchemaDiff{Type: model.ActionCreateTable, SchemaID: dbID, TableID: 999}) c.Assert(infoschema.ErrTableNotExists.Equal(err), IsTrue) } diff --git a/session.go b/session.go index 4c6ce7a12167f..61752d6470612 100644 --- a/session.go +++ b/session.go @@ -191,7 +191,8 @@ func (s *session) GetSessionManager() util.SessionManager { type schemaLeaseChecker struct { domain.SchemaValidator - schemaVer int64 + schemaVer int64 + relatedTableIDs []int64 } const ( @@ -219,10 +220,15 @@ func (s *schemaLeaseChecker) Check(txnTS uint64) error { func (s *schemaLeaseChecker) checkOnce(txnTS uint64) error { succ := s.SchemaValidator.Check(txnTS, s.schemaVer) if !succ { - if s.SchemaValidator.Latest() > s.schemaVer { + isChanged, err := s.SchemaValidator.IsRelatedTablesChanged(txnTS, s.schemaVer, s.relatedTableIDs) + if err != nil { + return errors.Trace(err) + } + if isChanged { return domain.ErrInfoSchemaChanged } - return domain.ErrInfoSchemaExpired + log.Infof("schema checker txnTS %d ver %d doesn't change related tables %v", + txnTS, s.schemaVer, s.relatedTableIDs) } return nil } @@ -253,10 +259,17 @@ func (s *session) doCommit() error { } } + // Get the related table IDs. + relatedTables := s.GetSessionVars().TxnCtx.TableDeltaMap + tableIDs := make([]int64, 0, len(relatedTables)) + for id := range relatedTables { + tableIDs = append(tableIDs, id) + } // Set this option for 2 phase commit to validate schema lease. s.txn.SetOption(kv.SchemaLeaseChecker, &schemaLeaseChecker{ SchemaValidator: sessionctx.GetDomain(s).SchemaValidator, schemaVer: s.sessionVars.TxnCtx.SchemaVersion, + relatedTableIDs: tableIDs, }) if err := s.txn.Commit(); err != nil { return errors.Trace(err) @@ -343,7 +356,13 @@ func (s *session) String() string { const sqlLogMaxLen = 1024 +// SchemaChangedWithoutRetry is used for testing. +var SchemaChangedWithoutRetry bool + func (s *session) isRetryableError(err error) bool { + if SchemaChangedWithoutRetry { + return kv.IsRetryableError(err) + } return kv.IsRetryableError(err) || terror.ErrorEqual(err, domain.ErrInfoSchemaChanged) } diff --git a/session_test.go b/session_test.go index 2794ca485e809..d3dd8c8280ac6 100644 --- a/session_test.go +++ b/session_test.go @@ -74,6 +74,49 @@ func (s *testSessionSuite) TearDownSuite(c *C) { c.Assert(err, IsNil) } +func (s *testSessionSuite) TestSchemaCheckerSimple(c *C) { + defer testleak.AfterTest(c)() + lease := 5 * time.Millisecond + validator := domain.NewSchemaValidator(lease) + checker := &schemaLeaseChecker{SchemaValidator: validator} + + // Add some schema versions and delta table IDs. + ts := uint64(time.Now().UnixNano()) + validator.Update(ts, 0, 2, []int64{1}) + validator.Update(ts, 2, 4, []int64{2}) + + // checker's schema version is the same as the current schema version. + checker.schemaVer = 4 + err := checker.checkOnce(ts) + c.Assert(err, IsNil) + + // checker's schema version is less than the current schema version, and it doesn't exist in validator's items. + // checker's related table ID isn't in validator's changed table IDs. + checker.schemaVer = 2 + checker.relatedTableIDs = []int64{3} + err = checker.checkOnce(ts) + c.Assert(err, IsNil) + // The checker's schema version isn't in validator's items. + checker.schemaVer = 1 + checker.relatedTableIDs = []int64{3} + err = checker.checkOnce(ts) + c.Assert(terror.ErrorEqual(err, domain.ErrInfoSchemaChanged), IsTrue) + // checker's related table ID is in validator's changed table IDs. + checker.relatedTableIDs = []int64{2} + err = checker.checkOnce(ts) + c.Assert(terror.ErrorEqual(err, domain.ErrInfoSchemaChanged), IsTrue) + + // validator's latest schema version is expired. + time.Sleep(lease + time.Microsecond) + checker.schemaVer = 2 + checker.relatedTableIDs = []int64{3} + err = checker.checkOnce(ts) + c.Assert(err, IsNil) + nowTS := uint64(time.Now().UnixNano()) + err = checker.checkOnce(nowTS) + c.Assert(terror.ErrorEqual(err, domain.ErrInfoSchemaExpired), IsTrue) +} + func (s *testSessionSuite) TestPrepare(c *C) { defer testleak.AfterTest(c)() dbName := "test_prepare" diff --git a/tidb_test.go b/tidb_test.go index 27357fdf81619..44c1ba7989b55 100644 --- a/tidb_test.go +++ b/tidb_test.go @@ -395,7 +395,8 @@ func newStore(c *C, dbPath string) kv.Storage { func newStoreWithBootstrap(c *C, dbPath string) kv.Storage { store := newStore(c, dbPath) - BootstrapSession(store) + _, err := BootstrapSession(store) + c.Assert(err, IsNil) return store }