store/tikv: key range refactor (pingcap#22610)
Signed-off-by: disksing <[email protected]>
disksing authored Jan 29, 2021
1 parent 4c8a416 commit c940176
Showing 6 changed files with 376 additions and 306 deletions.
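Only two of the six changed files are loaded in this view. The gist of the refactor: the unexported copRanges helper (deleted from coprocessor.go below) becomes an exported KeyRanges type, with len/at/slice/do/toPBRanges/split renamed to Len/At/Slice/Do/ToPBRanges/Split, and splitRanges exported as SplitKeyRanges. The new type itself lives in one of the files not shown here, so the following is only a sketch reconstructed from the deleted code and the call sites in this diff; the field names and exact surface are assumptions:

```go
// Hypothetical reconstruction of the exported type; not the actual new file.
package tikv

import "github.com/pingcap/tidb/kv"

// KeyRanges is like []kv.KeyRange, but may have extra elements at head/tail.
// It avoids allocating a big slice when building copTasks.
type KeyRanges struct {
	first *kv.KeyRange
	mid   []kv.KeyRange
	last  *kv.KeyRange
}

// NewKeyRanges wraps a plain slice without copying it.
func NewKeyRanges(ranges []kv.KeyRange) *KeyRanges {
	return &KeyRanges{mid: ranges}
}

// Len counts the ranges, including the optional head/tail elements.
func (r *KeyRanges) Len() int {
	l := len(r.mid)
	if r.first != nil {
		l++
	}
	if r.last != nil {
		l++
	}
	return l
}

// Per the call sites below, the rest of the exported surface is presumably:
//   At(i int) kv.KeyRange
//   Slice(from, to int) *KeyRanges
//   Do(f func(ran *kv.KeyRange))
//   ToPBRanges() []*coprocessor.KeyRange
//   Split(key []byte) (*KeyRanges, *KeyRanges)
// plus SplitKeyRanges(bo *Backoffer, cache *RegionCache, ranges *KeyRanges,
//   fn func(*KeyLocation, *KeyRanges)) error
```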
20 changes: 10 additions & 10 deletions store/tikv/batch_coprocessor.go
@@ -93,13 +93,13 @@ type copTaskAndRPCContext struct {
ctx *RPCContext
}

-func buildBatchCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, storeType kv.StoreType) ([]*batchCopTask, error) {
+func buildBatchCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, storeType kv.StoreType) ([]*batchCopTask, error) {
start := time.Now()
const cmdType = tikvrpc.CmdBatchCop
-rangesLen := ranges.len()
+rangesLen := ranges.Len()
for {
var tasks []*copTask
-appendTask := func(regionWithRangeInfo *KeyLocation, ranges *copRanges) {
+appendTask := func(regionWithRangeInfo *KeyLocation, ranges *KeyRanges) {
tasks = append(tasks, &copTask{
region: regionWithRangeInfo.Region,
ranges: ranges,
@@ -108,7 +108,7 @@ func buildBatchCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, st
})
}

-err := splitRanges(bo, cache, ranges, appendTask)
+err := SplitKeyRanges(bo, cache, ranges, appendTask)
if err != nil {
return nil, errors.Trace(err)
}
@@ -171,7 +171,7 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *kv.Var
}
ctx = context.WithValue(ctx, txnStartKey, req.StartTs)
bo := NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
-tasks, err := buildBatchCopTasks(bo, c.store.regionCache, &copRanges{mid: req.KeyRanges}, req.StoreType)
+tasks, err := buildBatchCopTasks(bo, c.store.regionCache, NewKeyRanges(req.KeyRanges), req.StoreType)
if err != nil {
return copErrorResponse{err}
}
@@ -313,13 +313,13 @@ func (b *batchCopIterator) handleTask(ctx context.Context, bo *Backoffer, task *

// Merge all ranges and request again.
func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *Backoffer, batchTask *batchCopTask) ([]*batchCopTask, error) {
-ranges := &copRanges{}
+var ranges []kv.KeyRange
for _, taskCtx := range batchTask.copTasks {
-taskCtx.task.ranges.do(func(ran *kv.KeyRange) {
-ranges.mid = append(ranges.mid, *ran)
+taskCtx.task.ranges.Do(func(ran *kv.KeyRange) {
+ranges = append(ranges, *ran)
})
}
-return buildBatchCopTasks(bo, b.RegionCache, ranges, b.req.StoreType)
+return buildBatchCopTasks(bo, b.RegionCache, NewKeyRanges(ranges), b.req.StoreType)
}

func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, task *batchCopTask) ([]*batchCopTask, error) {
@@ -332,7 +332,7 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, ta
ConfVer: task.task.region.confVer,
Version: task.task.region.ver,
},
-Ranges: task.task.ranges.toPBRanges(),
+Ranges: task.task.ranges.ToPBRanges(),
})
}

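Both coprocessor clients hand ranges to TiKV through ToPBRanges. Judging from the deleted toPBRanges in coprocessor.go below, the conversion is a plain field copy; a minimal fragment (inside package tikv, using the sketched API above):

```go
// Sketch only; assumes ToPBRanges keeps the deleted toPBRanges logic unchanged.
func toPBExample() []*coprocessor.KeyRange {
	ranges := NewKeyRanges([]kv.KeyRange{
		{StartKey: []byte("a"), EndKey: []byte("b")},
		{StartKey: []byte("b"), EndKey: []byte("c")},
	})
	// Result: [{Start:"a" End:"b"} {Start:"b" End:"c"}]
	return ranges.ToPBRanges()
}
```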
209 changes: 16 additions & 193 deletions store/tikv/coprocessor.go
@@ -14,11 +14,9 @@
package tikv

import (
"bytes"
"context"
"fmt"
"io"
"sort"
"strconv"
"strings"
"sync"
@@ -67,7 +65,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variable
}
ctx = context.WithValue(ctx, txnStartKey, req.StartTs)
bo := NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
-tasks, err := buildCopTasks(bo, c.store.regionCache, &copRanges{mid: req.KeyRanges}, req)
+tasks, err := buildCopTasks(bo, c.store.regionCache, NewKeyRanges(req.KeyRanges), req)
if err != nil {
return copErrorResponse{err}
}
@@ -120,7 +118,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variable
// copTask contains a related Region and KeyRange for a kv.Request.
type copTask struct {
region RegionVerID
-ranges *copRanges
+ranges *KeyRanges

respChan chan *copResponse
storeAddr string
@@ -130,123 +128,13 @@ type copTask struct {

func (r *copTask) String() string {
return fmt.Sprintf("region(%d %d %d) ranges(%d) store(%s)",
-r.region.id, r.region.confVer, r.region.ver, r.ranges.len(), r.storeAddr)
-}
-
-// copRanges is like []kv.KeyRange, but may has extra elements at head/tail.
-// It's for avoiding alloc big slice during build copTask.
-type copRanges struct {
-first *kv.KeyRange
-mid []kv.KeyRange
-last *kv.KeyRange
-}
-
-func (r *copRanges) String() string {
-var s string
-r.do(func(ran *kv.KeyRange) {
-s += fmt.Sprintf("[%q, %q]", ran.StartKey, ran.EndKey)
-})
-return s
-}
-
-func (r *copRanges) len() int {
-var l int
-if r.first != nil {
-l++
-}
-l += len(r.mid)
-if r.last != nil {
-l++
-}
-return l
-}
-
-func (r *copRanges) at(i int) kv.KeyRange {
-if r.first != nil {
-if i == 0 {
-return *r.first
-}
-i--
-}
-if i < len(r.mid) {
-return r.mid[i]
-}
-return *r.last
-}
-
-func (r *copRanges) slice(from, to int) *copRanges {
-var ran copRanges
-if r.first != nil {
-if from == 0 && to > 0 {
-ran.first = r.first
-}
-if from > 0 {
-from--
-}
-if to > 0 {
-to--
-}
-}
-if to <= len(r.mid) {
-ran.mid = r.mid[from:to]
-} else {
-if from <= len(r.mid) {
-ran.mid = r.mid[from:]
-}
-if from < to {
-ran.last = r.last
-}
-}
-return &ran
-}
-
-func (r *copRanges) do(f func(ran *kv.KeyRange)) {
-if r.first != nil {
-f(r.first)
-}
-for _, ran := range r.mid {
-f(&ran)
-}
-if r.last != nil {
-f(r.last)
-}
-}
-
-func (r *copRanges) toPBRanges() []*coprocessor.KeyRange {
-ranges := make([]*coprocessor.KeyRange, 0, r.len())
-r.do(func(ran *kv.KeyRange) {
-ranges = append(ranges, &coprocessor.KeyRange{
-Start: ran.StartKey,
-End: ran.EndKey,
-})
-})
-return ranges
-}
-
-// split ranges into (left, right) by key.
-func (r *copRanges) split(key []byte) (*copRanges, *copRanges) {
-n := sort.Search(r.len(), func(i int) bool {
-cur := r.at(i)
-return len(cur.EndKey) == 0 || bytes.Compare(cur.EndKey, key) > 0
-})
-// If a range p contains the key, it will split to 2 parts.
-if n < r.len() {
-p := r.at(n)
-if bytes.Compare(key, p.StartKey) > 0 {
-left := r.slice(0, n)
-left.last = &kv.KeyRange{StartKey: p.StartKey, EndKey: key}
-right := r.slice(n+1, r.len())
-right.first = &kv.KeyRange{StartKey: key, EndKey: p.EndKey}
-return left, right
-}
-}
-return r.slice(0, n), r.slice(n, r.len())
+r.region.id, r.region.confVer, r.region.ver, r.ranges.Len(), r.storeAddr)
}

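The block deleted above is the heart of the refactor; the head/tail representation survives unchanged under the new exported names. A small worked example of why first/last exist (values hypothetical): a view produced by the splitter can start and end mid-range, and only those two boundary ranges are newly allocated, while mid is shared with the original slice.

```go
// Sketch using the assumed KeyRanges fields from the reconstruction above.
r := &KeyRanges{
	first: &kv.KeyRange{StartKey: []byte("b"), EndKey: []byte("c")},
	mid:   []kv.KeyRange{{StartKey: []byte("c"), EndKey: []byte("d")}},
	last:  &kv.KeyRange{StartKey: []byte("d"), EndKey: []byte("e")},
}
// r.Len() == 3
// r.At(0) == ["b","c"), r.At(1) == ["c","d"), r.At(2) == ["d","e")
// r.Slice(1, 3) drops the head without copying mid.
```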
// rangesPerTask limits the length of the ranges slice sent in one copTask.
const rangesPerTask = 25000

-func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, req *kv.Request) ([]*copTask, error) {
+func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv.Request) ([]*copTask, error) {
start := time.Now()
cmdType := tikvrpc.CmdCop
if req.Streaming {
@@ -257,17 +145,17 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, req *kv
return buildTiDBMemCopTasks(ranges, req)
}

-rangesLen := ranges.len()
+rangesLen := ranges.Len()
var tasks []*copTask
-appendTask := func(regionWithRangeInfo *KeyLocation, ranges *copRanges) {
+appendTask := func(regionWithRangeInfo *KeyLocation, ranges *KeyRanges) {
// TiKV will return gRPC error if the message is too large. So we need to limit the length of the ranges slice
// to make sure the message can be sent successfully.
-rLen := ranges.len()
+rLen := ranges.Len()
for i := 0; i < rLen; {
nextI := mathutil.Min(i+rangesPerTask, rLen)
tasks = append(tasks, &copTask{
region: regionWithRangeInfo.Region,
-ranges: ranges.slice(i, nextI),
+ranges: ranges.Slice(i, nextI),
// Channel buffer is 2 for handling region split.
// In a common case, two region split tasks will not be blocked.
respChan: make(chan *copResponse, 2),
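A worked example of the fan-out in the hunk above, with hypothetical numbers (the real rangesPerTask is 25000, as defined earlier in this file):

```go
// 60000 ranges in one region yield three copTasks over consecutive views:
// Slice(0, 25000), Slice(25000, 50000), Slice(50000, 60000).
rLen, perTask := 60000, 25000
for i := 0; i < rLen; {
	nextI := mathutil.Min(i+perTask, rLen)
	// build one copTask over ranges.Slice(i, nextI) here
	i = nextI
}
```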
@@ -278,7 +166,7 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, req *kv
}
}

-err := splitRanges(bo, cache, ranges, appendTask)
+err := SplitKeyRanges(bo, cache, ranges, appendTask)
if err != nil {
return nil, errors.Trace(err)
}
@@ -296,7 +184,7 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, req *kv
return tasks, nil
}

-func buildTiDBMemCopTasks(ranges *copRanges, req *kv.Request) ([]*copTask, error) {
+func buildTiDBMemCopTasks(ranges *KeyRanges, req *kv.Request) ([]*copTask, error) {
servers, err := infosync.GetAllServerInfo(context.Background())
if err != nil {
return nil, err
@@ -323,71 +211,6 @@ func buildTiDBMemCopTasks(ranges *copRanges, req *kv.Request) ([]*copTask, error
return tasks, nil
}

-func splitRanges(bo *Backoffer, cache *RegionCache, ranges *copRanges, fn func(regionWithRangeInfo *KeyLocation, ranges *copRanges)) error {
-for ranges.len() > 0 {
-loc, err := cache.LocateKey(bo, ranges.at(0).StartKey)
-if err != nil {
-return errors.Trace(err)
-}
-
-// Iterate to the first range that is not complete in the region.
-var i int
-for ; i < ranges.len(); i++ {
-r := ranges.at(i)
-if !(loc.Contains(r.EndKey) || bytes.Equal(loc.EndKey, r.EndKey)) {
-break
-}
-}
-// All rest ranges belong to the same region.
-if i == ranges.len() {
-fn(loc, ranges)
-break
-}
-
-r := ranges.at(i)
-if loc.Contains(r.StartKey) {
-// Part of r is not in the region. We need to split it.
-taskRanges := ranges.slice(0, i)
-taskRanges.last = &kv.KeyRange{
-StartKey: r.StartKey,
-EndKey: loc.EndKey,
-}
-fn(loc, taskRanges)
-
-ranges = ranges.slice(i+1, ranges.len())
-ranges.first = &kv.KeyRange{
-StartKey: loc.EndKey,
-EndKey: r.EndKey,
-}
-} else {
-// rs[i] is not in the region.
-taskRanges := ranges.slice(0, i)
-fn(loc, taskRanges)
-ranges = ranges.slice(i, ranges.len())
-}
-}
-
-return nil
-}
-
-// SplitRegionRanges get the split ranges from pd region.
-func SplitRegionRanges(bo *Backoffer, cache *RegionCache, keyRanges []kv.KeyRange) ([]kv.KeyRange, error) {
-ranges := copRanges{mid: keyRanges}
-
-var ret []kv.KeyRange
-appendRange := func(regionWithRangeInfo *KeyLocation, ranges *copRanges) {
-for i := 0; i < ranges.len(); i++ {
-ret = append(ret, ranges.at(i))
-}
-}
-
-err := splitRanges(bo, cache, &ranges, appendRange)
-if err != nil {
-return nil, errors.Trace(err)
-}
-return ret, nil
-}
-
func reverseTasks(tasks []*copTask) {
for i := 0; i < len(tasks)/2; i++ {
j := len(tasks) - i - 1
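The two functions deleted above are not dropped: splitRanges is presumably exported as SplitKeyRanges in the unloaded key-ranges file (its call sites changed accordingly in both files), and SplitRegionRanges presumably moves with it. A sketch of the latter on top of the new API, assuming its logic is unchanged from the deleted version:

```go
// Hypothetical port of the deleted SplitRegionRanges; only the names changed.
func SplitRegionRanges(bo *Backoffer, cache *RegionCache, keyRanges []kv.KeyRange) ([]kv.KeyRange, error) {
	var ret []kv.KeyRange
	appendRange := func(regionWithRangeInfo *KeyLocation, ranges *KeyRanges) {
		for i := 0; i < ranges.Len(); i++ {
			ret = append(ret, ranges.At(i))
		}
	}
	if err := SplitKeyRanges(bo, cache, NewKeyRanges(keyRanges), appendRange); err != nil {
		return nil, errors.Trace(err)
	}
	return ret, nil
}
```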
@@ -843,7 +666,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
Tp: worker.req.Tp,
StartTs: worker.req.StartTs,
Data: worker.req.Data,
-Ranges: task.ranges.toPBRanges(),
+Ranges: task.ranges.ToPBRanges(),
SchemaVer: worker.req.SchemaVar,
}

@@ -1144,8 +967,8 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon
// When the request is using streaming API, the `Range` is not nil.
if resp.pbResp.Range != nil {
resp.startKey = resp.pbResp.Range.Start
-} else if task.ranges != nil && task.ranges.len() > 0 {
-resp.startKey = task.ranges.at(0).StartKey
+} else if task.ranges != nil && task.ranges.Len() > 0 {
+resp.startKey = task.ranges.At(0).StartKey
}
if resp.detail == nil {
resp.detail = new(CopRuntimeStats)
@@ -1273,12 +1096,12 @@ func (worker *copIteratorWorker) buildCopTasksFromRemain(bo *Backoffer, lastRang
// split: [s1 --> s2)
// In normal scan order, all data before s1 is consumed, so the remain ranges should be [s1 --> r2) [r3 --> r4)
// In reverse scan order, all data after s2 is consumed, so the remain ranges should be [r1 --> r2) [r3 --> s2)
-func (worker *copIteratorWorker) calculateRemain(ranges *copRanges, split *coprocessor.KeyRange, desc bool) *copRanges {
+func (worker *copIteratorWorker) calculateRemain(ranges *KeyRanges, split *coprocessor.KeyRange, desc bool) *KeyRanges {
if desc {
-left, _ := ranges.split(split.End)
+left, _ := ranges.Split(split.End)
return left
}
-_, right := ranges.split(split.Start)
+_, right := ranges.Split(split.Start)
return right
}

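A concrete instance of the comment above, with hypothetical keys r1="a", r2="c", r3="e", r4="g" and a split range [s1="b", s2="f"); the Split semantics follow the deleted copRanges.split shown earlier:

```go
// Sketch; assumes KeyRanges.Split keeps the deleted split logic unchanged.
ranges := NewKeyRanges([]kv.KeyRange{
	{StartKey: []byte("a"), EndKey: []byte("c")},
	{StartKey: []byte("e"), EndKey: []byte("g")},
})
_, forwardRemain := ranges.Split([]byte("b")) // ["b","c") ["e","g")
reverseRemain, _ := ranges.Split([]byte("f")) // ["a","c") ["e","f")
```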
(The remaining 4 changed files in this commit are not loaded in this view.)
