store/tikv: cache store. (pingcap#2193)
disksing authored and coocood committed Dec 9, 2016
1 parent 107992f commit 1e8d970
Showing 4 changed files with 193 additions and 140 deletions.
4 changes: 0 additions & 4 deletions store/tikv/mock-tikv/pd.go
@@ -17,7 +17,6 @@ import (
"sync"
"time"

"github.com/juju/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/pd-client"
)
@@ -66,9 +65,6 @@ func (c *pdClient) GetRegion(key []byte) (*metapb.Region, *metapb.Peer, error) {

func (c *pdClient) GetStore(storeID uint64) (*metapb.Store, error) {
store := c.cluster.GetStore(storeID)
- if store == nil {
- return nil, errors.New("not found")
- }
return store, nil
}

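For context: with the error return dropped, the mock PD client now reports a missing store as a nil `*metapb.Store` with a nil error, rather than as an RPC failure. The sketch below illustrates the contract that the new `loadStoreAddr` in the region_cache.go diff below relies on; the `resolveStoreAddr`/`getStore` names are illustrative and not part of the commit.

```go
package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/metapb"
)

// resolveStoreAddr sketches the assumed contract: getStore returns (nil, nil)
// when the store does not exist, and a non-nil error only for RPC-level
// failures that are worth retrying with backoff.
func resolveStoreAddr(getStore func(id uint64) (*metapb.Store, error), id uint64) (string, error) {
	store, err := getStore(id)
	if err != nil {
		// PD itself is unreachable; the caller should back off and retry.
		return "", fmt.Errorf("loadStore from PD failed, id: %d, err: %v", id, err)
	}
	if store == nil {
		// The store is genuinely gone; an empty address tells the caller to
		// treat any region cached on this store as out of date.
		return "", nil
	}
	return store.GetAddress(), nil
}

func main() {
	// A toy getStore that knows about a single store.
	getStore := func(id uint64) (*metapb.Store, error) {
		if id == 1 {
			return &metapb.Store{Id: 1, Address: "127.0.0.1:20160"}, nil
		}
		return nil, nil
	}
	addr, _ := resolveStoreAddr(getStore, 1)
	fmt.Println(addr) // 127.0.0.1:20160
}
```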
241 changes: 137 additions & 104 deletions store/tikv/region_cache.go
@@ -17,7 +17,6 @@ import (
"bytes"
"sync"

"github.com/golang/protobuf/proto"
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/petar/GoLLRB/llrb"
@@ -34,6 +33,10 @@ type RegionCache struct {
regions map[RegionVerID]*Region
sorted *llrb.LLRB
}
+ storeMu struct {
+ sync.RWMutex
+ stores map[uint64]*Store
+ }
}

// NewRegionCache creates a RegionCache.
@@ -43,6 +46,7 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
}
c.mu.regions = make(map[RegionVerID]*Region)
c.mu.sorted = llrb.New()
+ c.storeMu.stores = make(map[uint64]*Store)
return c
}

@@ -55,28 +59,30 @@ type RPCContext struct {

// GetRPCContext returns RPCContext for a region. If it returns nil, the region
// must be out of date and already dropped from cache.
- func (c *RegionCache) GetRPCContext(id RegionVerID) *RPCContext {
+ func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext, error) {
c.mu.RLock()
- defer c.mu.RUnlock()
- if r, ok := c.mu.regions[id]; ok {
- return &RPCContext{
- Region: id,
- KVCtx: r.GetContext(),
- Addr: r.GetAddress(),
- }
+ region, ok := c.mu.regions[id]
+ if !ok {
+ c.mu.RUnlock()
+ return nil, nil
}
- return nil
}

- // GetRegionByVerID finds a Region by Region's verID.
- func (c *RegionCache) GetRegionByVerID(id RegionVerID) *Region {
- c.mu.RLock()
- defer c.mu.RUnlock()
+ kvCtx := region.GetContext()
+ c.mu.RUnlock()

- if r, ok := c.mu.regions[id]; ok {
- return r.Clone()
+ addr, err := c.GetStoreAddr(bo, kvCtx.GetPeer().GetStoreId())
+ if err != nil {
+ return nil, errors.Trace(err)
}
- return nil
+ if addr == "" {
+ // Store not found, region must be out of date.
+ c.DropRegion(id)
+ return nil, nil
}
+ return &RPCContext{
+ Region: id,
+ KVCtx: kvCtx,
+ Addr: addr,
+ }, nil
}

// KeyLocation is the region and range that a key is located.
@@ -153,54 +159,21 @@ func (c *RegionCache) DropRegion(id RegionVerID) {
c.dropRegionFromCache(id)
}

- // NextPeer picks next peer as new leader, if out of range of peers delete region.
- func (c *RegionCache) NextPeer(id RegionVerID) {
- // A and B get the same region and current leader is 1, they both will pick
- // peer 2 as leader.
- region := c.GetRegionByVerID(id)
- if region == nil {
- return
- }
- if leader, err := region.NextPeer(); err != nil {
- c.DropRegion(id)
- } else {
- c.UpdateLeader(id, leader.GetId())
- }
- }

// UpdateLeader update some region cache with newer leader info.
- func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderID uint64) {
+ func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64) {
c.mu.Lock()
defer c.mu.Unlock()

r, ok := c.mu.regions[regionID]
if !ok {
log.Debugf("regionCache: cannot find region when updating leader %d,%d", regionID, leaderID)
return
}

- var found bool
- for i, p := range r.meta.Peers {
- if p.GetId() == leaderID {
- r.curPeerIdx, r.peer = i, p
- found = true
- break
- }
- }
- if !found {
- log.Debugf("regionCache: cannot find peer when updating leader %d,%d", regionID, leaderID)
- c.dropRegionFromCache(r.VerID())
+ log.Debugf("regionCache: cannot find region when updating leader %d,%d", regionID, leaderStoreID)
return
}

- store, err := c.pdClient.GetStore(r.peer.GetStoreId())
- if err != nil {
- log.Warnf("regionCache: failed load store %d", r.peer.GetStoreId())
+ if !r.SwitchPeer(leaderStoreID) {
+ log.Debugf("regionCache: cannot find peer when updating leader %d,%d", regionID, leaderStoreID)
c.dropRegionFromCache(r.VerID())
return
}

- r.addr = store.GetAddress()
}

func (c *RegionCache) getRegionFromCache(key []byte) *Region {
@@ -209,10 +182,7 @@ func (c *RegionCache) getRegionFromCache(key []byte) *Region {
r = item.(*llrbItem).region
return false
})
- if r == nil {
- return nil
- }
- if r.Contains(key) {
+ if r != nil && r.Contains(key) {
return r
}
return nil
@@ -264,24 +234,75 @@ func (c *RegionCache) loadRegion(bo *Backoffer, key []byte) (*Region, error) {
if len(meta.Peers) == 0 {
return nil, errors.New("receive Region with no peer")
}
- // Move leader to the first.
+ region := &Region{
+ meta: meta,
+ peer: meta.Peers[0],
+ }
if leader != nil {
- moveLeaderToFirst(meta, leader.GetStoreId())
+ region.SwitchPeer(leader.GetStoreId())
}
- peer := meta.Peers[0]
- store, err := c.pdClient.GetStore(peer.GetStoreId())
+ return region, nil
}
}

+ // GetStoreAddr returns a tikv server's address by its storeID. It checks cache
+ // first, sends request to pd server when necessary.
+ func (c *RegionCache) GetStoreAddr(bo *Backoffer, id uint64) (string, error) {
+ c.storeMu.RLock()
+ if store, ok := c.storeMu.stores[id]; ok {
+ c.storeMu.RUnlock()
+ return store.Addr, nil
+ }
+ c.storeMu.RUnlock()
+
+ addr, err := c.loadStoreAddr(bo, id)
+ if err != nil || addr == "" {
+ return "", errors.Trace(err)
+ }
+
+ c.storeMu.Lock()
+ defer c.storeMu.Unlock()
+ c.storeMu.stores[id] = &Store{
+ ID: id,
+ Addr: addr,
+ }
+ return addr, nil
+ }
+
+ func (c *RegionCache) loadStoreAddr(bo *Backoffer, id uint64) (string, error) {
+ for {
+ store, err := c.pdClient.GetStore(id)
if err != nil {
- backoffErr = errors.Errorf("loadStore from PD failed, key %q, storeID: %d, err: %v", key, peer.GetStoreId(), err)
+ err = errors.Errorf("loadStore from PD failed, id: %d, err: %v", id, err)
+ if err = bo.Backoff(boPDRPC, err); err != nil {
+ return "", errors.Trace(err)
+ }
continue
}
- region := &Region{
- meta: meta,
- peer: peer,
- addr: store.GetAddress(),
- curPeerIdx: 0,
+ if store == nil {
+ return "", nil
}
+ return store.GetAddress(), nil
}
}

+ // OnRequestFail is used for clearing cache when a tikv server does not respond.
+ func (c *RegionCache) OnRequestFail(ctx *RPCContext) {
+ // Switch region's leader peer to next one.
+ regionID := ctx.Region
+ c.mu.Lock()
+ if region, ok := c.mu.regions[regionID]; ok {
+ if !region.OnRequestFail(ctx.KVCtx.GetPeer().GetStoreId()) {
+ c.dropRegionFromCache(regionID)
+ }
- return region, nil
}
+ c.mu.Unlock()
+
+ // Store's meta may be out of date.
+ storeID := ctx.KVCtx.GetPeer().GetStoreId()
+ c.storeMu.Lock()
+ delete(c.storeMu.stores, storeID)
+ c.storeMu.Unlock()
}

// OnRegionStale removes the old region and inserts new regions into the cache.
@@ -297,13 +318,12 @@ func (c *RegionCache) OnRegionStale(ctx *RPCContext, newRegions []*metapb.Region
return errors.Errorf("newRegion's range key is not encoded: %v, %v", meta, err)
}
}
- moveLeaderToFirst(meta, ctx.KVCtx.GetPeer().GetStoreId())
- leader := meta.Peers[0]
- c.insertRegionToCache(&Region{
+ region := &Region{
meta: meta,
- peer: leader,
- addr: ctx.Addr,
- })
+ peer: meta.Peers[0],
+ }
+ region.SwitchPeer(ctx.KVCtx.GetPeer().GetStoreId())
+ c.insertRegionToCache(region)
}
return nil
}
@@ -342,22 +362,11 @@ func (item *llrbItem) Less(other llrb.Item) bool {
return bytes.Compare(item.key, other.(*llrbItem).key) < 0
}

- // Region stores region info. Region is a readonly class.
+ // Region stores region's meta and its leader peer.
type Region struct {
- meta *metapb.Region
- peer *metapb.Peer
- addr string
- curPeerIdx int
- }
-
- // Clone returns a copy of Region.
- func (r *Region) Clone() *Region {
- return &Region{
- meta: proto.Clone(r.meta).(*metapb.Region),
- peer: proto.Clone(r.peer).(*metapb.Peer),
- addr: r.addr,
- curPeerIdx: r.curPeerIdx,
- }
+ meta *metapb.Region
+ peer *metapb.Peer
+ unreachableStores []uint64
}

// GetID returns id.
@@ -391,11 +400,6 @@ func (r *Region) EndKey() []byte {
return r.meta.EndKey
}

- // GetAddress returns address.
- func (r *Region) GetAddress() string {
- return r.addr
- }
-
// GetContext constructs kvprotopb.Context from region info.
func (r *Region) GetContext() *kvrpcpb.Context {
return &kvrpcpb.Context{
@@ -405,18 +409,47 @@ func (r *Region) GetContext() *kvrpcpb.Context {
}
}

+ // OnRequestFail records unreachable peer and tries to select another valid peer.
+ // It returns false if all peers are unreachable.
+ func (r *Region) OnRequestFail(storeID uint64) bool {
+ if r.peer.GetStoreId() != storeID {
+ return true
+ }
+ r.unreachableStores = append(r.unreachableStores, storeID)
+ L:
+ for _, p := range r.meta.Peers {
+ for _, id := range r.unreachableStores {
+ if p.GetStoreId() == id {
+ continue L
+ }
+ }
+ r.peer = p
+ return true
+ }
+ return false
+ }
+
+ // SwitchPeer switches current peer to the one on specific store. It returns
+ // false if no peer matches the storeID.
+ func (r *Region) SwitchPeer(storeID uint64) bool {
+ for _, p := range r.meta.Peers {
+ if p.GetStoreId() == storeID {
+ r.peer = p
+ return true
+ }
+ }
+ return false
+ }
+
// Contains checks whether the key is in the region, for the maximum region endKey is empty.
// startKey <= key < endKey.
func (r *Region) Contains(key []byte) bool {
return bytes.Compare(r.meta.GetStartKey(), key) <= 0 &&
(bytes.Compare(key, r.meta.GetEndKey()) < 0 || len(r.meta.GetEndKey()) == 0)
}

- // NextPeer picks next peer as leader, if out of range return error.
- func (r *Region) NextPeer() (*metapb.Peer, error) {
- nextPeerIdx := r.curPeerIdx + 1
- if nextPeerIdx >= len(r.meta.Peers) {
- return nil, errors.New("out of range of peer")
- }
- return r.meta.Peers[nextPeerIdx], nil
+ // Store contains a tikv server's address.
+ type Store struct {
+ ID uint64
+ Addr string
}
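Taken together, the new pieces are meant to be driven by a retry loop in the request-sending layer: resolve the region to a store address via `GetRPCContext`, and on a network error report the failure so the cache rotates the region to another peer and forgets the store's address. The following is only a sketch of such a caller, assumed to live in the same `store/tikv` package; `sendOnce` and `isNetworkErr` are hypothetical stand-ins for the real RPC layer and are not added by this commit.

```go
// sendWithRetry sketches how a caller might drive the new cache API.
// It is not part of this commit; sendOnce and isNetworkErr are placeholders.
func sendWithRetry(
	bo *Backoffer,
	cache *RegionCache,
	id RegionVerID,
	sendOnce func(addr string, kvCtx *kvrpcpb.Context) error,
	isNetworkErr func(error) bool,
) error {
	for {
		rpcCtx, err := cache.GetRPCContext(bo, id)
		if err != nil {
			return errors.Trace(err)
		}
		if rpcCtx == nil {
			// The region (or its store) has been dropped from the cache;
			// the caller should relocate the key and start over.
			return errors.New("region is stale")
		}
		if err = sendOnce(rpcCtx.Addr, rpcCtx.KVCtx); err == nil {
			return nil
		}
		if !isNetworkErr(err) {
			return errors.Trace(err)
		}
		// Mark the store unreachable: the region switches to another peer
		// (or is dropped once every peer has failed), and the store address
		// is evicted so it will be re-fetched from PD on the next lookup.
		// A real caller would also back off here before retrying.
		cache.OnRequestFail(rpcCtx)
	}
}
```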