Skip to content

Commit

Permalink
store/localstore: add local store xapi support.
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood committed Mar 29, 2016
1 parent 9ffdc69 commit 1bd25a1
Show file tree
Hide file tree
Showing 16 changed files with 727 additions and 125 deletions.
121 changes: 105 additions & 16 deletions executor/executor_xapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func resultRowToRow(t table.Table, h int64, data []types.Datum) *Row {
}

func (e *XSelectTableExec) doRequest() error {
// TODO: add offset and limit.
txn, err := e.ctx.GetTxn(false)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -138,6 +139,9 @@ func (e *XSelectIndexExec) Next() (*Row, error) {
return nil, nil
}
row := e.rows[e.cursor]
for i, field := range e.indexPlan.Fields() {
field.Expr.SetDatum(row.Data[i])
}
e.cursor++
return row, nil
}
Expand All @@ -160,40 +164,61 @@ func (e *XSelectIndexExec) doRequest() error {
if err != nil {
return errors.Trace(err)
}
indexOrder := make(map[int64]int)
for i, h := range handles {
indexOrder[h] = i
if len(handles) == 0 {
return nil
}

var indexOrder map[int64]int
if !e.indexPlan.OutOfOrder {
// Save the index order.
indexOrder = make(map[int64]int)
for i, h := range handles {
indexOrder[h] = i
}
}

sort.Sort(int64Slice(handles))
tblResult, err := e.doTableRequest(txn, handles)
unorderedRows, err := extractRowsFromTableResult(e.table, tblResult)
rows, err := extractRowsFromTableResult(e.table, tblResult)
if err != nil {
return errors.Trace(err)
}
// Restore the original index order.
rows := make([]*Row, len(handles))
for i, h := range handles {
oi := indexOrder[h]
rows[oi] = unorderedRows[i]
if len(rows) < len(handles) {
return errors.Errorf("got %d rows with %d handles", len(rows), len(handles))
}
if !e.indexPlan.OutOfOrder {
// Restore the index order.
orderedRows := make([]*Row, len(handles))
for i, h := range handles {
oi := indexOrder[h]
orderedRows[oi] = rows[i]
}
rows = orderedRows
}
e.rows = rows
return nil
}

func (e *XSelectIndexExec) doIndexRequest(txn kv.Transaction) (*xapi.SelectResult, error) {
// TODO: add offset and limit.
selIdxReq := new(tipb.SelectRequest)
startTs := txn.StartTS()
selIdxReq.StartTs = &startTs
selIdxReq.IndexInfo = tablecodec.IndexToProto(e.table.Meta(), e.indexPlan.Index)
fieldTypes := make([]*types.FieldType, len(e.indexPlan.Index.Columns))
for i, v := range e.indexPlan.Index.Columns {
fieldTypes[i] = &(e.table.Cols()[v.Offset].FieldType)
}
var err error
selIdxReq.Ranges, err = indexRangesToPBRanges(e.indexPlan.Ranges)
selIdxReq.Ranges, err = indexRangesToPBRanges(e.indexPlan.Ranges, fieldTypes)
if err != nil {
return nil, errors.Trace(err)
}
return xapi.Select(txn.GetClient(), selIdxReq, 1)
}

func (e *XSelectIndexExec) doTableRequest(txn kv.Transaction, handles []int64) (*xapi.SelectResult, error) {
// TODO: add offset and limit.
selTableReq := new(tipb.SelectRequest)
startTs := txn.StartTS()
selTableReq.StartTs = &startTs
Expand All @@ -206,7 +231,7 @@ func (e *XSelectIndexExec) doTableRequest(txn kv.Transaction, handles []int64) (
}
pbRange := new(tipb.KeyRange)
pbRange.Low = codec.EncodeInt(nil, h)
pbRange.High = codec.EncodeInt(nil, h)
pbRange.High = kv.Key(pbRange.Low).PrefixNext()
selTableReq.Ranges = append(selTableReq.Ranges, pbRange)
}
selTableReq.Where = conditionsToPBExpression(e.indexPlan.FilterConditions...)
Expand Down Expand Up @@ -236,28 +261,92 @@ func tableRangesToPBRanges(tableRanges []plan.TableRange) []*tipb.KeyRange {
return hrs
}

func indexRangesToPBRanges(ranges []*plan.IndexRange) ([]*tipb.KeyRange, error) {
func indexRangesToPBRanges(ranges []*plan.IndexRange, fieldTypes []*types.FieldType) ([]*tipb.KeyRange, error) {
keyRanges := make([]*tipb.KeyRange, 0, len(ranges))
for _, ran := range ranges {
low, err := codec.EncodeValue(nil, ran.LowVal...)
err := convertIndexRangeTypes(ran, fieldTypes)
if err != nil {
return nil, errors.Trace(err)
}
low, err := codec.EncodeKey(nil, ran.LowVal...)
if err != nil {
return nil, errors.Trace(err)
}
if ran.LowExclude {
low = []byte(kv.Key(low).PartialNext())
low = []byte(kv.Key(low).PrefixNext())
}
high, err := codec.EncodeValue(nil, ran.HighVal...)
high, err := codec.EncodeKey(nil, ran.HighVal...)
if err != nil {
return nil, errors.Trace(err)
}
if !ran.HighExclude {
high = []byte(kv.Key(high).PartialNext())
high = []byte(kv.Key(high).PrefixNext())
}
keyRanges = append(keyRanges, &tipb.KeyRange{Low: low, High: high})
}
return keyRanges, nil
}

func convertIndexRangeTypes(ran *plan.IndexRange, fieldTypes []*types.FieldType) error {
for i := range ran.LowVal {
if ran.LowVal[i].Kind() == types.KindMinNotNull {
ran.LowVal[i].SetBytes([]byte{})
continue
}
converted, err := ran.LowVal[i].ConvertTo(fieldTypes[i])
if err != nil {
return errors.Trace(err)
}
cmp, err := converted.CompareDatum(ran.LowVal[i])
if err != nil {
return errors.Trace(err)
}
ran.LowVal[i] = converted
if cmp == 0 {
continue
}
if cmp < 0 && !ran.LowExclude {
// For int column a, a >= 1.1 is converted to a > 1.
ran.LowExclude = true
} else if cmp > 0 && ran.LowExclude {
// For int column a, a > 1.9 is converted to a >= 2.
ran.LowExclude = false
}
// The converted value has changed, the other column values doesn't matter.
// For equal condition, converted value changed means there will be no match.
// For non equal condition, this column would be the last one to build the range.
// Break here to prevent the rest columns modify LowExclude again.
break
}
for i := range ran.HighVal {
if ran.HighVal[i].Kind() == types.KindMaxValue {
continue
}
converted, err := ran.HighVal[i].ConvertTo(fieldTypes[i])
if err != nil {
return errors.Trace(err)
}
cmp, err := converted.CompareDatum(ran.HighVal[i])
if err != nil {
return errors.Trace(err)
}
ran.HighVal[i] = converted
if cmp == 0 {
continue
}
// For int column a, a < 1.1 is converted to a <= 1.
if cmp < 0 && ran.HighExclude {
ran.HighExclude = false
}
// For int column a, a <= 1.9 is converted to a < 2.
if cmp > 0 && !ran.HighExclude {
ran.HighExclude = true
}
break
}
return nil
}

func extractHandlesFromIndexResult(idxResult *xapi.SelectResult) ([]int64, error) {
var handles []int64
for {
Expand Down
19 changes: 12 additions & 7 deletions kv/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,18 @@ func (k Key) Next() Key {
return buf
}

// PartialNext returns the next partial key.
// For example, a key composed with two columns.
// Next will return a key with the same first column value,
// but PartialNext will return a key with different first column value.
// key encoding method must ensure the next different value has the
// same length as the original value.
func (k Key) PartialNext() Key {
// PrefixNext returns the next prefix key.
//
// Assume there are keys like:
//
// rowkey1
// rowkey1_column1
// rowkey1_column2
// rowKey2
//
// If we seek 'rowkey1' Next, we will get 'rowkey1_colum1'.
// If we seek 'rowkey1' PrefixNext, we will get 'rowkey2'.
func (k Key) PrefixNext() Key {
buf := make([]byte, len([]byte(k)))
copy(buf, []byte(k))
var i int
Expand Down
2 changes: 1 addition & 1 deletion kv/key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (s *testKeySuite) TestPartialNext(c *C) {
c.Assert(cmp, Equals, -1)

// Use next partial key, we can skip all index keys with first column value equal to "abc".
nextPartialKey := Key(seekKey).PartialNext()
nextPartialKey := Key(seekKey).PrefixNext()
cmp = bytes.Compare(nextPartialKey, keyA)
c.Assert(cmp, Equals, 1)

Expand Down
12 changes: 11 additions & 1 deletion kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@

package kv

import "io"
import (
"bytes"
"io"
)

const (
// PresumeKeyNotExists directives that when dealing with a Get operation but failing to read data from cache,
Expand Down Expand Up @@ -97,6 +100,8 @@ type Client interface {
const (
ReqTypeSelect = 101
ReqTypeIndex = 102

ReqSubTypeBasic = 0
)

// KeyRange represents a range where StartKey <= key < EndKey.
Expand All @@ -105,6 +110,11 @@ type KeyRange struct {
EndKey Key
}

// IsPoint checks if the key range represents a point.
func (r *KeyRange) IsPoint() bool {
return bytes.Equal(r.StartKey.PrefixNext(), r.EndKey)
}

// Request represents a kv request.
type Request struct {
// The request type.
Expand Down
2 changes: 1 addition & 1 deletion optimizer/plan/planbuilder_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ func (b *planBuilder) buildJoin(sel *ast.SelectStmt) Plan {
if !path.attachCondition(whereCond, nil) {
// TODO: Find a better way to handle this condition.
path.conditions = append(path.conditions, whereCond)
log.Errorf("Failed to attach where condtion.")
log.Warnf("Failed to attach where condtion in %s", sel.Text())
}
}
path.extractEqualConditon()
Expand Down
3 changes: 3 additions & 0 deletions optimizer/plan/plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ type IndexScan struct {

// FilterConditions can be used to filter result.
FilterConditions []ast.ExprNode

// OutOfOrder indicates if the index scan can return out of order.
OutOfOrder bool
}

// Accept implements Plan Accept interface.
Expand Down
10 changes: 9 additions & 1 deletion store/localstore/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ type dbStore struct {

mu sync.Mutex
closed bool

pd localPD
}

type storeCache struct {
Expand Down Expand Up @@ -354,8 +356,14 @@ func (d Driver) Open(path string) (kv.Storage, error) {
s.recentUpdates, err = segmentmap.NewSegmentMap(100)
if err != nil {
return nil, errors.Trace(err)

}
regionServers := buildLocalRegionServers(s)
var infos []*regionInfo
for _, rs := range regionServers {
ri := &regionInfo{startKey: rs.startKey, endKey: rs.endKey, rs: rs}
infos = append(infos, ri)
}
s.pd.SetRegionInfo(infos)
mc.cache[engineSchema] = s
s.compactor.Start()
s.wg.Add(1)
Expand Down
Loading

0 comments on commit 1bd25a1

Please sign in to comment.