Skip to content

Commit

Permalink
meta, structure: support snapshot meta. (pingcap#1739)
Browse files Browse the repository at this point in the history
Snapshot meta will be used to load history schema, so we can
support history read with changed schema version.
  • Loading branch information
coocood authored Sep 20, 2016
1 parent ee9df58 commit 6819e22
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 38 deletions.
9 changes: 8 additions & 1 deletion meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ var (
//

var (
mMetaPrefix = []byte("m")
mNextGlobalIDKey = []byte("NextGlobalID")
mSchemaVersionKey = []byte("SchemaVersionKey")
mDBs = []byte("DBs")
Expand Down Expand Up @@ -83,7 +84,13 @@ type Meta struct {

// NewMeta creates a Meta in transaction txn.
func NewMeta(txn kv.Transaction) *Meta {
t := structure.NewStructure(txn, []byte{'m'})
t := structure.NewStructure(txn, txn, mMetaPrefix)
return &Meta{txn: t}
}

// NewSnapshotMeta creates a Meta with snapshot.
func NewSnapshotMeta(snapshot kv.Snapshot) *Meta {
t := structure.NewStructure(snapshot, nil, mMetaPrefix)
return &Meta{txn: t}
}

Expand Down
32 changes: 32 additions & 0 deletions meta/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package meta_test

import (
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/meta"
Expand Down Expand Up @@ -163,6 +164,37 @@ func (s *testSuite) TestMeta(c *C) {
c.Assert(err, IsNil)
}

func (s *testSuite) TestSnapshot(c *C) {
defer testleak.AfterTest(c)()
driver := localstore.Driver{Driver: goleveldb.MemoryDriver{}}
store, err := driver.Open("memory")
c.Assert(err, IsNil)
defer store.Close()

txn, _ := store.Begin()
m := meta.NewMeta(txn)
m.GenGlobalID()
n, _ := m.GetGlobalID()
c.Assert(n, Equals, int64(1))
txn.Commit()

ver1, _ := store.CurrentVersion()
time.Sleep(time.Millisecond)
txn, _ = store.Begin()
m = meta.NewMeta(txn)
m.GenGlobalID()
n, _ = m.GetGlobalID()
c.Assert(n, Equals, int64(2))
txn.Commit()

snapshot, _ := store.GetSnapshot(ver1)
snapMeta := meta.NewSnapshotMeta(snapshot)
n, _ = snapMeta.GetGlobalID()
c.Assert(n, Equals, int64(1))
_, err = snapMeta.GenGlobalID()
c.Assert(err, NotNil)
}

func (s *testSuite) TestDDL(c *C) {
defer testleak.AfterTest(c)()
driver := localstore.Driver{Driver: goleveldb.MemoryDriver{}}
Expand Down
31 changes: 20 additions & 11 deletions structure/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ func (meta hashMeta) IsEmpty() bool {

// HSet sets the string value of a hash field.
func (t *TxStructure) HSet(key []byte, field []byte, value []byte) error {
if t.readWriter == nil {
return errWriteOnSnapshot
}
return t.updateHash(key, field, func([]byte) ([]byte, error) {
return value, nil
})
Expand All @@ -53,7 +56,7 @@ func (t *TxStructure) HSet(key []byte, field []byte, value []byte) error {
// HGet gets the value of a hash field.
func (t *TxStructure) HGet(key []byte, field []byte) ([]byte, error) {
dataKey := t.encodeHashDataKey(key, field)
value, err := t.txn.Get(dataKey)
value, err := t.reader.Get(dataKey)
if terror.ErrorEqual(err, kv.ErrNotExist) {
err = nil
}
Expand All @@ -63,6 +66,9 @@ func (t *TxStructure) HGet(key []byte, field []byte) ([]byte, error) {
// HInc increments the integer value of a hash field, by step, returns
// the value after the increment.
func (t *TxStructure) HInc(key []byte, field []byte, step int64) (int64, error) {
if t.readWriter == nil {
return 0, errWriteOnSnapshot
}
base := int64(0)
err := t.updateHash(key, field, func(oldValue []byte) ([]byte, error) {
if oldValue != nil {
Expand Down Expand Up @@ -108,7 +114,7 @@ func (t *TxStructure) updateHash(key []byte, field []byte, fn func(oldValue []by
return nil
}

if err = t.txn.Set(dataKey, newValue); err != nil {
if err = t.readWriter.Set(dataKey, newValue); err != nil {
return errors.Trace(err)
}

Expand All @@ -120,7 +126,7 @@ func (t *TxStructure) updateHash(key []byte, field []byte, fn func(oldValue []by

if oldValue == nil {
meta.FieldCount++
if err = t.txn.Set(metaKey, meta.Value()); err != nil {
if err = t.readWriter.Set(metaKey, meta.Value()); err != nil {
return errors.Trace(err)
}
}
Expand All @@ -140,6 +146,9 @@ func (t *TxStructure) HLen(key []byte) (int64, error) {

// HDel deletes one or more hash fields.
func (t *TxStructure) HDel(key []byte, fields ...[]byte) error {
if t.readWriter == nil {
return errWriteOnSnapshot
}
metaKey := t.encodeHashMetaKey(key)
meta, err := t.loadHashMeta(metaKey)
if err != nil || meta.IsEmpty() {
Expand All @@ -156,7 +165,7 @@ func (t *TxStructure) HDel(key []byte, fields ...[]byte) error {
}

if value != nil {
if err = t.txn.Delete(dataKey); err != nil {
if err = t.readWriter.Delete(dataKey); err != nil {
return errors.Trace(err)
}

Expand All @@ -165,9 +174,9 @@ func (t *TxStructure) HDel(key []byte, fields ...[]byte) error {
}

if meta.IsEmpty() {
err = t.txn.Delete(metaKey)
err = t.readWriter.Delete(metaKey)
} else {
err = t.txn.Set(metaKey, meta.Value())
err = t.readWriter.Set(metaKey, meta.Value())
}

return errors.Trace(err)
Expand Down Expand Up @@ -209,19 +218,19 @@ func (t *TxStructure) HClear(key []byte) error {

err = t.iterateHash(key, func(field []byte, value []byte) error {
k := t.encodeHashDataKey(key, field)
return errors.Trace(t.txn.Delete(k))
return errors.Trace(t.readWriter.Delete(k))
})

if err != nil {
return errors.Trace(err)
}

return errors.Trace(t.txn.Delete(metaKey))
return errors.Trace(t.readWriter.Delete(metaKey))
}

func (t *TxStructure) iterateHash(key []byte, fn func(k []byte, v []byte) error) error {
dataPrefix := t.hashDataKeyPrefix(key)
it, err := t.txn.Seek(dataPrefix)
it, err := t.reader.Seek(dataPrefix)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -252,7 +261,7 @@ func (t *TxStructure) iterateHash(key []byte, fn func(k []byte, v []byte) error)
}

func (t *TxStructure) loadHashMeta(metaKey []byte) (hashMeta, error) {
v, err := t.txn.Get(metaKey)
v, err := t.reader.Get(metaKey)
if terror.ErrorEqual(err, kv.ErrNotExist) {
err = nil
} else if err != nil {
Expand All @@ -273,7 +282,7 @@ func (t *TxStructure) loadHashMeta(metaKey []byte) (hashMeta, error) {
}

func (t *TxStructure) loadHashValue(dataKey []byte) ([]byte, error) {
v, err := t.txn.Get(dataKey)
v, err := t.reader.Get(dataKey)
if terror.ErrorEqual(err, kv.ErrNotExist) {
err = nil
v = nil
Expand Down
34 changes: 23 additions & 11 deletions structure/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ func (t *TxStructure) RPush(key []byte, values ...[]byte) error {
}

func (t *TxStructure) listPush(key []byte, left bool, values ...[]byte) error {
if t.readWriter == nil {
return errWriteOnSnapshot
}
if len(values) == 0 {
return nil
}
Expand All @@ -69,12 +72,12 @@ func (t *TxStructure) listPush(key []byte, left bool, values ...[]byte) error {
}

dataKey := t.encodeListDataKey(key, index)
if err = t.txn.Set(dataKey, v); err != nil {
if err = t.readWriter.Set(dataKey, v); err != nil {
return errors.Trace(err)
}
}

return t.txn.Set(metaKey, meta.Value())
return t.readWriter.Set(metaKey, meta.Value())
}

// LPop removes and gets the first element in a list.
Expand All @@ -88,6 +91,9 @@ func (t *TxStructure) RPop(key []byte) ([]byte, error) {
}

func (t *TxStructure) listPop(key []byte, left bool) ([]byte, error) {
if t.readWriter == nil {
return nil, errWriteOnSnapshot
}
metaKey := t.encodeListMetaKey(key)
meta, err := t.loadListMeta(metaKey)
if err != nil || meta.IsEmpty() {
Expand All @@ -106,19 +112,19 @@ func (t *TxStructure) listPop(key []byte, left bool) ([]byte, error) {
dataKey := t.encodeListDataKey(key, index)

var data []byte
data, err = t.txn.Get(dataKey)
data, err = t.reader.Get(dataKey)
if err != nil {
return nil, errors.Trace(err)
}

if err = t.txn.Delete(dataKey); err != nil {
if err = t.readWriter.Delete(dataKey); err != nil {
return nil, errors.Trace(err)
}

if !meta.IsEmpty() {
err = t.txn.Set(metaKey, meta.Value())
err = t.readWriter.Set(metaKey, meta.Value())
} else {
err = t.txn.Delete(metaKey)
err = t.readWriter.Delete(metaKey)
}

return data, errors.Trace(err)
Expand All @@ -142,13 +148,16 @@ func (t *TxStructure) LIndex(key []byte, index int64) ([]byte, error) {
index = adjustIndex(index, meta.LIndex, meta.RIndex)

if index >= meta.LIndex && index < meta.RIndex {
return t.txn.Get(t.encodeListDataKey(key, index))
return t.reader.Get(t.encodeListDataKey(key, index))
}
return nil, nil
}

// LSet updates an element in the list by its index.
func (t *TxStructure) LSet(key []byte, index int64, value []byte) error {
if t.readWriter == nil {
return errWriteOnSnapshot
}
metaKey := t.encodeListMetaKey(key)
meta, err := t.loadListMeta(metaKey)
if err != nil || meta.IsEmpty() {
Expand All @@ -158,13 +167,16 @@ func (t *TxStructure) LSet(key []byte, index int64, value []byte) error {
index = adjustIndex(index, meta.LIndex, meta.RIndex)

if index >= meta.LIndex && index < meta.RIndex {
return t.txn.Set(t.encodeListDataKey(key, index), value)
return t.readWriter.Set(t.encodeListDataKey(key, index), value)
}
return errInvalidListIndex.Gen("invalid list index %d", index)
}

// LClear removes the list of the key.
func (t *TxStructure) LClear(key []byte) error {
if t.readWriter == nil {
return errWriteOnSnapshot
}
metaKey := t.encodeListMetaKey(key)
meta, err := t.loadListMeta(metaKey)
if err != nil || meta.IsEmpty() {
Expand All @@ -173,16 +185,16 @@ func (t *TxStructure) LClear(key []byte) error {

for index := meta.LIndex; index < meta.RIndex; index++ {
dataKey := t.encodeListDataKey(key, index)
if err = t.txn.Delete(dataKey); err != nil {
if err = t.readWriter.Delete(dataKey); err != nil {
return errors.Trace(err)
}
}

return t.txn.Delete(metaKey)
return t.readWriter.Delete(metaKey)
}

func (t *TxStructure) loadListMeta(metaKey []byte) (listMeta, error) {
v, err := t.txn.Get(metaKey)
v, err := t.reader.Get(metaKey)
if terror.ErrorEqual(err, kv.ErrNotExist) {
err = nil
} else if err != nil {
Expand Down
18 changes: 13 additions & 5 deletions structure/string.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,17 @@ import (

// Set sets the string value of the key.
func (t *TxStructure) Set(key []byte, value []byte) error {
if t.readWriter == nil {
return errWriteOnSnapshot
}
ek := t.encodeStringDataKey(key)

return t.txn.Set(ek, value)
return t.readWriter.Set(ek, value)
}

// Get gets the string value of a key.
func (t *TxStructure) Get(key []byte) ([]byte, error) {
ek := t.encodeStringDataKey(key)
value, err := t.txn.Get(ek)
value, err := t.reader.Get(ek)
if terror.ErrorEqual(err, kv.ErrNotExist) {
err = nil
}
Expand All @@ -52,9 +54,12 @@ func (t *TxStructure) GetInt64(key []byte) (int64, error) {
// Inc increments the integer value of a key by step, returns
// the value after the increment.
func (t *TxStructure) Inc(key []byte, step int64) (int64, error) {
if t.readWriter == nil {
return 0, errWriteOnSnapshot
}
ek := t.encodeStringDataKey(key)
// txn Inc will lock this key, so we don't lock it here.
n, err := kv.IncInt64(t.txn, ek, step)
n, err := kv.IncInt64(t.readWriter, ek, step)
if terror.ErrorEqual(err, kv.ErrNotExist) {
err = nil
}
Expand All @@ -63,8 +68,11 @@ func (t *TxStructure) Inc(key []byte, step int64) (int64, error) {

// Clear removes the string value of the key.
func (t *TxStructure) Clear(key []byte) error {
if t.readWriter == nil {
return errWriteOnSnapshot
}
ek := t.encodeStringDataKey(key)
err := t.txn.Delete(ek)
err := t.readWriter.Delete(ek)
if terror.ErrorEqual(err, kv.ErrNotExist) {
err = nil
}
Expand Down
16 changes: 10 additions & 6 deletions structure/structure.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,30 @@ const (
codeInvalidHashKeyPrefix = 2
codeInvalidListIndex = 3
codeInvalidListMetaData = 4
codeWriteOnSnapshot = 5
)

var (
errInvalidHashKeyFlag = terror.ClassStructure.New(codeInvalidHashKeyFlag, "invalid encoded hash key flag")
errInvalidHashKeyPrefix = terror.ClassStructure.New(codeInvalidHashKeyPrefix, "invalid encoded hash key prefix")
errInvalidListIndex = terror.ClassStructure.New(codeInvalidListMetaData, "invalid list index")
errInvalidListMetaData = terror.ClassStructure.New(codeInvalidListMetaData, "invalid list meta data")
errWriteOnSnapshot = terror.ClassStructure.New(codeWriteOnSnapshot, "write on snapshot")
)

// NewStructure creates a TxStructure in transaction txn and with key prefix.
func NewStructure(txn kv.Transaction, prefix []byte) *TxStructure {
// NewStructure creates a TxStructure with Retriever, RetrieverMutator and key prefix.
func NewStructure(reader kv.Retriever, readWriter kv.RetrieverMutator, prefix []byte) *TxStructure {
return &TxStructure{
txn: txn,
prefix: prefix,
reader: reader,
readWriter: readWriter,
prefix: prefix,
}
}

// TxStructure supports some simple data structures like string, hash, list, etc... and
// you can use these in a transaction.
type TxStructure struct {
txn kv.Transaction
prefix []byte
reader kv.Retriever
readWriter kv.RetrieverMutator
prefix []byte
}
Loading

0 comments on commit 6819e22

Please sign in to comment.