Skip to content

Commit

Permalink
Simplify LogCache implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
armon committed Jan 14, 2015
1 parent 9bf7eba commit febcccb
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 72 deletions.
71 changes: 26 additions & 45 deletions log_cache.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package raft

import (
"fmt"
"sync"
)

Expand All @@ -12,59 +13,36 @@ import (
type LogCache struct {
store LogStore

cache []*Log
current int
lastLogIdx uint64
l sync.RWMutex
cache []*Log
l sync.RWMutex
}

// NewLogCache is used to create a new LogCache with the
// given capacity and backend store.
func NewLogCache(capacity int, store LogStore) *LogCache {
return &LogCache{
func NewLogCache(capacity int, store LogStore) (*LogCache, error) {
if capacity <= 0 {
return nil, fmt.Errorf("capacity must be positive")
}
c := &LogCache{
store: store,
cache: make([]*Log, capacity),
}
return c, nil
}

func (c *LogCache) getLogFromCache(logidx uint64) (*Log, bool) {
func (c *LogCache) GetLog(idx uint64, log *Log) error {
// Check the buffer for an entry
c.l.RLock()
defer c.l.RUnlock()

// 'last' is the index of the element we cached last,
// its raft log index is 'lastLogIdx'
last := (c.current - 1)
m := last - int(c.lastLogIdx-logidx)

// See https://golang.org/issue/448 for why (m % n) is not enough.
n := len(c.cache)
log := c.cache[((m%n)+n)%n]
if log == nil {
return nil, false
}
// If the index does not match, cacheLog’s expected access pattern was
// violated and we need to fall back to reading from the LogStore.
return log, log.Index == logidx
}
cached := c.cache[idx%uint64(len(c.cache))]
c.l.RUnlock()

// cacheLogs should be called with strictly monotonically increasing logidx
// values, otherwise the cache will not be effective.
func (c *LogCache) cacheLogs(logs []*Log) {
c.l.Lock()
defer c.l.Unlock()

for _, l := range logs {
c.cache[c.current] = l
c.lastLogIdx = l.Index
c.current = (c.current + 1) % len(c.cache)
}
}

func (c *LogCache) GetLog(idx uint64, log *Log) error {
if cached, ok := c.getLogFromCache(idx); ok {
// Check if entry is valid
if cached != nil && cached.Index == idx {
*log = *cached
return nil
}

// Forward request on cache miss
return c.store.GetLog(idx, log)
}

Expand All @@ -73,7 +51,13 @@ func (c *LogCache) StoreLog(log *Log) error {
}

func (c *LogCache) StoreLogs(logs []*Log) error {
c.cacheLogs(logs)
// Insert the logs into the ring buffer
c.l.Lock()
for _, l := range logs {
c.cache[l.Index%uint64(len(c.cache))] = l
}
c.l.Unlock()

return c.store.StoreLogs(logs)
}

Expand All @@ -86,13 +70,10 @@ func (c *LogCache) LastIndex() (uint64, error) {
}

func (c *LogCache) DeleteRange(min, max uint64) error {
c.l.Lock()
defer c.l.Unlock()

// Invalidate the cache on deletes
c.lastLogIdx = 0
c.current = 0
c.l.Lock()
c.cache = make([]*Log, len(c.cache))
c.l.Unlock()

return c.store.DeleteRange(min, max)
}
95 changes: 68 additions & 27 deletions log_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,44 +4,85 @@ import (
"testing"
)

func TestEmptyCache(t *testing.T) {
func TestLogCache(t *testing.T) {
store := NewInmemStore()
c := NewLogCache(10, store)
c, _ := NewLogCache(16, store)

for i := 0; i < 20; i++ {
if _, ok := c.getLogFromCache(uint64(i)); ok {
t.Fatalf("getLogFromCache(%d): got true, want false", i)
}
// Insert into the in-mem store
for i := 0; i < 32; i++ {
log := &Log{Index: uint64(i) + 1}
store.StoreLog(log)
}
}

func TestSingleEntry(t *testing.T) {
store := NewInmemStore()
c := NewLogCache(10, store)
// Check the indexes
if idx, _ := c.FirstIndex(); idx != 1 {
t.Fatalf("bad: %d", idx)
}
if idx, _ := c.LastIndex(); idx != 32 {
t.Fatalf("bad: %d", idx)
}

c.cacheLogs([]*Log{&Log{Index: 1}})
// Try get log with a miss
var out Log
err := c.GetLog(1, &out)
if err != nil {
t.Fatalf("err: %v", err)
}
if out.Index != 1 {
t.Fatalf("bad: %#v", out)
}

for i := 0; i < 20; i++ {
want := (i == 1)
if _, got := c.getLogFromCache(uint64(i)); got != want {
t.Fatalf("getLogFromCache(%d): got %v, want %v", i, got, want)
}
// Store logs
l1 := &Log{Index: 33}
l2 := &Log{Index: 34}
err = c.StoreLogs([]*Log{l1, l2})
if err != nil {
t.Fatalf("err: %v", err)
}
}

func TestMultipleEntries(t *testing.T) {
store := NewInmemStore()
c := NewLogCache(10, store)
if idx, _ := c.LastIndex(); idx != 34 {
t.Fatalf("bad: %d", idx)
}

for i := 0; i < 40; i++ {
c.cacheLogs([]*Log{&Log{Index: uint64(i)}})
// Check that it wrote-through
err = store.GetLog(33, &out)
if err != nil {
t.Fatalf("err: %v", err)
}
err = store.GetLog(34, &out)
if err != nil {
t.Fatalf("err: %v", err)
}

// Delete in the backend
err = store.DeleteRange(33, 34)
if err != nil {
t.Fatalf("err: %v", err)
}

// Should be in the ring buffer
err = c.GetLog(33, &out)
if err != nil {
t.Fatalf("err: %v", err)
}
err = c.GetLog(34, &out)
if err != nil {
t.Fatalf("err: %v", err)
}

for i := 0; i < 50; i++ {
want := (i >= 30 && i <= 39)
if _, got := c.getLogFromCache(uint64(i)); got != want {
t.Fatalf("getLogFromCache(%d): got %v, want %v", i, got, want)
}
// Purge the ring buffer
err = c.DeleteRange(33, 34)
if err != nil {
t.Fatalf("err: %v", err)
}

// Should not be in the ring buffer
err = c.GetLog(33, &out)
if err != ErrLogNotFound {
t.Fatalf("err: %v", err)
}
err = c.GetLog(34, &out)
if err != ErrLogNotFound {
t.Fatalf("err: %v", err)
}
}

0 comments on commit febcccb

Please sign in to comment.