Skip to content

Commit

Permalink
chore(da): clea (berachain#1913)
Browse files Browse the repository at this point in the history
  • Loading branch information
archbear authored Aug 16, 2024
1 parent 98176ac commit f52c938
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 85 deletions.
55 changes: 17 additions & 38 deletions mod/da/pkg/da/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,54 +23,33 @@ package da
import (
"context"

"github.com/berachain/beacon-kit/mod/async/pkg/broker"
asynctypes "github.com/berachain/beacon-kit/mod/async/pkg/types"
"github.com/berachain/beacon-kit/mod/log"
"github.com/berachain/beacon-kit/mod/primitives/pkg/events"
)

type Service[
AvailabilityStoreT AvailabilityStore[BeaconBlockBodyT, BlobSidecarsT],
BeaconBlockBodyT any,
AvailabilityStoreT any,
BlobSidecarsT BlobSidecar,
//nolint:lll // formatter.
EventPublisherSubscriberT EventPublisherSubscriber[*asynctypes.Event[BlobSidecarsT]],
ExecutionPayloadT any,
] struct {
avs AvailabilityStoreT
bp BlobProcessor[
AvailabilityStoreT, BeaconBlockBodyT,
BlobSidecarsT, ExecutionPayloadT,
]
sidecarsBroker EventPublisherSubscriberT
avs AvailabilityStoreT
bp BlobProcessor[AvailabilityStoreT, BlobSidecarsT]
sidecarsBroker *broker.Broker[*asynctypes.Event[BlobSidecarsT]]
logger log.Logger[any]
}

// NewService returns a new DA service.
func NewService[
AvailabilityStoreT AvailabilityStore[
BeaconBlockBodyT, BlobSidecarsT,
],
BeaconBlockBodyT any,
AvailabilityStoreT any,
BlobSidecarsT BlobSidecar,
//nolint:lll // formatter.
EventPublisherSubscriberT EventPublisherSubscriber[*asynctypes.Event[BlobSidecarsT]],
ExecutionPayloadT any,
](
avs AvailabilityStoreT,
bp BlobProcessor[
AvailabilityStoreT, BeaconBlockBodyT,
BlobSidecarsT, ExecutionPayloadT,
],
sidecarsBroker EventPublisherSubscriberT,
bp BlobProcessor[AvailabilityStoreT, BlobSidecarsT],
sidecarsBroker *broker.Broker[*asynctypes.Event[BlobSidecarsT]],
logger log.Logger[any],
) *Service[
AvailabilityStoreT, BeaconBlockBodyT,
BlobSidecarsT, EventPublisherSubscriberT, ExecutionPayloadT,
] {
return &Service[
AvailabilityStoreT, BeaconBlockBodyT,
BlobSidecarsT, EventPublisherSubscriberT, ExecutionPayloadT,
]{
) *Service[AvailabilityStoreT, BlobSidecarsT] {
return &Service[AvailabilityStoreT, BlobSidecarsT]{
avs: avs,
bp: bp,
sidecarsBroker: sidecarsBroker,
Expand All @@ -79,12 +58,12 @@ func NewService[
}

// Name returns the name of the service.
func (s *Service[_, _, _, _, _]) Name() string {
func (s *Service[_, _]) Name() string {
return "da"
}

// Start starts the service.
func (s *Service[_, _, _, _, _]) Start(ctx context.Context) error {
func (s *Service[_, _]) Start(ctx context.Context) error {
subSidecarsCh, err := s.sidecarsBroker.Subscribe()
if err != nil {
return err
Expand All @@ -94,7 +73,7 @@ func (s *Service[_, _, _, _, _]) Start(ctx context.Context) error {
}

// start starts the service.
func (s *Service[_, _, BlobSidecarsT, _, _]) start(
func (s *Service[_, BlobSidecarsT]) start(
ctx context.Context,
sidecarsCh chan *asynctypes.Event[BlobSidecarsT],
) {
Expand All @@ -116,7 +95,7 @@ func (s *Service[_, _, BlobSidecarsT, _, _]) start(
// handleBlobSidecarsProcessRequest handles the BlobSidecarsProcessRequest
// event.
// It processes the sidecars and publishes a BlobSidecarsProcessed event.
func (s *Service[_, _, BlobSidecarsT, _, _]) handleBlobSidecarsProcessRequest(
func (s *Service[_, BlobSidecarsT]) handleBlobSidecarsProcessRequest(
msg *asynctypes.Event[BlobSidecarsT],
) {
err := s.processSidecars(msg.Context(), msg.Data())
Expand All @@ -143,7 +122,7 @@ func (s *Service[_, _, BlobSidecarsT, _, _]) handleBlobSidecarsProcessRequest(

// handleBlobSidecarsReceived handles the BlobSidecarsReceived event.
// It receives the sidecars and publishes a BlobSidecarsProcessed event.
func (s *Service[_, _, BlobSidecarsT, _, _]) handleBlobSidecarsReceived(
func (s *Service[_, BlobSidecarsT]) handleBlobSidecarsReceived(
msg *asynctypes.Event[BlobSidecarsT],
) {
err := s.receiveSidecars(msg.Data())
Expand All @@ -169,7 +148,7 @@ func (s *Service[_, _, BlobSidecarsT, _, _]) handleBlobSidecarsReceived(
}

// ProcessSidecars processes the blob sidecars.
func (s *Service[_, _, BlobSidecarsT, _, _]) processSidecars(
func (s *Service[_, BlobSidecarsT]) processSidecars(
_ context.Context,
sidecars BlobSidecarsT,
) error {
Expand All @@ -182,7 +161,7 @@ func (s *Service[_, _, BlobSidecarsT, _, _]) processSidecars(
}

// VerifyIncomingBlobs receives blobs from the network and processes them.
func (s *Service[_, _, BlobSidecarsT, _, _]) receiveSidecars(
func (s *Service[_, BlobSidecarsT]) receiveSidecars(
sidecars BlobSidecarsT,
) error {
// If there are no blobs to verify, return early.
Expand Down
42 changes: 1 addition & 41 deletions mod/da/pkg/da/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,8 @@

package da

import (
"context"

"github.com/berachain/beacon-kit/mod/primitives/pkg/math"
)

// AvailabilityStore interface is responsible for validating and storing
// sidecars for specific blocks, as well as verifying sidecars that have already
// been stored.
type AvailabilityStore[BeaconBlockBodyT any, BlobSidecarsT any] interface {
// Persist makes sure that the sidecar remains accessible for data
// availability checks throughout the beacon node's operation.
Persist(math.Slot, BlobSidecarsT) error
}

// BlobProcessor is the interface for the blobs processor.
type BlobProcessor[
AvailabilityStoreT AvailabilityStore[BeaconBlockBodyT, BlobSidecarsT],
BeaconBlockBodyT any,
BlobSidecarsT BlobSidecar,
ExecutionPayloadT any,
] interface {
type BlobProcessor[AvailabilityStoreT any, BlobSidecarsT any] interface {
// ProcessSidecars processes the blobs and ensures they match the local
// state.
ProcessSidecars(
Expand All @@ -61,23 +41,3 @@ type BlobSidecar interface {
// IsNil checks if the sidecar is nil.
IsNil() bool
}

// EventPublisher represents the event publisher interface.
type EventPublisherSubscriber[T any] interface {
// PublishEvent publishes an event.
Publish(context.Context, T) error
// Subscribe subscribes to the event system.
Subscribe() (chan T, error)
}

// StorageBackend defines an interface for accessing various storage components
// required by the beacon node.
type StorageBackend[
AvailabilityStoreT AvailabilityStore[BeaconBlockBodyT, BlobSidecarsT],
BeaconBlockBodyT,
BeaconStateT,
BlobSidecarsT BlobSidecar,
] interface {
// AvailabilityStore returns the availability store for the given context.
AvailabilityStore() AvailabilityStoreT
}
3 changes: 0 additions & 3 deletions mod/node-core/pkg/components/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,7 @@ func ProvideDAService[
) *DAService {
return da.NewService[
*AvailabilityStore,
*BeaconBlockBody,
*BlobSidecars,
*SidecarsBroker,
*ExecutionPayload,
](
in.AvailabilityStore,
in.BlobProcessor,
Expand Down
3 changes: 0 additions & 3 deletions mod/node-core/pkg/components/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,7 @@ type (
// DAService is a type alias for the DA service.
DAService = da.Service[
*AvailabilityStore,
*BeaconBlockBody,
*BlobSidecars,
*SidecarsBroker,
*ExecutionPayload,
]

// DBManager is a type alias for the database manager.
Expand Down

0 comments on commit f52c938

Please sign in to comment.