From 5200745d6a37a5155c8a4922b20db742ea28bc14 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 26 Sep 2017 23:33:05 -0500 Subject: [PATCH] distsql,store/tikv: SelectDAG function parameter refactor (#4645) * distsql,store/tikv: SelectDAG function parameter refactor 1. move some distsql.SelectDAG parameter to kv.Request struct 2. modify tikv.RPCContext struct, remove kvrpcpb.Context in it 3. let tikvrpc.Request struct share Context with its subcommand request --- ddl/ddl_db_test.go | 1 - distsql/distsql.go | 23 +----- executor/new_distsql.go | 139 +++++++++++++++++++++++++++++++--- server/region_handler.go | 7 +- store/tikv/2pc.go | 8 +- store/tikv/2pc_test.go | 5 +- store/tikv/coprocessor.go | 4 +- store/tikv/mock-tikv/rpc.go | 5 +- store/tikv/region_cache.go | 19 ++--- store/tikv/region_request.go | 24 +++--- store/tikv/scan.go | 4 +- store/tikv/snapshot.go | 8 +- store/tikv/split_region.go | 4 +- store/tikv/tikvrpc/tikvrpc.go | 59 ++------------- 14 files changed, 179 insertions(+), 131 deletions(-) diff --git a/ddl/ddl_db_test.go b/ddl/ddl_db_test.go index f47ca6ab9f3ed..f0294375220c0 100644 --- a/ddl/ddl_db_test.go +++ b/ddl/ddl_db_test.go @@ -1477,7 +1477,6 @@ func (s *testDBSuite) TestGeneratedColumnDDL(c *C) { } func (s *testDBSuite) TestComment(c *C) { - defer testleak.AfterTest(c)() s.tk = testkit.NewTestKit(c, s.store) s.tk.MustExec("use " + s.schemaName) s.tk.MustExec("drop table if exists ct, ct1") diff --git a/distsql/distsql.go b/distsql/distsql.go index c5719227f43f4..e852a65e3410b 100644 --- a/distsql/distsql.go +++ b/distsql/distsql.go @@ -244,10 +244,8 @@ func Select(client kv.Client, ctx goctx.Context, req *tipb.SelectRequest, keyRan } // SelectDAG sends a DAG request, returns SelectResult. -// concurrency: The max concurrency for underlying coprocessor request. -// keepOrder: If the result should returned in key order. For example if we need keep data in order by -// scan index, we should set keepOrder to true. -func SelectDAG(client kv.Client, ctx goctx.Context, dag *tipb.DAGRequest, keyRanges []kv.KeyRange, concurrency int, keepOrder bool, desc bool, isolationLevel kv.IsoLevel, priority int) (SelectResult, error) { +// In kvReq, KeyRanges is required, Concurrency/KeepOrder/Desc/IsolationLevel/Priority are optional. +func SelectDAG(ctx goctx.Context, client kv.Client, kvReq *kv.Request) (SelectResult, error) { var err error defer func() { // Add metrics. 
@@ -258,21 +256,6 @@ func SelectDAG(client kv.Client, ctx goctx.Context, dag *tipb.DAGRequest, keyRan } }() - kvReq := &kv.Request{ - Tp: kv.ReqTypeDAG, - StartTs: dag.StartTs, - Concurrency: concurrency, - KeepOrder: keepOrder, - KeyRanges: keyRanges, - Desc: desc, - IsolationLevel: isolationLevel, - Priority: priority, - } - kvReq.Data, err = dag.Marshal() - if err != nil { - return nil, errors.Trace(err) - } - resp := client.Send(ctx, kvReq) if resp == nil { return nil, errors.New("client returns nil response") @@ -280,7 +263,7 @@ func SelectDAG(client kv.Client, ctx goctx.Context, dag *tipb.DAGRequest, keyRan result := &selectResult{ label: "dag", resp: resp, - results: make(chan resultWithErr, concurrency), + results: make(chan resultWithErr, kvReq.Concurrency), closed: make(chan struct{}), } return result, nil diff --git a/executor/new_distsql.go b/executor/new_distsql.go index 275899cc91ab9..e5b1f24b5c346 100644 --- a/executor/new_distsql.go +++ b/executor/new_distsql.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/model" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/types" @@ -133,9 +134,18 @@ func (e *TableReaderExecutor) Next() (Row, error) { // Open implements the Executor Open interface. func (e *TableReaderExecutor) Open() error { - kvRanges := tableRangesToKVRanges(e.tableID, e.ranges) - var err error - e.result, err = distsql.SelectDAG(e.ctx.GetClient(), goctx.Background(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc, getIsolationLevel(e.ctx.GetSessionVars()), e.priority) + var builder requestBuilder + kvReq, err := builder.SetTableRanges(e.tableID, e.ranges). + SetDAGRequest(e.dagPB). + SetDesc(e.desc). + SetKeepOrder(e.keepOrder). + SetPriority(e.priority). + SetFromSessionVars(e.ctx.GetSessionVars()). + Build() + if err != nil { + return errors.Trace(err) + } + e.result, err = distsql.SelectDAG(goctx.Background(), e.ctx.GetClient(), kvReq) if err != nil { return errors.Trace(err) } @@ -146,9 +156,18 @@ func (e *TableReaderExecutor) Open() error { // doRequestForHandles constructs kv ranges by handles. It is used by index look up executor. func (e *TableReaderExecutor) doRequestForHandles(handles []int64, goCtx goctx.Context) error { sort.Sort(int64Slice(handles)) - kvRanges := tableHandlesToKVRanges(e.tableID, handles) - var err error - e.result, err = distsql.SelectDAG(e.ctx.GetClient(), goCtx, e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc, getIsolationLevel(e.ctx.GetSessionVars()), e.priority) + var builder requestBuilder + kvReq, err := builder.SetTableHandles(e.tableID, handles). + SetDAGRequest(e.dagPB). + SetDesc(e.desc). + SetKeepOrder(e.keepOrder). + SetPriority(e.priority). + SetFromSessionVars(e.ctx.GetSessionVars()). + Build() + if err != nil { + return errors.Trace(err) + } + e.result, err = distsql.SelectDAG(goCtx, e.ctx.GetClient(), kvReq) if err != nil { return errors.Trace(err) } @@ -251,11 +270,18 @@ func (e *IndexReaderExecutor) Open() error { for i, v := range e.index.Columns { fieldTypes[i] = &(e.table.Cols()[v.Offset].FieldType) } - kvRanges, err := indexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, e.tableID, e.index.ID, e.ranges, fieldTypes) + var builder requestBuilder + kvReq, err := builder.SetIndexRanges(e.ctx.GetSessionVars().StmtCtx, e.tableID, e.index.ID, e.ranges, fieldTypes). 
+ SetDAGRequest(e.dagPB). + SetDesc(e.desc). + SetKeepOrder(e.keepOrder). + SetPriority(e.priority). + SetFromSessionVars(e.ctx.GetSessionVars()). + Build() if err != nil { return errors.Trace(err) } - e.result, err = distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc, getIsolationLevel(e.ctx.GetSessionVars()), e.priority) + e.result, err = distsql.SelectDAG(e.ctx.GoCtx(), e.ctx.GetClient(), kvReq) if err != nil { return errors.Trace(err) } @@ -265,11 +291,18 @@ func (e *IndexReaderExecutor) Open() error { // doRequestForDatums constructs kv ranges by datums. It is used by index look up executor. func (e *IndexReaderExecutor) doRequestForDatums(values [][]types.Datum, goCtx goctx.Context) error { - kvRanges, err := indexValuesToKVRanges(e.tableID, e.index.ID, values) + var builder requestBuilder + kvReq, err := builder.SetIndexValues(e.tableID, e.index.ID, values). + SetDAGRequest(e.dagPB). + SetDesc(e.desc). + SetKeepOrder(e.keepOrder). + SetPriority(e.priority). + SetFromSessionVars(e.ctx.GetSessionVars()). + Build() if err != nil { return errors.Trace(err) } - e.result, err = distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc, getIsolationLevel(e.ctx.GetSessionVars()), e.priority) + e.result, err = distsql.SelectDAG(e.ctx.GoCtx(), e.ctx.GetClient(), kvReq) if err != nil { return errors.Trace(err) } @@ -311,8 +344,18 @@ type indexWorker struct { // startIndexWorker launch a background goroutine to fetch handles, send the results to workCh. func (e *IndexLookUpExecutor) startIndexWorker(kvRanges []kv.KeyRange, workCh chan<- *lookupTableTask, finished <-chan struct{}) error { - result, err := distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges, - e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc, getIsolationLevel(e.ctx.GetSessionVars()), e.priority) + var builder requestBuilder + kvReq, err := builder.SetKeyRanges(kvRanges). + SetDAGRequest(e.dagPB). + SetDesc(e.desc). + SetKeepOrder(e.keepOrder). + SetPriority(e.priority). + SetFromSessionVars(e.ctx.GetSessionVars()). 
+ Build() + if err != nil { + return errors.Trace(err) + } + result, err := distsql.SelectDAG(e.ctx.GoCtx(), e.ctx.GetClient(), kvReq) if err != nil { return errors.Trace(err) } @@ -581,3 +624,75 @@ func (e *IndexLookUpExecutor) Next() (Row, error) { e.resultCurr = nil } } + +type requestBuilder struct { + kv.Request + err error +} + +func (builder *requestBuilder) Build() (*kv.Request, error) { + return &builder.Request, errors.Trace(builder.err) +} + +func (builder *requestBuilder) SetTableRanges(tid int64, tableRanges []types.IntColumnRange) *requestBuilder { + builder.Request.KeyRanges = tableRangesToKVRanges(tid, tableRanges) + return builder +} + +func (builder *requestBuilder) SetIndexRanges(sc *variable.StatementContext, tid, idxID int64, ranges []*types.IndexRange, fieldTypes []*types.FieldType) *requestBuilder { + if builder.err != nil { + return builder + } + builder.Request.KeyRanges, builder.err = indexRangesToKVRanges(sc, tid, idxID, ranges, fieldTypes) + return builder +} + +func (builder *requestBuilder) SetTableHandles(tid int64, handles []int64) *requestBuilder { + builder.Request.KeyRanges = tableHandlesToKVRanges(tid, handles) + return builder +} + +func (builder *requestBuilder) SetIndexValues(tid, idxID int64, values [][]types.Datum) *requestBuilder { + if builder.err != nil { + return builder + } + builder.Request.KeyRanges, builder.err = indexValuesToKVRanges(tid, idxID, values) + return builder +} + +func (builder *requestBuilder) SetDAGRequest(dag *tipb.DAGRequest) *requestBuilder { + if builder.err != nil { + return builder + } + + builder.Request.Tp = kv.ReqTypeDAG + builder.Request.StartTs = dag.StartTs + builder.Request.Data, builder.err = dag.Marshal() + return builder +} + +func (builder *requestBuilder) SetKeyRanges(keyRanges []kv.KeyRange) *requestBuilder { + builder.Request.KeyRanges = keyRanges + return builder +} + +func (builder *requestBuilder) SetDesc(desc bool) *requestBuilder { + builder.Request.Desc = desc + return builder +} + +func (builder *requestBuilder) SetKeepOrder(order bool) *requestBuilder { + builder.Request.KeepOrder = order + return builder +} + +func (builder *requestBuilder) SetFromSessionVars(sv *variable.SessionVars) *requestBuilder { + builder.Request.Concurrency = sv.DistSQLScanConcurrency + builder.Request.IsolationLevel = getIsolationLevel(sv) + return builder +} + +func (builder *requestBuilder) SetPriority(priority int) *requestBuilder { + builder.Request.Priority = priority + return builder +} diff --git a/server/region_handler.go b/server/region_handler.go index da1237fd4e28a..4c56d84a91156 100644 --- a/server/region_handler.go +++ b/server/region_handler.go @@ -536,8 +536,7 @@ func (t *regionHandlerTool) getMvccByRecordID(tableID, recordID int64) (*kvrpcpb } tikvReq := &tikvrpc.Request{ - Type: tikvrpc.CmdMvccGetByKey, - Priority: kvrpcpb.CommandPri_Normal, + Type: tikvrpc.CmdMvccGetByKey, MvccGetByKey: &kvrpcpb.MvccGetByKeyRequest{ Key: encodeKey, }, @@ -560,12 +559,12 @@ func (t *regionHandlerTool) getMvccByStartTs(startTS uint64, startKey, endKey [] } tikvReq := &tikvrpc.Request{ - Type: tikvrpc.CmdMvccGetByStartTs, - Priority: kvrpcpb.CommandPri_Low, + Type: tikvrpc.CmdMvccGetByStartTs, MvccGetByStartTs: &kvrpcpb.MvccGetByStartTsRequest{ StartTs: startTS, }, } + tikvReq.Context.Priority = kvrpcpb.CommandPri_Low kvResp, err := t.store.SendReq(t.bo, tikvReq, curRegion.Region, time.Hour) log.Info(startTS, string(startKey), curRegion.Region, string(curRegion.StartKey), string(curRegion.EndKey), kvResp) if err != nil { 
diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 6193804e7e621..5b9e7170390e6 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -321,8 +321,7 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) } req := &tikvrpc.Request{ - Type: tikvrpc.CmdPrewrite, - Priority: c.priority, + Type: tikvrpc.CmdPrewrite, Prewrite: &pb.PrewriteRequest{ Mutations: mutations, PrimaryLock: c.primary(), @@ -330,6 +329,7 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) LockTtl: c.lockTTL, }, } + req.Context.Priority = c.priority for { resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort) if err != nil { @@ -408,14 +408,14 @@ func kvPriorityToCommandPri(pri int) pb.CommandPri { func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) error { req := &tikvrpc.Request{ - Type: tikvrpc.CmdCommit, - Priority: c.priority, + Type: tikvrpc.CmdCommit, Commit: &pb.CommitRequest{ StartVersion: c.startTS, Keys: batch.keys, CommitVersion: c.commitTS, }, } + req.Context.Priority = c.priority // If we fail to receive response for the request that commits primary key, it will be undetermined whether this // transaction has been successfully committed. diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index d5a5727dd121f..c6d276c21195f 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -279,10 +279,7 @@ type slowClient struct { func (c *slowClient) SendReq(ctx goctx.Context, addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) { for id, delay := range c.regionDelays { - reqCtx, err := req.GetContext() - if err != nil { - return nil, err - } + reqCtx := &req.Context if reqCtx.GetRegionId() == id { time.Sleep(delay) } diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index 7dec220d32740..fcbf473375bba 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -463,14 +463,14 @@ func (it *copIterator) handleTask(bo *Backoffer, task *copTask) []copResponse { } req := &tikvrpc.Request{ - Type: tikvrpc.CmdCop, - Priority: kvPriorityToCommandPri(it.req.Priority), + Type: tikvrpc.CmdCop, Cop: &coprocessor.Request{ Tp: it.req.Tp, Data: it.req.Data, Ranges: task.ranges.toPBRanges(), }, } + req.Context.Priority = kvPriorityToCommandPri(it.req.Priority) resp, err := sender.SendReq(bo, req, task.region, readTimeoutMedium) if err != nil { return []copResponse{{err: errors.Trace(err)}} diff --git a/store/tikv/mock-tikv/rpc.go b/store/tikv/mock-tikv/rpc.go index d43cb7514a0e8..993f3ba4147e9 100644 --- a/store/tikv/mock-tikv/rpc.go +++ b/store/tikv/mock-tikv/rpc.go @@ -462,10 +462,7 @@ func (c *RPCClient) SendReq(ctx goctx.Context, addr string, req *tikvrpc.Request if err != nil { return nil, err } - reqCtx, err := req.GetContext() - if err != nil { - return nil, err - } + reqCtx := &req.Context resp := &tikvrpc.Response{} resp.Type = req.Type switch req.Type { diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index 0b019213341d1..6706c542b10f5 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -67,14 +67,15 @@ func NewRegionCache(pdClient pd.Client) *RegionCache { // RPCContext contains data that is needed to send RPC to a region. type RPCContext struct { Region RegionVerID - KVCtx *kvrpcpb.Context + Meta *metapb.Region + Peer *metapb.Peer Addr string } // GetStoreID returns StoreID. 
func (c *RPCContext) GetStoreID() uint64 { - if c.KVCtx != nil && c.KVCtx.Peer != nil { - return c.KVCtx.Peer.StoreId + if c.Peer != nil { + return c.Peer.StoreId } return 0 } @@ -108,9 +109,8 @@ func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext, if region == nil { return nil, nil } - kvCtx := region.GetContext() - addr, err := c.GetStoreAddr(bo, kvCtx.GetPeer().GetStoreId()) + addr, err := c.GetStoreAddr(bo, region.peer.GetStoreId()) if err != nil { return nil, errors.Trace(err) } @@ -121,7 +121,8 @@ func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext, } return &RPCContext{ Region: id, - KVCtx: kvCtx, + Meta: region.meta, + Peer: region.peer, Addr: addr, }, nil } @@ -432,13 +433,13 @@ func (c *RegionCache) OnRequestFail(ctx *RPCContext, err error) { c.mu.Lock() if cachedregion, ok := c.mu.regions[regionID]; ok { region := cachedregion.region - if !region.OnRequestFail(ctx.KVCtx.GetPeer().GetStoreId()) { + if !region.OnRequestFail(ctx.Peer.GetStoreId()) { c.dropRegionFromCache(regionID) } } c.mu.Unlock() // Store's meta may be out of date. - storeID := ctx.KVCtx.GetPeer().GetStoreId() + storeID := ctx.Peer.GetStoreId() c.storeMu.Lock() delete(c.storeMu.stores, storeID) c.storeMu.Unlock() @@ -471,7 +472,7 @@ func (c *RegionCache) OnRegionStale(ctx *RPCContext, newRegions []*metapb.Region meta: meta, peer: meta.Peers[0], } - region.SwitchPeer(ctx.KVCtx.GetPeer().GetStoreId()) + region.SwitchPeer(ctx.Peer.GetStoreId()) c.insertRegionToCache(region) } return nil diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go index e1d18a91a402d..96ce3d95e5885 100644 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -76,7 +76,7 @@ func (s *RegionRequestSender) SendReq(bo *Backoffer, req *tikvrpc.Request, regio } s.storeAddr = ctx.Addr - ctx.KVCtx.IsolationLevel = s.isolationLevel + req.Context.IsolationLevel = s.isolationLevel resp, retry, err := s.sendReqToRegion(bo, ctx, req, timeout) if err != nil { return nil, errors.Trace(err) @@ -103,7 +103,7 @@ func (s *RegionRequestSender) SendReq(bo *Backoffer, req *tikvrpc.Request, regio } func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, ctx *RPCContext, req *tikvrpc.Request, timeout time.Duration) (resp *tikvrpc.Response, retry bool, err error) { - if e := tikvrpc.SetContext(req, ctx.KVCtx); e != nil { + if e := tikvrpc.SetContext(req, ctx.Meta, ctx.Peer); e != nil { return nil, false, errors.Trace(e) } context, cancel := goctx.WithTimeout(bo.ctx, timeout) @@ -141,7 +141,7 @@ func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err err // When a store is not available, the leader of related region should be elected quickly. // TODO: the number of retry time should be limited:since region may be unavailable // when some unrecoverable disaster happened. - err = bo.Backoff(boTiKVRPC, errors.Errorf("send tikv request error: %v, ctx: %s, try next peer later", err, ctx.KVCtx)) + err = bo.Backoff(boTiKVRPC, errors.Errorf("send tikv request error: %v, ctx: %v, try next peer later", err, ctx)) return errors.Trace(err) } @@ -149,10 +149,10 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, regi reportRegionError(regionErr) if notLeader := regionErr.GetNotLeader(); notLeader != nil { // Retry if error is `NotLeader`. 
- log.Debugf("tikv reports `NotLeader`: %s, ctx: %s, retry later", notLeader, ctx.KVCtx) + log.Debugf("tikv reports `NotLeader`: %s, ctx: %v, retry later", notLeader, ctx) s.regionCache.UpdateLeader(ctx.Region, notLeader.GetLeader().GetStoreId()) if notLeader.GetLeader() == nil { - err = bo.Backoff(boRegionMiss, errors.Errorf("not leader: %v, ctx: %s", notLeader, ctx.KVCtx)) + err = bo.Backoff(boRegionMiss, errors.Errorf("not leader: %v, ctx: %v", notLeader, ctx)) if err != nil { return false, errors.Trace(err) } @@ -162,35 +162,35 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, regi if storeNotMatch := regionErr.GetStoreNotMatch(); storeNotMatch != nil { // store not match - log.Warnf("tikv reports `StoreNotMatch`: %s, ctx: %s, retry later", storeNotMatch, ctx.KVCtx) + log.Warnf("tikv reports `StoreNotMatch`: %s, ctx: %v, retry later", storeNotMatch, ctx) s.regionCache.ClearStoreByID(ctx.GetStoreID()) return true, nil } if staleEpoch := regionErr.GetStaleEpoch(); staleEpoch != nil { - log.Debugf("tikv reports `StaleEpoch`, ctx: %s, retry later", ctx.KVCtx) + log.Debugf("tikv reports `StaleEpoch`, ctx: %v, retry later", ctx) err = s.regionCache.OnRegionStale(ctx, staleEpoch.NewRegions) return false, errors.Trace(err) } if regionErr.GetServerIsBusy() != nil { - log.Warnf("tikv reports `ServerIsBusy`, reason: %s, ctx: %s, retry later", regionErr.GetServerIsBusy().GetReason(), ctx.KVCtx) - err = bo.Backoff(boServerBusy, errors.Errorf("server is busy, ctx: %s", ctx.KVCtx)) + log.Warnf("tikv reports `ServerIsBusy`, reason: %s, ctx: %v, retry later", regionErr.GetServerIsBusy().GetReason(), ctx) + err = bo.Backoff(boServerBusy, errors.Errorf("server is busy, ctx: %v", ctx)) if err != nil { return false, errors.Trace(err) } return true, nil } if regionErr.GetStaleCommand() != nil { - log.Debugf("tikv reports `StaleCommand`, ctx: %s", ctx.KVCtx) + log.Debugf("tikv reports `StaleCommand`, ctx: %v", ctx) return true, nil } if regionErr.GetRaftEntryTooLarge() != nil { - log.Warnf("tikv reports `RaftEntryTooLarge`, ctx: %s", ctx.KVCtx) + log.Warnf("tikv reports `RaftEntryTooLarge`, ctx: %v", ctx) return false, errors.New(regionErr.String()) } // For other errors, we only drop cache here. // Because caller may need to re-split the request. 
- log.Debugf("tikv reports region error: %s, ctx: %s", regionErr, ctx.KVCtx) + log.Debugf("tikv reports region error: %s, ctx: %v", regionErr, ctx) s.regionCache.DropRegion(ctx.Region) return false, nil } diff --git a/store/tikv/scan.go b/store/tikv/scan.go index aa9e3401b5648..b7c6660394409 100644 --- a/store/tikv/scan.go +++ b/store/tikv/scan.go @@ -139,14 +139,14 @@ func (s *Scanner) getData(bo *Backoffer) error { return errors.Trace(err) } req := &tikvrpc.Request{ - Type: tikvrpc.CmdScan, - Priority: s.snapshot.priority, + Type: tikvrpc.CmdScan, Scan: &pb.ScanRequest{ StartKey: []byte(s.nextStartKey), Limit: uint32(s.batchSize), Version: s.startTS(), }, } + req.Context.Priority = s.snapshot.priority resp, err := sender.SendReq(bo, req, loc.Region, readTimeoutMedium) if err != nil { return errors.Trace(err) diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index b1fb8da178fff..b8abf3cb5ba0d 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -133,13 +133,13 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll pending := batch.keys for { req := &tikvrpc.Request{ - Type: tikvrpc.CmdBatchGet, - Priority: s.priority, + Type: tikvrpc.CmdBatchGet, BatchGet: &pb.BatchGetRequest{ Keys: pending, Version: s.version.Ver, }, } + req.Context.Priority = s.priority resp, err := sender.SendReq(bo, req, batch.region, readTimeoutMedium) if err != nil { return errors.Trace(err) @@ -211,13 +211,13 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) { sender := NewRegionRequestSender(s.store.regionCache, s.store.client, pbIsolationLevel(s.isolationLevel)) req := &tikvrpc.Request{ - Type: tikvrpc.CmdGet, - Priority: s.priority, + Type: tikvrpc.CmdGet, Get: &pb.GetRequest{ Key: k, Version: s.version.Ver, }, } + req.Context.Priority = s.priority for { loc, err := s.store.regionCache.LocateKey(bo, k) if err != nil { diff --git a/store/tikv/split_region.go b/store/tikv/split_region.go index f431a450fc070..407daf3447a1d 100644 --- a/store/tikv/split_region.go +++ b/store/tikv/split_region.go @@ -31,12 +31,12 @@ func (s *tikvStore) SplitRegion(splitKey kv.Key) error { bo := NewBackoffer(splitRegionBackoff, goctx.Background()) sender := NewRegionRequestSender(s.regionCache, s.client, kvrpcpb.IsolationLevel_SI) req := &tikvrpc.Request{ - Type: tikvrpc.CmdSplitRegion, - Priority: kvrpcpb.CommandPri_Normal, + Type: tikvrpc.CmdSplitRegion, SplitRegion: &kvrpcpb.SplitRegionRequest{ SplitKey: splitKey, }, } + req.Context.Priority = kvrpcpb.CommandPri_Normal for { loc, err := s.regionCache.LocateKey(bo, splitKey) if err != nil { diff --git a/store/tikv/tikvrpc/tikvrpc.go b/store/tikv/tikvrpc/tikvrpc.go index 0e40d5bd53d81..bf2676bdd2e84 100644 --- a/store/tikv/tikvrpc/tikvrpc.go +++ b/store/tikv/tikvrpc/tikvrpc.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/metapb" ) // CmdType represents the concrete request type in Request or response type in Response. @@ -52,8 +53,8 @@ const ( // Request wraps all kv/coprocessor requests. type Request struct { + kvrpcpb.Context Type CmdType - Priority kvrpcpb.CommandPri Get *kvrpcpb.GetRequest Scan *kvrpcpb.ScanRequest Prewrite *kvrpcpb.PrewriteRequest @@ -75,54 +76,6 @@ type Request struct { SplitRegion *kvrpcpb.SplitRegionRequest } -// GetContext returns the rpc context for the underlying concrete request. 
-func (req *Request) GetContext() (*kvrpcpb.Context, error) { - var c *kvrpcpb.Context - switch req.Type { - case CmdGet: - c = req.Get.GetContext() - case CmdScan: - c = req.Scan.GetContext() - case CmdPrewrite: - c = req.Prewrite.GetContext() - case CmdCommit: - c = req.Commit.GetContext() - case CmdCleanup: - c = req.Cleanup.GetContext() - case CmdBatchGet: - c = req.BatchGet.GetContext() - case CmdBatchRollback: - c = req.BatchRollback.GetContext() - case CmdScanLock: - c = req.ScanLock.GetContext() - case CmdResolveLock: - c = req.ResolveLock.GetContext() - case CmdGC: - c = req.GC.GetContext() - case CmdDeleteRange: - c = req.DeleteRange.GetContext() - case CmdRawGet: - c = req.RawGet.GetContext() - case CmdRawPut: - c = req.RawPut.GetContext() - case CmdRawDelete: - c = req.RawDelete.GetContext() - case CmdRawScan: - c = req.RawScan.GetContext() - case CmdCop: - c = req.Cop.GetContext() - case CmdMvccGetByKey: - c = req.MvccGetByKey.GetContext() - case CmdMvccGetByStartTs: - c = req.MvccGetByStartTs.GetContext() - case CmdSplitRegion: - c = req.SplitRegion.GetContext() - default: - return nil, fmt.Errorf("invalid request type %v", req.Type) - } - return c, nil -} - // Response wraps all kv/coprocessor responses. type Response struct { Type CmdType @@ -148,8 +101,12 @@ type Response struct { } // SetContext set the Context field for the given req to the specified ctx. -func SetContext(req *Request, ctx *kvrpcpb.Context) error { - ctx.Priority = req.Priority +func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error { + ctx := &req.Context + ctx.RegionId = region.Id + ctx.RegionEpoch = region.RegionEpoch + ctx.Peer = peer + switch req.Type { case CmdGet: req.Get.Context = ctx