Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generalize expiry based de-duplication, dsmr #1810

Open
wants to merge 39 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
e5ecf69
eliminate emapChunk
tsachiherman Nov 22, 2024
ba644c3
update
tsachiherman Nov 22, 2024
2939543
update
tsachiherman Nov 25, 2024
08f0d59
Merge branch 'main' into tsachi/refactor_validity_window3
tsachiherman Nov 25, 2024
659f1fb
update per CR
tsachiherman Nov 25, 2024
921b908
lint
tsachiherman Nov 25, 2024
508cc92
apply requested changes.
tsachiherman Nov 25, 2024
7a3fed6
fix code + test.
tsachiherman Nov 25, 2024
b312a3c
rollback unneeded changes.
tsachiherman Nov 25, 2024
1238460
rename testing chain indexer.
tsachiherman Nov 25, 2024
8c1ab98
update
tsachiherman Nov 25, 2024
f8e42a8
add unit test.
tsachiherman Nov 26, 2024
b000edf
update unit test.
tsachiherman Nov 26, 2024
1d2536e
Merge branch 'main' into tsachi/refactor_validity_window3
tsachiherman Nov 27, 2024
6911f26
lint
tsachiherman Nov 27, 2024
f2816e7
update
tsachiherman Nov 28, 2024
e78f847
lint
tsachiherman Nov 28, 2024
270245a
update
tsachiherman Nov 28, 2024
144ddd4
Merge branch 'main' into tsachi/refactor_validity_window3
tsachiherman Dec 5, 2024
2fa701d
fix few CR feedback.
tsachiherman Dec 5, 2024
2220b5e
add certSet
tsachiherman Dec 6, 2024
39905b9
remove type declaration block.
tsachiherman Dec 6, 2024
d54e1a0
fix tests
tsachiherman Dec 6, 2024
8e83b6d
update
tsachiherman Dec 6, 2024
fdd4639
update
tsachiherman Dec 6, 2024
9e127f1
update
tsachiherman Dec 9, 2024
0f3fe73
create a wrapper for the block in dsmr (#1829)
tsachiherman Dec 9, 2024
fc87f31
update
tsachiherman Dec 10, 2024
cf82891
update
tsachiherman Dec 10, 2024
864acc1
updatge
tsachiherman Dec 10, 2024
d96160e
fix typo
tsachiherman Dec 10, 2024
9ea09b4
undo unwanted changes
tsachiherman Dec 10, 2024
30bcdce
update
tsachiherman Dec 10, 2024
8d52c05
update
tsachiherman Dec 10, 2024
32207b9
update unit test
tsachiherman Dec 12, 2024
5d62af7
update test
tsachiherman Dec 12, 2024
7ca779b
update
tsachiherman Dec 12, 2024
383a7ef
update
tsachiherman Dec 12, 2024
7902a6a
update per review feedback,.
tsachiherman Dec 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions chain/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewExecutionBlock(block *StatelessBlock) (*ExecutionBlock, error) {
}, nil
}

func (b *ExecutionBlock) ContainsTx(id ids.ID) bool {
func (b *ExecutionBlock) Contains(id ids.ID) bool {
return b.txsSet.Contains(id)
}

Expand All @@ -69,7 +69,7 @@ func (b *ExecutionBlock) Timestamp() int64 {
return b.Tmstmp
}

func (b *ExecutionBlock) Txs() []*Transaction {
func (b *ExecutionBlock) Containers() []*Transaction {
return b.StatelessBlock.Txs
}

Expand Down
4 changes: 2 additions & 2 deletions internal/validitywindow/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ type ExecutionBlock[Container emap.Item] interface {
Parent() ids.ID
Timestamp() int64
Height() uint64
Txs() []Container
ContainsTx(ids.ID) bool
Containers() []Container
Contains(ids.ID) bool
}

type ChainIndex[Container emap.Item] interface {
Expand Down
17 changes: 14 additions & 3 deletions internal/validitywindow/validitywindow.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"sync"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/trace"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
Expand Down Expand Up @@ -45,7 +46,7 @@ func (v *TimeValidityWindow[Container]) Accept(blk ExecutionBlock[Container]) {

evicted := v.seen.SetMin(blk.Timestamp())
v.log.Debug("txs evicted from seen", zap.Int("len", len(evicted)))
v.seen.Add(blk.Txs())
v.seen.Add(blk.Containers())
v.lastAcceptedBlockHeight = blk.Height()
}

Expand All @@ -62,13 +63,23 @@ func (v *TimeValidityWindow[Container]) VerifyExpiryReplayProtection(
return err
}

dup, err := v.isRepeat(ctx, parent, oldestAllowed, blk.Txs(), true)
dup, err := v.isRepeat(ctx, parent, oldestAllowed, blk.Containers(), true)
if err != nil {
return err
}
if dup.Len() > 0 {
return fmt.Errorf("%w: duplicate in ancestry", ErrDuplicateContainer)
}
// make sure we have no repeats within the block itself.
// set.Set
blkTxsIDs := set.NewSet[ids.ID](len(blk.Containers()))
for _, tx := range blk.Containers() {
id := tx.GetID()
if blkTxsIDs.Contains(id) {
return fmt.Errorf("%w: duplicate in block", ErrDuplicateContainer)
}
blkTxsIDs.Add(id)
}
return nil
}

Expand Down Expand Up @@ -110,7 +121,7 @@ func (v *TimeValidityWindow[Container]) isRepeat(
if marker.Contains(i) {
continue
}
if ancestorBlk.ContainsTx(tx.GetID()) {
if ancestorBlk.Contains(tx.GetID()) {
marker.Add(i)
if stop {
return marker, nil
Expand Down
54 changes: 54 additions & 0 deletions x/dsmr/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package dsmr
import (
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/crypto/bls"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/utils/wrappers"
"github.com/ava-labs/avalanchego/vms/platformvm/warp"

Expand Down Expand Up @@ -105,6 +106,41 @@ func ParseChunk[T Tx](chunkBytes []byte) (Chunk[T], error) {
return c, c.init()
}

// ExecutionBlock bridge the gap between the dsmr's block implementation and the validity window's execution block interface.
type ExecutionBlock struct {
innerBlock Block
}

func (e ExecutionBlock) Timestamp() int64 {
return e.innerBlock.Timestamp
}

func (e ExecutionBlock) Height() uint64 {
return e.innerBlock.Height
}

func (e ExecutionBlock) Contains(id ids.ID) bool {
return e.innerBlock.certSet.Contains(id)
}

func (e ExecutionBlock) Parent() ids.ID {
return e.innerBlock.ParentID
}

func (e ExecutionBlock) Containers() []*emapChunkCertificate {
emapChunkCert := make([]*emapChunkCertificate, len(e.innerBlock.ChunkCerts))
for i := range emapChunkCert {
emapChunkCert[i] = &emapChunkCertificate{*e.innerBlock.ChunkCerts[i]}
}
return emapChunkCert
}

func NewExecutionBlock(innerBlock Block) ExecutionBlock {
return ExecutionBlock{
innerBlock: innerBlock,
}
}

type Block struct {
ParentID ids.ID `serialize:"true"`
Height uint64 `serialize:"true"`
Expand All @@ -114,8 +150,26 @@ type Block struct {

blkID ids.ID
blkBytes []byte
certSet set.Set[ids.ID]
}

func NewBlock(parentID ids.ID, height uint64, timestamp int64, chunkCerts []*ChunkCertificate) Block {
blk := Block{
ParentID: parentID,
Height: height,
Timestamp: timestamp,
ChunkCerts: chunkCerts,
}
blk.init()
return blk
}

func (b Block) GetID() ids.ID {
return b.blkID
}

func (b Block) init() {
for _, c := range b.ChunkCerts {
b.certSet.Add(c.ChunkID)
}
}
8 changes: 8 additions & 0 deletions x/dsmr/certificate.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ type ChunkReference struct {
Expiry int64 `serialize:"true"`
}

type emapChunkCertificate struct {
ChunkCertificate
}

func (e emapChunkCertificate) GetID() ids.ID { return e.ChunkID }

func (e emapChunkCertificate) GetExpiry() int64 { return e.Expiry }

type ChunkCertificate struct {
ChunkReference `serialize:"true"`
Signature *warp.BitSetSignature `serialize:"true"`
Expand Down
55 changes: 44 additions & 11 deletions x/dsmr/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ import (
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/network/p2p/acp118"
"github.com/ava-labs/avalanchego/trace"
"github.com/ava-labs/avalanchego/utils/crypto/bls"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/wrappers"
"github.com/ava-labs/avalanchego/vms/platformvm/warp"

"github.com/ava-labs/hypersdk/codec"
"github.com/ava-labs/hypersdk/consts"
"github.com/ava-labs/hypersdk/internal/validitywindow"
"github.com/ava-labs/hypersdk/proto/pb/dsmr"
"github.com/ava-labs/hypersdk/utils"

Expand All @@ -36,6 +38,7 @@ var (

ErrEmptyChunk = errors.New("empty chunk")
ErrNoAvailableChunkCerts = errors.New("no available chunk certs")
ErrAllChunkCertsDuplicate = errors.New("all chunk certs are duplicated")
ErrTimestampNotMonotonicallyIncreasing = errors.New("block timestamp must be greater than parent timestamp")
ErrEmptyBlock = errors.New("block must reference chunks")
ErrInvalidBlockParent = errors.New("invalid referenced block parent")
Expand All @@ -54,6 +57,7 @@ type Validator struct {

func New[T Tx](
log logging.Logger,
tracer trace.Tracer,
nodeID ids.NodeID,
networkID uint32,
chainID ids.ID,
Expand All @@ -70,6 +74,8 @@ func New[T Tx](
lastAccepted Block,
quorumNum uint64,
quorumDen uint64,
chainIndex validitywindow.ChainIndex[*emapChunkCertificate],
validityWindowDuration time.Duration,
) (*Node[T], error) {
return &Node[T]{
ID: nodeID,
Expand All @@ -88,6 +94,10 @@ func New[T Tx](
GetChunkSignatureHandler: getChunkSignatureHandler,
ChunkCertificateGossipHandler: chunkCertificateGossipHandler,
storage: chunkStorage,
log: log,
tracer: tracer,
validityWindow: validitywindow.NewTimeValidityWindow(log, tracer, chainIndex),
validityWindowDuration: validityWindowDuration,
}, nil
}

Expand All @@ -109,6 +119,10 @@ type Node[T Tx] struct {
GetChunkSignatureHandler p2p.Handler
ChunkCertificateGossipHandler p2p.Handler
storage *ChunkStorage[T]
log logging.Logger
tracer trace.Tracer
validityWindowDuration time.Duration
validityWindow *validitywindow.TimeValidityWindow[*emapChunkCertificate]
}

// BuildChunk builds transactions into a Chunk
Expand Down Expand Up @@ -217,31 +231,41 @@ func (n *Node[T]) BuildChunk(
return chunk, chunkCert, n.storage.AddLocalChunkWithCert(chunk, &chunkCert)
}

func (n *Node[T]) BuildBlock(parent Block, timestamp int64) (Block, error) {
func (n *Node[T]) BuildBlock(ctx context.Context, parent Block, timestamp int64) (Block, error) {
if timestamp <= parent.Timestamp {
return Block{}, ErrTimestampNotMonotonicallyIncreasing
}

chunkCerts := n.storage.GatherChunkCerts()
oldestAllowed := max(0, timestamp-int64(n.validityWindowDuration))

emapChunkCert := make([]*emapChunkCertificate, len(chunkCerts))
for i := range emapChunkCert {
emapChunkCert[i] = &emapChunkCertificate{*chunkCerts[i]}
}
dup, err := n.validityWindow.IsRepeat(ctx, NewExecutionBlock(parent), emapChunkCert, oldestAllowed)
if err != nil {
return Block{}, err
}

availableChunkCerts := make([]*ChunkCertificate, 0)
for _, chunkCert := range chunkCerts {
// avoid building blocks with expired chunk certs
if chunkCert.Expiry < timestamp {
for i, chunkCert := range chunkCerts {
// avoid building blocks with duplicate or expired chunk certs
if chunkCert.Expiry < timestamp || dup.Contains(i) {
continue
}

availableChunkCerts = append(availableChunkCerts, chunkCert)
}
if len(availableChunkCerts) == 0 {
return Block{}, ErrNoAvailableChunkCerts
}

blk := Block{
ParentID: parent.GetID(),
Height: parent.Height + 1,
Timestamp: timestamp,
ChunkCerts: availableChunkCerts,
}
blk := NewBlock(
parent.GetID(),
parent.Height+1,
timestamp,
availableChunkCerts,
)

packer := wrappers.Packer{Bytes: make([]byte, 0, InitialChunkSize), MaxSize: consts.NetworkSizeLimit}
if err := codec.LinearCodec.MarshalInto(blk, &packer); err != nil {
Expand Down Expand Up @@ -281,6 +305,13 @@ func (n *Node[T]) Verify(ctx context.Context, parent Block, block Block) error {
return fmt.Errorf("%w: %s", ErrEmptyBlock, block.GetID())
}

// Find repeats
oldestAllowed := max(0, block.Timestamp-int64(n.validityWindowDuration))

if err := n.validityWindow.VerifyExpiryReplayProtection(ctx, NewExecutionBlock(block), oldestAllowed); err != nil {
return err
}

for _, chunkCert := range block.ChunkCerts {
if err := chunkCert.Verify(
ctx,
Expand Down Expand Up @@ -340,6 +371,8 @@ func (n *Node[T]) Accept(ctx context.Context, block Block) error {
}
}
}
// update the validity window with the accepted block.
n.validityWindow.Accept(NewExecutionBlock(block))

if err := n.storage.SetMin(block.Timestamp, chunkIDs); err != nil {
return fmt.Errorf("failed to prune chunks: %w", err)
Expand Down
Loading
Loading