Skip to content

Commit

Permalink
Optimize LimitedReaderSlurper memory utilization (algorand#1876)
Browse files Browse the repository at this point in the history
The memory profiler showed that the `LimitedReaderSlurper` is one of the bigger memory consumers on a relay.
This behavior aligns well with the original intent - preallocate memory so that we won't need to reallocate it on every message. The unintended consequence of that was that a relay that has many incoming active connections might be using more memory than it really needs to.

To address this issue, I have used a series of reading buffers. These buffers would be dynamically allocated on a per-need basis. In order to retain original performance characteristics, I have set the size of the base buffer to be larger than the average message. Doing so would allow us to avoid allocation on *most* of the messages, and allocate buffers only when truly needed.

Another advantage is that the allocated memory is of a fixed, and small size. Allocating smaller buffers improves the performance when compared with larger buffers.
  • Loading branch information
tsachiherman authored Feb 2, 2021
1 parent 9a0c452 commit 5e4ca61
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 21 deletions.
106 changes: 87 additions & 19 deletions network/limited_reader_slurper.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,52 +24,120 @@ import (
// ErrIncomingMsgTooLarge is returned when an incoming message exceeds the maximum allowed size.
var ErrIncomingMsgTooLarge = errors.New("read limit exceeded")

// allocationStep is the size of each additional buffer allocated whenever the
// already-allocated memory is insufficient to hold the incoming message.
const allocationStep = uint64(64 * 1024)

// LimitedReaderSlurper collects bytes from an io.Reader, but stops if a limit is reached.
// Memory is acquired in steps: a preallocated base buffer plus allocationStep-sized
// buffers allocated on demand, so idle connections keep their footprint small.
type LimitedReaderSlurper struct {
	// remainedUnallocatedSpace is how much more memory we are allowed to allocate for this reader beyond the base allocation.
	remainedUnallocatedSpace uint64

	// the buffers array contain the memory buffers used to store the data. The first level array is preallocated
	// depending on the desired base allocation. The rest of the levels are dynamically allocated on demand.
	buffers [][]byte

	// lastBuffer is the index of the last filled buffer, or the first one if no buffer was ever filled.
	lastBuffer int
}

// MakeLimitedReaderSlurper creates a LimitedReaderSlurper instance with the provided base and max memory allocations.
func MakeLimitedReaderSlurper(baseAllocation, maxAllocation uint64) *LimitedReaderSlurper {
	// the base allocation can never exceed the overall limit.
	if baseAllocation > maxAllocation {
		baseAllocation = maxAllocation
	}
	// expandable is the memory we may still allocate beyond the base buffer;
	// round up to figure out how many allocationStep-sized buffers could be needed.
	expandable := maxAllocation - baseAllocation
	extraBuffers := (expandable + allocationStep - 1) / allocationStep
	lrs := &LimitedReaderSlurper{
		remainedUnallocatedSpace: expandable,
		lastBuffer:               0,
		buffers:                  make([][]byte, 1+extraBuffers),
	}
	// the base buffer is always present, preallocated to the requested size.
	lrs.buffers[0] = make([]byte, 0, baseAllocation)
	return lrs
}

// Read does repeated Read()s on the io.Reader until it gets io.EOF.
// Returns underlying error or ErrIncomingMsgTooLarge if limit reached.
// Returns a nil error if the underlying io.Reader returned io.EOF.
func (s *LimitedReaderSlurper) Read(reader io.Reader) error {
if s.buf == nil {
s.buf = make([]byte, s.Limit+1)
}
var readBuffer []byte
for {
// do we have more room in the current buffer ?
if len(s.buffers[s.lastBuffer]) == cap(s.buffers[s.lastBuffer]) {
// current buffer is full, try to expand buffers
if s.remainedUnallocatedSpace == 0 {
// we ran out of memory, but is there any more data ?
n, err := reader.Read(make([]byte, 1))
switch {
case n > 0:
// yes, there was at least one extra byte - return ErrIncomingMsgTooLarge
return ErrIncomingMsgTooLarge
case err == io.EOF:
// no, no more data. just return nil
return nil
case err == nil:
// if we received err == nil and n == 0, we should retry calling the Read function.
continue
default:
// if we recieved a non-io.EOF error, return it.
return err
}
}

// make another buffer
s.allocateNextBuffer()
}

for s.size <= s.Limit {
more, err := reader.Read(s.buf[s.size:])
readBuffer = s.buffers[s.lastBuffer]
// the entireBuffer is the same underlying buffer as readBuffer, but the length was moved to the maximum buffer capacity.
entireBuffer := readBuffer[:cap(readBuffer)]
// read the data into the unused area of the read buffer.
n, err := reader.Read(entireBuffer[len(readBuffer):])
if err != nil {
if err == io.EOF {
s.size += uint64(more)
s.buffers[s.lastBuffer] = readBuffer[:len(readBuffer)+n]
return nil
}
return err
}

s.size += uint64(more)
s.buffers[s.lastBuffer] = readBuffer[:len(readBuffer)+n]
}

return ErrIncomingMsgTooLarge
}

// Size returs the current total size of contained chunks read from io.Reader
func (s *LimitedReaderSlurper) Size() uint64 {
return s.size
func (s *LimitedReaderSlurper) Size() (size uint64) {
for i := 0; i <= s.lastBuffer; i++ {
size += uint64(len(s.buffers[i]))
}
return
}

// Reset clears the buffered data. The dynamically-allocated buffers are released
// and their capacity is returned to the unallocated-space allowance; only the
// base buffer is retained (truncated to zero length) for reuse.
func (s *LimitedReaderSlurper) Reset() {
	for i := 1; i <= s.lastBuffer; i++ {
		s.remainedUnallocatedSpace += uint64(cap(s.buffers[i]))
		s.buffers[i] = nil
	}
	s.buffers[0] = s.buffers[0][:0]
	s.lastBuffer = 0
}

// Bytes returns a copy of all the collected data, concatenating the chunks
// stored across the internal buffers into a single freshly-allocated slice.
func (s *LimitedReaderSlurper) Bytes() []byte {
	out := make([]byte, s.Size())
	offset := 0
	for i := 0; i <= s.lastBuffer; i++ {
		copy(out[offset:], s.buffers[i])
		offset += len(s.buffers[i])
	}
	return out
}

// allocateNextBuffer allocates the next buffer and places it in the buffers array.
func (s *LimitedReaderSlurper) allocateNextBuffer() {
	s.lastBuffer++
	// allocate a full step, or whatever allowance remains if that is smaller.
	size := s.remainedUnallocatedSpace
	if size > allocationStep {
		size = allocationStep
	}
	s.buffers[s.lastBuffer] = make([]byte, 0, size)
	s.remainedUnallocatedSpace -= size
}
147 changes: 147 additions & 0 deletions network/limited_reader_slurper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright (C) 2019-2021 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package network

import (
"bytes"
"fmt"
"io"
"testing"

"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand/crypto"
)

// TestLimitedReaderSlurper verifies that the slurper reads back exactly the
// bytes it was fed across a range of message sizes, base allocations and
// memory limits, and returns ErrIncomingMsgTooLarge when the limit is exceeded.
func TestLimitedReaderSlurper(t *testing.T) {
	for _, blobLen := range []uint64{30000, 90000, 200000} {
		// create a random bytes array.
		blob := make([]byte, blobLen)
		crypto.RandBytes(blob[:])
		for base := uint64(0); base < blobLen; base += 731 {
			for _, limit := range []uint64{blobLen - 10000, blobLen, blobLen + 10000} {
				source := bytes.NewBuffer(blob)
				slurper := MakeLimitedReaderSlurper(base, limit)
				err := slurper.Read(source)
				if limit < blobLen {
					// message is bigger than the allowance; expect a failure.
					require.Equal(t, ErrIncomingMsgTooLarge, err)
					continue
				}

				require.NoError(t, err)
				require.Equal(t, blob, slurper.Bytes())
			}
		}
	}
}

// fuzzReader is an io.Reader test double that serves its buffer back in small
// chunks of pseudo-random size, to exercise the slurper's partial-read path.
type fuzzReader struct {
	pos int    // current read offset into buf
	buf []byte // the data to serve
}

// Read implements io.Reader, returning a pseudo-randomly sized chunk of at
// most 18 bytes per call, and io.EOF together with the final chunk.
func (f *fuzzReader) Read(b []byte) (n int, err error) {
	chunk := int(crypto.RandUint64() % 19)
	if chunk > len(b) {
		chunk = len(b)
	}
	remaining := len(f.buf) - f.pos
	if remaining <= 0 {
		return 0, io.EOF
	}
	if chunk >= remaining {
		// we want a chunk that ends at ( or after ) the end of the data.
		n = remaining
		err = io.EOF
	} else {
		n = chunk
	}
	copy(b, f.buf[f.pos:f.pos+n])
	f.pos += n
	return
}

// TestLimitedReaderSlurper_FuzzedBlippedSource feeds the slurper from a reader
// that returns data in tiny, randomly-sized chunks, verifying both the success
// path and the over-limit error path over many rounds.
func TestLimitedReaderSlurper_FuzzedBlippedSource(t *testing.T) {
	blobLen := uint64(300000)
	blob := make([]byte, blobLen)
	crypto.RandBytes(blob[:])
	for round := 0; round < 500; round++ {
		for _, limit := range []uint64{blobLen - 10000, blobLen, blobLen + 10000} {
			slurper := MakeLimitedReaderSlurper(512, limit)
			err := slurper.Read(&fuzzReader{buf: blob})
			if limit < blobLen {
				require.Equal(t, ErrIncomingMsgTooLarge, err, "i: %d\nmaxSize: %d", round, limit)
				continue
			}
			require.NoError(t, err)
			require.Equal(t, blob, slurper.Bytes())
		}
	}
}

// benchmarkLimitedReaderSlurper times a complete read/copy/reset cycle for
// messages of the given size. Sources and slurpers are prepared up front so
// only the slurping itself is measured.
func benchmarkLimitedReaderSlurper(b *testing.B, msgSize uint64) {
	blob := make([]byte, msgSize)
	crypto.RandBytes(blob[:])
	slurpers := make([]*LimitedReaderSlurper, b.N)
	sources := make([]*bytes.Buffer, b.N)
	for i := 0; i < b.N; i++ {
		sources[i] = bytes.NewBuffer(blob)
		slurpers[i] = MakeLimitedReaderSlurper(1024, 1024*1024)
	}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		slurper := slurpers[i]
		require.NoError(b, slurper.Read(sources[i]))
		slurper.Bytes()
		slurper.Reset()
	}
}
// BenchmarkLimitedReaderSlurper runs the slurper benchmark across several
// representative message sizes.
func BenchmarkLimitedReaderSlurper(b *testing.B) {
	for _, msgSize := range []uint64{200, 2048, 300000} {
		name := fmt.Sprintf("%dbytes_message", msgSize)
		b.Run(name, func(b *testing.B) {
			benchmarkLimitedReaderSlurper(b, msgSize)
		})
	}
}

// TestLimitedReaderSlurperMemoryConsumption ensures that the per-operation
// allocations stay bounded by roughly twice the message size plus one
// allocation step, for several message sizes.
func TestLimitedReaderSlurperMemoryConsumption(t *testing.T) {
	for _, msgSize := range []uint64{1024, 2048, 65536, 1024 * 1024} {
		result := testing.Benchmark(func(b *testing.B) {
			benchmarkLimitedReaderSlurper(b, msgSize)
		})
		allocated := uint64(result.AllocedBytesPerOp())
		require.True(t, allocated < 2*msgSize+allocationStep, "AllocedBytesPerOp:%d\nmessage size:%d", allocated, msgSize)
	}
}

// TestLimitedReaderSlurperBufferAllocations verifies that the constructor sizes
// the buffers array to exactly the number of buffers the allocation allowance
// could ever require.
func TestLimitedReaderSlurperBufferAllocations(t *testing.T) {
	for baseAllocation := uint64(512); baseAllocation < 100000; baseAllocation += 2048 {
		for maxAllocation := uint64(512); maxAllocation < 100000; maxAllocation += 512 {
			lrs := MakeLimitedReaderSlurper(baseAllocation, maxAllocation)
			// check to see if the allocated buffers count is exactly what needed to match the allocation needs.
			expectedBuffers := 1
			for remaining := int64(maxAllocation - baseAllocation); remaining > 0; remaining -= int64(allocationStep) {
				expectedBuffers++
			}
			require.Equal(t, expectedBuffers, len(lrs.buffers))
		}
	}
}
3 changes: 2 additions & 1 deletion network/wsPeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
)

const maxMessageLength = 4 * 1024 * 1024 // Currently the biggest message is VB vote bundles. TODO: per message type size limit?
const averageMessageLength = 2 * 1024    // Most messages are smaller than this size, which makes it a good base allocation for the reader slurper.

// This parameter controls how many messages from a single peer can be
// queued up in the global wsNetwork.readBuffer at a time. Making this
Expand Down Expand Up @@ -359,7 +360,7 @@ func (wp *wsPeer) readLoop() {
wp.readLoopCleanup(cleanupCloseError)
}()
wp.conn.SetReadLimit(maxMessageLength)
slurper := LimitedReaderSlurper{Limit: maxMessageLength}
slurper := MakeLimitedReaderSlurper(averageMessageLength, maxMessageLength)
for {
msg := IncomingMessage{}
mtype, reader, err := wp.conn.NextReader()
Expand Down
3 changes: 2 additions & 1 deletion rpcs/httpTxSync.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type HTTPTxSync struct {
}

const requestContentType = "application/x-www-form-urlencoded"

// baseResponseReadingBufferSize is the initial buffer allocation used when slurping an HTTP response body.
const baseResponseReadingBufferSize = uint64(1024)

// ResponseBytes reads the content of the response object and return the body content
// while obeying the read size limits
Expand All @@ -62,7 +63,7 @@ func ResponseBytes(response *http.Response, log logging.Logger, limit uint64) (d
_, err = io.ReadFull(response.Body, data)
return
}
slurper := network.LimitedReaderSlurper{Limit: limit}
slurper := network.MakeLimitedReaderSlurper(baseResponseReadingBufferSize, limit)
err = slurper.Read(response.Body)
if err == network.ErrIncomingMsgTooLarge {
log.Errorf("response too large: %d > %d", slurper.Size(), limit)
Expand Down

0 comments on commit 5e4ca61

Please sign in to comment.