Skip to content

Commit

Permalink
Add a ringbuffer as a cache for log entries.
Browse files Browse the repository at this point in the history
Raft first writes n entries to the LogStore, waits for them to be
committed, then reads them again. By storing the last
config.MaxAppendEntries in a cache, we get a 1.5x performance win
in throughput.
  • Loading branch information
Michael Stapelberg committed Jan 13, 2015
1 parent d84c99b commit 2c60e99
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 1 deletion.
98 changes: 98 additions & 0 deletions log_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package raft

import (
"sync"
)

// logCache wraps a logstore with a ring buffer providing fast access to the
// last n raft log entries.
type logCache struct {
store LogStore
cache []*Log
current int
lastlogidx uint64
l sync.RWMutex
}

func newLogCache(capacity int, logstore LogStore) *logCache {
return &logCache{
cache: make([]*Log, 0, capacity),
store: logstore,
}
}

func (c *logCache) getLogFromCache(logidx uint64) (*Log, bool) {
c.l.RLock()
defer c.l.RUnlock()

// Return if we have not seen that log entry yet, or
// if it was pushed out of the cache already.
if logidx > c.lastlogidx || (c.lastlogidx-logidx) >= uint64(len(c.cache)) {
return nil, false
}

// 'last' is the index of the element we cached last,
// its raft log index is 'lastlogidx'
last := (c.current - 1)
m := last - int(c.lastlogidx-logidx)

// See https://golang.org/issue/448 for why (m % n) is not enough.
n := cap(c.cache)
log := c.cache[((m%n)+n)%n]
// If the index does not match, cacheLog’s expected access pattern was
// violated and we need to fall back to reading from the LogStore.
return log, log.Index == logidx
}

// cacheLog should be called with strictly monotonically increasing logidx
// values, otherwise the cache will not be effective.
func (c *logCache) cacheLog(logidx uint64, log *Log) {
c.l.Lock()
defer c.l.Unlock()

if len(c.cache) < cap(c.cache) {
c.cache = append(c.cache, log)
} else {
c.cache[c.current] = log
}
c.lastlogidx = logidx
c.current = (c.current + 1) % cap(c.cache)
}

func (c *logCache) GetLog(logidx uint64, log *Log) error {
if cached, ok := c.getLogFromCache(logidx); ok {
*log = *cached
return nil
}
return c.store.GetLog(logidx, log)
}

func (c *logCache) StoreLog(log *Log) error {
return c.StoreLogs([]*Log{log})
}

func (c *logCache) StoreLogs(logs []*Log) error {
for _, l := range logs {
c.cacheLog(l.Index, l)
}
return c.store.StoreLogs(logs)
}

func (c *logCache) FirstIndex() (uint64, error) {
return c.store.FirstIndex()
}

func (c *logCache) LastIndex() (uint64, error) {
return c.store.LastIndex()
}

func (c *logCache) DeleteRange(min, max uint64) error {
c.l.Lock()
defer c.l.Unlock()

c.lastlogidx = 0
c.current = 0
c.cache = make([]*Log, 0, cap(c.cache))

return c.store.DeleteRange(min, max)
}
47 changes: 47 additions & 0 deletions log_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package raft

import (
"testing"
)

func TestEmptyCache(t *testing.T) {
store := NewInmemStore()
c := newLogCache(10, store)

for i := 0; i < 20; i++ {
if _, ok := c.getLogFromCache(uint64(i)); ok {
t.Fatalf("getLogFromCache(%d): got true, want false", i)
}
}
}

func TestSingleEntry(t *testing.T) {
store := NewInmemStore()
c := newLogCache(10, store)

c.cacheLog(1, &Log{Index: 1})

for i := 0; i < 20; i++ {
want := (i == 1)
if _, got := c.getLogFromCache(uint64(i)); got != want {
t.Fatalf("getLogFromCache(%d): got %v, want %v", i, got, want)
}
}
}

func TestMultipleEntries(t *testing.T) {
store := NewInmemStore()
c := newLogCache(10, store)

for i := 0; i < 40; i++ {
c.cacheLog(uint64(i), &Log{Index: uint64(i)})
}

for i := 0; i < 50; i++ {
want := (i >= 30 && i <= 39)
if _, got := c.getLogFromCache(uint64(i)); got != want {
t.Fatalf("getLogFromCache(%d): got %v, want %v", i, got, want)
}
}

}
2 changes: 1 addition & 1 deletion raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
leaderCh: make(chan bool),
localAddr: localAddr,
logger: log.New(conf.LogOutput, "", log.LstdFlags),
logs: logs,
logs: newLogCache(conf.MaxAppendEntries, logs),
peerCh: make(chan *peerFuture),
peers: peers,
peerStore: peerStore,
Expand Down

0 comments on commit 2c60e99

Please sign in to comment.