Skip to content

Commit

Permalink
moving all updaters to its own package for better visibility (lavanet…
Browse files Browse the repository at this point in the history
  • Loading branch information
ranlavanet authored Dec 21, 2023
1 parent 8d3a735 commit 1814450
Show file tree
Hide file tree
Showing 24 changed files with 123 additions and 115 deletions.
7 changes: 3 additions & 4 deletions ecosystem/lavavisor/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,17 @@ import (
lvutil "github.com/lavanet/lava/ecosystem/lavavisor/pkg/util"
"github.com/lavanet/lava/utils/rand"

"github.com/lavanet/lava/protocol/statetracker"

"github.com/lavanet/lava/protocol/chainlib"
"github.com/lavanet/lava/protocol/statetracker/updaters"
"github.com/lavanet/lava/utils"
protocoltypes "github.com/lavanet/lava/x/protocol/types"
"github.com/spf13/cobra"
"gopkg.in/yaml.v2"
)

type LavavisorStateTrackerInf interface {
RegisterForVersionUpdates(ctx context.Context, version *protocoltypes.Version, versionValidator statetracker.VersionValidationInf)
GetProtocolVersion(ctx context.Context) (*statetracker.ProtocolVersionResponse, error)
RegisterForVersionUpdates(ctx context.Context, version *protocoltypes.Version, versionValidator updaters.VersionValidationInf)
GetProtocolVersion(ctx context.Context) (*updaters.ProtocolVersionResponse, error)
}

type LavaVisor struct {
Expand Down
4 changes: 2 additions & 2 deletions ecosystem/lavavisor/pkg/process/version_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"time"

"github.com/lavanet/lava/protocol/common"
"github.com/lavanet/lava/protocol/statetracker"
"github.com/lavanet/lava/protocol/statetracker/updaters"
"github.com/lavanet/lava/utils"
protocoltypes "github.com/lavanet/lava/x/protocol/types"
)
Expand Down Expand Up @@ -193,7 +193,7 @@ func (vm *VersionMonitor) validateLinkPointsToTheRightTarget() error {
return err
}

func (vm *VersionMonitor) ValidateProtocolVersion(incoming *statetracker.ProtocolVersionResponse) error {
func (vm *VersionMonitor) ValidateProtocolVersion(incoming *updaters.ProtocolVersionResponse) error {
if !vm.lock.TryLock() { // if an upgrade is currently ongoing we don't need to check versions. just wait for the flow to end.
utils.LavaFormatDebug("[Lavavisor] ValidateProtocolVersion is locked, assuming upgrade is ongoing")
return nil
Expand Down
18 changes: 9 additions & 9 deletions ecosystem/lavavisor/pkg/state/lavavisor_state_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ import (
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/tx"
"github.com/lavanet/lava/protocol/chaintracker"
"github.com/lavanet/lava/protocol/statetracker"
"github.com/lavanet/lava/protocol/statetracker/updaters"
"github.com/lavanet/lava/utils"
protocoltypes "github.com/lavanet/lava/x/protocol/types"
spectypes "github.com/lavanet/lava/x/spec/types"
)

// Lava visor doesn't require complicated state tracker, it just needs to periodically fetch the protocol version.
type LavaVisorStateTracker struct {
stateQuery *statetracker.StateQuery
stateQuery *updaters.StateQuery
averageBlockTime time.Duration
ticker *time.Ticker
versionUpdater *LavaVisorVersionUpdater
Expand All @@ -37,20 +37,20 @@ func NewLavaVisorStateTracker(ctx context.Context, txFactory tx.Factory, clientC
if err != nil {
utils.LavaFormatFatal("blockchain missing LAV1 spec, cant initialize lavavisor", err)
}
for i := 0; i < statetracker.BlockResultRetry && err != nil; i++ {
for i := 0; i < updaters.BlockResultRetry && err != nil; i++ {
specResponse, err = specQueryClient.Spec(ctx, &spectypes.QueryGetSpecRequest{
ChainID: "LAV1",
})
}

lst := &LavaVisorStateTracker{stateQuery: statetracker.NewStateQuery(ctx, clientCtx), averageBlockTime: time.Duration(specResponse.Spec.AverageBlockTime) * time.Millisecond}
lst := &LavaVisorStateTracker{stateQuery: updaters.NewStateQuery(ctx, clientCtx), averageBlockTime: time.Duration(specResponse.Spec.AverageBlockTime) * time.Millisecond}
return lst, nil
}

func (lst *LavaVisorStateTracker) RegisterForVersionUpdates(ctx context.Context, version *protocoltypes.Version, versionValidator statetracker.VersionValidationInf) {
lst.versionUpdater = &LavaVisorVersionUpdater{VersionUpdater: statetracker.VersionUpdater{
func (lst *LavaVisorStateTracker) RegisterForVersionUpdates(ctx context.Context, version *protocoltypes.Version, versionValidator updaters.VersionValidationInf) {
lst.versionUpdater = &LavaVisorVersionUpdater{VersionUpdater: updaters.VersionUpdater{
VersionStateQuery: lst.stateQuery,
LastKnownVersion: &statetracker.ProtocolVersionResponse{Version: version, BlockNumber: "uninitialized"},
LastKnownVersion: &updaters.ProtocolVersionResponse{Version: version, BlockNumber: "uninitialized"},
VersionValidationInf: versionValidator,
}}
lst.ticker = time.NewTicker(lst.averageBlockTime)
Expand All @@ -69,12 +69,12 @@ func (lst *LavaVisorStateTracker) RegisterForVersionUpdates(ctx context.Context,
}()
}

func (lst *LavaVisorStateTracker) GetProtocolVersion(ctx context.Context) (*statetracker.ProtocolVersionResponse, error) {
func (lst *LavaVisorStateTracker) GetProtocolVersion(ctx context.Context) (*updaters.ProtocolVersionResponse, error) {
return lst.stateQuery.GetProtocolVersion(ctx)
}

type LavaVisorVersionUpdater struct {
statetracker.VersionUpdater
updaters.VersionUpdater
}

// monitor protocol version on each new block, this method overloads the update
Expand Down
25 changes: 13 additions & 12 deletions protocol/badgegenerator/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/lavanet/lava/protocol/chaintracker"
"github.com/lavanet/lava/protocol/lavasession"
"github.com/lavanet/lava/protocol/statetracker"
"github.com/lavanet/lava/protocol/statetracker/updaters"
"github.com/lavanet/lava/utils"
)

Expand All @@ -18,7 +19,7 @@ import (
const AddBlockDelayForEpochUpdaterBadgeServer = 2

type BadgeStateTracker struct {
stateQuery *statetracker.EpochStateQuery
stateQuery *updaters.EpochStateQuery
*statetracker.StateTracker
statetracker.ConsumerEmergencyTrackerInf
}
Expand All @@ -31,8 +32,8 @@ func NewBadgeStateTracker(ctx context.Context, clientCtx cosmosclient.Context, c
if err != nil {
return nil, err
}
sq := statetracker.NewStateQuery(ctx, clientCtx)
esq := statetracker.NewEpochStateQuery(sq)
sq := updaters.NewStateQuery(ctx, clientCtx)
esq := updaters.NewEpochStateQuery(sq)

pst := &BadgeStateTracker{StateTracker: stateTrackerBase, stateQuery: esq, ConsumerEmergencyTrackerInf: emergencyTracker}

Expand All @@ -41,10 +42,10 @@ func NewBadgeStateTracker(ctx context.Context, clientCtx cosmosclient.Context, c
return pst, err
}

func (st *BadgeStateTracker) RegisterForEpochUpdates(ctx context.Context, epochUpdatable statetracker.EpochUpdatable) {
epochUpdater := statetracker.NewEpochUpdater(st.stateQuery)
func (st *BadgeStateTracker) RegisterForEpochUpdates(ctx context.Context, epochUpdatable updaters.EpochUpdatable) {
epochUpdater := updaters.NewEpochUpdater(st.stateQuery)
epochUpdaterRaw := st.StateTracker.RegisterForUpdates(ctx, epochUpdater)
epochUpdater, ok := epochUpdaterRaw.(*statetracker.EpochUpdater)
epochUpdater, ok := epochUpdaterRaw.(*updaters.EpochUpdater)
if !ok {
err := fmt.Errorf("invalid type")
utils.LavaFormatFatal("invalid updater type returned from RegisterForUpdates", err)
Expand All @@ -53,22 +54,22 @@ func (st *BadgeStateTracker) RegisterForEpochUpdates(ctx context.Context, epochU
epochUpdater.RegisterEpochUpdatable(ctx, epochUpdatable, AddBlockDelayForEpochUpdaterBadgeServer)
}

func (st *BadgeStateTracker) RegisterForSpecUpdates(ctx context.Context, specUpdatable statetracker.SpecUpdatable, endpoint lavasession.RPCEndpoint) error {
func (st *BadgeStateTracker) RegisterForSpecUpdates(ctx context.Context, specUpdatable updaters.SpecUpdatable, endpoint lavasession.RPCEndpoint) error {
// register for spec updates sets spec and updates when a spec has been modified
specUpdater := statetracker.NewSpecUpdater(endpoint.ChainID, st.stateQuery, st.EventTracker)
specUpdater := updaters.NewSpecUpdater(endpoint.ChainID, st.stateQuery, st.EventTracker)
specUpdaterRaw := st.StateTracker.RegisterForUpdates(ctx, specUpdater)
specUpdater, ok := specUpdaterRaw.(*statetracker.SpecUpdater)
specUpdater, ok := specUpdaterRaw.(*updaters.SpecUpdater)
if !ok {
utils.LavaFormatFatal("invalid updater type returned from RegisterForUpdates", nil, utils.Attribute{Key: "updater", Value: specUpdaterRaw})
}
return specUpdater.RegisterSpecUpdatable(ctx, &specUpdatable, endpoint)
}

func (st *BadgeStateTracker) RegisterForDowntimeParamsUpdates(ctx context.Context, downtimeParamsUpdatable statetracker.DowntimeParamsUpdatable) error {
func (st *BadgeStateTracker) RegisterForDowntimeParamsUpdates(ctx context.Context, downtimeParamsUpdatable updaters.DowntimeParamsUpdatable) error {
// register for downtimeParams updates sets downtimeParams and updates when downtimeParams has been changed
downtimeParamsUpdater := statetracker.NewDowntimeParamsUpdater(st.stateQuery, st.EventTracker)
downtimeParamsUpdater := updaters.NewDowntimeParamsUpdater(st.stateQuery, st.EventTracker)
downtimeParamsUpdaterRaw := st.StateTracker.RegisterForUpdates(ctx, downtimeParamsUpdater)
downtimeParamsUpdater, ok := downtimeParamsUpdaterRaw.(*statetracker.DowntimeParamsUpdater)
downtimeParamsUpdater, ok := downtimeParamsUpdaterRaw.(*updaters.DowntimeParamsUpdater)
if !ok {
utils.LavaFormatFatal("invalid updater type returned from RegisterForUpdates", nil, utils.Attribute{Key: "updater", Value: downtimeParamsUpdaterRaw})
}
Expand Down
11 changes: 6 additions & 5 deletions protocol/rpcconsumer/rpcconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/lavanet/lava/protocol/performance"
"github.com/lavanet/lava/protocol/provideroptimizer"
"github.com/lavanet/lava/protocol/statetracker"
"github.com/lavanet/lava/protocol/statetracker/updaters"
"github.com/lavanet/lava/protocol/upgrade"
"github.com/lavanet/lava/utils"
"github.com/lavanet/lava/utils/rand"
Expand Down Expand Up @@ -80,14 +81,14 @@ func (s *strategyValue) Type() string {
}

type ConsumerStateTrackerInf interface {
RegisterForVersionUpdates(ctx context.Context, version *protocoltypes.Version, versionValidator statetracker.VersionValidationInf)
RegisterForVersionUpdates(ctx context.Context, version *protocoltypes.Version, versionValidator updaters.VersionValidationInf)
RegisterConsumerSessionManagerForPairingUpdates(ctx context.Context, consumerSessionManager *lavasession.ConsumerSessionManager)
RegisterForSpecUpdates(ctx context.Context, specUpdatable statetracker.SpecUpdatable, endpoint lavasession.RPCEndpoint) error
RegisterForSpecUpdates(ctx context.Context, specUpdatable updaters.SpecUpdatable, endpoint lavasession.RPCEndpoint) error
RegisterFinalizationConsensusForUpdates(context.Context, *lavaprotocol.FinalizationConsensus)
RegisterForDowntimeParamsUpdates(ctx context.Context, downtimeParamsUpdatable statetracker.DowntimeParamsUpdatable) error
RegisterForDowntimeParamsUpdates(ctx context.Context, downtimeParamsUpdatable updaters.DowntimeParamsUpdatable) error
TxConflictDetection(ctx context.Context, finalizationConflict *conflicttypes.FinalizationConflict, responseConflict *conflicttypes.ResponseConflict, sameProviderConflict *conflicttypes.FinalizationConflict, conflictHandler common.ConflictHandlerInterface) error
GetConsumerPolicy(ctx context.Context, consumerAddress, chainID string) (*plantypes.Policy, error)
GetProtocolVersion(ctx context.Context) (*statetracker.ProtocolVersionResponse, error)
GetProtocolVersion(ctx context.Context) (*updaters.ProtocolVersionResponse, error)
GetLatestVirtualEpoch() uint64
}

Expand Down Expand Up @@ -148,7 +149,7 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, txFactory tx.Factory, client
wg.Add(parallelJobs)
errCh := make(chan error)

consumerStateTracker.RegisterForUpdates(ctx, statetracker.NewMetricsUpdater(consumerMetricsManager))
consumerStateTracker.RegisterForUpdates(ctx, updaters.NewMetricsUpdater(consumerMetricsManager))
utils.LavaFormatInfo("RPCConsumer pubkey: " + consumerAddr.String())
utils.LavaFormatInfo("RPCConsumer setting up endpoints", utils.Attribute{Key: "length", Value: strconv.Itoa(parallelJobs)})

Expand Down
4 changes: 2 additions & 2 deletions protocol/rpcconsumer/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
commonlib "github.com/lavanet/lava/protocol/common"
"github.com/lavanet/lava/protocol/lavasession"
"github.com/lavanet/lava/protocol/rpcprovider"
"github.com/lavanet/lava/protocol/statetracker"
"github.com/lavanet/lava/protocol/statetracker/updaters"
"github.com/lavanet/lava/utils"
"github.com/lavanet/lava/utils/rand"
"github.com/spf13/cobra"
Expand All @@ -32,7 +32,7 @@ func startTesting(ctx context.Context, clientCtx client.Context, txFactory tx.Fa
signal.Stop(signalChan)
cancel()
}()
stateQuery := statetracker.NewConsumerStateQuery(ctx, clientCtx)
stateQuery := updaters.NewConsumerStateQuery(ctx, clientCtx)
for _, rpcProviderEndpoint := range rpcEndpoints {
go func(rpcProviderEndpoint *lavasession.RPCProviderEndpoint) error {
chainParser, err := chainlib.NewChainParser(rpcProviderEndpoint.ApiInterface)
Expand Down
19 changes: 10 additions & 9 deletions protocol/rpcprovider/rpcprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/lavanet/lava/protocol/rpcprovider/reliabilitymanager"
"github.com/lavanet/lava/protocol/rpcprovider/rewardserver"
"github.com/lavanet/lava/protocol/statetracker"
"github.com/lavanet/lava/protocol/statetracker/updaters"
"github.com/lavanet/lava/protocol/upgrade"
"github.com/lavanet/lava/utils"
"github.com/lavanet/lava/utils/rand"
Expand All @@ -52,12 +53,12 @@ var (
)

type ProviderStateTrackerInf interface {
RegisterForVersionUpdates(ctx context.Context, version *protocoltypes.Version, versionValidator statetracker.VersionValidationInf)
RegisterForSpecUpdates(ctx context.Context, specUpdatable statetracker.SpecUpdatable, endpoint lavasession.RPCEndpoint) error
RegisterForSpecVerifications(ctx context.Context, specVerifier statetracker.SpecVerifier, endpoint lavasession.RPCEndpoint) error
RegisterReliabilityManagerForVoteUpdates(ctx context.Context, voteUpdatable statetracker.VoteUpdatable, endpointP *lavasession.RPCProviderEndpoint)
RegisterForEpochUpdates(ctx context.Context, epochUpdatable statetracker.EpochUpdatable)
RegisterForDowntimeParamsUpdates(ctx context.Context, downtimeParamsUpdatable statetracker.DowntimeParamsUpdatable) error
RegisterForVersionUpdates(ctx context.Context, version *protocoltypes.Version, versionValidator updaters.VersionValidationInf)
RegisterForSpecUpdates(ctx context.Context, specUpdatable updaters.SpecUpdatable, endpoint lavasession.RPCEndpoint) error
RegisterForSpecVerifications(ctx context.Context, specVerifier updaters.SpecVerifier, endpoint lavasession.RPCEndpoint) error
RegisterReliabilityManagerForVoteUpdates(ctx context.Context, voteUpdatable updaters.VoteUpdatable, endpointP *lavasession.RPCProviderEndpoint)
RegisterForEpochUpdates(ctx context.Context, epochUpdatable updaters.EpochUpdatable)
RegisterForDowntimeParamsUpdates(ctx context.Context, downtimeParamsUpdatable updaters.DowntimeParamsUpdatable) error
TxRelayPayment(ctx context.Context, relayRequests []*pairingtypes.RelaySession, description string, latestBlocks []*pairingtypes.LatestBlockReport) error
SendVoteReveal(voteID string, vote *reliabilitymanager.VoteData) error
SendVoteCommitment(voteID string, vote *reliabilitymanager.VoteData) error
Expand All @@ -66,10 +67,10 @@ type ProviderStateTrackerInf interface {
VerifyPairing(ctx context.Context, consumerAddress, providerAddress string, epoch uint64, chainID string) (valid bool, total int64, projectId string, err error)
GetEpochSize(ctx context.Context) (uint64, error)
EarliestBlockInMemory(ctx context.Context) (uint64, error)
RegisterPaymentUpdatableForPayments(ctx context.Context, paymentUpdatable statetracker.PaymentUpdatable)
RegisterPaymentUpdatableForPayments(ctx context.Context, paymentUpdatable updaters.PaymentUpdatable)
GetRecommendedEpochNumToCollectPayment(ctx context.Context) (uint64, error)
GetEpochSizeMultipliedByRecommendedEpochNumToCollectPayment(ctx context.Context) (uint64, error)
GetProtocolVersion(ctx context.Context) (*statetracker.ProtocolVersionResponse, error)
GetProtocolVersion(ctx context.Context) (*updaters.ProtocolVersionResponse, error)
GetVirtualEpoch(epoch uint64) uint64
GetAverageBlockTime() time.Duration
}
Expand Down Expand Up @@ -115,7 +116,7 @@ func (rpcp *RPCProvider) Start(ctx context.Context, txFactory tx.Factory, client
}

rpcp.providerStateTracker = providerStateTracker
providerStateTracker.RegisterForUpdates(ctx, statetracker.NewMetricsUpdater(rpcp.providerMetricsManager))
providerStateTracker.RegisterForUpdates(ctx, updaters.NewMetricsUpdater(rpcp.providerMetricsManager))
// check version
version, err := rpcp.providerStateTracker.GetProtocolVersion(ctx)
if err != nil {
Expand Down
Loading

0 comments on commit 1814450

Please sign in to comment.