Skip to content

Commit

Permalink
refactor(nodebuilder): moving service/<service> services to respectiv…
Browse files Browse the repository at this point in the history
…e node sub-packages (celestiaorg#1056)
  • Loading branch information
distractedm1nd authored Sep 30, 2022
1 parent 09127c0 commit 646dd9e
Show file tree
Hide file tree
Showing 59 changed files with 390 additions and 396 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ celestia <node_type> start
## Package-specific documentation

- [Header](./service/header/doc.go)
- [Share](./service/share/doc.go)
- [Share](./share/doc.go)
- [DAS](./das/doc.go)

## Code of Conduct
Expand Down
2 changes: 1 addition & 1 deletion das/daser.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

"github.com/celestiaorg/celestia-node/fraud"
"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/service/share"
"github.com/celestiaorg/celestia-node/share"
)

var log = logging.Logger("das")
Expand Down
26 changes: 12 additions & 14 deletions das/daser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

"github.com/celestiaorg/celestia-node/fraud"
"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/service/share"
"github.com/celestiaorg/celestia-node/share"
)

var timeout = time.Second * 15
Expand All @@ -31,12 +31,12 @@ func TestDASerLifecycle(t *testing.T) {
bServ := mdutils.Bserv()
avail := share.TestLightAvailability(bServ)
// 15 headers from the past and 15 future headers
mockGet, shareServ, sub, mockService := createDASerSubcomponents(t, bServ, 15, 15, avail)
mockGet, sub, mockService := createDASerSubcomponents(t, bServ, 15, 15)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
t.Cleanup(cancel)

daser := NewDASer(shareServ, sub, mockGet, ds, mockService)
daser := NewDASer(avail, sub, mockGet, ds, mockService)

err := daser.Start(ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -65,12 +65,12 @@ func TestDASer_Restart(t *testing.T) {
bServ := mdutils.Bserv()
avail := share.TestLightAvailability(bServ)
// 15 headers from the past and 15 future headers
mockGet, shareServ, sub, mockService := createDASerSubcomponents(t, bServ, 15, 15, avail)
mockGet, sub, mockService := createDASerSubcomponents(t, bServ, 15, 15)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
t.Cleanup(cancel)

daser := NewDASer(shareServ, sub, mockGet, ds, mockService)
daser := NewDASer(avail, sub, mockGet, ds, mockService)

err := daser.Start(ctx)
require.NoError(t, err)
Expand All @@ -97,7 +97,7 @@ func TestDASer_Restart(t *testing.T) {
restartCtx, restartCancel := context.WithTimeout(context.Background(), timeout)
t.Cleanup(restartCancel)

daser = NewDASer(shareServ, sub, mockGet, ds, mockService)
daser = NewDASer(avail, sub, mockGet, ds, mockService)
err = daser.Start(restartCtx)
require.NoError(t, err)

Expand Down Expand Up @@ -134,16 +134,16 @@ func TestDASer_stopsAfter_BEFP(t *testing.T) {
require.NoError(t, err)
avail := share.TestFullAvailability(bServ)
// 15 headers from the past and 15 future headers
mockGet, shareServ, sub, _ := createDASerSubcomponents(t, bServ, 15, 15, avail)
mockGet, sub, _ := createDASerSubcomponents(t, bServ, 15, 15)

// create fraud service and break one header
// create fraud share and break one header
f := fraud.NewProofService(ps, net.Hosts()[0], mockGet.GetByHeight, ds, false)
require.NoError(t, f.Start(ctx))
mockGet.headers[1] = header.CreateFraudExtHeader(t, mockGet.headers[1], bServ)
newCtx := context.Background()

// create and start DASer
daser := NewDASer(shareServ, sub, mockGet, ds, f)
daser := NewDASer(avail, sub, mockGet, ds, f)
resultCh := make(chan error)
go fraud.OnProof(newCtx, f, fraud.BadEncoding,
func(fraud.Proof) {
Expand All @@ -165,18 +165,16 @@ func TestDASer_stopsAfter_BEFP(t *testing.T) {
// createDASerSubcomponents takes numGetter (number of headers
// to store in mockGetter) and numSub (number of headers to store
// in the mock header.Subscriber), returning a newly instantiated
// mockGetter, share.Service, and mock header.Subscriber.
// mockGetter, share.Availability, and mock header.Subscriber.
func createDASerSubcomponents(
t *testing.T,
bServ blockservice.BlockService,
numGetter,
numSub int,
availability share.Availability,
) (*mockGetter, *share.Service, *header.DummySubscriber, *fraud.DummyService) {
shareServ := share.NewService(bServ, availability)
) (*mockGetter, *header.DummySubscriber, *fraud.DummyService) {
mockGet, sub := createMockGetterAndSub(t, bServ, numGetter, numSub)
fraud := new(fraud.DummyService)
return mockGet, shareServ, sub, fraud
return mockGet, sub, fraud
}

func createMockGetterAndSub(
Expand Down
4 changes: 2 additions & 2 deletions header/p2p/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ import (
"github.com/celestiaorg/celestia-node/header"
)

// Subscriber manages the lifecycle and relationship of header Service
// Subscriber manages the lifecycle and relationship of header Module
// with the "header-sub" gossipsub topic.
type Subscriber struct {
pubsub *pubsub.PubSub
topic *pubsub.Topic
}

// NewSubscriber returns a Subscriber that manages the header Service's
// NewSubscriber returns a Subscriber that manages the header Module's
// relationship with the "header-sub" gossipsub topic.
func NewSubscriber(ps *pubsub.PubSub) *Subscriber {
return &Subscriber{
Expand Down
2 changes: 1 addition & 1 deletion header/p2p/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/celestiaorg/celestia-node/header"
)

// TestSubscriber tests the header Service's implementation of Subscriber.
// TestSubscriber tests the header Module's implementation of Subscriber.
func TestSubscriber(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
defer cancel()
Expand Down
2 changes: 1 addition & 1 deletion ipld/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type Retriever struct {
bServ blockservice.BlockService
}

// NewRetriever creates a new instance of the Retriever over IPLD Service and rmst2d.Codec
// NewRetriever creates a new instance of the Retriever over IPLD BlockService and rmst2d.Codec
func NewRetriever(bServ blockservice.BlockService) *Retriever {
return &Retriever{bServ: bServ}
}
Expand Down
4 changes: 2 additions & 2 deletions nodebuilder/core/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"github.com/celestiaorg/celestia-node/nodebuilder/node"
)

// Module collects all the components and services related to managing the relationship with the Core node.
func Module(tp node.Type, cfg *Config, options ...fx.Option) fx.Option {
// ConstructModule collects all the components and services related to managing the relationship with the Core node.
func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option {
// sanitize config values before constructing module
cfgErr := cfg.Validate()

Expand Down
6 changes: 3 additions & 3 deletions nodebuilder/daser/daser.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ import (
"github.com/ipfs/go-datastore"

"github.com/celestiaorg/celestia-node/das"
"github.com/celestiaorg/celestia-node/fraud"
"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/service/share"
"github.com/celestiaorg/celestia-node/nodebuilder/fraud"
"github.com/celestiaorg/celestia-node/share"
)

func NewDASer(
da share.Availability,
hsub header.Subscriber,
store header.Store,
batching datastore.Batching,
fraudService fraud.Service,
fraudService fraud.Module,
) *das.DASer {
return das.NewDASer(da, hsub, store, batching, fraudService)
}
8 changes: 4 additions & 4 deletions nodebuilder/daser/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@ import (
"github.com/celestiaorg/celestia-node/das"
"github.com/celestiaorg/celestia-node/fraud"
"github.com/celestiaorg/celestia-node/libs/fxutil"
fraudbuilder "github.com/celestiaorg/celestia-node/nodebuilder/fraud"
fraudServ "github.com/celestiaorg/celestia-node/nodebuilder/fraud"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
)

func Module(tp node.Type) fx.Option {
func ConstructModule(tp node.Type) fx.Option {
switch tp {
case node.Light, node.Full:
return fx.Module(
"daser",
fx.Provide(fx.Annotate(
NewDASer,
fx.OnStart(func(ctx context.Context, lc fx.Lifecycle, fservice fraud.Service, das *das.DASer) error {
fx.OnStart(func(ctx context.Context, lc fx.Lifecycle, fservice fraudServ.Module, das *das.DASer) error {
lifecycleCtx := fxutil.WithLifecycle(ctx, lc)
return fraudbuilder.Lifecycle(ctx, lifecycleCtx, fraud.BadEncoding, fservice,
return fraudServ.Lifecycle(ctx, lifecycleCtx, fraud.BadEncoding, fservice,
das.Start, das.Stop)
}),
fx.OnStop(func(ctx context.Context, das *das.DASer) error {
Expand Down
20 changes: 10 additions & 10 deletions nodebuilder/fraud/fraud.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,25 @@ import (
"github.com/celestiaorg/celestia-node/header"
)

// Service constructs a fraud proof service with the syncer disabled.
func Service(
// NewModule constructs a fraud proof service with the syncer disabled.
func NewModule(
lc fx.Lifecycle,
sub *pubsub.PubSub,
host host.Host,
hstore header.Store,
ds datastore.Batching,
) (fraud.Service, error) {
) (Module, error) {
return newFraudService(lc, sub, host, hstore, ds, false)
}

// ServiceWithSyncer constructs fraud proof service with enabled syncer.
func ServiceWithSyncer(
// ModuleWithSyncer constructs fraud proof service with enabled syncer.
func ModuleWithSyncer(
lc fx.Lifecycle,
sub *pubsub.PubSub,
host host.Host,
hstore header.Store,
ds datastore.Batching,
) (fraud.Service, error) {
) (Module, error) {
return newFraudService(lc, sub, host, hstore, ds, true)
}

Expand All @@ -40,7 +40,7 @@ func newFraudService(
host host.Host,
hstore header.Store,
ds datastore.Batching,
isEnabled bool) (fraud.Service, error) {
isEnabled bool) (Module, error) {
pservice := fraud.NewProofService(sub, host, hstore.GetByHeight, ds, isEnabled)
lc.Append(fx.Hook{
OnStart: pservice.Start,
Expand All @@ -55,10 +55,10 @@ func newFraudService(
func Lifecycle(
startCtx, lifecycleCtx context.Context,
p fraud.ProofType,
fservice fraud.Service,
fraudModule Module,
start, stop func(context.Context) error,
) error {
proofs, err := fservice.Get(startCtx, p)
proofs, err := fraudModule.Get(startCtx, p)
switch err {
default:
return err
Expand All @@ -71,7 +71,7 @@ func Lifecycle(
return err
}
// handle incoming Fraud Proofs
go fraud.OnProof(lifecycleCtx, fservice, p, func(fraud.Proof) {
go fraud.OnProof(lifecycleCtx, fraudModule, p, func(fraud.Proof) {
if err := stop(lifecycleCtx); err != nil {
log.Error(err)
}
Expand Down
6 changes: 3 additions & 3 deletions nodebuilder/fraud/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ import (

var log = logging.Logger("fraud-module")

func Module(tp node.Type) fx.Option {
func ConstructModule(tp node.Type) fx.Option {
switch tp {
case node.Light:
return fx.Module(
"fraud",
fx.Provide(ServiceWithSyncer),
fx.Provide(ModuleWithSyncer),
)
case node.Full, node.Bridge:
return fx.Module(
"fraud",
fx.Provide(Service),
fx.Provide(NewModule),
)
default:
panic("invalid node type")
Expand Down
9 changes: 9 additions & 0 deletions nodebuilder/fraud/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package fraud

import "github.com/celestiaorg/celestia-node/fraud"

// Module encompasses the behavior necessary to subscribe and broadcast
// fraud proofs within the network.
type Module interface {
fraud.Service
}
11 changes: 5 additions & 6 deletions nodebuilder/header/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,22 @@ import (
"github.com/celestiaorg/celestia-node/header/store"
"github.com/celestiaorg/celestia-node/header/sync"
"github.com/celestiaorg/celestia-node/libs/fxutil"
fraudbuilder "github.com/celestiaorg/celestia-node/nodebuilder/fraud"
fraudServ "github.com/celestiaorg/celestia-node/nodebuilder/fraud"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/params"
headerservice "github.com/celestiaorg/celestia-node/service/header"
)

var log = logging.Logger("header-module")

func Module(tp node.Type, cfg *Config) fx.Option {
func ConstructModule(tp node.Type, cfg *Config) fx.Option {
// sanitize config values before constructing module
cfgErr := cfg.Validate()

baseComponents := fx.Options(
fx.Supply(*cfg),
fx.Error(cfgErr),
fx.Supply(params.BlockTime),
fx.Provide(headerservice.NewHeaderService),
fx.Provide(NewHeaderService),
fx.Provide(fx.Annotate(
store.NewStore,
fx.OnStart(func(ctx context.Context, store header.Store) error {
Expand All @@ -47,7 +46,7 @@ func Module(tp node.Type, cfg *Config) fx.Option {
}),
fx.Provide(fx.Annotate(
sync.NewSyncer,
fx.OnStart(func(ctx context.Context, lc fx.Lifecycle, fservice fraud.Service, syncer *sync.Syncer) error {
fx.OnStart(func(ctx context.Context, lc fx.Lifecycle, fservice fraudServ.Module, syncer *sync.Syncer) error {
syncerStartFunc := func(ctx context.Context) error {
err := syncer.Start(ctx)
switch err {
Expand All @@ -60,7 +59,7 @@ func Module(tp node.Type, cfg *Config) fx.Option {
return nil
}
lifecycleCtx := fxutil.WithLifecycle(ctx, lc)
return fraudbuilder.Lifecycle(ctx, lifecycleCtx, fraud.BadEncoding, fservice,
return fraudServ.Lifecycle(ctx, lifecycleCtx, fraud.BadEncoding, fservice,
syncerStartFunc, syncer.Stop)
}),
fx.OnStop(func(ctx context.Context, syncer *sync.Syncer) error {
Expand Down
59 changes: 59 additions & 0 deletions nodebuilder/header/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package header

import (
"context"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/header/p2p"
"github.com/celestiaorg/celestia-node/header/sync"
)

type Module interface {
// GetByHeight returns the ExtendedHeader at the given height, blocking
// until header has been processed by the store or context deadline is exceeded.
GetByHeight(context.Context, uint64) (*header.ExtendedHeader, error)
// Head returns the ExtendedHeader of the chain head.
Head(context.Context) (*header.ExtendedHeader, error)
// IsSyncing returns the status of sync
IsSyncing() bool
}

// service represents the header service that can be started / stopped on a node.
// service's main function is to manage its sub-services. service can contain several
// sub-services, such as Exchange, ExchangeServer, Syncer, and so forth.
type service struct {
ex header.Exchange

syncer *sync.Syncer
sub header.Subscriber
p2pServer *p2p.ExchangeServer
store header.Store
}

// NewHeaderService creates a new instance of header service.
func NewHeaderService(
syncer *sync.Syncer,
sub header.Subscriber,
p2pServer *p2p.ExchangeServer,
ex header.Exchange,
store header.Store) Module {
return &service{
syncer: syncer,
sub: sub,
p2pServer: p2pServer,
ex: ex,
store: store,
}
}

func (s *service) GetByHeight(ctx context.Context, height uint64) (*header.ExtendedHeader, error) {
return s.store.GetByHeight(ctx, height)
}

func (s *service) Head(ctx context.Context) (*header.ExtendedHeader, error) {
return s.store.Head(ctx)
}

func (s *service) IsSyncing() bool {
return !s.syncer.State().Finished()
}
Loading

0 comments on commit 646dd9e

Please sign in to comment.