Skip to content

Commit

Permalink
Adding a Barrier, to enable blocking until FSM is up to date
Browse files Browse the repository at this point in the history
  • Loading branch information
armon committed Jan 9, 2014
1 parent 7f90ed2 commit 3278852
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 2 deletions.
8 changes: 8 additions & 0 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ const (

// Used to remove an existing peer
LogRemovePeer

// Barrier is used to ensure all preceeding
// operations have been applied to the FSM. It is
// similar to LogNoop, but instead of returning once committed,
// it only returns once the FSM manager acks it. Otherwise it is
// possible there are operations committed but not yet applied to
// the FSM.
LogBarrier
)

var (
Expand Down
40 changes: 38 additions & 2 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,35 @@ func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture {
}
}

// Barrier is used to issue a command that blocks until all preceeding
// operations have been applied to the FSM. It can be used to ensure the
// FSM reflects all queued writes. An optional timeout can be provided to
// limit the amount of time we wait for the command to be started. This
// must be run on the leader or it will fail.
func (r *Raft) Barrier(timeout time.Duration) Future {
var timer <-chan time.Time
if timeout > 0 {
timer = time.After(timeout)
}

// Create a log future, no index or term yet
logFuture := &logFuture{
log: Log{
Type: LogBarrier,
},
}
logFuture.init()

select {
case <-timer:
return errorFuture{EnqueueTimeout}
case <-r.shutdownCh:
return errorFuture{RaftShutdown}
case r.applyCh <- logFuture:
return logFuture
}
}

// AddPeer is used to add a new peer into the cluster. This must be
// run on the leader or it will fail.
func (r *Raft) AddPeer(peer net.Addr) Future {
Expand Down Expand Up @@ -347,8 +376,11 @@ func (r *Raft) runFSM() {
req.respond(err)

case commitTuple := <-r.fsmCommitCh:
// Apply the log
resp := r.fsm.Apply(commitTuple.log)
// Apply the log if a command
var resp interface{}
if commitTuple.log.Type == LogCommand {
resp = r.fsm.Apply(commitTuple.log)
}

// Update the indexes
lastIndex = commitTuple.log.Index
Expand Down Expand Up @@ -725,6 +757,10 @@ func (r *Raft) processLogs(index uint64, future *logFuture) {
// processLog is invoked to process the application of a single committed log
func (r *Raft) processLog(l *Log, future *logFuture) {
switch l.Type {
case LogBarrier:
// Barrier is handled by the FSM
fallthrough

case LogCommand:
// Forward to the fsm handler
select {
Expand Down
28 changes: 28 additions & 0 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1052,3 +1052,31 @@ func TestRaft_LeaderLeaseExpire(t *testing.T) {
t.Fatalf("expected step down")
}
}

func TestRaft_Barrier(t *testing.T) {
// Make the cluster
c := MakeCluster(3, t, nil)
defer c.Close()

// Get the leader
leader := c.Leader()

// Commit a lot of things
for i := 0; i < 100; i++ {
leader.Apply([]byte(fmt.Sprintf("test%d", i)), 0)
}

// Wait for a barrier complete
barrier := leader.Barrier(0)

// Wait for the barrier future to apply
if err := barrier.Error(); err != nil {
t.Fatalf("err: %v", err)
}

// Ensure all the logs are the same
c.EnsureSame(t)
if len(c.fsms[0].logs) != 100 {
t.Fatalf("Bad log length")
}
}

0 comments on commit 3278852

Please sign in to comment.