Skip to content

Commit

Permalink
consumer session WIP - finished SessionFailure, providerBlock and fix…
Browse files Browse the repository at this point in the history
…ed GetSession
  • Loading branch information
ranlavanet committed Oct 23, 2022
1 parent 8cbb664 commit 10d2f6e
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 56 deletions.
160 changes: 113 additions & 47 deletions relayer/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math/rand"
"strconv"
"sync"
"sync/atomic"

sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
"github.com/lavanet/lava/utils"
Expand All @@ -18,9 +19,9 @@ type ConsumerSessionManager struct {
// pairingAdressess for Data reliability
pairingAdressess []string // contains all addressess from the initial pairing.
// providerBlockList + validAddressess == pairingAdressess (while locked)
validAddressess []string // contains all addressess that are currently valid
providerBlockList []string // contains all currently blocked providers, reseted upon epoch change.
addedToPurgeAndReport []string // list of purged providers to report for QoS unavailability.
validAddressess []string // contains all addressess that are currently valid
providerBlockList map[string]bool // contains all currently blocked providers, reseted upon epoch change. (easier to search maps.)
addedToPurgeAndReport map[string]bool // list of purged providers to report for QoS unavailability. (easier to search maps.)

// pairingPurge - contains all pairings that are unwanted this epoch, keeps them in memory in order to avoid release.
// (if a consumer session still uses one of them or we want to report it.)
Expand All @@ -47,9 +48,9 @@ func (cs *ConsumerSessionManager) UpdateAllProviders(ctx context.Context, epoch

// Reset States
cs.validAddressess = make([]string, pairingListLength)
cs.providerBlockList = make([]string, 0)
cs.providerBlockList = make(map[string]bool, 0)
cs.pairingAdressess = make([]string, pairingListLength)
cs.addedToPurgeAndReport = make([]string, 0)
cs.addedToPurgeAndReport = make(map[string]bool, 0)

// Reset the pairingPurge.
// This happens only after an entire epoch. so its impossible to have session connected to the old purged list
Expand All @@ -64,9 +65,9 @@ func (cs *ConsumerSessionManager) UpdateAllProviders(ctx context.Context, epoch
return nil
}

// reads cs.currentEpoch atomically
func (cs *ConsumerSessionManager) atomicReadCurrentEpoch() (epoch uint64) {
// TODO_RAN: populate
return 0
return atomic.LoadUint64(&cs.currentEpoch)
}

// 0. lock pairing for Read only - dont forget to release upon failiures
Expand All @@ -91,9 +92,9 @@ func (cs *ConsumerSessionManager) GetSession(ctx context.Context, cuNeededForSes
// Get a valid consumerSessionWithProvider
consumerSessionWithProvider, providerAddress, sessionEpoch, err := cs.getValidConsumerSessionsWithProvider(providersThatAreNotBlockedYetButWeDontWantToGetSessionsWith, cuNeededForSession)
if err != nil {
if PairingListEmpty.Is(err) {
if PairingListEmptyError.Is(err) {
return nil, 0, err
} else if MaxComputeUnitsExceeded.Is(err) {
} else if MaxComputeUnitsExceededError.Is(err) {
// This provider doesnt have enough compute units for this session, we block it for this session and continue to another provider.
providersThatAreNotBlockedYetButWeDontWantToGetSessionsWith[providerAddress] = false
continue
Expand All @@ -106,8 +107,14 @@ func (cs *ConsumerSessionManager) GetSession(ctx context.Context, cuNeededForSes
connected, endpoint, err := consumerSessionWithProvider.fetchEndpointConnectionFromConsumerSessionWithProvider(ctx, sessionEpoch)
if err != nil {
// verify err is AllProviderEndpointsDisabled and report.
if AllProviderEndpointsDisabled.Is(err) {
cs.providerBlock(providerAddress, true) // reporting and blocking provider this epoch
if AllProviderEndpointsDisabledError.Is(err) {
err = cs.providerBlock(providerAddress, true, sessionEpoch) // reporting and blocking provider this epoch
if err != nil {
if !EpochMismatchError.Is(err) {
// only acceptable error is EpochMismatchError so if different, throw fatal
utils.LavaFormatFatal("Unsupported Error", err, nil)
}
}
continue
} else {
utils.LavaFormatFatal("Unsupported Error", err, nil)
Expand All @@ -120,7 +127,7 @@ func (cs *ConsumerSessionManager) GetSession(ctx context.Context, cuNeededForSes
// Get session from endpoint or create new or continue. if more than 10 connections are open.
consumerSession, err := consumerSessionWithProvider.getConsumerSessionInstanceFromEndpoint(endpoint)
if err != nil {
if MaximumNumberOfSessionsExceeded.Is(err) {
if MaximumNumberOfSessionsExceededError.Is(err) {
// we can get a different provider, adding this provider to the list of providers to skip on.
providersThatAreNotBlockedYetButWeDontWantToGetSessionsWith[providerAddress] = false
continue
Expand All @@ -132,7 +139,7 @@ func (cs *ConsumerSessionManager) GetSession(ctx context.Context, cuNeededForSes
// If we successfully got a consumerSession we can apply the current CU to the consumerSessionWithProvider.UsedComputeUnits
err = consumerSessionWithProvider.addUsedComputeUnits(cuNeededForSession)
if err != nil {
if MaxComputeUnitsExceeded.Is(err) {
if MaxComputeUnitsExceededError.Is(err) {
providersThatAreNotBlockedYetButWeDontWantToGetSessionsWith[providerAddress] = false
// We must unlock the consumer session before continuing.
consumerSession.lock.Unlock()
Expand All @@ -154,7 +161,7 @@ func (cs *ConsumerSessionManager) getValidProviderAddress(ignoredProvidersList m
validAddressessLength := len(cs.validAddressess)
totalValidLength := validAddressessLength - ignoredProvidersListLength
if totalValidLength <= 0 {
err = sdkerrors.Wrapf(PairingListEmpty, "lookup - cs.validAddressess is empty")
err = sdkerrors.Wrapf(PairingListEmptyError, "lookup - cs.validAddressess is empty")
return
}
validAddressIndex := rand.Intn(totalValidLength) // get the N'th valid provider index, only valid providers will increase the addressIndex counter
Expand Down Expand Up @@ -185,53 +192,112 @@ func (cs *ConsumerSessionManager) getValidConsumerSessionsWithProvider(ignoredPr
return
}

// report a failure with the provider.
func (cs *ConsumerSessionManager) providerBlock(address string, reportProvider bool) error {
// read currentEpoch atomic if its the same we need to lock and read again.
// validate errorReceived, some errors will blocklist some will not. if epoch is not older than currentEpoch.
// checks here for anything changed while waiting for lock (epoch / pairing doesnt excisits anymore etc..)
// validate the error
func (cs *ConsumerSessionManager) removeAddressFromValidAddressess(address string) error {
if cs.lock.TryLock() {
// if we managed to lock throw an error for misuse.
defer cs.lock.Unlock()
return sdkerrors.Wrapf(LockMisUseDetectedError, "cs.lock must be locked before accessing this method")
}

// cs Must be Locked here.
for idx, addr := range cs.validAddressess {
if addr == address {
// remove the index from the valid list.
cs.validAddressess = append(cs.validAddressess[:idx], cs.validAddressess[idx+1:]...)
return nil
}
}
return AddressIndexWasNotFoundError
}

// Report a failure with the provider.
// read currentEpoch atomic if its the same we need to lock and read again.
// validate errorReceived, some errors will blocklist some will not. if epoch is not older than currentEpoch.
// checks here for anything changed while waiting for lock (epoch / pairing doesnt excisits anymore etc..).
// validate the error.
// validate the provider is not already blocked as two sessions can report same provider at the same time.
func (cs *ConsumerSessionManager) providerBlock(address string, reportProvider bool, sessionEpoch uint64) error {
// find Index of the address
if sessionEpoch != cs.atomicReadCurrentEpoch() { // we read here atomicly so cs.currentEpoch cant change in the middle, so we can save time if epochs mismatch
return EpochMismatchError
}

cs.lock.Lock() // we lock RW here because we need to make sure nothing changes while we verify validAddressess/addedToPurgeAndReport/providerBlockList
defer cs.lock.Unlock()
if sessionEpoch != cs.currentEpoch { // After we lock we need to verify again that the epoch didnt change while we waited for the lock.
return EpochMismatchError
}

err := cs.removeAddressFromValidAddressess(address)
if err != nil {
return err
}

if reportProvider { // Report provider flow
if _, ok := cs.addedToPurgeAndReport[address]; !ok { // verify it does'nt exist already
cs.addedToPurgeAndReport[address] = true
}
}
if _, ok := cs.providerBlockList[address]; !ok { // verify it does'nt exist already
cs.providerBlockList[address] = true
}

// validate the provider is not already blocked as two sessions can report same provider at the same time
return nil
}

// 1. if we failed we need to update the session UsedComputeUnits. -> lock RelayerClientWrapper to modify it
// 2. clientSession.blocklisted = true
// 3. report provider if needed. check cases.
// unlock clientSession.
func (cs *ConsumerSessionManager) SessionFailure(clientSession *ConsumerSession, errorReceived error) error {
// clientSession must be locked when getting here. verify.
// clientSession must be locked when getting here.
if clientSession.lock.TryLock() { // verify.
// if we managed to lock throw an error for misuse.
defer clientSession.lock.Unlock()
return sdkerrors.Wrapf(LockMisUseDetectedError, "clientSession.lock must be locked before accessing this method, additional info:", errorReceived)
}

// client Session should be locked here. so we can just apply the session failure here.
if clientSession.blocklisted {
// if client session is already blocklisted return an error.
return utils.LavaFormatError("trying to report a session failure of a blocklisted client session", nil, &map[string]string{"clientSession.blocklisted": strconv.FormatBool(clientSession.blocklisted)})
return sdkerrors.Wrapf(SessionIsAlreadyBlockListedErrror, "trying to report a session failure of a blocklisted client session", &map[string]string{"clientSession.blocklisted": strconv.FormatBool(clientSession.blocklisted)})
}
// 1. if we failed we need to update the session UsedComputeUnits. -> lock RelayerClientWrapper to modify it
// 2. clientSession.blocklisted = true
// 3. report provider if needed. check cases.
// unlock clientSession.
return nil
}

// get a session from a specific provider, pairing must be locked before accessing here.
func (cs *ConsumerSessionManager) getSessionFromAProvider(address string, cuNeeded uint64) (clinetSession *ConsumerSession, epoch uint64, err error) {
// get session inner function, to get a session from a specific provider.
// get address from -> pairing map[string]*RelayerClientWrapper ->
// choose a endpoint for the provider: similar to findPairing.
// for _, session := range wrap.Sessions {
// if session.Endpoint != endpoint {
// //skip sessions that don't belong to the active connection
// continue
// }
// if session.Lock.TryLock() {
// return session
// }
// }

return nil, cs.currentEpoch, nil // TODO_RAN: switch cs.currentEpoch to atomic read
clientSession.blocklisted = true // block this session from future usages
cuToDecrease := clientSession.latestRelayCu

// finished with clientSession here can unlock.
clientSession.lock.Unlock() // we unlock before we change anything in the parent ConsumerSessionsWithProvider

err := clientSession.client.decreaseUsedComputeUnits(cuToDecrease) // change the cu in parent
if err != nil {
return err
}

// check if need to block & report
var blockProvider, reportProvider bool
if ReportAndBlockProviderError.Is(errorReceived) {
blockProvider = true
reportProvider = true
} else if BlockProviderError.Is(errorReceived) {
blockProvider = true
}
if blockProvider {
publicProviderAddress, pairingEpoch := clientSession.client.getPublicLavaAddressAndPairingEpoch()
err = cs.providerBlock(publicProviderAddress, reportProvider, pairingEpoch)
if err != nil {
return err
}
}
return nil
}

// get a session from the pool except a specific providers
func (cs *ConsumerSessionManager) GetSessionFromAllExcept(bannedAddresses []string, cuNeeded uint64, bannedAddressessEpoch uint64) (clinetSession *ConsumerSession, epoch uint64, err error) {
// get a session from the pool except specific providers
func (cs *ConsumerSessionManager) GetSessionFromAllExcept(ctx context.Context, bannedAddresses []string, cuNeeded uint64, bannedAddressessEpoch uint64) (clinetSession *ConsumerSession, epoch uint64, err error) {
// if bannedAddressessEpoch != current epoch, we just return GetSession. locks...
if bannedAddressessEpoch != cs.atomicReadCurrentEpoch() {
return cs.GetSession(ctx, cuNeeded)
}

// similar to GetSession code. (they should have same inner function)
return nil, cs.currentEpoch, nil
Expand Down
9 changes: 9 additions & 0 deletions relayer/lavasession/consumer_session_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package lavasession_test

import (
"testing"
)

func TestConsumerSessionManager(t *testing.T) {

}
17 changes: 12 additions & 5 deletions relayer/lavasession/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,16 @@ import (
)

var (
PairingListEmpty = sdkerrors.New("pairingListEmpty Error", 665, "No pairings available.") // client couldnt connect to any provider.
UnreachableCodeError = sdkerrors.New("UnreachableCode Error", 666, "Should not get here.")
AllProviderEndpointsDisabled = sdkerrors.New("AllProviderEndpointsDisabled Error", 667, "All endpoints are not available.") // a provider is completly unresponsive all endpoints are not available
MaximumNumberOfSessionsExceeded = sdkerrors.New("MaximumNumberOfSessionsExceeded Error", 668, "Provider reached maximum number of active sessions.")
MaxComputeUnitsExceeded = sdkerrors.New("MaxComputeUnitsExceeded Error", 669, "Consumer is trying to exceed the maximum number of compute uints available.")
PairingListEmptyError = sdkerrors.New("pairingListEmpty Error", 665, "No pairings available.") // client couldnt connect to any provider.
UnreachableCodeError = sdkerrors.New("UnreachableCode Error", 666, "Should not get here.")
AllProviderEndpointsDisabledError = sdkerrors.New("AllProviderEndpointsDisabled Error", 667, "All endpoints are not available.") // a provider is completly unresponsive all endpoints are not available
MaximumNumberOfSessionsExceededError = sdkerrors.New("MaximumNumberOfSessionsExceeded Error", 668, "Provider reached maximum number of active sessions.")
MaxComputeUnitsExceededError = sdkerrors.New("MaxComputeUnitsExceeded Error", 669, "Consumer is trying to exceed the maximum number of compute uints available.")
EpochMismatchError = sdkerrors.New("ReportingAnOldEpoch Error", 670, "Tried to Report to an older epoch")
AddressIndexWasNotFoundError = sdkerrors.New("AddressIndexWasNotFound Error", 671, "address index was not found in list")
LockMisUseDetectedError = sdkerrors.New("LockMisUseDetected Error", 672, "Faulty use of locks detected")
SessionIsAlreadyBlockListedErrror = sdkerrors.New("SessionIsAlreadyBlockListed Error", 673, "Session is already in block list")
NegativeComputeUnitsAmountError = sdkerrors.New("NegativeComputeUnitsAmount", 674, "Tried to substruct to negative compute units amount")
ReportAndBlockProviderError = sdkerrors.New("ReportAndBlockProvider Error", 675, "Report and block the provider")
BlockProviderError = sdkerrors.New("BlockProvider Error", 676, "Block the provider")
)
25 changes: 21 additions & 4 deletions relayer/lavasession/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,18 @@ type ConsumerSessionsWithProvider struct {
PairingEpoch uint64
}

func (cswp *ConsumerSessionsWithProvider) getPublicLavaAddressAndPairingEpoch() (string, uint64) {
cswp.Lock.Lock() // TODO: change to RLock when LavaMutex is chagned
defer cswp.Lock.Unlock()
return cswp.Acc, cswp.PairingEpoch
}

// Validate the compute units for this provider
func (cswp *ConsumerSessionsWithProvider) validateComputeUnits(cu uint64) error {
cswp.Lock.Lock()
defer cswp.Lock.Unlock()
if (cswp.UsedComputeUnits + cu) > cswp.MaxComputeUnits {
return MaxComputeUnitsExceeded
return MaxComputeUnitsExceededError
}
return nil
}
Expand All @@ -73,12 +79,23 @@ func (cswp *ConsumerSessionsWithProvider) addUsedComputeUnits(cu uint64) error {
cswp.Lock.Lock()
defer cswp.Lock.Unlock()
if (cswp.UsedComputeUnits + cu) > cswp.MaxComputeUnits {
return MaxComputeUnitsExceeded
return MaxComputeUnitsExceededError
}
cswp.UsedComputeUnits += cu
return nil
}

// Validate and add the compute units for this provider
func (cswp *ConsumerSessionsWithProvider) decreaseUsedComputeUnits(cu uint64) error {
cswp.Lock.Lock()
defer cswp.Lock.Unlock()
if (cswp.UsedComputeUnits - cu) < 0 {
return NegativeComputeUnitsAmountError
}
cswp.UsedComputeUnits -= cu
return nil
}

func (cswp *ConsumerSessionsWithProvider) connectRawClient(ctx context.Context, addr string) (*pairingtypes.RelayerClient, error) {
connectCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
Expand Down Expand Up @@ -110,7 +127,7 @@ func (cswp *ConsumerSessionsWithProvider) getConsumerSessionInstanceFromEndpoint
}
// No Sessions available, create a new session or return an error upon maximum sessions allowed
if len(cswp.Sessions) > MaxSessionsAllowedPerProvider {
return nil, MaximumNumberOfSessionsExceeded
return nil, MaximumNumberOfSessionsExceededError
}

randomSessId := int64(0)
Expand Down Expand Up @@ -178,7 +195,7 @@ func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSes
if allDisabled {
utils.LavaFormatError("purging provider after all endpoints are disabled", nil, &map[string]string{"provider endpoints": fmt.Sprintf("%v", cswp.Endpoints), "provider address": cswp.Acc})
// report provider.
return connected, endpointPtr, AllProviderEndpointsDisabled
return connected, endpointPtr, AllProviderEndpointsDisabledError
}

return connected, endpointPtr, nil
Expand Down

0 comments on commit 10d2f6e

Please sign in to comment.