Skip to content

Commit

Permalink
Add buffer configurations to readers and writers (#40)
Browse files Browse the repository at this point in the history
  • Loading branch information
sebheitzmann authored Jan 2, 2025
1 parent 68806d7 commit b06d88e
Show file tree
Hide file tree
Showing 12 changed files with 169 additions and 44 deletions.
9 changes: 5 additions & 4 deletions _examples/recordio.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package main

import (
"errors"
"github.com/thomasjungblut/go-sstables/_examples/proto"
"github.com/thomasjungblut/go-sstables/recordio"
rProto "github.com/thomasjungblut/go-sstables/recordio/proto"
"io"
"log"
"os"

"github.com/thomasjungblut/go-sstables/_examples/proto"
"github.com/thomasjungblut/go-sstables/recordio"
rProto "github.com/thomasjungblut/go-sstables/recordio/proto"
)

func main() {
Expand All @@ -21,7 +22,7 @@ func main() {
}

func simpleRead(path string) {
reader, err := rProto.NewProtoReaderWithPath(path)
reader, err := rProto.NewReader(rProto.ReaderPath(path))
if err != nil {
log.Fatalf("error: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion recordio/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ import (
rProto "github.com/thomasjungblut/go-sstables/recordio/proto"
)

reader, err := rProto.NewProtoReaderWithPath(path)
reader, err := rProto.NewReader(rProto.ReaderPath(path))
if err != nil { log.Fatalf("error: %v", err) }

err = reader.Open()
Expand Down
4 changes: 4 additions & 0 deletions recordio/file_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,10 @@ func NewFileReader(readerOptions ...FileReaderOption) (ReaderI, error) {
readOption(opts)
}

if (opts.file == nil) == (opts.path == "") {
return nil, errors.New("NewFileReader: either os.File or string path must be supplied, never both")
}

f, r, err := opts.factory.CreateNewReader(opts.path, opts.bufferSizeBytes)
if err != nil {
return nil, err
Expand Down
22 changes: 20 additions & 2 deletions recordio/file_reader_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package recordio

import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"errors"
"io"
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestReaderHappyPathSingleRecord(t *testing.T) {
Expand Down Expand Up @@ -187,6 +190,21 @@ func TestReaderForbidsDoubleOpens(t *testing.T) {
expectErrorStringOnOpen(t, reader, "already opened")
}

// TestReaderInitNoPath verifies that NewFileReader rejects a call that supplies
// neither a file nor a path.
func TestReaderInitNoPath(t *testing.T) {
	want := errors.New("NewFileReader: either os.File or string path must be supplied, never both")

	_, got := NewFileReader()
	assert.Equal(t, want, got)
}

func TestReaderInitPathAndFile(t *testing.T) {
f, err := os.OpenFile("test_files/readerTestFile", os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
assert.NoError(t, err)
defer os.Remove("test_files/readerTestFile")
defer f.Close()
reader, err := NewFileReader(ReaderFile(f), ReaderPath("test_files/readerTestFile2"))
assert.Equal(t, errors.New("NewFileReader: either os.File or string path must be supplied, never both"), err)
assert.Nil(t, reader)
}

func expectErrorStringOnOpen(t *testing.T, reader OpenClosableI, expectedError string) {
err := reader.Open()
defer closeOpenClosable(t, reader)
Expand Down
72 changes: 61 additions & 11 deletions recordio/proto/proto_reader.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package proto

import (
"errors"
"os"

"github.com/thomasjungblut/go-sstables/recordio"
"google.golang.org/protobuf/proto"
"os"
)

type Reader struct {
Expand Down Expand Up @@ -36,27 +38,75 @@ func (r *Reader) Close() error {
return r.reader.Close()
}

func NewProtoReaderWithPath(path string) (ReaderI, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
// options

// ReaderOptions collects the settings NewReader uses to construct a reader.
// NewReader rejects the options when both path and file are set, or when
// neither is set.
type ReaderOptions struct {
// path is forwarded to the underlying recordio reader; set via ReaderPath.
path string
// file is forwarded to the underlying recordio reader; set via ReaderFile.
file *os.File
// bufSizeBytes is the read buffer size in bytes; NewReader defaults it to 4 MiB.
bufSizeBytes int
}

// ReaderOption is a functional option that mutates the ReaderOptions passed to it.
type ReaderOption func(*ReaderOptions)

// ReaderPath returns an option that sets the path the reader is created for.
// NewReader rejects options that carry both a path and a file.
func ReaderPath(p string) ReaderOption {
	return func(o *ReaderOptions) { o.path = p }
}

r, err := NewProtoReaderWithFile(f)
if err != nil {
return nil, err
// ReaderFile returns an option that sets the file the reader is created for.
// NewReader rejects options that carry both a file and a path.
func ReaderFile(p *os.File) ReaderOption {
	return func(o *ReaderOptions) { o.file = p }
}

return r, nil
// ReadBufferSizeBytes returns an option that sets the read buffer size in bytes.
// When the option is not supplied, NewReader uses 4 MiB.
func ReadBufferSizeBytes(p int) ReaderOption {
	return func(o *ReaderOptions) { o.bufSizeBytes = p }
}

func NewProtoReaderWithFile(file *os.File) (ReaderI, error) {
reader, err := recordio.NewFileReaderWithFile(file)
// NewReader creates a new reader from the given options. Exactly one of
// ReaderPath or ReaderFile must be supplied. The read buffer defaults to 4 MiB
// unless overridden with ReadBufferSizeBytes.
func NewReader(readerOptions ...ReaderOption) (ReaderI, error) {
	// path and file start at their zero values so the checks below can tell
	// which of them the caller actually supplied.
	opts := &ReaderOptions{bufSizeBytes: 1024 * 1024 * 4}
	for _, apply := range readerOptions {
		apply(opts)
	}

	hasFile, hasPath := opts.file != nil, opts.path != ""
	switch {
	case hasFile && hasPath:
		return nil, errors.New("either os.File or string path must be supplied, never both")
	case !hasFile && !hasPath:
		return nil, errors.New("path was not supplied")
	}

	// Both values are forwarded; exactly one of them is non-zero at this point.
	reader, err := recordio.NewFileReader(
		recordio.ReaderPath(opts.path),
		recordio.ReaderFile(opts.file),
		recordio.ReaderBufferSizeBytes(opts.bufSizeBytes))
	if err != nil {
		return nil, err
	}

	return &Reader{reader: reader}, nil
}

// NewProtoReaderWithPath creates a reader for the given path by delegating to
// NewReader with the ReaderPath option.
//
// Deprecated: use NewReader with the ReaderPath option instead.
func NewProtoReaderWithPath(path string) (ReaderI, error) {
return NewReader(ReaderPath(path))
}

// NewProtoReaderWithFile creates a reader for the given file by delegating to
// NewReader with the ReaderFile option.
//
// Deprecated: use NewReader with the ReaderFile option instead.
func NewProtoReaderWithFile(file *os.File) (ReaderI, error) {
return NewReader(ReaderFile(file))
}
9 changes: 5 additions & 4 deletions recordio/proto/recordio_proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package proto

import (
"bufio"
"math/rand"
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thomasjungblut/go-sstables/recordio"
"github.com/thomasjungblut/go-sstables/recordio/test_files"
"math/rand"
"os"
"testing"
)

const TestFile = "../test_files/berlin52.tsp"
Expand Down Expand Up @@ -63,7 +64,7 @@ func endToEndReadWriteProtobuf(writer WriterI, t *testing.T, tmpFile *os.File) {
require.NoError(t, writer.Close())
require.NoError(t, inFile.Close())

reader, err := NewProtoReaderWithPath(tmpFile.Name())
reader, err := NewReader(ReaderPath(tmpFile.Name()))
require.NoError(t, err)
require.NoError(t, reader.Open())

Expand Down
6 changes: 5 additions & 1 deletion simpledb/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,11 @@ func executeCompaction(db *DB) (compactionMetadata *proto.CompactionMetadata, er
}

func saveCompactionMetadata(writeFolder string, compactionMetadata *proto.CompactionMetadata) (err error) {
metaWriter, err := rProto.NewWriter(rProto.Path(filepath.Join(writeFolder, CompactionFinishedSuccessfulFileName)))
metaWriter, err := rProto.NewWriter(
rProto.Path(filepath.Join(writeFolder, CompactionFinishedSuccessfulFileName)),
rProto.WriteBufferSizeBytes(4*1024),
)

if err != nil {
return err
}
Expand Down
25 changes: 25 additions & 0 deletions simpledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const MemStoreMaxSizeBytes uint64 = 1024 * 1024 * 1024 // 1gb
const NumSSTablesToTriggerCompaction int = 10
const DefaultCompactionMaxSizeBytes uint64 = 5 * 1024 * 1024 * 1024 // 5gb
const DefaultCompactionInterval = 5 * time.Second
// DefaultWriteBufferSizeBytes is the write buffer size NewSimpleDB uses unless
// the WriteBufferSizeBytes option overrides it.
const DefaultWriteBufferSizeBytes uint64 = 4 * 1024 * 1024 // 4 MiB
// DefaultReadBufferSizeBytes is the read buffer size NewSimpleDB uses unless
// the ReadBufferSizeBytes option overrides it.
const DefaultReadBufferSizeBytes uint64 = 4 * 1024 * 1024 // 4 MiB

var ErrNotFound = errors.New("ErrNotFound")
var ErrNotOpenedYet = errors.New("database has not been opened yet, please call Open() first")
Expand Down Expand Up @@ -90,6 +92,9 @@ type DB struct {
compactionTicker *time.Ticker
compactionTickerStopChannel chan interface{}
doneCompactionChannel chan bool

writeBufferSizeBytes uint64
readBufferSizeBytes uint64
}

func (db *DB) Open() error {
Expand Down Expand Up @@ -327,6 +332,8 @@ func NewSimpleDB(basePath string, extraOptions ...ExtraOption) (*DB, error) {
NumSSTablesToTriggerCompaction,
DefaultCompactionMaxSizeBytes,
DefaultCompactionInterval,
DefaultWriteBufferSizeBytes,
DefaultReadBufferSizeBytes,
}

for _, extraOption := range extraOptions {
Expand Down Expand Up @@ -364,6 +371,8 @@ func NewSimpleDB(basePath string, extraOptions ...ExtraOption) (*DB, error) {
doneFlushChannel: doneFlushChan,
compactionTickerStopChannel: compactionTimerStopChannel,
doneCompactionChannel: doneCompactionChan,
readBufferSizeBytes: extraOpts.readBufferSizeBytes,
writeBufferSizeBytes: extraOpts.writeBufferSizeBytes,
}, nil
}

Expand All @@ -377,6 +386,8 @@ type ExtraOptions struct {
compactionFileThreshold int
compactionMaxSizeBytes uint64
compactionRunInterval time.Duration
writeBufferSizeBytes uint64
readBufferSizeBytes uint64
}

type ExtraOption func(options *ExtraOptions)
Expand Down Expand Up @@ -433,3 +444,17 @@ func CompactionMaxSizeBytes(n uint64) ExtraOption {
args.compactionMaxSizeBytes = n
}
}

// WriteBufferSizeBytes sets the write buffer size, in bytes, that the DB hands
// to the sstable writer when flushing a memstore. When the option is not
// supplied, DefaultWriteBufferSizeBytes is used.
func WriteBufferSizeBytes(n uint64) ExtraOption {
	return func(o *ExtraOptions) { o.writeBufferSizeBytes = n }
}

// ReadBufferSizeBytes sets the read buffer size, in bytes, that the DB hands to
// the sstable readers it opens. When the option is not supplied,
// DefaultReadBufferSizeBytes is used.
func ReadBufferSizeBytes(n uint64) ExtraOption {
	return func(o *ExtraOptions) { o.readBufferSizeBytes = n }
}
7 changes: 5 additions & 2 deletions simpledb/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package simpledb

import (
"fmt"
"github.com/thomasjungblut/go-sstables/memstore"
"github.com/thomasjungblut/go-sstables/sstables"
"log"
"os"
"path/filepath"
"sync/atomic"
"time"

"github.com/thomasjungblut/go-sstables/memstore"
"github.com/thomasjungblut/go-sstables/sstables"
)

func flushMemstoreContinuously(db *DB) {
Expand Down Expand Up @@ -50,6 +51,7 @@ func executeFlush(db *DB, flushAction memStoreFlushAction) error {
err = memStoreToFlush.FlushWithTombstones(
sstables.WriteBasePath(writePath),
sstables.WithKeyComparator(db.cmp),
sstables.WriteBufferSizeBytes(int(db.writeBufferSizeBytes)),
sstables.BloomExpectedNumberOfElements(numElements))
if err != nil {
return err
Expand All @@ -65,6 +67,7 @@ func executeFlush(db *DB, flushAction memStoreFlushAction) error {
reader, err := sstables.NewSSTableReader(
sstables.ReadBasePath(writePath),
sstables.ReadWithKeyComparator(db.cmp),
sstables.ReadBufferSizeBytes(int(db.readBufferSizeBytes)),
)
if err != nil {
return err
Expand Down
19 changes: 11 additions & 8 deletions simpledb/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ package simpledb

import (
"fmt"
"github.com/stretchr/testify/assert"
"github.com/thomasjungblut/go-sstables/memstore"
"github.com/thomasjungblut/go-sstables/skiplist"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/thomasjungblut/go-sstables/memstore"
"github.com/thomasjungblut/go-sstables/skiplist"
)

func TestFlushHappyPath(t *testing.T) {
Expand All @@ -33,11 +34,13 @@ func TestFlushHappyPath(t *testing.T) {
}

db := &DB{
cmp: skiplist.BytesComparator{},
basePath: tmpDir,
currentGeneration: 42,
rwLock: &sync.RWMutex{},
sstableManager: NewSSTableManager(skiplist.BytesComparator{}, &sync.RWMutex{}, tmpDir),
cmp: skiplist.BytesComparator{},
basePath: tmpDir,
currentGeneration: 42,
rwLock: &sync.RWMutex{},
sstableManager: NewSSTableManager(skiplist.BytesComparator{}, &sync.RWMutex{}, tmpDir),
readBufferSizeBytes: DefaultReadBufferSizeBytes,
writeBufferSizeBytes: DefaultWriteBufferSizeBytes,
}
err = executeFlush(db, action)
assert.Nil(t, err)
Expand Down
3 changes: 2 additions & 1 deletion simpledb/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (db *DB) repairCompactions() error {
}

// try to read it, if it's corrupted we would also delete it
reader, err := rProto.NewProtoReaderWithPath(metaPath)
reader, err := rProto.NewReader(rProto.ReaderPath(metaPath))
if err != nil {
return err
}
Expand Down Expand Up @@ -151,6 +151,7 @@ func (db *DB) reconstructSSTables() error {
reader, err := sstables.NewSSTableReader(
sstables.ReadBasePath(p),
sstables.ReadWithKeyComparator(db.cmp),
sstables.ReadBufferSizeBytes(int(db.readBufferSizeBytes)),
)
if err != nil {
return err
Expand Down
Loading

0 comments on commit b06d88e

Please sign in to comment.