Skip to content

Commit

Permalink
*: add tests for xapi
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood committed Apr 1, 2016
1 parent f773610 commit df71593
Show file tree
Hide file tree
Showing 6 changed files with 297 additions and 17 deletions.
10 changes: 10 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -943,3 +943,13 @@ func (s *testSuite) TestJoin(c *C) {
result.Check(ca.result)
}
}

func (s *testSuite) TestIndexScan(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int unique)")
tk.MustExec("insert t values (-1), (2), (3), (5), (6), (7), (8), (9)")
result := tk.MustQuery("select a from t where a < 0 or (a >= 2.1 and a < 5.1) or ( a > 5.9 and a <= 7.9) or a > '8.1'")
result.Check(testkit.Rows("-1", "3", "5", "6", "7", "9"))
}
6 changes: 3 additions & 3 deletions store/localstore/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func count(db engine.DB) int {
}

func (s *localstoreCompactorTestSuite) TestCompactor(c *C) {
store := createMemStore()
store := createMemStore(time.Now().Nanosecond())
db := store.(*dbStore).db
store.(*dbStore).compactor.Stop()

Expand Down Expand Up @@ -92,7 +92,7 @@ func (s *localstoreCompactorTestSuite) TestCompactor(c *C) {
}

func (s *localstoreCompactorTestSuite) TestGetAllVersions(c *C) {
store := createMemStore()
store := createMemStore(time.Now().Nanosecond())
compactor := store.(*dbStore).compactor
txn, _ := store.Begin()
txn.Set([]byte("a"), []byte("1"))
Expand All @@ -115,7 +115,7 @@ func (s *localstoreCompactorTestSuite) TestGetAllVersions(c *C) {
// TestStartStop is to test `Panic: sync: WaitGroup is reused before previous Wait has returned`
// in Stop function.
func (s *localstoreCompactorTestSuite) TestStartStop(c *C) {
store := createMemStore()
store := createMemStore(time.Now().Nanosecond())
db := store.(*dbStore).db

for i := 0; i < 10000; i++ {
Expand Down
9 changes: 0 additions & 9 deletions store/localstore/local_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,15 +259,6 @@ func getIndexRowFromRange(idxInfo *tipb.IndexInfo, txn kv.Transaction, ran kv.Ke
return rows, nil
}

func datumStrings(datums ...types.Datum) []string {
var strs []string
for _, d := range datums {
s, _ := d.ToString()
strs = append(strs, s)
}
return strs
}

func decodeHandle(data []byte) (int64, error) {
var h int64
buf := bytes.NewBuffer(data)
Expand Down
8 changes: 4 additions & 4 deletions store/localstore/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ type testMvccSuite struct {
s kv.Storage
}

func createMemStore() kv.Storage {
func createMemStore(suffix int) kv.Storage {
// avoid cache
path := fmt.Sprintf("memory://%d", time.Now().UnixNano())
path := fmt.Sprintf("memory://%d", suffix)
d := Driver{
goleveldb.MemoryDriver{},
}
Expand Down Expand Up @@ -81,7 +81,7 @@ func (t *testMvccSuite) scanRawEngine(c *C, f func([]byte, []byte)) {

func (t *testMvccSuite) SetUpTest(c *C) {
// create new store
t.s = createMemStore()
t.s = createMemStore(time.Now().Nanosecond())
t.addDirtyData()
// insert test data
txn, err := t.s.Begin()
Expand Down Expand Up @@ -295,7 +295,7 @@ func (t *testMvccSuite) TestMvccSuiteGetLatest(c *C) {
}

func (t *testMvccSuite) TestBufferedIterator(c *C) {
s := createMemStore()
s := createMemStore(time.Now().Nanosecond())
tx, _ := s.Begin()
tx.Set([]byte{0x0, 0x0}, []byte("1"))
tx.Set([]byte{0x0, 0xff}, []byte("2"))
Expand Down
280 changes: 280 additions & 0 deletions store/localstore/xapi_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package localstore

import (
"fmt"
"io/ioutil"
"math"
"sort"
"testing"
"time"

"github.com/golang/protobuf/proto"
"github.com/juju/errors"
. "github.com/pingcap/check"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/types"
"github.com/pingcap/tidb/xapi/tablecodec"
"github.com/pingcap/tidb/xapi/tipb"
)

func TestT(t *testing.T) {
TestingT(t)
}

var _ = Suite(&testXAPISuite{})

type testXAPISuite struct {
}

var tbInfo = &simpleTableInfo{
tID: 1,
cTypes: []byte{mysql.TypeVarchar, mysql.TypeDouble},
cIDs: []int64{3, 4},
indices: []int{0}, // column 3 of varchar type.
iIDs: []int64{5},
}

func (s *testXAPISuite) TestSelect(c *C) {
store := createMemStore(time.Now().Nanosecond())
count := int64(10)
err := prepareTableData(store, tbInfo, count, genValues)
c.Check(err, IsNil)

// Select Table request.
txn, err := store.Begin()
c.Check(err, IsNil)
client := txn.GetClient()
req, err := prepareSelectRequest(tbInfo, txn.StartTS())
c.Check(err, IsNil)
resp := client.Send(req)
subResp, err := resp.Next()
c.Check(err, IsNil)
data, err := ioutil.ReadAll(subResp)
c.Check(err, IsNil)
selResp := new(tipb.SelectResponse)
proto.Unmarshal(data, selResp)
c.Check(selResp.Rows, HasLen, int(count))
for i, row := range selResp.Rows {
handle := int64(i + 1)
expectedDatums := []types.Datum{types.NewDatum(handle)}
expectedDatums = append(expectedDatums, genValues(handle, tbInfo)...)
var expectedEncoded []byte
expectedEncoded, err = codec.EncodeValue(nil, expectedDatums...)
c.Assert(err, IsNil)
c.Assert(row.Data, BytesEquals, expectedEncoded)
}
txn.Commit()

// Select Index request.
txn, err = store.Begin()
c.Check(err, IsNil)
client = txn.GetClient()
req, err = prepareIndexRequest(tbInfo, txn.StartTS())
c.Check(err, IsNil)
resp = client.Send(req)
subResp, err = resp.Next()
c.Check(err, IsNil)
data, err = ioutil.ReadAll(subResp)
c.Check(err, IsNil)
idxResp := new(tipb.SelectResponse)
proto.Unmarshal(data, idxResp)
c.Check(idxResp.Rows, HasLen, int(count))
handles := make([]int, 0, 10)
for _, row := range idxResp.Rows {
var err error
datums, err := codec.Decode(row.Handle)
c.Check(err, IsNil)
c.Check(datums, HasLen, 1)
handles = append(handles, int(datums[0].GetInt64()))
}
sort.Ints(handles)
for i, h := range handles {
c.Assert(h, Equals, i+1)
}
txn.Commit()
}

// simpleTableInfo just have the minimum information enough to describe the table.
// The first column is pk handle column.
type simpleTableInfo struct {
tID int64 // table ID.
cTypes []byte // columns not including pk handle column.
cIDs []int64
indices []int // indexed column offsets. only single column index for now.
iIDs []int64
}

func (s *simpleTableInfo) toPBTableInfo() *tipb.TableInfo {
tbInfo := new(tipb.TableInfo)
tbInfo.TableId = proto.Int64(s.tID)
pkColumn := new(tipb.ColumnInfo)
pkColumn.Tp = proto.Int32(int32(mysql.TypeLonglong))
// It's ok to just use table ID for pk column ID, as it doesn't have a column kv.
pkColumn.ColumnId = tbInfo.TableId
pkColumn.PkHandle = proto.Bool(true)
tbInfo.Columns = append(tbInfo.Columns, pkColumn)
for i, colTp := range s.cTypes {
coInfo := &tipb.ColumnInfo{
ColumnId: proto.Int64(s.cIDs[i]),
Tp: proto.Int32(int32(colTp)),
PkHandle: proto.Bool(false),
}
tbInfo.Columns = append(tbInfo.Columns, coInfo)
}
return tbInfo
}

func (s *simpleTableInfo) toPBIndexInfo(idxOff int) *tipb.IndexInfo {
idxInfo := new(tipb.IndexInfo)
idxInfo.TableId = proto.Int64(s.tID)
idxInfo.IndexId = proto.Int64(s.iIDs[idxOff])
colOff := s.indices[idxOff]
idxInfo.Columns = []*tipb.ColumnInfo{
{
ColumnId: proto.Int64(s.cIDs[colOff]),
Tp: proto.Int32((int32(s.cTypes[colOff]))),
PkHandle: proto.Bool(false),
},
}
return idxInfo
}

func genValues(handle int64, tbl *simpleTableInfo) []types.Datum {
values := make([]types.Datum, 0, len(tbl.cTypes))
for _, tp := range tbl.cTypes {
switch tp {
case mysql.TypeLong:
values = append(values, types.NewDatum(handle))
case mysql.TypeVarchar:
values = append(values, types.NewDatum(fmt.Sprintf("varchar:%d", handle)))
case mysql.TypeDouble:
values = append(values, types.NewDatum(float64(handle)/10))
default:
values = append(values, types.Datum{})
}
}
return values
}

type genValueFunc func(handle int64, tbl *simpleTableInfo) []types.Datum

func prepareTableData(store kv.Storage, tbl *simpleTableInfo, count int64, gen genValueFunc) error {
txn, err := store.Begin()
if err != nil {
return errors.Trace(err)
}
for i := int64(1); i <= count; i++ {
setRow(txn, i, tbl, gen)
}
return txn.Commit()
}

func setRow(txn kv.Transaction, handle int64, tbl *simpleTableInfo, gen genValueFunc) error {
rowKey := tablecodec.EncodeRowKey(tbl.tID, codec.EncodeInt(nil, handle))
txn.Set(rowKey, []byte(txn.String()))
columnValues := gen(handle, tbl)
for i, v := range columnValues {
cKey, cVal, err := encodeColumnKV(tbl.tID, handle, tbl.cIDs[i], v)
if err != nil {
return errors.Trace(err)
}
err = txn.Set(cKey, cVal)
if err != nil {
return errors.Trace(err)
}
}
for i, idxCol := range tbl.indices {
idxVal := columnValues[idxCol]
encoded, err := codec.EncodeKey(nil, idxVal, types.NewDatum(handle))
if err != nil {
return errors.Trace(err)
}
idxKey := tablecodec.EncodeIndexSeekKey(tbl.tID, tbl.iIDs[i], encoded)
err = txn.Set(idxKey, []byte{0})
if err != nil {
return errors.Trace(err)
}
}
return nil
}

func encodeColumnKV(tid, handle, cid int64, value types.Datum) (kv.Key, []byte, error) {
key := tablecodec.EncodeColumnKey(tid, handle, cid)
val, err := codec.EncodeValue(nil, value)
if err != nil {
return nil, nil, errors.Trace(err)
}
return key, val, nil
}

func prepareSelectRequest(simpleInfo *simpleTableInfo, startTs int64) (*kv.Request, error) {
selReq := new(tipb.SelectRequest)
selReq.TableInfo = simpleInfo.toPBTableInfo()
selReq.StartTs = proto.Int64(startTs)
selReq.Ranges = []*tipb.KeyRange{fullPBTableRange}
data, err := proto.Marshal(selReq)
if err != nil {
return nil, errors.Trace(err)
}
req := new(kv.Request)
req.Tp = kv.ReqTypeSelect
req.Concurrency = 1
req.KeyRanges = []kv.KeyRange{fullTableRange(simpleInfo.tID)}
req.Data = data
return req, nil
}

func fullTableRange(tid int64) kv.KeyRange {
return kv.KeyRange{
StartKey: tablecodec.EncodeRowKey(tid, codec.EncodeInt(nil, math.MinInt64)),
EndKey: tablecodec.EncodeRowKey(tid, codec.EncodeInt(nil, math.MaxInt64)),
}
}

var fullPBTableRange = &tipb.KeyRange{
Low: codec.EncodeInt(nil, math.MinInt64),
High: codec.EncodeInt(nil, math.MaxInt64),
}
var fullPBIndexRange = &tipb.KeyRange{
Low: []byte{0},
High: []byte{255},
}

func fullIndexRange(tid int64, idxID int64) kv.KeyRange {
return kv.KeyRange{
StartKey: tablecodec.EncodeIndexSeekKey(tid, idxID, []byte{0}),
EndKey: tablecodec.EncodeIndexSeekKey(tid, idxID, []byte{255}),
}
}

func prepareIndexRequest(simpleInfo *simpleTableInfo, startTs int64) (*kv.Request, error) {
selReq := new(tipb.SelectRequest)
selReq.IndexInfo = simpleInfo.toPBIndexInfo(0)
selReq.StartTs = proto.Int64(startTs)
selReq.Ranges = []*tipb.KeyRange{fullPBIndexRange}
data, err := proto.Marshal(selReq)
if err != nil {
return nil, errors.Trace(err)
}
req := new(kv.Request)
req.Tp = kv.ReqTypeIndex
req.Concurrency = 1
req.KeyRanges = []kv.KeyRange{fullIndexRange(simpleInfo.tID, simpleInfo.iIDs[0])}
req.Data = data
return req, nil
}
1 change: 0 additions & 1 deletion xapi/tablecodec/tablecodec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,4 @@ func (s *tableCodecSuite) TestTableCodec(c *C) {
h, err = DecodeRowKey(key)
c.Assert(err, IsNil)
c.Assert(h, Equals, int64(2))

}

0 comments on commit df71593

Please sign in to comment.