Skip to content

Commit

Permalink
Avoid using tipb.KeyRange (pingcap#1685)
Browse files Browse the repository at this point in the history
Do not use tipb.KeyRange.
Use EncodeRowKeyWithHandle to generate endKey.
Avoid decode values when only need handle.
  • Loading branch information
shenli authored Sep 5, 2016
1 parent b9f67a8 commit 232605a
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 150 deletions.
71 changes: 38 additions & 33 deletions executor/executor_xapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/parser/opcode"
"github.com/pingcap/tidb/plan"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/types"
"github.com/pingcap/tidb/xapi"
Expand Down Expand Up @@ -101,23 +102,36 @@ func (s *rowsSorter) Swap(i, j int) {
s.rows[i], s.rows[j] = s.rows[j], s.rows[i]
}

func tableRangesToPBRanges(tableRanges []plan.TableRange) []*tipb.KeyRange {
hrs := make([]*tipb.KeyRange, 0, len(tableRanges))
func tableRangesToKVRanges(tid int64, tableRanges []plan.TableRange) []kv.KeyRange {
krs := make([]kv.KeyRange, 0, len(tableRanges))
for _, tableRange := range tableRanges {
pbRange := new(tipb.KeyRange)
pbRange.Low = codec.EncodeInt(nil, tableRange.LowVal)
startKey := tablecodec.EncodeRowKeyWithHandle(tid, tableRange.LowVal)
hi := tableRange.HighVal
if hi != math.MaxInt64 {
hi++
}
pbRange.High = codec.EncodeInt(nil, hi)
hrs = append(hrs, pbRange)
endKey := tablecodec.EncodeRowKeyWithHandle(tid, hi)
krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey})
}
return hrs
return krs
}

func indexRangesToPBRanges(ranges []*plan.IndexRange, fieldTypes []*types.FieldType) ([]*tipb.KeyRange, error) {
keyRanges := make([]*tipb.KeyRange, 0, len(ranges))
func tableHandlesToKVRanges(tid int64, handles []int64) []kv.KeyRange {
krs := make([]kv.KeyRange, 0, len(handles))
for _, h := range handles {
if h == math.MaxInt64 {
// We can't convert MaxInt64 into an left closed, right open range.
continue
}
startKey := tablecodec.EncodeRowKeyWithHandle(tid, h)
endKey := tablecodec.EncodeRowKeyWithHandle(tid, h+1)
krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey})
}
return krs
}

func indexRangesToKVRanges(tid, idxID int64, ranges []*plan.IndexRange, fieldTypes []*types.FieldType) ([]kv.KeyRange, error) {
krs := make([]kv.KeyRange, 0, len(ranges))
for _, ran := range ranges {
err := convertIndexRangeTypes(ran, fieldTypes)
if err != nil {
Expand All @@ -138,9 +152,11 @@ func indexRangesToPBRanges(ranges []*plan.IndexRange, fieldTypes []*types.FieldT
if !ran.HighExclude {
high = []byte(kv.Key(high).PrefixNext())
}
keyRanges = append(keyRanges, &tipb.KeyRange{Low: low, High: high})
startKey := tablecodec.EncodeIndexSeekKey(tid, idxID, low)
endKey := tablecodec.EncodeIndexSeekKey(tid, idxID, high)
krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey})
}
return keyRanges, nil
return krs, nil
}

func convertIndexRangeTypes(ran *plan.IndexRange, fieldTypes []*types.FieldType) error {
Expand Down Expand Up @@ -753,6 +769,7 @@ func (e *XSelectIndexExec) nextForDoubleRead() (*Row, error) {
if err != nil {
return nil, errors.Trace(err)
}
idxResult.IgnoreData()
idxResult.Fetch()

// Use a background goroutine to fetch index, put the result in e.tasks.
Expand Down Expand Up @@ -842,11 +859,6 @@ func (e *XSelectIndexExec) doIndexRequest() (xapi.SelectResult, error) {
for i, v := range e.indexPlan.Index.Columns {
fieldTypes[i] = &(e.table.Cols()[v.Offset].FieldType)
}
var err error
selIdxReq.Ranges, err = indexRangesToPBRanges(e.indexPlan.Ranges, fieldTypes)
if err != nil {
return nil, errors.Trace(err)
}
if !e.indexPlan.DoubleRead || e.where == nil {
// TODO: when where condition is all index columns limit can be pushed too.
selIdxReq.Limit = e.indexPlan.LimitCount
Expand All @@ -860,7 +872,11 @@ func (e *XSelectIndexExec) doIndexRequest() (xapi.SelectResult, error) {
} else if e.indexPlan.OutOfOrder {
concurrency = defaultConcurrency
}
return xapi.Select(e.txn.GetClient(), selIdxReq, concurrency, !e.indexPlan.OutOfOrder)
keyRanges, err := indexRangesToKVRanges(e.table.Meta().ID, e.indexPlan.Index.ID, e.indexPlan.Ranges, fieldTypes)
if err != nil {
return nil, errors.Trace(err)
}
return xapi.Select(e.txn.GetClient(), selIdxReq, keyRanges, concurrency, !e.indexPlan.OutOfOrder)
}

func (e *XSelectIndexExec) buildTableTasks(handles []int64) []*lookupTableTask {
Expand Down Expand Up @@ -980,24 +996,12 @@ func (e *XSelectIndexExec) doTableRequest(handles []int64) (xapi.SelectResult, e
TableId: e.table.Meta().ID,
}
selTableReq.TableInfo.Columns = xapi.ColumnsToProto(e.indexPlan.Columns, e.table.Meta().PKIsHandle)
selTableReq.Ranges = make([]*tipb.KeyRange, 0, len(handles))
for _, h := range handles {
if h == math.MaxInt64 {
// We can't convert MaxInt64 into an left closed, right open range.
continue
}
pbRange := new(tipb.KeyRange)
bs := make([]byte, 0, 8)
pbRange.Low = codec.EncodeInt(bs, h)
pbRange.High = kv.Key(pbRange.Low).PrefixNext()
selTableReq.Ranges = append(selTableReq.Ranges, pbRange)
}
selTableReq.Where = e.where
// Aggregate Info
selTableReq.Aggregates = e.aggFuncs
selTableReq.GroupBy = e.byItems
// Aggregate Info
resp, err := xapi.Select(e.txn.GetClient(), selTableReq, defaultConcurrency, false)
keyRanges := tableHandlesToKVRanges(e.table.Meta().ID, handles)
resp, err := xapi.Select(e.txn.GetClient(), selTableReq, keyRanges, defaultConcurrency, false)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -1063,7 +1067,6 @@ func (e *XSelectTableExec) doRequest() error {
selReq := new(tipb.SelectRequest)
selReq.StartTs = e.txn.StartTS()
selReq.Where = e.where
selReq.Ranges = tableRangesToPBRanges(e.ranges)
columns := e.Columns
selReq.TableInfo = &tipb.TableInfo{
TableId: e.tableInfo.ID,
Expand All @@ -1076,7 +1079,9 @@ func (e *XSelectTableExec) doRequest() error {
// Aggregate Info
selReq.Aggregates = e.aggFuncs
selReq.GroupBy = e.byItems
e.result, err = xapi.Select(e.txn.GetClient(), selReq, defaultConcurrency, e.keepOrder)

kvRanges := tableRangesToKVRanges(e.table.Meta().ID, e.ranges)
e.result, err = xapi.Select(e.txn.GetClient(), selReq, kvRanges, defaultConcurrency, e.keepOrder)
if err != nil {
return errors.Trace(err)
}
Expand Down
1 change: 1 addition & 0 deletions store/localstore/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ func buildRegionTasks(client *dbClient, req *kv.Request) (tasks []*task) {
startKey: info.startKey,
endKey: info.endKey,
data: req.Data,
ranges: req.KeyRanges,
}
task := &task{
region: info.rs,
Expand Down
45 changes: 15 additions & 30 deletions store/localstore/local_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type regionRequest struct {
data []byte
startKey []byte
endKey []byte
ranges []kv.KeyRange
}

type regionResponse struct {
Expand All @@ -51,6 +52,7 @@ type selectContext struct {
groupKeys [][]byte
aggregates []*aggregateFuncExpr
aggregate bool
keyRanges []kv.KeyRange

// Use for DecodeRow.
colTps map[int64]*types.FieldType
Expand All @@ -68,8 +70,9 @@ func (rs *localRegion) Handle(req *regionRequest) (*regionResponse, error) {
}
txn := newTxn(rs.store, kv.Version{Ver: uint64(sel.StartTs)})
ctx := &selectContext{
sel: sel,
txn: txn,
sel: sel,
txn: txn,
keyRanges: req.ranges,
}
ctx.eval = &xeval.Evaluator{Row: make(map[int64]types.Datum)}
if sel.Where != nil {
Expand Down Expand Up @@ -169,7 +172,7 @@ func (rs *localRegion) getRowsFromSelectReq(ctx *selectContext) ([]*tipb.Row, er
ctx.colTps[col.GetColumnId()] = xapi.FieldTypeFromPBColumn(col)
}

kvRanges, desc := rs.extractKVRanges(ctx.sel)
kvRanges, desc := rs.extractKVRanges(ctx)
var rows []*tipb.Row
limit := int64(-1)
if ctx.sel.Limit != nil {
Expand Down Expand Up @@ -226,33 +229,15 @@ func (rs *localRegion) getRowsFromAgg(ctx *selectContext) ([]*tipb.Row, error) {
return rows, nil
}

// extractKVRanges extracts kv.KeyRanges slice from a SelectRequest, and also returns if it is in descending order.
func (rs *localRegion) extractKVRanges(sel *tipb.SelectRequest) (kvRanges []kv.KeyRange, desc bool) {
var (
tid int64
idxID int64
)
if sel.IndexInfo != nil {
tid = sel.IndexInfo.GetTableId()
idxID = sel.IndexInfo.GetIndexId()
} else {
tid = sel.TableInfo.GetTableId()
}
for _, kran := range sel.Ranges {
var upperKey, lowerKey kv.Key
if idxID == 0 {
upperKey = tablecodec.EncodeRowKey(tid, kran.GetHigh())
if bytes.Compare(upperKey, rs.startKey) <= 0 {
continue
}
lowerKey = tablecodec.EncodeRowKey(tid, kran.GetLow())
} else {
upperKey = tablecodec.EncodeIndexSeekKey(tid, idxID, kran.GetHigh())
if bytes.Compare(upperKey, rs.startKey) <= 0 {
continue
}
lowerKey = tablecodec.EncodeIndexSeekKey(tid, idxID, kran.GetLow())
// extractKVRanges extracts kv.KeyRanges slice from ctx.keyRanges, and also returns if it is in descending order.
func (rs *localRegion) extractKVRanges(ctx *selectContext) (kvRanges []kv.KeyRange, desc bool) {
sel := ctx.sel
for _, kran := range ctx.keyRanges {
upperKey := kran.EndKey
if bytes.Compare(upperKey, rs.startKey) <= 0 {
continue
}
lowerKey := kran.StartKey
if bytes.Compare(lowerKey, rs.endKey) >= 0 {
break
}
Expand Down Expand Up @@ -500,7 +485,7 @@ func toPBError(err error) *tipb.Error {
}

func (rs *localRegion) getRowsFromIndexReq(ctx *selectContext) ([]*tipb.Row, error) {
kvRanges, desc := rs.extractKVRanges(ctx.sel)
kvRanges, desc := rs.extractKVRanges(ctx)
var rows []*tipb.Row
limit := int64(-1)
if ctx.sel.Limit != nil {
Expand Down
40 changes: 12 additions & 28 deletions store/tikv/mock-tikv/cop_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type selectContext struct {
groupKeys [][]byte
aggregates []*aggregateFuncExpr
aggregate bool
keyRanges []*coprocessor.KeyRange

// Use for DecodeRow.
colTps map[int64]*types.FieldType
Expand All @@ -61,7 +62,8 @@ func (h *rpcHandler) handleCopRequest(req *coprocessor.Request) (*coprocessor.Re
return nil, errors.Trace(err)
}
ctx := &selectContext{
sel: sel,
sel: sel,
keyRanges: req.Ranges,
}
ctx.eval = &xeval.Evaluator{Row: make(map[int64]types.Datum)}
if sel.Where != nil {
Expand Down Expand Up @@ -195,7 +197,7 @@ func (h *rpcHandler) getRowsFromSelectReq(ctx *selectContext) ([]*tipb.Row, erro
ctx.colTps[col.GetColumnId()] = xapi.FieldTypeFromPBColumn(col)
}

kvRanges, desc := h.extractKVRanges(ctx.sel)
kvRanges, desc := h.extractKVRanges(ctx)
var rows []*tipb.Row
limit := int64(-1)
if ctx.sel.Limit != nil {
Expand All @@ -219,32 +221,14 @@ func (h *rpcHandler) getRowsFromSelectReq(ctx *selectContext) ([]*tipb.Row, erro
}

// extractKVRanges extracts kv.KeyRanges slice from a SelectRequest, and also returns if it is in descending order.
func (h *rpcHandler) extractKVRanges(sel *tipb.SelectRequest) (kvRanges []kv.KeyRange, desc bool) {
var (
tid int64
idxID int64
)
if sel.IndexInfo != nil {
tid = sel.IndexInfo.GetTableId()
idxID = sel.IndexInfo.GetIndexId()
} else {
tid = sel.TableInfo.GetTableId()
}
for _, kran := range sel.Ranges {
var upperKey, lowerKey kv.Key
if idxID == 0 {
upperKey = tablecodec.EncodeRowKey(tid, kran.GetHigh())
if bytes.Compare(upperKey, h.startKey) <= 0 {
continue
}
lowerKey = tablecodec.EncodeRowKey(tid, kran.GetLow())
} else {
upperKey = tablecodec.EncodeIndexSeekKey(tid, idxID, kran.GetHigh())
if bytes.Compare(upperKey, h.startKey) <= 0 {
continue
}
lowerKey = tablecodec.EncodeIndexSeekKey(tid, idxID, kran.GetLow())
func (h *rpcHandler) extractKVRanges(ctx *selectContext) (kvRanges []kv.KeyRange, desc bool) {
sel := ctx.sel
for _, kran := range ctx.keyRanges {
upperKey := kran.GetEnd()
if bytes.Compare(upperKey, h.startKey) <= 0 {
continue
}
lowerKey := kran.GetStart()
if len(h.endKey) != 0 && bytes.Compare([]byte(lowerKey), h.endKey) >= 0 {
break
}
Expand Down Expand Up @@ -485,7 +469,7 @@ func (h *rpcHandler) evalWhereForRow(ctx *selectContext, handle int64, row map[i
}

func (h *rpcHandler) getRowsFromIndexReq(ctx *selectContext) ([]*tipb.Row, error) {
kvRanges, desc := h.extractKVRanges(ctx.sel)
kvRanges, desc := h.extractKVRanges(ctx)
var rows []*tipb.Row
limit := int64(-1)
if ctx.sel.Limit != nil {
Expand Down
41 changes: 41 additions & 0 deletions tablecodec/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package tablecodec

import (
"testing"
)

func BenchmarkEncodeRowKeyWithHandle(b *testing.B) {
for i := 0; i < b.N; i++ {
EncodeRowKeyWithHandle(100, 100)
}
}

func BenchmarkEncodeEndKey(b *testing.B) {
for i := 0; i < b.N; i++ {
EncodeRowKeyWithHandle(100, 100)
EncodeRowKeyWithHandle(100, 101)
}
}

// PrefixNext() is slow than using EncodeRowKeyWithHandle.
// BenchmarkEncodeEndKey-4 20000000 97.2 ns/op
// BenchmarkEncodeRowKeyWithPrefixNex-4 10000000 121 ns/op
func BenchmarkEncodeRowKeyWithPrefixNex(b *testing.B) {
for i := 0; i < b.N; i++ {
sk := EncodeRowKeyWithHandle(100, 100)
sk.PrefixNext()
}
}
2 changes: 1 addition & 1 deletion tablecodec/tablecodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func EncodeRowKey(tableID int64, encodedHandle []byte) kv.Key {
func EncodeRowKeyWithHandle(tableID int64, handle int64) kv.Key {
buf := make([]byte, 0, recordRowKeyLen+idLen)
buf = appendTableRecordPrefix(buf, tableID)
buf = EncodeRecordKey(buf, handle)
buf = codec.EncodeInt(buf, handle)
return buf
}

Expand Down
Loading

0 comments on commit 232605a

Please sign in to comment.