Skip to content

Commit

Permalink
distsql,store/tikv: SelectDAG function parameter refactor (pingcap#4645)
Browse files Browse the repository at this point in the history
* distsql,store/tikv: SelectDAG function parameter refactor

1. move some distsql.SelectDAG parameter to kv.Request struct
2. modify tikv.RPCContext struct, remove kvrpcpb.Context in it
3. let tikvrpc.Request struct share Context with its subcommand request
  • Loading branch information
tiancaiamao authored and coocood committed Sep 27, 2017
1 parent dd3b370 commit 5200745
Show file tree
Hide file tree
Showing 14 changed files with 179 additions and 131 deletions.
1 change: 0 additions & 1 deletion ddl/ddl_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1477,7 +1477,6 @@ func (s *testDBSuite) TestGeneratedColumnDDL(c *C) {
}

func (s *testDBSuite) TestComment(c *C) {
defer testleak.AfterTest(c)()
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use " + s.schemaName)
s.tk.MustExec("drop table if exists ct, ct1")
Expand Down
23 changes: 3 additions & 20 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,8 @@ func Select(client kv.Client, ctx goctx.Context, req *tipb.SelectRequest, keyRan
}

// SelectDAG sends a DAG request, returns SelectResult.
// concurrency: The max concurrency for underlying coprocessor request.
// keepOrder: If the result should returned in key order. For example if we need keep data in order by
// scan index, we should set keepOrder to true.
func SelectDAG(client kv.Client, ctx goctx.Context, dag *tipb.DAGRequest, keyRanges []kv.KeyRange, concurrency int, keepOrder bool, desc bool, isolationLevel kv.IsoLevel, priority int) (SelectResult, error) {
// In kvReq, KeyRanges is required, Concurrency/KeepOrder/Desc/IsolationLevel/Priority are optional.
func SelectDAG(ctx goctx.Context, client kv.Client, kvReq *kv.Request) (SelectResult, error) {
var err error
defer func() {
// Add metrics.
Expand All @@ -258,29 +256,14 @@ func SelectDAG(client kv.Client, ctx goctx.Context, dag *tipb.DAGRequest, keyRan
}
}()

kvReq := &kv.Request{
Tp: kv.ReqTypeDAG,
StartTs: dag.StartTs,
Concurrency: concurrency,
KeepOrder: keepOrder,
KeyRanges: keyRanges,
Desc: desc,
IsolationLevel: isolationLevel,
Priority: priority,
}
kvReq.Data, err = dag.Marshal()
if err != nil {
return nil, errors.Trace(err)
}

resp := client.Send(ctx, kvReq)
if resp == nil {
return nil, errors.New("client returns nil response")
}
result := &selectResult{
label: "dag",
resp: resp,
results: make(chan resultWithErr, concurrency),
results: make(chan resultWithErr, kvReq.Concurrency),
closed: make(chan struct{}),
}
return result, nil
Expand Down
139 changes: 127 additions & 12 deletions executor/new_distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/types"
Expand Down Expand Up @@ -133,9 +134,18 @@ func (e *TableReaderExecutor) Next() (Row, error) {

// Open implements the Executor Open interface.
func (e *TableReaderExecutor) Open() error {
kvRanges := tableRangesToKVRanges(e.tableID, e.ranges)
var err error
e.result, err = distsql.SelectDAG(e.ctx.GetClient(), goctx.Background(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc, getIsolationLevel(e.ctx.GetSessionVars()), e.priority)
var builder requestBuilder
kvReq, err := builder.SetTableRanges(e.tableID, e.ranges).
SetDAGRequest(e.dagPB).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetPriority(e.priority).
SetFromSessionVars(e.ctx.GetSessionVars()).
Build()
if err != nil {
return errors.Trace(err)
}
e.result, err = distsql.SelectDAG(goctx.Background(), e.ctx.GetClient(), kvReq)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -146,9 +156,18 @@ func (e *TableReaderExecutor) Open() error {
// doRequestForHandles constructs kv ranges by handles. It is used by index look up executor.
func (e *TableReaderExecutor) doRequestForHandles(handles []int64, goCtx goctx.Context) error {
sort.Sort(int64Slice(handles))
kvRanges := tableHandlesToKVRanges(e.tableID, handles)
var err error
e.result, err = distsql.SelectDAG(e.ctx.GetClient(), goCtx, e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc, getIsolationLevel(e.ctx.GetSessionVars()), e.priority)
var builder requestBuilder
kvReq, err := builder.SetTableHandles(e.tableID, handles).
SetDAGRequest(e.dagPB).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetPriority(e.priority).
SetFromSessionVars(e.ctx.GetSessionVars()).
Build()
if err != nil {
return errors.Trace(err)
}
e.result, err = distsql.SelectDAG(goCtx, e.ctx.GetClient(), kvReq)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -251,11 +270,18 @@ func (e *IndexReaderExecutor) Open() error {
for i, v := range e.index.Columns {
fieldTypes[i] = &(e.table.Cols()[v.Offset].FieldType)
}
kvRanges, err := indexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, e.tableID, e.index.ID, e.ranges, fieldTypes)
var builder requestBuilder
kvReq, err := builder.SetIndexRanges(e.ctx.GetSessionVars().StmtCtx, e.tableID, e.index.ID, e.ranges, fieldTypes).
SetDAGRequest(e.dagPB).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetPriority(e.priority).
SetFromSessionVars(e.ctx.GetSessionVars()).
Build()
if err != nil {
return errors.Trace(err)
}
e.result, err = distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc, getIsolationLevel(e.ctx.GetSessionVars()), e.priority)
e.result, err = distsql.SelectDAG(e.ctx.GoCtx(), e.ctx.GetClient(), kvReq)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -265,11 +291,18 @@ func (e *IndexReaderExecutor) Open() error {

// doRequestForDatums constructs kv ranges by datums. It is used by index look up executor.
func (e *IndexReaderExecutor) doRequestForDatums(values [][]types.Datum, goCtx goctx.Context) error {
kvRanges, err := indexValuesToKVRanges(e.tableID, e.index.ID, values)
var builder requestBuilder
kvReq, err := builder.SetIndexValues(e.tableID, e.index.ID, values).
SetDAGRequest(e.dagPB).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetPriority(e.priority).
SetFromSessionVars(e.ctx.GetSessionVars()).
Build()
if err != nil {
return errors.Trace(err)
}
e.result, err = distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc, getIsolationLevel(e.ctx.GetSessionVars()), e.priority)
e.result, err = distsql.SelectDAG(e.ctx.GoCtx(), e.ctx.GetClient(), kvReq)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -311,8 +344,18 @@ type indexWorker struct {

// startIndexWorker launch a background goroutine to fetch handles, send the results to workCh.
func (e *IndexLookUpExecutor) startIndexWorker(kvRanges []kv.KeyRange, workCh chan<- *lookupTableTask, finished <-chan struct{}) error {
result, err := distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges,
e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc, getIsolationLevel(e.ctx.GetSessionVars()), e.priority)
var builder requestBuilder
kvReq, err := builder.SetKeyRanges(kvRanges).
SetDAGRequest(e.dagPB).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetPriority(e.priority).
SetFromSessionVars(e.ctx.GetSessionVars()).
Build()
if err != nil {
return errors.Trace(err)
}
result, err := distsql.SelectDAG(e.ctx.GoCtx(), e.ctx.GetClient(), kvReq)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -581,3 +624,75 @@ func (e *IndexLookUpExecutor) Next() (Row, error) {
e.resultCurr = nil
}
}

type requestBuilder struct {
kv.Request
err error
}

func (builder *requestBuilder) Build() (*kv.Request, error) {
return &builder.Request, errors.Trace(builder.err)
}

func (builder *requestBuilder) SetTableRanges(tid int64, tableRanges []types.IntColumnRange) *requestBuilder {
builder.Request.KeyRanges = tableRangesToKVRanges(tid, tableRanges)
return builder
}

func (builder *requestBuilder) SetIndexRanges(sc *variable.StatementContext, tid, idxID int64, ranges []*types.IndexRange, fieldTypes []*types.FieldType) *requestBuilder {
if builder.err != nil {
return builder
}
builder.Request.KeyRanges, builder.err = indexRangesToKVRanges(sc, tid, idxID, ranges, fieldTypes)
return builder
}

func (builder *requestBuilder) SetTableHandles(tid int64, handles []int64) *requestBuilder {
builder.Request.KeyRanges = tableHandlesToKVRanges(tid, handles)
return builder
}

func (builder *requestBuilder) SetIndexValues(tid, idxID int64, values [][]types.Datum) *requestBuilder {
if builder.err != nil {
return builder
}
builder.Request.KeyRanges, builder.err = indexValuesToKVRanges(tid, idxID, values)
return builder
}

func (builder *requestBuilder) SetDAGRequest(dag *tipb.DAGRequest) *requestBuilder {
if builder.err != nil {
return builder
}

builder.Request.Tp = kv.ReqTypeDAG
builder.Request.StartTs = dag.StartTs
builder.Request.Data, builder.err = dag.Marshal()
return builder
}

func (builder *requestBuilder) SetKeyRanges(keyRanges []kv.KeyRange) *requestBuilder {
builder.Request.KeyRanges = keyRanges
return builder
}

func (builder *requestBuilder) SetDesc(desc bool) *requestBuilder {
builder.Request.Desc = desc
return builder
}

func (builder *requestBuilder) SetKeepOrder(order bool) *requestBuilder {
builder.Request.KeepOrder = order
return builder
}

func (builder *requestBuilder) SetFromSessionVars(sv *variable.SessionVars) *requestBuilder {
builder.Request.Concurrency = sv.DistSQLScanConcurrency
builder.Request.IsolationLevel = getIsolationLevel(sv)
return builder
}

func (builder *requestBuilder) SetPriority(priority int) *requestBuilder {
builder.Request.Priority = priority
return builder
}
7 changes: 3 additions & 4 deletions server/region_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,8 +536,7 @@ func (t *regionHandlerTool) getMvccByRecordID(tableID, recordID int64) (*kvrpcpb
}

tikvReq := &tikvrpc.Request{
Type: tikvrpc.CmdMvccGetByKey,
Priority: kvrpcpb.CommandPri_Normal,
Type: tikvrpc.CmdMvccGetByKey,
MvccGetByKey: &kvrpcpb.MvccGetByKeyRequest{
Key: encodeKey,
},
Expand All @@ -560,12 +559,12 @@ func (t *regionHandlerTool) getMvccByStartTs(startTS uint64, startKey, endKey []
}

tikvReq := &tikvrpc.Request{
Type: tikvrpc.CmdMvccGetByStartTs,
Priority: kvrpcpb.CommandPri_Low,
Type: tikvrpc.CmdMvccGetByStartTs,
MvccGetByStartTs: &kvrpcpb.MvccGetByStartTsRequest{
StartTs: startTS,
},
}
tikvReq.Context.Priority = kvrpcpb.CommandPri_Low
kvResp, err := t.store.SendReq(t.bo, tikvReq, curRegion.Region, time.Hour)
log.Info(startTS, string(startKey), curRegion.Region, string(curRegion.StartKey), string(curRegion.EndKey), kvResp)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,15 +321,15 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
}

req := &tikvrpc.Request{
Type: tikvrpc.CmdPrewrite,
Priority: c.priority,
Type: tikvrpc.CmdPrewrite,
Prewrite: &pb.PrewriteRequest{
Mutations: mutations,
PrimaryLock: c.primary(),
StartVersion: c.startTS,
LockTtl: c.lockTTL,
},
}
req.Context.Priority = c.priority
for {
resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
if err != nil {
Expand Down Expand Up @@ -408,14 +408,14 @@ func kvPriorityToCommandPri(pri int) pb.CommandPri {

func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) error {
req := &tikvrpc.Request{
Type: tikvrpc.CmdCommit,
Priority: c.priority,
Type: tikvrpc.CmdCommit,
Commit: &pb.CommitRequest{
StartVersion: c.startTS,
Keys: batch.keys,
CommitVersion: c.commitTS,
},
}
req.Context.Priority = c.priority

// If we fail to receive response for the request that commits primary key, it will be undetermined whether this
// transaction has been successfully committed.
Expand Down
5 changes: 1 addition & 4 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,7 @@ type slowClient struct {

func (c *slowClient) SendReq(ctx goctx.Context, addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
for id, delay := range c.regionDelays {
reqCtx, err := req.GetContext()
if err != nil {
return nil, err
}
reqCtx := &req.Context
if reqCtx.GetRegionId() == id {
time.Sleep(delay)
}
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,14 +463,14 @@ func (it *copIterator) handleTask(bo *Backoffer, task *copTask) []copResponse {
}

req := &tikvrpc.Request{
Type: tikvrpc.CmdCop,
Priority: kvPriorityToCommandPri(it.req.Priority),
Type: tikvrpc.CmdCop,
Cop: &coprocessor.Request{
Tp: it.req.Tp,
Data: it.req.Data,
Ranges: task.ranges.toPBRanges(),
},
}
req.Context.Priority = kvPriorityToCommandPri(it.req.Priority)
resp, err := sender.SendReq(bo, req, task.region, readTimeoutMedium)
if err != nil {
return []copResponse{{err: errors.Trace(err)}}
Expand Down
5 changes: 1 addition & 4 deletions store/tikv/mock-tikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,10 +462,7 @@ func (c *RPCClient) SendReq(ctx goctx.Context, addr string, req *tikvrpc.Request
if err != nil {
return nil, err
}
reqCtx, err := req.GetContext()
if err != nil {
return nil, err
}
reqCtx := &req.Context
resp := &tikvrpc.Response{}
resp.Type = req.Type
switch req.Type {
Expand Down
Loading

0 comments on commit 5200745

Please sign in to comment.