Skip to content

Commit

Permalink
feat(share): Integrate rearchitectured blocksync into fx (celestiaorg…
Browse files Browse the repository at this point in the history
  • Loading branch information
distractedm1nd committed Feb 16, 2023
1 parent 4bc4048 commit b5e1c65
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 19 deletions.
36 changes: 35 additions & 1 deletion nodebuilder/share/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,21 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/routing"
routingdisc "github.com/libp2p/go-libp2p/p2p/discovery/routing"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-app/pkg/da"
"github.com/celestiaorg/celestia-node/header"
libhead "github.com/celestiaorg/celestia-node/libs/header"
modp2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/availability/cache"
disc "github.com/celestiaorg/celestia-node/share/availability/discovery"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/getters"
"github.com/celestiaorg/celestia-node/share/p2p/peers"
"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"

"github.com/celestiaorg/celestia-app/pkg/da"
)

func discovery(cfg Config) func(routing.ContentRouting, host.Host) *disc.Discovery {
Expand Down Expand Up @@ -57,3 +65,29 @@ func ensureEmptyCARExists(ctx context.Context, store *eds.Store) error {
}
return err
}

func peerManager(
headerSub libhead.Subscriber[*header.ExtendedHeader],
shrexSub *shrexsub.PubSub,
discovery *disc.Discovery,
host host.Host,
connGater *conngater.BasicConnectionGater,
) *peers.Manager {
// TODO: find better syncTimeout duration?
return peers.NewManager(headerSub, shrexSub, discovery, host, connGater, modp2p.BlockTime*5)
}

func fullGetter(
store *eds.Store,
shrexGetter *getters.ShrexGetter,
ipldGetter *getters.IPLDGetter,
) share.Getter {
return getters.NewCascadeGetter(
[]share.Getter{
getters.NewStoreGetter(store),
getters.NewTeeGetter(shrexGetter, store),
getters.NewTeeGetter(ipldGetter, store),
},
modp2p.BlockTime,
)
}
57 changes: 44 additions & 13 deletions nodebuilder/share/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/getters"
"github.com/celestiaorg/celestia-node/share/p2p/shrexeds"
"github.com/celestiaorg/celestia-node/share/p2p/shrexnd"
"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
)

Expand All @@ -29,7 +30,12 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
fx.Options(options...),
fx.Provide(discovery(*cfg)),
fx.Provide(newModule),
fxutil.ProvideAs(getters.NewIPLDGetter, new(share.Getter)),
// TODO: Configure for light nodes
fx.Provide(
func(host host.Host, network modp2p.Network) (*shrexnd.Client, error) {
return shrexnd.NewClient(host, shrexnd.WithProtocolSuffix(string(network)))
},
),
)

switch tp {
Expand All @@ -38,20 +44,24 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
"share",
baseComponents,
fx.Invoke(share.EnsureEmptySquareExists),
fx.Provide(fx.Annotate(light.NewShareAvailability)),
// cacheAvailability's lifecycle continues to use a fx hook,
// since the LC requires a cacheAvailability but the constructor returns a share.Availability
fx.Provide(cacheAvailability[*light.ShareAvailability]),
// shrexsub broadcaster stub for daser
fx.Provide(func() shrexsub.BroadcastFn {
return func(context.Context, share.DataHash) error {
return nil
}
}),
fxutil.ProvideAs(getters.NewIPLDGetter, new(share.Getter)),
fx.Provide(fx.Annotate(light.NewShareAvailability)),
// cacheAvailability's lifecycle continues to use a fx hook,
// since the LC requires a cacheAvailability but the constructor returns a share.Availability
fx.Provide(cacheAvailability[*light.ShareAvailability]),
)
case node.Bridge, node.Full:
return fx.Module(
"share",
baseComponents,
fx.Provide(getters.NewIPLDGetter),
fx.Invoke(func(edsSrv *shrexeds.Server, ndSrc *shrexnd.Server) {}),
fx.Provide(fx.Annotate(
func(host host.Host, store *eds.Store, network modp2p.Network) (*shrexeds.Server, error) {
return shrexeds.NewServer(host, store, shrexeds.WithProtocolSuffix(string(network)))
Expand All @@ -63,6 +73,22 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
return server.Stop(ctx)
}),
)),
fx.Provide(fx.Annotate(
func(
host host.Host,
store *eds.Store,
getter *getters.IPLDGetter,
network modp2p.Network,
) (*shrexnd.Server, error) {
return shrexnd.NewServer(host, store, getter, shrexnd.WithProtocolSuffix(string(network)))
},
fx.OnStart(func(ctx context.Context, server *shrexnd.Server) error {
return server.Start(ctx)
}),
fx.OnStop(func(ctx context.Context, server *shrexnd.Server) error {
return server.Stop(ctx)
}),
)),
// Bridge Nodes need a client as well, for requests over FullAvailability
fx.Provide(
func(host host.Host, network modp2p.Network) (*shrexeds.Client, error) {
Expand Down Expand Up @@ -94,27 +120,32 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
return avail.Stop(ctx)
}),
)),
fx.Provide(fx.Annotate(
fx.Provide(
func(ctx context.Context, h host.Host, network modp2p.Network) (*shrexsub.PubSub, error) {
return shrexsub.NewPubSub(
ctx,
h,
string(network),
)
},
fx.OnStart(func(ctx context.Context, pubsub *shrexsub.PubSub) error {
return pubsub.Start(ctx)
),
// cacheAvailability's lifecycle continues to use a fx hook,
// since the LC requires a cacheAvailability but the constructor returns a share.Availability
fx.Provide(cacheAvailability[*full.ShareAvailability]),
fx.Provide(fx.Annotate(
getters.NewShrexGetter,
fx.OnStart(func(ctx context.Context, getter *getters.ShrexGetter) error {
return getter.Start(ctx)
}),
fx.OnStop(func(ctx context.Context, pubsub *shrexsub.PubSub) error {
return pubsub.Stop(ctx)
fx.OnStop(func(ctx context.Context, getter *getters.ShrexGetter) error {
return getter.Stop(ctx)
}),
)),
fx.Provide(func(shrexSub *shrexsub.PubSub) shrexsub.BroadcastFn {
return shrexSub.Broadcast
}),
// cacheAvailability's lifecycle continues to use a fx hook,
// since the LC requires a cacheAvailability but the constructor returns a share.Availability
fx.Provide(cacheAvailability[*full.ShareAvailability]),
fx.Provide(peerManager),
fx.Provide(fullGetter),
)
default:
panic("invalid node type")
Expand Down
19 changes: 18 additions & 1 deletion share/getters/shrex.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

var _ share.Getter = (*ShrexGetter)(nil)

const MaxRequestDuration = time.Second * 10
const defaultMaxRequestDuration = time.Second * 10

// ShrexGetter is a share.Getter that uses the shrex/eds and shrex/nd protocol to retrieve shares.
type ShrexGetter struct {
Expand All @@ -28,6 +28,23 @@ type ShrexGetter struct {
maxRequestDuration time.Duration
}

func NewShrexGetter(edsClient *shrexeds.Client, ndClient *shrexnd.Client, peerManager *peers.Manager) *ShrexGetter {
return &ShrexGetter{
edsClient: edsClient,
ndClient: ndClient,
peerManager: peerManager,
maxRequestDuration: defaultMaxRequestDuration,
}
}

func (sg *ShrexGetter) Start(ctx context.Context) error {
return sg.peerManager.Start(ctx)
}

func (sg *ShrexGetter) Stop(ctx context.Context) error {
return sg.peerManager.Stop(ctx)
}

func (sg *ShrexGetter) GetShare(ctx context.Context, root *share.Root, row, col int) (share.Share, error) {
return nil, errors.New("shrex-getter: GetShare is not supported")
}
Expand Down
8 changes: 6 additions & 2 deletions share/getters/shrex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,12 @@ func TestGetSharesWithProofByNamespace(t *testing.T) {
srvHost := net.NewTestNode().Host
srv, err := shrexnd.NewServer(srvHost, edsStore, NewIPLDGetter(bServ))
require.NoError(t, err)
srv.Start()
t.Cleanup(srv.Stop)
err = srv.Start(ctx)
require.NoError(t, err)

t.Cleanup(func() {
_ = srv.Stop(ctx)
})

// create client and connect it to server
client, err := shrexnd.NewClient(net.NewTestNode().Host)
Expand Down
6 changes: 4 additions & 2 deletions share/p2p/shrexnd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,21 @@ func NewServer(host host.Host, store *eds.Store, getter share.Getter, opts ...Op
}

// Start starts the server
func (srv *Server) Start() {
func (srv *Server) Start(context.Context) error {
ctx, cancel := context.WithCancel(context.Background())
srv.cancel = cancel

srv.host.SetStreamHandler(srv.protocolID, func(s network.Stream) {
srv.handleNamespacedData(ctx, s)
})
return nil
}

// Stop stops the server
func (srv *Server) Stop() {
func (srv *Server) Stop(context.Context) error {
srv.cancel()
srv.host.RemoveStreamHandler(srv.protocolID)
return nil
}

func (srv *Server) handleNamespacedData(ctx context.Context, stream network.Stream) {
Expand Down

0 comments on commit b5e1c65

Please sign in to comment.