Skip to content

Commit

Permalink
adding logs and adjusting sync loss related errors (lavanet#421)
Browse files Browse the repository at this point in the history
  • Loading branch information
ranlavanet authored Apr 13, 2023
1 parent 15db311 commit d20462a
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 5 deletions.
7 changes: 7 additions & 0 deletions protocol/lavasession/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"time"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/gogo/status"
"google.golang.org/grpc/codes"
)

const (
Expand Down Expand Up @@ -33,3 +35,8 @@ const (
StaleEpochDistance = 3 // relays done 3 epochs back are ready to be rewarded

)

func IsSessionSyncLoss(err error) bool {
code := status.Code(err)
return code == codes.Code(SessionOutOfSyncError.ABCICode())
}
81 changes: 81 additions & 0 deletions protocol/lavasession/end_to_end_lavasession_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package lavasession

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
)

// TestHappyFlowE2E walks a single relay through both session managers:
// the consumer opens a session, the provider fails to find it, registers it,
// prepares it for usage and closes it, and finally the consumer closes its
// side. After every step the bookkeeping of both sides is compared.
func TestHappyFlowE2E(t *testing.T) {
	s := createGRPCServer(t) // create a grpcServer so we can connect to its endpoint and validate everything works.
	defer s.Stop()           // stop the server when finished.
	ctx := context.Background()

	// Consumer Side:
	csm := CreateConsumerSessionManager() // consumer session manager

	// Provider Side:
	psm := initProviderSessionManager() // provider session manager

	// Consumer Side:
	// the single pairing endpoint points at grpcListener
	// (presumably the address the test gRPC server created above listens on - confirm against createGRPCServer).
	pairingEndpoints := []*Endpoint{
		{NetworkAddress: grpcListener, Enabled: true, Client: nil, ConnectionRefusals: 0},
	}
	cswpList := map[uint64]*ConsumerSessionsWithProvider{
		0: {
			PublicLavaAddress: "provider",
			Endpoints:         pairingEndpoints,
			Sessions:          map[int64]*SingleConsumerSession{},
			MaxComputeUnits:   200,
			ReliabilitySent:   false,
			PairingEpoch:      epoch1,
		},
	}
	err := csm.UpdateAllProviders(epoch1, cswpList) // update the providers.
	require.NoError(t, err)

	// get single consumer session
	cs, epoch, _, _, err := csm.GetSession(ctx, cuForFirstRequest, nil)
	require.NoError(t, err)
	require.NotNil(t, cs)
	require.Equal(t, csm.currentEpoch, epoch)
	require.Equal(t, uint64(cuForFirstRequest), cs.LatestRelayCu)

	// Provider Side:
	// the provider has never seen this consumer, so the first lookup must fail
	sps, err := psm.GetSession(ctx, consumerOneAddress, uint64(cs.Client.PairingEpoch), uint64(cs.SessionId), cs.RelayNum)
	require.Empty(t, psm.sessionsWithAllConsumers)
	require.Nil(t, sps)
	require.Error(t, err)
	require.True(t, ConsumerNotRegisteredYet.Is(err))

	// expect session to be missing, so we need to register it for the first time
	sps, err = psm.RegisterProviderSessionWithConsumer(ctx, consumerOneAddress, uint64(cs.Client.PairingEpoch), uint64(cs.SessionId), cs.RelayNum, cs.Client.MaxComputeUnits, selfProviderIndex)
	// check the error first so a failed registration is reported as such
	require.NoError(t, err)
	require.NotNil(t, sps)
	// validate session was added
	require.NotEmpty(t, psm.sessionsWithAllConsumers)

	// prepare session for usage
	err = sps.PrepareSessionForUsage(ctx, cuForFirstRequest, cs.LatestRelayCu, 0)

	// validate session was prepared successfully; the consumer values are the expected ones
	require.NoError(t, err)
	require.Equal(t, selfProviderIndex, sps.userSessionsParent.atomicReadProviderIndex())
	require.Equal(t, cs.LatestRelayCu, sps.LatestRelayCu)
	require.Equal(t, cs.LatestRelayCu, sps.CuSum)
	require.Equal(t, uint64(cs.SessionId), sps.SessionID)
	require.Equal(t, cs.Client.PairingEpoch, sps.PairingEpoch)

	err = psm.OnSessionDone(sps, cs.RelayNum)
	require.NoError(t, err)
	require.Equal(t, cs.RelayNum, sps.RelayNum)

	// Consumer Side:
	err = csm.OnSessionDone(cs, epoch1, servicedBlockNumber, cuForFirstRequest, time.Millisecond, cs.CalculateExpectedLatency(2*time.Millisecond), servicedBlockNumber-1, 1, 1)
	require.NoError(t, err)
	require.Equal(t, cuForFirstRequest, cs.CuSum)
	require.Equal(t, latestRelayCuAfterDone, cs.LatestRelayCu)
	require.Equal(t, relayNumberAfterFirstCall, cs.RelayNum)
	require.Equal(t, servicedBlockNumber, cs.LatestBlock)
}
2 changes: 1 addition & 1 deletion protocol/lavasession/provider_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (psm *ProviderSessionManager) getActiveConsumer(epoch uint64, address strin
}

func (psm *ProviderSessionManager) getSessionFromAnActiveConsumer(ctx context.Context, providerSessionsWithConsumer *ProviderSessionsWithConsumer, sessionId uint64, epoch uint64) (singleProviderSession *SingleProviderSession, err error) {
session, err := providerSessionsWithConsumer.GetExistingSession(ctx, sessionId)
session, err := providerSessionsWithConsumer.getExistingSession(ctx, sessionId)
if err == nil {
return session, nil
} else if SessionDoesNotExist.Is(err) {
Expand Down
2 changes: 1 addition & 1 deletion protocol/lavasession/provider_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (pswc *ProviderSessionsWithConsumer) createNewSingleProviderSession(ctx con
}

// this function returns the session locked to be used
func (pswc *ProviderSessionsWithConsumer) GetExistingSession(ctx context.Context, sessionId uint64) (session *SingleProviderSession, err error) {
func (pswc *ProviderSessionsWithConsumer) getExistingSession(ctx context.Context, sessionId uint64) (session *SingleProviderSession, err error) {
pswc.Lock.RLock()
defer pswc.Lock.RUnlock()
if session, ok := pswc.Sessions[sessionId]; ok {
Expand Down
2 changes: 1 addition & 1 deletion protocol/lavasession/single_provider_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (sps *SingleProviderSession) tryLockForUse(ctx context.Context) error {
return nil
}
occupyingGuid := sps.GetOccupyingGuid()
return utils.LavaFormatError("tryLockForUse failure", LockMisUseDetectedError, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "occupyingGuid", Value: occupyingGuid})
return utils.LavaFormatError("tryLockForUse failure", SessionOutOfSyncError, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "Error", Value: LockMisUseDetectedError}, utils.Attribute{Key: "occupyingGuid", Value: occupyingGuid})
}

func (sps *SingleProviderSession) GetOccupyingGuid() uint64 {
Expand Down
10 changes: 8 additions & 2 deletions protocol/rpcconsumer/rpcconsumer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

const (
MaxRelayRetries = 3
MaxRelayRetries = 4
)

// implements the Relay Sender interface and uses a ChainListener to get it called
Expand Down Expand Up @@ -105,11 +105,17 @@ func (rpccs *RPCConsumerServer) SendRelay(
relayRequestData := lavaprotocol.NewRelayData(ctx, connectionType, url, []byte(req), chainMessage.RequestedBlock(), rpccs.listenEndpoint.ApiInterface)
relayResults := []*lavaprotocol.RelayResult{}
relayErrors := []error{}
blockOnSyncLoss := true
for retries := 0; retries < MaxRelayRetries; retries++ {
// TODO: make this async between different providers
relayResult, err := rpccs.sendRelayToProvider(ctx, chainMessage, relayRequestData, dappID, &unwantedProviders)
if relayResult.ProviderAddress != "" {
unwantedProviders[relayResult.ProviderAddress] = struct{}{}
if blockOnSyncLoss && lavasession.IsSessionSyncLoss(err) {
utils.LavaFormatDebug("Identified SyncLoss in provider, not removing it from list for another attempt", utils.Attribute{Key: "address", Value: relayResult.ProviderAddress})
blockOnSyncLoss = false // on the first sync loss no need to block the provider. give it another chance
} else {
unwantedProviders[relayResult.ProviderAddress] = struct{}{}
}
}
if err != nil {
relayErrors = append(relayErrors, err)
Expand Down
7 changes: 7 additions & 0 deletions protocol/rpcprovider/rpcprovider_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,13 @@ func (rpcps *RPCProviderServer) Relay(ctx context.Context, request *pairingtypes
}
}
}
utils.LavaFormatDebug("Provider returned a relay response",
utils.Attribute{Key: "GUID", Value: ctx},
utils.Attribute{Key: "request.SessionId", Value: request.RelaySession.SessionId},
utils.Attribute{Key: "request.relayNumber", Value: request.RelaySession.RelayNum},
utils.Attribute{Key: "request.cu", Value: request.RelaySession.CuSum},
utils.Attribute{Key: "relay_timeout", Value: common.GetRemainingTimeoutFromContext(ctx)},
)
return reply, rpcps.handleRelayErrorStatus(err)
}

Expand Down

0 comments on commit d20462a

Please sign in to comment.