forked from hashicorp/raft
-
Notifications
You must be signed in to change notification settings - Fork 0
/
peer.go
115 lines (97 loc) · 2.46 KB
/
peer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package raft
import (
"bytes"
"encoding/json"
"io/ioutil"
"log"
"net"
"os"
"path/filepath"
"sync"
)
// jsonPeerPath is the file name, relative to the base directory handed to
// NewJSONPeers, under which JSONPeers keeps the peer list.
const jsonPeerPath = "peers.json"
// PeerStore provides an interface for persistent storage and
// retrieval of peers. We use a separate interface from StableStore
// since the peers may need to be edited by a human operator. For example,
// in a two node cluster, the failure of either node requires human intervention
// since consensus is impossible.
type PeerStore interface {
// Peers returns the list of known peers.
Peers() ([]net.Addr, error)
// SetPeers sets the list of known peers. This is invoked when
// a peer is added or removed.
SetPeers([]net.Addr) error
}
// StaticPeers is used to provide a static list of peers. The list lives
// only in the exported StaticPeers field; nothing is written to disk.
type StaticPeers struct {
	StaticPeers []net.Addr
}

// Peers returns the list currently held in the StaticPeers field.
// The returned error is always nil.
func (s *StaticPeers) Peers() ([]net.Addr, error) {
	current := s.StaticPeers
	return current, nil
}

// SetPeers replaces the held list with p. The slice is stored as given
// (it is not copied). The returned error is always nil.
func (s *StaticPeers) SetPeers(p []net.Addr) error {
	s.StaticPeers = p
	return nil
}
// JSONPeers is used to provide peer persistence on disk in the form
// of a JSON file. This allows human operators to manipulate the file.
type JSONPeers struct {
// l is held by Peers and SetPeers for the duration of each call.
l sync.Mutex
// path is the location of the JSON file that is read and written.
path string
// trans converts peer addresses to and from their byte encoding
// (via EncodePeer / DecodePeer).
trans Transport
}
// NewJSONPeers creates a new JSONPeers store backed by the file
// jsonPeerPath inside the base directory. A transport is required
// to handle the serialization of network addresses. The file itself is
// not read or created here.
func NewJSONPeers(base string, trans Transport) *JSONPeers {
	return &JSONPeers{
		path:  filepath.Join(base, jsonPeerPath),
		trans: trans,
	}
}
// Peers loads the peer list from the JSON file at j.path.
//
// A file that does not exist, or that is empty, is not an error: the
// result is a nil slice and a nil error. Any other read failure, and any
// failure to decode the contents into a [][]byte, is returned as-is.
// Each decoded entry is turned into a net.Addr by the transport's
// DecodePeer. The store's mutex is held for the whole call.
func (j *JSONPeers) Peers() ([]net.Addr, error) {
	j.l.Lock()
	defer j.l.Unlock()

	// Load the raw contents; a missing file falls through to the
	// "no peers" case below.
	raw, err := ioutil.ReadFile(j.path)
	if err != nil && !os.IsNotExist(err) {
		return nil, err
	}

	// Nothing stored yet.
	if len(raw) == 0 {
		return nil, nil
	}

	// Parse the JSON document into the encoded form of each peer.
	var encoded [][]byte
	if err := json.NewDecoder(bytes.NewReader(raw)).Decode(&encoded); err != nil {
		return nil, err
	}

	// Let the transport turn each encoded entry back into an address.
	var peers []net.Addr
	for i := range encoded {
		peers = append(peers, j.trans.DecodePeer(encoded[i]))
	}
	return peers, nil
}
// SetPeers encodes the given peers with the transport's EncodePeer and
// writes them as JSON to the file at j.path, replacing its previous contents.
//
// An empty or nil peer list is written as an empty JSON array ("[]"),
// which Peers reads back as "no peers". Encoding and write errors are
// returned as-is. The store's mutex is held for the whole call.
func (j *JSONPeers) SetPeers(peers []net.Addr) error {
	j.l.Lock()
	defer j.l.Unlock()

	// Encode each peer. The slice is allocated non-nil so that an empty
	// list serializes as "[]" rather than "null", which is friendlier to
	// an operator editing the file by hand.
	peerSet := make([][]byte, 0, len(peers))
	for _, p := range peers {
		peerSet = append(peerSet, j.trans.EncodePeer(p))
	}

	// Convert to JSON
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(peerSet); err != nil {
		return err
	}

	// Write out as JSON. This is a plain data file, so it is created
	// without executable bits; the mode only applies when the file does
	// not exist yet.
	log.Printf("[INFO] Writing peer set to %s", j.path)
	return ioutil.WriteFile(j.path, buf.Bytes(), 0644)
}