Skip to content

Commit

Permalink
feat(share): Periodic GC over EDSStore (celestiaorg#1359)
Browse files Browse the repository at this point in the history
  • Loading branch information
distractedm1nd authored Dec 6, 2022
1 parent 5080aba commit 0af528a
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 11 deletions.
3 changes: 2 additions & 1 deletion nodebuilder/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ type Module interface {
// BandwidthForPeer returns a Stats struct with bandwidth metrics associated with the given peer.ID.
// The metrics returned include all traffic sent / received for the peer, regardless of protocol.
BandwidthForPeer(id peer.ID) metrics.Stats
// BandwidthForProtocol returns a Stats struct with bandwidth metrics associated with the given protocol.ID.
// BandwidthForProtocol returns a Stats struct with bandwidth metrics associated with the given
// protocol.ID.
BandwidthForProtocol(proto protocol.ID) metrics.Stats

// ResourceState returns the state of the resource manager.
Expand Down
52 changes: 44 additions & 8 deletions share/eds/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"io"
"os"
"sync/atomic"
"time"

"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/index"
Expand All @@ -21,20 +23,27 @@ const (
blocksPath = "/blocks/"
indexPath = "/index/"
transientsPath = "/transients/"

defaultGCInterval = time.Hour
)

// Store maintains (via DAGStore) a top-level index enabling granular and efficient random access to
// every share and/or Merkle proof over every registered CARv1 file. The EDSStore provides a custom
// Blockstore interface implementation to achieve access. The main use-case is randomized sampling
// over the whole chain of EDS block data and getting data by namespace.
type Store struct {
	// cancel cancels the context created in Start, which is shared by the
	// DAGStore and the periodic GC goroutine. It is invoked by Stop.
	cancel context.CancelFunc

	dgstr  *dagstore.DAGStore
	mounts *mount.Registry

	topIdx index.Inverted
	carIdx index.FullIndexRepo

	basepath string
	// gcInterval is the period of the ticker that drives the gc loop.
	gcInterval time.Duration
	// lastGCResult is only stored on the store for testing purposes.
	lastGCResult atomic.Pointer[dagstore.GCResult]
}

// NewStore creates a new EDS Store under the given basepath and datastore.
Expand Down Expand Up @@ -70,25 +79,52 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) {
}

return &Store{
basepath: basepath,
dgstr: dagStore,
topIdx: invertedRepo,
carIdx: fsRepo,
mounts: r,
basepath: basepath,
dgstr: dagStore,
topIdx: invertedRepo,
carIdx: fsRepo,
gcInterval: defaultGCInterval,
mounts: r,
}, nil
}

// Start starts the underlying DAGStore and, once it is running, launches the
// periodic GC goroutine.
//
// The context argument is ignored: the DAGStore and the GC loop run on a
// fresh cancellable context derived from context.Background, whose cancel
// func is kept on the Store so that Stop can release it.
func (s *Store) Start(context.Context) error {
	ctx, cancel := context.WithCancel(context.Background())
	// Record cancel before anything can fail so Stop always has a valid func.
	s.cancel = cancel

	if err := s.dgstr.Start(ctx); err != nil {
		// Nothing is running; release the context instead of leaking it
		// (and do not spawn a GC goroutine for a store that failed to start).
		cancel()
		return err
	}

	go s.gc(ctx)
	return nil
}

// Stop closes the underlying DAGStore and then cancels the context created by
// Start, which ends the GC loop. The context argument is ignored.
//
// Calling Stop on a Store that was never started does not panic: the cancel
// func is only invoked when Start has set it.
func (s *Store) Stop(context.Context) error {
	if s.cancel != nil {
		// Deferred so that cancellation happens after the DAGStore is closed.
		defer s.cancel()
	}
	return s.dgstr.Close()
}

// gc periodically removes all inactive or errored shards.
//
// It runs a DAGStore GC on every tick of gcInterval and publishes the result
// in lastGCResult. The loop exits when ctx is cancelled or when a GC call
// returns an error (the error is logged, not propagated).
func (s *Store) gc(ctx context.Context) {
	ticker := time.NewTicker(s.gcInterval)
	// Release the ticker's resources on every exit path.
	defer ticker.Stop()

	// initialize empty gc result to avoid panic on access
	s.lastGCResult.Store(&dagstore.GCResult{
		Shards: make(map[shard.Key]error),
	})

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			res, err := s.dgstr.GC(ctx)
			if err != nil {
				log.Errorf("garbage collecting dagstore: %v", err)
				return
			}
			s.lastGCResult.Store(res)
		}
	}
}

// Put stores the given data square with DataRoot's hash as a key.
//
// The square is verified on the Exchange level, and Put only stores the square, trusting it.
Expand Down
39 changes: 37 additions & 2 deletions share/eds/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import (
"fmt"
"os"
"testing"

"github.com/stretchr/testify/assert"
"time"

"github.com/filecoin-project/dagstore/shard"
"github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/ipld/go-car"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-node/share"
Expand Down Expand Up @@ -156,6 +156,41 @@ func TestEDSStore_Has(t *testing.T) {
assert.True(t, ok)
}

// TestEDSStore_GC verifies that unused transient shards are collected by the GC periodically.
func TestEDSStore_GC(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	edsStore, err := newStore(t)
	// Check the error before touching edsStore: on failure the store may be
	// nil and the field assignment below would panic instead of failing cleanly.
	require.NoError(t, err)
	// Shorten the GC period so the test does not wait for the default interval.
	edsStore.gcInterval = time.Second

	// kicks off the gc goroutine
	err = edsStore.Start(ctx)
	require.NoError(t, err)

	eds, dah := randomEDS(t)
	shardKey := shard.KeyFromString(dah.String())

	err = edsStore.Put(ctx, dah, eds)
	require.NoError(t, err)

	// doesn't exist yet
	assert.NotContains(t, edsStore.lastGCResult.Load().Shards, shardKey)

	// wait for gc to run, retry three times
	for i := 0; i < 3; i++ {
		time.Sleep(edsStore.gcInterval)
		if _, ok := edsStore.lastGCResult.Load().Shards[shardKey]; ok {
			break
		}
	}
	assert.Contains(t, edsStore.lastGCResult.Load().Shards, shardKey)

	// assert nil in this context means there was no error re-acquiring the shard during GC
	assert.Nil(t, edsStore.lastGCResult.Load().Shards[shardKey])
}

func newStore(t *testing.T) (*Store, error) {
t.Helper()

Expand Down

0 comments on commit 0af528a

Please sign in to comment.