From e93896ff76775ef49be614aff5be8e4c30ee45af Mon Sep 17 00:00:00 2001 From: xfali Date: Mon, 5 Aug 2019 22:59:13 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=88=86=E7=89=87=E9=9B=86?= =?UTF-8?q?=E7=BE=A4=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 19 +++- cluster/gossip/event_delegate.go | 74 +++++++++++++ cluster/gossip/gossip.go | 79 ++++++++++++++ cluster/raft.go | 4 + cluster/util.go | 23 +++++ config/config.go | 6 ++ go.mod | 1 + go.sum | 22 ++++ handler/cluster.go | 171 +++++++++++++++++++++++++++++++ handler/handler.go | 139 +++++++++++++++++++++++-- main.go | 92 ++++++++++++++--- 11 files changed, 608 insertions(+), 22 deletions(-) create mode 100644 cluster/gossip/event_delegate.go create mode 100644 cluster/gossip/gossip.go create mode 100644 handler/cluster.go diff --git a/README.md b/README.md index f33654a..47162f6 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ go get github.com/xfali/gache ``` -## 运行 +## 运行(主从复制) 测试示例如下: @@ -29,6 +29,23 @@ follower2: ./gache --raft-addr 127.0.0.1:7003 --raft-dir ./tmp/node3 -p 8003 --raft-join 127.0.0.1:8001 ``` +## 运行(分片) + +node1: +``` +./gache -p 8001 --cluster-port 9001 --cluster-slot 0-5000 +``` + +node2: +``` +./gache -p 8002 --cluster-port 9002 --cluster-slot 5001-10000 --cluster-members 127.0.0.1:9001 +``` + +node3: +``` +./gache -p 8003 --cluster-port 9003 --cluster-slot 10001-16383 --cluster-members 127.0.0.1:9001 +``` + ## 访问 地址: diff --git a/cluster/gossip/event_delegate.go b/cluster/gossip/event_delegate.go new file mode 100644 index 0000000..fc1e53f --- /dev/null +++ b/cluster/gossip/event_delegate.go @@ -0,0 +1,74 @@ +// Copyright (C) 2019, Xiongfa Li. +// All right reserved. +// @author xiongfa.li +// @version V1.0 +// Description: + +package gossip + +import ( + "github.com/hashicorp/memberlist" + "log" + "sync" +) + +type handle func(meta []byte) + +type NodeDelegate struct { + mu sync.Mutex + Enabled bool + JoinFunc handle + LeaveFunc handle + UpdateFunc handle +} + +func defaultJoin(meta []byte) { + log.Printf("defaultJoin meta: %s\n", string(meta)) +} + +func defaultLeave(meta []byte) { + log.Printf("defaultLeave meta: %s\n", string(meta)) +} + +func defaultUpdate(meta []byte) { + log.Printf("defaultUpdate meta: %s\n", string(meta)) +} + +func DefaultNodeDelegate() *NodeDelegate { + return &NodeDelegate{ + Enabled: true, + JoinFunc: defaultJoin, + LeaveFunc: defaultLeave, + UpdateFunc: defaultUpdate, + } +} + +func (d *NodeDelegate) NotifyJoin(n *memberlist.Node) { + d.mu.Lock() + defer d.mu.Unlock() + if d.Enabled { + d.JoinFunc(n.Meta) + } +} + +// NotifyLeave is invoked when a node is detected to have left. +// The Node argument must not be modified. +func (d *NodeDelegate) NotifyLeave(n *memberlist.Node) { + d.mu.Lock() + defer d.mu.Unlock() + if d.Enabled { + d.LeaveFunc(n.Meta) + } +} + +// NotifyUpdate is invoked when a node is detected to have +// updated, usually involving the meta data. The Node argument +// must not be modified. +func (d *NodeDelegate) NotifyUpdate(n *memberlist.Node) { + d.mu.Lock() + defer d.mu.Unlock() + if d.Enabled { + d.UpdateFunc(n.Meta) + } +} + diff --git a/cluster/gossip/gossip.go b/cluster/gossip/gossip.go new file mode 100644 index 0000000..85f2881 --- /dev/null +++ b/cluster/gossip/gossip.go @@ -0,0 +1,79 @@ +// Copyright (C) 2019, Xiongfa Li. +// All right reserved. +// @author xiongfa.li +// @version V1.0 +// Description: + +package gossip + +import ( + "gache/cluster" + "gache/config" + "github.com/hashicorp/memberlist" + "os" + "strconv" + "strings" + "time" +) + +type members memberlist.Memberlist + +type Cluster interface { + LocalAddr() string + UpdateLocal(meta []byte) error + UpdateAndWait(meta []byte, timeout time.Duration) error + Close() error +} + +func Startup(conf *config.Config, delegate *NodeDelegate) (Cluster, error) { + hostname, _ := os.Hostname() + config := memberlist.DefaultLocalConfig() + config.Name = hostname + "-" + strconv.Itoa(conf.ClusterPort) + // config := memberlist.DefaultLocalConfig() + config.BindPort = conf.ClusterPort + config.AdvertisePort = conf.ClusterPort + config.Events = delegate + + list, err := memberlist.Create(config) + if err != nil { + return nil, err + } + + _, _, sloterr := cluster.GetSlots(conf.ClusterSlot) + if sloterr != nil { + list.Shutdown() + return nil, sloterr + } + + memList := strings.Split(conf.ClusterMemebers, ",") + var validMembers []string + for _, v := range memList { + mem := strings.TrimSpace(v) + if mem != "" { + validMembers = append(validMembers, mem) + } + } + if len(validMembers) > 0 { + list.Join(validMembers) + } + + return (*members)(list), nil +} + +func (c *members)LocalAddr() string { + return (*memberlist.Memberlist)(c).LocalNode().Address() +} + +func (c *members) UpdateLocal(meta []byte) error { + (*memberlist.Memberlist)(c).LocalNode().Meta = meta + return nil +} + +func (c *members) UpdateAndWait(meta []byte, timeout time.Duration) error { + (*memberlist.Memberlist)(c).LocalNode().Meta = meta + return (*memberlist.Memberlist)(c).UpdateNode(timeout) +} + +func (c *members) Close() error { + return (*memberlist.Memberlist)(c).Shutdown() +} diff --git a/cluster/raft.go b/cluster/raft.go index b7d5962..f3b1f56 100644 --- a/cluster/raft.go +++ b/cluster/raft.go @@ -7,6 +7,7 @@ package cluster import ( + "errors" "gache/config" "gache/db" "github.com/hashicorp/go-hclog" @@ -68,6 +69,9 @@ func New(conf *config.Config, db *db.GacheDb, notifyChan chan bool) (*raft.Raft, } func DoJoin(addr string, cluster *raft.Raft) error { + if cluster == nil { + return errors.New("raft is nil") + } addPeerFuture := cluster.AddVoter(raft.ServerID(addr), raft.ServerAddress(addr), 0, 0) diff --git a/cluster/util.go b/cluster/util.go index cddbd53..65b17c3 100644 --- a/cluster/util.go +++ b/cluster/util.go @@ -7,12 +7,15 @@ package cluster import ( + "errors" "fmt" "gache/config" "io" "io/ioutil" "net/http" "os" + "strconv" + "strings" ) func Join(conf *config.Config) error { @@ -45,3 +48,23 @@ func IsPathExists(path string) bool { func Mkdir(path string) error { return os.MkdirAll(path, os.ModePerm) } + +func GetSlots(slotStr string) (uint32, uint32, error) { + slots := strings.Split(slotStr, "-") + beginSlot, endSlot := 0, 0 + if len(slots) > 1 { + i, err := strconv.Atoi(slots[0]) + if err != nil { + return 0, 0, err + } + beginSlot = i + j, err := strconv.Atoi(slots[1]) + if err != nil { + return 0, 0, err + } + endSlot = j + } else { + return 0, 0, errors.New("Parse slot error") + } + return uint32(beginSlot), uint32(endSlot), nil +} diff --git a/config/config.go b/config/config.go index 53b611f..9a1462a 100644 --- a/config/config.go +++ b/config/config.go @@ -10,4 +10,10 @@ type Config struct { RaftTcpAddr string RaftDir string RaftJoinAddr string + + ClusterPort int + ClusterMemebers string + ClusterSlot string + + ApiPort int } diff --git a/go.mod b/go.mod index 55f808f..4a0832b 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.12 require ( github.com/hashicorp/go-hclog v0.9.1 github.com/hashicorp/go-msgpack v0.5.5 + github.com/hashicorp/memberlist v0.1.4 github.com/hashicorp/raft v1.1.0 github.com/hashicorp/raft-boltdb v0.0.0-20190605210249-ef2e128ed477 ) diff --git a/go.sum b/go.sum index e6032f8..6cd8d39 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,5 @@ github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= +github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 h1:EFSB7Zo9Eg91v7MJPVsifUysc/wPdN+NOnVe6bWbdBM= github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= @@ -9,22 +10,36 @@ github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw= +github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-hclog v0.9.1 h1:9PZfAcVEvez4yhLH2TBU64/h/z4xlFI80cWXRrxuKuM= github.com/hashicorp/go-hclog v0.9.1/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0= github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= +github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-msgpack v0.5.5 h1:i9R9JSrqIz0QVLz3sz+i3YJdT7TTSLcfLLzJi9aZTuI= github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= +github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o= +github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= +github.com/hashicorp/go-sockaddr v1.0.0 h1:GeH6tui99pF4NJgfnhp+L6+FfobzVW3Ah46sLo0ICXs= +github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/memberlist v0.1.4 h1:gkyML/r71w3FL8gUi74Vk76avkj/9lYAY9lvg0OcoGs= +github.com/hashicorp/memberlist v0.1.4/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I= github.com/hashicorp/raft v1.1.0 h1:qPMePEczgbkiQsqCsRfuHRqvDUO+zmAInDaD5ptXlq0= github.com/hashicorp/raft v1.1.0/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM= github.com/hashicorp/raft-boltdb v0.0.0-20190605210249-ef2e128ed477 h1:bLsrEmB2NUwkHH18FOJBIa04wOV2RQalJrcafTYu6Lg= github.com/hashicorp/raft-boltdb v0.0.0-20190605210249-ef2e128ed477/go.mod h1:aUF6HQr8+t3FC/ZHAC+pZreUBhTaxumuu3L+d37uRxk= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/miekg/dns v1.0.14 h1:9jZdLNd/P4+SfEJ0TNyxYpsK8N4GtfylBLqtbYN1sbA= +github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= +github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -32,10 +47,17 @@ github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4 github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I= +github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= +golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3 h1:KYQXGkl6vs02hK7pK4eIbw0NpNPedieTSTEiJ//bwGs= +golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/handler/cluster.go b/handler/cluster.go new file mode 100644 index 0000000..3b9b0cc --- /dev/null +++ b/handler/cluster.go @@ -0,0 +1,171 @@ +// Copyright (C) 2019, Xiongfa Li. +// All right reserved. +// @author xiongfa.li +// @version V1.0 +// Description: + +package handler + +import ( + "hash/crc32" + "log" + "sort" + "sync" + "sync/atomic" +) + +const ( + OK = iota + ERROR + NOT_READY + MOVE +) + +type NodeInfo struct { + ApiAddr string + Addr string + SlotBegin uint32 + SlotEnd uint32 + Master bool +} + +type NodeList []NodeInfo + +type ClusterManager struct { + mu sync.Mutex + Nodes NodeList + + state int32 +} + +var CRC32Q *crc32.Table + +func init() { + CRC32Q = crc32.MakeTable(0xD5828281) +} + +func (n *NodeList) Len() int { + return len(*n) +} + +func (n *NodeList) Swap(i, j int) { + (*n)[i], (*n)[j] = (*n)[j], (*n)[i] +} + +func (n *NodeList) Less(i, j int) bool { + return (*n)[i].SlotBegin < (*n)[j].SlotBegin +} + +func (cm *ClusterManager) Update(node NodeInfo) { + cm.mu.Lock() + defer cm.mu.Unlock() + + found := false + for i := range cm.Nodes { + if cm.Nodes[i].Addr == node.Addr { + cm.Nodes[i].Master = node.Master + cm.Nodes[i].SlotBegin = node.SlotBegin + cm.Nodes[i].SlotEnd = node.SlotEnd + found = true + break + } + } + if !found { + cm.Nodes = append(cm.Nodes, node) + } + cm.checkNode() +} + +func (cm *ClusterManager) Leave(node NodeInfo) { + cm.mu.Lock() + defer cm.mu.Unlock() + + remove := -1 + for i := range cm.Nodes { + if cm.Nodes[i].Addr == node.Addr { + remove = i + break + } + } + + if remove >= 0 { + cm.Nodes = append(cm.Nodes[:remove], cm.Nodes[remove+1:]...) + } + cm.checkNode() +} + +func (cm *ClusterManager) Join(node NodeInfo) { + if node.Addr == "" { + return + } + cm.mu.Lock() + defer cm.mu.Unlock() + + for i := range cm.Nodes { + if cm.Nodes[i].Addr == node.Addr { + log.Printf("Join same node: %s\n", node.Addr) + return + } + } + + cm.Nodes = append(cm.Nodes, node) + cm.checkNode() +} + +func (cm *ClusterManager) Enable() bool { + return atomic.LoadInt32(&cm.state) == OK +} + +func (cm *ClusterManager) checkNode() bool { + sort.Sort(&cm.Nodes) + log.Printf("checkNode %v\n", cm.Nodes) + length := len(cm.Nodes) + if length == 0 { + atomic.StoreInt32(&cm.state, ERROR) + log.Printf("checkNode status: ERROR\n") + return false + } + if cm.Nodes[0].SlotBegin != 0 { + atomic.StoreInt32(&cm.state, NOT_READY) + log.Printf("checkNode status: NOT_READY, reason: SlotBegin is not 0\n") + return false + } + if cm.Nodes[length-1].SlotEnd != 16383 { + atomic.StoreInt32(&cm.state, NOT_READY) + log.Printf("checkNode status: NOT_READY, reason: SlotEnd is not 16383\n") + return false + } + + for i := 0; i < length-1; i++ { + if cm.Nodes[i].SlotEnd + 1 != cm.Nodes[i+1].SlotBegin { + atomic.StoreInt32(&cm.state, NOT_READY) + log.Printf("checkNode status: NOT_READY, reason: cm.Nodes[i].SlotEnd is %d cm.Nodes[i+1].SlotBegin is %d\n", + cm.Nodes[i].SlotEnd, cm.Nodes[i+1].SlotBegin) + return false + } + } + atomic.StoreInt32(&cm.state, OK) + log.Printf("checkNode status: OK\n") + return true +} + +func (cm *ClusterManager) FindNode(key string, master bool) (string, int32) { + if !cm.Enable() { + return "", cm.state + } + v := crc32.Checksum([]byte(key), CRC32Q) + slot := v % 16384 + for _, v := range cm.Nodes { + if v.SlotBegin > slot || v.SlotEnd < slot { + continue + } else { + if master { + if !v.Master { + continue + } + } + return v.ApiAddr, OK + } + } + return "", ERROR +} diff --git a/handler/handler.go b/handler/handler.go index 3e42cea..aa9fc2d 100644 --- a/handler/handler.go +++ b/handler/handler.go @@ -7,33 +7,45 @@ package handler import ( + "encoding/json" + "fmt" "gache/cluster" + "gache/cluster/gossip" "gache/command" + "gache/config" "gache/db" "github.com/hashicorp/raft" "io" "io/ioutil" "log" "net/http" + "strings" "sync/atomic" "time" ) type Handler struct { - methodMap map[string]http.HandlerFunc - raft *raft.Raft - db *db.GacheDb - leader AtomicBool + methodMap map[string]http.HandlerFunc + raft *raft.Raft + db *db.GacheDb + leader AtomicBool + cluster gossip.Cluster + clusterMgr ClusterManager + self NodeInfo } func New(raft *raft.Raft, db *db.GacheDb, notifyCh chan bool) *Handler { + var dummyCluster defaultCluster = 1 ret := &Handler{ methodMap: map[string]http.HandlerFunc{}, raft: raft, db: db, leader: 0, + cluster: &dummyCluster, + } + if raft == nil { + ret.leader.Set() } - ret.methodMap[http.MethodPost] = ret.create ret.methodMap[http.MethodPut] = ret.create ret.methodMap[http.MethodDelete] = ret.delete @@ -52,6 +64,10 @@ func New(raft *raft.Raft, db *db.GacheDb, notifyCh chan bool) *Handler { } else { ret.leader.Unset() } + node := ret.self + node.Master = ret.leader.IsSet() + b, _ := json.Marshal(node) + ret.cluster.UpdateLocal(b) break default: break @@ -62,6 +78,21 @@ func New(raft *raft.Raft, db *db.GacheDb, notifyCh chan bool) *Handler { return ret } +func (ctx *Handler) SetCluster(conf *config.Config, c gossip.Cluster) { + ctx.cluster = c + ctx.self.Addr = c.LocalAddr() + ctx.self.Master = ctx.leader.IsSet() + s := strings.Split(c.LocalAddr(), ":") + ctx.self.ApiAddr = fmt.Sprintf("%s:%d", s[0], conf.ApiPort) + + b, e, _ := cluster.GetSlots(conf.ClusterSlot) + ctx.self.SlotBegin = b + ctx.self.SlotEnd = e + + c.UpdateLocal(marshalMeta(ctx.self)) + ctx.clusterMgr.Join(ctx.self) +} + func (ctx *Handler) Handle(resp http.ResponseWriter, req *http.Request) { handleFunc := ctx.methodMap[req.Method] if handleFunc != nil { @@ -72,6 +103,15 @@ func (ctx *Handler) Handle(resp http.ResponseWriter, req *http.Request) { } func (ctx *Handler) create(resp http.ResponseWriter, req *http.Request) { + key := getKey(req) + clusterRet := ctx.checkCluster(key, true, resp, req) + if clusterRet == MOVE { + return + } + if clusterRet != OK { + resp.WriteHeader(http.StatusBadRequest) + return + } if !ctx.leader.IsSet() { resp.WriteHeader(http.StatusBadRequest) resp.Write([]byte("Not leader")) @@ -81,7 +121,6 @@ func (ctx *Handler) create(resp http.ResponseWriter, req *http.Request) { resp.WriteHeader(http.StatusBadRequest) return } - key := getKey(req) cmdReq := command.Request{ Cmd: command.SET, @@ -93,16 +132,28 @@ func (ctx *Handler) create(resp http.ResponseWriter, req *http.Request) { resp.WriteHeader(http.StatusBadRequest) return } - ctx.raft.Apply(b, 10*time.Second) + if ctx.raft == nil { + ctx.db.Set(key, value) + } else { + ctx.raft.Apply(b, 10*time.Second) + } } func (ctx *Handler) delete(resp http.ResponseWriter, req *http.Request) { + key := getKey(req) + clusterRet := ctx.checkCluster(key, true, resp, req) + if clusterRet == MOVE { + return + } + if clusterRet != OK { + resp.WriteHeader(http.StatusBadRequest) + return + } if !ctx.leader.IsSet() { resp.WriteHeader(http.StatusBadRequest) resp.Write([]byte("Not leader")) } - key := getKey(req) cmdReq := command.Request{ Cmd: command.DEL, K: key, @@ -112,11 +163,23 @@ func (ctx *Handler) delete(resp http.ResponseWriter, req *http.Request) { resp.WriteHeader(http.StatusBadRequest) return } - ctx.raft.Apply(b, 10*time.Second) + if ctx.raft == nil { + ctx.db.Delete(key) + } else { + ctx.raft.Apply(b, 10*time.Second) + } } func (ctx *Handler) get(resp http.ResponseWriter, req *http.Request) { key := getKey(req) + clusterRet := ctx.checkCluster(key, true, resp, req) + if clusterRet == MOVE { + return + } + if clusterRet != OK { + resp.WriteHeader(http.StatusBadRequest) + return + } v := ctx.db.Get(key) io.WriteString(resp, v) @@ -144,6 +207,64 @@ func (ctx *Handler) Cluster(resp http.ResponseWriter, req *http.Request) { } } +func (ctx *Handler) checkCluster(key string, master bool, resp http.ResponseWriter, req *http.Request) int32 { + addr, status := ctx.clusterMgr.FindNode(key, master) + if status == OK && addr != ctx.self.ApiAddr { + log.Printf("move to %s\n", addr) + //注意此处不使用StatusFound,由于302会出于安全考虑将POST重定向时修改为GET。使用307保持Method + http.Redirect(resp, req, "http://" + addr + req.RequestURI, http.StatusTemporaryRedirect) + return MOVE + } + return status +} + +func (ctx *Handler) NodeJoin(meta []byte) { + log.Printf("defaultJoin meta: %s\n", string(meta)) + + ctx.clusterMgr.Join(unmarshalMeta(meta)) +} + +func (ctx *Handler) NodeLeave(meta []byte) { + log.Printf("defaultLeave meta: %s\n", string(meta)) + ctx.clusterMgr.Leave(unmarshalMeta(meta)) +} + +func (ctx *Handler) NodeUpdate(meta []byte) { + log.Printf("defaultUpdate meta: %s\n", string(meta)) + ctx.clusterMgr.Update(unmarshalMeta(meta)) +} + +func marshalMeta(node NodeInfo) []byte { + b, err := json.Marshal(node) + log.Printf("%v\n", err) + return b +} + +func unmarshalMeta(meta []byte) NodeInfo { + ret := NodeInfo{} + err := json.Unmarshal(meta, &ret) + log.Printf("%v\n", err) + return ret +} + +type defaultCluster int + +func (c *defaultCluster) LocalAddr() string { + return "" +} + +func (c *defaultCluster) UpdateLocal(meta []byte) error { + return nil +} + +func (c *defaultCluster) UpdateAndWait(meta []byte, timeout time.Duration) error { + return nil +} + +func (c *defaultCluster) Close() error { + return nil +} + type AtomicBool int32 func (b *AtomicBool) IsSet() bool { return atomic.LoadInt32((*int32)(b)) == 1 } diff --git a/main.go b/main.go index 206291a..44383ab 100644 --- a/main.go +++ b/main.go @@ -9,19 +9,27 @@ package main import ( "flag" "fmt" + "gache/cluster" + "gache/cluster/gossip" "gache/config" "gache/db" "gache/handler" - "gache/cluster" "log" "net/http" + "os" + "os/signal" + "syscall" + "time" ) func main() { - port := flag.Int("p", 8080, "server port") - addr := flag.String("raft-addr", ":12345", "raft tcp address") + port := flag.Int("p", 8000, "server port") + addr := flag.String("raft-addr", "", "raft tcp address, format: :7000") dir := flag.String("raft-dir", "/tmp", "raft dir") joinAddr := flag.String("raft-join", "", "raft join addr") + gossipPort := flag.Int("cluster-port", 9000, "cluster port") + gossipMember := flag.String("cluster-members", "", "member list: HOST1:PORT1,HOST2:PORT2,HOST3:PORT3") + gossipSlots := flag.String("cluster-slot", "", "Slot: 0-16383") flag.Parse() @@ -29,27 +37,87 @@ func main() { RaftTcpAddr: *addr, RaftDir: *dir, RaftJoinAddr: *joinAddr, + + ClusterPort: *gossipPort, + ClusterMemebers: *gossipMember, + ClusterSlot: *gossipSlots, + + ApiPort: *port, } gacheDb := db.New() - notifyCh := make(chan bool, 1) - c, err := cluster.New(conf, gacheDb, notifyCh) - - if err != nil { - log.Fatal(err) + notifyCh := make(chan bool, 1) + var servers []shutdown + if conf.RaftTcpAddr != "" { + raft, err := cluster.New(conf, gacheDb, notifyCh) + if err != nil { + log.Fatal(err) + } + servers = append(servers, func() error { + raft.Shutdown() + return nil + }) } - handler := handler.New(c, gacheDb, notifyCh) + handler := handler.New(nil, gacheDb, notifyCh) if conf.RaftJoinAddr != "" { cluster.Join(conf) } + if conf.ClusterSlot != "" { + c, err := gossip.Startup(conf, &gossip.NodeDelegate{ + Enabled: true, + JoinFunc: handler.NodeJoin, + LeaveFunc: handler.NodeLeave, + UpdateFunc: handler.NodeUpdate, + }) + if err != nil { + closeAll(servers) + os.Exit(-1) + } + handler.SetCluster(conf, c) + servers = append(servers, c.Close) + } + http.HandleFunc("/key/", handler.Handle) http.HandleFunc("/join", handler.Cluster) //设置访问的ip和端口 - err = http.ListenAndServe(fmt.Sprintf(":%d", *port), nil) - if err != nil { - log.Fatal("ListenAndServe:", err) + s := &http.Server{ + Addr: fmt.Sprintf(":%d", conf.ApiPort), + Handler: nil, + ReadTimeout: 15 * time.Second, + WriteTimeout: 15 * time.Second, + IdleTimeout: 15 * time.Second, + MaxHeaderBytes: 1 << 20, + } + + go s.ListenAndServe() + servers = append(servers, s.Close) + + handleSignal(servers) +} + +type shutdown func() error + +func handleSignal(c []shutdown) { + quitChan := make(chan os.Signal) + signal.Notify(quitChan, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGHUP, + ) + <-quitChan + + closeAll(c) + log.Println("server gracefully shutdown") + close(quitChan) +} + +func closeAll(c []shutdown) { + for _, v := range c { + if err := v(); nil != err { + log.Fatalf("server shutdown failed, err: %v\n", err) + } } }