Skip to content

Commit

Permalink
*: add add column ddl skeleton.
Browse files Browse the repository at this point in the history
  • Loading branch information
qiuyesuifeng committed Oct 31, 2015
1 parent ad280e2 commit 6515263
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 76 deletions.
95 changes: 65 additions & 30 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,20 +177,6 @@ func (d *ddl) CreateSchema(ctx context.Context, schema model.CIStr) (err error)
return errors.Trace(err)
}

func (d *ddl) verifySchemaMetaVersion(t *meta.Meta, schemaMetaVersion int64) error {
curVer, err := t.GetSchemaVersion()
if err != nil {
return errors.Trace(err)
}
if curVer != schemaMetaVersion {
return errors.Errorf("Schema changed, our version %d, but got %d", schemaMetaVersion, curVer)
}

// Increment version.
_, err = t.GenSchemaVersion()
return errors.Trace(err)
}

func (d *ddl) DropSchema(ctx context.Context, schema model.CIStr) (err error) {
is := d.GetInformationSchema()
old, ok := is.SchemaByName(schema)
Expand Down Expand Up @@ -283,7 +269,7 @@ func (d *ddl) buildColumnsAndConstraints(colDefs []*coldef.ColumnDef, constraint
}

func (d *ddl) buildColumnAndConstraint(offset int, colDef *coldef.ColumnDef) (*column.Col, []*coldef.TableConstraint, error) {
// set charset
// Set charset.
if len(colDef.Tp.Charset) == 0 {
switch colDef.Tp.Tp {
case mysql.TypeString, mysql.TypeVarchar, mysql.TypeVarString, mysql.TypeBlob, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob:
Expand All @@ -293,15 +279,17 @@ func (d *ddl) buildColumnAndConstraint(offset int, colDef *coldef.ColumnDef) (*c
colDef.Tp.Collate = charset.CharsetBin
}
}
// convert colDef into col

col, cts, err := coldef.ColumnDefToCol(offset, colDef)
if err != nil {
return nil, nil, errors.Trace(err)
}

col.ID, err = d.genGlobalID()
if err != nil {
return nil, nil, errors.Trace(err)
}

return col, cts, nil
}

Expand Down Expand Up @@ -434,9 +422,7 @@ func (d *ddl) CreateTable(ctx context.Context, ident table.Ident, colDefs []*col
}

func (d *ddl) AlterTable(ctx context.Context, ident table.Ident, specs []*AlterSpecification) (err error) {
// Get database and table.
is := d.GetInformationSchema()

schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return errors.Trace(qerror.ErrDatabaseNotExist)
Expand All @@ -446,52 +432,88 @@ func (d *ddl) AlterTable(ctx context.Context, ident table.Ident, specs []*AlterS
if err != nil {
return errors.Trace(ErrNotExists)
}

for _, spec := range specs {
switch spec.Action {
case AlterAddColumn:
if err := d.addColumn(ctx, schema, tbl, spec, is.SchemaMetaVersion()); err != nil {
return errors.Trace(err)
}
default:
// TODO: process more actions
continue
err := d.alterTable(ctx, schema, tbl, spec)
if err != nil {
return errors.Trace(err)
}
}

return nil
}

func (d *ddl) alterTable(ctx context.Context, schema *model.DBInfo, t table.Table, spec *AlterSpecification) error {
var job *model.Job
switch spec.Action {
case AlterAddColumn:
job = &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
Type: model.ActionAddColumn,
Args: []interface{}{spec},
}
default:
// TODO: support more actions.
return errors.Errorf("Not support alter table spec - %v", spec)
}

err := d.startJob(ctx, job)
err = d.onDDLChange(err)
return nil
}

/*
func (d *ddl) verifySchemaMetaVersion(t *meta.Meta, schemaMetaVersion int64) error {
curVer, err := t.GetSchemaVersion()
if err != nil {
return errors.Trace(err)
}
if curVer != schemaMetaVersion {
return errors.Errorf("Schema changed, our version %d, but got %d", schemaMetaVersion, curVer)
}
// Increment version.
_, err = t.GenSchemaVersion()
return errors.Trace(err)
}
// Add a column into table
func (d *ddl) addColumn(ctx context.Context, schema *model.DBInfo, tbl table.Table, spec *AlterSpecification, schemaMetaVersion int64) error {
// Find position
// Check column name duplicate.
cols := tbl.Cols()
position := len(cols)
name := spec.Column.Name
// Check column name duplicate.
dc := column.FindCol(cols, name)
if dc != nil {
return errors.Errorf("Try to add a column with the same name of an already exists column.")
}
// Get column position.
if spec.Position.Type == ColumnPositionFirst {
position = 0
} else if spec.Position.Type == ColumnPositionAfter {
// Find the mentioned column.
c := column.FindCol(cols, spec.Position.RelativeColumn)
if c == nil {
return errors.Errorf("No such column: %v", name)
}
// Insert position is after the mentioned column.
position = c.Offset + 1
}
// TODO: set constraint
col, _, err := d.buildColumnAndConstraint(position, spec.Column)
if err != nil {
return errors.Trace(err)
}
// insert col into the right place of the column list
// Insert col into the right place of the column list.
newCols := make([]*column.Col, 0, len(cols)+1)
newCols = append(newCols, cols[:position]...)
newCols = append(newCols, col)
newCols = append(newCols, cols[position:]...)
// adjust position
if position != len(cols) {
offsetChange := make(map[int]int)
Expand Down Expand Up @@ -532,6 +554,7 @@ func (d *ddl) addColumn(ctx context.Context, schema *model.DBInfo, tbl table.Tab
err = d.onDDLChange(err)
return errors.Trace(err)
}
*/

func updateOldRows(ctx context.Context, t *tables.Table, col *column.Col) error {
txn, err := ctx.GetTxn(false)
Expand Down Expand Up @@ -660,3 +683,15 @@ func (d *ddl) DropIndex(ctx context.Context, schemaName, tableName, indexName mo
err = d.onDDLChange(err)
return errors.Trace(err)
}

// findCol finds column in cols by name.
func findCol(cols []*model.ColumnInfo, name string) *model.ColumnInfo {
name = strings.ToLower(name)
for _, col := range cols {
if col.Name.L == name {
return col
}
}

return nil
}
34 changes: 0 additions & 34 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package ddl

import (
"bytes"
"strings"

"github.com/juju/errors"
"github.com/ngaut/log"
Expand All @@ -29,39 +28,6 @@ import (
"github.com/pingcap/tidb/util/errors2"
)

func (d *ddl) getTableInfo(t *meta.Meta, job *model.Job) (*model.TableInfo, error) {
schemaID := job.SchemaID
tableID := job.TableID
tblInfo, err := t.GetTable(schemaID, tableID)
if errors2.ErrorEqual(err, meta.ErrDBNotExists) {
job.State = model.JobCancelled
return nil, errors.Trace(ErrNotExists)
} else if err != nil {
return nil, errors.Trace(err)
} else if tblInfo == nil {
job.State = model.JobCancelled
return nil, errors.Trace(ErrNotExists)
}

if tblInfo.State != model.StatePublic {
job.State = model.JobCancelled
return nil, errors.Errorf("table %s is not in public, but %s", tblInfo.Name.L, tblInfo.State)
}

return tblInfo, nil
}

// FindCol finds column in cols by name.
func findCol(cols []*model.ColumnInfo, name string) (c *model.ColumnInfo) {
name = strings.ToLower(name)
for _, c = range cols {
if c.Name.L == name {
return
}
}
return nil
}

func buildIndexInfo(tblInfo *model.TableInfo, unique bool, indexName model.CIStr, idxColNames []*coldef.IndexColName) (*model.IndexInfo, error) {
for _, col := range tblInfo.Columns {
if col.Name.L == indexName.L {
Expand Down
22 changes: 22 additions & 0 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,28 @@ func (d *ddl) getTable(t *meta.Meta, schemaID int64, tblInfo *model.TableInfo) (
return tbl, nil
}

func (d *ddl) getTableInfo(t *meta.Meta, job *model.Job) (*model.TableInfo, error) {
schemaID := job.SchemaID
tableID := job.TableID
tblInfo, err := t.GetTable(schemaID, tableID)
if errors2.ErrorEqual(err, meta.ErrDBNotExists) {
job.State = model.JobCancelled
return nil, errors.Trace(ErrNotExists)
} else if err != nil {
return nil, errors.Trace(err)
} else if tblInfo == nil {
job.State = model.JobCancelled
return nil, errors.Trace(ErrNotExists)
}

if tblInfo.State != model.StatePublic {
job.State = model.JobCancelled
return nil, errors.Errorf("table %s is not in public, but %s", tblInfo.Name.L, tblInfo.State)
}

return tblInfo, nil
}

func (d *ddl) dropTableData(t table.Table) error {
ctx := d.newReorgContext()
txn, err := ctx.GetTxn(true)
Expand Down
2 changes: 2 additions & 0 deletions ddl/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,9 @@ func (d *ddl) runJob(t *meta.Meta, job *model.Job) error {
case model.ActionDropTable:
err = d.onTableDrop(t, job)
case model.ActionAddColumn:
err = d.onColumnAdd(t, job)
case model.ActionDropColumn:
err = d.onColumnDrop(t, job)
case model.ActionAddIndex:
err = d.onIndexCreate(t, job)
case model.ActionDropIndex:
Expand Down
22 changes: 10 additions & 12 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,13 @@ func (m *Meta) parseTableID(key string) (int64, error) {

// GenAutoTableID adds step to the auto id of the table and returns the sum.
func (m *Meta) GenAutoTableID(dbID int64, tableID int64, step int64) (int64, error) {
// check db exists
// Check if db exists.
dbKey := m.dbKey(dbID)
if err := m.checkDBExists(dbKey); err != nil {
return 0, errors.Trace(err)
}

// check table exists
// Check if table exists.
tableKey := m.tableKey(tableID)
if err := m.checkTableExists(dbKey, tableKey); err != nil {
return 0, errors.Trace(err)
Expand Down Expand Up @@ -239,14 +239,14 @@ func (m *Meta) UpdateDatabase(dbInfo *model.DBInfo) error {

// CreateTable creates a table with tableInfo in database.
func (m *Meta) CreateTable(dbID int64, tableInfo *model.TableInfo) error {
// first check db exists or not.
// Check if db exists.
dbKey := m.dbKey(dbID)
if err := m.checkDBExists(dbKey); err != nil {
return errors.Trace(err)
}

// Check if table exists.
tableKey := m.tableKey(tableInfo.ID)
// then check table exists or not
if err := m.checkTableNotExists(dbKey, tableKey); err != nil {
return errors.Trace(err)
}
Expand All @@ -261,7 +261,7 @@ func (m *Meta) CreateTable(dbID int64, tableInfo *model.TableInfo) error {

// DropDatabase drops whole database.
func (m *Meta) DropDatabase(dbID int64) error {
// check if db exists.
// Check if db exists.
dbKey := m.dbKey(dbID)
if err := m.txn.HClear(dbKey); err != nil {
return errors.Trace(err)
Expand All @@ -276,14 +276,14 @@ func (m *Meta) DropDatabase(dbID int64) error {

// DropTable drops table in database.
func (m *Meta) DropTable(dbID int64, tableID int64) error {
// first check db exists or not.
// Check if db exists.
dbKey := m.dbKey(dbID)
if err := m.checkDBExists(dbKey); err != nil {
return errors.Trace(err)
}

// Check if table exists.
tableKey := m.tableKey(tableID)
// then check table exists or not
if err := m.checkTableExists(dbKey, tableKey); err != nil {
return errors.Trace(err)
}
Expand All @@ -301,15 +301,14 @@ func (m *Meta) DropTable(dbID int64, tableID int64) error {

// UpdateTable updates the table with table info.
func (m *Meta) UpdateTable(dbID int64, tableInfo *model.TableInfo) error {
// first check db exists or not.
// Check if db exists.
dbKey := m.dbKey(dbID)
if err := m.checkDBExists(dbKey); err != nil {
return errors.Trace(err)
}

// Check if table exists.
tableKey := m.tableKey(tableInfo.ID)

// then check table exists or not
if err := m.checkTableExists(dbKey, tableKey); err != nil {
return errors.Trace(err)
}
Expand All @@ -320,7 +319,6 @@ func (m *Meta) UpdateTable(dbID int64, tableInfo *model.TableInfo) error {
}

err = m.txn.HSet(dbKey, tableKey, data)

return errors.Trace(err)
}

Expand Down Expand Up @@ -390,7 +388,7 @@ func (m *Meta) GetDatabase(dbID int64) (*model.DBInfo, error) {

// GetTable gets the table value in database with tableID.
func (m *Meta) GetTable(dbID int64, tableID int64) (*model.TableInfo, error) {
// first check db exists or not.
// Check if db exists.
dbKey := m.dbKey(dbID)
if err := m.checkDBExists(dbKey); err != nil {
return nil, errors.Trace(err)
Expand Down

0 comments on commit 6515263

Please sign in to comment.