From 1bd25a14d8076a3e3e725821522a3a9ff4ef8189 Mon Sep 17 00:00:00 2001 From: Ewan Chou Date: Thu, 24 Mar 2016 22:05:11 +0800 Subject: [PATCH] store/localstore: add local store xapi support. --- executor/executor_xapi.go | 121 ++++++++++-- kv/key.go | 19 +- kv/key_test.go | 2 +- kv/kv.go | 12 +- optimizer/plan/planbuilder_join.go | 2 +- optimizer/plan/plans.go | 3 + store/localstore/kv.go | 10 +- store/localstore/local_client.go | 185 ++++++++++++++++++ store/localstore/local_pd.go | 39 ++++ store/localstore/local_region.go | 299 +++++++++++++++++++++++++++++ store/localstore/txn.go | 21 +- util/codec/codec_test.go | 15 ++ util/codec/decimal.go | 39 +--- xapi/tablecodec/tablecodec.go | 59 ++---- xapi/tablecodec/tablecodec_test.go | 2 +- xapi/xapi.go | 24 ++- 16 files changed, 727 insertions(+), 125 deletions(-) create mode 100644 store/localstore/local_client.go create mode 100644 store/localstore/local_pd.go create mode 100644 store/localstore/local_region.go diff --git a/executor/executor_xapi.go b/executor/executor_xapi.go index d21a5048e9beb..4861d09433871 100644 --- a/executor/executor_xapi.go +++ b/executor/executor_xapi.go @@ -93,6 +93,7 @@ func resultRowToRow(t table.Table, h int64, data []types.Datum) *Row { } func (e *XSelectTableExec) doRequest() error { + // TODO: add offset and limit. txn, err := e.ctx.GetTxn(false) if err != nil { return errors.Trace(err) @@ -138,6 +139,9 @@ func (e *XSelectIndexExec) Next() (*Row, error) { return nil, nil } row := e.rows[e.cursor] + for i, field := range e.indexPlan.Fields() { + field.Expr.SetDatum(row.Data[i]) + } e.cursor++ return row, nil } @@ -160,33 +164,53 @@ func (e *XSelectIndexExec) doRequest() error { if err != nil { return errors.Trace(err) } - indexOrder := make(map[int64]int) - for i, h := range handles { - indexOrder[h] = i + if len(handles) == 0 { + return nil } + + var indexOrder map[int64]int + if !e.indexPlan.OutOfOrder { + // Save the index order. + indexOrder = make(map[int64]int) + for i, h := range handles { + indexOrder[h] = i + } + } + sort.Sort(int64Slice(handles)) tblResult, err := e.doTableRequest(txn, handles) - unorderedRows, err := extractRowsFromTableResult(e.table, tblResult) + rows, err := extractRowsFromTableResult(e.table, tblResult) if err != nil { return errors.Trace(err) } - // Restore the original index order. - rows := make([]*Row, len(handles)) - for i, h := range handles { - oi := indexOrder[h] - rows[oi] = unorderedRows[i] + if len(rows) < len(handles) { + return errors.Errorf("got %d rows with %d handles", len(rows), len(handles)) + } + if !e.indexPlan.OutOfOrder { + // Restore the index order. + orderedRows := make([]*Row, len(handles)) + for i, h := range handles { + oi := indexOrder[h] + orderedRows[oi] = rows[i] + } + rows = orderedRows } e.rows = rows return nil } func (e *XSelectIndexExec) doIndexRequest(txn kv.Transaction) (*xapi.SelectResult, error) { + // TODO: add offset and limit. selIdxReq := new(tipb.SelectRequest) startTs := txn.StartTS() selIdxReq.StartTs = &startTs selIdxReq.IndexInfo = tablecodec.IndexToProto(e.table.Meta(), e.indexPlan.Index) + fieldTypes := make([]*types.FieldType, len(e.indexPlan.Index.Columns)) + for i, v := range e.indexPlan.Index.Columns { + fieldTypes[i] = &(e.table.Cols()[v.Offset].FieldType) + } var err error - selIdxReq.Ranges, err = indexRangesToPBRanges(e.indexPlan.Ranges) + selIdxReq.Ranges, err = indexRangesToPBRanges(e.indexPlan.Ranges, fieldTypes) if err != nil { return nil, errors.Trace(err) } @@ -194,6 +218,7 @@ func (e *XSelectIndexExec) doIndexRequest(txn kv.Transaction) (*xapi.SelectResul } func (e *XSelectIndexExec) doTableRequest(txn kv.Transaction, handles []int64) (*xapi.SelectResult, error) { + // TODO: add offset and limit. selTableReq := new(tipb.SelectRequest) startTs := txn.StartTS() selTableReq.StartTs = &startTs @@ -206,7 +231,7 @@ func (e *XSelectIndexExec) doTableRequest(txn kv.Transaction, handles []int64) ( } pbRange := new(tipb.KeyRange) pbRange.Low = codec.EncodeInt(nil, h) - pbRange.High = codec.EncodeInt(nil, h) + pbRange.High = kv.Key(pbRange.Low).PrefixNext() selTableReq.Ranges = append(selTableReq.Ranges, pbRange) } selTableReq.Where = conditionsToPBExpression(e.indexPlan.FilterConditions...) @@ -236,28 +261,92 @@ func tableRangesToPBRanges(tableRanges []plan.TableRange) []*tipb.KeyRange { return hrs } -func indexRangesToPBRanges(ranges []*plan.IndexRange) ([]*tipb.KeyRange, error) { +func indexRangesToPBRanges(ranges []*plan.IndexRange, fieldTypes []*types.FieldType) ([]*tipb.KeyRange, error) { keyRanges := make([]*tipb.KeyRange, 0, len(ranges)) for _, ran := range ranges { - low, err := codec.EncodeValue(nil, ran.LowVal...) + err := convertIndexRangeTypes(ran, fieldTypes) + if err != nil { + return nil, errors.Trace(err) + } + low, err := codec.EncodeKey(nil, ran.LowVal...) if err != nil { return nil, errors.Trace(err) } if ran.LowExclude { - low = []byte(kv.Key(low).PartialNext()) + low = []byte(kv.Key(low).PrefixNext()) } - high, err := codec.EncodeValue(nil, ran.HighVal...) + high, err := codec.EncodeKey(nil, ran.HighVal...) if err != nil { return nil, errors.Trace(err) } if !ran.HighExclude { - high = []byte(kv.Key(high).PartialNext()) + high = []byte(kv.Key(high).PrefixNext()) } keyRanges = append(keyRanges, &tipb.KeyRange{Low: low, High: high}) } return keyRanges, nil } +func convertIndexRangeTypes(ran *plan.IndexRange, fieldTypes []*types.FieldType) error { + for i := range ran.LowVal { + if ran.LowVal[i].Kind() == types.KindMinNotNull { + ran.LowVal[i].SetBytes([]byte{}) + continue + } + converted, err := ran.LowVal[i].ConvertTo(fieldTypes[i]) + if err != nil { + return errors.Trace(err) + } + cmp, err := converted.CompareDatum(ran.LowVal[i]) + if err != nil { + return errors.Trace(err) + } + ran.LowVal[i] = converted + if cmp == 0 { + continue + } + if cmp < 0 && !ran.LowExclude { + // For int column a, a >= 1.1 is converted to a > 1. + ran.LowExclude = true + } else if cmp > 0 && ran.LowExclude { + // For int column a, a > 1.9 is converted to a >= 2. + ran.LowExclude = false + } + // The converted value has changed, the other column values doesn't matter. + // For equal condition, converted value changed means there will be no match. + // For non equal condition, this column would be the last one to build the range. + // Break here to prevent the rest columns modify LowExclude again. + break + } + for i := range ran.HighVal { + if ran.HighVal[i].Kind() == types.KindMaxValue { + continue + } + converted, err := ran.HighVal[i].ConvertTo(fieldTypes[i]) + if err != nil { + return errors.Trace(err) + } + cmp, err := converted.CompareDatum(ran.HighVal[i]) + if err != nil { + return errors.Trace(err) + } + ran.HighVal[i] = converted + if cmp == 0 { + continue + } + // For int column a, a < 1.1 is converted to a <= 1. + if cmp < 0 && ran.HighExclude { + ran.HighExclude = false + } + // For int column a, a <= 1.9 is converted to a < 2. + if cmp > 0 && !ran.HighExclude { + ran.HighExclude = true + } + break + } + return nil +} + func extractHandlesFromIndexResult(idxResult *xapi.SelectResult) ([]int64, error) { var handles []int64 for { diff --git a/kv/key.go b/kv/key.go index a03a027996879..c81332870e7aa 100644 --- a/kv/key.go +++ b/kv/key.go @@ -26,13 +26,18 @@ func (k Key) Next() Key { return buf } -// PartialNext returns the next partial key. -// For example, a key composed with two columns. -// Next will return a key with the same first column value, -// but PartialNext will return a key with different first column value. -// key encoding method must ensure the next different value has the -// same length as the original value. -func (k Key) PartialNext() Key { +// PrefixNext returns the next prefix key. +// +// Assume there are keys like: +// +// rowkey1 +// rowkey1_column1 +// rowkey1_column2 +// rowKey2 +// +// If we seek 'rowkey1' Next, we will get 'rowkey1_colum1'. +// If we seek 'rowkey1' PrefixNext, we will get 'rowkey2'. +func (k Key) PrefixNext() Key { buf := make([]byte, len([]byte(k))) copy(buf, []byte(k)) var i int diff --git a/kv/key_test.go b/kv/key_test.go index 4e3ddaa9bbbba..2110f117104a6 100644 --- a/kv/key_test.go +++ b/kv/key_test.go @@ -40,7 +40,7 @@ func (s *testKeySuite) TestPartialNext(c *C) { c.Assert(cmp, Equals, -1) // Use next partial key, we can skip all index keys with first column value equal to "abc". - nextPartialKey := Key(seekKey).PartialNext() + nextPartialKey := Key(seekKey).PrefixNext() cmp = bytes.Compare(nextPartialKey, keyA) c.Assert(cmp, Equals, 1) diff --git a/kv/kv.go b/kv/kv.go index dcd22039af31c..9e7067883594b 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -13,7 +13,10 @@ package kv -import "io" +import ( + "bytes" + "io" +) const ( // PresumeKeyNotExists directives that when dealing with a Get operation but failing to read data from cache, @@ -97,6 +100,8 @@ type Client interface { const ( ReqTypeSelect = 101 ReqTypeIndex = 102 + + ReqSubTypeBasic = 0 ) // KeyRange represents a range where StartKey <= key < EndKey. @@ -105,6 +110,11 @@ type KeyRange struct { EndKey Key } +// IsPoint checks if the key range represents a point. +func (r *KeyRange) IsPoint() bool { + return bytes.Equal(r.StartKey.PrefixNext(), r.EndKey) +} + // Request represents a kv request. type Request struct { // The request type. diff --git a/optimizer/plan/planbuilder_join.go b/optimizer/plan/planbuilder_join.go index 964cb850dd9fc..59e9d3dc61487 100644 --- a/optimizer/plan/planbuilder_join.go +++ b/optimizer/plan/planbuilder_join.go @@ -602,7 +602,7 @@ func (b *planBuilder) buildJoin(sel *ast.SelectStmt) Plan { if !path.attachCondition(whereCond, nil) { // TODO: Find a better way to handle this condition. path.conditions = append(path.conditions, whereCond) - log.Errorf("Failed to attach where condtion.") + log.Warnf("Failed to attach where condtion in %s", sel.Text()) } } path.extractEqualConditon() diff --git a/optimizer/plan/plans.go b/optimizer/plan/plans.go index 7a1a7e5d46192..490c1846905d3 100644 --- a/optimizer/plan/plans.go +++ b/optimizer/plan/plans.go @@ -135,6 +135,9 @@ type IndexScan struct { // FilterConditions can be used to filter result. FilterConditions []ast.ExprNode + + // OutOfOrder indicates if the index scan can return out of order. + OutOfOrder bool } // Accept implements Plan Accept interface. diff --git a/store/localstore/kv.go b/store/localstore/kv.go index 88979bc5aa846..f47f6b24bd9d3 100644 --- a/store/localstore/kv.go +++ b/store/localstore/kv.go @@ -283,6 +283,8 @@ type dbStore struct { mu sync.Mutex closed bool + + pd localPD } type storeCache struct { @@ -354,8 +356,14 @@ func (d Driver) Open(path string) (kv.Storage, error) { s.recentUpdates, err = segmentmap.NewSegmentMap(100) if err != nil { return nil, errors.Trace(err) - } + regionServers := buildLocalRegionServers(s) + var infos []*regionInfo + for _, rs := range regionServers { + ri := ®ionInfo{startKey: rs.startKey, endKey: rs.endKey, rs: rs} + infos = append(infos, ri) + } + s.pd.SetRegionInfo(infos) mc.cache[engineSchema] = s s.compactor.Start() s.wg.Add(1) diff --git a/store/localstore/local_client.go b/store/localstore/local_client.go new file mode 100644 index 0000000000000..67f9a7e0e0e60 --- /dev/null +++ b/store/localstore/local_client.go @@ -0,0 +1,185 @@ +package localstore + +import ( + "io" + + "github.com/pingcap/tidb/kv" +) + +type dbClient struct { + store *dbStore + regionInfo []*regionInfo +} + +func (c *dbClient) Send(req *kv.Request) kv.Response { + it := &response{ + client: c, + concurrency: req.Concurrency, + } + it.tasks = buildRegionTasks(c, req) + if len(it.tasks) == 0 { + // Empty range doesn't produce any task. + it.finished = true + return it + } + if it.concurrency > len(it.tasks) { + it.concurrency = len(it.tasks) + } else if it.concurrency <= 0 { + it.concurrency = 1 + } + it.taskChan = make(chan *task, it.concurrency) + it.errChan = make(chan error, it.concurrency) + it.respChan = make(chan *regionResponse, it.concurrency) + it.run() + return it +} + +func (c *dbClient) SupportRequestType(reqType, subType int64) bool { + switch reqType { + case kv.ReqTypeSelect: + return subType == kv.ReqSubTypeBasic + case kv.ReqTypeIndex: + return subType == kv.ReqSubTypeBasic + } + return false +} + +func (c *dbClient) updateRegionInfo() { + c.regionInfo = c.store.pd.GetRegionInfo() +} + +type localResponseReader struct { + s []byte + i int64 +} + +func (r *localResponseReader) Read(b []byte) (n int, err error) { + if len(b) == 0 { + return 0, nil + } + if r.i >= int64(len(r.s)) { + return 0, io.EOF + } + n = copy(b, r.s[r.i:]) + r.i += int64(n) + return +} + +func (r *localResponseReader) Close() error { + r.i = int64(len(r.s)) + return nil +} + +type response struct { + client *dbClient + reqSent int + respGot int + concurrency int + tasks []*task + responses []*regionResponse + taskChan chan *task + respChan chan *regionResponse + errChan chan error + finished bool +} + +type task struct { + request *regionRequest + region *localRegion +} + +func (it *response) Next() (resp io.ReadCloser, err error) { + if it.finished { + return nil, nil + } + var regionResp *regionResponse + select { + case regionResp = <-it.respChan: + case err = <-it.errChan: + } + if err != nil { + it.Close() + return nil, err + } + if len(regionResp.newStartKey) != 0 { + it.client.updateRegionInfo() + retryTasks := it.createRetryTasks(regionResp) + it.tasks = append(it.tasks, retryTasks...) + } + if it.reqSent < len(it.tasks) { + it.taskChan <- it.tasks[it.reqSent] + it.reqSent++ + } + it.respGot++ + if it.reqSent == len(it.tasks) && it.respGot == it.reqSent { + it.Close() + } + return &localResponseReader{s: regionResp.data}, nil +} + +func (it *response) createRetryTasks(resp *regionResponse) []*task { + return nil +} + +func buildRegionTasks(client *dbClient, req *kv.Request) (tasks []*task) { + infoCursor := 0 + rangeCursor := 0 + var regionReq *regionRequest + infos := client.regionInfo + for rangeCursor < len(req.KeyRanges) && infoCursor < len(infos) { + info := infos[infoCursor] + ran := req.KeyRanges[rangeCursor] + + rangeOnLeft := ran.EndKey.Cmp(info.startKey) <= 0 + rangeOnRight := info.endKey.Cmp(ran.StartKey) <= 0 + noDataOnRegion := rangeOnLeft || rangeOnRight + if noDataOnRegion { + if rangeOnLeft { + rangeCursor++ + } else { + infoCursor++ + } + } else { + regionReq = ®ionRequest{ + Tp: req.Tp, + startKey: info.startKey, + endKey: info.endKey, + data: req.Data, + } + task := &task{ + region: info.rs, + request: regionReq, + } + tasks = append(tasks, task) + infoCursor++ + } + } + return +} + +func (it *response) Close() error { + // Make goroutines quit. + if it.finished { + return nil + } + close(it.taskChan) + it.finished = true + return nil +} + +func (it *response) run() { + for i := 0; i < it.concurrency; i++ { + go func() { + for task := range it.taskChan { + resp, err := task.region.Handle(task.request) + if err != nil { + it.errChan <- err + break + } + it.respChan <- resp + } + }() + it.taskChan <- it.tasks[i] + it.reqSent++ + } +} diff --git a/store/localstore/local_pd.go b/store/localstore/local_pd.go new file mode 100644 index 0000000000000..c3c687b79d605 --- /dev/null +++ b/store/localstore/local_pd.go @@ -0,0 +1,39 @@ +package localstore + +import "github.com/pingcap/tidb/kv" + +type localPD struct { + regions []*regionInfo +} + +type regionInfo struct { + startKey kv.Key + endKey kv.Key + rs *localRegion +} + +func (pd *localPD) GetRegionInfo() []*regionInfo { + return pd.regions +} + +func (pd *localPD) SetRegionInfo(regions []*regionInfo) { + pd.regions = regions +} + +// ChangeRegionInfo used for test handling region info change. +func ChangeRegionInfo(store kv.Storage, regionID int, startKey, endKey []byte) { + s := store.(*dbStore) + for i, region := range s.pd.regions { + if region.rs.id == regionID { + newRegionInfo := ®ionInfo{ + startKey: startKey, + endKey: endKey, + rs: region.rs, + } + region.rs.startKey = startKey + region.rs.endKey = endKey + s.pd.regions[i] = newRegionInfo + break + } + } +} diff --git a/store/localstore/local_region.go b/store/localstore/local_region.go new file mode 100644 index 0000000000000..1ff4bb8af5fa7 --- /dev/null +++ b/store/localstore/local_region.go @@ -0,0 +1,299 @@ +package localstore + +import ( + "bytes" + + "encoding/binary" + "github.com/golang/protobuf/proto" + "github.com/juju/errors" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/terror" + "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/types" + "github.com/pingcap/tidb/xapi/tablecodec" + "github.com/pingcap/tidb/xapi/tipb" +) + +// local region server. +type localRegion struct { + id int + store *dbStore + startKey []byte + endKey []byte +} + +type regionRequest struct { + Tp int64 + data []byte + startKey []byte + endKey []byte +} + +type regionResponse struct { + req *regionRequest + err error + data []byte + // If region missed some request key range, newStartKey and newEndKey is returned. + newStartKey []byte + newEndKey []byte +} + +func (rs *localRegion) Handle(req *regionRequest) (*regionResponse, error) { + resp := ®ionResponse{ + req: req, + } + if req.Tp == kv.ReqTypeSelect || req.Tp == kv.ReqTypeIndex { + sel := new(tipb.SelectRequest) + err := proto.Unmarshal(req.data, sel) + if err != nil { + return nil, errors.Trace(err) + } + txn := newTxn(rs.store, kv.Version{Ver: uint64(*sel.StartTs)}) + var rows []*tipb.Row + if req.Tp == kv.ReqTypeSelect { + rows, err = rs.getRowsFromSelectReq(txn, sel) + } else { + rows, err = rs.getRowsFromIndexReq(txn, sel) + } + selResp := new(tipb.SelectResponse) + selResp.Error = toPBError(err) + selResp.Rows = rows + resp.err = err + data, err := proto.Marshal(selResp) + if err != nil { + return nil, errors.Trace(err) + } + resp.data = data + } + if bytes.Compare(rs.startKey, req.startKey) < 0 || bytes.Compare(rs.endKey, req.endKey) > 0 { + resp.newStartKey = rs.startKey + resp.newEndKey = rs.endKey + } + return resp, nil +} + +func (rs *localRegion) getRowsFromSelectReq(txn kv.Transaction, sel *tipb.SelectRequest) ([]*tipb.Row, error) { + tid := sel.TableInfo.GetTableId() + kvRanges := rs.extractKVRanges(tid, 0, sel.Ranges) + var handles []int64 + for _, ran := range kvRanges { + ranHandles, err := seekRangeHandles(tid, txn, ran) + if err != nil { + return nil, errors.Trace(err) + } + handles = append(handles, ranHandles...) + } + var rows []*tipb.Row + for _, handle := range handles { + row, err := rs.getRowByHandle(txn, tid, handle, sel.TableInfo.Columns) + if err != nil { + return nil, errors.Trace(err) + } + rows = append(rows, row) + } + return rows, nil +} + +func (rs *localRegion) extractKVRanges(tid int64, idxID int64, krans []*tipb.KeyRange) []kv.KeyRange { + var kvRanges []kv.KeyRange + for _, kran := range krans { + var upperKey, lowerKey kv.Key + if idxID == 0 { + upperKey = tablecodec.EncodeRowKey(tid, kran.GetHigh()) + if bytes.Compare(upperKey, rs.startKey) <= 0 { + continue + } + lowerKey = tablecodec.EncodeRowKey(tid, kran.GetLow()) + } else { + upperKey = tablecodec.EncodeIndexSeekKey(tid, idxID, kran.GetHigh()) + if bytes.Compare(upperKey, rs.startKey) <= 0 { + continue + } + lowerKey = tablecodec.EncodeIndexSeekKey(tid, idxID, kran.GetLow()) + } + if bytes.Compare(lowerKey, rs.endKey) >= 0 { + break + } + var kvr kv.KeyRange + if bytes.Compare(lowerKey, rs.startKey) <= 0 { + kvr.StartKey = rs.startKey + } else { + kvr.StartKey = lowerKey + } + if bytes.Compare(upperKey, rs.endKey) <= 0 { + kvr.EndKey = upperKey + } else { + kvr.EndKey = rs.endKey + } + kvRanges = append(kvRanges, kvr) + } + return kvRanges +} + +func (rs *localRegion) getRowByHandle(txn kv.Transaction, tid, handle int64, columns []*tipb.ColumnInfo) (*tipb.Row, error) { + row := new(tipb.Row) + var d types.Datum + d.SetInt64(handle) + var err error + row.Handle, err = codec.EncodeValue(nil, d) + if err != nil { + return nil, errors.Trace(err) + } + for _, col := range columns { + if *col.PkHandle { + row.Data = append(row.Data, row.Handle...) + } else { + key := tablecodec.EncodeColumnKey(tid, handle, col.GetColumnId()) + data, err := txn.Get(key) + if err != nil { + return nil, errors.Trace(err) + } + row.Data = append(row.Data, data...) + } + } + return row, nil +} + +func toPBError(err error) *tipb.Error { + if err == nil { + return nil + } + perr := new(tipb.Error) + code := int32(1) + perr.Code = &code + errStr := err.Error() + perr.Msg = &errStr + return perr +} + +func seekRangeHandles(tid int64, txn kv.Transaction, ran kv.KeyRange) ([]int64, error) { + if ran.IsPoint() { + _, err := txn.Get(ran.StartKey) + if terror.ErrorEqual(err, kv.ErrNotExist) { + return nil, nil + } else if err != nil { + return nil, errors.Trace(err) + } + h, err := tablecodec.DecodeRowKey(ran.StartKey) + if err != nil { + return nil, errors.Trace(err) + } + return []int64{h}, nil + } + seekKey := ran.StartKey + var handles []int64 + for { + it, err := txn.Seek(seekKey) + if err != nil { + return nil, errors.Trace(err) + } + if !it.Valid() || it.Key().Cmp(ran.EndKey) >= 0 { + break + } + h, err := tablecodec.DecodeRowKey(it.Key()) + if err != nil { + return nil, errors.Trace(err) + } + handles = append(handles, h) + seekKey = it.Key().PrefixNext() + } + return handles, nil +} + +func (rs *localRegion) getRowsFromIndexReq(txn kv.Transaction, sel *tipb.SelectRequest) ([]*tipb.Row, error) { + tid := sel.IndexInfo.GetTableId() + idxID := sel.IndexInfo.GetIndexId() + kvRanges := rs.extractKVRanges(tid, idxID, sel.Ranges) + var rows []*tipb.Row + for _, ran := range kvRanges { + ranRows, err := getIndexRowFromRange(sel.IndexInfo, txn, ran) + if err != nil { + return nil, errors.Trace(err) + } + rows = append(rows, ranRows...) + } + return rows, nil +} + +func getIndexRowFromRange(idxInfo *tipb.IndexInfo, txn kv.Transaction, ran kv.KeyRange) ([]*tipb.Row, error) { + var rows []*tipb.Row + seekKey := ran.StartKey + for { + it, err := txn.Seek(seekKey) + // We have to update the seekKey here, because decoding may change the it.Key(), which should not be allowed. + // TODO: make sure decoding don't modify the original data. + seekKey = it.Key().PrefixNext() + if err != nil { + return nil, errors.Trace(err) + } + if !it.Valid() || it.Key().Cmp(ran.EndKey) >= 0 { + break + } + datums, err := tablecodec.DecodeIndexKey(it.Key()) + if err != nil { + return nil, errors.Trace(err) + } + var handle types.Datum + if len(datums) > len(idxInfo.Columns) { + handle = datums[len(idxInfo.Columns)] + datums = datums[:len(idxInfo.Columns)] + } else { + var intHandle int64 + intHandle, err = decodeHandle(it.Value()) + if err != nil { + return nil, errors.Trace(err) + } + handle.SetInt64(intHandle) + } + data, err := codec.EncodeValue(nil, datums...) + if err != nil { + return nil, errors.Trace(err) + } + handleData, err := codec.EncodeValue(nil, handle) + if err != nil { + return nil, errors.Trace(err) + } + row := &tipb.Row{Handle: handleData, Data: data} + rows = append(rows, row) + } + return rows, nil +} + +func datumStrings(datums ...types.Datum) []string { + var strs []string + for _, d := range datums { + s, _ := d.ToString() + strs = append(strs, s) + } + return strs +} + +func decodeHandle(data []byte) (int64, error) { + var h int64 + buf := bytes.NewBuffer(data) + err := binary.Read(buf, binary.BigEndian, &h) + return h, errors.Trace(err) +} + +func buildLocalRegionServers(store *dbStore) []*localRegion { + return []*localRegion{ + { + id: 1, + store: store, + startKey: []byte(""), + endKey: []byte("t"), + }, + { + id: 2, + store: store, + startKey: []byte("t"), + endKey: []byte("u"), + }, + { + id: 3, + store: store, + startKey: []byte("u"), + endKey: []byte("z"), + }, + } +} diff --git a/store/localstore/txn.go b/store/localstore/txn.go index af3b62cc055f0..e8d602c0b017a 100644 --- a/store/localstore/txn.go +++ b/store/localstore/txn.go @@ -52,12 +52,12 @@ func newTxn(s *dbStore, ver kv.Version) *dbTxn { // Implement transaction interface func (txn *dbTxn) Get(k kv.Key) ([]byte, error) { - log.Debugf("[kv] get key:%q, txn:%d", k, txn.tid) + log.Debugf("[kv] get key:% x, txn:%d", k, txn.tid) return txn.us.Get(k) } func (txn *dbTxn) Set(k kv.Key, data []byte) error { - log.Debugf("[kv] set key:%q, txn:%d", k, txn.tid) + log.Debugf("[kv] set key:% x, txn:%d", k, txn.tid) txn.dirty = true return txn.us.Set(k, data) } @@ -67,12 +67,12 @@ func (txn *dbTxn) String() string { } func (txn *dbTxn) Seek(k kv.Key) (kv.Iterator, error) { - log.Debugf("[kv] seek key:%q, txn:%d", k, txn.tid) + log.Debugf("[kv] seek key:% x, txn:%d", k, txn.tid) return txn.us.Seek(k) } func (txn *dbTxn) Delete(k kv.Key) error { - log.Debugf("[kv] delete key:%q, txn:%d", k, txn.tid) + log.Debugf("[kv] delete key:% x, txn:%d", k, txn.tid) txn.dirty = true return txn.us.Delete(k) } @@ -145,16 +145,5 @@ func (txn *dbTxn) StartTS() int64 { } func (txn *dbTxn) GetClient() kv.Client { - return &dbClient{} -} - -type dbClient struct { -} - -func (c *dbClient) SupportRequestType(reqType, subType int64) bool { - return false -} - -func (c *dbClient) Send(req *kv.Request) kv.Response { - return nil + return &dbClient{store: txn.store, regionInfo: txn.store.pd.GetRegionInfo()} } diff --git a/util/codec/codec_test.go b/util/codec/codec_test.go index 07ba0ab7c0db5..f10514862de8e 100644 --- a/util/codec/codec_test.go +++ b/util/codec/codec_test.go @@ -596,6 +596,7 @@ func (s *testCodecSuite) TestDecimal(c *C) { {"1234", "1234.0000", 0}, {"1234", "12.34", 1}, {"12.34", "12.35", -1}, + {"0.12", "0.1234", -1}, {"0.1234", "12.3400", -1}, {"0.1234", "0.1235", -1}, {"0.123400", "12.34", -1}, @@ -609,6 +610,8 @@ func (s *testCodecSuite) TestDecimal(c *C) { {"-0.0001", "0", -1}, {"-0.1234", "0", -1}, {"-0.1234", "-0.12", -1}, + {"-0.12", "-0.1234", 1}, + {"-0.12", "-0.1200", 0}, {"-0.1234", "0.1234", -1}, {"-1.234", "-12.34", 1}, {"-0.1234", "-12.34", 1}, @@ -661,4 +664,16 @@ func (s *testCodecSuite) TestDecimal(c *C) { ret := bytes.Compare(b1, b2) c.Assert(ret, Equals, t.Ret) } + + floats := []float64{-123.45, -123.40, -23.45, -1.43, -0.93, -0.4333, -0.068, + -0.0099, 0, 0.001, 0.0012, 0.12, 1.2, 1.23, 123.3, 2424.242424} + var decs [][]byte + for i := range floats { + dec := mysql.NewDecimalFromFloat(floats[i]) + decs = append(decs, EncodeDecimal(nil, dec)) + } + for i := 0; i < len(decs)-1; i++ { + cmp := bytes.Compare(decs[i], decs[i+1]) + c.Assert(cmp, LessEqual, 0) + } } diff --git a/util/codec/decimal.go b/util/codec/decimal.go index 8da4c14293841..2e055fc219106 100644 --- a/util/codec/decimal.go +++ b/util/codec/decimal.go @@ -35,35 +35,10 @@ func codecSign(value int64) int64 { return positiveSign } -func encodeExp(expValue int64, expSign int64, valSign int64) int64 { - if expSign == negativeSign { - expValue = -expValue - } - - if expSign != valSign { - expValue = ^expValue - } - - return expValue -} - -func decodeExp(expValue int64, expSign int64, valSign int64) int64 { - if expSign != valSign { - expValue = ^expValue - } - - if expSign == negativeSign { - expValue = -expValue - } - - return expValue -} - // EncodeDecimal encodes a decimal d into a byte slice which can be sorted lexicographically later. // EncodeDecimal guarantees that the encoded value is in ascending order for comparison. // Decimal encoding: // Byte -> value sign -// Byte -> exp sign // EncodeInt -> exp value // EncodeBytes -> abs value bytes func EncodeDecimal(b []byte, d mysql.Decimal) []byte { @@ -97,13 +72,11 @@ func EncodeDecimal(b []byte, d mysql.Decimal) []byte { } expVal := exp + int64(d.Exponent()) - expSign := codecSign(expVal) - - // For negtive exp, do bit reverse for exp. - expVal = encodeExp(expVal, expSign, valSign) + if valSign == negativeSign { + expVal = -expVal + } b = append(b, byte(valSign)) - b = append(b, byte(expSign)) b = EncodeInt(b, expVal) if valSign == negativeSign { b = EncodeBytesDesc(b, value) @@ -135,21 +108,17 @@ func DecodeDecimal(b []byte) ([]byte, mysql.Decimal, error) { return r, d, errors.Trace(err) } - // Decode exp sign. - expSign := int64(r[0]) - r = r[1:] - // Decode exp value. expVal := int64(0) r, expVal, err = DecodeInt(r) if err != nil { return r, d, errors.Trace(err) } - expVal = decodeExp(expVal, expSign, valSign) // Decode abs value bytes. value := []byte{} if valSign == negativeSign { + expVal = -expVal r, value, err = DecodeBytesDesc(r) } else { r, value, err = DecodeBytes(r) diff --git a/xapi/tablecodec/tablecodec.go b/xapi/tablecodec/tablecodec.go index b604d26b4e7d1..c38101d491549 100644 --- a/xapi/tablecodec/tablecodec.go +++ b/xapi/tablecodec/tablecodec.go @@ -14,10 +14,9 @@ package tablecodec import ( - "sort" + "bytes" "time" - "bytes" "github.com/golang/protobuf/proto" "github.com/juju/errors" "github.com/pingcap/tidb/kv" @@ -49,10 +48,10 @@ func EncodeRowKey(tableID int64, encodedHandle []byte) kv.Key { } // EncodeColumnKey encodes the table id, row handle and columnID into a kv.Key -func EncodeColumnKey(tableID int64, encodedHandle []byte, columnID int64) kv.Key { +func EncodeColumnKey(tableID int64, handle int64, columnID int64) kv.Key { buf := make([]byte, 0, recordRowKeyLen+idLen) buf = appendTableRecordPrefix(buf, tableID) - buf = append(buf, encodedHandle...) + buf = codec.EncodeInt(buf, handle) buf = codec.EncodeInt(buf, columnID) return buf } @@ -85,7 +84,7 @@ func DecodeRowKey(key kv.Key) (handle int64, err error) { } // DecodeValues decodes a byte slice into datums with column types. -func DecodeValues(data []byte, fts []*types.FieldType) ([]types.Datum, error) { +func DecodeValues(data []byte, fts []*types.FieldType, inIndex bool) ([]types.Datum, error) { values, err := codec.Decode(data) if err != nil { return nil, errors.Trace(err) @@ -93,6 +92,11 @@ func DecodeValues(data []byte, fts []*types.FieldType) ([]types.Datum, error) { if len(values) > len(fts) { return nil, errors.Errorf("invalid column count %d is less than value count %d", len(fts), len(values)) } + if inIndex { + // We don't need to unflatten index columns for now. + return values, nil + } + for i := range values { values[i], err = unflatten(values[i], fts[i]) if err != nil { @@ -124,7 +128,7 @@ func unflatten(datum types.Datum, ft *types.FieldType) (types.Datum, error) { if err != nil { return datum, errors.Trace(err) } - datum.SetValue(t) + datum.SetMysqlTime(&t) return datum, nil case mysql.TypeDuration: dur := mysql.Duration{Duration: time.Duration(datum.GetInt64())} @@ -160,28 +164,18 @@ func unflatten(datum types.Datum, ft *types.FieldType) (types.Datum, error) { } // EncodeIndexSeekKey encodes an index value to kv.Key. -func EncodeIndexSeekKey(tableID int64, encodedValue []byte) kv.Key { +func EncodeIndexSeekKey(tableID int64, idxID int64, encodedValue []byte) kv.Key { key := make([]byte, 0, prefixLen+len(encodedValue)) key = appendTableIndexPrefix(key, tableID) + key = codec.EncodeInt(key, idxID) key = append(key, encodedValue...) return key } -// IndexRowData extracts the row data and handle in an index key. -// If the index is unique, handle would be nil. -func IndexRowData(key kv.Key, columnCount int) (data, handle []byte, err error) { - b := key[prefixLen:] - // The index key may have primary key appended to the end, we decode the column values - // only to get the column values length. - for columnCount > 0 { - b, _, err = codec.DecodeOne(b) - if err != nil { - return nil, nil, errors.Trace(err) - } - columnCount-- - } - handle = b - return key[prefixLen : len(key)-len(handle)], handle, nil +// DecodeIndexKey decodes datums from an index key. +func DecodeIndexKey(key kv.Key) ([]types.Datum, error) { + b := key[prefixLen+idLen:] + return codec.Decode(b) } // Record prefix is "t[tableID]_r". @@ -200,17 +194,6 @@ func appendTableIndexPrefix(buf []byte, tableID int64) []byte { return buf } -type int64Slice []int64 - -func (p int64Slice) Len() int { return len(p) } -func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] } -func (p int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } - -// SortInt64Slice sorts an int64 slice. -func SortInt64Slice(s []int64) { - sort.Sort(int64Slice(s)) -} - func columnToProto(c *model.ColumnInfo) *tipb.ColumnInfo { pc := &tipb.ColumnInfo{ ColumnId: proto.Int64(c.ID), @@ -275,8 +258,8 @@ func IndexToProto(t *model.TableInfo, idx *model.IndexInfo) *tipb.IndexInfo { Unique: proto.Bool(idx.Unique), } cols := make([]*tipb.ColumnInfo, 0, len(idx.Columns)) - for _, c := range t.Columns { - cols = append(cols, columnToProto(c)) + for _, c := range idx.Columns { + cols = append(cols, columnToProto(t.Columns[c.Offset])) } pi.Columns = cols return pi @@ -298,12 +281,12 @@ func EncodeTableRanges(tid int64, rans []*tipb.KeyRange) []kv.KeyRange { } // EncodeIndexRanges encodes index ranges into kv.KeyRanges. -func EncodeIndexRanges(tid int64, rans []*tipb.KeyRange) []kv.KeyRange { +func EncodeIndexRanges(tid, idxID int64, rans []*tipb.KeyRange) []kv.KeyRange { keyRanges := make([]kv.KeyRange, 0, len(rans)) for _, r := range rans { // Convert range to kv.KeyRange - start := EncodeIndexSeekKey(tid, r.Low) - end := EncodeIndexSeekKey(tid, r.High) + start := EncodeIndexSeekKey(tid, idxID, r.Low) + end := EncodeIndexSeekKey(tid, idxID, r.High) nr := kv.KeyRange{ StartKey: start, EndKey: end, diff --git a/xapi/tablecodec/tablecodec_test.go b/xapi/tablecodec/tablecodec_test.go index 858d833aba119..cdc75b0069508 100644 --- a/xapi/tablecodec/tablecodec_test.go +++ b/xapi/tablecodec/tablecodec_test.go @@ -35,7 +35,7 @@ func (s *tableCodecSuite) TestTableCodec(c *C) { c.Assert(err, IsNil) c.Assert(h, Equals, int64(2)) - key = EncodeColumnKey(1, codec.EncodeInt(nil, 2), 3) + key = EncodeColumnKey(1, 2, 3) h, err = DecodeRowKey(key) c.Assert(err, IsNil) c.Assert(h, Equals, int64(2)) diff --git a/xapi/xapi.go b/xapi/xapi.go index 3a87aa6d4bf26..33235d24a21e6 100644 --- a/xapi/xapi.go +++ b/xapi/xapi.go @@ -28,6 +28,7 @@ import ( // SelectResult is used to get response rows from SelectRequest. type SelectResult struct { + index bool fields []*types.FieldType resp kv.Response } @@ -43,6 +44,7 @@ func (r *SelectResult) Next() (subResult *SubResult, err error) { return nil, nil } subResult = &SubResult{ + index: r.index, fields: r.fields, reader: reader, } @@ -56,6 +58,7 @@ func (r *SelectResult) Close() error { // SubResult represents a subset of select result. type SubResult struct { + index bool fields []*types.FieldType reader io.ReadCloser resp *tipb.SelectResponse @@ -77,20 +80,24 @@ func (r *SubResult) Next() (handle int64, data []types.Datum, err error) { if err != nil { return 0, nil, errors.Trace(err) } + if r.resp.Error != nil { + return 0, nil, errors.Errorf("[%d %s]", r.resp.Error.GetCode(), r.resp.Error.GetMsg()) + } } if r.cursor >= len(r.resp.Rows) { return 0, nil, nil } row := r.resp.Rows[r.cursor] - data, err = tablecodec.DecodeValues(row.Data, r.fields) + data, err = tablecodec.DecodeValues(row.Data, r.fields, r.index) if err != nil { return 0, nil, errors.Trace(err) } handleBytes := row.GetHandle() - _, handle, err = codec.DecodeInt(handleBytes) + datums, err := codec.Decode(handleBytes) if err != nil { return 0, nil, errors.Trace(err) } + handle = datums[0].GetInt64() r.cursor++ return } @@ -111,14 +118,14 @@ func Select(client kv.Client, req *tipb.SelectRequest, concurrency int) (*Select if resp == nil { return nil, errors.New("client returns nil response") } - var columns []*tipb.ColumnInfo + result := &SelectResult{resp: resp} if req.TableInfo != nil { - columns = req.TableInfo.Columns + result.fields = tablecodec.ProtoColumnsToFieldTypes(req.TableInfo.Columns) } else { - columns = req.IndexInfo.Columns + result.fields = tablecodec.ProtoColumnsToFieldTypes(req.IndexInfo.Columns) + result.index = true } - fields := tablecodec.ProtoColumnsToFieldTypes(columns) - return &SelectResult{fields: fields, resp: resp}, nil + return result, nil } // Convert tipb.Request to kv.Request. @@ -129,7 +136,8 @@ func composeRequest(req *tipb.SelectRequest, concurrency int) (*kv.Request, erro if req.IndexInfo != nil { kvReq.Tp = kv.ReqTypeIndex tid := req.IndexInfo.GetTableId() - kvReq.KeyRanges = tablecodec.EncodeIndexRanges(tid, req.Ranges) + idxID := req.IndexInfo.GetIndexId() + kvReq.KeyRanges = tablecodec.EncodeIndexRanges(tid, idxID, req.Ranges) } else { kvReq.Tp = kv.ReqTypeSelect tid := req.GetTableInfo().GetTableId()