Add SetBig / GetBig API for working with big entries exceeding 64KB
valyala committed Jan 21, 2019
1 parent 59f4284 commit ab7cba1
Showing 5 changed files with 269 additions and 2 deletions.
3 changes: 1 addition & 2 deletions README.md
@@ -55,8 +55,7 @@ with inserts.

* Keys and values must be byte slices. Other types must be marshaled before
  storing them in the cache.
- * Summary size of a (key, value) entry cannot exceed 64KB. Bigger values must be
-   split into smaller values before storing in the cache.
+ * Big entries with sizes exceeding 64KB must be stored via a [distinct API](http://godoc.org/github.com/VictoriaMetrics/fastcache#Cache.SetBig).
* There is no cache expiration. Entries are evicted from the cache only
  on cache size overflow. Entry deadline may be stored inside the value in order
  to implement cache expiration.
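In practice the new API mirrors `Set`/`Get`. A minimal usage sketch (the key name, value size, and cache capacity below are illustrative, not from the commit):

```go
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/fastcache"
)

func main() {
	c := fastcache.New(128 * 1024 * 1024) // hypothetical 128MB cache
	value := make([]byte, 1<<20)          // a 1MB entry, too big for plain Set
	c.SetBig([]byte("blob"), value)

	// GetBig appends the stored value to dst (nil here) and returns the result.
	got := c.GetBig(nil, []byte("blob"))
	fmt.Println(len(got)) // 1048576, unless the entry has been evicted
}
```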
148 changes: 148 additions & 0 deletions bigcache.go
@@ -0,0 +1,148 @@
package fastcache

import (
	"sync"
	"sync/atomic"

	"github.com/cespare/xxhash"
)

// maxSubvalueLen is the maximum size of a subvalue chunk.
//
// - 16 bytes are for subkey encoding
// - 4 bytes are for len(key)+len(value) encoding inside fastcache
// - 1 byte is an implementation detail of fastcache
const maxSubvalueLen = chunkSize - 16 - 4 - 1

// maxKeyLen is the maximum size of a key.
//
// - 16 bytes are for (hash + valueLen)
// - 4 bytes are for len(key)+len(subkey)
// - 1 byte is an implementation detail of fastcache
const maxKeyLen = chunkSize - 16 - 4 - 1

// SetBig sets (k, v) to c where len(v) may exceed 64KB.
//
// GetBig must be used for reading stored values.
//
// The stored entry may be evicted at any time either due to cache
// overflow or due to unlikely hash collision.
// Pass higher maxBytes value to New if the added items disappear
// frequently.
//
// It is safe to store entries smaller than 64KB with SetBig.
//
// k and v contents may be modified after returning from SetBig.
func (c *Cache) SetBig(k, v []byte) {
	if len(k) > maxKeyLen {
		atomic.AddUint64(&c.bigStats.TooBigKeyErrors, 1)
		return
	}
	valueLen := len(v)
	valueHash := xxhash.Sum64(v)

	// Split v into chunks with up to 64KB each.
	subkey := getSubkeyBuf()
	var i uint64
	for len(v) > 0 {
		subkey.B = marshalUint64(subkey.B[:0], valueHash)
		subkey.B = marshalUint64(subkey.B, uint64(i))
		i++
		subvalueLen := maxSubvalueLen
		if len(v) < subvalueLen {
			subvalueLen = len(v)
		}
		subvalue := v[:subvalueLen]
		v = v[subvalueLen:]
		c.Set(subkey.B, subvalue)
	}

	// Write the metavalue, which consists of valueHash and valueLen.
	subkey.B = marshalUint64(subkey.B[:0], valueHash)
	subkey.B = marshalUint64(subkey.B, uint64(valueLen))
	c.Set(k, subkey.B)
	putSubkeyBuf(subkey)
}

// GetBig searches for the value for the given k, appends it to dst
// and returns the result.
//
// GetBig returns only values stored via SetBig. It doesn't work
// with values stored via other methods.
//
// k contents may be modified after returning from GetBig.
func (c *Cache) GetBig(dst, k []byte) []byte {
	subkey := getSubkeyBuf()
	defer putSubkeyBuf(subkey)

	// Read and parse the metavalue.
	subkey.B = c.Get(subkey.B[:0], k)
	if len(subkey.B) == 0 {
		// Nothing found.
		return dst
	}
	if len(subkey.B) != 16 {
		atomic.AddUint64(&c.bigStats.InvalidMetavalueErrors, 1)
		return dst
	}
	valueHash := unmarshalUint64(subkey.B)
	valueLen := unmarshalUint64(subkey.B[8:])

	// Collect the result from chunks.
	dstLen := len(dst)
	dst = append(dst, make([]byte, int(valueLen))...)
	dst = dst[:dstLen]
	var i uint64
	for uint64(len(dst)-dstLen) < valueLen {
		subkey.B = marshalUint64(subkey.B[:0], valueHash)
		subkey.B = marshalUint64(subkey.B, uint64(i))
		i++
		dstNew := c.Get(dst, subkey.B)
		if len(dstNew) == len(dst) {
			// Cannot find the subvalue.
			return dst[:dstLen]
		}
		dst = dstNew
	}

	// Verify the obtained value.
	v := dst[dstLen:]
	if uint64(len(v)) != valueLen {
		atomic.AddUint64(&c.bigStats.InvalidValueLenErrors, 1)
		return dst[:dstLen]
	}
	h := xxhash.Sum64(v)
	if h != valueHash {
		atomic.AddUint64(&c.bigStats.InvalidValueHashErrors, 1)
		return dst[:dstLen]
	}
	return dst
}

func getSubkeyBuf() *bytesBuf {
	v := subkeyPool.Get()
	if v == nil {
		return &bytesBuf{}
	}
	return v.(*bytesBuf)
}

func putSubkeyBuf(bb *bytesBuf) {
	bb.B = bb.B[:0]
	subkeyPool.Put(bb)
}

var subkeyPool sync.Pool

type bytesBuf struct {
	B []byte
}

func marshalUint64(dst []byte, u uint64) []byte {
	return append(dst, byte(u>>56), byte(u>>48), byte(u>>40), byte(u>>32), byte(u>>24), byte(u>>16), byte(u>>8), byte(u))
}

func unmarshalUint64(src []byte) uint64 {
	_ = src[7]
	return uint64(src[0])<<56 | uint64(src[1])<<48 | uint64(src[2])<<40 | uint64(src[3])<<32 | uint64(src[4])<<24 | uint64(src[5])<<16 | uint64(src[6])<<8 | uint64(src[7])
}
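To make the chunking arithmetic concrete: each subvalue chunk holds at most `maxSubvalueLen` bytes, and a 16-byte metavalue (valueHash, valueLen) is stored under the original key. A small sketch (not part of the commit; the 200KB value is an arbitrary example):

```go
package main

import "fmt"

func main() {
	const chunkSize = 64 * 1024                   // fastcache's chunk size
	const maxSubvalueLen = chunkSize - 16 - 4 - 1 // 65515 bytes, as defined above

	valueLen := 200 * 1024 // a hypothetical 200KB value
	// SetBig stores ceil(valueLen/maxSubvalueLen) chunks, each keyed by
	// (valueHash, chunk index), plus one metavalue under the original key.
	chunks := (valueLen + maxSubvalueLen - 1) / maxSubvalueLen
	fmt.Printf("%d-byte value -> %d chunks + 1 metavalue\n", valueLen, chunks)
}
```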
50 changes: 50 additions & 0 deletions bigcache_test.go
@@ -0,0 +1,50 @@
package fastcache

import (
	"bytes"
	"fmt"
	"testing"
)

func TestSetGetBig(t *testing.T) {
	c := New(256 * 1024 * 1024)
	const valuesCount = 10
	for _, valueSize := range []int{1, 100, 1<<16 - 1, 1 << 16, 1<<16 + 1, 1 << 17, 1<<17 + 1, 1<<17 - 1, 1 << 19} {
		t.Run(fmt.Sprintf("valueSize_%d", valueSize), func(t *testing.T) {
			for seed := 0; seed < 3; seed++ {
				testSetGetBig(t, c, valueSize, valuesCount, seed)
			}
		})
	}
}

func testSetGetBig(t *testing.T, c *Cache, valueSize, valuesCount, seed int) {
	m := make(map[string][]byte)
	var buf []byte
	for i := 0; i < valuesCount; i++ {
		key := []byte(fmt.Sprintf("key %d", i))
		value := createValue(valueSize, seed)
		c.SetBig(key, value)
		m[string(key)] = value
		buf = c.GetBig(buf[:0], key)
		if !bytes.Equal(buf, value) {
			t.Fatalf("seed=%d; unexpected value obtained for key=%q; got len(value)=%d; want len(value)=%d", seed, key, len(buf), len(value))
		}
	}

	// Verify that the values still exist.
	for key, value := range m {
		buf = c.GetBig(buf[:0], []byte(key))
		if !bytes.Equal(buf, value) {
			t.Fatalf("seed=%d; unexpected value obtained for key=%q; got len(value)=%d; want len(value)=%d", seed, key, len(buf), len(value))
		}
	}
}

func createValue(size, seed int) []byte {
	var buf []byte
	for i := 0; i < size; i++ {
		buf = append(buf, byte(i+seed))
	}
	return buf
}
33 changes: 33 additions & 0 deletions bigcache_timing_test.go
@@ -0,0 +1,33 @@
package fastcache

import (
	"testing"
)

func BenchmarkSetBig(b *testing.B) {
	key := []byte("key12345")
	value := createValue(256*1024, 0)
	c := New(1024 * 1024)
	b.SetBytes(int64(len(value)))
	b.ReportAllocs()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			c.SetBig(key, value)
		}
	})
}

func BenchmarkGetBig(b *testing.B) {
	key := []byte("key12345")
	value := createValue(256*1024, 0)
	c := New(1024 * 1024)
	c.SetBig(key, value)
	b.SetBytes(int64(len(value)))
	b.ReportAllocs()
	b.RunParallel(func(pb *testing.PB) {
		var buf []byte
		for pb.Next() {
			buf = c.GetBig(buf[:0], key)
		}
	})
}
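These benchmarks can be run with `go test -bench=Big -benchmem`; since both call `b.SetBytes`, the output reports throughput in MB/s alongside per-operation allocations.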
37 changes: 37 additions & 0 deletions fastcache.go
@@ -45,13 +45,38 @@ type Stats struct {

	// BytesSize is the current size of the cache in bytes.
	BytesSize uint64

	// BigStats contains stats for GetBig/SetBig methods.
	BigStats
}

// Reset resets s, so it may be re-used again in Cache.UpdateStats.
func (s *Stats) Reset() {
	*s = Stats{}
}

// BigStats contains stats for GetBig/SetBig methods.
type BigStats struct {
	// TooBigKeyErrors is the number of calls to SetBig with a too-big key.
	TooBigKeyErrors uint64

	// InvalidMetavalueErrors is the number of calls to GetBig resulting
	// in an invalid metavalue.
	InvalidMetavalueErrors uint64

	// InvalidValueLenErrors is the number of calls to GetBig resulting
	// in a value with invalid length.
	InvalidValueLenErrors uint64

	// InvalidValueHashErrors is the number of calls to GetBig resulting
	// in a value with an invalid hash.
	InvalidValueHashErrors uint64
}

func (bs *BigStats) reset() {
	*bs = BigStats{}
}

// Cache is a fast thread-safe inmemory cache optimized for big number
// of entries.
//
Expand All @@ -64,6 +89,8 @@ func (s *Stats) Reset() {
// memory.
type Cache struct {
	buckets [bucketsCount]bucket

	bigStats BigStats
}

// New returns new cache with the given maxBytes capacity in bytes.
@@ -86,12 +113,15 @@ func New(maxBytes int) *Cache {

// Set stores (k, v) in the cache.
//
// Get must be used for reading the stored entry.
//
// The stored entry may be evicted at any time either due to cache
// overflow or due to unlikely hash collision.
// Pass higher maxBytes value to New if the added items disappear
// frequently.
//
// (k, v) entries with total size exceeding 64KB aren't stored in the cache.
// SetBig can be used for storing entries exceeding 64KB.
//
// k and v contents may be modified after returning from Set.
func (c *Cache) Set(k, v []byte) {
@@ -104,6 +134,8 @@ func (c *Cache) Set(k, v []byte) {
//
// Get allocates new byte slice for the returned value if dst is nil.
//
// Get returns only values stored in c via Set.
//
// k contents may be modified after returning from Get.
func (c *Cache) Get(dst, k []byte) []byte {
	h := xxhash.Sum64(k)
@@ -125,6 +157,7 @@ func (c *Cache) Reset() {
	for i := range c.buckets[:] {
		c.buckets[i].Reset()
	}
	c.bigStats.reset()
}

// UpdateStats adds cache stats to s.
@@ -134,6 +167,10 @@ func (c *Cache) UpdateStats(s *Stats) {
	for i := range c.buckets[:] {
		c.buckets[i].UpdateStats(s)
	}
	s.TooBigKeyErrors += atomic.LoadUint64(&c.bigStats.TooBigKeyErrors)
	s.InvalidMetavalueErrors += atomic.LoadUint64(&c.bigStats.InvalidMetavalueErrors)
	s.InvalidValueLenErrors += atomic.LoadUint64(&c.bigStats.InvalidValueLenErrors)
	s.InvalidValueHashErrors += atomic.LoadUint64(&c.bigStats.InvalidValueHashErrors)
}

type bucket struct {
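Since `BigStats` is embedded in `Stats`, the new counters are reachable directly from `UpdateStats` output. A brief sketch (not from the commit; the oversized key is contrived to trip `TooBigKeyErrors`):

```go
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/fastcache"
)

func main() {
	c := fastcache.New(32 * 1024 * 1024)
	// A key longer than maxKeyLen (~64KB) is silently dropped and counted.
	c.SetBig(make([]byte, 100*1024), []byte("value"))

	var s fastcache.Stats
	c.UpdateStats(&s)
	fmt.Println(s.TooBigKeyErrors) // 1
}
```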
