Skip to content

Commit

Permalink
unistore: add failpoint to test copr cache in unit test (pingcap#20049)
Browse files Browse the repository at this point in the history
  • Loading branch information
lzmhhh123 authored Oct 16, 2020
1 parent 743a910 commit 5db77b3
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 3 deletions.
64 changes: 64 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ var _ = Suite(&partitionTableSuite{&baseTestSuite{}})
var _ = SerialSuites(&tiflashTestSuite{})
var _ = SerialSuites(&globalIndexSuite{&baseTestSuite{}})
var _ = SerialSuites(&testSerialSuite{&baseTestSuite{}})
var _ = SerialSuites(&testCoprCache{})

type testSuite struct{ *baseTestSuite }
type testSuiteP1 struct{ *baseTestSuite }
Expand All @@ -145,6 +146,11 @@ type testSlowQuery struct{ *baseTestSuite }
type partitionTableSuite struct{ *baseTestSuite }
type globalIndexSuite struct{ *baseTestSuite }
type testSerialSuite struct{ *baseTestSuite }
type testCoprCache struct {
store kv.Storage
dom *domain.Domain
cls cluster.Cluster
}

type baseTestSuite struct {
cluster cluster.Cluster
Expand Down Expand Up @@ -6472,6 +6478,64 @@ func (s *testSuite) TestIssue13758(c *C) {
))
}

func (s *testCoprCache) SetUpSuite(c *C) {
originConfig := config.GetGlobalConfig()
config.StoreGlobalConfig(config.NewConfig())
defer config.StoreGlobalConfig(originConfig)
cli := &regionProperityClient{}
hijackClient := func(c tikv.Client) tikv.Client {
cli.Client = c
return cli
}
var err error
s.store, err = mockstore.NewMockStore(
mockstore.WithClusterInspector(func(c cluster.Cluster) {
mockstore.BootstrapWithSingleStore(c)
s.cls = c
}),
mockstore.WithClientHijacker(hijackClient),
)
c.Assert(err, IsNil)
s.dom, err = session.BootstrapSession(s.store)
c.Assert(err, IsNil)
}

func (s *testCoprCache) TearDownSuite(c *C) {
s.dom.Close()
s.store.Close()
}

func (s *testCoprCache) TestIntegrationCopCache(c *C) {
originConfig := config.GetGlobalConfig()
config.StoreGlobalConfig(config.NewConfig())
defer config.StoreGlobalConfig(originConfig)

tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int primary key)")
tblInfo, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tid := tblInfo.Meta().ID
tk.MustExec(`insert into t values(1),(2),(3),(4),(5),(6),(7),(8),(9),(10),(11),(12)`)
s.cls.SplitTable(tid, 6)

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/cophandler/mockCopCacheInUnistore", `return(123)`), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/cophandler/mockCopCacheInUnistore"), IsNil)
}()

rows := tk.MustQuery("explain analyze select * from t where t.a < 10").Rows()
c.Assert(rows[0][2], Equals, "9")
c.Assert(strings.Contains(rows[0][5].(string), "cop_task: {num: 5"), Equals, true)
c.Assert(strings.Contains(rows[0][5].(string), "copr_cache_hit_ratio: 0.00"), Equals, true)

rows = tk.MustQuery("explain analyze select * from t").Rows()
c.Assert(rows[0][2], Equals, "12")
c.Assert(strings.Contains(rows[0][5].(string), "cop_task: {num: 6"), Equals, true)
c.Assert(strings.Contains(rows[0][5].(string), "copr_cache_hit_ratio: 0.67"), Equals, true)
}

func (s *testSerialSuite) TestCoprocessorOOMAction(c *C) {
// Assert Coprocessor OOMAction
tk := testkit.NewTestKit(c, s.store)
Expand Down
24 changes: 22 additions & 2 deletions store/mockstore/unistore/cophandler/cop_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/ngaut/unistore/lockstore"
"github.com/ngaut/unistore/tikv/dbreader"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -64,9 +65,28 @@ type dagContext struct {
}

// handleCopDAGRequest handles coprocessor DAG request.
func handleCopDAGRequest(dbReader *dbreader.DBReader, lockStore *lockstore.MemStore, req *coprocessor.Request) *coprocessor.Response {
func handleCopDAGRequest(dbReader *dbreader.DBReader, lockStore *lockstore.MemStore, req *coprocessor.Request) (resp *coprocessor.Response) {
startTime := time.Now()
resp := &coprocessor.Response{}
resp = &coprocessor.Response{}
failpoint.Inject("mockCopCacheInUnistore", func(cacheVersion failpoint.Value) {
if req.IsCacheEnabled {
if uint64(cacheVersion.(int)) == req.CacheIfMatchVersion {
failpoint.Return(&coprocessor.Response{IsCacheHit: true, CacheLastVersion: uint64(cacheVersion.(int))})
} else {
defer func() {
resp.CanBeCached = true
resp.CacheLastVersion = uint64(cacheVersion.(int))
if resp.ExecDetails == nil {
resp.ExecDetails = &kvrpcpb.ExecDetails{HandleTime: &kvrpcpb.HandleTime{ProcessMs: 500}}
} else if resp.ExecDetails.HandleTime == nil {
resp.ExecDetails.HandleTime = &kvrpcpb.HandleTime{ProcessMs: 500}
} else {
resp.ExecDetails.HandleTime.ProcessMs = 500
}
}()
}
}
})
dagCtx, dagReq, err := buildDAG(dbReader, lockStore, req)
if err != nil {
resp.OtherError = err.Error()
Expand Down
3 changes: 3 additions & 0 deletions store/tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,9 @@ func (s *tikvStore) Close() error {
s.txnLatches.Close()
}
s.regionCache.Close()
if s.coprCache != nil {
s.coprCache.cache.Close()
}
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion store/tikv/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package tikv
import (
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/tikv/pd/client"
)
Expand All @@ -34,7 +35,7 @@ func NewTestTiKVStore(client Client, pdClient pd.Client, clientHijack func(Clien
// Make sure the uuid is unique.
uid := uuid.New().String()
spkv := NewMockSafePointKV()
tikvStore, err := newTikvStore(uid, pdCli, spkv, client, false, nil)
tikvStore, err := newTikvStore(uid, pdCli, spkv, client, false, &config.GetGlobalConfig().TiKVClient.CoprCache)

if txnLocalLatches > 0 {
tikvStore.EnableTxnLocalLatches(txnLocalLatches)
Expand Down

0 comments on commit 5db77b3

Please sign in to comment.