Skip to content

Commit

Permalink
Merge branch 'master' into f-merge
Browse files Browse the repository at this point in the history
  • Loading branch information
James Phillips committed Jul 7, 2016
2 parents 8a9a1b0 + 5f14b72 commit b581220
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 46 deletions.
12 changes: 7 additions & 5 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,17 @@ import (
// Config provides any necessary configuration to
// the Raft server
type Config struct {
// Time in follower state without a leader before we attempt an election.
// HeartbeatTimeout specifies the time in follower state without
// a leader before we attempt an election.
HeartbeatTimeout time.Duration

// Time in candidate state without a leader before we attempt an election.
// ElectionTimeout specifies the time in candidate state without
// a leader before we attempt an election.
ElectionTimeout time.Duration

// Time without an Apply() operation before we heartbeat to ensure
// a timely commit. Due to random staggering, may be delayed as much as
// 2x this value.
// CommitTimeout controls the time without an Apply() operation
// before we heartbeat to ensure a timely commit. Due to random
// staggering, may be delayed as much as 2x this value.
CommitTimeout time.Duration

// MaxAppendEntries controls the maximum number of append entries
Expand Down
3 changes: 3 additions & 0 deletions fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
// clients to make use of the replicated log.
type FSM interface {
// Apply log is invoked once a log entry is committed.
// It returns a value which will be made available in the
// ApplyFuture returned by Raft.Apply method if that
// method was called on the same Raft node as the FSM.
Apply(*Log) interface{}

// Snapshot is used to support log compaction. This call should
Expand Down
22 changes: 18 additions & 4 deletions future.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,31 @@ import (

// Future is used to represent an action that may occur in the future.
type Future interface {
// Error blocks until the future arrives and then
// returns the error status of the future.
// This may be called any number of times - all
// calls will return the same value.
// Note that it is not OK to call this method
// twice concurrently on the same Future instance.
Error() error
}

// IndexFuture is used for future actions that can result in a raft log entry being created.
type IndexFuture interface {
Future

// Index holds the index of the newly applied log entry.
// This must not be called until after the Error method has returned.
Index() uint64
}

// ApplyFuture is used for Apply() and can returns the FSM response.
// ApplyFuture is used for Apply() and can return the FSM response.
type ApplyFuture interface {
IndexFuture

// Response returns the FSM response as returned
// by the FSM.Apply method. This must not be called
// until after the Error method has returned.
Response() interface{}
}

Expand Down Expand Up @@ -53,6 +66,9 @@ func (d *deferError) init() {

func (d *deferError) Error() error {
if d.err != nil {
// Note that when we've received a nil error, this
// won't trigger, but the channel is closed after
// send so we'll still return nil below.
return d.err
}
if d.errCh == nil {
Expand Down Expand Up @@ -107,9 +123,7 @@ func (s *shutdownFuture) Error() error {
if s.raft == nil {
return nil
}
for s.raft.getRoutines() > 0 {
time.Sleep(5 * time.Millisecond)
}
s.raft.waitShutdown()
if closeable, ok := s.raft.trans.(WithClose); ok {
closeable.Close()
}
Expand Down
42 changes: 42 additions & 0 deletions future_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package raft

import (
"errors"
"testing"
)

func TestDeferFutureSuccess(t *testing.T) {
var f deferError
f.init()
f.respond(nil)
if err := f.Error(); err != nil {
t.Fatalf("unexpected error result; got %#v want nil", err)
}
if err := f.Error(); err != nil {
t.Fatalf("unexpected error result; got %#v want nil", err)
}
}

func TestDeferFutureError(t *testing.T) {
want := errors.New("x")
var f deferError
f.init()
f.respond(want)
if got := f.Error(); got != want {
t.Fatalf("unexpected error result; got %#v want %#v", got, want)
}
if got := f.Error(); got != want {
t.Fatalf("unexpected error result; got %#v want %#v", got, want)
}
}

func TestDeferFutureConcurrent(t *testing.T) {
// Food for the race detector.
want := errors.New("x")
var f deferError
f.init()
go f.respond(want)
if got := f.Error(); got != want {
t.Errorf("unexpected error result; got %#v want %#v", got, want)
}
}
25 changes: 16 additions & 9 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,30 +33,37 @@ const (
// Log entries are replicated to all members of the Raft cluster
// and form the heart of the replicated state machine.
type Log struct {
// Index holds the index of the log entry.
Index uint64
Term uint64
Type LogType
Data []byte

// Term holds the election term of the log entry.
Term uint64

// Type holds the type of the log entry.
Type LogType

// Data holds the log entry's type-specific data.
Data []byte
}

// LogStore is used to provide an interface for storing
// and retrieving logs in a durable fashion.
type LogStore interface {
// Returns the first index written. 0 for no entries.
// FirstIndex returns the first index written. 0 for no entries.
FirstIndex() (uint64, error)

// Returns the last index written. 0 for no entries.
// LastIndex returns the last index written. 0 for no entries.
LastIndex() (uint64, error)

// Gets a log entry at a given index.
// GetLog gets a log entry at a given index.
GetLog(index uint64, log *Log) error

// Stores a log entry.
// StoreLog stores a log entry.
StoreLog(log *Log) error

// Stores multiple log entries.
// StoreLogs stores multiple log entries.
StoreLogs(logs []*Log) error

// Deletes a range of log entries. The range is inclusive.
// DeleteRange deletes a range of log entries. The range is inclusive.
DeleteRange(min, max uint64) error
}
23 changes: 16 additions & 7 deletions observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,20 @@ import (

// Observation is sent along the given channel to observers when an event occurs.
type Observation struct {
// Raft holds the Raft instance generating the observation.
Raft *Raft
// Data holds observation-specific data. Possible types are
// *RequestVoteRequest and RaftState.
Data interface{}
}

// nextObserverId is used to provide a unique ID for each observer to aid in
// deregistration.
var nextObserverID uint64

// FilterFn is a function that can be registered in order to filter observations
// by returning false.
// FilterFn is a function that can be registered in order to filter observations.
// The function reports whether the observation should be included - if
// it returns false, the observation will be filtered out.
type FilterFn func(o *Observation) bool

// Observer describes what to do with a given observation.
Expand All @@ -39,8 +43,13 @@ type Observer struct {
numDropped uint64
}

// Create a new observer with the specified channel, blocking behavior, and
// filter (filter can be nil).
// NewObserver creates a new observer that can be registered
// to make observations on a Raft instance. Observations
// will be sent on the given channel if they satisfy the
// given filter.
//
// If blocking is true, the observer will block when it can't
// send on the channel, otherwise it may discard events.
func NewObserver(channel chan Observation, blocking bool, filter FilterFn) *Observer {
return &Observer{
channel: channel,
Expand All @@ -60,21 +69,21 @@ func (or *Observer) GetNumDropped() uint64 {
return atomic.LoadUint64(&or.numDropped)
}

// Register a new observer.
// RegisterObserver registers a new observer.
func (r *Raft) RegisterObserver(or *Observer) {
r.observersLock.Lock()
defer r.observersLock.Unlock()
r.observers[or.id] = or
}

// Deregister an observer.
// DeregisterObserver deregisters an observer.
func (r *Raft) DeregisterObserver(or *Observer) {
r.observersLock.Lock()
defer r.observersLock.Unlock()
delete(r.observers, or.id)
}

// Send an observation to every observer.
// observe sends an observation to every observer.
func (r *Raft) observe(o interface{}) {
// In general observers should not block. But in any case this isn't
// disastrous as we only hold a read lock, which merely prevents
Expand Down
2 changes: 1 addition & 1 deletion snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type SnapshotMeta struct {
// SnapshotStore interface is used to allow for flexible implementations
// of snapshot storage and retrieval. For example, a client could implement
// a shared state store such as S3, allowing new nodes to restore snapshots
// without steaming from the leader.
// without streaming from the leader.
type SnapshotStore interface {
// Create is used to begin a snapshot at a given index and term,
// with the current configuration already encoded.
Expand Down
24 changes: 8 additions & 16 deletions state.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ type raftState struct {
lastLogIndex uint64
lastLogTerm uint64

// Tracks the number of live routines
runningRoutines int32
// Tracks running goroutines
routinesGroup sync.WaitGroup

// The current state
state RaftState
Expand Down Expand Up @@ -133,28 +133,20 @@ func (r *raftState) setLastApplied(index uint64) {
atomic.StoreUint64(&r.lastApplied, index)
}

func (r *raftState) incrRoutines() {
atomic.AddInt32(&r.runningRoutines, 1)
}

func (r *raftState) decrRoutines() {
atomic.AddInt32(&r.runningRoutines, -1)
}

func (r *raftState) getRoutines() int32 {
return atomic.LoadInt32(&r.runningRoutines)
}

// Start a goroutine and properly handle the race between a routine
// starting and incrementing, and exiting and decrementing.
func (r *raftState) goFunc(f func()) {
r.incrRoutines()
r.routinesGroup.Add(1)
go func() {
defer r.decrRoutines()
defer r.routinesGroup.Done()
f()
}()
}

func (r *raftState) waitShutdown() {
r.routinesGroup.Wait()
}

// getLastIndex returns the last index in stable storage.
// Either from the last log or from the last snapshot.
func (r *raftState) getLastIndex() uint64 {
Expand Down
23 changes: 19 additions & 4 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,19 @@ type Transport interface {
SetHeartbeatHandler(cb func(rpc RPC))
}

// Close() lives in a separate interface as unfortunately it wasn't in the
// WithClose is an interface that a transport may provide which
// allows a transport to be shut down cleanly when a Raft instance
// shuts down.
//
// It is defined separately from Transport as unfortunately it wasn't in the
// original interface specification.
type WithClose interface {
// Permanently close a transport, stop all go-routines etc
// Close permanently closes a transport, stopping
// any associated goroutines and freeing other resources.
Close() error
}

// Loopback transport is an interface that provides a loopback transport suitable for testing
// LoopbackTransport is an interface that provides a loopback transport suitable for testing
// e.g. InmemTransport. It's there so we don't have to rewrite tests.
type LoopbackTransport interface {
Transport // Embedded transport reference
Expand Down Expand Up @@ -96,14 +101,24 @@ type AppendPipeline interface {
// response futures when they are ready.
Consumer() <-chan AppendFuture

// Closes pipeline and cancels all inflight RPCs
// Close closes the pipeline and cancels all inflight RPCs
Close() error
}

// AppendFuture is used to return information about a pipelined AppendEntries request.
type AppendFuture interface {
Future

// Start returns the time that the append request was started.
// It is always OK to call this method.
Start() time.Time

// Request holds the parameters of the AppendEntries call.
// It is always OK to call this method.
Request() *AppendEntriesRequest

// Response holds the results of the AppendEntries call.
// This method must only be called after the Error
// method returns, and will only be valid on success.
Response() *AppendEntriesResponse
}

0 comments on commit b581220

Please sign in to comment.