Skip to content

Commit

Permalink
*: support create table and drop table.
Browse files Browse the repository at this point in the history
  • Loading branch information
siddontang committed Oct 27, 2015
1 parent 3896dfb commit c8bbf78
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 32 deletions.
37 changes: 13 additions & 24 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,18 +372,15 @@ func (d *ddl) CreateTable(ctx context.Context, ident table.Ident, colDefs []*col
if err != nil {
return errors.Trace(err)
}
log.Infof("New table: %+v", tbInfo)

err = d.meta.RunInNewTxn(false, func(t *meta.TMeta) error {
err := d.verifySchemaMetaVersion(t, is.SchemaMetaVersion())
if err != nil {
return errors.Trace(err)
}

err = t.CreateTable(schema.ID, tbInfo)
return errors.Trace(err)
})
job := &model.Job{
SchemaID: schema.ID,
TableID: tbInfo.ID,
Type: model.ActionCreateTable,
Args: []interface{}{tbInfo},
}

err = d.startJob(ctx, job)
err = d.onDDLChange(err)
return errors.Trace(err)
}
Expand Down Expand Up @@ -534,22 +531,14 @@ func (d *ddl) DropTable(ctx context.Context, ti table.Ident) (err error) {
return errors.Trace(err)
}

err = d.meta.RunInNewTxn(false, func(t *meta.TMeta) error {
err := d.verifySchemaMetaVersion(t, is.SchemaMetaVersion())
if err != nil {
return errors.Trace(err)
}

err = t.DropTable(schema.ID, tb.Meta().ID)
return errors.Trace(err)
})

err = d.onDDLChange(err)
if err != nil {
return errors.Trace(err)
job := &model.Job{
SchemaID: schema.ID,
TableID: tb.Meta().ID,
Type: model.ActionDropTable,
}

err = d.deleteTableData(ctx, tb)
err = d.startJob(ctx, job)
err = d.onDDLChange(err)
return errors.Trace(err)
}

Expand Down
174 changes: 174 additions & 0 deletions ddl/table.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import (
"github.com/juju/errors"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/table"
"github.com/reborndb/go/errors2"
)

func (d *ddl) onTableCreate(t *meta.TMeta, job *model.Job) error {
schemaID := job.SchemaID
tbInfo := &model.TableInfo{}
if err := job.DecodeArgs(tbInfo); err != nil {
// arg error, cancel this job.
job.State = model.JobCancelled
return errors.Trace(err)
}

tbInfo.State = model.StateNone

tables, err := t.ListTables(schemaID)
if err != nil {
return errors.Trace(err)
}

for _, tbl := range tables {
if tbl.Name.L == tbInfo.Name.L {
if tbl.ID != tbInfo.ID {
// table exists, can't create, we should cancel this job now.
job.State = model.JobCancelled
return errors.Trace(ErrExists)
}

tbInfo = tbl
}
}

_, err = t.GenSchemaVersion()
if err != nil {
return errors.Trace(err)
}

switch tbInfo.State {
case model.StateNone:
// none -> delete only
tbInfo.State = model.StateDeleteOnly
err = t.CreateTable(schemaID, tbInfo)
return errors.Trace(err)
case model.StateDeleteOnly:
// delete only -> write only
tbInfo.State = model.StateWriteOnly
err = t.UpdateTable(schemaID, tbInfo)
return errors.Trace(err)
case model.StateWriteOnly:
// write only -> public
tbInfo.State = model.StatePublic
err = t.UpdateTable(schemaID, tbInfo)
if err != nil {
return errors.Trace(err)
}

// finish this job
job.State = model.JobDone
return nil
default:
return errors.Errorf("invalid table state %v", tbInfo.State)
}
}

func (d *ddl) onTableDrop(t *meta.TMeta, job *model.Job) error {
schemaID := job.SchemaID
tableID := job.TableID

tblInfo, err := t.GetTable(schemaID, tableID)
if err != nil {
return errors.Trace(err)
} else if tblInfo == nil {
job.State = model.JobCancelled
return errors.Trace(ErrNotExists)
}

_, err = t.GenSchemaVersion()
if err != nil {
return errors.Trace(err)
}

switch tblInfo.State {
case model.StatePublic:
// public -> write only
tblInfo.State = model.StateWriteOnly
err = t.UpdateTable(schemaID, tblInfo)
return errors.Trace(err)
case model.StateWriteOnly:
// write only -> delete only
tblInfo.State = model.StateDeleteOnly
err = t.UpdateTable(schemaID, tblInfo)
return errors.Trace(err)
case model.StateDeleteOnly:
// delete only -> reorganization
tblInfo.State = model.StateReorgnization
err = t.UpdateTable(schemaID, tblInfo)
return errors.Trace(err)
case model.StateReorgnization:
// reorganization -> absent
var dbInfo *model.DBInfo
dbInfo, err = t.GetDatabase(schemaID)
if err != nil {
return errors.Trace(err)
}

err = d.runReorgJob(func() error {
return d.dropTableData(dbInfo, tblInfo)
})

if errors2.ErrorEqual(err, errWaitReorgTimeout) {
// if timeout, we should return, check for the owner and re-wait job done.
return nil
}
if err != nil {
return errors.Trace(err)
}

// all reorgnization jobs done, drop this database
if err = t.DropTable(schemaID, tableID); err != nil {
return errors.Trace(err)
}

// finish this job
job.State = model.JobDone
return nil
default:
return errors.Errorf("invalid table state %v", tblInfo.State)
}
}

func (d *ddl) dropTableData(dbInfo *model.DBInfo, tblInfo *model.TableInfo) error {
ctx := d.newReorgContext()
txn, err := ctx.GetTxn(true)

alloc := autoid.NewAllocator(d.meta, dbInfo.ID)
t := table.TableFromMeta(dbInfo.Name.L, alloc, tblInfo)
err = t.Truncate(ctx)
if err != nil {
ctx.FinishTxn(true)
return errors.Trace(err)
}

// Remove indices.
for _, v := range t.Indices() {
if v != nil && v.X != nil {
if err = v.X.Drop(txn); err != nil {
ctx.FinishTxn(true)
return errors.Trace(err)
}
}
}

return errors.Trace(ctx.FinishTxn(false))
}
4 changes: 3 additions & 1 deletion ddl/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (d *ddl) onWorker() {

err := d.handleJobQueue()
if err != nil {
log.Errorf("handle job err %v", err)
log.Errorf("handle job err %v", errors.ErrorStack(err))
}
}
}
Expand All @@ -242,7 +242,9 @@ func (d *ddl) runJob(t *meta.TMeta, job *model.Job) error {
case model.ActionDropSchema:
err = d.onSchemaDrop(t, job)
case model.ActionCreateTable:
err = d.onTableCreate(t, job)
case model.ActionDropTable:
err = d.onTableDrop(t, job)
case model.ActionAddColumn:
case model.ActionDropColumn:
case model.ActionAddIndex:
Expand Down
9 changes: 8 additions & 1 deletion domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,14 @@ func (do *Domain) loadInfoSchema(m *meta.TMeta) (err error) {
return errors.Trace(err)
}

di.Tables = tables
di.Tables = make([]*model.TableInfo, 0, len(tables))
for _, tbl := range tables {
if tbl.State != model.StatePublic {
// schema is not public, can't be used outsiee.
continue
}
di.Tables = append(di.Tables, tbl)
}
}

log.Infof("loadInfoSchema %d", schemaMetaVersion)
Expand Down
2 changes: 1 addition & 1 deletion meta/autoid/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (alloc *allocator) Alloc(tableID int64) (int64, error) {
}

alloc.base++
log.Infof("Alloc id %d, table ID:%d, from %p, database ID:%s", alloc.base, tableID, alloc, alloc.dbID)
log.Infof("Alloc id %d, table ID:%d, from %p, database ID:%d", alloc.base, tableID, alloc, alloc.dbID)
return alloc.base, nil
}

Expand Down
2 changes: 1 addition & 1 deletion model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (job *Job) DecodeArgs(args ...interface{}) error {

// String implements fmt.Stringer interface.
func (job *Job) String() string {
return fmt.Sprintf("ID:%d, Type:%s, State:%s, SchemaID:%d, TableID:%d, Args:%q",
return fmt.Sprintf("ID:%d, Type:%s, State:%s, SchemaID:%d, TableID:%d, Args:%s",
job.ID, job.Type, job.State, job.SchemaID, job.TableID, job.RawArgs)
}

Expand Down
2 changes: 1 addition & 1 deletion structure/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (t *TxStructure) HLen(key []byte) (int64, error) {
func (t *TxStructure) HDel(key []byte, fields ...[]byte) error {
metaKey := t.encodeHashMetaKey(key)
meta, err := t.loadHashMeta(metaKey)
if err != nil {
if err != nil || meta.IsEmpty() {
return errors.Trace(err)
}

Expand Down
5 changes: 4 additions & 1 deletion structure/structure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/pingcap/tidb/store/localstore/goleveldb"
)

func TesTxStructure(t *testing.T) {
func TestStructure(t *testing.T) {
TestingT(t)
}

Expand Down Expand Up @@ -239,6 +239,9 @@ func (s *tesTxStructureSuite) TestHash(c *C) {
c.Assert(err, IsNil)
c.Assert(l, Equals, int64(0))

err = tx.HDel(key, []byte("fake"))
c.Assert(err, IsNil)

err = tx.Commit()
c.Assert(err, IsNil)

Expand Down
7 changes: 5 additions & 2 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Table struct {
recordPrefix string
indexPrefix string
alloc autoid.Allocator
state model.SchemaState
}

// TableFromMeta creates a Table instance from model.TableInfo.
Expand All @@ -67,6 +68,7 @@ func TableFromMeta(dbname string, alloc autoid.Allocator, tblInfo *model.TableIn
t.AddIndex(idx)
}

t.state = tblInfo.State
return t
}

Expand Down Expand Up @@ -107,8 +109,9 @@ func (t *Table) TableName() model.CIStr {
// Meta implements table.Table Meta interface.
func (t *Table) Meta() *model.TableInfo {
ti := &model.TableInfo{
Name: t.Name,
ID: t.ID,
Name: t.Name,
ID: t.ID,
State: t.state,
}
// load table meta
for _, col := range t.Columns {
Expand Down

0 comments on commit c8bbf78

Please sign in to comment.