Skip to content

Commit

Permalink
fix(shwap/bitswap): Blockstore.GetSize: getting size with no compute (c…
Browse files Browse the repository at this point in the history
…elestiaorg#3894)

Bitswap prioritizes peers based on their active/pending work and the priority that peers set for requests(work) themselves. The prioritization happens on the `Get` operation of `Blockstore` not `GetSize`. However, we currently do the most expensive work in `GetSize` followed by `Get`. It is still part of the same WANT_BLOCK request(since celestiaorg#3813), and `GetSize` work is usually cached for `Get` to catch it. Still, the prioritization makes it so that the time between `GetSize` and `Get` can be pretty long, inducing cache misses and redoing the same work again, which is confirmed by profiles. Also, doing the expensive part in `GetSize` avoids Bitswap server's rate limiting. 

All this brings the need to make `GetSize` as lightweight as possible, leaving actual proof computed to `Get`. This PR achieves this by defining a constant max block size for every type block.

--- 

🎱 mb blonks certified
  • Loading branch information
Wondertan authored Nov 15, 2024
1 parent 4bf1835 commit 24ba6da
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 31 deletions.
38 changes: 30 additions & 8 deletions share/shwap/p2p/bitswap/block_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (

// EmptyBlock constructs an empty Block with type in the given CID.
func EmptyBlock(cid cid.Cid) (Block, error) {
spec, ok := specRegistry[cid.Prefix().MhType]
if !ok {
return nil, fmt.Errorf("unsupported Block type: %v", cid.Prefix().MhType)
spec, err := getSpec(cid)
if err != nil {
return nil, err
}

blk, err := spec.builder(cid)
Expand All @@ -23,27 +23,49 @@ func EmptyBlock(cid cid.Cid) (Block, error) {
return blk, nil
}

// maxBlockSize returns the maximum size of the Block type in the given CID.
func maxBlockSize(cid cid.Cid) (int, error) {
spec, err := getSpec(cid)
if err != nil {
return 0, err
}

return spec.maxSize, nil
}

// registerBlock registers the new Block type and multihash for it.
func registerBlock(mhcode, codec uint64, idSize int, bldrFn func(cid.Cid) (Block, error)) {
func registerBlock(mhcode, codec uint64, maxSize, idSize int, bldrFn func(cid.Cid) (Block, error)) {
mh.Register(mhcode, func() hash.Hash {
return &hasher{IDSize: idSize}
})
specRegistry[mhcode] = blockSpec{
specRegistry[codec] = blockSpec{
idSize: idSize,
codec: codec,
maxSize: maxSize,
mhCode: mhcode,
builder: bldrFn,
}
}

// getSpec returns the blockSpec for the given CID.
func getSpec(cid cid.Cid) (blockSpec, error) {
spec, ok := specRegistry[cid.Type()]
if !ok {
return blockSpec{}, fmt.Errorf("unsupported codec %d", cid.Type())
}

return spec, nil
}

// blockSpec holds constant metadata about particular Block types.
type blockSpec struct {
idSize int
codec uint64
maxSize int
mhCode uint64
builder func(cid.Cid) (Block, error)
}

func (spec *blockSpec) String() string {
return fmt.Sprintf("BlockSpec{IDSize: %d, Codec: %d}", spec.idSize, spec.codec)
return fmt.Sprintf("BlockSpec{IDSize: %d, MHCode: %d}", spec.idSize, spec.mhCode)
}

var specRegistry = make(map[uint64]blockSpec)
26 changes: 11 additions & 15 deletions share/shwap/p2p/bitswap/block_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Blockstore struct {
Getter AccessorGetter
}

func (b *Blockstore) getBlock(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
func (b *Blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
blk, err := EmptyBlock(cid)
if err != nil {
return nil, err
Expand All @@ -42,6 +42,7 @@ func (b *Blockstore) getBlock(ctx context.Context, cid cid.Cid) (blocks.Block, e
if err != nil {
return nil, fmt.Errorf("getting EDS Accessor for height %v: %w", blk.Height(), err)
}

defer func() {
if err := acc.Close(); err != nil {
log.Warnf("failed to close EDS accessor for height %v: %s", blk.Height(), err)
Expand All @@ -55,24 +56,19 @@ func (b *Blockstore) getBlock(ctx context.Context, cid cid.Cid) (blocks.Block, e
return convertBitswap(blk)
}

func (b *Blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
blk, err := b.getBlock(ctx, cid)
if err != nil {
return nil, err
}

return blk, nil
}
func (b *Blockstore) GetSize(_ context.Context, cid cid.Cid) (int, error) {
// NOTE: Size is used as a weight for the incoming Bitswap requests. Bitswap uses fair scheduling for the requests
// and prioritizes peers with less *active* work. Active work of a peer is a cumulative weight of all the in-progress
// requests.

func (b *Blockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
// TODO(@Wondertan): Bitswap checks the size of the data(GetSize) before serving it via Get. This means
// GetSize may do an unnecessary read from disk which we can avoid by either caching on Blockstore level
// or returning constant size(we know at that point that we have requested data)
blk, err := b.Get(ctx, cid)
// Constant max block size is used instead of factual size. This avoids disk IO but equalizes the weights of the
// requests of the same type. E.g. row of 2MB EDS and row of 8MB EDS will have the same weight.
size, err := maxBlockSize(cid)
if err != nil {
return 0, err
}
return len(blk.RawData()), nil

return size, nil
}

func (b *Blockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
Expand Down
4 changes: 3 additions & 1 deletion share/shwap/p2p/bitswap/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ import (
const (
testCodec = 0x9999
testMultihashCode = 0x9999
testBlockSize = 256
testIDSize = 2
)

func init() {
registerBlock(
testMultihashCode,
testCodec,
testBlockSize,
testIDSize,
func(cid cid.Cid) (Block, error) {
return newEmptyTestBlock(cid)
Expand Down Expand Up @@ -67,7 +69,7 @@ type testBlock struct {
}

func newTestBlock(id int) *testBlock {
bytes := make([]byte, 256)
bytes := make([]byte, testBlockSize)
_, _ = crand.Read(bytes)
return &testBlock{id: testID(id), data: bytes}
}
Expand Down
16 changes: 9 additions & 7 deletions share/shwap/p2p/bitswap/cid.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,18 @@ func encodeToCID(bm encoding.BinaryMarshaler, mhcode, codec uint64) cid.Cid {

// validateCID checks correctness of the CID.
func validateCID(cid cid.Cid) error {
prefix := cid.Prefix()
spec, ok := specRegistry[prefix.MhType]
if !ok {
return fmt.Errorf("unsupported multihash type %d", prefix.MhType)
spec, err := getSpec(cid)
if err != nil {
return err
}

if prefix.Codec != spec.codec {
return fmt.Errorf("invalid CID codec %d", prefix.Codec)
prefix := cid.Prefix()
if prefix.Version != 1 {
return fmt.Errorf("invalid cid version %d", prefix.Version)
}
if prefix.MhType != spec.mhCode {
return fmt.Errorf("invalid multihash type %d", prefix.MhType)
}

if prefix.MhLength != spec.idSize {
return fmt.Errorf("invalid multihash length %d", prefix.MhLength)
}
Expand Down
6 changes: 6 additions & 0 deletions share/shwap/p2p/bitswap/row_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/ipfs/go-cid"

libshare "github.com/celestiaorg/go-square/v2/share"
"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/share"
Expand All @@ -22,10 +23,15 @@ const (
rowMultihashCode = 0x7801
)

// maxRowSize is the maximum size of the RowBlock.
// It is calculated as half of the square size multiplied by the share size.
var maxRowSize = share.MaxSquareSize / 2 * libshare.ShareSize

func init() {
registerBlock(
rowMultihashCode,
rowCodec,
maxRowSize,
shwap.RowIDSize,
func(cid cid.Cid) (Block, error) {
return EmptyRowBlockFromCID(cid)
Expand Down
4 changes: 4 additions & 0 deletions share/shwap/p2p/bitswap/row_namespace_data_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@ const (
rowNamespaceDataMultihashCode = 0x7821
)

// maxRNDSize is the maximum size of the RowNamespaceDataBlock.
var maxRNDSize = maxRowSize

func init() {
registerBlock(
rowNamespaceDataMultihashCode,
rowNamespaceDataCodec,
maxRNDSize,
shwap.RowNamespaceDataIDSize,
func(cid cid.Cid) (Block, error) {
return EmptyRowNamespaceDataBlockFromCID(cid)
Expand Down
8 changes: 8 additions & 0 deletions share/shwap/p2p/bitswap/sample_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package bitswap
import (
"context"
"fmt"
"math"

"github.com/ipfs/go-cid"

libshare "github.com/celestiaorg/go-square/v2/share"

"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/shwap"
Expand All @@ -21,10 +24,15 @@ const (
sampleMultihashCode = 0x7811
)

// maxSampleSize is the maximum size of the SampleBlock.
// It is calculated as the size of the share plus the size of the proof.
var maxSampleSize = libshare.ShareSize + share.AxisRootSize*int(math.Log2(float64(share.MaxSquareSize)))

func init() {
registerBlock(
sampleMultihashCode,
sampleCodec,
maxSampleSize,
shwap.SampleIDSize,
func(cid cid.Cid) (Block, error) {
return EmptySampleBlockFromCID(cid)
Expand Down

0 comments on commit 24ba6da

Please sign in to comment.