Skip to content

Commit

Permalink
Update transports to support SetHeartbeatHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
armon committed Jan 6, 2015
1 parent 5f5bef8 commit 37876b9
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 0 deletions.
5 changes: 5 additions & 0 deletions inmem_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ func NewInmemTransport() (*InmemAddr, *InmemTransport) {
return addr, trans
}

// SetHeartbeatHandler is used to set optional fast-path for
// heartbeats, not supported for this transport.
func (i *InmemTransport) SetHeartbeatHandler(cb func(RPC)) {
}

// Consumer implements the Transport interface.
func (i *InmemTransport) Consumer() <-chan RPC {
return i.consumerCh
Expand Down
33 changes: 33 additions & 0 deletions net_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ type NetworkTransport struct {

consumeCh chan RPC

heartbeatFn func(RPC)
heartbeatFnLock sync.Mutex

logger *log.Logger

maxPool int
Expand Down Expand Up @@ -136,6 +139,15 @@ func NewNetworkTransport(
return trans
}

// SetHeartbeatHandler is used to setup a heartbeat handler
// as a fast-pass. This is to avoid head-of-line blocking from
// disk IO.
func (n *NetworkTransport) SetHeartbeatHandler(cb func(rpc RPC)) {
n.heartbeatFnLock.Lock()
defer n.heartbeatFnLock.Unlock()
n.heartbeatFn = cb
}

// Close is used to stop the network transport
func (n *NetworkTransport) Close() error {
n.shutdownLock.Lock()
Expand Down Expand Up @@ -388,6 +400,7 @@ func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *codec.Decoder, en
}

// Decode the command
isHeartbeat := false
switch rpcType {
case rpcAppendEntries:
var req AppendEntriesRequest
Expand All @@ -396,6 +409,13 @@ func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *codec.Decoder, en
}
rpc.Command = &req

// Check if this is a heartbeat
if req.Term != 0 && req.Leader != nil &&
req.PrevLogEntry == 0 && req.PrevLogTerm == 0 &&
len(req.Entries) == 0 && req.LeaderCommitIndex == 0 {
isHeartbeat = true
}

case rpcRequestVote:
var req RequestVoteRequest
if err := dec.Decode(&req); err != nil {
Expand All @@ -415,6 +435,18 @@ func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *codec.Decoder, en
return fmt.Errorf("unknown rpc type %d", rpcType)
}

// Check for heartbeat fast-path
if isHeartbeat {
n.heartbeatFnLock.Lock()
fn := n.heartbeatFn
n.heartbeatFnLock.Unlock()
if fn != nil {
n.logger.Printf("[REMOVEME] raft-net: Fast-path heartbeat")
fn(rpc)
goto RESP
}
}

// Dispatch the RPC
select {
case n.consumeCh <- rpc:
Expand All @@ -423,6 +455,7 @@ func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *codec.Decoder, en
}

// Wait for response
RESP:
select {
case resp := <-respCh:
// Send the error first
Expand Down

0 comments on commit 37876b9

Please sign in to comment.