Skip to content

Commit

Permalink
refactor(share/availability/light | share/availability/full): Availab…
Browse files Browse the repository at this point in the history
…ility implementations are aware of sampling window, removed from DASer (celestiaorg#3957)
  • Loading branch information
renaynay authored Dec 10, 2024
1 parent 250c129 commit ac5d7a7
Show file tree
Hide file tree
Showing 18 changed files with 192 additions and 88 deletions.
3 changes: 2 additions & 1 deletion core/eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ func storeEDS(
eds *rsmt2d.ExtendedDataSquare,
store *store.Store,
window time.Duration,
archival bool,
) error {
if !availability.IsWithinWindow(eh.Time(), window) {
if !archival && !availability.IsWithinWindow(eh.Time(), window) {
log.Debugw("skipping storage of historic block", "height", eh.Height())
return nil
}
Expand Down
6 changes: 4 additions & 2 deletions core/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Exchange struct {
construct header.ConstructFn

availabilityWindow time.Duration
archival bool

metrics *exchangeMetrics
}
Expand Down Expand Up @@ -54,6 +55,7 @@ func NewExchange(
store: store,
construct: construct,
availabilityWindow: p.availabilityWindow,
archival: p.archival,
metrics: metrics,
}, nil
}
Expand Down Expand Up @@ -147,7 +149,7 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende
&block.Height, hash, eh.Hash())
}

err = storeEDS(ctx, eh, eds, ce.store, ce.availabilityWindow)
err = storeEDS(ctx, eh, eds, ce.store, ce.availabilityWindow, ce.archival)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -187,7 +189,7 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64
panic(fmt.Errorf("constructing extended header for height %d: %w", b.Header.Height, err))
}

err = storeEDS(ctx, eh, eds, ce.store, ce.availabilityWindow)
err = storeEDS(ctx, eh, eds, ce.store, ce.availabilityWindow, ce.archival)
if err != nil {
return nil, err
}
Expand Down
49 changes: 49 additions & 0 deletions core/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,55 @@ func TestExchange_DoNotStoreHistoric(t *testing.T) {
}
}

// TestExchange_StoreHistoricIfArchival makes sure blocks are stored past
// the sampling window if archival mode is enabled on the Exchange.
func TestExchange_StoreHistoricIfArchival(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	cfg := DefaultTestConfig()
	fetcher, cctx := createCoreFetcher(t, cfg)

	// produce blocks that contain actual data so that non-empty EDSs
	// are expected in the store below
	generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx)

	store, err := store.NewStore(store.DefaultParameters(), t.TempDir())
	require.NoError(t, err)

	ce, err := NewExchange(
		fetcher,
		store,
		header.MakeExtendedHeader,
		WithAvailabilityWindow(time.Nanosecond), // all blocks will be "historic"
		WithArchivalMode(),                      // make sure to store them anyway
	)
	require.NoError(t, err)

	// initialize store with genesis block
	genHeight := int64(1)
	genBlock, err := fetcher.GetBlock(ctx, &genHeight)
	require.NoError(t, err)
	genHeader, err := ce.Get(ctx, genBlock.Header.Hash().Bytes())
	require.NoError(t, err)

	// fetching the range triggers storeEDS for every header in it
	headers, err := ce.GetRangeByHeight(ctx, genHeader, 30)
	require.NoError(t, err)

	// ensure all "historic" EDSs were stored despite being outside the window
	for _, h := range headers {
		has, err := store.HasByHeight(ctx, h.Height())
		require.NoError(t, err)
		assert.True(t, has)

		// empty EDSs are expected to exist in the store, so we skip them
		if h.DAH.Equals(share.EmptyEDSRoots()) {
			continue
		}
		has, err = store.HasByHash(ctx, h.DAH.Hash())
		require.NoError(t, err)
		assert.True(t, has)
	}
}

func createCoreFetcher(t *testing.T, cfg *testnode.Config) (*BlockFetcher, testnode.Context) {
cctx := StartTestNodeWithConfig(t, cfg)
// wait for height 2 in order to be able to start submitting txs (this prevents
Expand Down
4 changes: 3 additions & 1 deletion core/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Listener struct {
construct header.ConstructFn
store *store.Store
availabilityWindow time.Duration
archival bool

headerBroadcaster libhead.Broadcaster[*header.ExtendedHeader]
hashBroadcaster shrexsub.BroadcastFn
Expand Down Expand Up @@ -83,6 +84,7 @@ func NewListener(
construct: construct,
store: store,
availabilityWindow: p.availabilityWindow,
archival: p.archival,
listenerTimeout: 5 * blocktime,
metrics: metrics,
chainID: p.chainID,
Expand Down Expand Up @@ -237,7 +239,7 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataS
panic(fmt.Errorf("making extended header: %w", err))
}

err = storeEDS(ctx, eh, eds, cl.store, cl.availabilityWindow)
err = storeEDS(ctx, eh, eds, cl.store, cl.availabilityWindow, cl.archival)
if err != nil {
return fmt.Errorf("storing EDS: %w", err)
}
Expand Down
8 changes: 8 additions & 0 deletions core/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ type params struct {
metrics bool
chainID string
availabilityWindow time.Duration
archival bool
}

// defaultParams returns the default configuration for the core
// components: a zero (disabled) availability window and archival
// mode switched off.
func defaultParams() params {
	var p params
	p.availabilityWindow = time.Duration(0)
	p.archival = false
	return p
}

Expand All @@ -39,3 +41,9 @@ func WithAvailabilityWindow(window time.Duration) Option {
p.availabilityWindow = window
}
}

// WithArchivalMode is a functional option that enables archival mode,
// instructing the node to store blocks even when they fall outside
// of the availability window.
func WithArchivalMode() Option {
	return func(p *params) {
		p.archival = true
	}
}
14 changes: 0 additions & 14 deletions das/daser.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,12 @@ import (

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/availability"
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub"
)

var log = logging.Logger("das")

// errOutsideSamplingWindow is an error used to inform
// the caller of Sample that the given header is outside
// the sampling window.
var errOutsideSamplingWindow = fmt.Errorf("skipping header outside of sampling window")

// DASer continuously validates availability of data committed to headers.
type DASer struct {
params Parameters
Expand Down Expand Up @@ -160,14 +154,6 @@ func (d *DASer) Stop(ctx context.Context) error {
}

func (d *DASer) sample(ctx context.Context, h *header.ExtendedHeader) error {
// short-circuit if pruning is enabled and the header is outside the
// availability window
if !availability.IsWithinWindow(h.Time(), d.params.samplingWindow) {
log.Debugw("skipping header outside sampling window", "height", h.Height(),
"time", h.Time())
return errOutsideSamplingWindow
}

err := d.da.SharesAvailable(ctx, h)
if err != nil {
var byzantineErr *byzantine.ErrByzantine
Expand Down
41 changes: 0 additions & 41 deletions das/daser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package das
import (
"context"
"fmt"
"strconv"
"testing"
"time"

Expand All @@ -20,7 +19,6 @@ import (
"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/header/headertest"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/availability"
"github.com/celestiaorg/celestia-node/share/availability/mocks"
"github.com/celestiaorg/celestia-node/share/eds/edstest"
)
Expand Down Expand Up @@ -243,45 +241,6 @@ func TestDASerSampleTimeout(t *testing.T) {
}
}

// TestDASer_SamplingWindow tests the sampling window determination
// for headers: only headers whose timestamps fall inside the
// configured window should be considered sampleable.
func TestDASer_SamplingWindow(t *testing.T) {
	ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
	sub := new(headertest.Subscriber)
	fserv := &fraudtest.DummyService[*header.ExtendedHeader]{}
	getter := getterStub{}
	avail := mocks.NewMockAvailability(gomock.NewController(t))

	// create and start DASer with a one-second sampling window
	daser, err := NewDASer(avail, sub, getter, ds, fserv, newBroadcastMock(1),
		WithSamplingWindow(time.Second))
	require.NoError(t, err)

	cases := []struct {
		timestamp    time.Time
		withinWindow bool
	}{
		{timestamp: time.Now().Add(-(time.Second * 5)), withinWindow: false},
		{timestamp: time.Now().Add(-(time.Millisecond * 800)), withinWindow: true},
		{timestamp: time.Now().Add(-(time.Hour)), withinWindow: false},
		{timestamp: time.Now().Add(-(time.Hour * 24 * 30)), withinWindow: false},
		{timestamp: time.Now(), withinWindow: true},
	}

	for i, tc := range cases {
		t.Run(strconv.Itoa(i), func(t *testing.T) {
			eh := headertest.RandExtendedHeader(t)
			eh.RawHeader.Time = tc.timestamp

			got := availability.IsWithinWindow(eh.Time(), daser.params.samplingWindow)
			assert.Equal(t, tc.withinWindow, got)
		})
	}
}

// createDASerSubcomponents takes numGetter (number of headers
// to store in mockGetter) and numSub (number of headers to store
// in the mock header.Subscriber), returning a newly instantiated
Expand Down
13 changes: 0 additions & 13 deletions das/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,6 @@ type Parameters struct {
// divided between parallel workers. SampleTimeout should be adjusted proportionally to
// ConcurrencyLimit.
SampleTimeout time.Duration

// samplingWindow determines the time window that headers should fall into
// in order to be sampled. If set to 0, the sampling window will include
// all headers.
samplingWindow time.Duration
}

// DefaultParameters returns the default configuration values for the daser parameters
Expand Down Expand Up @@ -161,11 +156,3 @@ func WithSampleTimeout(sampleTimeout time.Duration) Option {
d.params.SampleTimeout = sampleTimeout
}
}

// WithSamplingWindow is a functional option to configure the DASer's
// `samplingWindow` parameter, which determines the time window that
// headers must fall into in order to be sampled. A window of 0
// includes all headers.
func WithSamplingWindow(samplingWindow time.Duration) Option {
	return func(d *DASer) {
		d.params.samplingWindow = samplingWindow
	}
}
5 changes: 3 additions & 2 deletions das/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
libhead "github.com/celestiaorg/go-header"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/share/availability"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub"
)

Expand Down Expand Up @@ -83,7 +84,7 @@ func (w *worker) run(ctx context.Context, timeout time.Duration, resultCh chan<-
// sampling worker will resume upon restart
return
}
if errors.Is(err, errOutsideSamplingWindow) {
if errors.Is(err, availability.ErrOutsideSamplingWindow) {
skipped++
err = nil
}
Expand Down Expand Up @@ -119,7 +120,7 @@ func (w *worker) sample(ctx context.Context, timeout time.Duration, height uint6
defer cancel()

err = w.sampleFn(ctx, h)
if errors.Is(err, errOutsideSamplingWindow) {
if errors.Is(err, availability.ErrOutsideSamplingWindow) {
// if header is outside sampling window, do not log
// or record it.
return err
Expand Down
4 changes: 0 additions & 4 deletions nodebuilder/das/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/celestiaorg/celestia-node/das"
"github.com/celestiaorg/celestia-node/header"
modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud"
modshare "github.com/celestiaorg/celestia-node/nodebuilder/share"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub"
Expand Down Expand Up @@ -45,11 +44,8 @@ func newDASer(
batching datastore.Batching,
fraudServ fraud.Service[*header.ExtendedHeader],
bFn shrexsub.BroadcastFn,
availWindow modshare.Window,
options ...das.Option,
) (*das.DASer, *modfraud.ServiceBreaker[*das.DASer, *header.ExtendedHeader], error) {
options = append(options, das.WithSamplingWindow(availWindow.Duration()))

ds, err := das.NewDASer(da, hsub, store, batching, fraudServ, bFn, options...)
if err != nil {
return nil, nil, err
Expand Down
13 changes: 7 additions & 6 deletions nodebuilder/pruner/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/pruner/full"
"github.com/celestiaorg/celestia-node/share/availability"
fullavail "github.com/celestiaorg/celestia-node/share/availability/full"
"github.com/celestiaorg/celestia-node/share/availability/light"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/discovery"
)
Expand Down Expand Up @@ -59,33 +60,33 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
baseComponents,
prunerService,
fxutil.ProvideAs(full.NewPruner, new(pruner.Pruner)),
fx.Supply([]fullavail.Option{}),
)
}
return fx.Module("prune",
baseComponents,
fx.Invoke(func(ctx context.Context, ds datastore.Batching) error {
return pruner.DetectPreviousRun(ctx, ds)
}),
fx.Supply([]fullavail.Option{fullavail.WithArchivalMode()}),
)
case node.Bridge:
if cfg.EnableService {
return fx.Module("prune",
baseComponents,
prunerService,
fxutil.ProvideAs(full.NewPruner, new(pruner.Pruner)),
fx.Provide(func(window modshare.Window) []core.Option {
return []core.Option{core.WithAvailabilityWindow(window.Duration())}
}),
fx.Supply([]fullavail.Option{}),
fx.Supply([]core.Option{}),
)
}
return fx.Module("prune",
baseComponents,
fx.Invoke(func(ctx context.Context, ds datastore.Batching) error {
return pruner.DetectPreviousRun(ctx, ds)
}),
fx.Provide(func() []core.Option {
return []core.Option{}
}),
fx.Supply([]fullavail.Option{fullavail.WithArchivalMode()}),
fx.Supply([]core.Option{core.WithArchivalMode()}),
)
default:
panic("unknown node type")
Expand Down
Loading

0 comments on commit ac5d7a7

Please sign in to comment.