Skip to content

Commit

Permalink
chore(async): clear channel before emitting event from abci + cleanup (
Browse files Browse the repository at this point in the history
  • Loading branch information
ocnc2 authored Aug 21, 2024
1 parent 3b00f9b commit f32b8e2
Show file tree
Hide file tree
Showing 29 changed files with 243 additions and 105 deletions.
2 changes: 1 addition & 1 deletion mod/async/pkg/broker/assert.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ package broker
func ensureType[T any](e any) (T, error) {
typedE, ok := e.(T)
if !ok {
return *new(T), errIncompatibleAssignee(*new(T), e)
return *new(T), errWrongType(*new(T), e)
}
return typedE, nil
}
21 changes: 13 additions & 8 deletions mod/async/pkg/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ import (
"github.com/berachain/beacon-kit/mod/primitives/pkg/async"
)

// Broker is responsible for broadcasting all events corresponding to the
// <eventID> to all registered client channels.
// Broker is the unique publisher for broadcasting all events corresponding
// to the <eventID> to all registered client channels.
// There should be exactly one broker responsible for every eventID.
type Broker[T async.BaseEvent] struct {
// eventID is a unique identifier for the event that this broker is
// responsible for.
Expand Down Expand Up @@ -61,12 +62,13 @@ func (b *Broker[T]) EventID() async.EventID {
return b.eventID
}

// Start starts the broker loop.
// Start starts the broker loop in a goroutine to listen and broadcast events
// to all subscribers.
func (b *Broker[T]) Start(ctx context.Context) {
go b.start(ctx)
}

// start starts the broker loop.
// start is a helper function to listen and broadcast events.
func (b *Broker[T]) start(ctx context.Context) {
for {
select {
Expand All @@ -82,8 +84,9 @@ func (b *Broker[T]) start(ctx context.Context) {
}

// Publish publishes a msg to all subscribers.
// Returns ErrTimeout on timeout.
// Errors if the message is not of type T, or if the context is canceled.
func (b *Broker[T]) Publish(msg async.BaseEvent) error {
// assert that the message is of type T
typedMsg, err := ensureType[T](msg)
if err != nil {
return err
Expand All @@ -97,11 +100,12 @@ func (b *Broker[T]) Publish(msg async.BaseEvent) error {
}
}

// Subscribe registers the provided channel to the broker,
// Returns ErrTimeout on timeout.
// Contract: the channel must be a Subscription[T], where T is the expected
// Subscribe registers the provided channel to the broker.
// Errors if the channel is not of type chan T.
// Contract: the channel must be a Chan[T], where T is the expected
// type of the event data.
func (b *Broker[T]) Subscribe(ch any) error {
// assert that the channel is of type chan T
client, err := ensureType[chan T](ch)
if err != nil {
return err
Expand All @@ -115,6 +119,7 @@ func (b *Broker[T]) Subscribe(ch any) error {
// Unsubscribe removes a client from the broker.
// Returns an error if the provided channel is not of type chan T.
func (b *Broker[T]) Unsubscribe(ch any) error {
// assert that the channel is of type chan T
client, err := ensureType[chan T](ch)
if err != nil {
return err
Expand Down
10 changes: 6 additions & 4 deletions mod/async/pkg/broker/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@ import (
//
//nolint:gochecknoglobals // errors
var (
ErrIncompatible = errors.New("incompatible assignee")
// errIncompatibleAssignee is the error returned when the assignee is not
// ErrWrongType is the error returned when the assignee is not
// compatible with the assigner.
errIncompatibleAssignee = func(
ErrWrongType = errors.New("incompatible assignee")
// errWrongType is the error returned when the assignee is not
// compatible with the assigner.
errWrongType = func(
assigner interface{}, assignee interface{},
) error {
return errors.Wrapf(
ErrIncompatible,
ErrWrongType,
"expected: %T, received: %T",
assigner,
assignee,
Expand Down
11 changes: 11 additions & 0 deletions mod/async/pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ func (d *Dispatcher) Subscribe(eventID async.EventID, ch any) error {
return broker.Subscribe(ch)
}

// Unsubscribe unsubscribes the given channel from the broker with the given
// eventID.
func (d *Dispatcher) Unsubscribe(eventID async.EventID, ch any) error {
broker, ok := d.brokers[eventID]
if !ok {
return errBrokerNotFound(eventID)
}
return broker.Unsubscribe(ch)
}

// Start will start all the brokers in the Dispatcher.
func (d *Dispatcher) Start(ctx context.Context) error {
for _, broker := range d.brokers {
Expand All @@ -84,6 +94,7 @@ func (d *Dispatcher) Start(ctx context.Context) error {
return nil
}

// Name returns the name of the dispatcher.
func (d *Dispatcher) Name() string {
return "dispatcher"
}
Expand Down
11 changes: 9 additions & 2 deletions mod/async/pkg/dispatcher/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,22 @@ import (

//nolint:gochecknoglobals // errors
var (
ErrNotFound = errors.New("not found")
ErrAlreadyExists = errors.New("already exists")
// ErrNotFound is returned when a broker is not found for an eventID.
ErrNotFound = errors.New("not found")
// ErrAlreadyExists is returned when a broker is already registered for an
// eventID.
ErrAlreadyExists = errors.New("already exists")
// errBrokerNotFound is a helper function to wrap the ErrNotFound error
// with the eventID.
errBrokerNotFound = func(eventID async.EventID) error {
return errors.Wrapf(
ErrNotFound,
"publisher not found for eventID: %s",
eventID,
)
}
// errBrokerAlreadyExists is a helper function to wrap the ErrAlreadyExists
// error with the eventID.
errBrokerAlreadyExists = func(eventID async.EventID) error {
return errors.Wrapf(
ErrAlreadyExists,
Expand Down
49 changes: 49 additions & 0 deletions mod/async/pkg/types/broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// SPDX-License-Identifier: BUSL-1.1
//
// Copyright (C) 2024, Berachain Foundation. All rights reserved.
// Use of this software is governed by the Business Source License included
// in the LICENSE file of this repository and at www.mariadb.com/bsl11.
//
// ANY USE OF THE LICENSED WORK IN VIOLATION OF THIS LICENSE WILL AUTOMATICALLY
// TERMINATE YOUR RIGHTS UNDER THIS LICENSE FOR THE CURRENT AND ALL OTHER
// VERSIONS OF THE LICENSED WORK.
//
// THIS LICENSE DOES NOT GRANT YOU ANY RIGHT IN ANY TRADEMARK OR LOGO OF
// LICENSOR OR ITS AFFILIATES (PROVIDED THAT YOU MAY USE A TRADEMARK OR LOGO OF
// LICENSOR AS EXPRESSLY REQUIRED BY THIS LICENSE).
//
// TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
// AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
// EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
// TITLE.

package types

import (
"context"

"github.com/berachain/beacon-kit/mod/primitives/pkg/async"
)

// Broker is the unique publisher for broadcasting all events corresponding
// to the <eventID> to all registered client channels.
// There should be exactly one broker responsible for every eventID.
type Broker interface {
// EventID returns the event ID that the publisher is responsible for.
EventID() async.EventID
// Start starts the broker loop in a goroutine to listen and broadcast
// events to all subscribers.
Start(ctx context.Context)
// Publish publishes a msg to all subscribers.
// Errors if the message is not of type T, or if the context is canceled.
Publish(event async.BaseEvent) error
// Subscribe registers the provided channel to the broker.
// Errors if the channel is not of type chan T.
// Contract: the channel must be a Chan[T], where T is the expected
// type of the event data.
Subscribe(ch any) error
// Unsubscribe removes a client from the broker.
// Returns an error if the provided channel is not of type chan T.
Unsubscribe(ch any) error
}
26 changes: 7 additions & 19 deletions mod/async/pkg/types/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
"github.com/berachain/beacon-kit/mod/primitives/pkg/async"
)

// Dispatcher is the full API for a dispatcher that facilitates the publishing
// of async events and the sending and receiving of async messages.
// Dispatcher is the maximal interface for a dispatcher that facilitates the
// starting and registration of the dispatcher.
type Dispatcher interface {
EventDispatcher
// Start starts the dispatcher.
Expand All @@ -38,8 +38,8 @@ type Dispatcher interface {
Name() string
}

// EventDispatcher is the API for a dispatcher that facilitates the publishing
// of async events.
// EventDispatcher is the minimal interface for an event dispatcher that
// facilitates subscribing and publishing of async events.
type EventDispatcher interface {
// Publish publishes an event to the dispatcher.
Publish(event async.BaseEvent) error
Expand All @@ -48,19 +48,7 @@ type EventDispatcher interface {
// Contract: the channel must be a Subscription[T], where T is the expected
// type of the event data.
Subscribe(eventID async.EventID, ch any) error
// TODO: add unsubscribe
}

// publisher is the interface that supports basic event publisher operations.
type Broker interface {
// Start starts the event publisher.
Start(ctx context.Context)
// Publish publishes the given event to the event publisher.
Publish(event async.BaseEvent) error
// Subscribe subscribes the given channel to the event publisher.
Subscribe(ch any) error
// Unsubscribe unsubscribes the given channel from the event publisher.
Unsubscribe(ch any) error
// EventID returns the event ID that the publisher is responsible for.
EventID() async.EventID
// Unsubscribe unsubscribes the given channel from the broker with the given
// eventID.
Unsubscribe(eventID async.EventID, ch any) error
}
11 changes: 7 additions & 4 deletions mod/beacon/block_store/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type Service[
dispatcher asynctypes.EventDispatcher
// store is the block store for the service.
store BlockStoreT
// subFinalizedBlkEvents is a channel for receiving finalized block events.
// subFinalizedBlkEvents is a channel holding BeaconBlockFinalized
subFinalizedBlkEvents chan async.Event[BeaconBlockT]
}

Expand Down Expand Up @@ -69,8 +69,9 @@ func (s *Service[_, _]) Name() string {
return "block-service"
}

// Start starts the block service.
func (s *Service[BeaconBlockT, _]) Start(ctx context.Context) error {
// Start subscribes the BlockStore service to BeaconBlockFinalized events
// and starts the main event loop to handle them accordingly.
func (s *Service[_, _]) Start(ctx context.Context) error {
if !s.config.Enabled {
s.logger.Warn("block service is disabled, skipping storing blocks")
return nil
Expand All @@ -84,11 +85,13 @@ func (s *Service[BeaconBlockT, _]) Start(ctx context.Context) error {
return err
}

// start the event loop to listen and handle events.
go s.eventLoop(ctx)
return nil
}

func (s *Service[BeaconBlockT, BlockStoreT]) eventLoop(ctx context.Context) {
// eventLoop is the main event loop for the block service.
func (s *Service[_, _]) eventLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
Expand Down
27 changes: 16 additions & 11 deletions mod/beacon/blockchain/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,11 @@ type Service[
// forceStartupSyncOnce is used to force a sync of the startup head.
forceStartupSyncOnce *sync.Once

// subFinalBlkReceived is a channel for receiving finalize beacon block
// requests.
// subFinalBlkReceived is a channel holding FinalBeaconBlockReceived events.
subFinalBlkReceived chan async.Event[BeaconBlockT]
// subBlockReceived is a channel for receiving verify beacon block requests.
// subBlockReceived is a channel holding BeaconBlockReceived events.
subBlockReceived chan async.Event[BeaconBlockT]
// subGenDataReceived is a subscription for receiving genesis data
// received events.
// subGenDataReceived is a channel holding GenesisDataReceived events.
subGenDataReceived chan async.Event[GenesisT]
}

Expand Down Expand Up @@ -157,11 +155,11 @@ func (s *Service[
return "blockchain"
}

// Start sets up the service to listen for FinalizeBeaconBlock,
// VerifyBeaconBlock, and ProcessGenesisData requests, and handles them
// accordingly.
// Start subscribes the Blockchain service to GenesisDataReceived,
// BeaconBlockReceived, and FinalBeaconBlockReceived events, and begins
// the main event loop to handle them accordingly.
func (s *Service[
_, BeaconBlockT, _, _, _, _, _, _, GenesisT, _,
_, _, _, _, _, _, _, _, _, _,
]) Start(ctx context.Context) error {
if err := s.dispatcher.Subscribe(
async.GenesisDataReceived, s.subGenDataReceived,
Expand All @@ -181,7 +179,7 @@ func (s *Service[
return err
}

// start a goroutine to listen for requests and handle accordingly
// start the main event loop to listen and handle events.
go s.eventLoop(ctx)
return nil
}
Expand All @@ -205,9 +203,11 @@ func (s *Service[
}

/* -------------------------------------------------------------------------- */
/* Message Handlers */
/* Event Handlers */
/* -------------------------------------------------------------------------- */

// handleGenDataReceived processes the genesis data received and emits a
// GenesisDataProcessed event containing the resulting validator updates.
func (s *Service[
_, _, _, _, _, _, _, _, GenesisT, _,
]) handleGenDataReceived(msg async.Event[GenesisT]) {
Expand Down Expand Up @@ -242,6 +242,8 @@ func (s *Service[
}
}

// handleBeaconBlockReceived emits a BeaconBlockVerified event with the error
// result from VerifyIncomingBlock.
func (s *Service[
_, BeaconBlockT, _, _, _, _, _, _, _, _,
]) handleBeaconBlockReceived(
Expand Down Expand Up @@ -270,6 +272,9 @@ func (s *Service[
}
}

// handleBeaconBlockFinalization processes the finalized beacon block and emits
// a FinalValidatorUpdatesProcessed event containing the resulting validator
// updates.
func (s *Service[
_, BeaconBlockT, _, _, _, _, _, _, _, _,
]) handleBeaconBlockFinalization(
Expand Down
7 changes: 3 additions & 4 deletions mod/beacon/validator/block_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ import (

// buildBlockAndSidecars builds a new beacon block.
func (s *Service[
AttestationDataT, BeaconBlockT, _, _, BlobSidecarsT, _, _, _, _, _, _,
SlashingInfoT, SlotDataT,
_, BeaconBlockT, _, _, BlobSidecarsT, _, _, _, _, _, _, _, SlotDataT,
]) buildBlockAndSidecars(
ctx context.Context,
slotData SlotDataT,
Expand Down Expand Up @@ -250,8 +249,8 @@ func (s *Service[

// BuildBlockBody assembles the block body with necessary components.
func (s *Service[
AttestationDataT, BeaconBlockT, _, BeaconStateT, _,
_, _, Eth1DataT, ExecutionPayloadT, _, _, SlashingInfoT, SlotDataT,
_, BeaconBlockT, _, BeaconStateT, _, _, _, Eth1DataT, ExecutionPayloadT, _,
_, _, SlotDataT,
]) buildBlockBody(
_ context.Context,
st BeaconStateT,
Expand Down
Loading

0 comments on commit f32b8e2

Please sign in to comment.