Skip to content

Commit

Permalink
Merge pull request pingcap#268 from pingcap/goroutine/schema-version
Browse files Browse the repository at this point in the history
*: Introduce schema version
  • Loading branch information
ngaut committed Sep 25, 2015
2 parents 5013e00 + 9218dbf commit 1373e0e
Show file tree
Hide file tree
Showing 11 changed files with 227 additions and 102 deletions.
2 changes: 2 additions & 0 deletions bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package tidb

import (
"fmt"
"runtime/debug"

"github.com/ngaut/log"
mysql "github.com/pingcap/tidb/mysqldef"
Expand Down Expand Up @@ -57,6 +58,7 @@ func initUserTable(s Session) {
func mustExecute(s Session, sql string) {
_, err := s.Execute(sql)
if err != nil {
debug.PrintStack()
log.Fatal(err)
}
}
171 changes: 126 additions & 45 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,20 @@ type DDL interface {
}

type ddl struct {
store kv.Storage
infoHandle *infoschema.Handle
store kv.Storage
infoHandle *infoschema.Handle
onDDLChange OnDDLChange
}

// OnDDLChange is used as hook function when schema changed
type OnDDLChange func(err error) error

// NewDDL create new DDL
func NewDDL(store kv.Storage, infoHandle *infoschema.Handle) DDL {
func NewDDL(store kv.Storage, infoHandle *infoschema.Handle, hook OnDDLChange) DDL {
d := &ddl{
store: store,
infoHandle: infoHandle,
store: store,
infoHandle: infoHandle,
onDDLChange: hook,
}
return d
}
Expand All @@ -82,27 +87,54 @@ func (d *ddl) CreateSchema(ctx context.Context, schema model.CIStr) (err error)
is := d.GetInformationSchema()
_, ok := is.SchemaByName(schema)
if ok {
return ErrExists
return errors.Trace(ErrExists)
}
info := &model.DBInfo{Name: schema}
info.ID, err = meta.GenGlobalID(d.store)
if err != nil {
return errors.Trace(err)
}
err = d.writeSchemaInfo(info)

err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
err := d.verifySchemaMetaVersion(txn, is.SchemaMetaVersion())
if err != nil {
return errors.Trace(err)
}
err = d.writeSchemaInfo(info, txn)
return errors.Trace(err)
})
if d.onDDLChange != nil {
err = d.onDDLChange(err)
}
return errors.Trace(err)
}

func (d *ddl) verifySchemaMetaVersion(txn kv.Transaction, schemaMetaVersion int64) error {
curVer, err := txn.GetInt64(meta.SchemaMetaVersionKey)
if err != nil {
return errors.Trace(err)
}
newInfo := append(is.Clone(), info)
d.infoHandle.Set(newInfo)
return nil
if curVer != schemaMetaVersion {
return errors.Errorf("Schema changed, our version %d, got %d", schemaMetaVersion, curVer)
}

// Increment version
_, err = txn.Inc(meta.SchemaMetaVersionKey, 1)
if err != nil {
return errors.Trace(err)
}

if err := txn.LockKeys(meta.SchemaMetaVersionKey); err != nil {
return errors.Trace(err)
}
return errors.Trace(err)
}

func (d *ddl) DropSchema(ctx context.Context, schema model.CIStr) (err error) {
is := d.GetInformationSchema()
old, ok := is.SchemaByName(schema)
if !ok {
return ErrNotExists
return errors.Trace(ErrNotExists)
}

// Update InfoSchema
Expand All @@ -113,7 +145,6 @@ func (d *ddl) DropSchema(ctx context.Context, schema model.CIStr) (err error) {
newInfo = append(newInfo, v)
}
}
d.infoHandle.Set(newInfo)

// Remove data
txn, err := ctx.GetTxn(true)
Expand All @@ -138,16 +169,20 @@ func (d *ddl) DropSchema(ctx context.Context, schema model.CIStr) (err error) {

// Delete meta key
err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
err := d.verifySchemaMetaVersion(txn, is.SchemaMetaVersion())
if err != nil {
return errors.Trace(err)
}
key := []byte(meta.DBMetaKey(old.ID))
if err := txn.LockKeys(key); err != nil {
return errors.Trace(err)
}
return txn.Delete(key)
})
if err != nil {
return errors.Trace(err)
if d.onDDLChange != nil {
err = d.onDDLChange(err)
}
return nil
return errors.Trace(err)
}

func getDefaultCharsetAndCollate() (string, string) {
Expand Down Expand Up @@ -362,7 +397,19 @@ func (d *ddl) CreateTable(ctx context.Context, ident table.Ident, colDefs []*col
return errors.Trace(err)
}
log.Infof("New table: %+v", tbInfo)
err = d.updateInfoSchema(ctx, ident.Schema, tbInfo)

err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
err := d.verifySchemaMetaVersion(txn, is.SchemaMetaVersion())
if err != nil {
return errors.Trace(err)
}
err = d.updateInfoSchema(ident.Schema, tbInfo, txn)
return errors.Trace(err)
})

if d.onDDLChange != nil {
err = d.onDDLChange(err)
}
return errors.Trace(err)
}

Expand All @@ -379,7 +426,7 @@ func (d *ddl) AlterTable(ctx context.Context, ident table.Ident, specs []*AlterS
for _, spec := range specs {
switch spec.Action {
case AlterAddColumn:
if err := d.addColumn(ctx, ident.Schema, tbl, spec); err != nil {
if err := d.addColumn(ident.Schema, tbl, spec, is.SchemaMetaVersion()); err != nil {
return errors.Trace(err)
}
default:
Expand All @@ -391,7 +438,7 @@ func (d *ddl) AlterTable(ctx context.Context, ident table.Ident, specs []*AlterS
}

// Add a column into table
func (d *ddl) addColumn(ctx context.Context, schema model.CIStr, tbl table.Table, spec *AlterSpecification) error {
func (d *ddl) addColumn(schema model.CIStr, tbl table.Table, spec *AlterSpecification, schemaMetaVersion int64) error {
// Find position
cols := tbl.Cols()
position := len(cols)
Expand Down Expand Up @@ -444,7 +491,18 @@ func (d *ddl) addColumn(ctx context.Context, schema model.CIStr, tbl table.Table
// TODO: update index
// TODO: update default value
// update infomation schema
err = d.updateInfoSchema(ctx, schema, tb.Meta())

err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
err := d.verifySchemaMetaVersion(txn, schemaMetaVersion)
if err != nil {
return errors.Trace(err)
}
err = d.updateInfoSchema(schema, tb.Meta(), txn)
return errors.Trace(err)
})
if d.onDDLChange != nil {
err = d.onDDLChange(err)
}
return errors.Trace(err)
}

Expand All @@ -457,23 +515,36 @@ func (d *ddl) DropTable(ctx context.Context, ti table.Ident) (err error) {
}
// update InfoSchema before delete all the table data.
clonedInfo := is.Clone()
for _, info := range clonedInfo {
if info.Name == ti.Schema {
var newTableInfos []*model.TableInfo
// append other tables.
for _, tbInfo := range info.Tables {
if tbInfo.Name.L != ti.Name.L {
newTableInfos = append(newTableInfos, tbInfo)

err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
err := d.verifySchemaMetaVersion(txn, is.SchemaMetaVersion())
if err != nil {
return errors.Trace(err)
}
for _, info := range clonedInfo {
if info.Name == ti.Schema {
var newTableInfos []*model.TableInfo
// append other tables.
for _, tbInfo := range info.Tables {
if tbInfo.Name.L != ti.Name.L {
newTableInfos = append(newTableInfos, tbInfo)
}
}
info.Tables = newTableInfos
err = d.writeSchemaInfo(info, txn)
if err != nil {
return errors.Trace(err)
}
}
info.Tables = newTableInfos
err = d.writeSchemaInfo(info)
if err != nil {
return errors.Trace(err)
}
}
return nil
})
if d.onDDLChange != nil {
err = d.onDDLChange(err)
if err != nil {
return errors.Trace(err)
}
}
d.infoHandle.Set(clonedInfo)
err = d.deleteTableData(ctx, tb)
return errors.Trace(err)
}
Expand Down Expand Up @@ -557,7 +628,18 @@ func (d *ddl) CreateIndex(ctx context.Context, ti table.Ident, unique bool, inde
}

// update InfoSchema
return d.updateInfoSchema(ctx, ti.Schema, tbInfo)
err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
err := d.verifySchemaMetaVersion(txn, is.SchemaMetaVersion())
if err != nil {
return errors.Trace(err)
}
err = d.updateInfoSchema(ti.Schema, tbInfo, txn)
return errors.Trace(err)
})
if d.onDDLChange != nil {
err = d.onDDLChange(err)
}
return errors.Trace(err)
}

func (d *ddl) buildIndex(ctx context.Context, t table.Table, idxInfo *model.IndexInfo, unique bool) error {
Expand Down Expand Up @@ -622,25 +704,25 @@ func (d *ddl) DropIndex(ctx context.Context, schema, tableName, indexNmae model.
return nil
}

func (d *ddl) writeSchemaInfo(info *model.DBInfo) error {
func (d *ddl) writeSchemaInfo(info *model.DBInfo, txn kv.Transaction) error {
var b []byte
b, err := json.Marshal(info)
if err != nil {
return errors.Trace(err)
}
err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
key := []byte(meta.DBMetaKey(info.ID))
if err := txn.LockKeys(key); err != nil {
return errors.Trace(err)
}
return txn.Set(key, b)
})
key := []byte(meta.DBMetaKey(info.ID))
if err := txn.LockKeys(key); err != nil {
return errors.Trace(err)
}
txn.Set(key, b)
log.Warn("save schema", string(b))

return errors.Trace(err)
}

func (d *ddl) updateInfoSchema(ctx context.Context, schema model.CIStr, tbInfo *model.TableInfo) error {
clonedInfo := d.GetInformationSchema().Clone()
func (d *ddl) updateInfoSchema(schema model.CIStr, tbInfo *model.TableInfo, txn kv.Transaction) error {
is := d.GetInformationSchema()
clonedInfo := is.Clone()
for _, info := range clonedInfo {
if info.Name == schema {
var match bool
Expand All @@ -653,12 +735,11 @@ func (d *ddl) updateInfoSchema(ctx context.Context, schema model.CIStr, tbInfo *
if !match {
info.Tables = append(info.Tables, tbInfo)
}
err := d.writeSchemaInfo(info)
err := d.writeSchemaInfo(info, txn)
if err != nil {
return errors.Trace(err)
}
}
}
d.infoHandle.Set(clonedInfo)
return nil
}
Loading

0 comments on commit 1373e0e

Please sign in to comment.