forked from hakki37/lava
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsumer_types.go
543 lines (483 loc) · 21.8 KB
/
consumer_types.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
package lavasession
import (
"context"
"math"
"sort"
"strconv"
"sync"
"sync/atomic"
"time"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/lavanet/lava/protocol/provideroptimizer"
"github.com/lavanet/lava/utils"
"github.com/lavanet/lava/utils/rand"
pairingtypes "github.com/lavanet/lava/x/pairing/types"
planstypes "github.com/lavanet/lava/x/plans/types"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
)
// AllowInsecureConnectionToProvidersFlag is the string associated with
// AllowInsecureConnectionToProviders (presumably the CLI flag name — confirm at the registration site).
const AllowInsecureConnectionToProvidersFlag = "allow-insecure-provider-dialing"

// AllowInsecureConnectionToProviders is forwarded to ConnectgRPCClient whenever a provider is dialed.
// It starts out false.
var AllowInsecureConnectionToProviders bool
// SessionInfo groups a single consumer session with the metadata stored next to it.
type SessionInfo struct {
	// Session is the consumer session this record describes.
	Session *SingleConsumerSession
	// StakeSize is a coin amount; presumably the provider's stake (see ConsumerSessionsWithProvider.stakeSize) — confirm against the code that fills it.
	StakeSize sdk.Coin
	QoSSummeryResult sdk.Dec // using ComputeQoS to get the total QOS
	// Epoch is the epoch number recorded for this entry.
	Epoch uint64
	// ReportedProviders is the list of provider reports carried with the session.
	ReportedProviders []*pairingtypes.ReportedProvider
}
// ConsumerSessionsMap maps a string key to its SessionInfo.
// NOTE(review): the key is presumably the provider address — confirm against callers.
type ConsumerSessionsMap map[string]*SessionInfo
// ProviderOptimizer is the set of optimizer operations this package relies on.
// The descriptions below are derived from the method names and signatures only;
// the implementation lives outside this file.
type ProviderOptimizer interface {
	// AppendProbeRelayData records a probe result (latency and success flag) for a provider address.
	AppendProbeRelayData(providerAddress string, latency time.Duration, success bool)
	// AppendRelayFailure records a failed relay for a provider address.
	AppendRelayFailure(providerAddress string)
	// AppendRelayData records a relay measurement (latency, hanging-api flag, cu and sync block) for a provider address.
	AppendRelayData(providerAddress string, latency time.Duration, isHangingApi bool, cu, syncBlock uint64)
	// ChooseProvider returns provider addresses picked out of allAddresses, given the ignored set and request parameters.
	ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64, perturbationPercentage float64) (addresses []string)
	// GetExcellenceQoSReportForProvider returns a QoS report for the given string argument (presumably a provider address — confirm).
	GetExcellenceQoSReportForProvider(string) *pairingtypes.QualityOfServiceReport
	// Strategy returns the optimizer's strategy value.
	Strategy() provideroptimizer.Strategy
}
// ignoredProviders pairs a set of provider identifiers with an epoch number.
// NOTE(review): presumably the set is only valid for currentEpoch — confirm where it is reset.
type ignoredProviders struct {
	// providers is used as a set: only the keys carry information.
	providers map[string]struct{}
	currentEpoch uint64
}
// QoSReport accumulates the quality-of-service data of one session.
// CalculateQoS updates every field except LastExcellenceQoSReport.
type QoSReport struct {
	// LastQoSReport is the most recent report; CalculateQoS allocates it on first use.
	LastQoSReport *pairingtypes.QualityOfServiceReport
	// LastExcellenceQoSReport is read by getQosComputedResultOrZero; it is set outside this file.
	LastExcellenceQoSReport *pairingtypes.QualityOfServiceReport
	// LatencyScoreList holds every latency score seen so far, kept sorted in ascending order by CalculateQoS.
	LatencyScoreList []sdk.Dec
	// SyncScoreSum counts sync evaluations where blockHeightDiff was <= 0.
	SyncScoreSum int64
	// TotalSyncScore counts all sync evaluations.
	TotalSyncScore int64
	// TotalRelays and AnsweredRelays feed CalculateAvailabilityScore.
	TotalRelays uint64
	AnsweredRelays uint64
}
// SingleConsumerSession is one session between the consumer and a provider endpoint.
// Sessions are handed out locked (see GetConsumerSessionInstanceFromEndpoint and the
// data-reliability helpers), so most fields are expected to be touched while holding lock.
type SingleConsumerSession struct {
	// CuSum is a compute-unit counter for the session (maintained outside this file).
	CuSum uint64
	LatestRelayCu uint64 // set by GetSessions cuNeededForSession
	// QoSInfo holds the QoS accumulators updated by CalculateQoS.
	QoSInfo QoSReport
	// SessionId identifies the session; DataReliabilitySessionId is reserved for the data reliability session.
	SessionId int64
	// Parent is the ConsumerSessionsWithProvider that created this session.
	Parent *ConsumerSessionsWithProvider
	// lock marks the session as in use; it is taken before a session is returned to a caller.
	lock utils.LavaMutex
	RelayNum uint64
	LatestBlock int64
	// Endpoint is the provider endpoint this session was created for.
	Endpoint *Endpoint
	BlockListed bool // if session lost sync we blacklist it.
	ConsecutiveErrors []error
	errorsCount uint64
}
// DataReliabilitySession bundles a consumer session with the epoch, provider
// address and a boolean identifier kept for it.
// NOTE(review): the field semantics are inferred from names only — confirm against the users of this type.
type DataReliabilitySession struct {
	SingleConsumerSession *SingleConsumerSession
	Epoch uint64
	ProviderPublicAddress string
	UniqueIdentifier bool
}
// Endpoint is one network endpoint of a provider together with its connection state.
type Endpoint struct {
	NetworkAddress string // change at the end to NetworkAddress
	// Enabled is cleared once ConnectionRefusals reaches MaxConsecutiveConnectionAttempts.
	Enabled bool
	// Client is the relayer client built on top of connection; both are set together after a successful dial.
	Client *pairingtypes.RelayerClient
	// connection is the gRPC connection backing Client.
	connection *grpc.ClientConn
	// ConnectionRefusals counts consecutive failed dial attempts; it is reset to 0 on a successful dial.
	ConnectionRefusals uint64
	// Addons and Extensions are sets (keys only) consulted by IsSupportingAddon / IsSupportingExtensions.
	Addons map[string]struct{}
	Extensions map[string]struct{}
	Geolocation planstypes.Geolocation
}
// SessionWithProvider pairs a provider's session container with an epoch number.
type SessionWithProvider struct {
	SessionsWithProvider *ConsumerSessionsWithProvider
	CurrentEpoch uint64
}
// SessionWithProviderMap maps a string key to a SessionWithProvider.
// NOTE(review): the key is presumably the provider address — confirm against callers.
type SessionWithProviderMap map[string]*SessionWithProvider
// RPCEndpoint describes an RPC endpoint as it appears in configuration
// (each field carries yaml, json and mapstructure tags).
type RPCEndpoint struct {
	NetworkAddress string `yaml:"network-address,omitempty" json:"network-address,omitempty" mapstructure:"network-address"` // HOST:PORT
	ChainID string `yaml:"chain-id,omitempty" json:"chain-id,omitempty" mapstructure:"chain-id"` // spec chain identifier
	// ApiInterface names the API interface served on this endpoint; together with ChainID it forms Key().
	ApiInterface string `yaml:"api-interface,omitempty" json:"api-interface,omitempty" mapstructure:"api-interface"`
	// TLSEnabled is the endpoint's TLS switch (consumed outside this file).
	TLSEnabled bool `yaml:"tls-enabled,omitempty" json:"tls-enabled,omitempty" mapstructure:"tls-enabled"`
	HealthCheckPath string `yaml:"health-check-path,omitempty" json:"health-check-path,omitempty" mapstructure:"health-check-path"` // health check status code 200 path, default is "/"
	// Geolocation is the numeric geolocation value of the endpoint.
	Geolocation uint64 `yaml:"geolocation,omitempty" json:"geolocation,omitempty" mapstructure:"geolocation"`
}
// String renders the endpoint as
// "<ChainID>:<ApiInterface> Network Address:<NetworkAddress> Geolocation:<Geolocation>".
func (rpce *RPCEndpoint) String() (retStr string) {
	geolocation := strconv.FormatUint(rpce.Geolocation, 10)
	retStr = rpce.ChainID + ":" + rpce.ApiInterface
	retStr += " Network Address:" + rpce.NetworkAddress
	retStr += " Geolocation:" + geolocation
	return retStr
}
// New overwrites the receiver's address, chain ID, API interface and geolocation
// with the given values and returns the receiver. Other fields are left as they were.
func (rpce *RPCEndpoint) New(address, chainID, apiInterface string, geolocation uint64) *RPCEndpoint {
	// TODO: validate correct url address
	rpce.NetworkAddress, rpce.ChainID = address, chainID
	rpce.ApiInterface, rpce.Geolocation = apiInterface, geolocation
	return rpce
}
// Key returns ChainID immediately followed by ApiInterface (no separator).
func (rpce *RPCEndpoint) Key() string {
	key := rpce.ChainID
	key += rpce.ApiInterface
	return key
}
// ConsumerSessionsWithProvider holds everything the consumer tracks for a single
// provider: its endpoints, the sessions opened with it and its compute-unit budget.
type ConsumerSessionsWithProvider struct {
	// Lock guards the fields below; methods of this type take it before reading or mutating them.
	Lock sync.RWMutex
	PublicLavaAddress string
	Endpoints []*Endpoint
	// Sessions is keyed by session ID; DataReliabilitySessionId is reserved for the data reliability session.
	Sessions map[int64]*SingleConsumerSession
	// MaxComputeUnits is the per-epoch budget; validateComputeUnits/addUsedComputeUnits multiply it by (virtualEpoch+1).
	MaxComputeUnits uint64
	UsedComputeUnits uint64
	PairingEpoch uint64
	// whether we already reported this provider this epoch, we can only report one conflict per provider per epoch
	conflictFoundAndReported uint32 // 0 == not reported, 1 == reported
	stakeSize sdk.Coin // the stake size the provider staked
}
// NewConsumerSessionWithProvider builds the per-provider session container for the
// given address, endpoints, compute-unit budget, pairing epoch and stake size.
// The session map starts out empty and no compute units are marked as used.
func NewConsumerSessionWithProvider(publicLavaAddress string, pairingEndpoints []*Endpoint, maxCu uint64, epoch uint64, stakeSize sdk.Coin) *ConsumerSessionsWithProvider {
	created := new(ConsumerSessionsWithProvider)
	created.PublicLavaAddress = publicLavaAddress
	created.Endpoints = pairingEndpoints
	created.Sessions = make(map[int64]*SingleConsumerSession)
	created.MaxComputeUnits = maxCu
	created.PairingEpoch = epoch
	created.stakeSize = stakeSize
	return created
}
// atomicReadConflictReported atomically loads the conflict flag and reports whether it is set (== 1).
func (cswp *ConsumerSessionsWithProvider) atomicReadConflictReported() bool {
	flag := atomic.LoadUint32(&cswp.conflictFoundAndReported)
	return flag == 1
}
// atomicWriteConflictReported atomically sets the conflict flag to 1 ("reported").
// There is no counterpart in this file that clears the flag.
func (cswp *ConsumerSessionsWithProvider) atomicWriteConflictReported() {
	atomic.StoreUint32(&cswp.conflictFoundAndReported, 1) // we can only set conflict to "reported".
}
// ConflictAlreadyReported reports whether a conflict was already reported for this
// provider during the current epoch (only one report per provider per epoch is allowed).
// It returns true when already reported and false otherwise.
func (cswp *ConsumerSessionsWithProvider) ConflictAlreadyReported() bool {
	alreadyReported := cswp.atomicReadConflictReported()
	return alreadyReported
}
// StoreConflictReported marks this provider as having had a conflict reported.
// It delegates to atomicWriteConflictReported, so it is an atomic store of the flag.
func (cswp *ConsumerSessionsWithProvider) StoreConflictReported() {
	cswp.atomicWriteConflictReported()
}
// IsSupportingAddon reports whether at least one endpoint of this provider lists
// the given addon. The empty addon is always considered supported.
// The read lock is held for the duration of the check.
func (cswp *ConsumerSessionsWithProvider) IsSupportingAddon(addon string) bool {
	cswp.Lock.RLock()
	defer cswp.Lock.RUnlock()
	// an empty addon needs no endpoint scan; otherwise stop at the first endpoint that has it
	supported := addon == ""
	for i := 0; !supported && i < len(cswp.Endpoints); i++ {
		_, supported = cswp.Endpoints[i].Addons[addon]
	}
	return supported
}
// IsSupportingExtensions reports whether a single endpoint of this provider
// supports every extension in the given list. With an empty list any existing
// endpoint qualifies; with no endpoints the result is false.
func (cswp *ConsumerSessionsWithProvider) IsSupportingExtensions(extensions []string) bool {
	cswp.Lock.RLock()
	defer cswp.Lock.RUnlock()
	for _, candidate := range cswp.Endpoints {
		supportsAll := true
		for _, required := range extensions {
			if _, found := candidate.Extensions[required]; !found {
				// this endpoint lacks a required extension, move on to the next endpoint
				supportsAll = false
				break
			}
		}
		if supportsAll {
			return true
		}
	}
	return false
}
// atomicReadUsedComputeUnits returns UsedComputeUnits via an atomic load, without taking Lock.
func (cswp *ConsumerSessionsWithProvider) atomicReadUsedComputeUnits() uint64 {
	used := atomic.LoadUint64(&cswp.UsedComputeUnits)
	return used
}
// verifyDataReliabilitySessionWasNotAlreadyCreated looks up the data reliability
// session of this provider under the read lock.
//
// Returns, always together with the pairing epoch:
//   - NoDataReliabilitySessionWasCreatedError when no such session exists yet;
//   - DataReliabilityAlreadySentThisEpochError when the session already reached
//     DataReliabilityRelayNumber relays;
//   - otherwise the existing session, with its lock already taken for the caller.
func (cswp *ConsumerSessionsWithProvider) verifyDataReliabilitySessionWasNotAlreadyCreated() (singleConsumerSession *SingleConsumerSession, pairingEpoch uint64, err error) {
	cswp.Lock.RLock()
	defer cswp.Lock.RUnlock()
	existing, found := cswp.Sessions[DataReliabilitySessionId]
	if !found {
		return nil, cswp.PairingEpoch, NoDataReliabilitySessionWasCreatedError
	}
	// the relay counter tells us whether the data reliability quota was used up
	if existing.RelayNum >= DataReliabilityRelayNumber {
		return nil, cswp.PairingEpoch, DataReliabilityAlreadySentThisEpochError
	}
	existing.lock.Lock() // hand the session over locked
	return existing, cswp.PairingEpoch, nil
}
// getDataReliabilitySingleConsumerSession returns the data reliability session for
// this provider, creating it on the given endpoint when it does not exist yet.
// The whole operation runs under the write lock, and the existence check is repeated
// here because the state may have changed since the caller last looked.
//
// Returns DataReliabilityAlreadySentThisEpochError when an existing session already
// reached DataReliabilityRelayNumber relays. The pairing epoch is always returned.
//
// NOTE(review): a newly created session is returned with its lock held, while an
// already existing session is returned WITHOUT taking its lock — confirm that callers
// can cope with both cases.
func (cswp *ConsumerSessionsWithProvider) getDataReliabilitySingleConsumerSession(endpoint *Endpoint) (singleConsumerSession *SingleConsumerSession, pairingEpoch uint64, err error) {
	cswp.Lock.Lock()
	defer cswp.Lock.Unlock()
	existing, found := cswp.Sessions[DataReliabilitySessionId]
	switch {
	case found && existing.RelayNum >= DataReliabilityRelayNumber:
		return nil, cswp.PairingEpoch, DataReliabilityAlreadySentThisEpochError
	case found:
		// the session is already there, hand it back as is
		return existing, cswp.PairingEpoch, nil
	}
	created := &SingleConsumerSession{
		SessionId: DataReliabilitySessionId,
		Parent:    cswp,
		Endpoint:  endpoint,
		RelayNum:  0,
	}
	// lock before publishing so no other request can grab the session
	created.lock.Lock()
	cswp.Sessions[created.SessionId] = created
	return created, cswp.PairingEpoch, nil
}
// GetPairingEpoch returns PairingEpoch via an atomic load, without taking Lock.
func (cswp *ConsumerSessionsWithProvider) GetPairingEpoch() uint64 {
	epoch := atomic.LoadUint64(&cswp.PairingEpoch)
	return epoch
}
// getPublicLavaAddressAndPairingEpoch returns the provider's public address and
// pairing epoch, both read under the read lock.
func (cswp *ConsumerSessionsWithProvider) getPublicLavaAddressAndPairingEpoch() (string, uint64) {
	cswp.Lock.RLock()
	address, epoch := cswp.PublicLavaAddress, cswp.PairingEpoch
	cswp.Lock.RUnlock()
	return address, epoch
}
// validateComputeUnits checks, without modifying anything, whether spending cu more
// compute units would stay within this provider's budget. The budget is
// MaxComputeUnits multiplied by (virtualEpoch + 1), so each virtual epoch grants one
// more MaxComputeUnits allowance. On overflow of the budget it logs a warning and
// returns the resulting error (wrapping MaxComputeUnitsExceededError); otherwise nil.
func (cswp *ConsumerSessionsWithProvider) validateComputeUnits(cu uint64, virtualEpoch uint64) error {
	cswp.Lock.RLock()
	defer cswp.Lock.RUnlock()
	requestedTotal := cswp.UsedComputeUnits + cu
	allowance := cswp.MaxComputeUnits * (virtualEpoch + 1)
	if requestedTotal <= allowance {
		return nil
	}
	return utils.LavaFormatWarning("validateComputeUnits", MaxComputeUnitsExceededError,
		utils.Attribute{Key: "cu", Value: requestedTotal},
		utils.Attribute{Key: "maxCu", Value: allowance},
		utils.Attribute{Key: "virtualEpoch", Value: virtualEpoch},
	)
}
// addUsedComputeUnits adds cu to UsedComputeUnits under the write lock, provided the
// new total stays within MaxComputeUnits * (virtualEpoch + 1). When the budget would
// be exceeded nothing is changed and MaxComputeUnitsExceededError is returned.
func (cswp *ConsumerSessionsWithProvider) addUsedComputeUnits(cu, virtualEpoch uint64) error {
	cswp.Lock.Lock()
	defer cswp.Lock.Unlock()
	// every virtual epoch grants one more MaxComputeUnits allowance
	newTotal := cswp.UsedComputeUnits + cu
	allowance := cswp.MaxComputeUnits * (virtualEpoch + 1)
	if newTotal > allowance {
		return MaxComputeUnitsExceededError
	}
	cswp.UsedComputeUnits += cu
	return nil
}
// getProviderStakeSize returns the stake size stored for this provider, read under the read lock.
func (cswp *ConsumerSessionsWithProvider) getProviderStakeSize() sdk.Coin {
	cswp.Lock.RLock()
	stake := cswp.stakeSize
	cswp.Lock.RUnlock()
	return stake
}
// decreaseUsedComputeUnits subtracts cu from UsedComputeUnits under the write lock.
// When cu is larger than the amount currently used nothing is changed and
// NegativeComputeUnitsAmountError is returned.
func (cswp *ConsumerSessionsWithProvider) decreaseUsedComputeUnits(cu uint64) error {
	cswp.Lock.Lock()
	defer cswp.Lock.Unlock()
	if cu > cswp.UsedComputeUnits {
		// subtracting would wrap the unsigned counter around
		return NegativeComputeUnitsAmountError
	}
	cswp.UsedComputeUnits -= cu
	return nil
}
// ConnectRawClientWithTimeout dials addr and waits, for at most
// TimeoutForEstablishingAConnection (or until ctx is done), for the connection to
// reach the Ready state.
//
// It returns a relayer client built on the connection together with the connection
// itself. An error is returned only when the dial itself fails. If the wait ends
// because of the timeout, the (not yet ready) connection and client are still
// returned with a nil error.
//
// The readiness wait polls in the calling goroutine, so nothing is left running
// after the function returns.
func (cswp *ConsumerSessionsWithProvider) ConnectRawClientWithTimeout(ctx context.Context, addr string) (*pairingtypes.RelayerClient, *grpc.ClientConn, error) {
	connectCtx, cancel := context.WithTimeout(ctx, TimeoutForEstablishingAConnection)
	defer cancel()
	conn, err := ConnectgRPCClient(connectCtx, addr, AllowInsecureConnectionToProviders)
	if err != nil {
		return nil, nil, err
	}

	// Poll the connection state with a small delay to avoid busy-waiting,
	// giving up as soon as the connect context expires or is cancelled.
	ticker := time.NewTicker(20 * time.Millisecond)
	defer ticker.Stop()
waitForReady:
	for conn.GetState() != connectivity.Ready {
		select {
		case <-connectCtx.Done():
			break waitForReady
		case <-ticker.C:
		}
	}

	c := pairingtypes.NewRelayerClient(conn)
	return &c, conn, nil
}
// GetConsumerSessionInstanceFromEndpoint returns a locked session bound to the given
// endpoint: an existing idle one when available, otherwise a freshly created one.
//
// Existing sessions are probed with TryLock; a session that cannot be locked is in
// use by someone else and is skipped. Block-listed sessions are counted and skipped;
// once the count reaches MaxAllowedBlockListedSessionPerProvider * (numberOfResets + 1)
// the call fails with MaximumNumberOfBlockListedSessionsError. The data reliability
// session and sessions of other endpoints are never considered.
//
// A new session is created only while the number of stored sessions does not exceed
// MaxSessionsAllowedPerProvider (otherwise MaximumNumberOfSessionsExceededError), and
// it gets a random non-zero ID. On success the pairing epoch is returned with the session.
func (cswp *ConsumerSessionsWithProvider) GetConsumerSessionInstanceFromEndpoint(endpoint *Endpoint, numberOfResets uint64) (singleConsumerSession *SingleConsumerSession, pairingEpoch uint64, err error) {
	// TODO: validate that the endpoint even belongs to the ConsumerSessionsWithProvider and is enabled.
	// every reset raises the tolerated amount of block-listed sessions; +1 because resets start at 0
	blockedLimit := MaxAllowedBlockListedSessionPerProvider * (numberOfResets + 1)
	cswp.Lock.Lock()
	defer cswp.Lock.Unlock()

	// first try to reuse a session that nobody is holding
	var blockedSeen uint64
	for id, candidate := range cswp.Sessions {
		if id == DataReliabilitySessionId || candidate.Endpoint != endpoint {
			// the data reliability session is off limits, and so are sessions of other endpoints
			continue
		}
		if blockedSeen >= blockedLimit {
			return nil, 0, MaximumNumberOfBlockListedSessionsError
		}
		if !candidate.lock.TryLock() {
			// somebody else is using this session
			continue
		}
		if !candidate.BlockListed {
			// we hold the lock and the session is usable
			return candidate, cswp.PairingEpoch, nil
		}
		// unusable session: count it so a provider with too many of them gets rejected
		blockedSeen++
		candidate.lock.Unlock()
	}

	// nothing reusable, create a new session unless the provider already has too many
	if len(cswp.Sessions) > MaxSessionsAllowedPerProvider {
		return nil, 0, MaximumNumberOfSessionsExceededError
	}
	var newSessionID int64
	for newSessionID == 0 { // 0 is not allowed as a session id
		newSessionID = rand.Int63()
	}
	created := &SingleConsumerSession{
		SessionId: newSessionID,
		Parent:    cswp,
		Endpoint:  endpoint,
	}
	created.lock.Lock() // lock before publishing so other requests cannot take it
	cswp.Sessions[created.SessionId] = created
	return created, cswp.PairingEpoch, nil
}
// fetchEndpointConnectionFromConsumerSessionWithProvider picks the first enabled
// endpoint of this provider that has (or can establish) a usable connection.
//
// It can fail without an error: when every enabled endpoint was tried once and none
// of them is currently usable, connected is false, endpointPtr is nil and err is nil.
// When no endpoint is enabled any more (endpoints are disabled after
// MaxConsecutiveConnectionAttempts failed dials) it returns
// AllProviderEndpointsDisabledError so the caller can purge/report the provider.
// The provider's public address is always returned.
//
// The provider lock is held while endpoints are inspected and dialed.
func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSessionWithProvider(ctx context.Context) (connected bool, endpointPtr *Endpoint, providerAddress string, err error) {
	// connectEndpoint makes sure the endpoint has a live connection, dialing a new one
	// when needed. It reports whether the endpoint is connected afterwards.
	// Must be called with cswp.Lock held.
	connectEndpoint := func(endpoint *Endpoint) bool {
		if endpoint.Client != nil && endpoint.connection != nil && endpoint.connection.GetState() != connectivity.Shutdown && endpoint.connection.GetState() != connectivity.Idle {
			return true
		}
		client, conn, err := cswp.ConnectRawClientWithTimeout(ctx, endpoint.NetworkAddress)
		if err != nil {
			endpoint.ConnectionRefusals++
			utils.LavaFormatInfo("error connecting to provider", utils.LogAttr("err", err), utils.Attribute{Key: "provider endpoint", Value: endpoint.NetworkAddress}, utils.Attribute{Key: "provider address", Value: cswp.PublicLavaAddress}, utils.Attribute{Key: "endpoint", Value: endpoint}, utils.Attribute{Key: "refusals", Value: endpoint.ConnectionRefusals})
			if endpoint.ConnectionRefusals >= MaxConsecutiveConnectionAttempts {
				endpoint.Enabled = false
				utils.LavaFormatWarning("disabling provider endpoint for the duration of current epoch.", nil, utils.Attribute{Key: "Endpoint", Value: endpoint.NetworkAddress}, utils.Attribute{Key: "address", Value: cswp.PublicLavaAddress})
			}
			return false
		}
		endpoint.ConnectionRefusals = 0
		endpoint.Client = client
		if endpoint.connection != nil {
			endpoint.connection.Close() // just to be safe
		}
		endpoint.connection = conn
		return true
	}

	getConnectionFromConsumerSessionsWithProvider := func() (connected bool, endpointPtr *Endpoint, allDisabled bool) {
		cswp.Lock.Lock()
		defer cswp.Lock.Unlock()
		for _, endpoint := range cswp.Endpoints {
			if !endpoint.Enabled {
				continue
			}
			// an endpoint without a connection object is treated as Idle
			endpointState := connectivity.Idle
			if endpoint.connection != nil {
				endpointState = endpoint.connection.GetState()
			}
			switch {
			case endpoint.Client == nil:
				if !connectEndpoint(endpoint) {
					continue
				}
			case endpointState == connectivity.Shutdown || endpointState == connectivity.Idle:
				// connection was shut down (or never existed), so we need to create a new one.
				// The connection may be nil here even though Client is set, hence the guard.
				if endpoint.connection != nil {
					endpoint.connection.Close()
				}
				if !connectEndpoint(endpoint) {
					continue
				}
			case endpointState == connectivity.TransientFailure || endpointState == connectivity.Connecting:
				// can't use this one right now, but we could in the future
				continue
			}
			return true, endpoint, false
		}
		// An endpoint may have been disabled during the loop above, so re-check the
		// current state of all endpoints before declaring all of them disabled.
		for _, endpoint := range cswp.Endpoints {
			if endpoint.Enabled {
				// even one enabled endpoint is enough for us to not purge.
				return false, nil, false
			}
		}
		return false, nil, true
	}

	var allDisabled bool
	connected, endpointPtr, allDisabled = getConnectionFromConsumerSessionsWithProvider()
	if allDisabled {
		utils.LavaFormatInfo("purging provider after all endpoints are disabled", utils.Attribute{Key: "provider endpoints", Value: cswp.Endpoints}, utils.Attribute{Key: "provider address", Value: cswp.PublicLavaAddress})
		// report provider.
		return connected, endpointPtr, cswp.PublicLavaAddress, AllProviderEndpointsDisabledError
	}
	return connected, endpointPtr, cswp.PublicLavaAddress, nil
}
// CalculateExpectedLatency returns the latency threshold for a relay, which is
// half of the timeout the relay was given.
func (cs *SingleConsumerSession) CalculateExpectedLatency(timeoutGivenToRelay time.Duration) time.Duration {
	return timeoutGivenToRelay / 2
}
// getQosComputedResultOrZero returns the QoS excellence score computed from the last
// excellence report. It returns zero when there is no such report or when computing
// the score fails (the failure is logged), so that such a provider is not preferred.
// The session (cs) must be locked by the caller.
func (cs *SingleConsumerSession) getQosComputedResultOrZero() sdk.Dec {
	report := cs.QoSInfo.LastExcellenceQoSReport
	if report == nil {
		return sdk.ZeroDec()
	}
	computed, computeErr := report.ComputeQoSExcellence()
	if computeErr != nil {
		utils.LavaFormatError("Failed computing QoS used for error parsing", computeErr, utils.LogAttr("Report", report))
		return sdk.ZeroDec()
	}
	return computed
}
// CalculateQoS folds one answered relay into the session's QoS accumulators and
// refreshes LastQoSReport (availability, latency and sync).
//
//   - latency / expectedLatency: measured and expected relay latency. The latency
//     score of the relay is min(1, expectedLatency/latency); a latency of zero gets
//     the maximal score of 1 (instead of dividing by zero). The reported latency is
//     the PercentileToCalculateLatency percentile of all scores seen so far.
//   - blockHeightDiff: expected - allowedLag - blockHeight; a value <= 0 means the
//     provider is considered in sync for this relay.
//   - numOfProviders / servicersToCount: the sync score is only computed when
//     numOfProviders exceeds ceil(servicersToCount * MinProvidersForSync); otherwise
//     the sync score is reported as 1.
//
// The session is expected to be locked by the caller.
func (cs *SingleConsumerSession) CalculateQoS(latency, expectedLatency time.Duration, blockHeightDiff int64, numOfProviders int, servicersToCount int64) {
	// Add current Session QoS
	cs.QoSInfo.TotalRelays++    // increase total relays
	cs.QoSInfo.AnsweredRelays++ // increase answered relays
	if cs.QoSInfo.LastQoSReport == nil {
		cs.QoSInfo.LastQoSReport = &pairingtypes.QualityOfServiceReport{}
	}

	downtimePercentage, scaledAvailabilityScore := CalculateAvailabilityScore(&cs.QoSInfo)
	cs.QoSInfo.LastQoSReport.Availability = scaledAvailabilityScore
	if sdk.OneDec().GT(cs.QoSInfo.LastQoSReport.Availability) {
		utils.LavaFormatInfo("QoS Availability report", utils.Attribute{Key: "Availability", Value: cs.QoSInfo.LastQoSReport.Availability}, utils.Attribute{Key: "down percent", Value: downtimePercentage})
	}

	// The score is capped at 1, which is also the limit of expected/latency as the
	// latency approaches zero, so a zero latency is scored 1 without dividing.
	latencyScore := sdk.OneDec()
	if latency != 0 {
		latencyScore = sdk.MinDec(sdk.OneDec(), sdk.NewDecFromInt(sdk.NewInt(int64(expectedLatency))).Quo(sdk.NewDecFromInt(sdk.NewInt(int64(latency)))))
	}

	// insertSorted inserts value into the ascending list, keeping it sorted.
	insertSorted := func(list []sdk.Dec, value sdk.Dec) []sdk.Dec {
		index := sort.Search(len(list), func(i int) bool {
			return list[i].GTE(value)
		})
		list = append(list, value)         // grow by one (also covers the nil/empty list)
		copy(list[index+1:], list[index:]) // shift the tail one slot to the right
		list[index] = value
		return list
	}
	cs.QoSInfo.LatencyScoreList = insertSorted(cs.QoSInfo.LatencyScoreList, latencyScore)
	cs.QoSInfo.LastQoSReport.Latency = cs.QoSInfo.LatencyScoreList[int(float64(len(cs.QoSInfo.LatencyScoreList))*PercentileToCalculateLatency)]

	// checking if we have enough information to calculate the sync score for the providers, if we haven't talked
	// with enough providers we don't have enough information and we will wait to have more information before setting the sync score
	shouldCalculateSyncScore := int64(numOfProviders) > int64(math.Ceil(float64(servicersToCount)*MinProvidersForSync))
	if !shouldCalculateSyncScore {
		// we prefer to give them a score of 1 when there is no other data, since otherwise we damage their payments
		cs.QoSInfo.LastQoSReport.Sync = sdk.NewDec(1)
		return
	}

	if blockHeightDiff <= 0 { // if the diff is bigger than 0 than the block is too old (blockHeightDiff = expected - allowedLag - blockHeight) and we don't give him the score
		cs.QoSInfo.SyncScoreSum++
	}
	cs.QoSInfo.TotalSyncScore++
	cs.QoSInfo.LastQoSReport.Sync = sdk.NewDec(cs.QoSInfo.SyncScoreSum).QuoInt64(cs.QoSInfo.TotalSyncScore)
	if sdk.OneDec().GT(cs.QoSInfo.LastQoSReport.Sync) {
		utils.LavaFormatDebug("QoS Sync report",
			utils.Attribute{Key: "Sync", Value: cs.QoSInfo.LastQoSReport.Sync},
			utils.Attribute{Key: "block diff", Value: blockHeightDiff},
			utils.Attribute{Key: "sync score", Value: strconv.FormatInt(cs.QoSInfo.SyncScoreSum, 10) + "/" + strconv.FormatInt(cs.QoSInfo.TotalSyncScore, 10)},
			utils.Attribute{Key: "session_id", Value: cs.SessionId},
			utils.Attribute{Key: "provider", Value: cs.Parent.PublicLavaAddress},
		)
	}
}
// CalculateAvailabilityScore derives the availability figures from the relay
// counters of qosReport.
//
// It returns the downtime percentage, (TotalRelays - AnsweredRelays) / TotalRelays,
// and the scaled availability score,
// max(0, (AvailabilityPercentage - downtime) / AvailabilityPercentage).
//
// When no relays were recorded at all there is nothing to divide by; in that case
// no downtime was observed, so a downtime of 0 and a score of 1 are returned.
func CalculateAvailabilityScore(qosReport *QoSReport) (downtimePercentageRet, scaledAvailabilityScoreRet sdk.Dec) {
	if qosReport.TotalRelays == 0 {
		return sdk.ZeroDec(), sdk.OneDec()
	}
	unanswered := int64(qosReport.TotalRelays - qosReport.AnsweredRelays)
	downtimePercentage := sdk.NewDec(unanswered).Quo(sdk.NewDec(int64(qosReport.TotalRelays)))
	scaledAvailabilityScore := sdk.MaxDec(sdk.ZeroDec(), AvailabilityPercentage.Sub(downtimePercentage).Quo(AvailabilityPercentage))
	return downtimePercentage, scaledAvailabilityScore
}
// IsDataReliabilitySession reports whether this session is a data reliability
// session, i.e. its ID is not greater than DataReliabilitySessionId.
func (scs *SingleConsumerSession) IsDataReliabilitySession() bool {
	isRegularSession := scs.SessionId > DataReliabilitySessionId
	return !isRegularSession
}