From 6819e22c3c430d6aabb3f88a74125bdf43d8b3f6 Mon Sep 17 00:00:00 2001 From: Ewan Chou Date: Tue, 20 Sep 2016 11:11:10 +0800 Subject: [PATCH] meta, structure: support snapshot meta. (#1739) Snapshot meta will be used to load history schema, so we can support history read with changed schema version. --- meta/meta.go | 9 ++++++++- meta/meta_test.go | 32 ++++++++++++++++++++++++++++++++ structure/hash.go | 31 ++++++++++++++++++++----------- structure/list.go | 34 +++++++++++++++++++++++----------- structure/string.go | 18 +++++++++++++----- structure/structure.go | 16 ++++++++++------ structure/structure_test.go | 8 ++++---- 7 files changed, 110 insertions(+), 38 deletions(-) diff --git a/meta/meta.go b/meta/meta.go index 5e48cd92a48b2..6cbbfdf6235bf 100644 --- a/meta/meta.go +++ b/meta/meta.go @@ -52,6 +52,7 @@ var ( // var ( + mMetaPrefix = []byte("m") mNextGlobalIDKey = []byte("NextGlobalID") mSchemaVersionKey = []byte("SchemaVersionKey") mDBs = []byte("DBs") @@ -83,7 +84,13 @@ type Meta struct { // NewMeta creates a Meta in transaction txn. func NewMeta(txn kv.Transaction) *Meta { - t := structure.NewStructure(txn, []byte{'m'}) + t := structure.NewStructure(txn, txn, mMetaPrefix) + return &Meta{txn: t} +} + +// NewSnapshotMeta creates a Meta with snapshot. +func NewSnapshotMeta(snapshot kv.Snapshot) *Meta { + t := structure.NewStructure(snapshot, nil, mMetaPrefix) return &Meta{txn: t} } diff --git a/meta/meta_test.go b/meta/meta_test.go index 38a37c36f934c..9bcd6169e15ec 100644 --- a/meta/meta_test.go +++ b/meta/meta_test.go @@ -15,6 +15,7 @@ package meta_test import ( "testing" + "time" . "github.com/pingcap/check" "github.com/pingcap/tidb/meta" @@ -163,6 +164,37 @@ func (s *testSuite) TestMeta(c *C) { c.Assert(err, IsNil) } +func (s *testSuite) TestSnapshot(c *C) { + defer testleak.AfterTest(c)() + driver := localstore.Driver{Driver: goleveldb.MemoryDriver{}} + store, err := driver.Open("memory") + c.Assert(err, IsNil) + defer store.Close() + + txn, _ := store.Begin() + m := meta.NewMeta(txn) + m.GenGlobalID() + n, _ := m.GetGlobalID() + c.Assert(n, Equals, int64(1)) + txn.Commit() + + ver1, _ := store.CurrentVersion() + time.Sleep(time.Millisecond) + txn, _ = store.Begin() + m = meta.NewMeta(txn) + m.GenGlobalID() + n, _ = m.GetGlobalID() + c.Assert(n, Equals, int64(2)) + txn.Commit() + + snapshot, _ := store.GetSnapshot(ver1) + snapMeta := meta.NewSnapshotMeta(snapshot) + n, _ = snapMeta.GetGlobalID() + c.Assert(n, Equals, int64(1)) + _, err = snapMeta.GenGlobalID() + c.Assert(err, NotNil) +} + func (s *testSuite) TestDDL(c *C) { defer testleak.AfterTest(c)() driver := localstore.Driver{Driver: goleveldb.MemoryDriver{}} diff --git a/structure/hash.go b/structure/hash.go index bff8734f06a67..8bb9ba552648a 100644 --- a/structure/hash.go +++ b/structure/hash.go @@ -45,6 +45,9 @@ func (meta hashMeta) IsEmpty() bool { // HSet sets the string value of a hash field. func (t *TxStructure) HSet(key []byte, field []byte, value []byte) error { + if t.readWriter == nil { + return errWriteOnSnapshot + } return t.updateHash(key, field, func([]byte) ([]byte, error) { return value, nil }) @@ -53,7 +56,7 @@ func (t *TxStructure) HSet(key []byte, field []byte, value []byte) error { // HGet gets the value of a hash field. func (t *TxStructure) HGet(key []byte, field []byte) ([]byte, error) { dataKey := t.encodeHashDataKey(key, field) - value, err := t.txn.Get(dataKey) + value, err := t.reader.Get(dataKey) if terror.ErrorEqual(err, kv.ErrNotExist) { err = nil } @@ -63,6 +66,9 @@ func (t *TxStructure) HGet(key []byte, field []byte) ([]byte, error) { // HInc increments the integer value of a hash field, by step, returns // the value after the increment. func (t *TxStructure) HInc(key []byte, field []byte, step int64) (int64, error) { + if t.readWriter == nil { + return 0, errWriteOnSnapshot + } base := int64(0) err := t.updateHash(key, field, func(oldValue []byte) ([]byte, error) { if oldValue != nil { @@ -108,7 +114,7 @@ func (t *TxStructure) updateHash(key []byte, field []byte, fn func(oldValue []by return nil } - if err = t.txn.Set(dataKey, newValue); err != nil { + if err = t.readWriter.Set(dataKey, newValue); err != nil { return errors.Trace(err) } @@ -120,7 +126,7 @@ func (t *TxStructure) updateHash(key []byte, field []byte, fn func(oldValue []by if oldValue == nil { meta.FieldCount++ - if err = t.txn.Set(metaKey, meta.Value()); err != nil { + if err = t.readWriter.Set(metaKey, meta.Value()); err != nil { return errors.Trace(err) } } @@ -140,6 +146,9 @@ func (t *TxStructure) HLen(key []byte) (int64, error) { // HDel deletes one or more hash fields. func (t *TxStructure) HDel(key []byte, fields ...[]byte) error { + if t.readWriter == nil { + return errWriteOnSnapshot + } metaKey := t.encodeHashMetaKey(key) meta, err := t.loadHashMeta(metaKey) if err != nil || meta.IsEmpty() { @@ -156,7 +165,7 @@ func (t *TxStructure) HDel(key []byte, fields ...[]byte) error { } if value != nil { - if err = t.txn.Delete(dataKey); err != nil { + if err = t.readWriter.Delete(dataKey); err != nil { return errors.Trace(err) } @@ -165,9 +174,9 @@ func (t *TxStructure) HDel(key []byte, fields ...[]byte) error { } if meta.IsEmpty() { - err = t.txn.Delete(metaKey) + err = t.readWriter.Delete(metaKey) } else { - err = t.txn.Set(metaKey, meta.Value()) + err = t.readWriter.Set(metaKey, meta.Value()) } return errors.Trace(err) @@ -209,19 +218,19 @@ func (t *TxStructure) HClear(key []byte) error { err = t.iterateHash(key, func(field []byte, value []byte) error { k := t.encodeHashDataKey(key, field) - return errors.Trace(t.txn.Delete(k)) + return errors.Trace(t.readWriter.Delete(k)) }) if err != nil { return errors.Trace(err) } - return errors.Trace(t.txn.Delete(metaKey)) + return errors.Trace(t.readWriter.Delete(metaKey)) } func (t *TxStructure) iterateHash(key []byte, fn func(k []byte, v []byte) error) error { dataPrefix := t.hashDataKeyPrefix(key) - it, err := t.txn.Seek(dataPrefix) + it, err := t.reader.Seek(dataPrefix) if err != nil { return errors.Trace(err) } @@ -252,7 +261,7 @@ func (t *TxStructure) iterateHash(key []byte, fn func(k []byte, v []byte) error) } func (t *TxStructure) loadHashMeta(metaKey []byte) (hashMeta, error) { - v, err := t.txn.Get(metaKey) + v, err := t.reader.Get(metaKey) if terror.ErrorEqual(err, kv.ErrNotExist) { err = nil } else if err != nil { @@ -273,7 +282,7 @@ func (t *TxStructure) loadHashMeta(metaKey []byte) (hashMeta, error) { } func (t *TxStructure) loadHashValue(dataKey []byte) ([]byte, error) { - v, err := t.txn.Get(dataKey) + v, err := t.reader.Get(dataKey) if terror.ErrorEqual(err, kv.ErrNotExist) { err = nil v = nil diff --git a/structure/list.go b/structure/list.go index 3649450f5dffc..37731dbbcbf5a 100644 --- a/structure/list.go +++ b/structure/list.go @@ -48,6 +48,9 @@ func (t *TxStructure) RPush(key []byte, values ...[]byte) error { } func (t *TxStructure) listPush(key []byte, left bool, values ...[]byte) error { + if t.readWriter == nil { + return errWriteOnSnapshot + } if len(values) == 0 { return nil } @@ -69,12 +72,12 @@ func (t *TxStructure) listPush(key []byte, left bool, values ...[]byte) error { } dataKey := t.encodeListDataKey(key, index) - if err = t.txn.Set(dataKey, v); err != nil { + if err = t.readWriter.Set(dataKey, v); err != nil { return errors.Trace(err) } } - return t.txn.Set(metaKey, meta.Value()) + return t.readWriter.Set(metaKey, meta.Value()) } // LPop removes and gets the first element in a list. @@ -88,6 +91,9 @@ func (t *TxStructure) RPop(key []byte) ([]byte, error) { } func (t *TxStructure) listPop(key []byte, left bool) ([]byte, error) { + if t.readWriter == nil { + return nil, errWriteOnSnapshot + } metaKey := t.encodeListMetaKey(key) meta, err := t.loadListMeta(metaKey) if err != nil || meta.IsEmpty() { @@ -106,19 +112,19 @@ func (t *TxStructure) listPop(key []byte, left bool) ([]byte, error) { dataKey := t.encodeListDataKey(key, index) var data []byte - data, err = t.txn.Get(dataKey) + data, err = t.reader.Get(dataKey) if err != nil { return nil, errors.Trace(err) } - if err = t.txn.Delete(dataKey); err != nil { + if err = t.readWriter.Delete(dataKey); err != nil { return nil, errors.Trace(err) } if !meta.IsEmpty() { - err = t.txn.Set(metaKey, meta.Value()) + err = t.readWriter.Set(metaKey, meta.Value()) } else { - err = t.txn.Delete(metaKey) + err = t.readWriter.Delete(metaKey) } return data, errors.Trace(err) @@ -142,13 +148,16 @@ func (t *TxStructure) LIndex(key []byte, index int64) ([]byte, error) { index = adjustIndex(index, meta.LIndex, meta.RIndex) if index >= meta.LIndex && index < meta.RIndex { - return t.txn.Get(t.encodeListDataKey(key, index)) + return t.reader.Get(t.encodeListDataKey(key, index)) } return nil, nil } // LSet updates an element in the list by its index. func (t *TxStructure) LSet(key []byte, index int64, value []byte) error { + if t.readWriter == nil { + return errWriteOnSnapshot + } metaKey := t.encodeListMetaKey(key) meta, err := t.loadListMeta(metaKey) if err != nil || meta.IsEmpty() { @@ -158,13 +167,16 @@ func (t *TxStructure) LSet(key []byte, index int64, value []byte) error { index = adjustIndex(index, meta.LIndex, meta.RIndex) if index >= meta.LIndex && index < meta.RIndex { - return t.txn.Set(t.encodeListDataKey(key, index), value) + return t.readWriter.Set(t.encodeListDataKey(key, index), value) } return errInvalidListIndex.Gen("invalid list index %d", index) } // LClear removes the list of the key. func (t *TxStructure) LClear(key []byte) error { + if t.readWriter == nil { + return errWriteOnSnapshot + } metaKey := t.encodeListMetaKey(key) meta, err := t.loadListMeta(metaKey) if err != nil || meta.IsEmpty() { @@ -173,16 +185,16 @@ func (t *TxStructure) LClear(key []byte) error { for index := meta.LIndex; index < meta.RIndex; index++ { dataKey := t.encodeListDataKey(key, index) - if err = t.txn.Delete(dataKey); err != nil { + if err = t.readWriter.Delete(dataKey); err != nil { return errors.Trace(err) } } - return t.txn.Delete(metaKey) + return t.readWriter.Delete(metaKey) } func (t *TxStructure) loadListMeta(metaKey []byte) (listMeta, error) { - v, err := t.txn.Get(metaKey) + v, err := t.reader.Get(metaKey) if terror.ErrorEqual(err, kv.ErrNotExist) { err = nil } else if err != nil { diff --git a/structure/string.go b/structure/string.go index ecd730a2bec74..a8e4a914f22ea 100644 --- a/structure/string.go +++ b/structure/string.go @@ -23,15 +23,17 @@ import ( // Set sets the string value of the key. func (t *TxStructure) Set(key []byte, value []byte) error { + if t.readWriter == nil { + return errWriteOnSnapshot + } ek := t.encodeStringDataKey(key) - - return t.txn.Set(ek, value) + return t.readWriter.Set(ek, value) } // Get gets the string value of a key. func (t *TxStructure) Get(key []byte) ([]byte, error) { ek := t.encodeStringDataKey(key) - value, err := t.txn.Get(ek) + value, err := t.reader.Get(ek) if terror.ErrorEqual(err, kv.ErrNotExist) { err = nil } @@ -52,9 +54,12 @@ func (t *TxStructure) GetInt64(key []byte) (int64, error) { // Inc increments the integer value of a key by step, returns // the value after the increment. func (t *TxStructure) Inc(key []byte, step int64) (int64, error) { + if t.readWriter == nil { + return 0, errWriteOnSnapshot + } ek := t.encodeStringDataKey(key) // txn Inc will lock this key, so we don't lock it here. - n, err := kv.IncInt64(t.txn, ek, step) + n, err := kv.IncInt64(t.readWriter, ek, step) if terror.ErrorEqual(err, kv.ErrNotExist) { err = nil } @@ -63,8 +68,11 @@ func (t *TxStructure) Inc(key []byte, step int64) (int64, error) { // Clear removes the string value of the key. func (t *TxStructure) Clear(key []byte) error { + if t.readWriter == nil { + return errWriteOnSnapshot + } ek := t.encodeStringDataKey(key) - err := t.txn.Delete(ek) + err := t.readWriter.Delete(ek) if terror.ErrorEqual(err, kv.ErrNotExist) { err = nil } diff --git a/structure/structure.go b/structure/structure.go index 64320e7f3886a..81853deeeafed 100644 --- a/structure/structure.go +++ b/structure/structure.go @@ -24,6 +24,7 @@ const ( codeInvalidHashKeyPrefix = 2 codeInvalidListIndex = 3 codeInvalidListMetaData = 4 + codeWriteOnSnapshot = 5 ) var ( @@ -31,19 +32,22 @@ var ( errInvalidHashKeyPrefix = terror.ClassStructure.New(codeInvalidHashKeyPrefix, "invalid encoded hash key prefix") errInvalidListIndex = terror.ClassStructure.New(codeInvalidListMetaData, "invalid list index") errInvalidListMetaData = terror.ClassStructure.New(codeInvalidListMetaData, "invalid list meta data") + errWriteOnSnapshot = terror.ClassStructure.New(codeWriteOnSnapshot, "write on snapshot") ) -// NewStructure creates a TxStructure in transaction txn and with key prefix. -func NewStructure(txn kv.Transaction, prefix []byte) *TxStructure { +// NewStructure creates a TxStructure with Retriever, RetrieverMutator and key prefix. +func NewStructure(reader kv.Retriever, readWriter kv.RetrieverMutator, prefix []byte) *TxStructure { return &TxStructure{ - txn: txn, - prefix: prefix, + reader: reader, + readWriter: readWriter, + prefix: prefix, } } // TxStructure supports some simple data structures like string, hash, list, etc... and // you can use these in a transaction. type TxStructure struct { - txn kv.Transaction - prefix []byte + reader kv.Retriever + readWriter kv.RetrieverMutator + prefix []byte } diff --git a/structure/structure_test.go b/structure/structure_test.go index 815c109fa5c6e..3c0b2d6db3e88 100644 --- a/structure/structure_test.go +++ b/structure/structure_test.go @@ -54,7 +54,7 @@ func (s *testTxStructureSuite) TestString(c *C) { c.Assert(err, IsNil) defer txn.Rollback() - tx := NewStructure(txn, []byte{0x00}) + tx := NewStructure(txn, txn, []byte{0x00}) key := []byte("a") value := []byte("1") @@ -94,7 +94,7 @@ func (s *testTxStructureSuite) TestList(c *C) { c.Assert(err, IsNil) defer txn.Rollback() - tx := NewStructure(txn, []byte{0x00}) + tx := NewStructure(txn, txn, []byte{0x00}) key := []byte("a") err = tx.LPush(key, []byte("3"), []byte("2"), []byte("1")) @@ -177,7 +177,7 @@ func (s *testTxStructureSuite) TestHash(c *C) { c.Assert(err, IsNil) defer txn.Rollback() - tx := NewStructure(txn, []byte{0x00}) + tx := NewStructure(txn, txn, []byte{0x00}) key := []byte("a") @@ -331,7 +331,7 @@ func (s *testTxStructureSuite) TestHash(c *C) { c.Assert(err, IsNil) err = kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error { - t := NewStructure(txn, []byte{0x00}) + t := NewStructure(txn, txn, []byte{0x00}) err = t.Set(key, []byte("abc")) c.Assert(err, IsNil)