Skip to content

Commit

Permalink
*: support caching coprocessor responses (pingcap#13824)
Browse files Browse the repository at this point in the history
  • Loading branch information
breezewish authored and sre-bot committed Dec 31, 2019
1 parent 23de656 commit ba2ff85
Show file tree
Hide file tree
Showing 14 changed files with 484 additions and 15 deletions.
22 changes: 22 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,21 @@ type TiKVClient struct {
// If a store has been up to the limit, it will return error for successive request to
// prevent the store occupying too much token in dispatching level.
StoreLimit int64 `toml:"store-limit" json:"store-limit"`

CoprCache CoprocessorCache `toml:"copr-cache" json:"copr-cache"`
}

// CoprocessorCache is the config for coprocessor cache.
type CoprocessorCache struct {
// Whether to enable the copr cache. The copr cache saves the result from TiKV Coprocessor in the memory and
// reuses the result when corresponding data in TiKV is unchanged, on a region basis.
Enabled bool `toml:"enabled" json:"enabled"`
// The capacity in MB of the cache.
CapacityMB float64 `toml:"capacity-mb" json:"capacity-mb"`
// Only cache requests whose result set is small.
AdmissionMaxResultMB float64 `toml:"admission-max-result-mb" json:"admission-max-result-mb"`
// Only cache requests takes notable time to process.
AdmissionMinProcessMs uint64 `toml:"admission-min-process-ms" json:"admission-min-process-ms"`
}

// Binlog is the config for binlog.
Expand Down Expand Up @@ -536,6 +551,13 @@ var defaultConf = Config{

RegionCacheTTL: 600,
StoreLimit: 0,

CoprCache: CoprocessorCache{
Enabled: true,
CapacityMB: 1000,
AdmissionMaxResultMB: 10,
AdmissionMinProcessMs: 5,
},
},
Binlog: Binlog{
WriteTimeout: "15s",
Expand Down
13 changes: 13 additions & 0 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,19 @@ region-cache-ttl = 600
# default 0 means shutting off store limit.
store-limit = 0

[tikv-client.copr-cache]
# Whether to enable the copr cache. The copr cache saves the result from TiKV Coprocessor in the memory and
# reuses the result when corresponding data in TiKV is unchanged, on a region basis.
enabled = true

# The capacity in MB of the cache.
capacity-mb = 1000.0

# Only cache requests whose result set is small.
admission-max-result-mb = 10.0
# Only cache requests takes notable time to process.
admission-min-process-ms = 5

[txn-local-latches]
# Enable local latches for transactions. Enable it when
# there are lots of conflicts between transactions.
Expand Down
1 change: 1 addition & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func (builder *RequestBuilder) SetTableHandles(tid int64, handles []int64) *Requ
func (builder *RequestBuilder) SetDAGRequest(dag *tipb.DAGRequest) *RequestBuilder {
if builder.err == nil {
builder.Request.Tp = kv.ReqTypeDAG
builder.Request.Cacheable = true
builder.Request.Data, builder.err = dag.Marshal()
}

Expand Down
4 changes: 4 additions & 0 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ func (s *testSuite) TestRequestBuilder1(c *C) {
EndKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xc, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x23},
},
},
Cacheable: true,
KeepOrder: false,
Desc: false,
Concurrency: 15,
Expand Down Expand Up @@ -385,6 +386,7 @@ func (s *testSuite) TestRequestBuilder2(c *C) {
EndKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xc, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x3, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x23},
},
},
Cacheable: true,
KeepOrder: false,
Desc: false,
Concurrency: 15,
Expand Down Expand Up @@ -430,6 +432,7 @@ func (s *testSuite) TestRequestBuilder3(c *C) {
EndKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xf, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x65},
},
},
Cacheable: true,
KeepOrder: false,
Desc: false,
Concurrency: 15,
Expand Down Expand Up @@ -476,6 +479,7 @@ func (s *testSuite) TestRequestBuilder4(c *C) {
StartTs: 0x0,
Data: []uint8{0x18, 0x0, 0x20, 0x0, 0x40, 0x0, 0x5a, 0x0},
KeyRanges: keyRanges,
Cacheable: true,
KeepOrder: false,
Desc: false,
Concurrency: 15,
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ require (
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548
github.com/cznic/sortutil v0.0.0-20181122101858-f5f958428db8
github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f
github.com/dgraph-io/ristretto v0.0.1
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/go-sql-driver/mysql v0.0.0-20170715192408-3955978caca4
github.com/gogo/protobuf v1.2.1
Expand Down
12 changes: 8 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand All @@ -12,6 +14,8 @@ github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kB
github.com/blacktear23/go-proxyprotocol v0.0.0-20180807104634-af7a81e8dd0d h1:rQlvB2AYWme2bIB18r/SipGiMEVJYE9U0z+MGoU/LtQ=
github.com/blacktear23/go-proxyprotocol v0.0.0-20180807104634-af7a81e8dd0d/go.mod h1:VKt7CNAQxpFpSDz3sXyj9hY/GbVsQCr0sB3w59nE7lU=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20171208011716-f6d7a1f6fbf3/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
Expand Down Expand Up @@ -42,10 +46,12 @@ github.com/cznic/y v0.0.0-20170802143616-045f81c6662a/go.mod h1:1rk5VM7oSnA4vjp+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/ristretto v0.0.1 h1:cJwdnj42uV8Jg4+KLrYovLiCgIfz9wtWm6E6KA+1tLs=
github.com/dgraph-io/ristretto v0.0.1/go.mod h1:T40EBc7CJke8TkpiYfGGKAeFjSaxuFXhuXRyumBd6RE=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f h1:dDxpBYafY/GYpcl+LS4Bn3ziLPuEdGRkRjYAbSlWxSA=
github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw=
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
Expand Down Expand Up @@ -193,8 +199,6 @@ github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d h1:F8vp38kTAckN+
github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI=
github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d h1:rCmRK0lCRrHMUbS99BKFYhK9YxJDNw0xB033cQbYo0s=
github.com/pingcap/fn v0.0.0-20191016082858-07623b84a47d/go.mod h1:fMRU1BA1y+r89AxUoaAar4JjrhUkVDt0o0Np6V8XbDQ=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107 h1:IXAs9fKFQZJVHf9cXWfUh8Nq8zPO9ihgPgNuU1j7bIo=
Expand Down
2 changes: 2 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ type Request struct {
ReplicaRead ReplicaReadType
// StoreType represents this request is sent to the which type of store.
StoreType StoreType
// Cacheable is true if the request can be cached. Currently only deterministic DAG requests can be cached.
Cacheable bool
}

// ResultSubset represents a result subset from a single storage unit.
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (s *testCommitterSuite) SetUpTest(c *C) {
client := mocktikv.NewRPCClient(s.cluster, mvccStore)
pdCli := &codecPDClient{mocktikv.NewPDClient(s.cluster)}
spkv := NewMockSafePointKV()
store, err := newTikvStore("mocktikv-store", pdCli, spkv, client, false)
store, err := newTikvStore("mocktikv-store", pdCli, spkv, client, false, nil)
c.Assert(err, IsNil)
store.EnableTxnLocalLatches(1024000)
s.store = store
Expand Down
59 changes: 54 additions & 5 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,12 +712,36 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
}
})

req := tikvrpc.NewReplicaReadRequest(task.cmdType, &coprocessor.Request{
copReq := coprocessor.Request{
Tp: worker.req.Tp,
StartTs: worker.req.StartTs,
Data: worker.req.Data,
Ranges: task.ranges.toPBRanges(),
}, worker.req.ReplicaRead, worker.replicaReadSeed, kvrpcpb.Context{
}

var cacheKey []byte = nil
var cacheValue *coprCacheValue = nil

// If there are many ranges, it is very likely to be a TableLookupRequest. They are not worth to cache since
// computing is not the main cost. Ignore such requests directly to avoid slowly building the cache key.
if task.cmdType == tikvrpc.CmdCop && worker.store.coprCache != nil && worker.req.Cacheable && len(copReq.Ranges) < 10 {
cKey, err := coprCacheBuildKey(&copReq)
if err == nil {
cacheKey = cKey
cValue := worker.store.coprCache.Get(cKey)
if cValue != nil && cValue.RegionID == task.region.id && cValue.TimeStamp <= worker.req.StartTs {
// Append cache version to the request to skip Coprocessor computation if possible
// when request result is cached
copReq.IsCacheEnabled = true
copReq.CacheIfMatchVersion = cValue.RegionDataVersion
cacheValue = cValue
}
} else {
logutil.BgLogger().Warn("Failed to build copr cache key", zap.Error(err))
}
}

req := tikvrpc.NewReplicaReadRequest(task.cmdType, &copReq, worker.req.ReplicaRead, worker.replicaReadSeed, kvrpcpb.Context{
IsolationLevel: pbIsolationLevel(worker.req.IsolationLevel),
Priority: kvPriorityToCommandPri(worker.req.Priority),
NotFillCache: worker.req.NotFillCache,
Expand All @@ -743,7 +767,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
}

// Handles the response for non-streaming copTask.
return worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: resp.Resp.(*coprocessor.Response)}, task, ch, nil, costTime)
return worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: resp.Resp.(*coprocessor.Response)}, cacheKey, cacheValue, task, ch, nil, costTime)
}

type minCommitTSPushed struct {
Expand Down Expand Up @@ -877,7 +901,7 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, rpcCtx *RP
return nil, nil
}
for {
remainedTasks, err := worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: resp}, task, ch, lastRange, costTime)
remainedTasks, err := worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: resp}, nil, nil, task, ch, lastRange, costTime)
if err != nil || len(remainedTasks) != 0 {
return remainedTasks, errors.Trace(err)
}
Expand Down Expand Up @@ -909,7 +933,7 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, rpcCtx *RP
// returns more tasks when that happens, or handles the response if no error.
// if we're handling streaming coprocessor response, lastRange is the range of last
// successful response, otherwise it's nil.
func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCContext, resp *copResponse, task *copTask, ch chan<- *copResponse, lastRange *coprocessor.KeyRange, costTime time.Duration) ([]*copTask, error) {
func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCContext, resp *copResponse, cacheKey []byte, cacheValue *coprCacheValue, task *copTask, ch chan<- *copResponse, lastRange *coprocessor.KeyRange, costTime time.Duration) ([]*copTask, error) {
if regionErr := resp.pbResp.GetRegionError(); regionErr != nil {
if rpcCtx != nil && task.storeType == kv.TiDB {
resp.err = errors.Errorf("error: %v", regionErr)
Expand Down Expand Up @@ -977,6 +1001,31 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon
}
}
}
if resp.pbResp.IsCacheHit {
if cacheValue == nil {
return nil, errors.New("Internal error: received illegal TiKV response")
}
// Cache hit and is valid: use cached data as response data and we don't update the cache.
data := make([]byte, len(cacheValue.Data))
copy(data, cacheValue.Data)
resp.pbResp.Data = data
} else {
// Cache not hit or cache hit but not valid: update the cache if the response can be cached.
if cacheKey != nil && resp.pbResp.CacheLastVersion > 0 {
if worker.store.coprCache.CheckAdmission(resp.pbResp.Data.Size(), resp.detail.ProcessTime) {
data := make([]byte, len(resp.pbResp.Data))
copy(data, resp.pbResp.Data)

newCacheValue := coprCacheValue{
Data: data,
TimeStamp: worker.req.StartTs,
RegionID: task.region.id,
RegionDataVersion: resp.pbResp.CacheLastVersion,
}
worker.store.coprCache.Set(cacheKey, &newCacheValue)
}
}
}
worker.sendToRespCh(resp, ch, true)
return nil, nil
}
Expand Down
Loading

0 comments on commit ba2ff85

Please sign in to comment.