Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into CNS-add-koii-spec
Browse files Browse the repository at this point in the history
  • Loading branch information
omerlavanet committed Jan 5, 2024
2 parents cc3a0c1 + e021979 commit dbe9e2f
Show file tree
Hide file tree
Showing 38 changed files with 1,204 additions and 124 deletions.
2 changes: 2 additions & 0 deletions Readme.md → README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Read more about Lava in the [litepaper](https://litepaper.lavanet.xyz?utm_source

Lava is built using the [Cosmos SDK](https://github.com/cosmos/cosmos-sdk/) which runs on top of [Tendermint Core](https://github.com/tendermint/tendermint) consensus engine.

[Documentation](x/README.md)

**Note**: Requires [Go 1.20.5](https://golang.org/dl/)

### Installing development dependencies
Expand Down
12 changes: 12 additions & 0 deletions config/consumer_examples/full_consumer_example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,16 @@ endpoints:
- chain-id: KOIIT
api-interface: jsonrpc
network-address: 127.0.0.1:3388
- chain-id: AGR
api-interface: tendermintrpc
network-address: 127.0.0.1:3388
- chain-id: AGRT
api-interface: rest
network-address: 127.0.0.1:3389
- chain-id: AGRT
api-interface: grpc
network-address: 127.0.0.1:3390
- chain-id: AGRT
api-interface: tendermintrpc
network-address: 127.0.0.1:3391
metrics-listen-address: ":7779"
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ require (
github.com/dgraph-io/ristretto v0.1.1
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/dvsekhvalnov/jose2go v1.5.0 // indirect
github.com/dvsekhvalnov/jose2go v1.6.0 // indirect
github.com/fasthttp/websocket v1.5.0 // indirect
github.com/felixge/httpsnoop v1.0.2 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
Expand Down Expand Up @@ -202,13 +202,13 @@ require (
github.com/zondax/hid v0.9.2 // indirect
go.etcd.io/bbolt v1.3.7 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/exp v0.0.0-20230711153332-06a737ee72cb
golang.org/x/net v0.17.0
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/term v0.13.0
golang.org/x/text v0.13.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/term v0.15.0
golang.org/x/text v0.14.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce
Expand Down
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,8 @@ github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:Htrtb
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/dvsekhvalnov/jose2go v1.5.0 h1:3j8ya4Z4kMCwT5nXIKFSV84YS+HdqSSO0VsTQxaLAeM=
github.com/dvsekhvalnov/jose2go v1.5.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU=
github.com/dvsekhvalnov/jose2go v1.6.0 h1:Y9gnSnP4qEI0+/uQkHvFXeD2PLPJeXEL+ySMEA2EjTY=
github.com/dvsekhvalnov/jose2go v1.6.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU=
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
Expand Down Expand Up @@ -1301,8 +1301,8 @@ golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce/go.mod h1:IxCIyHEi3zRg3s0
golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220314234659-1baeb1ce4c0b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down Expand Up @@ -1572,14 +1572,14 @@ golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek=
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4=
golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand All @@ -1591,8 +1591,8 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
9 changes: 5 additions & 4 deletions protocol/common/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ import (
)

const (
TimePerCU = uint64(100 * time.Millisecond)
MinimumTimePerRelayDelay = time.Second
DataReliabilityTimeoutIncrease = 5 * time.Second
AverageWorldLatency = 300 * time.Millisecond
TimePerCU = uint64(100 * time.Millisecond)
MinimumTimePerRelayDelay = time.Second
DataReliabilityTimeoutIncrease = 5 * time.Second
AverageWorldLatency = 300 * time.Millisecond
CommunicateWithLocalLavaNodeTimeout = (3 * time.Second) + AverageWorldLatency
)

func LocalNodeTimePerCu(cu uint64) time.Duration {
Expand Down
219 changes: 219 additions & 0 deletions protocol/metrics/consumer_relayserver_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
package metrics

import (
"bytes"
"encoding/json"
"errors"
"io"
"net/http"
"sync"
"time"

"github.com/lavanet/lava/utils"
)

type ConsumerRelayServerClient struct {
endPointAddress string
addQueue []UpdateMetricsRequest
ticker *time.Ticker
lock sync.RWMutex
sendID int
isSendQueueRunning bool
}
type UpdateMetricsRequest struct {
RecordDate string `json:"RecordDate"`
Hash string `json:"Hash"`
Chain string `json:"Chain"`
ApiType string `json:"ApiType"`
RelaysInc uint64 `json:"RelaysInc"`
CuInc int `json:"CuInc"`
LatencyToAdd uint64 `json:"LatencyToAdd"`
LatencyAvgCount int `json:"LatencyAvgCount"`
}

func NewConsumerRelayServerClient(endPointAddress string) *ConsumerRelayServerClient {
if endPointAddress == DisabledFlagOption {
utils.LavaFormatInfo("Running with Consumer Relay Server Disabled")
return nil
}

cuc := &ConsumerRelayServerClient{
endPointAddress: endPointAddress,
ticker: time.NewTicker(30 * time.Second),
addQueue: make([]UpdateMetricsRequest, 0),
}

go cuc.relayDataSendQueueStart()

return cuc
}

func (cuc *ConsumerRelayServerClient) relayDataSendQueueStart() {
if cuc == nil {
return
}
utils.LavaFormatDebug("[CUC] Starting relayDataSendQueueStart loop")
for range cuc.ticker.C {
cuc.relayDataSendQueueTick()
}
}

func (cuc *ConsumerRelayServerClient) relayDataSendQueueTick() {
if cuc == nil {
return
}

cuc.lock.Lock()
defer cuc.lock.Unlock()

if !cuc.isSendQueueRunning && len(cuc.addQueue) > 0 {
sendQueue := cuc.addQueue
cuc.addQueue = make([]UpdateMetricsRequest, 0)
cuc.isSendQueueRunning = true
cuc.sendID++
utils.LavaFormatDebug("[CUC] Swapped queues", utils.LogAttr("sendQueue_length", len((sendQueue))), utils.LogAttr("send_id", cuc.sendID))

sendID := cuc.sendID
cucEndpointAddress := cuc.endPointAddress

go func() {
cuc.sendRelayData(sendQueue, sendID, cucEndpointAddress)

cuc.lock.Lock()
cuc.isSendQueueRunning = false
cuc.lock.Unlock()
}()
} else {
utils.LavaFormatDebug("[CUC] server is busy skipping send", utils.LogAttr("id", cuc.sendID))
}
}

func (cuc *ConsumerRelayServerClient) appendQueue(request UpdateMetricsRequest) {
cuc.lock.Lock()
defer cuc.lock.Unlock()
cuc.addQueue = append(cuc.addQueue, request)
}

func (cuc *ConsumerRelayServerClient) SetRelayMetrics(relayMetric *RelayMetrics) {
if cuc == nil {
return
}
request := UpdateMetricsRequest{
RecordDate: relayMetric.Timestamp.Format("20060102"),
Hash: relayMetric.ProjectHash,
Chain: relayMetric.ChainID,
ApiType: relayMetric.APIType,
RelaysInc: 1,
CuInc: int(relayMetric.ComputeUnits),
LatencyToAdd: uint64(relayMetric.Latency),
LatencyAvgCount: 1,
}
cuc.appendQueue(request)
}

func (cuc *ConsumerRelayServerClient) aggregateAndSendRelayData(sendQueue []UpdateMetricsRequest, sendID int, cucEndpointAddress string) (*http.Response, error) {
if cuc == nil {
return nil, utils.LavaFormatError("CUC is nil. misuse detected", nil)
}

if len(sendQueue) == 0 {
return nil, errors.New("sendQueue is empty")
}

aggregatedRequests := cuc.aggregateRelayData(sendQueue)

if len(aggregatedRequests) == 0 {
return nil, errors.New("no requests after aggregate")
}

client := &http.Client{
Timeout: 10 * time.Second,
}

jsonData, err := json.Marshal(aggregatedRequests)
if err != nil {
return nil, utils.LavaFormatError("Failed marshaling aggregated requests", err)
}

var resp *http.Response
for i := 0; i < 3; i++ {
resp, err = client.Post(cucEndpointAddress+"/updateMetrics", "application/json", bytes.NewBuffer(jsonData))
if err != nil {
utils.LavaFormatDebug("[CUC] Failed to post request", utils.LogAttr("Attempt", i+1), utils.LogAttr("err", err))
time.Sleep(2 * time.Second)
} else {
return resp, nil
}
}

return nil, utils.LavaFormatWarning("[CUC] Failed to send requests after 3 attempts", err)
}

func (cuc *ConsumerRelayServerClient) handleSendRelayResponse(resp *http.Response, sendID int) {
if cuc == nil {
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
utils.LavaFormatWarning("[CUC] failed reading response body", err)
} else {
utils.LavaFormatWarning("[CUC] Received non-200 status code", nil, utils.LogAttr("status_code", resp.StatusCode), utils.LogAttr("body", string(bodyBytes)))
}
}
}

func (cuc *ConsumerRelayServerClient) sendRelayData(sendQueue []UpdateMetricsRequest, sendID int, cucEndpointAddress string) {
if cuc == nil {
return
}
resp, err := cuc.aggregateAndSendRelayData(sendQueue, sendID, cucEndpointAddress)
if err != nil {
utils.LavaFormatWarning("[CUC] failed sendRelay data", err)
return
}
cuc.handleSendRelayResponse(resp, sendID)
}

func generateRequestArregatedCacheKey(req UpdateMetricsRequest) string {
return req.RecordDate + req.Hash + req.Chain + req.ApiType
}

func (cuc *ConsumerRelayServerClient) aggregateRelayData(reqs []UpdateMetricsRequest) []UpdateMetricsRequest {
if cuc == nil {
return nil
}

// Create a map to hold the aggregated data
aggregated := make(map[string]*UpdateMetricsRequest)

for _, req := range reqs {
// Use the combination of RecordDate, Hash, Chain, and ApiType as the key
key := generateRequestArregatedCacheKey(req)

// If the key doesn't exist in the map, add it
if _, exists := aggregated[key]; !exists {
aggregated[key] = &UpdateMetricsRequest{
RecordDate: req.RecordDate,
Hash: req.Hash,
Chain: req.Chain,
ApiType: req.ApiType,
}
}

// Aggregate the data
aggregated[key].RelaysInc += req.RelaysInc
aggregated[key].CuInc += req.CuInc
aggregated[key].LatencyToAdd += req.LatencyToAdd
aggregated[key].LatencyAvgCount += req.LatencyAvgCount
}

// Convert the map to a slice
var aggregatedSlice []UpdateMetricsRequest
for _, req := range aggregated {
aggregatedSlice = append(aggregatedSlice, *req)
}

return aggregatedSlice
}
1 change: 1 addition & 0 deletions protocol/metrics/metrics_provider_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

const (
MetricsListenFlagName = "metrics-listen-address"
RelayServerFlagName = "relay-server-address"
DisabledFlagOption = "disabled"
)

Expand Down
Loading

0 comments on commit dbe9e2f

Please sign in to comment.