Skip to content

Commit

Permalink
refactor: changing nodebuilder/<pkg>/service.go to <pkg>.go (cele…
Browse files Browse the repository at this point in the history
  • Loading branch information
Ryan authored Nov 25, 2022
1 parent 3d8817a commit 95907f4
Show file tree
Hide file tree
Showing 15 changed files with 199 additions and 195 deletions.
3 changes: 2 additions & 1 deletion api/docgen/openrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ func ParseCommentsFromNodebuilderModules(moduleNames ...string) Comments {
fset := token.NewFileSet()
nodeComments := make(Comments)
for _, moduleName := range moduleNames {
f, err := parser.ParseFile(fset, "nodebuilder/"+moduleName+"/service.go", nil, parser.AllErrors|parser.ParseComments)
fileName := fmt.Sprintf("nodebuilder/%s/%s.go", moduleName, moduleName)
f, err := parser.ParseFile(fset, fileName, nil, parser.AllErrors|parser.ParseComments)
if err != nil {
panic(err)
}
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
79 changes: 79 additions & 0 deletions nodebuilder/header/constructors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package header

import (
"context"
"time"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/header/p2p"
"github.com/celestiaorg/celestia-node/header/store"
"github.com/celestiaorg/celestia-node/header/sync"
modp2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p"
)

// newP2PServer constructs a new ExchangeServer using the given Network as a protocolID suffix.
func newP2PServer(host host.Host, store header.Store, network modp2p.Network) *p2p.ExchangeServer {
return p2p.NewExchangeServer(host, store, string(network))
}

// newP2PExchange constructs a new Exchange for headers.
func newP2PExchange(cfg Config) func(modp2p.Bootstrappers, modp2p.Network, host.Host) (header.Exchange, error) {
return func(bpeers modp2p.Bootstrappers, network modp2p.Network, host host.Host) (header.Exchange, error) {
peers, err := cfg.trustedPeers(bpeers)
if err != nil {
return nil, err
}
ids := make([]peer.ID, len(peers))
for index, peer := range peers {
ids[index] = peer.ID
host.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL)
}
return p2p.NewExchange(host, ids, string(network)), nil
}
}

// newSyncer constructs new Syncer for headers.
func newSyncer(ex header.Exchange, store initStore, sub header.Subscriber, duration time.Duration) *sync.Syncer {
return sync.NewSyncer(ex, store, sub, duration)
}

// initStore is a type representing initialized header store.
// NOTE: It is needed to ensure that Store is always initialized before Syncer is started.
type initStore header.Store

// newInitStore constructs an initialized store
func newInitStore(
lc fx.Lifecycle,
cfg Config,
net modp2p.Network,
s header.Store,
ex header.Exchange,
) (initStore, error) {
trustedHash, err := cfg.trustedHash(net)
if err != nil {
return nil, err
}

lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
err = store.Init(ctx, s, ex, trustedHash)
if err != nil {
// TODO(@Wondertan): Error is ignored, as otherwise unit tests for Node construction fail.
// This is due to requesting step of initialization, which fetches initial Header by trusted hash from
// the network. The step can't be done during unit tests and fixing it would require either
// * Having some test/dev/offline mode for Node that mocks out all the networking
// * Hardcoding full extended header in params pkg, instead of hashes, so we avoid requesting step
// * Or removing explicit initialization in favor of automated initialization by Syncer
log.Errorf("initializing header store failed: %s", err)
}
return nil
},
})

return s, nil
}
86 changes: 18 additions & 68 deletions nodebuilder/header/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,78 +2,28 @@ package header

import (
"context"
"time"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/header/p2p"
"github.com/celestiaorg/celestia-node/header/store"
"github.com/celestiaorg/celestia-node/header/sync"
modp2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p"
)

// newP2PServer constructs a new ExchangeServer using the given Network as a protocolID suffix.
func newP2PServer(host host.Host, store header.Store, network modp2p.Network) *p2p.ExchangeServer {
return p2p.NewExchangeServer(host, store, string(network))
}

// newP2PExchange constructs a new Exchange for headers.
func newP2PExchange(cfg Config) func(modp2p.Bootstrappers, modp2p.Network, host.Host) (header.Exchange, error) {
return func(bpeers modp2p.Bootstrappers, network modp2p.Network, host host.Host) (header.Exchange, error) {
peers, err := cfg.trustedPeers(bpeers)
if err != nil {
return nil, err
}
ids := make([]peer.ID, len(peers))
for index, peer := range peers {
ids[index] = peer.ID
host.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL)
}
return p2p.NewExchange(host, ids, string(network)), nil
}
}

// newSyncer constructs new Syncer for headers.
func newSyncer(ex header.Exchange, store initStore, sub header.Subscriber, duration time.Duration) *sync.Syncer {
return sync.NewSyncer(ex, store, sub, duration)
// Module exposes the functionality needed for querying headers from the network.
// Any method signature changed here needs to also be changed in the API struct.
//
//go:generate mockgen -destination=mocks/api.go -package=mocks . Module
type Module interface {
// GetByHeight returns the ExtendedHeader at the given height, blocking
// until header has been processed by the store or context deadline is exceeded.
GetByHeight(context.Context, uint64) (*header.ExtendedHeader, error)
// Head returns the ExtendedHeader of the chain head.
Head(context.Context) (*header.ExtendedHeader, error)
// IsSyncing returns the status of sync
IsSyncing() bool
}

// initStore is a type representing initialized header store.
// NOTE: It is needed to ensure that Store is always initialized before Syncer is started.
type initStore header.Store

// newInitStore constructs an initialized store
func newInitStore(
lc fx.Lifecycle,
cfg Config,
net modp2p.Network,
s header.Store,
ex header.Exchange,
) (initStore, error) {
trustedHash, err := cfg.trustedHash(net)
if err != nil {
return nil, err
}

lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
err = store.Init(ctx, s, ex, trustedHash)
if err != nil {
// TODO(@Wondertan): Error is ignored, as otherwise unit tests for Node construction fail.
// This is due to requesting step of initialization, which fetches initial Header by trusted hash
// from the network. The step can't be done during unit tests and fixing it would require either
// * Having some test/dev/offline mode for Node that mocks out all the networking
// * Hardcoding full extended header in params pkg, instead of hashes, so we avoid requesting step
// * Or removing explicit initialization in favor of automated initialization by Syncer
log.Errorf("initializing header store failed: %s", err)
}
return nil
},
})

return s, nil
// API is a wrapper around Module for the RPC.
// TODO(@distractedm1nd): These structs need to be autogenerated.
type API struct {
GetByHeight func(context.Context, uint64) (*header.ExtendedHeader, error)
Head func(context.Context) (*header.ExtendedHeader, error)
IsSyncing func() bool
}
24 changes: 1 addition & 23 deletions nodebuilder/header/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,7 @@ import (
"github.com/celestiaorg/celestia-node/header/sync"
)

// Module exposes the functionality needed for querying headers from the network.
// Any method signature changed here needs to also be changed in the API struct.
//
//go:generate mockgen -destination=mocks/api.go -package=mocks . Module
type Module interface {
// GetByHeight returns the ExtendedHeader at the given height, blocking
// until header has been processed by the store or context deadline is exceeded.
GetByHeight(context.Context, uint64) (*header.ExtendedHeader, error)
// Head returns the ExtendedHeader of the chain head.
Head(context.Context) (*header.ExtendedHeader, error)
// IsSyncing returns the status of sync
IsSyncing() bool
}

// API is a wrapper around Module for the RPC.
// TODO(@distractedm1nd): These structs need to be autogenerated.
type API struct {
GetByHeight func(context.Context, uint64) (*header.ExtendedHeader, error)
Head func(context.Context) (*header.ExtendedHeader, error)
IsSyncing func() bool
}

// Service represents the header service that can be started / stopped on a node.
// Service represents the header Service that can be started / stopped on a node.
// Service's main function is to manage its sub-services. Service can contain several
// sub-services, such as Exchange, ExchangeServer, Syncer, and so forth.
type Service struct {
Expand Down
File renamed without changes.
54 changes: 54 additions & 0 deletions nodebuilder/share/constructors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package share

import (
"context"

"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/routing"
routingdisc "github.com/libp2p/go-libp2p/p2p/discovery/routing"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/availability/cache"
disc "github.com/celestiaorg/celestia-node/share/availability/discovery"
"github.com/celestiaorg/celestia-node/share/service"
)

func discovery(cfg Config) func(routing.ContentRouting, host.Host) *disc.Discovery {
return func(
r routing.ContentRouting,
h host.Host,
) *disc.Discovery {
return disc.NewDiscovery(
h,
routingdisc.NewRoutingDiscovery(r),
cfg.PeersLimit,
cfg.DiscoveryInterval,
cfg.AdvertiseInterval,
)
}
}

// cacheAvailability wraps either Full or Light availability with a cache for result sampling.
func cacheAvailability[A share.Availability](lc fx.Lifecycle, ds datastore.Batching, avail A) share.Availability {
ca := cache.NewShareAvailability(avail, ds)
lc.Append(fx.Hook{
OnStop: ca.Close,
})
return ca
}

func newModule(lc fx.Lifecycle, bServ blockservice.BlockService, avail share.Availability) Module {
serv := service.NewShareService(bServ, avail)
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return serv.Start(ctx)
},
OnStop: func(ctx context.Context) error {
return serv.Stop(ctx)
},
})
return serv
}
16 changes: 8 additions & 8 deletions nodebuilder/share/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
fx.Error(cfgErr),
fx.Options(options...),
fx.Invoke(share.EnsureEmptySquareExists),
fx.Provide(Discovery(*cfg)),
fx.Provide(NewModule),
fx.Provide(discovery(*cfg)),
fx.Provide(newModule),
)

switch tp {
Expand All @@ -39,9 +39,9 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
return avail.Stop(ctx)
}),
)),
// CacheAvailability's lifecycle continues to use a fx hook,
// since the LC requires a CacheAvailability but the constructor returns a share.Availability
fx.Provide(CacheAvailability[*light.ShareAvailability]),
// cacheAvailability's lifecycle continues to use a fx hook,
// since the LC requires a cacheAvailability but the constructor returns a share.Availability
fx.Provide(cacheAvailability[*light.ShareAvailability]),
)
case node.Bridge, node.Full:
return fx.Module(
Expand All @@ -56,9 +56,9 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
return avail.Stop(ctx)
}),
)),
// CacheAvailability's lifecycle continues to use a fx hook,
// since the LC requires a CacheAvailability but the constructor returns a share.Availability
fx.Provide(CacheAvailability[*full.ShareAvailability]),
// cacheAvailability's lifecycle continues to use a fx hook,
// since the LC requires a cacheAvailability but the constructor returns a share.Availability
fx.Provide(cacheAvailability[*full.ShareAvailability]),
)
default:
panic("invalid node type")
Expand Down
66 changes: 0 additions & 66 deletions nodebuilder/share/service.go

This file was deleted.

Loading

0 comments on commit 95907f4

Please sign in to comment.