Skip to content

Commit

Permalink
Don't pool LZ4 readers that read big blocks.
Browse files Browse the repository at this point in the history
Such readers must have allocated twice as many bytes as the block size,
and we don't want to pool them and keep that memory allocated.

This can happen when reading chunks that were compressed with high buffer
size.

Signed-off-by: Peter Štibraný <[email protected]>
  • Loading branch information
pstibrany authored and cyriltovena committed Dec 19, 2019
1 parent b0a7cf8 commit 9979321
Showing 1 changed file with 41 additions and 2 deletions.
43 changes: 41 additions & 2 deletions pkg/chunkenc/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,20 +139,59 @@ type LZ4Pool struct {
bufferSize int // available values: 1<<16 (64k), 1<<18 (256k), 1<<20 (1M), 1<<22 (4M). Defaults to 4MB, if not set.
}

// lz4Reader is simple wrapper around *lz4.Reader, which remembers max used block size,
// as reported by this reader. It is used to determine whether we want to reuse it,
// or throw away and garbage-collect.
type lz4Reader struct {
r *lz4.Reader
maxBlockSize int
}

func (l *lz4Reader) Read(p []byte) (n int, err error) {
return l.r.Read(p)
}

func (l *lz4Reader) Reset(src io.Reader) {
l.r.Reset(src)
}

func (l *lz4Reader) onBlockDone(_ int) {
// remember max block size used.
if l.r.BlockMaxSize > l.maxBlockSize {
l.maxBlockSize = l.r.BlockMaxSize
}
}

func newLz4Reader(src io.Reader) *lz4Reader {
lz4r := lz4.NewReader(src)
r := &lz4Reader{r: lz4r}
lz4r.OnBlockDone = r.onBlockDone
return r
}

// GetReader gets or creates a new CompressionReader and reset it to read from src
func (pool *LZ4Pool) GetReader(src io.Reader) io.Reader {
if r := pool.readers.Get(); r != nil {
reader := r.(*lz4.Reader)
reader := r.(*lz4Reader)
reader.Reset(src)
return reader
}
// no need to set buffer size here. Reader uses buffer size based on
// LZ4 header that it is reading.
return lz4.NewReader(src)
r := newLz4Reader(src)
return r
}

// PutReader places back in the pool a CompressionReader
func (pool *LZ4Pool) PutReader(reader io.Reader) {
r := reader.(*lz4Reader)
if r.maxBlockSize > pool.bufferSize {
// Readers base their buffer size based on headers from LZ4 stream.
// If this reader uses bigger buffer than what we use currently, don't pool it.
// Reading from a couple of chunks that used big buffer sizes could otherwise quickly lead
// to high pooled memory usage.
return
}
pool.readers.Put(reader)
}

Expand Down

0 comments on commit 9979321

Please sign in to comment.