Skip to content

Commit

Permalink
Merge branch 'master' into zimuxia/inspectkv-sql
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala committed Dec 29, 2015
2 parents 8993f0e + e45c31b commit be2039c
Show file tree
Hide file tree
Showing 67 changed files with 722 additions and 578 deletions.
5 changes: 0 additions & 5 deletions ast/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,15 +310,10 @@ const (
// See: https://dev.mysql.com/doc/refman/5.7/en/date-and-time-functions.html#function_subdate
// See: https://dev.mysql.com/doc/refman/5.7/en/date-and-time-functions.html#function_date-sub
DateSub
// DateArithDaysForm is to run adddate or subdate function with days form Flag.
DateArithDaysForm
)

// DateArithInterval is the struct of DateArith interval part.
type DateArithInterval struct {
// Form is the flag of DateArith running form.
// The function runs with interval or days.
Form DateArithType
Unit string
Interval ExprNode
}
Expand Down
5 changes: 5 additions & 0 deletions column/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ func (c *Col) CheckNotNull(data interface{}) error {
return nil
}

// IsPKHandleColumn checks if the column is primary key handle column.
func (c *Col) IsPKHandleColumn(tbInfo *model.TableInfo) bool {
return mysql.HasPriKeyFlag(c.Flag) && tbInfo.PKIsHandle
}

// CheckNotNull checks if row has nil value set to a column with NotNull flag set.
func CheckNotNull(cols []*Col, row []interface{}) error {
for _, c := range cols {
Expand Down
16 changes: 8 additions & 8 deletions ddl/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (s *testColumnSuite) TestColumn(c *C) {

num := 10
for i := 0; i < num; i++ {
_, err := t.AddRecord(ctx, []interface{}{i, 10 * i, 100 * i}, 0)
_, err := t.AddRecord(ctx, []interface{}{i, 10 * i, 100 * i})
c.Assert(err, IsNil)
}

Expand Down Expand Up @@ -155,7 +155,7 @@ func (s *testColumnSuite) TestColumn(c *C) {
})
c.Assert(i, Equals, int64(num))

h, err := t.AddRecord(ctx, []interface{}{11, 12, 13, 14}, 0)
h, err := t.AddRecord(ctx, []interface{}{11, 12, 13, 14})
c.Assert(err, IsNil)
err = ctx.FinishTxn(false)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -320,7 +320,7 @@ func (s *testColumnSuite) checkDeleteOnlyColumn(c *C, ctx context.Context, d *dd
c.Assert(err, IsNil)

newRow := []interface{}{int64(11), int64(22), int64(33)}
handle, err = t.AddRecord(ctx, newRow, 0)
handle, err = t.AddRecord(ctx, newRow)
c.Assert(err, IsNil)

txn, err = ctx.GetTxn(true)
Expand Down Expand Up @@ -381,7 +381,7 @@ func (s *testColumnSuite) checkWriteOnlyColumn(c *C, ctx context.Context, d *ddl
c.Assert(err, IsNil)

newRow := []interface{}{int64(11), int64(22), int64(33)}
handle, err = t.AddRecord(ctx, newRow, 0)
handle, err = t.AddRecord(ctx, newRow)
c.Assert(err, IsNil)

txn, err = ctx.GetTxn(true)
Expand Down Expand Up @@ -440,7 +440,7 @@ func (s *testColumnSuite) checkReorganizationColumn(c *C, ctx context.Context, d
c.Assert(err, IsNil)

newRow := []interface{}{int64(11), int64(22), int64(33)}
handle, err = t.AddRecord(ctx, newRow, 0)
handle, err = t.AddRecord(ctx, newRow)
c.Assert(err, IsNil)

txn, err = ctx.GetTxn(true)
Expand Down Expand Up @@ -499,7 +499,7 @@ func (s *testColumnSuite) checkPublicColumn(c *C, ctx context.Context, d *ddl, t
c.Assert(err, IsNil)

newRow := []interface{}{int64(11), int64(22), int64(33), int64(44)}
handle, err = t.AddRecord(ctx, newRow, 0)
handle, err = t.AddRecord(ctx, newRow)
c.Assert(err, IsNil)

txn, err = ctx.GetTxn(true)
Expand Down Expand Up @@ -577,7 +577,7 @@ func (s *testColumnSuite) TestAddColumn(c *C) {
t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)

row := []interface{}{int64(1), int64(2), int64(3)}
handle, err := t.AddRecord(ctx, row, 0)
handle, err := t.AddRecord(ctx, row)
c.Assert(err, IsNil)

err = ctx.FinishTxn(false)
Expand Down Expand Up @@ -645,7 +645,7 @@ func (s *testColumnSuite) TestDropColumn(c *C) {
colName := "c4"
defaultColValue := int64(4)
row := []interface{}{int64(1), int64(2), int64(3)}
handle, err := t.AddRecord(ctx, append(row, defaultColValue), 0)
handle, err := t.AddRecord(ctx, append(row, defaultColValue))
c.Assert(err, IsNil)

err = ctx.FinishTxn(false)
Expand Down
14 changes: 14 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,20 @@ func (d *ddl) buildTableInfo(tableName model.CIStr, cols []*column.Col, constrai
tbInfo.Columns = append(tbInfo.Columns, &v.ColumnInfo)
}
for _, constr := range constraints {
if constr.Tp == coldef.ConstrPrimaryKey {
if len(constr.Keys) == 1 {
key := constr.Keys[0]
col := column.FindCol(cols, key.ColumnName)
if col == nil {
return nil, errors.Errorf("No such column: %v", key)
}
switch col.Tp {
case mysql.TypeLong, mysql.TypeLonglong:
tbInfo.PKIsHandle = true
}
}
}

// 1. check if the column is exists
// 2. add index
indexColumns := make([]*model.IndexColumn, 0, len(constr.Keys))
Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ LOOP:
err = t.IterRecords(txn, t.FirstKey(), t.Cols(), func(h int64, data []interface{}, cols []*column.Col) (bool, error) {
i++
k := t.RecordKey(h, col)
_, err1 := txn.Get([]byte(k))
_, err1 := txn.Get(k)
c.Assert(terror.ErrorEqual(err1, kv.ErrNotExist), IsTrue)
return true, nil
})
Expand Down
8 changes: 4 additions & 4 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (ts *testSuite) TestDDL(c *C) {
tb, err := sessionctx.GetDomain(ctx).InfoSchema().TableByName(tbIdent.Schema, tbIdent.Name)
c.Assert(err, IsNil)
c.Assert(tb, NotNil)
_, err = tb.AddRecord(ctx, []interface{}{1, "b", 2, 4}, 0)
_, err = tb.AddRecord(ctx, []interface{}{1, "b", 2, 4})
c.Assert(err, IsNil)

alterStmt := statement(ctx, "alter table t add column aa int first").(*stmts.AlterTableStmt)
Expand Down Expand Up @@ -126,9 +126,9 @@ func (ts *testSuite) TestDDL(c *C) {
tb, err = sessionctx.GetDomain(ctx).InfoSchema().TableByName(tbIdent2.Schema, tbIdent2.Name)
c.Assert(err, IsNil)
c.Assert(tb, NotNil)
rid0, err := tb.AddRecord(ctx, []interface{}{1}, 0)
rid0, err := tb.AddRecord(ctx, []interface{}{1})
c.Assert(err, IsNil)
rid1, err := tb.AddRecord(ctx, []interface{}{2}, 0)
rid1, err := tb.AddRecord(ctx, []interface{}{2})
c.Assert(err, IsNil)

alterStmt = statement(ctx, `alter table t2 add b enum("bb") first`).(*stmts.AlterTableStmt)
Expand All @@ -155,7 +155,7 @@ func (ts *testSuite) TestDDL(c *C) {
c.Assert(cols[0], Equals, nil)
c.Assert(cols[1], BytesEquals, []byte("abc"))
c.Assert(cols[2], Equals, int64(2))
rid3, err := tb.AddRecord(ctx, []interface{}{mysql.Enum{Name: "bb", Value: 1}, "c", 3}, 0)
rid3, err := tb.AddRecord(ctx, []interface{}{mysql.Enum{Name: "bb", Value: 1}, "c", 3})
c.Assert(err, IsNil)
cols, err = tb.Row(ctx, rid3)
c.Assert(err, IsNil)
Expand Down
28 changes: 10 additions & 18 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@
package ddl

import (
"bytes"
"strings"

"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -299,7 +296,7 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) error {
}

func checkRowExist(txn kv.Transaction, t table.Table, handle int64) (bool, error) {
_, err := txn.Get([]byte(t.RecordKey(handle, nil)))
_, err := txn.Get(t.RecordKey(handle, nil))
if terror.ErrorEqual(err, kv.ErrNotExist) {
// If row doesn't exist, we may have deleted the row already,
// no need to add index again.
Expand All @@ -320,7 +317,7 @@ func fetchRowColVals(txn kv.Transaction, t table.Table, handle int64, indexInfo

col := cols[v.Offset]
k := t.RecordKey(handle, col)
data, err := txn.Get([]byte(k))
data, err := txn.Get(k)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -373,7 +370,6 @@ func (d *ddl) getSnapshotRows(t table.Table, version uint64, seekHandle int64) (
defer snap.Release()

firstKey := t.RecordKey(seekHandle, nil)
prefix := []byte(t.KeyPrefix())

it, err := snap.Seek(firstKey)
if err != nil {
Expand All @@ -384,13 +380,12 @@ func (d *ddl) getSnapshotRows(t table.Table, version uint64, seekHandle int64) (
handles := make([]int64, 0, maxBatchSize)

for it.Valid() {
key := []byte(it.Key())
if !bytes.HasPrefix(key, prefix) {
if !it.Key().HasPrefix(t.RecordPrefix()) {
break
}

var handle int64
handle, err = tables.DecodeRecordKeyHandle(string(key))
handle, err = tables.DecodeRecordKeyHandle(it.Key())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -418,7 +413,7 @@ func lockRow(txn kv.Transaction, t table.Table, h int64) error {
// Get row lock key
lockKey := t.RecordKey(h, nil)
// set row lock key to current txn
err := txn.Set([]byte(lockKey), []byte(txn.String()))
err := txn.Set(lockKey, []byte(txn.String()))
return errors.Trace(err)
}

Expand Down Expand Up @@ -481,21 +476,18 @@ func (d *ddl) backfillTableIndex(t table.Table, indexInfo *model.IndexInfo, hand

func (d *ddl) dropTableIndex(t table.Table, indexInfo *model.IndexInfo) error {
prefix := kv.GenIndexPrefix(t.IndexPrefix(), indexInfo.ID)

prefixBytes := []byte(prefix)

for {
keys := make([]string, 0, maxBatchSize)
keys := make([]kv.Key, 0, maxBatchSize)
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
iter, err := txn.Seek(prefixBytes)
iter, err := txn.Seek(prefix)
if err != nil {
return errors.Trace(err)
}

defer iter.Close()
for i := 0; i < maxBatchSize; i++ {
if iter.Valid() && strings.HasPrefix(iter.Key(), prefix) {
keys = append(keys, iter.Key())
if iter.Valid() && iter.Key().HasPrefix(prefix) {
keys = append(keys, iter.Key().Clone())
err = iter.Next()
if err != nil {
return errors.Trace(err)
Expand All @@ -520,7 +512,7 @@ func (d *ddl) dropTableIndex(t table.Table, indexInfo *model.IndexInfo) error {
return errors.Trace(err1)
}

err1 := txn.Delete([]byte(key))
err1 := txn.Delete(key)
// if key doesn't exist, skip this error.
if err1 != nil && !terror.ErrorEqual(err1, kv.ErrNotExist) {
return errors.Trace(err1)
Expand Down
22 changes: 11 additions & 11 deletions ddl/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (s *testIndexSuite) TestIndex(c *C) {

num := 10
for i := 0; i < num; i++ {
_, err = t.AddRecord(ctx, []interface{}{i, i, i}, 0)
_, err = t.AddRecord(ctx, []interface{}{i, i, i})
c.Assert(err, IsNil)
}

Expand All @@ -122,14 +122,14 @@ func (s *testIndexSuite) TestIndex(c *C) {
index := t.FindIndexByColName("c1")
c.Assert(index, NotNil)

h, err := t.AddRecord(ctx, []interface{}{num + 1, 1, 1}, 0)
h, err := t.AddRecord(ctx, []interface{}{num + 1, 1, 1})
c.Assert(err, IsNil)

h1, err := t.AddRecord(ctx, []interface{}{num + 1, 1, 1}, 0)
h1, err := t.AddRecord(ctx, []interface{}{num + 1, 1, 1})
c.Assert(err, NotNil)
c.Assert(h, Equals, h1)

h, err = t.AddRecord(ctx, []interface{}{1, 1, 1}, 0)
h, err = t.AddRecord(ctx, []interface{}{1, 1, 1})
c.Assert(err, NotNil)

txn, err = ctx.GetTxn(true)
Expand All @@ -153,7 +153,7 @@ func (s *testIndexSuite) TestIndex(c *C) {
c.Assert(err, IsNil)
c.Assert(exist, IsFalse)

h, err = t.AddRecord(ctx, []interface{}{1, 1, 1}, 0)
h, err = t.AddRecord(ctx, []interface{}{1, 1, 1})
c.Assert(err, IsNil)
}

Expand Down Expand Up @@ -230,7 +230,7 @@ func (s *testIndexSuite) checkDeleteOnlyIndex(c *C, ctx context.Context, d *ddl,
c.Assert(err, IsNil)

newRow := []interface{}{int64(11), int64(22), int64(33)}
handle, err = t.AddRecord(ctx, newRow, 0)
handle, err = t.AddRecord(ctx, newRow)
c.Assert(err, IsNil)

txn, err = ctx.GetTxn(true)
Expand Down Expand Up @@ -317,7 +317,7 @@ func (s *testIndexSuite) checkWriteOnlyIndex(c *C, ctx context.Context, d *ddl,
c.Assert(err, IsNil)

newRow := []interface{}{int64(11), int64(22), int64(33)}
handle, err = t.AddRecord(ctx, newRow, 0)
handle, err = t.AddRecord(ctx, newRow)
c.Assert(err, IsNil)

txn, err = ctx.GetTxn(true)
Expand Down Expand Up @@ -397,7 +397,7 @@ func (s *testIndexSuite) checkReorganizationIndex(c *C, ctx context.Context, d *
c.Assert(err, IsNil)

newRow := []interface{}{int64(11), int64(22), int64(33)}
handle, err = t.AddRecord(ctx, newRow, 0)
handle, err = t.AddRecord(ctx, newRow)
c.Assert(err, IsNil)

txn, err = ctx.GetTxn(true)
Expand Down Expand Up @@ -484,7 +484,7 @@ func (s *testIndexSuite) checkPublicIndex(c *C, ctx context.Context, d *ddl, tbl
c.Assert(err, IsNil)

newRow := []interface{}{int64(11), int64(22), int64(33)}
handle, err = t.AddRecord(ctx, newRow, 0)
handle, err = t.AddRecord(ctx, newRow)
c.Assert(err, IsNil)

txn, err = ctx.GetTxn(true)
Expand Down Expand Up @@ -574,7 +574,7 @@ func (s *testIndexSuite) TestAddIndex(c *C) {
t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)

row := []interface{}{int64(1), int64(2), int64(3)}
handle, err := t.AddRecord(ctx, row, 0)
handle, err := t.AddRecord(ctx, row)
c.Assert(err, IsNil)

err = ctx.FinishTxn(false)
Expand Down Expand Up @@ -638,7 +638,7 @@ func (s *testIndexSuite) TestDropIndex(c *C) {
t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)

row := []interface{}{int64(1), int64(2), int64(3)}
handle, err := t.AddRecord(ctx, row, 0)
handle, err := t.AddRecord(ctx, row)
c.Assert(err, IsNil)

err = ctx.FinishTxn(false)
Expand Down
13 changes: 6 additions & 7 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package ddl

import (
"fmt"
"strings"
"time"

"github.com/juju/errors"
Expand Down Expand Up @@ -154,23 +153,23 @@ func (d *ddl) isReorgRunnable(txn kv.Transaction) error {
return nil
}

func (d *ddl) delKeysWithPrefix(prefix string) error {
func (d *ddl) delKeysWithPrefix(prefix kv.Key) error {
for {
keys := make([]string, 0, maxBatchSize)
keys := make([]kv.Key, 0, maxBatchSize)
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
if err1 := d.isReorgRunnable(txn); err1 != nil {
return errors.Trace(err1)
}

iter, err := txn.Seek([]byte(prefix))
iter, err := txn.Seek(prefix)
if err != nil {
return errors.Trace(err)
}

defer iter.Close()
for i := 0; i < maxBatchSize; i++ {
if iter.Valid() && strings.HasPrefix(iter.Key(), prefix) {
keys = append(keys, iter.Key())
if iter.Valid() && iter.Key().HasPrefix(prefix) {
keys = append(keys, iter.Key().Clone())
err = iter.Next()
if err != nil {
return errors.Trace(err)
Expand All @@ -181,7 +180,7 @@ func (d *ddl) delKeysWithPrefix(prefix string) error {
}

for _, key := range keys {
err := txn.Delete([]byte(key))
err := txn.Delete(key)
// must skip ErrNotExist
if err != nil && !terror.ErrorEqual(err, kv.ErrNotExist) {
return errors.Trace(err)
Expand Down
Loading

0 comments on commit be2039c

Please sign in to comment.