-
Notifications
You must be signed in to change notification settings - Fork 212
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge tag 'v3.2.2' into PRT-v-3-2-2-archive-special-api-near-experime…
…ntal
- Loading branch information
Showing
23 changed files
with
541 additions
and
96 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,281 @@ | ||
package metrics | ||
|
||
import ( | ||
"context" | ||
"os" | ||
"strconv" | ||
"sync" | ||
"sync/atomic" | ||
"time" | ||
|
||
"github.com/goccy/go-json" | ||
"github.com/lavanet/lava/v3/utils" | ||
"github.com/lavanet/lava/v3/utils/rand" | ||
spectypes "github.com/lavanet/lava/v3/x/spec/types" | ||
"golang.org/x/exp/maps" | ||
) | ||
|
||
var ( | ||
OptimizerQosServerPushInterval time.Duration | ||
OptimizerQosServerSamplingInterval time.Duration | ||
) | ||
|
||
type ConsumerOptimizerQoSClient struct { | ||
consumerOrigin string | ||
queueSender *QueueSender | ||
optimizers map[string]OptimizerInf // keys are chain ids | ||
// keys are chain ids, values are maps with provider addresses as keys | ||
chainIdToProviderToRelaysCount map[string]map[string]uint64 | ||
chainIdToProviderToNodeErrorsCount map[string]map[string]uint64 | ||
chainIdToProviderToEpochToStake map[string]map[string]map[uint64]int64 // third key is epoch | ||
currentEpoch atomic.Uint64 | ||
lock sync.RWMutex | ||
} | ||
|
||
type OptimizerQoSReport struct { | ||
ProviderAddress string | ||
SyncScore float64 | ||
AvailabilityScore float64 | ||
LatencyScore float64 | ||
GenericScore float64 | ||
EntryIndex int | ||
} | ||
|
||
type optimizerQoSReportToSend struct { | ||
Timestamp time.Time `json:"timestamp"` | ||
SyncScore float64 `json:"sync_score"` | ||
AvailabilityScore float64 `json:"availability_score"` | ||
LatencyScore float64 `json:"latency_score"` | ||
GenericScore float64 `json:"generic_score"` | ||
ProviderAddress string `json:"provider"` | ||
ConsumerOrigin string `json:"consumer"` | ||
ChainId string `json:"chain_id"` | ||
NodeErrorRate float64 `json:"node_error_rate"` | ||
Epoch uint64 `json:"epoch"` | ||
ProviderStake int64 `json:"provider_stake"` | ||
EntryIndex int `json:"entry_index"` | ||
} | ||
|
||
func (oqosr optimizerQoSReportToSend) String() string { | ||
bytes, err := json.Marshal(oqosr) | ||
if err != nil { | ||
return "" | ||
} | ||
return string(bytes) | ||
} | ||
|
||
type OptimizerInf interface { | ||
CalculateQoSScoresForMetrics(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) []*OptimizerQoSReport | ||
} | ||
|
||
func NewConsumerOptimizerQoSClient(endpointAddress string, interval ...time.Duration) *ConsumerOptimizerQoSClient { | ||
hostname, err := os.Hostname() | ||
if err != nil { | ||
utils.LavaFormatWarning("Error while getting hostname for ConsumerOptimizerQoSClient", err) | ||
hostname = "unknown" + strconv.FormatUint(rand.Uint64(), 10) // random seed for different unknowns | ||
} | ||
|
||
return &ConsumerOptimizerQoSClient{ | ||
consumerOrigin: hostname, | ||
queueSender: NewQueueSender(endpointAddress, "ConsumerOptimizerQoS", nil, interval...), | ||
optimizers: map[string]OptimizerInf{}, | ||
chainIdToProviderToRelaysCount: map[string]map[string]uint64{}, | ||
chainIdToProviderToNodeErrorsCount: map[string]map[string]uint64{}, | ||
chainIdToProviderToEpochToStake: map[string]map[string]map[uint64]int64{}, | ||
} | ||
} | ||
|
||
func (coqc *ConsumerOptimizerQoSClient) getProviderChainMapCounterValue(counterStore map[string]map[string]uint64, chainId, providerAddress string) uint64 { | ||
// must be called under read lock | ||
if counterProvidersMap, found := counterStore[chainId]; found { | ||
return counterProvidersMap[providerAddress] | ||
} | ||
return 0 | ||
} | ||
|
||
func (coqc *ConsumerOptimizerQoSClient) getProviderChainRelaysCount(chainId, providerAddress string) uint64 { | ||
// must be called under read lock | ||
return coqc.getProviderChainMapCounterValue(coqc.chainIdToProviderToRelaysCount, chainId, providerAddress) | ||
} | ||
|
||
func (coqc *ConsumerOptimizerQoSClient) getProviderChainNodeErrorsCount(chainId, providerAddress string) uint64 { | ||
// must be called under read lock | ||
return coqc.getProviderChainMapCounterValue(coqc.chainIdToProviderToNodeErrorsCount, chainId, providerAddress) | ||
} | ||
|
||
func (coqc *ConsumerOptimizerQoSClient) getProviderChainStake(chainId, providerAddress string, epoch uint64) int64 { | ||
// must be called under read lock | ||
if providersMap, found := coqc.chainIdToProviderToEpochToStake[chainId]; found { | ||
if epochMap, found := providersMap[providerAddress]; found { | ||
if stake, found := epochMap[epoch]; found { | ||
return stake | ||
} | ||
} | ||
} | ||
return 0 | ||
} | ||
|
||
func (coqc *ConsumerOptimizerQoSClient) calculateNodeErrorRate(chainId, providerAddress string) float64 { | ||
// must be called under read lock | ||
relaysCount := coqc.getProviderChainRelaysCount(chainId, providerAddress) | ||
if relaysCount > 0 { | ||
errorsCount := coqc.getProviderChainNodeErrorsCount(chainId, providerAddress) | ||
return float64(errorsCount) / float64(relaysCount) | ||
} | ||
|
||
return 0 | ||
} | ||
|
||
func (coqc *ConsumerOptimizerQoSClient) appendOptimizerQoSReport(report *OptimizerQoSReport, chainId string, epoch uint64) { | ||
// must be called under read lock | ||
|
||
optimizerQoSReportToSend := optimizerQoSReportToSend{ | ||
Timestamp: time.Now(), | ||
ConsumerOrigin: coqc.consumerOrigin, | ||
SyncScore: report.SyncScore, | ||
AvailabilityScore: report.AvailabilityScore, | ||
LatencyScore: report.LatencyScore, | ||
GenericScore: report.GenericScore, | ||
ProviderAddress: report.ProviderAddress, | ||
EntryIndex: report.EntryIndex, | ||
ChainId: chainId, | ||
Epoch: epoch, | ||
NodeErrorRate: coqc.calculateNodeErrorRate(chainId, report.ProviderAddress), | ||
ProviderStake: coqc.getProviderChainStake(chainId, report.ProviderAddress, epoch), | ||
} | ||
|
||
coqc.queueSender.appendQueue(optimizerQoSReportToSend) | ||
} | ||
|
||
func (coqc *ConsumerOptimizerQoSClient) getReportsFromOptimizers() { | ||
coqc.lock.RLock() // we only read from the maps here | ||
defer coqc.lock.RUnlock() | ||
|
||
ignoredProviders := map[string]struct{}{} | ||
cu := uint64(10) | ||
requestedBlock := spectypes.LATEST_BLOCK | ||
|
||
currentEpoch := coqc.currentEpoch.Load() | ||
|
||
for chainId, optimizer := range coqc.optimizers { | ||
providersMap, ok := coqc.chainIdToProviderToEpochToStake[chainId] | ||
if !ok { | ||
continue | ||
} | ||
|
||
reports := optimizer.CalculateQoSScoresForMetrics(maps.Keys(providersMap), ignoredProviders, cu, requestedBlock) | ||
for _, report := range reports { | ||
coqc.appendOptimizerQoSReport(report, chainId, currentEpoch) | ||
} | ||
} | ||
} | ||
|
||
func (coqc *ConsumerOptimizerQoSClient) StartOptimizersQoSReportsCollecting(ctx context.Context, samplingInterval time.Duration) { | ||
if coqc == nil { | ||
return | ||
} | ||
|
||
utils.LavaFormatTrace("Starting ConsumerOptimizerQoSClient reports collecting") | ||
go func() { | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
utils.LavaFormatTrace("ConsumerOptimizerQoSClient context done") | ||
return | ||
case <-time.After(samplingInterval): | ||
coqc.getReportsFromOptimizers() | ||
} | ||
} | ||
}() | ||
} | ||
|
||
func (coqc *ConsumerOptimizerQoSClient) RegisterOptimizer(optimizer OptimizerInf, chainId string) { | ||
if coqc == nil { | ||
return | ||
} | ||
|
||
coqc.lock.Lock() | ||
defer coqc.lock.Unlock() | ||
|
||
if _, found := coqc.optimizers[chainId]; found { | ||
utils.LavaFormatWarning("Optimizer already registered for chain", nil, utils.LogAttr("chainId", chainId)) | ||
return | ||
} | ||
|
||
coqc.optimizers[chainId] = optimizer | ||
} | ||
|
||
func (coqc *ConsumerOptimizerQoSClient) incrementStoreCounter(store map[string]map[string]uint64, chainId, providerAddress string) { | ||
// must be called under write lock | ||
if coqc == nil { | ||
return | ||
} | ||
|
||
providersMap, found := store[chainId] | ||
if !found { | ||
store[chainId] = map[string]uint64{providerAddress: 1} | ||
return | ||
} | ||
|
||
count, found := providersMap[providerAddress] | ||
if !found { | ||
store[chainId][providerAddress] = 1 | ||
return | ||
} | ||
|
||
store[chainId][providerAddress] = count + 1 | ||
} | ||
|
||
func (coqc *ConsumerOptimizerQoSClient) SetRelaySentToProvider(providerAddress string, chainId string) { | ||
if coqc == nil { | ||
return | ||
} | ||
|
||
coqc.lock.Lock() | ||
defer coqc.lock.Unlock() | ||
|
||
coqc.incrementStoreCounter(coqc.chainIdToProviderToRelaysCount, chainId, providerAddress) | ||
} | ||
|
||
func (coqc *ConsumerOptimizerQoSClient) SetNodeErrorToProvider(providerAddress string, chainId string) { | ||
if coqc == nil { | ||
return | ||
} | ||
|
||
coqc.lock.Lock() | ||
defer coqc.lock.Unlock() | ||
|
||
coqc.incrementStoreCounter(coqc.chainIdToProviderToNodeErrorsCount, chainId, providerAddress) | ||
} | ||
|
||
func (coqc *ConsumerOptimizerQoSClient) setProviderStake(chainId, providerAddress string, epoch uint64, stake int64) { | ||
// must be called under write lock | ||
coqc.currentEpoch.Store(epoch) | ||
|
||
providersMap, found := coqc.chainIdToProviderToEpochToStake[chainId] | ||
if !found { | ||
coqc.chainIdToProviderToEpochToStake[chainId] = map[string]map[uint64]int64{providerAddress: {epoch: stake}} | ||
return | ||
} | ||
|
||
epochMap, found := providersMap[providerAddress] | ||
if !found { | ||
coqc.chainIdToProviderToEpochToStake[chainId][providerAddress] = map[uint64]int64{epoch: stake} | ||
return | ||
} | ||
|
||
epochMap[epoch] = stake | ||
} | ||
|
||
func (coqc *ConsumerOptimizerQoSClient) UpdatePairingListStake(stakeMap map[string]int64, chainId string, epoch uint64) { | ||
if coqc == nil { | ||
return | ||
} | ||
|
||
coqc.lock.Lock() | ||
defer coqc.lock.Unlock() | ||
|
||
for providerAddr, stake := range stakeMap { | ||
coqc.setProviderStake(chainId, providerAddr, epoch, stake) | ||
} | ||
} |
Oops, something went wrong.