Skip to content

Commit

Permalink
chore(services): redefine and clean up services (berachain#1147)
Browse files Browse the repository at this point in the history
* scaffolding db manager service

* lint

* u love to see it and by love i mean hate

* feat(storage): logger pro evolution

* dep not inject

* i love depinject

* explicit type for rangestore to avoid typecasting avs indexdb to rangedb

* generate

* tidy

* were so dumb

* hex

* recover panic from nil avs

* move defer block up

* resolve conflicts:
:

* feat: dbmanager stores pruners with map

* chore(pruner): logs & names

* arch linux

* lintgenerate

* i love testing ❤️

* might move later

* bet

* Revert "Merge branch 'main' into services"

This reverts commit 22f0ffc, reversing
changes made to ab8164f.
reeee

* Revert "Revert revert + resolve conflicts ree"

This reverts commit 7ad5955.

* resolve conflicts

* erm

* feat(storage): introduce invariants for range_db for better ammortized pruning runtime

* i love t4esting

* i hate testing

* migrate to events feed

* fixes for merge

* making some generates

* rip

* linter schminter

* new phone who dis

* linter? i barely even know her

* fix

* explicit invariant test on err

* fixes for merge

---------

Signed-off-by: ocnc2 <[email protected]>
Co-authored-by: archbear <[email protected]>
Co-authored-by: archbear <[email protected]>
Co-authored-by: Devon Bear <[email protected]>
  • Loading branch information
4 people authored Jun 2, 2024
1 parent 8a2b97d commit e76d896
Show file tree
Hide file tree
Showing 29 changed files with 1,610 additions and 245 deletions.
10 changes: 10 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ packages:
recursive: False
with-expecter: true
all: True
github.com/berachain/beacon-kit/mod/storage/pkg/pruner:
config:
recursive: False
with-expecter: true
all: True
github.com/berachain/beacon-kit/mod/storage/pkg/manager:
config:
recursive: False
with-expecter: true
all: True
github.com/berachain/beacon-kit/mod/primitives/pkg/crypto:
config:
recursive: False
Expand Down
32 changes: 0 additions & 32 deletions mod/beacon/blockchain/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,42 +126,10 @@ func (s *Service[
// We won't send a fcu if the block is bad, should be addressed
// via ticker later.
go s.sendPostBlockFCU(ctx, st, blk)
go s.postBlockProcessTasks(ctx, st)

return valUpdates, nil
}

// postBlockProcessTasks performs post block processing tasks.
//
// TODO: Deprecate this function and move it's usage outside of the main block
// processing thread.
func (s *Service[
AvailabilityStoreT, BeaconBlockT, BeaconStateT,
BlobSidecarsT, DepositStoreT,
]) postBlockProcessTasks(
ctx context.Context,
st BeaconStateT,
) {
// Prune deposits.
// TODO: This should be moved into a go-routine in the background.
// Watching for logs should be completely decoupled as well.
idx, err := st.GetEth1DepositIndex()
if err != nil {
s.logger.Error(
"failed to get eth1 deposit index in postBlockProcessTasks",
"error", err)
return
}

// TODO: pruner shouldn't be in main block processing thread.
if err = s.PruneDepositEvents(ctx, idx); err != nil {
s.logger.Error(
"failed to prune deposit events in postBlockProcessTasks",
"error", err)
return
}
}

// ProcessBeaconBlock processes the beacon block.
func (s *Service[
AvailabilityStoreT,
Expand Down
4 changes: 2 additions & 2 deletions mod/beacon/blockchain/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ type EventFeed[EventT any] interface {

// DepositStore defines the interface for managing deposit operations.
type DepositStore interface {
// PruneToIndex prunes the deposit store up to the specified index.
PruneToIndex(index uint64) error
// Prune prunes the deposit store up to the specified index.
Prune(index uint64) error
// EnqueueDeposits adds a list of deposits to the deposit store.
EnqueueDeposits(deposits []*types.Deposit) error
}
Expand Down
2 changes: 1 addition & 1 deletion mod/execution/pkg/deposit/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type Deposit[DepositT, WithdrawalCredentialsT any] interface {
// Store defines the interface for managing deposit operations.
type Store[DepositT any] interface {
// PruneToIndex prunes the deposit store up to the specified index.
PruneToIndex(index uint64) error
Prune(index uint64) error
// EnqueueDeposits adds a list of deposits to the deposit store.
EnqueueDeposits(deposits []DepositT) error
}
Expand Down
1 change: 1 addition & 0 deletions mod/node-builder/pkg/components/avs.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func ProvideAvailibilityStore(
filedb.WithDirectoryPermissions(os.ModePerm),
filedb.WithLogger(logger),
),
chainSpec.MinEpochsForBlobsSidecarsRequest()*chainSpec.SlotsPerEpoch(),
),
logger.With("service", "beacon-kit.da.store"),
chainSpec,
Expand Down
47 changes: 47 additions & 0 deletions mod/node-builder/pkg/components/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ import (
"github.com/berachain/beacon-kit/mod/state-transition/pkg/core"
"github.com/berachain/beacon-kit/mod/state-transition/pkg/randao"
depositdb "github.com/berachain/beacon-kit/mod/storage/pkg/deposit"
"github.com/berachain/beacon-kit/mod/storage/pkg/filedb"
"github.com/berachain/beacon-kit/mod/storage/pkg/manager"
"github.com/berachain/beacon-kit/mod/storage/pkg/pruner"
sdkversion "github.com/cosmos/cosmos-sdk/version"
gokzg4844 "github.com/crate-crypto/go-kzg-4844"
"github.com/ethereum/go-ethereum/event"
Expand Down Expand Up @@ -211,6 +214,49 @@ func ProvideRuntime(
ts,
)

depositPruner := pruner.NewPruner[
*types.BeaconBlock,
events.Block[*types.BeaconBlock],
event.Subscription,
](
logger.With("service", manager.DepositPrunerName),
storageBackend.DepositStore(nil),
manager.DepositPrunerName,
&blockFeed,
)

defer func() {
// TODO: at this point, Deposit store and Availability store are both
// nil.
// Recovering from casting nil to *filedb.RangeDB.
_ = recover()
}()

availabilityPruner := pruner.NewPruner[
*types.BeaconBlock,
events.Block[*types.BeaconBlock],
event.Subscription,
](
logger.With("service", manager.AvailabilityPrunerName),
storageBackend.AvailabilityStore(
nil).IndexDB.(*filedb.RangeDB),
manager.AvailabilityPrunerName,
&blockFeed,
)

dbManagerService, err := manager.NewDBManager[
*types.BeaconBlock,
events.Block[*types.BeaconBlock],
event.Subscription,
](
logger.With("service", "db-manager"),
depositPruner,
availabilityPruner,
)
if err != nil {
return nil, err
}

// Build the blockchain service.
chainService := blockchain.NewService[
*dastore.Store[types.BeaconBlockBody],
Expand Down Expand Up @@ -264,6 +310,7 @@ func ProvideRuntime(
ts,
sdkversion.Version,
)),
service.WithService(dbManagerService),
)

// Pass all the services and options into the BeaconKitRuntime.
Expand Down
2 changes: 1 addition & 1 deletion mod/runtime/pkg/runtime/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type DepositStore interface {
DequeueDeposits(
numDequeue uint64,
) ([]*types.Deposit, error)
PruneToIndex(
Prune(
index uint64,
) error
}
Expand Down
4 changes: 2 additions & 2 deletions mod/storage/pkg/deposit/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ func (kv *KVStore[DepositT]) DequeueDeposits(
return kv.depositQueue.PopMulti(context.TODO(), numDequeue)
}

// PruneToIndex removes all deposits up to the given index.
func (kv *KVStore[DepositT]) PruneToIndex(
// Prune removes all deposits up to the given index.
func (kv *KVStore[DepositT]) Prune(
index uint64,
) error {
length, err := kv.depositQueue.Len(context.TODO())
Expand Down
50 changes: 37 additions & 13 deletions mod/storage/pkg/filedb/range_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,35 @@ package filedb

import (
"bytes"
"encoding/hex"
"fmt"
"strconv"

"github.com/berachain/beacon-kit/mod/errors"
"github.com/berachain/beacon-kit/mod/primitives/pkg/hex"
db "github.com/berachain/beacon-kit/mod/storage/pkg/interfaces"
)

// two is a constant for the number 2.
const two = 2

// Compile-time assertion of prunable interface.
var _ db.Prunable = (*RangeDB)(nil)

// RangeDB is a database that stores versioned data.
// It prefixes keys with an index.
// Invariant: No index below firstNonNilIndex should be populated.
type RangeDB struct {
db.DB
dataWindow uint64
firstNonNilIndex uint64
}

// NewRangeDB creates a new RangeDB.
func NewRangeDB(db db.DB) *RangeDB {
func NewRangeDB(db db.DB, dataWindow uint64) *RangeDB {
return &RangeDB{
DB: db,
DB: db,
dataWindow: dataWindow,
firstNonNilIndex: 0,
}
}

Expand All @@ -69,6 +77,10 @@ func (db *RangeDB) Has(index uint64, key []byte) (bool, error) {
// It prefixes the key with the index and a slash before storing it in the
// underlying database.
func (db *RangeDB) Set(index uint64, key []byte, value []byte) error {
// enforce invariant
if index < db.firstNonNilIndex {
db.firstNonNilIndex = index
}
return db.DB.Set(db.prefix(index, key), value)
}

Expand All @@ -95,9 +107,30 @@ func (db *RangeDB) DeleteRange(from, to uint64) error {
return nil
}

func (db *RangeDB) Prune(index uint64) error {
if db.dataWindow > index {
return nil
}
err := db.DeleteRange(db.firstNonNilIndex, index-db.dataWindow)
if err != nil {
// Resets last pruned index in case Delete somehow populates indices on
// err. This will cause the next prune operation is O(n), but next
// successful prune will set it to the correct value, so runtime is
// ammortized
db.firstNonNilIndex = 0
return err
}
db.firstNonNilIndex = index - db.dataWindow
return nil
}

func (db *RangeDB) FirstNonNilIndex() uint64 {
return db.firstNonNilIndex
}

// prefix prefixes the given key with the index and a slash.
func (db *RangeDB) prefix(index uint64, key []byte) []byte {
return []byte(fmt.Sprintf("%d/%s", index, Encode(key)))
return []byte(fmt.Sprintf("%d/%s", index, hex.FromBytes(key).Unwrap()))
}

// ExtractIndex extracts the index from a prefixed key.
Expand All @@ -116,12 +149,3 @@ func ExtractIndex(prefixedKey []byte) (uint64, error) {
//#nosec:g
return index, nil
}

// Encode encodes b as a hex string with 0x prefix.
func Encode(b []byte) string {
//nolint:mnd // its okay.
enc := make([]byte, len(b)*2+2)
copy(enc, "0x")
hex.Encode(enc[2:], b)
return string(enc)
}
128 changes: 0 additions & 128 deletions mod/storage/pkg/filedb/range_db_pruner_test.go

This file was deleted.

Loading

0 comments on commit e76d896

Please sign in to comment.