Skip to content

Commit

Permalink
增加分片集群实现
Browse files Browse the repository at this point in the history
  • Loading branch information
xfali committed Aug 5, 2019
1 parent d87bb63 commit e93896f
Show file tree
Hide file tree
Showing 11 changed files with 608 additions and 22 deletions.
19 changes: 18 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
go get github.com/xfali/gache
```

## 运行
## 运行(主从复制)

测试示例如下:

Expand All @@ -29,6 +29,23 @@ follower2:
./gache --raft-addr 127.0.0.1:7003 --raft-dir ./tmp/node3 -p 8003 --raft-join 127.0.0.1:8001
```

## 运行(分片)

node1:
```
./gache -p 8001 --cluster-port 9001 --cluster-slot 0-5000
```

node2:
```
./gache -p 8002 --cluster-port 9002 --cluster-slot 5001-10000 --cluster-members 127.0.0.1:9001
```

node3:
```
./gache -p 8003 --cluster-port 9003 --cluster-slot 10001-16383 --cluster-members 127.0.0.1:9001
```

## 访问

地址:
Expand Down
74 changes: 74 additions & 0 deletions cluster/gossip/event_delegate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright (C) 2019, Xiongfa Li.
// All right reserved.
// @author xiongfa.li
// @version V1.0
// Description:

package gossip

import (
"github.com/hashicorp/memberlist"
"log"
"sync"
)

type handle func(meta []byte)

type NodeDelegate struct {
mu sync.Mutex
Enabled bool
JoinFunc handle
LeaveFunc handle
UpdateFunc handle
}

func defaultJoin(meta []byte) {
log.Printf("defaultJoin meta: %s\n", string(meta))
}

func defaultLeave(meta []byte) {
log.Printf("defaultLeave meta: %s\n", string(meta))
}

func defaultUpdate(meta []byte) {
log.Printf("defaultUpdate meta: %s\n", string(meta))
}

func DefaultNodeDelegate() *NodeDelegate {
return &NodeDelegate{
Enabled: true,
JoinFunc: defaultJoin,
LeaveFunc: defaultLeave,
UpdateFunc: defaultUpdate,
}
}

func (d *NodeDelegate) NotifyJoin(n *memberlist.Node) {
d.mu.Lock()
defer d.mu.Unlock()
if d.Enabled {
d.JoinFunc(n.Meta)
}
}

// NotifyLeave is invoked when a node is detected to have left.
// The Node argument must not be modified.
func (d *NodeDelegate) NotifyLeave(n *memberlist.Node) {
d.mu.Lock()
defer d.mu.Unlock()
if d.Enabled {
d.LeaveFunc(n.Meta)
}
}

// NotifyUpdate is invoked when a node is detected to have
// updated, usually involving the meta data. The Node argument
// must not be modified.
func (d *NodeDelegate) NotifyUpdate(n *memberlist.Node) {
d.mu.Lock()
defer d.mu.Unlock()
if d.Enabled {
d.UpdateFunc(n.Meta)
}
}

79 changes: 79 additions & 0 deletions cluster/gossip/gossip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright (C) 2019, Xiongfa Li.
// All right reserved.
// @author xiongfa.li
// @version V1.0
// Description:

package gossip

import (
"gache/cluster"
"gache/config"
"github.com/hashicorp/memberlist"
"os"
"strconv"
"strings"
"time"
)

type members memberlist.Memberlist

type Cluster interface {
LocalAddr() string
UpdateLocal(meta []byte) error
UpdateAndWait(meta []byte, timeout time.Duration) error
Close() error
}

func Startup(conf *config.Config, delegate *NodeDelegate) (Cluster, error) {
hostname, _ := os.Hostname()
config := memberlist.DefaultLocalConfig()
config.Name = hostname + "-" + strconv.Itoa(conf.ClusterPort)
// config := memberlist.DefaultLocalConfig()
config.BindPort = conf.ClusterPort
config.AdvertisePort = conf.ClusterPort
config.Events = delegate

list, err := memberlist.Create(config)
if err != nil {
return nil, err
}

_, _, sloterr := cluster.GetSlots(conf.ClusterSlot)
if sloterr != nil {
list.Shutdown()
return nil, sloterr
}

memList := strings.Split(conf.ClusterMemebers, ",")
var validMembers []string
for _, v := range memList {
mem := strings.TrimSpace(v)
if mem != "" {
validMembers = append(validMembers, mem)
}
}
if len(validMembers) > 0 {
list.Join(validMembers)
}

return (*members)(list), nil
}

func (c *members)LocalAddr() string {
return (*memberlist.Memberlist)(c).LocalNode().Address()
}

func (c *members) UpdateLocal(meta []byte) error {
(*memberlist.Memberlist)(c).LocalNode().Meta = meta
return nil
}

func (c *members) UpdateAndWait(meta []byte, timeout time.Duration) error {
(*memberlist.Memberlist)(c).LocalNode().Meta = meta
return (*memberlist.Memberlist)(c).UpdateNode(timeout)
}

func (c *members) Close() error {
return (*memberlist.Memberlist)(c).Shutdown()
}
4 changes: 4 additions & 0 deletions cluster/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package cluster

import (
"errors"
"gache/config"
"gache/db"
"github.com/hashicorp/go-hclog"
Expand Down Expand Up @@ -68,6 +69,9 @@ func New(conf *config.Config, db *db.GacheDb, notifyChan chan bool) (*raft.Raft,
}

func DoJoin(addr string, cluster *raft.Raft) error {
if cluster == nil {
return errors.New("raft is nil")
}
addPeerFuture := cluster.AddVoter(raft.ServerID(addr),
raft.ServerAddress(addr),
0, 0)
Expand Down
23 changes: 23 additions & 0 deletions cluster/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@
package cluster

import (
"errors"
"fmt"
"gache/config"
"io"
"io/ioutil"
"net/http"
"os"
"strconv"
"strings"
)

func Join(conf *config.Config) error {
Expand Down Expand Up @@ -45,3 +48,23 @@ func IsPathExists(path string) bool {
func Mkdir(path string) error {
return os.MkdirAll(path, os.ModePerm)
}

func GetSlots(slotStr string) (uint32, uint32, error) {
slots := strings.Split(slotStr, "-")
beginSlot, endSlot := 0, 0
if len(slots) > 1 {
i, err := strconv.Atoi(slots[0])
if err != nil {
return 0, 0, err
}
beginSlot = i
j, err := strconv.Atoi(slots[1])
if err != nil {
return 0, 0, err
}
endSlot = j
} else {
return 0, 0, errors.New("Parse slot error")
}
return uint32(beginSlot), uint32(endSlot), nil
}
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,10 @@ type Config struct {
RaftTcpAddr string
RaftDir string
RaftJoinAddr string

ClusterPort int
ClusterMemebers string
ClusterSlot string

ApiPort int
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.12
require (
github.com/hashicorp/go-hclog v0.9.1
github.com/hashicorp/go-msgpack v0.5.5
github.com/hashicorp/memberlist v0.1.4
github.com/hashicorp/raft v1.1.0
github.com/hashicorp/raft-boltdb v0.0.0-20190605210249-ef2e128ed477
)
22 changes: 22 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 h1:EFSB7Zo9Eg91v7MJPVsifUysc/wPdN+NOnVe6bWbdBM=
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
Expand All @@ -9,33 +10,54 @@ github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-hclog v0.9.1 h1:9PZfAcVEvez4yhLH2TBU64/h/z4xlFI80cWXRrxuKuM=
github.com/hashicorp/go-hclog v0.9.1/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0=
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-msgpack v0.5.5 h1:i9R9JSrqIz0QVLz3sz+i3YJdT7TTSLcfLLzJi9aZTuI=
github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs=
github.com/hashicorp/go-sockaddr v1.0.0 h1:GeH6tui99pF4NJgfnhp+L6+FfobzVW3Ah46sLo0ICXs=
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/memberlist v0.1.4 h1:gkyML/r71w3FL8gUi74Vk76avkj/9lYAY9lvg0OcoGs=
github.com/hashicorp/memberlist v0.1.4/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
github.com/hashicorp/raft v1.1.0 h1:qPMePEczgbkiQsqCsRfuHRqvDUO+zmAInDaD5ptXlq0=
github.com/hashicorp/raft v1.1.0/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM=
github.com/hashicorp/raft-boltdb v0.0.0-20190605210249-ef2e128ed477 h1:bLsrEmB2NUwkHH18FOJBIa04wOV2RQalJrcafTYu6Lg=
github.com/hashicorp/raft-boltdb v0.0.0-20190605210249-ef2e128ed477/go.mod h1:aUF6HQr8+t3FC/ZHAC+pZreUBhTaxumuu3L+d37uRxk=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/miekg/dns v1.0.14 h1:9jZdLNd/P4+SfEJ0TNyxYpsK8N4GtfylBLqtbYN1sbA=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3 h1:KYQXGkl6vs02hK7pK4eIbw0NpNPedieTSTEiJ//bwGs=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Loading

0 comments on commit e93896f

Please sign in to comment.