Skip to content

Commit

Permalink
kv, table: fix pingcap#463
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing committed Dec 2, 2015
1 parent 8c61de9 commit 7210f23
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 63 deletions.
30 changes: 15 additions & 15 deletions kv/index_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,40 +155,40 @@ func (c *kvIndex) GenIndexKey(indexedValues []interface{}, h int64) (key []byte,

// Create creates a new entry in the kvIndex data.
// If the index is unique and there already exists an entry with the same key, Create will return ErrKeyExists
func (c *kvIndex) Create(txn Transaction, indexedValues []interface{}, h int64) error {
func (c *kvIndex) Create(rm RetrieverMutator, indexedValues []interface{}, h int64) error {
key, distinct, err := c.GenIndexKey(indexedValues, h)
if err != nil {
return errors.Trace(err)
}
if !distinct {
// TODO: reconsider value
err = txn.Set(key, []byte("timestamp?"))
err = rm.Set(key, []byte("timestamp?"))
return errors.Trace(err)
}

_, err = txn.Get(key)
_, err = rm.Get(key)
if IsErrNotFound(err) {
err = txn.Set(key, encodeHandle(h))
err = rm.Set(key, encodeHandle(h))
return errors.Trace(err)
}

return errors.Trace(ErrKeyExists)
}

// Delete removes the entry for handle h and indexdValues from KV index.
func (c *kvIndex) Delete(txn Transaction, indexedValues []interface{}, h int64) error {
func (c *kvIndex) Delete(rm RetrieverMutator, indexedValues []interface{}, h int64) error {
key, _, err := c.GenIndexKey(indexedValues, h)
if err != nil {
return errors.Trace(err)
}
err = txn.Delete(key)
err = rm.Delete(key)
return errors.Trace(err)
}

// Drop removes the KV index from store.
func (c *kvIndex) Drop(txn Transaction) error {
func (c *kvIndex) Drop(rm RetrieverMutator) error {
prefix := []byte(c.prefix)
it, err := txn.Seek(Key(prefix))
it, err := rm.Seek(Key(prefix))
if err != nil {
return errors.Trace(err)
}
Expand All @@ -199,7 +199,7 @@ func (c *kvIndex) Drop(txn Transaction) error {
if !strings.HasPrefix(it.Key(), c.prefix) {
break
}
err := txn.Delete([]byte(it.Key()))
err := rm.Delete([]byte(it.Key()))
if err != nil {
return errors.Trace(err)
}
Expand All @@ -212,12 +212,12 @@ func (c *kvIndex) Drop(txn Transaction) error {
}

// Seek searches KV index for the entry with indexedValues.
func (c *kvIndex) Seek(txn Transaction, indexedValues []interface{}) (iter IndexIterator, hit bool, err error) {
func (c *kvIndex) Seek(rm RetrieverMutator, indexedValues []interface{}) (iter IndexIterator, hit bool, err error) {
key, _, err := c.GenIndexKey(indexedValues, 0)
if err != nil {
return nil, false, errors.Trace(err)
}
it, err := txn.Seek(key)
it, err := rm.Seek(key)
if err != nil {
return nil, false, errors.Trace(err)
}
Expand All @@ -230,22 +230,22 @@ func (c *kvIndex) Seek(txn Transaction, indexedValues []interface{}) (iter Index
}

// SeekFirst returns an iterator which points to the first entry of the KV index.
func (c *kvIndex) SeekFirst(txn Transaction) (iter IndexIterator, err error) {
func (c *kvIndex) SeekFirst(rm RetrieverMutator) (iter IndexIterator, err error) {
prefix := []byte(c.prefix)
it, err := txn.Seek(prefix)
it, err := rm.Seek(prefix)
if err != nil {
return nil, errors.Trace(err)
}
return &indexIter{it: it, idx: c, prefix: c.prefix}, nil
}

func (c *kvIndex) Exist(txn Transaction, indexedValues []interface{}, h int64) (bool, int64, error) {
func (c *kvIndex) Exist(rm RetrieverMutator, indexedValues []interface{}, h int64) (bool, int64, error) {
key, distinct, err := c.GenIndexKey(indexedValues, h)
if err != nil {
return false, 0, errors.Trace(err)
}

value, err := txn.Get(key)
value, err := rm.Get(key)
if IsErrNotFound(err) {
return false, 0, nil
}
Expand Down
14 changes: 7 additions & 7 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,11 +266,11 @@ type IndexIterator interface {

// Index is the interface for index data on KV store.
type Index interface {
Create(txn Transaction, indexedValues []interface{}, h int64) error // supports insert into statement
Delete(txn Transaction, indexedValues []interface{}, h int64) error // supports delete from statement
Drop(txn Transaction) error // supports drop table, drop index statements
Exist(txn Transaction, indexedValues []interface{}, h int64) (bool, int64, error) // supports check index exist
GenIndexKey(indexedValues []interface{}, h int64) (key []byte, distinct bool, err error) // supports index check
Seek(txn Transaction, indexedValues []interface{}) (iter IndexIterator, hit bool, err error) // supports where clause
SeekFirst(txn Transaction) (iter IndexIterator, err error) // supports aggregate min / ascending order by
Create(rw RetrieverMutator, indexedValues []interface{}, h int64) error // supports insert into statement
Delete(rw RetrieverMutator, indexedValues []interface{}, h int64) error // supports delete from statement
Drop(rw RetrieverMutator) error // supports drop table, drop index statements
Exist(rw RetrieverMutator, indexedValues []interface{}, h int64) (bool, int64, error) // supports check index exist
GenIndexKey(indexedValues []interface{}, h int64) (key []byte, distinct bool, err error) // supports index check
Seek(rw RetrieverMutator, indexedValues []interface{}) (iter IndexIterator, hit bool, err error) // supports where clause
SeekFirst(rw RetrieverMutator) (iter IndexIterator, err error) // supports aggregate min / ascending order by
}
44 changes: 44 additions & 0 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,50 @@ func (s *testSessionSuite) TestIssue461(c *C) {
mustExecSQL(c, se, "drop table test;")
}

func (s *testSessionSuite) TestIssue463(c *C) {
// Testcase for https://github.com/pingcap/tidb/issues/463
store := newStore(c, s.dbName)
se := newSession(c, store, s.dbName)
mustExecSQL(c, se,
`CREATE TABLE test (
id int(11) UNSIGNED NOT NULL AUTO_INCREMENT,
val int UNIQUE,
PRIMARY KEY (id)
);`)
mustExecSQL(c, se, "insert into test(id, val) values(1, 1);")
mustExecFailed(c, se, "insert into test(id, val) values(2, 1);")
mustExecSQL(c, se, "insert into test(id, val) values(2, 2);")

mustExecSQL(c, se, "begin;")
mustExecSQL(c, se, "insert into test(id, val) values(3, 3);")
mustExecFailed(c, se, "insert into test(id, val) values(4, 3);")
mustExecSQL(c, se, "insert into test(id, val) values(4, 4);")
mustExecSQL(c, se, "commit;")
se1 := newSession(c, store, s.dbName)
mustExecSQL(c, se1, "begin;")
mustExecSQL(c, se1, "insert into test(id, val) values(5, 6);")
mustExecSQL(c, se, "begin;")
mustExecSQL(c, se, "insert into test(id, val) values(20, 6);")
mustExecSQL(c, se, "commit;")
mustExecFailed(c, se1, "commit;")
mustExecSQL(c, se1, "insert into test(id, val) values(5, 5);")

mustExecSQL(c, se, "drop table test;")

mustExecSQL(c, se,
`CREATE TABLE test (
id int(11) UNSIGNED NOT NULL AUTO_INCREMENT,
val1 int UNIQUE,
val2 int UNIQUE,
PRIMARY KEY (id)
);`)
mustExecSQL(c, se, "insert into test(id, val1, val2) values(1, 1, 1);")
mustExecSQL(c, se, "insert into test(id, val1, val2) values(2, 2, 2);")
mustExecFailed(c, se, "update test set val1 = 3, val2 = 2 where id = 1;")
mustExecSQL(c, se, "insert into test(id, val1, val2) values(3, 3, 3);")
mustExecSQL(c, se, "drop table test;")
}

func (s *testSessionSuite) TestIssue177(c *C) {
store := newStore(c, s.dbName)
se := newSession(c, store, s.dbName)
Expand Down
6 changes: 3 additions & 3 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ type Table interface {
Row(ctx context.Context, h int64) ([]interface{}, error)

// RemoveRowIndex removes an index of a row.
RemoveRowIndex(ctx context.Context, h int64, vals []interface{}, idx *column.IndexedCol) error
RemoveRowIndex(rm kv.RetrieverMutator, h int64, vals []interface{}, idx *column.IndexedCol) error

// BuildIndexForRow builds an index for a row.
BuildIndexForRow(ctx context.Context, h int64, vals []interface{}, idx *column.IndexedCol) error
BuildIndexForRow(rm kv.RetrieverMutator, h int64, vals []interface{}, idx *column.IndexedCol) error

// TableName returns table name.
TableName() model.CIStr
Expand Down Expand Up @@ -104,7 +104,7 @@ type Table interface {

// SetColValue sets the column value.
// If the column is untouched, we don't need to do this.
SetColValue(txn kv.Transaction, key []byte, data interface{}) error
SetColValue(rm kv.RetrieverMutator, key []byte, data interface{}) error

// LockRow locks a row.
LockRow(ctx context.Context, h int64) error
Expand Down
64 changes: 35 additions & 29 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,13 +295,26 @@ func (t *Table) UpdateRecord(ctx context.Context, h int64, oldData []interface{}
return errors.Trace(err)
}

txn, err := ctx.GetTxn(false)
if err != nil {
return errors.Trace(err)
}

bs := kv.NewBufferStore(txn)
defer bs.Release()

// set new value
if err := t.setNewData(ctx, h, touched, currentData); err != nil {
if err := t.setNewData(bs, h, touched, currentData); err != nil {
return errors.Trace(err)
}

// rebuild index
if err := t.rebuildIndices(ctx, h, touched, oldData, currentData); err != nil {
if err := t.rebuildIndices(bs, h, touched, oldData, currentData); err != nil {
return errors.Trace(err)
}

err = bs.Save(txn)
if err != nil {
return errors.Trace(err)
}

Expand All @@ -325,38 +338,33 @@ func (t *Table) setOnUpdateData(ctx context.Context, touched map[int]bool, data
}

// SetColValue implements table.Table SetColValue interface.
func (t *Table) SetColValue(txn kv.Transaction, key []byte, data interface{}) error {
func (t *Table) SetColValue(rm kv.RetrieverMutator, key []byte, data interface{}) error {
v, err := t.EncodeValue(data)
if err != nil {
return errors.Trace(err)
}
if err := txn.Set(key, v); err != nil {
if err := rm.Set(key, v); err != nil {
return errors.Trace(err)
}
return nil
}

func (t *Table) setNewData(ctx context.Context, h int64, touched map[int]bool, data []interface{}) error {
txn, err := ctx.GetTxn(false)
if err != nil {
return errors.Trace(err)
}

func (t *Table) setNewData(rm kv.RetrieverMutator, h int64, touched map[int]bool, data []interface{}) error {
for _, col := range t.Cols() {
if !touched[col.Offset] {
continue
}

k := t.RecordKey(h, col)
if err := t.SetColValue(txn, k, data[col.Offset]); err != nil {
if err := t.SetColValue(rm, k, data[col.Offset]); err != nil {
return errors.Trace(err)
}
}

return nil
}

func (t *Table) rebuildIndices(ctx context.Context, h int64, touched map[int]bool, oldData []interface{}, newData []interface{}) error {
func (t *Table) rebuildIndices(rm kv.RetrieverMutator, h int64, touched map[int]bool, oldData []interface{}, newData []interface{}) error {
for _, idx := range t.Indices() {
idxTouched := false
for _, ic := range idx.Columns {
Expand All @@ -374,7 +382,7 @@ func (t *Table) rebuildIndices(ctx context.Context, h int64, touched map[int]boo
return errors.Trace(err)
}

if t.RemoveRowIndex(ctx, h, oldVs, idx); err != nil {
if t.RemoveRowIndex(rm, h, oldVs, idx); err != nil {
return errors.Trace(err)
}

Expand All @@ -383,7 +391,7 @@ func (t *Table) rebuildIndices(ctx context.Context, h int64, touched map[int]boo
return errors.Trace(err)
}

if err := t.BuildIndexForRow(ctx, h, newVs, idx); err != nil {
if err := t.BuildIndexForRow(rm, h, newVs, idx); err != nil {
return errors.Trace(err)
}
}
Expand All @@ -405,17 +413,20 @@ func (t *Table) AddRecord(ctx context.Context, r []interface{}, h int64) (record
if err != nil {
return 0, errors.Trace(err)
}
bs := kv.NewBufferStore(txn)
defer bs.Release()

for _, v := range t.indices {
if v == nil || v.State == model.StateDeleteOnly || v.State == model.StateDeleteReorganization {
// if index is in delete only or delete reorganization state, we can't add it.
continue
}
colVals, _ := v.FetchValues(r)
if err = v.X.Create(txn, colVals, recordID); err != nil {
if err = v.X.Create(bs, colVals, recordID); err != nil {
if terror.ErrorEqual(err, kv.ErrKeyExists) {
// Get the duplicate row handle
// For insert on duplicate syntax, we should update the row
iter, _, err1 := v.X.Seek(txn, colVals)
iter, _, err1 := v.X.Seek(bs, colVals)
if err1 != nil {
return 0, errors.Trace(err1)
}
Expand Down Expand Up @@ -458,6 +469,10 @@ func (t *Table) AddRecord(ctx context.Context, r []interface{}, h int64) (record
}
}

if err = bs.Save(txn); err != nil {
return 0, errors.Trace(err)
}

variable.GetSessionVars(ctx).AddAffectedRows(1)
return recordID, nil
}
Expand Down Expand Up @@ -605,30 +620,21 @@ func (t *Table) removeRowIndices(ctx context.Context, h int64, rec []interface{}
}

// RemoveRowIndex implements table.Table RemoveRowIndex interface.
func (t *Table) RemoveRowIndex(ctx context.Context, h int64, vals []interface{}, idx *column.IndexedCol) error {
txn, err := ctx.GetTxn(false)
if err != nil {
return errors.Trace(err)
}
if err = idx.X.Delete(txn, vals, h); err != nil {
func (t *Table) RemoveRowIndex(rm kv.RetrieverMutator, h int64, vals []interface{}, idx *column.IndexedCol) error {
if err := idx.X.Delete(rm, vals, h); err != nil {
return errors.Trace(err)
}
return nil
}

// BuildIndexForRow implements table.Table BuildIndexForRow interface.
func (t *Table) BuildIndexForRow(ctx context.Context, h int64, vals []interface{}, idx *column.IndexedCol) error {
func (t *Table) BuildIndexForRow(rm kv.RetrieverMutator, h int64, vals []interface{}, idx *column.IndexedCol) error {
if idx.State == model.StateDeleteOnly || idx.State == model.StateDeleteReorganization {
// If the index is in delete only or write reorganization state, we can not add index.
return nil
}

txn, err := ctx.GetTxn(false)
if err != nil {
return errors.Trace(err)
}

if err = idx.X.Create(txn, vals, h); err != nil {
if err := idx.X.Create(rm, vals, h); err != nil {
return errors.Trace(err)
}
return nil
Expand Down
22 changes: 14 additions & 8 deletions table/tables/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,24 @@ func (ts *testSuite) TestBasic(c *C) {
return true, nil
})

c.Assert(tb.RemoveRecord(ctx, rid, []interface{}{1, "cba"}), IsNil)
indexCnt := func() int {
cnt, err := countEntriesWithPrefix(ctx, tb.IndexPrefix())
c.Assert(err, IsNil)
return cnt
}

// Make sure there is index data in the storage.
prefix := tb.IndexPrefix()
cnt, err := countEntriesWithPrefix(ctx, prefix)
c.Assert(indexCnt(), Greater, 0)
c.Assert(tb.RemoveRecord(ctx, rid, []interface{}{1, "cba"}), IsNil)
// Make sure index data is also removed after tb.RemoveRecord().
c.Assert(indexCnt(), Equals, 0)
_, err = tb.AddRecord(ctx, []interface{}{1, "abc"}, 0)
c.Assert(err, IsNil)
c.Assert(cnt, Greater, 0)
c.Assert(tb.Truncate(ctx), IsNil)
c.Assert(indexCnt(), Greater, 0)
// Make sure index data is also removed after tb.Truncate().
cnt, err = countEntriesWithPrefix(ctx, prefix)
c.Assert(err, IsNil)
c.Assert(cnt, Equals, 0)
c.Assert(tb.Truncate(ctx), IsNil)
c.Assert(indexCnt(), Equals, 0)

_, err = ts.se.Execute("drop table test.t")
c.Assert(err, IsNil)
}
Expand Down
2 changes: 1 addition & 1 deletion tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func mustExecMatch(c *C, se Session, sql string, expected [][]interface{}) {

func mustExecFailed(c *C, se Session, sql string, args ...interface{}) {
r, err := exec(c, se, sql, args...)
if err == nil {
if err == nil && r != nil {
// sometimes we may meet error after executing first row.
_, err = r.FirstRow()
}
Expand Down

0 comments on commit 7210f23

Please sign in to comment.