Skip to content

Commit

Permalink
*: Define the schema change at the table level (pingcap#3999)
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala authored and hanfei1991 committed Aug 4, 2017
1 parent 02b56b3 commit ab53786
Show file tree
Hide file tree
Showing 10 changed files with 366 additions and 77 deletions.
63 changes: 37 additions & 26 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,19 @@ type Domain struct {

// loadInfoSchema loads infoschema at startTS into handle, usedSchemaVersion is the currently used
// infoschema version, if it is the same as the schema version at startTS, we don't need to reload again.
// It returns the latest schema version and an error.
func (do *Domain) loadInfoSchema(handle *infoschema.Handle, usedSchemaVersion int64, startTS uint64) (int64, error) {
// It returns the latest schema version, the changed table IDs and an error.
func (do *Domain) loadInfoSchema(handle *infoschema.Handle, usedSchemaVersion int64, startTS uint64) (int64, []int64, error) {
snapshot, err := do.store.GetSnapshot(kv.NewVersion(startTS))
if err != nil {
return 0, errors.Trace(err)
return 0, nil, errors.Trace(err)
}
m := meta.NewSnapshotMeta(snapshot)
latestSchemaVersion, err := m.GetSchemaVersion()
if err != nil {
return 0, errors.Trace(err)
return 0, nil, errors.Trace(err)
}
if usedSchemaVersion != 0 && usedSchemaVersion == latestSchemaVersion {
return latestSchemaVersion, nil
return latestSchemaVersion, nil, nil
}

// Update self schema version to etcd.
Expand All @@ -83,30 +83,30 @@ func (do *Domain) loadInfoSchema(handle *infoschema.Handle, usedSchemaVersion in
}()

startTime := time.Now()
ok, err := do.tryLoadSchemaDiffs(m, usedSchemaVersion, latestSchemaVersion)
ok, tblIDs, err := do.tryLoadSchemaDiffs(m, usedSchemaVersion, latestSchemaVersion)
if err != nil {
// We can fall back to full load, don't need to return the error.
log.Errorf("[ddl] failed to load schema diff err %v", err)
}
if ok {
log.Infof("[ddl] diff load InfoSchema from version %d to %d, in %v",
usedSchemaVersion, latestSchemaVersion, time.Since(startTime))
return latestSchemaVersion, nil
return latestSchemaVersion, tblIDs, nil
}

schemas, err := do.fetchAllSchemasWithTables(m)
if err != nil {
return 0, errors.Trace(err)
return 0, nil, errors.Trace(err)
}

newISBuilder, err := infoschema.NewBuilder(handle).InitWithDBInfos(schemas, latestSchemaVersion)
if err != nil {
return 0, errors.Trace(err)
return 0, nil, errors.Trace(err)
}
log.Infof("[ddl] full load InfoSchema from version %d to %d, in %v",
usedSchemaVersion, latestSchemaVersion, time.Since(startTime))
newISBuilder.Build()
return latestSchemaVersion, nil
return latestSchemaVersion, nil, nil
}

func (do *Domain) fetchAllSchemasWithTables(m *meta.Meta) ([]*model.DBInfo, error) {
Expand Down Expand Up @@ -172,41 +172,51 @@ const (
maxNumberOfDiffsToLoad = 100
)

// tryLoadSchemaDiffs tries to only load latest schema changes.
// Returns true if the schema is loaded successfully.
// Returns false if the schema can not be loaded by schema diff, then we need to do full load.
func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64) (bool, error) {
func shouldUpdateAllSchema(newVersion, usedVersion int64) bool {
if usedVersion == initialVersion || newVersion-usedVersion > maxNumberOfDiffsToLoad {
// If there isn't any used version, or used version is too old, we do full load.
return false, nil
return true
}
return false
}

// tryLoadSchemaDiffs tries to only load latest schema changes.
// Return true if the schema is loaded successfully.
// Return false if the schema can not be loaded by schema diff, then we need to do full load.
// The second returned value is the delta updated table IDs.
func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64) (bool, []int64, error) {
// If there isn't any used version, or used version is too old, we do full load.
if shouldUpdateAllSchema(newVersion, usedVersion) {
return false, nil, nil
}
if usedVersion > newVersion {
// When user use History Read feature, history schema will be loaded.
// usedVersion may be larger than newVersion, full load is needed.
return false, nil
return false, nil, nil
}
var diffs []*model.SchemaDiff
for usedVersion < newVersion {
usedVersion++
diff, err := m.GetSchemaDiff(usedVersion)
if err != nil {
return false, errors.Trace(err)
return false, nil, errors.Trace(err)
}
if diff == nil {
// If diff is missing for any version between used and new version, we fall back to full reload.
return false, nil
return false, nil, nil
}
diffs = append(diffs, diff)
}
builder := infoschema.NewBuilder(do.infoHandle).InitWithOldInfoSchema()
tblIDs := make([]int64, 0, len(diffs))
for _, diff := range diffs {
err := builder.ApplyDiff(m, diff)
ids, err := builder.ApplyDiff(m, diff)
if err != nil {
return false, errors.Trace(err)
return false, nil, errors.Trace(err)
}
tblIDs = append(tblIDs, ids...)
}
builder.Build()
return true, nil
return true, tblIDs, nil
}

// InfoSchema gets information schema from domain.
Expand All @@ -217,7 +227,7 @@ func (do *Domain) InfoSchema() infoschema.InfoSchema {
// GetSnapshotInfoSchema gets a snapshot information schema.
func (do *Domain) GetSnapshotInfoSchema(snapshotTS uint64) (infoschema.InfoSchema, error) {
snapHandle := do.infoHandle.EmptyClone()
_, err := do.loadInfoSchema(snapHandle, do.infoHandle.Get().SchemaMetaVersion(), snapshotTS)
_, _, err := do.loadInfoSchema(snapHandle, do.infoHandle.Get().SchemaMetaVersion(), snapshotTS)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -277,15 +287,16 @@ func (do *Domain) Reload() error {
schemaVersion = oldInfoSchema.SchemaMetaVersion()
}

latestSchemaVersion, err = do.loadInfoSchema(do.infoHandle, schemaVersion, ver.Ver)
var changedTableIDs []int64
latestSchemaVersion, changedTableIDs, err = do.loadInfoSchema(do.infoHandle, schemaVersion, ver.Ver)
loadSchemaDuration.Observe(time.Since(startTime).Seconds())
if err != nil {
loadSchemaCounter.WithLabelValues("failed").Inc()
return errors.Trace(err)
}
loadSchemaCounter.WithLabelValues("succ").Inc()

do.SchemaValidator.Update(ver.Ver, latestSchemaVersion)
do.SchemaValidator.Update(ver.Ver, schemaVersion, latestSchemaVersion, changedTableIDs)

lease := do.DDL().GetLease()
sub := time.Since(startTime)
Expand Down Expand Up @@ -391,7 +402,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
idleTimeout := 3 * time.Minute // sessions in the sysSessionPool will be recycled after idleTimeout
d = &Domain{
store: store,
SchemaValidator: newSchemaValidator(ddlLease),
SchemaValidator: NewSchemaValidator(ddlLease),
exit: make(chan struct{}),
sysSessionPool: pools.NewResourcePool(factory, capacity, capacity, idleTimeout),
statsLease: statsLease,
Expand Down
134 changes: 110 additions & 24 deletions domain/schema_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,80 +22,166 @@ import (

// SchemaValidator is the interface for checking the validity of schema version.
type SchemaValidator interface {
// Update the schema validator, add a new item, delete the expired items.
// Update the schema validator, add a new item, delete the expired cacheItemInfos.
// The schemaVer is valid within leaseGrantTime plus lease duration.
Update(leaseGrantTime uint64, schemaVer int64)
// Add the changed table IDs to the new schema information,
// which is produced when the oldSchemaVer is updated to the newSchemaVer.
Update(leaseGrantTime uint64, oldSchemaVer, newSchemaVer int64, changedTableIDs []int64)
// Check reports whether it is valid for a transaction to use schemaVer at timestamp txnTS.
Check(txnTS uint64, schemaVer int64) bool
// Latest returns the latest schema version it knows, but not necessarily a valid one.
Latest() int64
// IsRelatedTablesChanged returns the result whether relatedTableIDs is changed from usedVer to the latest schema version,
// and an error.
IsRelatedTablesChanged(txnTS uint64, usedVer int64, relatedTableIDs []int64) (bool, error)
// Stop stops checking the validity of transactions.
Stop()
// Restart restarts the schema validator after it is stopped.
Restart()
}

// deltaSchemaInfo is the per-schema-version record kept in
// schemaValidator.cacheItemInfos. It describes the change that produced one
// schema version and links to the version that replaced it.
type deltaSchemaInfo struct {
// expire is the lease expiry time of this schema version; Update sets it to
// the lease grant time plus the lease duration (minus one millisecond).
expire time.Time
// nextSchemaVersion is the schema version this one was updated to. Update
// fills it in when a newer version is recorded; it stays at the zero value
// until then.
nextSchemaVersion int64
// relatedTableIDs holds the changed table IDs passed to Update when this
// version was recorded.
relatedTableIDs []int64
}

type schemaValidator struct {
mux sync.RWMutex
lease time.Duration
items map[int64]time.Time
latestSchemaVer int64
mux sync.RWMutex
lease time.Duration
validItems map[int64]time.Time
// cacheItemInfos caches the items' information, and some items may be expired.
// It's used to cache the updated table IDs, which is produced when the previous item's version is updated to current item's version.
cacheItemInfos map[int64]*deltaSchemaInfo
latestSchemaVer int64
latestSchemaInfo *deltaSchemaInfo
}

func newSchemaValidator(lease time.Duration) SchemaValidator {
// NewSchemaValidator returns a SchemaValidator structure.
func NewSchemaValidator(lease time.Duration) SchemaValidator {
return &schemaValidator{
lease: lease,
items: make(map[int64]time.Time),
lease: lease,
validItems: make(map[int64]time.Time),
cacheItemInfos: make(map[int64]*deltaSchemaInfo),
}
}

func (s *schemaValidator) Stop() {
log.Info("the schema validator stops")
s.mux.Lock()
defer s.mux.Unlock()
s.items = nil
s.cacheItemInfos = nil
s.validItems = nil
s.latestSchemaVer = 0
s.latestSchemaInfo = nil
}

func (s *schemaValidator) Restart() {
log.Info("the schema validator restarts")
s.mux.Lock()
defer s.mux.Unlock()
s.items = make(map[int64]time.Time)
s.validItems = make(map[int64]time.Time)
s.cacheItemInfos = make(map[int64]*deltaSchemaInfo)
}

func (s *schemaValidator) Update(leaseGrantTS uint64, schemaVer int64) {
func (s *schemaValidator) Update(leaseGrantTS uint64, oldVer, currVer int64, changedTableIDs []int64) {
s.mux.Lock()

if s.items == nil {
if s.cacheItemInfos == nil {
s.mux.Unlock()
log.Infof("the schema validator stopped before updating")
return
}

s.latestSchemaVer = schemaVer
// Renew the lease.
leaseGrantTime := extractPhysicalTime(leaseGrantTS)
leaseExpire := leaseGrantTime.Add(s.lease - time.Millisecond)
s.validItems[currVer] = leaseExpire
// Update the schema information.
if currVer != oldVer || currVer == 0 {
s.cacheItemInfos[currVer] = &deltaSchemaInfo{
expire: leaseExpire,
relatedTableIDs: changedTableIDs,
}
oldInfo, ok := s.cacheItemInfos[oldVer]
if ok {
oldInfo.nextSchemaVersion = currVer
}
} else {
s.cacheItemInfos[currVer].expire = leaseExpire
}
s.latestSchemaVer = currVer
s.latestSchemaInfo = s.cacheItemInfos[currVer]

// Delete expired cacheItemInfos, leaseGrantTime is server current time, actually.
for k, info := range s.cacheItemInfos {
if leaseGrantTime.After(info.expire) {
delete(s.validItems, k)
// We cache some expired schema versions to store recently updated table IDs.
if shouldUpdateAllSchema(currVer, k) {
delete(s.cacheItemInfos, k)
}
}
}

// Renewal lease.
s.items[schemaVer] = leaseExpire
s.mux.Unlock()
}

// Delete expired items, leaseGrantTime is server current time, actually.
for k, expire := range s.items {
if leaseGrantTime.After(expire) {
delete(s.items, k)
// hasRelatedTableID reports whether relatedTableIDs and updateTableIDs have at
// least one table ID in common. A nil or empty slice on either side never
// matches.
func hasRelatedTableID(relatedTableIDs, updateTableIDs []int64) bool {
	if len(relatedTableIDs) == 0 || len(updateTableIDs) == 0 {
		return false
	}
	for i := range updateTableIDs {
		updated := updateTableIDs[i]
		for j := range relatedTableIDs {
			if relatedTableIDs[j] == updated {
				return true
			}
		}
	}
	return false
}

s.mux.Unlock()
// isAllExpired reports whether the latest known schema version can no longer
// be used at txnTS: either no latest schema information is recorded, or the
// physical time extracted from txnTS is after its expiry time.
// It does no locking itself and reads s.latestSchemaInfo directly.
func (s *schemaValidator) isAllExpired(txnTS uint64) bool {
	latest := s.latestSchemaInfo
	return latest == nil || extractPhysicalTime(txnTS).After(latest.expire)
}

// IsRelatedTablesChanged reports whether any ID in tableIDs appears in the
// changed-table lists cached for currVer and for the versions that follow it,
// walking the nextSchemaVersion links towards the latest schema version.
//
// It returns ErrInfoSchemaExpired when the validator has been stopped or the
// latest schema version is expired at txnTS, and ErrInfoSchemaChanged when
// currVer is no longer cached, since the changes made after it are unknown.
func (s *schemaValidator) IsRelatedTablesChanged(txnTS uint64, currVer int64, tableIDs []int64) (bool, error) {
	s.mux.RLock()
	defer s.mux.RUnlock()

	if s.cacheItemInfos == nil {
		log.Infof("the schema validator stopped before judging")
		return false, ErrInfoSchemaExpired
	}
	if s.isAllExpired(txnTS) {
		log.Infof("the schema validator's latest schema version %d is expired", s.latestSchemaVer)
		return false, ErrInfoSchemaExpired
	}

	info, ok := s.cacheItemInfos[currVer]
	if !ok {
		log.Infof("the schema version %d is much older than the latest version %d", currVer, s.latestSchemaVer)
		return false, ErrInfoSchemaChanged
	}

	for {
		if hasRelatedTableID(tableIDs, info.relatedTableIDs) {
			return true, nil
		}
		// The newest entry has no successor, so its nextSchemaVersion is the
		// zero value, and an entry for version 0 may even link to itself.
		// Following such a link would revisit older entries or spin forever
		// while holding the read lock, so stop as soon as the link does not
		// move forward.
		nextVer := info.nextSchemaVersion
		if nextVer <= currVer {
			return false, nil
		}
		info, ok = s.cacheItemInfos[nextVer]
		if !ok {
			return false, nil
		}
		currVer = nextVer
	}
}

// Check checks schema validity, returns true if use schemaVer at txnTS is legal.
func (s *schemaValidator) Check(txnTS uint64, schemaVer int64) bool {
s.mux.RLock()
defer s.mux.RUnlock()

if s.items == nil {
if s.cacheItemInfos == nil {
log.Infof("the schema validator stopped before checking")
return false
}
Expand All @@ -104,7 +190,7 @@ func (s *schemaValidator) Check(txnTS uint64, schemaVer int64) bool {
return true
}

expire, ok := s.items[schemaVer]
expire, ok := s.validItems[schemaVer]
if !ok {
// Can't find schema version means it's already expired.
return false
Expand Down
Loading

0 comments on commit ab53786

Please sign in to comment.