Skip to content

Commit

Permalink
PRT-41 lower QOS score on failed data reliability sessions (lavanet#184)
Browse files Browse the repository at this point in the history
* PRT-175 - fixing the portal caching system

* PRT-175 - comment edit

* PRT-41 lower QOS on DR failures

* PRT-40 updating QOS score on successful DR sessions

* PRT-41 - fixing PR issues.

* PRT-41 linter fix

* PRT-41 linter fix
  • Loading branch information
ranlavanet authored Dec 19, 2022
1 parent 2c88407 commit da4c707
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 78 deletions.
28 changes: 28 additions & 0 deletions docs/static/openapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29843,6 +29843,8 @@ paths:
type: string
vrfpk:
type: string
moniker:
type: string
pagination:
type: object
properties:
Expand Down Expand Up @@ -30007,6 +30009,8 @@ paths:
type: string
vrfpk:
type: string
moniker:
type: string
default:
description: An unexpected error response.
schema:
Expand Down Expand Up @@ -30086,6 +30090,8 @@ paths:
type: string
vrfpk:
type: string
moniker:
type: string
output:
type: string
default:
Expand Down Expand Up @@ -30382,6 +30388,8 @@ paths:
type: string
vrfpk:
type: string
moniker:
type: string
default:
description: An unexpected error response.
schema:
Expand Down Expand Up @@ -30732,6 +30740,8 @@ paths:
type: string
vrfpk:
type: string
moniker:
type: string
output:
type: string
default:
Expand Down Expand Up @@ -30984,6 +30994,8 @@ paths:
type: string
vrfpk:
type: string
moniker:
type: string
maxCU:
type: string
format: uint64
Expand Down Expand Up @@ -53085,6 +53097,8 @@ definitions:
type: string
vrfpk:
type: string
moniker:
type: string
pagination:
type: object
properties:
Expand Down Expand Up @@ -53193,6 +53207,8 @@ definitions:
type: string
vrfpk:
type: string
moniker:
type: string
lavanet.lava.epochstorage.QueryParamsResponse:
type: object
properties:
Expand Down Expand Up @@ -53252,6 +53268,8 @@ definitions:
type: string
vrfpk:
type: string
moniker:
type: string
lavanet.lava.epochstorage.StakeStorage:
type: object
properties:
Expand Down Expand Up @@ -53301,6 +53319,8 @@ definitions:
type: string
vrfpk:
type: string
moniker:
type: string
lavanet.lava.pairing.CacheUsage:
type: object
properties:
Expand Down Expand Up @@ -53617,6 +53637,8 @@ definitions:
type: string
vrfpk:
type: string
moniker:
type: string
output:
type: string
lavanet.lava.pairing.QueryGetEpochPaymentsResponse:
Expand Down Expand Up @@ -53701,6 +53723,8 @@ definitions:
type: string
vrfpk:
type: string
moniker:
type: string
lavanet.lava.pairing.QueryGetProviderPaymentStorageResponse:
type: object
properties:
Expand Down Expand Up @@ -53827,6 +53851,8 @@ definitions:
type: string
vrfpk:
type: string
moniker:
type: string
output:
type: string
lavanet.lava.pairing.QueryUserEntryResponse:
Expand Down Expand Up @@ -53874,6 +53900,8 @@ definitions:
type: string
vrfpk:
type: string
moniker:
type: string
maxCU:
type: string
format: uint64
Expand Down
43 changes: 26 additions & 17 deletions relayer/chainproxy/chainproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func SendRelay(
}
// consumerSession is locked here.

callback_send_relay := func(consumerSession *lavasession.SingleConsumerSession) (*pairingtypes.RelayReply, *pairingtypes.Relayer_RelaySubscribeClient, *pairingtypes.RelayRequest, time.Duration, error) {
callback_send_relay := func(consumerSession *lavasession.SingleConsumerSession) (*pairingtypes.RelayReply, *pairingtypes.Relayer_RelaySubscribeClient, *pairingtypes.RelayRequest, time.Duration, bool, error) {
// client session is locked here
blockHeight = int64(epoch) // epochs heights only

Expand All @@ -141,18 +141,18 @@ func SendRelay(
}
sig, err := sigs.SignRelay(privKey, *relayRequest)
if err != nil {
return nil, nil, nil, 0, err
return nil, nil, nil, 0, false, err
}
relayRequest.Sig = sig
c := *consumerSession.Endpoint.Client

relaySentTime := time.Now()
connectCtx, cancel := context.WithTimeout(ctx, DefaultTimeout)
defer cancel()

var replyServer pairingtypes.Relayer_RelaySubscribeClient
var reply *pairingtypes.RelayReply

relaySentTime := time.Now()
if isSubscription {
replyServer, err = c.RelaySubscribe(ctx, relayRequest)
} else {
Expand All @@ -163,11 +163,15 @@ func SendRelay(
utils.LavaFormatError("cache not connected", err, nil)
}
reply, err = c.Relay(connectCtx, relayRequest)
} else {
// Info was fetched from cache, so we need to change the state
// so we can return here, no need to update anything and calculate as this info was fetched from the cache
return reply, nil, relayRequest, 0, true, nil
}
}
currentLatency := time.Since(relaySentTime)
if err != nil {
return nil, nil, nil, 0, err
return nil, nil, nil, 0, false, err
}

if !isSubscription {
Expand All @@ -176,22 +180,22 @@ func SendRelay(
finalized := cp.GetSentry().IsFinalizedBlock(relayRequest.RequestBlock, reply.LatestBlock)
err = VerifyRelayReply(reply, relayRequest, providerPublicAddress, cp.GetSentry().GetSpecComparesHashes())
if err != nil {
return nil, nil, nil, 0, err
return nil, nil, nil, 0, false, err
}
cache := cp.GetCache()
// TODO: response sanity, check its under an expected format add that format to spec
cache.SetEntry(ctx, relayRequest, cp.GetSentry().ApiInterface, nil, cp.GetSentry().ChainID, dappID, reply, finalized) // caching in the portal doesn't care about hashes
return reply, nil, relayRequest, currentLatency, nil
return reply, nil, relayRequest, currentLatency, false, nil
}
// isSubscription
return reply, &replyServer, relayRequest, currentLatency, nil
return reply, &replyServer, relayRequest, currentLatency, false, nil
}

callback_send_reliability := func(consumerSession *lavasession.SingleConsumerSession, dataReliability *pairingtypes.VRFData, providerAddress string) (*pairingtypes.RelayReply, *pairingtypes.RelayRequest, error) {
callback_send_reliability := func(consumerSession *lavasession.SingleConsumerSession, dataReliability *pairingtypes.VRFData, providerAddress string) (*pairingtypes.RelayReply, *pairingtypes.RelayRequest, time.Duration, error) {
// client session is locked here
sentry := cp.GetSentry()
if blockHeight < 0 {
return nil, nil, fmt.Errorf("expected callback_send_relay to be called first and set blockHeight")
return nil, nil, 0, fmt.Errorf("expected callback_send_relay to be called first and set blockHeight")
}

relayRequest := &pairingtypes.RelayRequest{
Expand All @@ -212,30 +216,31 @@ func SendRelay(

sig, err := sigs.SignRelay(privKey, *relayRequest)
if err != nil {
return nil, nil, err
return nil, nil, 0, err
}
relayRequest.Sig = sig

sig, err = sigs.SignVRFData(privKey, relayRequest.DataReliability)
if err != nil {
return nil, nil, err
return nil, nil, 0, err
}
relayRequest.DataReliability.Sig = sig
c := *consumerSession.Endpoint.Client
relaySentTime := time.Now()
reply, err := c.Relay(ctx, relayRequest)
if err != nil {
return nil, nil, err
return nil, nil, 0, err
}

currentLatency := time.Since(relaySentTime)
err = VerifyRelayReply(reply, relayRequest, providerAddress, cp.GetSentry().GetSpecComparesHashes())
if err != nil {
return nil, nil, err
return nil, nil, 0, err
}

return reply, relayRequest, nil
return reply, relayRequest, currentLatency, nil
}

reply, replyServer, relayLatency, firstSessionError := cp.GetSentry().SendRelay(ctx, singleConsumerSession, epoch, providerPublicAddress, callback_send_relay, callback_send_reliability, nodeMsg.GetServiceApi().Category)
reply, replyServer, relayLatency, isCachedResult, firstSessionError := cp.GetSentry().SendRelay(ctx, singleConsumerSession, epoch, providerPublicAddress, callback_send_relay, callback_send_reliability, nodeMsg.GetServiceApi().Category)
if firstSessionError != nil {
// on session failure here
errReport := cp.GetConsumerSessionManager().OnSessionFailure(singleConsumerSession, firstSessionError)
Expand All @@ -250,7 +255,7 @@ func SendRelay(
return nil, nil, utils.LavaFormatError("relay_retry_attempt - Failed to get a second session from a different provider", nil, &map[string]string{"Original Error": firstSessionError.Error(), "GetSessionFromAllExcept Error": err.Error(), "ChainID": cp.GetSentry().ChainID, "Original_Provider_Address": originalProviderAddress})
}
var secondSessionError error
reply, replyServer, relayLatency, secondSessionError = cp.GetSentry().SendRelay(ctx, singleConsumerSession, epoch, providerPublicAddress, callback_send_relay, callback_send_reliability, nodeMsg.GetServiceApi().Category)
reply, replyServer, relayLatency, isCachedResult, secondSessionError = cp.GetSentry().SendRelay(ctx, singleConsumerSession, epoch, providerPublicAddress, callback_send_relay, callback_send_reliability, nodeMsg.GetServiceApi().Category)
if secondSessionError != nil {
errReport = cp.GetConsumerSessionManager().OnSessionFailure(singleConsumerSession, secondSessionError)
if errReport != nil {
Expand All @@ -270,6 +275,10 @@ func SendRelay(
}
}
if !isSubscription {
if isCachedResult {
err = cp.GetConsumerSessionManager().OnSessionUnUsed(singleConsumerSession)
return reply, replyServer, err
}
latestBlock := reply.LatestBlock
expectedBH, numOfProviders := cp.GetSentry().ExpectedBlockHeight()
err = cp.GetConsumerSessionManager().OnSessionDone(singleConsumerSession, epoch, latestBlock, nodeMsg.GetServiceApi().ComputeUnits, relayLatency, expectedBH, numOfProviders, cp.GetSentry().GetProvidersCount()) // session done successfully
Expand Down
Loading

0 comments on commit da4c707

Please sign in to comment.