Skip to content

Commit

Permalink
Adds types for version numbers and a stub for choosing the snapshot v…
Browse files Browse the repository at this point in the history
…ersion.
  • Loading branch information
slackpad committed Aug 1, 2016
1 parent 6a6cb07 commit 038c699
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 24 deletions.
5 changes: 3 additions & 2 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type Raft struct {
// protocolVersion is used to inter-operate with Raft servers running
// different versions of the library. See comments in config.go for more
// details.
protocolVersion int
protocolVersion ProtocolVersion

// applyCh is used to async send logs to the main thread to
// be committed and applied to the FSM.
Expand Down Expand Up @@ -303,7 +303,8 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
if err != nil {
return fmt.Errorf("failed to snapshot FSM: %v", err)
}
sink, err := snaps.Create(lastIndex, lastTerm, configuration, 1, trans)
version := getSnapshotVersion(conf.ProtocolVersion)
sink, err := snaps.Create(version, lastIndex, lastTerm, configuration, 1, trans)
if err != nil {
return fmt.Errorf("failed to create snapshot: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package raft
type RPCHeader struct {
// ProtocolVersion is the version of the protocol the sender is
// speaking.
ProtocolVersion int
ProtocolVersion ProtocolVersion
}

// WithRPCHeader is an interface that exposes the RPC header.
Expand Down Expand Up @@ -108,7 +108,7 @@ func (r *RequestVoteResponse) GetRPCHeader() RPCHeader {
// log (and state machine) from a snapshot on another peer.
type InstallSnapshotRequest struct {
RPCHeader
SnapshotVersion int
SnapshotVersion SnapshotVersion

Term uint64
Leader []byte
Expand Down
14 changes: 9 additions & 5 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,11 @@ import (
// this protocol version, along with their server ID. The remove/add cycle
// is required to populate their server ID. Note that removing must be done
// by ID, which will be the old server's address.
type ProtocolVersion int

const (
ProtocolVersionMin = 0
ProtocolVersionMax = 3
ProtocolVersionMin ProtocolVersion = 0
ProtocolVersionMax = 3
)

// These are versions of snapshots that this server can _understand_. Currently,
Expand All @@ -105,9 +107,11 @@ const (
// Since the original Raft library didn't enforce any versioning, we must
// include the legacy peers structure for this version, but we can deprecate
// it in the next snapshot version.
type SnapshotVersion int

const (
SnapshotVersionMin = 0
SnapshotVersionMax = 1
SnapshotVersionMin SnapshotVersion = 0
SnapshotVersionMax = 1
)

// Config provides any necessary configuration for the Raft server.
Expand All @@ -120,7 +124,7 @@ type Config struct {
// configured with compatible versions. See ProtocolVersionMin and
// ProtocolVersionMax for the versions of the protocol that this server
// can _understand_.
ProtocolVersion int
ProtocolVersion ProtocolVersion

// HeartbeatTimeout specifies the time in follower state without
// a leader before we attempt an election.
Expand Down
3 changes: 2 additions & 1 deletion discard_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ func NewDiscardSnapshotStore() *DiscardSnapshotStore {
return &DiscardSnapshotStore{}
}

func (d *DiscardSnapshotStore) Create(index, term uint64, configuration Configuration, configurationIndex uint64, trans Transport) (SnapshotSink, error) {
func (d *DiscardSnapshotStore) Create(version SnapshotVersion, index, term uint64,
configuration Configuration, configurationIndex uint64, trans Transport) (SnapshotSink, error) {
return &DiscardSnapshotSink{}, nil
}

Expand Down
11 changes: 8 additions & 3 deletions file_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,13 @@ func snapshotName(term, index uint64) string {
}

// Create is used to start a new snapshot
func (f *FileSnapshotStore) Create(index, term uint64, configuration Configuration,
configurationIndex uint64, trans Transport) (SnapshotSink, error) {
func (f *FileSnapshotStore) Create(version SnapshotVersion, index, term uint64,
configuration Configuration, configurationIndex uint64, trans Transport) (SnapshotSink, error) {
// We only support version 1 snapshots at this time.
if version != 1 {
return nil, fmt.Errorf("unsupported snapshot version %d", version)
}

// Create a new path
name := snapshotName(term, index)
path := filepath.Join(f.path, name+tmpSuffix)
Expand All @@ -158,7 +163,7 @@ func (f *FileSnapshotStore) Create(index, term uint64, configuration Configurati
dir: path,
meta: fileSnapshotMeta{
SnapshotMeta: SnapshotMeta{
Version: SnapshotVersionMax,
Version: version,
ID: name,
Index: index,
Term: term,
Expand Down
12 changes: 6 additions & 6 deletions file_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestFileSS_CreateSnapshotMissingParentDir(t *testing.T) {

os.RemoveAll(parent)
_, trans := NewInmemTransport(NewInmemAddr())
_, err = snap.Create(10, 3, Configuration{}, 0, trans)
_, err = snap.Create(SnapshotVersionMax, 10, 3, Configuration{}, 0, trans)
if err != nil {
t.Fatalf("should not fail when using non existing parent")
}
Expand Down Expand Up @@ -93,7 +93,7 @@ func TestFileSS_CreateSnapshot(t *testing.T) {
Address: ServerAddress("over here"),
})
_, trans := NewInmemTransport(NewInmemAddr())
sink, err := snap.Create(10, 3, configuration, 2, trans)
sink, err := snap.Create(SnapshotVersionMax, 10, 3, configuration, 2, trans)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestFileSS_CancelSnapshot(t *testing.T) {

// Create a new sink
_, trans := NewInmemTransport(NewInmemAddr())
sink, err := snap.Create(10, 3, Configuration{}, 0, trans)
sink, err := snap.Create(SnapshotVersionMax, 10, 3, Configuration{}, 0, trans)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -223,7 +223,7 @@ func TestFileSS_Retention(t *testing.T) {
// Create a few snapshots
_, trans := NewInmemTransport(NewInmemAddr())
for i := 10; i < 15; i++ {
sink, err := snap.Create(uint64(i), 3, Configuration{}, 0, trans)
sink, err := snap.Create(SnapshotVersionMax, uint64(i), 3, Configuration{}, 0, trans)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -313,7 +313,7 @@ func TestFileSS_Ordering(t *testing.T) {

// Create a new sink
_, trans := NewInmemTransport(NewInmemAddr())
sink, err := snap.Create(130350, 5, Configuration{}, 0, trans)
sink, err := snap.Create(SnapshotVersionMax, 130350, 5, Configuration{}, 0, trans)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand All @@ -322,7 +322,7 @@ func TestFileSS_Ordering(t *testing.T) {
t.Fatalf("err: %v", err)
}

sink, err = snap.Create(204917, 36, Configuration{}, 0, trans)
sink, err = snap.Create(SnapshotVersionMax, 204917, 36, Configuration{}, 0, trans)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
11 changes: 10 additions & 1 deletion raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ func (r *Raft) checkRPCHeader(rpc RPC) error {
return nil
}

// getSnapshotVersion returns the snapshot version that should be used when
// creating snapshots, given the protocol version in use.
func getSnapshotVersion(protocolVersion ProtocolVersion) SnapshotVersion {
// Right now we only have two versions and they are backwards compatible
// so we don't need to look at the protocol version.
return 1
}

// commitTuple is used to send an index that was committed,
// with an optional associated future that should be invoked.
type commitTuple struct {
Expand Down Expand Up @@ -1141,7 +1149,8 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
reqConfiguration = decodePeers(req.Peers, r.trans)
reqConfigurationIndex = req.LastLogIndex
}
sink, err := r.snapshots.Create(req.LastLogIndex, req.LastLogTerm,
version := getSnapshotVersion(r.protocolVersion)
sink, err := r.snapshots.Create(version, req.LastLogIndex, req.LastLogTerm,
reqConfiguration, reqConfigurationIndex, r.trans)
if err != nil {
r.logger.Printf("[ERR] raft: Failed to create snapshot to install: %v", err)
Expand Down
13 changes: 9 additions & 4 deletions snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import (

// SnapshotMeta is for metadata of a snapshot.
type SnapshotMeta struct {
Version int
// Version is the version number of the snapshot metadata. This does not cover
// the application's data in the snapshot, that should be versioned
// separately.
Version SnapshotVersion

// ID is opaque to the store, and is used for opening.
ID string
Expand Down Expand Up @@ -38,8 +41,9 @@ type SnapshotMeta struct {
// without streaming from the leader.
type SnapshotStore interface {
// Create is used to begin a snapshot at a given index and term, and with
// the given committed configuration.
Create(index, term uint64, configuration Configuration,
// the given committed configuration. The version parameter controls
// which snapshot version to create.
Create(version SnapshotVersion, index, term uint64, configuration Configuration,
configurationIndex uint64, trans Transport) (SnapshotSink, error)

// List is used to list the available snapshots in the store.
Expand Down Expand Up @@ -159,7 +163,8 @@ func (r *Raft) takeSnapshot() error {
// Create a new snapshot.
r.logger.Printf("[INFO] raft: Starting snapshot up to %d", snapReq.index)
start := time.Now()
sink, err := r.snapshots.Create(snapReq.index, snapReq.term, committed, committedIndex, r.trans)
version := getSnapshotVersion(r.protocolVersion)
sink, err := r.snapshots.Create(version, snapReq.index, snapReq.term, committed, committedIndex, r.trans)
if err != nil {
return fmt.Errorf("failed to create snapshot: %v", err)
}
Expand Down

0 comments on commit 038c699

Please sign in to comment.