Skip to content

Commit

Permalink
swamp: rework fill blocks (celestiaorg#1000)
Browse files Browse the repository at this point in the history
* swamp: rework fill blocks

* swamp: attempt to fix flakiness in TestFullReconstructFromLights

* swamp: subscribe on fraud proof before node starts
  • Loading branch information
vgonkivs authored Aug 12, 2022
1 parent cb6f845 commit 208eb0b
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 34 deletions.
7 changes: 5 additions & 2 deletions node/tests/fraud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,14 @@ func TestFraudProofBroadcasting(t *testing.T) {
store := node.MockStore(t, node.DefaultConfig(node.Full))
full := sw.NewNodeWithStore(node.Full, store, node.WithTrustedPeers(addrs[0].String()))

err = full.Start(ctx)
// subscribe to fraud proof before node starts helps
// to prevent flakiness when fraud proof is propagating before subscribing on it
subscr, err := full.FraudServ.Subscribe(fraud.BadEncoding)
require.NoError(t, err)

subscr, err := full.FraudServ.Subscribe(fraud.BadEncoding)
err = full.Start(ctx)
require.NoError(t, err)

_, err = subscr.Proof(ctx)
require.NoError(t, err)

Expand Down
64 changes: 42 additions & 22 deletions node/tests/reconstruct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ func TestFullReconstructFromBridge(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
t.Cleanup(cancel)
sw := swamp.NewSwamp(t, swamp.WithBlockTime(btime))
go sw.FillBlocks(ctx, t, bsize, blocks)
errCh := make(chan error)
go func() {
errCh <- sw.FillBlocks(ctx, bsize, blocks)
}()

bridge := sw.NewBridgeNode()
err := bridge.Start(ctx)
Expand All @@ -66,9 +69,8 @@ func TestFullReconstructFromBridge(t *testing.T) {
return full.ShareServ.SharesAvailable(bctx, h.DAH)
})
}

err = errg.Wait()
require.NoError(t, err)
require.NoError(t, <-errCh)
require.NoError(t, errg.Wait())
}

/*
Expand All @@ -79,12 +81,13 @@ Pre-Reqs:
Steps:
1. Create a Bridge Node(BN)
2. Start a BN
3. Create 69 Light Nodes(LNs) with BN as a trusted peer
4. Start 69 LNs
5. Create a Full Node(FN) with 69 LNs as trusted peers
6. Unlink FN connection to BN
7. Start a FN
8. Check that a FN can retrieve shares from 1 to 20 blocks
3. Create a Full Node(FN) that will act as a bootstraper
4. Create 69 Light Nodes(LNs) with BN as a trusted peer and a bootstaper
5. Start 69 LNs
6. Create a Full Node(FN) with a bootstraper
7. Unlink FN connection to BN
8. Start a FN
9. Check that a FN can retrieve shares from 1 to 20 blocks
*/
func TestFullReconstructFromLights(t *testing.T) {
ipld.RetrieveQuadrantTimeout = time.Millisecond * 100
Expand All @@ -96,36 +99,52 @@ func TestFullReconstructFromLights(t *testing.T) {
lnodes = 69
)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
t.Cleanup(cancel)
sw := swamp.NewSwamp(t, swamp.WithBlockTime(btime))
go sw.FillBlocks(ctx, t, bsize, blocks)
errCh := make(chan error)
go func() {
errCh <- sw.FillBlocks(ctx, bsize, blocks)
}()

cfg := node.DefaultConfig(node.Bridge)
cfg.P2P.Bootstrapper = true
const defaultTimeInterval = time.Second * 10
const defaultTimeInterval = time.Second * 5
var defaultOptions = []node.Option{
node.WithRefreshRoutingTablePeriod(defaultTimeInterval),
node.WithDiscoveryInterval(defaultTimeInterval),
node.WithAdvertiseInterval(defaultTimeInterval),
}

bridgeConfig := append([]node.Option{node.WithConfig(cfg)}, defaultOptions...)
cfg := node.DefaultConfig(node.Full)
cfg.P2P.Bootstrapper = true
bridge := sw.NewBridgeNode(bridgeConfig...)
bridge := sw.NewBridgeNode()
addrsBridge, err := peer.AddrInfoToP2pAddrs(host.InfoFromHost(bridge.Host))
require.NoError(t, err)
bootstrapConfig := append([]node.Option{node.WithConfig(cfg)}, defaultOptions...)
bootstapFN := sw.NewFullNode(bootstrapConfig...)
require.NoError(t, bootstapFN.Start(ctx))
require.NoError(t, bridge.Start(ctx))
addr := host.InfoFromHost(bridge.Host)
addrBootstrapNode := host.InfoFromHost(bootstapFN.Host)

nodesConfig := append([]node.Option{node.WithBootstrappers([]peer.AddrInfo{*addr})}, defaultOptions...)
nodesConfig := append(
[]node.Option{
node.WithTrustedPeers(addrsBridge[0].String()),
node.WithBootstrappers([]peer.AddrInfo{*addrBootstrapNode})},
defaultOptions...,
)
full := sw.NewFullNode(nodesConfig...)
lights := make([]*node.Node, lnodes)
subs := make([]event.Subscription, lnodes)
errg, errCtx := errgroup.WithContext(ctx)
for i := 0; i < lnodes; i++ {
i := i
errg.Go(func() error {
light := sw.NewLightNode(nodesConfig...)
sub, err := light.Host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
lnConfig := append(
[]node.Option{
node.WithTrustedPeers(addrsBridge[0].String())},
nodesConfig...,
)
light := sw.NewLightNode(lnConfig...)
sub, err := light.Host.EventBus().Subscribe(&event.EvtPeerIdentificationCompleted{})
if err != nil {
return err
}
Expand All @@ -141,6 +160,7 @@ func TestFullReconstructFromLights(t *testing.T) {
case <-ctx.Done():
t.Fatal("peer was not found")
case <-subs[i].Out():
require.NoError(t, subs[i].Close())
continue
}
}
Expand All @@ -156,7 +176,7 @@ func TestFullReconstructFromLights(t *testing.T) {
return full.ShareServ.SharesAvailable(bctx, h.DAH)
})
}

require.NoError(t, <-errCh)
require.NoError(t, errg.Wait())
}

Expand Down
26 changes: 16 additions & 10 deletions node/tests/swamp/swamp_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,45 @@ package swamp

import (
"context"
"fmt"
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-node/ipld"
)

// SubmitData submits given data in the block.
// TODO(@Wondertan): This must be a real PFD using celestia-app, once we able to run App
// in the Swamp.
func (s *Swamp) SubmitData(ctx context.Context, t *testing.T, data []byte) {
func (s *Swamp) SubmitData(ctx context.Context, data []byte) error {
result, err := s.CoreClient.BroadcastTxSync(ctx, append([]byte("key="), data...))
require.NoError(t, err)
require.Zero(t, result.Code)
if err != nil {
return err
}
if result.Code != 0 {
return fmt.Errorf("invalid status code: %d", result.Code)
}

return nil
}

func (s *Swamp) FillBlocks(ctx context.Context, t *testing.T, bsize, blocks int) {
func (s *Swamp) FillBlocks(ctx context.Context, bsize, blocks int) error {
btime := s.comps.CoreCfg.Consensus.CreateEmptyBlocksInterval
timer := time.NewTimer(btime)
defer timer.Stop()

data := make([]byte, bsize*ipld.ShareSize)
for range make([]int, blocks) {
rand.Read(data) //nolint:gosec
s.SubmitData(ctx, t, data)

if err := s.SubmitData(ctx, data); err != nil {
return err
}
timer.Reset(btime)
select {
case <-timer.C:
case <-ctx.Done():
return
return ctx.Err()
}
}
return nil
}

0 comments on commit 208eb0b

Please sign in to comment.