Skip to content

Commit

Permalink
fix(core): Don't store eds if its hash already exists in eds.Store (c…
Browse files Browse the repository at this point in the history
…elestiaorg#1786)

Resolves celestiaorg#1785 

TODO: 
- [ ] ideally a test that can do duplicate blocks
  • Loading branch information
renaynay authored Feb 23, 2023
1 parent 85f1a37 commit f7c10da
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 28 deletions.
20 changes: 20 additions & 0 deletions core/eds.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
package core

import (
"context"
"errors"

"github.com/filecoin-project/dagstore"

"github.com/tendermint/tendermint/types"

"github.com/celestiaorg/celestia-app/pkg/da"
appshares "github.com/celestiaorg/celestia-app/pkg/shares"
"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/libs/utils"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
)

// extendBlock extends the given block data, returning the resulting
Expand All @@ -24,3 +31,16 @@ func extendBlock(data types.Data) (*rsmt2d.ExtendedDataSquare, error) {
size := utils.SquareSize(len(shares))
return da.ExtendShares(size, appshares.ToBytes(shares))
}

// storeEDS will only store extended block if it is not empty and doesn't already exist.
func storeEDS(ctx context.Context, hash share.DataHash, eds *rsmt2d.ExtendedDataSquare, store *eds.Store) error {
if eds == nil {
return nil
}
err := store.Put(ctx, hash, eds)
if errors.Is(err, dagstore.ErrShardExists) {
// block with given root already exists, return nil
return nil
}
return err
}
21 changes: 8 additions & 13 deletions core/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,11 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende
if !bytes.Equal(hash, eh.Hash()) {
return nil, fmt.Errorf("incorrect hash in header: expected %x, got %x", hash, eh.Hash())
}
// store extended block if it is not empty
if eds != nil {
err = ce.store.Put(ctx, eh.DAH.Hash(), eds)
if err != nil {
return nil, err
}
err = storeEDS(ctx, eh.DAH.Hash(), eds, ce.store)
if err != nil {
log.Errorw("storing EDS to eds.Store", "err", err)
return nil, err
}

return eh, nil
}

Expand Down Expand Up @@ -136,12 +133,10 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64
if err != nil {
return nil, err
}
// only store extended block if it's not empty
if eds != nil {
err = ce.store.Put(ctx, eh.DAH.Hash(), eds)
if err != nil {
return nil, err
}
err = storeEDS(ctx, eh.DAH.Hash(), eds, ce.store)
if err != nil {
log.Errorw("storing EDS to eds.Store", "err", err)
return nil, err
}
return eh, nil
}
2 changes: 1 addition & 1 deletion core/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func createCoreFetcher(t *testing.T, cfg *TestConfig) (*BlockFetcher, testnode.C
cctx := StartTestNodeWithConfig(t, cfg)
// wait for height 2 in order to be able to start submitting txs (this prevents
// flakiness with accessing account state)
_, err := cctx.WaitForHeightWithTimeout(2, time.Second) // TODO @renaynay: configure?
_, err := cctx.WaitForHeightWithTimeout(2, time.Second*2) // TODO @renaynay: configure?
require.NoError(t, err)
return NewBlockFetcher(cctx.Client), cctx
}
Expand Down
2 changes: 1 addition & 1 deletion core/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
const newBlockSubscriber = "NewBlock/Events"

var (
log = logging.Logger("core/fetcher")
log = logging.Logger("core")
newBlockEventQuery = types.QueryForEvent(types.EventNewBlock).String()
)

Expand Down
12 changes: 5 additions & 7 deletions core/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,11 @@ func (cl *Listener) listen(ctx context.Context, sub <-chan *types.Block) {
log.Errorw("listener: making extended header", "err", err)
return
}
// store block data if not empty
if eds != nil {
err = cl.store.Put(ctx, eh.DAH.Hash(), eds)
if err != nil {
log.Errorw("listener: storing extended header", "err", err)
return
}
// attempt to store block data if not empty
err = storeEDS(ctx, eh.DAH.Hash(), eds, cl.store)
if err != nil {
log.Errorw("listener: storing EDS", "err", err)
return
}

// notify network of new EDS hash only if core is already synced
Expand Down
18 changes: 15 additions & 3 deletions share/eds/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,15 @@ func (s *Store) gc(ctx context.Context) {
// The resulting file stores all the shares and NMT Merkle Proofs of the EDS.
// Additionally, the file gets indexed s.t. store.Blockstore can access them.
func (s *Store) Put(ctx context.Context, root share.DataHash, square *rsmt2d.ExtendedDataSquare) (err error) {
// if root already exists, short-circuit
has, err := s.Has(ctx, root)
if err != nil {
return fmt.Errorf("failed to check if root already exists in index: %w", err)
}
if has {
return dagstore.ErrShardExists
}

ctx, span := tracer.Start(ctx, "store/put", trace.WithAttributes(
attribute.String("root", root.String()),
attribute.Int("width", int(square.Width())),
Expand Down Expand Up @@ -372,11 +381,14 @@ func (s *Store) Has(ctx context.Context, root share.DataHash) (bool, error) {

key := root.String()
info, err := s.dgstr.GetShardInfo(shard.KeyFromString(key))
if err == dagstore.ErrShardUnknown {
switch err {
case nil:
return true, info.Error
case dagstore.ErrShardUnknown:
return false, info.Error
default:
return false, err
}

return true, info.Error
}

func setupPath(basepath string) error {
Expand Down
4 changes: 2 additions & 2 deletions share/eds/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestEDSStore(t *testing.T) {
// shard hasn't been registered yet
has, err := edsStore.Has(ctx, dah.Hash())
assert.False(t, has)
assert.Error(t, err, "shard not found")
assert.NoError(t, err)

err = edsStore.Put(ctx, dah.Hash(), eds)
assert.NoError(t, err)
Expand Down Expand Up @@ -116,7 +116,7 @@ func TestEDSStore(t *testing.T) {
eds, dah := randomEDS(t)

ok, err := edsStore.Has(ctx, dah.Hash())
assert.Error(t, err, "shard not found")
assert.NoError(t, err)
assert.False(t, ok)

err = edsStore.Put(ctx, dah.Hash(), eds)
Expand Down
2 changes: 1 addition & 1 deletion share/getters/getter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestTeeGetter(t *testing.T) {
// eds store doesn't have the EDS yet
ok, err := edsStore.Has(ctx, dah.Hash())
assert.False(t, ok)
assert.Error(t, err)
assert.NoError(t, err)

retrievedEDS, err := tg.GetEDS(ctx, &dah)
require.NoError(t, err)
Expand Down

0 comments on commit f7c10da

Please sign in to comment.