Skip to content

Commit

Permalink
util: add kvencoder package, make transfer sql to key-values be possi…
Browse files Browse the repository at this point in the history
…ble. (pingcap#5236)
  • Loading branch information
winkyao authored and coocood committed Nov 29, 2017
1 parent c790b50 commit 5f46848
Show file tree
Hide file tree
Showing 25 changed files with 755 additions and 60 deletions.
2 changes: 1 addition & 1 deletion ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,7 @@ func (d *ddl) handleAutoIncID(tbInfo *model.TableInfo, schemaID int64) error {
// The operation of the minus 1 to make sure that the current value doesn't be used,
// the next Alloc operation will get this value.
// Its behavior is consistent with MySQL.
if err = tb.RebaseAutoID(tbInfo.AutoIncID-1, false); err != nil {
if err = tb.RebaseAutoID(nil, tbInfo.AutoIncID-1, false); err != nil {
return errors.Trace(err)
}
return nil
Expand Down
10 changes: 5 additions & 5 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (s *testSuite) TestSelectWithoutFrom(c *C) {
r.Check(testkit.Rows("string"))
}

// Issue 3685.
// TestSelectBackslashN Issue 3685.
func (s *testSuite) TestSelectBackslashN(c *C) {
tk := testkit.NewTestKit(c, s.store)

Expand Down Expand Up @@ -332,7 +332,7 @@ func (s *testSuite) TestSelectBackslashN(c *C) {
c.Check(fields[0].Column.Name.O, Equals, `N`)
}

// Issue #4053.
// TestSelectNull Issue #4053.
func (s *testSuite) TestSelectNull(c *C) {
tk := testkit.NewTestKit(c, s.store)

Expand Down Expand Up @@ -365,7 +365,7 @@ func (s *testSuite) TestSelectNull(c *C) {
c.Check(fields[0].Column.Name.O, Equals, `null+NULL`)
}

// Issue #3686.
// TestSelectStringLiteral Issue #3686.
func (s *testSuite) TestSelectStringLiteral(c *C) {
tk := testkit.NewTestKit(c, s.store)

Expand Down Expand Up @@ -1941,7 +1941,7 @@ func (s *testSuite) TestEmptyEnum(c *C) {
tk.MustQuery("select * from t").Check(testkit.Rows("", "", "<nil>"))
}

// This tests https://github.com/pingcap/tidb/issues/4024
// TestIssue4024 This tests https://github.com/pingcap/tidb/issues/4024
func (s *testSuite) TestIssue4024(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("create database test2")
Expand Down Expand Up @@ -2255,7 +2255,7 @@ func (s *testSuite) TestEnhancedRangeAccess(c *C) {
tk.MustQuery("select * from t where (a = 1 and b = 1) or (a = 2 and b = 2)").Check(nil)
}

// Issue #4810
// TestMaxInt64Handle Issue #4810
func (s *testSuite) TestMaxInt64Handle(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)

Expand Down
6 changes: 3 additions & 3 deletions executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func updateRecord(ctx context.Context, h int64, oldData, newData []types.Datum,
if errTI != nil {
return false, errors.Trace(errTI)
}
err := t.RebaseAutoID(val, true)
err := t.RebaseAutoID(ctx, val, true)
if err != nil {
return false, errors.Trace(err)
}
Expand Down Expand Up @@ -1110,7 +1110,7 @@ func (e *InsertValues) adjustAutoIncrementDatum(row []types.Datum, i int, c *tab
}
// Use the value if it's not null and not 0.
if recordID != 0 {
err = e.Table.RebaseAutoID(recordID, true)
err = e.Table.RebaseAutoID(e.ctx, recordID, true)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -1127,7 +1127,7 @@ func (e *InsertValues) adjustAutoIncrementDatum(row []types.Datum, i int, c *tab
// Change NULL to auto id.
// Change value 0 to auto id, if NoAutoValueOnZero SQL mode is not set.
if row[i].IsNull() || e.ctx.GetSessionVars().SQLMode&mysql.ModeNoAutoValueOnZero == 0 {
recordID, err = e.Table.AllocAutoID()
recordID, err = e.Table.AllocAutoID(e.ctx)
if e.filterErr(errors.Trace(err), ignoreErr) != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion infoschema/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (is *infoSchema) AllocByID(id int64) (autoid.Allocator, bool) {
if !ok {
return nil, false
}
return tbl.Allocator(), true
return tbl.Allocator(nil), true
}

func (is *infoSchema) AllSchemaNames() (names []string) {
Expand Down
6 changes: 3 additions & 3 deletions infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -1148,15 +1148,15 @@ func (it *infoschemaTable) UpdateRecord(ctx context.Context, h int64, oldData, n
return table.ErrUnsupportedOp
}

func (it *infoschemaTable) AllocAutoID() (int64, error) {
func (it *infoschemaTable) AllocAutoID(ctx context.Context) (int64, error) {
return 0, table.ErrUnsupportedOp
}

func (it *infoschemaTable) Allocator() autoid.Allocator {
func (it *infoschemaTable) Allocator(ctx context.Context) autoid.Allocator {
return nil
}

func (it *infoschemaTable) RebaseAutoID(newBase int64, isSetStep bool) error {
func (it *infoschemaTable) RebaseAutoID(ctx context.Context, newBase int64, isSetStep bool) error {
return table.ErrUnsupportedOp
}

Expand Down
16 changes: 1 addition & 15 deletions kv/buffer_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,7 @@ func (s *BufferStore) SeekReverse(k Key) (Iterator, error) {

// WalkBuffer iterates all buffered kv pairs.
func (s *BufferStore) WalkBuffer(f func(k Key, v []byte) error) error {
iter, err := s.MemBuffer.Seek(nil)
if err != nil {
return errors.Trace(err)
}
defer iter.Close()
for iter.Valid() {
if err = f(iter.Key(), iter.Value()); err != nil {
return errors.Trace(err)
}
err = iter.Next()
if err != nil {
return errors.Trace(err)
}
}
return nil
return errors.Trace(WalkMemBuffer(s.MemBuffer, f))
}

// SaveTo saves all buffered kv pairs into a Mutator.
Expand Down
2 changes: 2 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ type Transaction interface {
// Valid returns if the transaction is valid.
// A transaction become invalid after commit or rollback.
Valid() bool
// GetMemBuffer return the MemBuffer binding to this transaction.
GetMemBuffer() MemBuffer
}

// Client is used to send request to KV layer.
Expand Down
21 changes: 21 additions & 0 deletions kv/memdb_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,24 @@ func (i *memDbIter) Value() []byte {
func (i *memDbIter) Close() {
i.iter.Release()
}

// WalkMemBuffer iterates all buffered kv pairs in memBuf
func WalkMemBuffer(memBuf MemBuffer, f func(k Key, v []byte) error) error {
iter, err := memBuf.Seek(nil)
if err != nil {
return errors.Trace(err)
}

defer iter.Close()
for iter.Valid() {
if err = f(iter.Key(), iter.Value()); err != nil {
return errors.Trace(err)
}
err = iter.Next()
if err != nil {
return errors.Trace(err)
}
}

return nil
}
6 changes: 5 additions & 1 deletion kv/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type mockTxn struct {
valid bool
}

// Always returns a retryable error.
// Commit always returns a retryable error.
func (t *mockTxn) Commit(ctx goctx.Context) error {
return ErrRetryable
}
Expand Down Expand Up @@ -95,6 +95,10 @@ func (t *mockTxn) Size() int {
return 0
}

func (t *mockTxn) GetMemBuffer() MemBuffer {
return nil
}

// mockStorage is used to start a must commit-failed txn.
type mockStorage struct {
}
Expand Down
8 changes: 7 additions & 1 deletion kv/union_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type UnionStore interface {
DelOption(opt Option)
// GetOption gets an option.
GetOption(opt Option) interface{}
// GetMemBuffer return the MemBuffer binding to this UnionStore.
GetMemBuffer() MemBuffer
}

// Option is used for customizing kv store's behaviors during a transaction.
Expand All @@ -54,7 +56,7 @@ type conditionPair struct {
err error
}

// UnionStore is an in-memory Store which contains a buffer for write and a
// unionStore is an in-memory Store which contains a buffer for write and a
// snapshot for read.
type unionStore struct {
*BufferStore
Expand Down Expand Up @@ -230,6 +232,10 @@ func (us *unionStore) GetOption(opt Option) interface{} {
return us.opts[opt]
}

func (us *unionStore) GetMemBuffer() MemBuffer {
return us.BufferStore.MemBuffer
}

type options map[Option]interface{}

func (opts options) Get(opt Option) (interface{}, bool) {
Expand Down
2 changes: 1 addition & 1 deletion meta/autoid/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type Allocator interface {
// If allocIDs is true, it will allocate some IDs and save to the cache.
// If allocIDs is false, it will not allocate IDs.
Rebase(tableID, newBase int64, allocIDs bool) error
// Base is only used for test.
// Base return the current base of Allocator.
Base() int64
// End is only used for test.
End() int64
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"sync"
"time"

"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/terror"
Expand Down Expand Up @@ -227,6 +228,10 @@ type SessionVars struct {
// MaxRowCountForINLJ defines max row count that the outer table of index nested loop join could be without force hint.
MaxRowCountForINLJ int

// IDAllocator is provided by kvEncoder, if it is provided, we will use it to alloc auto id instead of using
// Table.alloc.
IDAllocator autoid.Allocator

// MaxChunkSize defines max row count of a Chunk during query execution.
MaxChunkSize int
}
Expand Down
6 changes: 5 additions & 1 deletion store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func newTikvTxnWithStartTS(store *tikvStore, startTS uint64) (*tikvTxn, error) {
}, nil
}

// Implement transaction interface.
// Get implements transaction interface.
func (txn *tikvTxn) Get(k kv.Key) ([]byte, error) {
txnCmdCounter.WithLabelValues("get").Inc()
start := time.Now()
Expand Down Expand Up @@ -214,3 +214,7 @@ func (txn *tikvTxn) Len() int {
func (txn *tikvTxn) Size() int {
return txn.us.Size()
}

func (txn *tikvTxn) GetMemBuffer() kv.MemBuffer {
return txn.us.GetMemBuffer()
}
6 changes: 3 additions & 3 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,15 @@ type Table interface {
RemoveRecord(ctx context.Context, h int64, r []types.Datum) error

// AllocAutoID allocates an auto_increment ID for a new row.
AllocAutoID() (int64, error)
AllocAutoID(ctx context.Context) (int64, error)

// Allocator returns Allocator.
Allocator() autoid.Allocator
Allocator(ctx context.Context) autoid.Allocator

// RebaseAutoID rebases the auto_increment ID base.
// If allocIDs is true, it will allocate some IDs and save to the cache.
// If allocIDs is false, it will not allocate IDs.
RebaseAutoID(newBase int64, allocIDs bool) error
RebaseAutoID(ctx context.Context, newBase int64, allocIDs bool) error

// Meta returns TableInfo.
Meta() *model.TableInfo
Expand Down
6 changes: 3 additions & 3 deletions table/tables/memory_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,17 +248,17 @@ func (t *MemoryTable) RemoveRecord(ctx context.Context, h int64, r []types.Datum
}

// AllocAutoID implements table.Table AllocAutoID interface.
func (t *MemoryTable) AllocAutoID() (int64, error) {
func (t *MemoryTable) AllocAutoID(ctx context.Context) (int64, error) {
return t.alloc.Alloc(t.ID)
}

// Allocator implements table.Table Allocator interface.
func (t *MemoryTable) Allocator() autoid.Allocator {
func (t *MemoryTable) Allocator(ctx context.Context) autoid.Allocator {
return t.alloc
}

// RebaseAutoID implements table.Table RebaseAutoID interface.
func (t *MemoryTable) RebaseAutoID(newBase int64, isSetStep bool) error {
func (t *MemoryTable) RebaseAutoID(ctx context.Context, newBase int64, isSetStep bool) error {
return t.alloc.Rebase(t.ID, newBase, isSetStep)
}

Expand Down
6 changes: 3 additions & 3 deletions table/tables/memory_tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,12 @@ func (ts *testMemoryTableSuite) TestMemoryBasic(c *C) {
c.Assert(key, IsNil)
err = tb.UpdateRecord(nil, 0, nil, nil, nil)
c.Assert(err, NotNil)
alc := tb.Allocator()
alc := tb.Allocator(nil)
c.Assert(alc, NotNil)
err = tb.RebaseAutoID(0, false)
err = tb.RebaseAutoID(nil, 0, false)
c.Assert(err, IsNil)

autoid, err := tb.AllocAutoID()
autoid, err := tb.AllocAutoID(nil)
c.Assert(err, IsNil)
c.Assert(autoid, Greater, int64(0))

Expand Down
18 changes: 12 additions & 6 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (t *Table) AddRecord(ctx context.Context, r []types.Datum, skipHandleCheck
}
}
if !hasRecordID {
recordID, err = t.alloc.Alloc(t.ID)
recordID, err = t.AllocAutoID(ctx)
if err != nil {
return 0, errors.Trace(err)
}
Expand Down Expand Up @@ -697,18 +697,24 @@ func GetColDefaultValue(ctx context.Context, col *table.Column, defaultVals []ty
}

// AllocAutoID implements table.Table AllocAutoID interface.
func (t *Table) AllocAutoID() (int64, error) {
return t.alloc.Alloc(t.ID)
func (t *Table) AllocAutoID(ctx context.Context) (int64, error) {
return t.Allocator(ctx).Alloc(t.ID)
}

// Allocator implements table.Table Allocator interface.
func (t *Table) Allocator() autoid.Allocator {
func (t *Table) Allocator(ctx context.Context) autoid.Allocator {
if ctx != nil {
sessAlloc := ctx.GetSessionVars().IDAllocator
if sessAlloc != nil {
return sessAlloc
}
}
return t.alloc
}

// RebaseAutoID implements table.Table RebaseAutoID interface.
func (t *Table) RebaseAutoID(newBase int64, isSetStep bool) error {
return t.alloc.Rebase(t.ID, newBase, isSetStep)
func (t *Table) RebaseAutoID(ctx context.Context, newBase int64, isSetStep bool) error {
return t.Allocator(ctx).Rebase(t.ID, newBase, isSetStep)
}

// Seek implements table.Table Seek interface.
Expand Down
8 changes: 4 additions & 4 deletions table/tables/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (ts *testSuite) TestBasic(c *C) {
c.Assert(string(tb.RecordPrefix()), Not(Equals), "")
c.Assert(tables.FindIndexByColName(tb, "b"), NotNil)

autoid, err := tb.AllocAutoID()
autoid, err := tb.AllocAutoID(nil)
c.Assert(err, IsNil)
c.Assert(autoid, Greater, int64(0))

Expand Down Expand Up @@ -130,10 +130,10 @@ func (ts *testSuite) TestBasic(c *C) {
c.Assert(err, IsNil)

table.MockTableFromMeta(tb.Meta())
alc := tb.Allocator()
alc := tb.Allocator(nil)
c.Assert(alc, NotNil)

err = tb.RebaseAutoID(0, false)
err = tb.RebaseAutoID(nil, 0, false)
c.Assert(err, IsNil)
}

Expand Down Expand Up @@ -207,7 +207,7 @@ func (ts *testSuite) TestUniqueIndexMultipleNullEntries(c *C) {
c.Assert(string(tb.RecordPrefix()), Not(Equals), "")
c.Assert(tables.FindIndexByColName(tb, "b"), NotNil)

autoid, err := tb.AllocAutoID()
autoid, err := tb.AllocAutoID(nil)
c.Assert(err, IsNil)
c.Assert(autoid, Greater, int64(0))
c.Assert(ctx.NewTxn(), IsNil)
Expand Down
Loading

0 comments on commit 5f46848

Please sign in to comment.