Skip to content

Commit

Permalink
fix(nodebuilder/share): Only provide necessary components to Bridge (c…
Browse files Browse the repository at this point in the history
…elestiaorg#1778)

Bridge does not need: 
* shrexeds.Client
* shrexnd.Client
* *shrex.Getter
* peer.Manager

Bridge and Lights will also spawn EnsurePeers lifecycle from nodebuilder
instead (as they don't use peer.Manager yet)

---------

Co-authored-by: Wondertan <[email protected]>
  • Loading branch information
renaynay and Wondertan authored Feb 22, 2023
1 parent dabe2c4 commit 96fd24b
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 71 deletions.
5 changes: 3 additions & 2 deletions nodebuilder/share/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/availability/cache"
disc "github.com/celestiaorg/celestia-node/share/availability/discovery"
"github.com/celestiaorg/celestia-node/share/availability/light"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/getters"
)
Expand All @@ -36,8 +37,8 @@ func discovery(cfg Config) func(routing.ContentRouting, host.Host) *disc.Discove
}
}

// cacheAvailability wraps either Full or Light availability with a cache for result sampling.
func cacheAvailability[A share.Availability](lc fx.Lifecycle, ds datastore.Batching, avail A) share.Availability {
// cacheAvailability wraps light availability with a cache for result sampling.
func cacheAvailability(lc fx.Lifecycle, ds datastore.Batching, avail *light.ShareAvailability) share.Availability {
ca := cache.NewShareAvailability(avail, ds)
lc.Append(fx.Hook{
OnStop: ca.Close,
Expand Down
158 changes: 89 additions & 69 deletions nodebuilder/share/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,77 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
fx.Provide(newModule),
)

bridgeAndFullComponents := fx.Options(
fx.Invoke(func(edsSrv *shrexeds.Server, ndSrc *shrexnd.Server) {}),
fx.Provide(fx.Annotate(
func(host host.Host, store *eds.Store, network modp2p.Network) (*shrexeds.Server, error) {
return shrexeds.NewServer(host, store, shrexeds.WithProtocolSuffix(network.String()))
},
fx.OnStart(func(ctx context.Context, server *shrexeds.Server) error {
return server.Start(ctx)
}),
fx.OnStop(func(ctx context.Context, server *shrexeds.Server) error {
return server.Stop(ctx)
}),
)),
fx.Provide(fx.Annotate(
func(
host host.Host,
store *eds.Store,
getter share.Getter,
network modp2p.Network,
) (*shrexnd.Server, error) {
return shrexnd.NewServer(host, store, getter, shrexnd.WithProtocolSuffix(network.String()))
},
fx.OnStart(func(ctx context.Context, server *shrexnd.Server) error {
return server.Start(ctx)
}),
fx.OnStop(func(ctx context.Context, server *shrexnd.Server) error {
return server.Stop(ctx)
}),
)),
fx.Provide(fx.Annotate(
func(path node.StorePath, ds datastore.Batching) (*eds.Store, error) {
return eds.NewStore(string(path), ds)
},
fx.OnStart(func(ctx context.Context, store *eds.Store) error {
err := store.Start(ctx)
if err != nil {
return err
}

return ensureEmptyCARExists(ctx, store)
}),
fx.OnStop(func(ctx context.Context, store *eds.Store) error {
return store.Stop(ctx)
}),
)),
fx.Provide(fx.Annotate(
full.NewShareAvailability,
fx.OnStart(func(ctx context.Context, avail *full.ShareAvailability) error {
return avail.Start(ctx)
}),
fx.OnStop(func(ctx context.Context, avail *full.ShareAvailability) error {
return avail.Stop(ctx)
}),
)),
fx.Provide(func(avail *full.ShareAvailability) share.Availability {
return avail
}),
fx.Provide(func(shrexSub *shrexsub.PubSub) shrexsub.BroadcastFn {
return shrexSub.Broadcast
}),
fx.Provide(
func(ctx context.Context, h host.Host, network modp2p.Network) (*shrexsub.PubSub, error) {
return shrexsub.NewPubSub(
ctx,
h,
network.String(),
)
},
),
)

switch tp {
case node.Light:
return fx.Module(
Expand All @@ -58,71 +129,29 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
fx.Provide(fx.Annotate(light.NewShareAvailability)),
// cacheAvailability's lifecycle continues to use a fx hook,
// since the LC requires a cacheAvailability but the constructor returns a share.Availability
fx.Provide(cacheAvailability[*light.ShareAvailability]),
fx.Provide(cacheAvailability),
)
case node.Bridge, node.Full:
case node.Bridge:
return fx.Module(
"share",
baseComponents,
fx.Invoke(func(edsSrv *shrexeds.Server, ndSrc *shrexnd.Server) {}),
fx.Provide(fx.Annotate(
func(host host.Host, store *eds.Store, network modp2p.Network) (*shrexeds.Server, error) {
return shrexeds.NewServer(host, store, shrexeds.WithProtocolSuffix(network.String()))
},
fx.OnStart(func(ctx context.Context, server *shrexeds.Server) error {
return server.Start(ctx)
}),
fx.OnStop(func(ctx context.Context, server *shrexeds.Server) error {
return server.Stop(ctx)
}),
)),
fx.Provide(fx.Annotate(
func(
host host.Host,
store *eds.Store,
getter *getters.IPLDGetter,
network modp2p.Network,
) (*shrexnd.Server, error) {
return shrexnd.NewServer(host, store, getter, shrexnd.WithProtocolSuffix(network.String()))
},
fx.OnStart(func(ctx context.Context, server *shrexnd.Server) error {
return server.Start(ctx)
}),
fx.OnStop(func(ctx context.Context, server *shrexnd.Server) error {
return server.Stop(ctx)
}),
)),
fx.Provide(fx.Annotate(
func(path node.StorePath, ds datastore.Batching) (*eds.Store, error) {
return eds.NewStore(string(path), ds)
},
fx.OnStart(func(ctx context.Context, store *eds.Store) error {
err := store.Start(ctx)
if err != nil {
return err
}

return ensureEmptyCARExists(ctx, store)
}),
fx.OnStop(func(ctx context.Context, store *eds.Store) error {
return store.Stop(ctx)
}),
)),
fx.Provide(fx.Annotate(
full.NewShareAvailability,
fx.OnStart(func(ctx context.Context, avail *full.ShareAvailability) error {
return avail.Start(ctx)
}),
fx.OnStop(func(ctx context.Context, avail *full.ShareAvailability) error {
return avail.Stop(ctx)
}),
)),
// cacheAvailability's lifecycle continues to use a fx hook,
// since the LC requires a cacheAvailability but the constructor returns a share.Availability
fx.Provide(cacheAvailability[*full.ShareAvailability]),
fx.Provide(func(shrexSub *shrexsub.PubSub) shrexsub.BroadcastFn {
return shrexSub.Broadcast
bridgeAndFullComponents,
fx.Provide(func(store *eds.Store) share.Getter {
return getters.NewStoreGetter(store)
}),
fx.Invoke(func(lc fx.Lifecycle, sub *shrexsub.PubSub) error {
lc.Append(fx.Hook{
OnStart: sub.Start,
OnStop: sub.Stop,
})
return nil
}),
)
case node.Full:
return fx.Module(
"share",
baseComponents,
bridgeAndFullComponents,
fx.Provide(fullGetter),
fx.Provide(
func(host host.Host, network modp2p.Network) (*shrexnd.Client, error) {
Expand All @@ -143,15 +172,6 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
return getter.Stop(ctx)
}),
)),
fx.Provide(
func(ctx context.Context, h host.Host, network modp2p.Network) (*shrexsub.PubSub, error) {
return shrexsub.NewPubSub(
ctx,
h,
network.String(),
)
},
),
fx.Provide(peers.NewManager),
fx.Provide(getters.NewIPLDGetter),
)
Expand Down

0 comments on commit 96fd24b

Please sign in to comment.