Skip to content

Commit

Permalink
Compaction without tombstone (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
sebheitzmann authored Nov 7, 2024
1 parent d5fa566 commit 0ca9659
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 17 deletions.
14 changes: 9 additions & 5 deletions simpledb/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ package simpledb

import (
"errors"
rProto "github.com/thomasjungblut/go-sstables/recordio/proto"
"github.com/thomasjungblut/go-sstables/simpledb/proto"
"github.com/thomasjungblut/go-sstables/skiplist"
"github.com/thomasjungblut/go-sstables/sstables"
"log"
"os"
"path/filepath"
"sort"
"strings"
"time"

rProto "github.com/thomasjungblut/go-sstables/recordio/proto"
"github.com/thomasjungblut/go-sstables/simpledb/proto"
"github.com/thomasjungblut/go-sstables/skiplist"
"github.com/thomasjungblut/go-sstables/sstables"
)

func backgroundCompaction(db *DB) {
Expand Down Expand Up @@ -115,7 +116,10 @@ func executeCompaction(db *DB) (compactionMetadata *proto.CompactionMetadata, er
}
}()

// TODO(thomas): this includes tombstones, do we really need to keep them?
// we need to keep it if the sstable is not the first with this key.
// SS1(KEY1=toto) SS2(KEY2=deleted) in this case the KEY2 can be removed in SS2
// SS1(KEY2=toto) SS2(KEY2=deleted) in this case the KEY2 can't be removed in SS2

err = sstables.NewSSTableMerger(db.cmp).MergeCompact(iterators, writer, sstables.ScanReduceLatestWins)
if err != nil {
return nil, err
Expand Down
97 changes: 96 additions & 1 deletion simpledb/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@ package simpledb

import (
"fmt"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/thomasjungblut/go-sstables/memstore"
"github.com/thomasjungblut/go-sstables/sstables"
"github.com/thomasjungblut/go-sstables/sstables/proto"
"testing"
)

func TestExecCompactionLessFilesThanExpected(t *testing.T) {
Expand Down Expand Up @@ -47,3 +52,93 @@ func TestExecCompactionSameContent(t *testing.T) {
// for cleanups
assert.Nil(t, db.sstableManager.currentReader.Close())
}

func writeSSTableWithDataInDatabaseFolder(t *testing.T, db *DB, p string) {
fakeTablePath := filepath.Join(db.basePath, p)
assert.Nil(t, os.MkdirAll(fakeTablePath, 0700))
mStore := memstore.NewMemStore()
for i := 0; i < 1000; i++ {
assert.Nil(t, mStore.Add([]byte(fmt.Sprintf("%d", i)), []byte(fmt.Sprintf("%d", i))))
}
assert.Nil(t, mStore.Flush(
sstables.WriteBasePath(fakeTablePath),
sstables.WithKeyComparator(db.cmp),
))
}

func writeSSTableWithTombstoneInDatabaseFolder(t *testing.T, db *DB, p string) {
fakeTablePath := filepath.Join(db.basePath, p)
assert.Nil(t, os.MkdirAll(fakeTablePath, 0700))
mStore := memstore.NewMemStore()

// delete all key between 500 and 800
for i := 500; i < 800; i++ {
assert.Nil(t, mStore.Tombstone([]byte(fmt.Sprintf("%d", i))))
}
assert.Nil(t, mStore.FlushWithTombstones(
sstables.WriteBasePath(fakeTablePath),
sstables.WithKeyComparator(db.cmp),
))
}

func TestExecCompactionWithTombstone(t *testing.T) {
db := newOpenedSimpleDB(t, "simpledb_compactionSameContent")
defer cleanDatabaseFolder(t, db)
// we'll close the database to mock some internals directly, yes it's very hacky
closeDatabase(t, db)
db.closed = false
db.compactionThreshold = 0

writeSSTableWithDataInDatabaseFolder(t, db, fmt.Sprintf(SSTablePattern, 42))
// only one SStable with holes should shrink
writeSSTableWithTombstoneInDatabaseFolder(t, db, fmt.Sprintf(SSTablePattern, 43))
assert.Nil(t, db.reconstructSSTables())
// 1000 initial + 300 Tombstone on second table
assert.Equal(t, 1300, int(db.sstableManager.currentSSTable().MetaData().GetNumRecords()))

compactionMeta, err := executeCompaction(db)
assert.Nil(t, err)
assert.Equal(t, "sstable_000000000000042", compactionMeta.ReplacementPath)
assert.Equal(t, []string{"sstable_000000000000042", "sstable_000000000000043"}, compactionMeta.SstablePaths)
fmt.Print(compactionMeta)
err = db.sstableManager.reflectCompactionResult(compactionMeta)
assert.NoError(t, err)
v, err := db.Get("512")
assert.ErrorIs(t, err, ErrNotFound)
assert.Equal(t, "", v)
// for cleanups
assert.Nil(t, db.sstableManager.currentReader.Close())

// check size of compacted sstable
assert.Equal(t, 700, int(db.sstableManager.currentSSTable().MetaData().NumRecords))
}
func TestExecCompactionWithTombstoneRewriten(t *testing.T) {
db := newOpenedSimpleDB(t, "simpledb_compactionSameContent")
defer cleanDatabaseFolder(t, db)
// we'll close the database to mock some internals directly, yes it's very hacky
closeDatabase(t, db)
db.closed = false
db.compactionThreshold = 0

writeSSTableWithTombstoneInDatabaseFolder(t, db, fmt.Sprintf(SSTablePattern, 42))
// the tombstone are overwrite
writeSSTableWithDataInDatabaseFolder(t, db, fmt.Sprintf(SSTablePattern, 43))
assert.Nil(t, db.reconstructSSTables())
assert.Equal(t, 1300, int(db.sstableManager.currentSSTable().MetaData().GetNumRecords()))

compactionMeta, err := executeCompaction(db)
assert.Nil(t, err)
assert.Equal(t, "sstable_000000000000042", compactionMeta.ReplacementPath)
assert.Equal(t, []string{"sstable_000000000000042", "sstable_000000000000043"}, compactionMeta.SstablePaths)
fmt.Print(compactionMeta)
err = db.sstableManager.reflectCompactionResult(compactionMeta)
assert.NoError(t, err)
v, err := db.Get("512")
assert.NoError(t, err)
assert.Equal(t, "512", v)
// for cleanups
assert.Nil(t, db.sstableManager.currentReader.Close())

// check size of compacted sstable
assert.Equal(t, 1000, int(db.sstableManager.currentSSTable().MetaData().NumRecords))
}
5 changes: 3 additions & 2 deletions simpledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ type DatabaseI interface {
}

type compactionAction struct {
pathsToCompact []string
totalRecords uint64
pathsToCompact []string
totalRecords uint64
canRemoveTombstone bool // if the compaction don't start from the first sstable we cannot remove tombstone
}

type memStoreFlushAction struct {
Expand Down
18 changes: 12 additions & 6 deletions simpledb/sstable_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package simpledb

import (
"fmt"
"github.com/thomasjungblut/go-sstables/simpledb/proto"
"github.com/thomasjungblut/go-sstables/skiplist"
"github.com/thomasjungblut/go-sstables/sstables"
"golang.org/x/exp/slices"
"os"
"path/filepath"
"sort"
"sync"

"github.com/thomasjungblut/go-sstables/simpledb/proto"
"github.com/thomasjungblut/go-sstables/skiplist"
"github.com/thomasjungblut/go-sstables/sstables"
"golang.org/x/exp/slices"
)

type SSTableManager struct {
Expand Down Expand Up @@ -117,21 +118,26 @@ func (s *SSTableManager) candidateTablesForCompaction(compactionMaxSizeBytes uin
defer s.managerLock.RUnlock()

numRecords := uint64(0)
canRemoveTombstone := false
var paths []string
for i := 0; i < len(s.allSSTableReaders); i++ {
reader := s.allSSTableReaders[i]
// avoid the EmptySStableReader (or empty files) and only include small enough SSTables
if reader.MetaData().NumRecords > 0 && reader.MetaData().TotalBytes < compactionMaxSizeBytes {
paths = append(paths, reader.BasePath())
numRecords += reader.MetaData().NumRecords
if i == 0 {
canRemoveTombstone = true
}
}
}

sort.Strings(paths)

return compactionAction{
pathsToCompact: paths,
totalRecords: numRecords,
pathsToCompact: paths,
totalRecords: numRecords,
canRemoveTombstone: canRemoveTombstone,
}
}

Expand Down
10 changes: 7 additions & 3 deletions sstables/super_sstable_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package sstables

import (
"errors"
"strings"

"github.com/thomasjungblut/go-sstables/skiplist"
"github.com/thomasjungblut/go-sstables/sstables/proto"
"strings"
)

// SuperSSTableReader unifies several sstables under one single reader with the same interface.
Expand Down Expand Up @@ -108,8 +109,11 @@ func ScanReduceLatestWins(key []byte, values [][]byte, context []int) ([]byte, [
maxCtxIndex = i
}
}

return key, values[maxCtxIndex]
val := values[maxCtxIndex]
if len(val) == 0 {
return nil, nil
}
return key, val
}

func (s SuperSSTableReader) Close() (err error) {
Expand Down

0 comments on commit 0ca9659

Please sign in to comment.