Skip to content

Commit

Permalink
store/tikv: fix rpc runtime stats collect (pingcap#20014)
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored Sep 16, 2020
1 parent d8e4321 commit f0db8c6
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 11 deletions.
14 changes: 14 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6323,6 +6323,20 @@ func (s *testSuite) TestIssue19372(c *C) {
tk.MustQuery("select (select t2.c_str from t2 where t2.c_str <= t1.c_str and t2.c_int in (1, 2) order by t2.c_str limit 1) x from t1 order by c_int;").Check(testkit.Rows("a", "a", "a"))
}

func (s *testSerialSuite1) TestCollectCopRuntimeStats(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test;")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1 (a int, b int)")
tk.MustExec("set tidb_enable_collect_execution_info=1;")
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/tikvStoreRespResult", `return(true)`), IsNil)
rows := tk.MustQuery("explain analyze select * from t1").Rows()
c.Assert(len(rows), Equals, 2)
explain := fmt.Sprintf("%v", rows[0])
c.Assert(explain, Matches, ".*rpc_num: 2, .*regionMiss:.*")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/tikvStoreRespResult"), IsNil)
}

func (s *testSuite) TestCollectDMLRuntimeStats(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
5 changes: 4 additions & 1 deletion store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,9 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
})
req.StoreTp = task.storeType
startTime := time.Now()
worker.Stats = make(map[tikvrpc.CmdType]*RPCRuntimeStats)
if worker.Stats == nil {
worker.Stats = make(map[tikvrpc.CmdType]*RPCRuntimeStats)
}
resp, rpcCtx, storeAddr, err := worker.SendReqCtx(bo, req, task.region, ReadTimeoutMedium, task.storeType, task.storeAddr)
if err != nil {
if task.storeType == kv.TiDB {
Expand Down Expand Up @@ -1031,6 +1033,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon
resp.detail = new(CopRuntimeStats)
}
resp.detail.Stats = worker.Stats
worker.Stats = nil
resp.detail.BackoffTime = time.Duration(bo.totalSleep) * time.Millisecond
resp.detail.BackoffSleep = make(map[string]time.Duration, len(bo.backoffTimes))
resp.detail.BackoffTimes = make(map[string]int, len(bo.backoffTimes))
Expand Down
27 changes: 17 additions & 10 deletions store/tikv/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -135,12 +136,11 @@ func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *Backoffer, ctxs []co
if rawHook := ctx.Value(RPCCancellerCtxKey{}); rawHook != nil {
ctx, cancel = rawHook.(*RPCCanceller).WithCancel(ctx)
}
start := time.Now()
resp, err = ss.client.SendRequest(ctx, rpcCtx.Addr, req, timout)
if ss.Stats != nil {
defer func(start time.Time) {
recordRegionRequestRuntimeStats(ss.Stats, req.Type, time.Since(start))
}(time.Now())
recordRegionRequestRuntimeStats(ss.Stats, req.Type, time.Since(start))
}
resp, err = ss.client.SendRequest(ctx, rpcCtx.Addr, req, timout)
if err != nil {
cancel()
ss.rpcError = err
Expand Down Expand Up @@ -398,19 +398,26 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, rpcCtx *RPCContext,
defer s.releaseStoreToken(rpcCtx.Store)
}

if s.Stats != nil {
defer func(start time.Time) {
recordRegionRequestRuntimeStats(s.Stats, req.Type, time.Since(start))
}(time.Now())
}

ctx := bo.ctx
if rawHook := ctx.Value(RPCCancellerCtxKey{}); rawHook != nil {
var cancel context.CancelFunc
ctx, cancel = rawHook.(*RPCCanceller).WithCancel(ctx)
defer cancel()
}
start := time.Now()
resp, err = s.client.SendRequest(ctx, rpcCtx.Addr, req, timeout)
if s.Stats != nil {
recordRegionRequestRuntimeStats(s.Stats, req.Type, time.Since(start))
failpoint.Inject("tikvStoreRespResult", func(val failpoint.Value) {
if val.(bool) {
if req.Type == tikvrpc.CmdCop && bo.totalSleep == 0 {
failpoint.Return(&tikvrpc.Response{
Resp: &coprocessor.Response{RegionError: &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}},
}, false, nil)
}
}
})
}
if err != nil {
// Because in rpc logic, context.Cancel() will be transferred to rpcContext.Cancel error. For rpcContext cancel,
// we need to retry the request. But for context cancel active, for example, limitExec gets the required rows,
Expand Down

0 comments on commit f0db8c6

Please sign in to comment.