forked from redis/go-redis
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
2c4f6f8
commit 46f49a1
Showing
5 changed files
with
348 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,237 @@ | ||
package redis | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"log" | ||
"sync" | ||
"time" | ||
|
||
"github.com/golang/groupcache/consistenthash" | ||
) | ||
|
||
var ( | ||
errRingShardsDown = errors.New("redis: all ring shards are down") | ||
) | ||
|
||
// RingOptions are used to configure a ring client and should be | ||
// passed to NewRing. | ||
type RingOptions struct { | ||
// A map of name => host:port addresses of ring shards. | ||
Addrs map[string]string | ||
|
||
// Following options are copied from Options struct. | ||
|
||
DB int64 | ||
Password string | ||
|
||
DialTimeout time.Duration | ||
ReadTimeout time.Duration | ||
WriteTimeout time.Duration | ||
|
||
PoolSize int | ||
PoolTimeout time.Duration | ||
IdleTimeout time.Duration | ||
} | ||
|
||
func (opt *RingOptions) clientOptions() *Options { | ||
return &Options{ | ||
DB: opt.DB, | ||
Password: opt.Password, | ||
|
||
DialTimeout: opt.DialTimeout, | ||
ReadTimeout: opt.ReadTimeout, | ||
WriteTimeout: opt.WriteTimeout, | ||
|
||
PoolSize: opt.PoolSize, | ||
PoolTimeout: opt.PoolTimeout, | ||
IdleTimeout: opt.IdleTimeout, | ||
} | ||
} | ||
|
||
type ringShard struct { | ||
Client *Client | ||
down int | ||
} | ||
|
||
func (shard *ringShard) String() string { | ||
var state string | ||
if shard.IsUp() { | ||
state = "up" | ||
} else { | ||
state = "down" | ||
} | ||
return fmt.Sprintf("%s is %s", shard.Client, state) | ||
} | ||
|
||
func (shard *ringShard) IsDown() bool { | ||
const threshold = 5 | ||
return shard.down >= threshold | ||
} | ||
|
||
func (shard *ringShard) IsUp() bool { | ||
return !shard.IsDown() | ||
} | ||
|
||
// Vote votes to set shard state and returns true if state was changed. | ||
func (shard *ringShard) Vote(up bool) bool { | ||
if up { | ||
changed := shard.IsDown() | ||
shard.down = 0 | ||
return changed | ||
} | ||
|
||
if shard.IsDown() { | ||
return false | ||
} | ||
|
||
shard.down++ | ||
return shard.IsDown() | ||
} | ||
|
||
// Ring is a Redis client that uses constistent hashing to distribute | ||
// keys across multiple Redis servers (shards). | ||
// | ||
// It monitors the state of each shard and removes dead shards from | ||
// the ring. When shard comes online it is added back to the ring. This | ||
// gives you maximum availability and partition tolerance, but no | ||
// consistency between different shards or even clients. Each client | ||
// uses shards that are available to the client and does not do any | ||
// coordination when shard state is changed. | ||
// | ||
// Ring should be used when you use multiple Redis servers for caching | ||
// and can tolerate losing data when one of the servers dies. | ||
// Otherwise you should use Redis Cluster. | ||
type Ring struct { | ||
commandable | ||
|
||
nreplicas int | ||
|
||
mx sync.RWMutex | ||
hash *consistenthash.Map | ||
shards map[string]*ringShard | ||
|
||
closed bool | ||
} | ||
|
||
func NewRing(opt *RingOptions) *Ring { | ||
const nreplicas = 100 | ||
ring := &Ring{ | ||
nreplicas: nreplicas, | ||
hash: consistenthash.New(nreplicas, nil), | ||
shards: make(map[string]*ringShard), | ||
} | ||
ring.commandable.process = ring.process | ||
for name, addr := range opt.Addrs { | ||
clopt := opt.clientOptions() | ||
clopt.Addr = addr | ||
ring.addClient(name, NewClient(clopt)) | ||
} | ||
go ring.heartbeat() | ||
return ring | ||
} | ||
|
||
func (ring *Ring) addClient(name string, cl *Client) { | ||
ring.mx.Lock() | ||
ring.hash.Add(name) | ||
ring.shards[name] = &ringShard{Client: cl} | ||
ring.mx.Unlock() | ||
} | ||
|
||
func (ring *Ring) getClient(key string) (*Client, error) { | ||
ring.mx.RLock() | ||
|
||
if ring.closed { | ||
return nil, errClosed | ||
} | ||
|
||
name := ring.hash.Get(key) | ||
if name == "" { | ||
ring.mx.RUnlock() | ||
return nil, errRingShardsDown | ||
} | ||
|
||
if shard, ok := ring.shards[name]; ok { | ||
ring.mx.RUnlock() | ||
return shard.Client, nil | ||
} | ||
|
||
ring.mx.RUnlock() | ||
return nil, errRingShardsDown | ||
} | ||
|
||
func (ring *Ring) process(cmd Cmder) { | ||
cl, err := ring.getClient(hashKey(cmd.clusterKey())) | ||
if err != nil { | ||
cmd.setErr(err) | ||
return | ||
} | ||
cl.baseClient.process(cmd) | ||
} | ||
|
||
// rebalance removes dead shards from the ring. | ||
func (ring *Ring) rebalance() { | ||
defer ring.mx.Unlock() | ||
ring.mx.Lock() | ||
|
||
ring.hash = consistenthash.New(ring.nreplicas, nil) | ||
for name, shard := range ring.shards { | ||
if shard.IsUp() { | ||
ring.hash.Add(name) | ||
} | ||
} | ||
} | ||
|
||
// heartbeat monitors state of each shard in the ring. | ||
func (ring *Ring) heartbeat() { | ||
ticker := time.NewTicker(100 * time.Millisecond) | ||
defer ticker.Stop() | ||
for _ = range ticker.C { | ||
var rebalance bool | ||
|
||
ring.mx.RLock() | ||
|
||
if ring.closed { | ||
ring.mx.RUnlock() | ||
break | ||
} | ||
|
||
for _, shard := range ring.shards { | ||
err := shard.Client.Ping().Err() | ||
if shard.Vote(err == nil) { | ||
log.Printf("redis: ring shard state changed: %s", shard) | ||
rebalance = true | ||
} | ||
} | ||
|
||
ring.mx.RUnlock() | ||
|
||
if rebalance { | ||
ring.rebalance() | ||
} | ||
} | ||
} | ||
|
||
// Close closes the ring client, releasing any open resources. | ||
// | ||
// It is rare to Close a Client, as the Client is meant to be | ||
// long-lived and shared between many goroutines. | ||
func (ring *Ring) Close() (retErr error) { | ||
defer ring.mx.Unlock() | ||
ring.mx.Lock() | ||
|
||
if ring.closed { | ||
return nil | ||
} | ||
ring.closed = true | ||
|
||
for _, shard := range ring.shards { | ||
if err := shard.Client.Close(); err != nil { | ||
retErr = err | ||
} | ||
} | ||
ring.hash = nil | ||
ring.shards = nil | ||
|
||
return retErr | ||
} |
Oops, something went wrong.