diff --git a/.github/workflows/protocol_tests.yml b/.github/workflows/protocol_tests.yml index 36ed370091..362b777742 100644 --- a/.github/workflows/protocol_tests.yml +++ b/.github/workflows/protocol_tests.yml @@ -61,4 +61,4 @@ jobs: ### Run protocol unitests ###################################################### - name: Run Lava Protocol Tests - run: go test ./protocol/... -v \ No newline at end of file + run: go test ./protocol/... \ No newline at end of file diff --git a/config/provider_examples/all_endpoints:.yml b/config/provider_examples/all_endpoints:.yml new file mode 100644 index 0000000000..9037259741 --- /dev/null +++ b/config/provider_examples/all_endpoints:.yml @@ -0,0 +1,201 @@ +endpoints: + - api-interface: jsonrpc + chain-id: ETH1 + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: jsonrpc + chain-id: GTH1 + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: jsonrpc + chain-id: FTM250 + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: jsonrpc + chain-id: CELO + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: jsonrpc + chain-id: ALFAJORES + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: jsonrpc + chain-id: ARB1 + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: jsonrpc + chain-id: STRK + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: rest + chain-id: APT1 + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: jsonrpc + chain-id: POLYGON1 + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: jsonrpc + chain-id: OPTM + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: jsonrpc + chain-id: BASET + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: rest + chain-id: COS3 + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: tendermintrpc + chain-id: COS3 + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: grpc + chain-id: COS3 + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: rest + chain-id: COS4 + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: tendermintrpc + chain-id: COS4 + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: grpc + chain-id: COS4 + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: rest + chain-id: LAV1 + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: tendermintrpc + chain-id: LAV1 + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: grpc + chain-id: LAV1 + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: rest + chain-id: COS5 + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: tendermintrpc + chain-id: COS5 + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: grpc + chain-id: COS5 + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: rest + chain-id: JUN1 + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: tendermintrpc + chain-id: JUN1 + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: grpc + chain-id: JUN1 + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: jsonrpc + chain-id: EVMOS + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: rest + chain-id: EVMOS + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: tendermintrpc 
+ chain-id: EVMOS + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: grpc + chain-id: EVMOS + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: jsonrpc + chain-id: CANTO + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: rest + chain-id: CANTO + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: tendermintrpc + chain-id: CANTO + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: grpc + chain-id: CANTO + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: rest + chain-id: AXELAR + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: tendermintrpc + chain-id: AXELAR + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: grpc + chain-id: AXELAR + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: jsonrpc + chain-id: BSC + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: jsonrpc + chain-id: SOLANA + network-address: 127.0.0.1:2221 + node-urls: + - url: + - api-interface: jsonrpc + chain-id: SUIT + network-address: 127.0.0.1:2221 + node-urls: + - url: diff --git a/docs/static/openapi.yml b/docs/static/openapi.yml index a0fa5b21de..628fe2c8ce 100644 --- a/docs/static/openapi.yml +++ b/docs/static/openapi.yml @@ -29674,6 +29674,16 @@ paths: in: query required: false type: boolean + - name: pagination.reverse + description: >- + reverse is set to true if results are to be returned in the + descending order. + + + Since: cosmos-sdk 0.43 + in: query + required: false + type: boolean tags: - Query '/lavanet/lava/epochstorage/fixated_params/{index}': @@ -29933,6 +29943,16 @@ paths: in: query required: false type: boolean + - name: pagination.reverse + description: >- + reverse is set to true if results are to be returned in the + descending order. 
+ + + Since: cosmos-sdk 0.43 + in: query + required: false + type: boolean tags: - Query '/lavanet/lava/epochstorage/stake_storage/{index}': diff --git a/go.mod b/go.mod index c40340fe2e..b6788adf17 100644 --- a/go.mod +++ b/go.mod @@ -45,8 +45,9 @@ require ( github.com/gogo/googleapis v1.4.0 // indirect github.com/golang/glog v1.0.0 // indirect github.com/pelletier/go-toml/v2 v2.0.5 // indirect - golang.org/x/mod v0.7.0 // indirect - golang.org/x/tools v0.2.0 // indirect + golang.org/x/mod v0.9.0 // indirect + golang.org/x/tools v0.7.0 // indirect + gonum.org/v1/gonum v0.13.0 // indirect ) require ( @@ -185,12 +186,12 @@ require ( go.etcd.io/bbolt v1.3.6 // indirect go.opencensus.io v0.23.0 // indirect golang.org/x/crypto v0.1.0 // indirect - golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e - golang.org/x/net v0.7.0 + golang.org/x/exp v0.0.0-20230321023759-10a507213a29 + golang.org/x/net v0.8.0 golang.org/x/sync v0.1.0 // indirect - golang.org/x/sys v0.5.0 // indirect - golang.org/x/term v0.5.0 // indirect - golang.org/x/text v0.7.0 // indirect + golang.org/x/sys v0.6.0 // indirect + golang.org/x/term v0.6.0 // indirect + golang.org/x/text v0.8.0 // indirect golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce diff --git a/go.sum b/go.sum index b66ac87772..b53927ae90 100644 --- a/go.sum +++ b/go.sum @@ -1536,6 +1536,8 @@ golang.org/x/exp v0.0.0-20200331195152-e8c3332aa8e5/go.mod h1:4M0jN8W1tt0AVLNr8H golang.org/x/exp v0.0.0-20220426173459-3bcf042a4bf5/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e h1:+WEEuIdZHnUeJJmEUjyYC2gfUMj69yZXw17EnHg/otA= golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e/go.mod h1:Kr81I6Kryrl9sr8s2FK3vxD90NdsKWRuOIl2O4CvYbA= +golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug= +golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= @@ -1567,6 +1569,8 @@ golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= golang.org/x/mod v0.6.0-dev.0.20211013180041-c96bc1413d57/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= golang.org/x/mod v0.7.0 h1:LapD9S96VoQRhi/GrNTqeBJFrUjs5UHCAtTlgwA5oZA= golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.9.0 h1:KENHtAZL2y3NLMYZeHY9DW8HW8V+kQyJsY/V9JlKvCs= +golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180719180050-a680a1efc54d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1633,6 +1637,8 @@ golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= golang.org/x/net v0.7.0/go.mod 
h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= +golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1772,11 +1778,15 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.6.0 h1:clScbb1cHjoCkyRbWwBEUZ5H/tIFu5TAXIqaZD0Gcjw= +golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1788,6 +1798,8 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= +golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -1871,6 +1883,8 @@ golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.8-0.20211029000441-d6a9af8af023/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= golang.org/x/tools v0.2.0 h1:G6AHpWxTMGY1KyEYoAQ5WTtIekUUvDNjan3ugu60JvE= golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA= +golang.org/x/tools v0.7.0 h1:W4OVu8VVOaIO0yzWMNdepAulS7YfoS3Zabrm8DOXXU4= +golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= 
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -1880,6 +1894,8 @@ golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNq gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= gonum.org/v1/gonum v0.6.0/go.mod h1:9mxDZsDKxgMAuccQkewq682L+0eCu4dCN2yonUJTCLU= +gonum.org/v1/gonum v0.13.0 h1:a0T3bh+7fhRyqeNbiC3qVHYmkiQgit3wnNan/2c0HMM= +gonum.org/v1/gonum v0.13.0/go.mod h1:/WPYRckkfWrhWefxyYTfrTtQR0KH4iyHNuzxqXAKyAU= gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc= diff --git a/protocol/chainlib/chainlib.go b/protocol/chainlib/chainlib.go index f600d83ba5..04457265d4 100644 --- a/protocol/chainlib/chainlib.go +++ b/protocol/chainlib/chainlib.go @@ -14,12 +14,6 @@ import ( spectypes "github.com/lavanet/lava/x/spec/types" ) -const ( - TimePerCU = uint64(100 * time.Millisecond) - MinimumTimePerRelayDelay = time.Second - DataReliabilityTimeoutIncrease = 5 * time.Second -) - func NewChainParser(apiInterface string) (chainParser ChainParser, err error) { switch apiInterface { case spectypes.APIInterfaceJsonRPC: @@ -101,7 +95,3 @@ func GetChainProxy(ctx context.Context, nConns uint, rpcProviderEndpoint *lavase } return nil, fmt.Errorf("chain proxy for apiInterface (%s) not found", rpcProviderEndpoint.ApiInterface) } - -func LocalNodeTimePerCu(cu uint64) time.Duration { - return time.Duration(cu*TimePerCU) + lavasession.AverageWorldLatency // TODO: remove average world latency once our providers run locally, or allow a flag that says local to make it tight, tighter timeouts are better -} diff --git a/protocol/chainlib/chainproxy/connector.go b/protocol/chainlib/chainproxy/connector.go index 110f03ac28..fa6ee90b5e 100644 --- a/protocol/chainlib/chainproxy/connector.go +++ b/protocol/chainlib/chainproxy/connector.go @@ -151,8 +151,7 @@ func (connector *Connector) Close() { } func (connector *Connector) increaseNumberOfClients(ctx context.Context, numberOfFreeClients int) { - utils.LavaFormatDebug("increasing number of clients", utils.Attribute{Key: "numberOfFreeClients", Value: numberOfFreeClients}, - utils.Attribute{Key: "url", Value: connector.nodeUrl.Url}) + utils.LavaFormatDebug("increasing number of clients", utils.Attribute{Key: "numberOfFreeClients", Value: numberOfFreeClients}, utils.Attribute{Key: "url", Value: connector.nodeUrl.Url}) var rpcClient *rpcclient.Client var err error for connectionAttempt := 0; connectionAttempt < MaximumNumberOfParallelConnectionsAttempts; connectionAttempt++ { diff --git a/protocol/chainlib/grpc.go b/protocol/chainlib/grpc.go index 9a8ca4745f..b372b5b703 100644 --- a/protocol/chainlib/grpc.go +++ b/protocol/chainlib/grpc.go @@ -283,7 +283,7 @@ func (cp *GrpcChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{}, if !ok { return nil, "", nil, utils.LavaFormatError("invalid message type in grpc failed to cast RPCInput from chainMessage", nil, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "rpcMessage", Value: rpcInputMessage}) } - relayTimeout := LocalNodeTimePerCu(chainMessage.GetServiceApi().ComputeUnits) + 
relayTimeout := common.LocalNodeTimePerCu(chainMessage.GetServiceApi().ComputeUnits) // check if this API is hanging (waiting for block confirmation) if chainMessage.GetInterface().Category.HangingApi { relayTimeout += cp.averageBlockTime diff --git a/protocol/chainlib/jsonRPC.go b/protocol/chainlib/jsonRPC.go index 39b8044a7c..30003ad76b 100644 --- a/protocol/chainlib/jsonRPC.go +++ b/protocol/chainlib/jsonRPC.go @@ -244,7 +244,7 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context) { ctx, cancel := context.WithCancel(context.Background()) ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) defer cancel() // incase there's a problem make sure to cancel the connection - utils.LavaFormatInfo("ws in <<<", utils.Attribute{Key: "seed", Value: msgSeed}, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "msg", Value: msg}, utils.Attribute{Key: "dappID", Value: dappID}) + utils.LavaFormatDebug("ws in <<<", utils.Attribute{Key: "seed", Value: msgSeed}, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "msg", Value: msg}, utils.Attribute{Key: "dappID", Value: dappID}) metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface) reply, replyServer, err := apil.relaySender.SendRelay(ctx, "", string(msg), http.MethodPost, dappID, metricsData) go apil.logger.AddMetricForWebSocket(metricsData, err, websockConn) @@ -408,7 +408,7 @@ func (cp *JrpcChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{}, if ch != nil { sub, rpcMessage, err = rpc.Subscribe(context.Background(), nodeMessage.ID, nodeMessage.Method, ch, nodeMessage.Params) } else { - relayTimeout := LocalNodeTimePerCu(chainMessage.GetServiceApi().ComputeUnits) + relayTimeout := common.LocalNodeTimePerCu(chainMessage.GetServiceApi().ComputeUnits) // check if this API is hanging (waiting for block confirmation) if chainMessage.GetInterface().Category.HangingApi { relayTimeout += cp.averageBlockTime diff --git a/protocol/chainlib/rest.go b/protocol/chainlib/rest.go index bfe3309fd0..022f391e6d 100644 --- a/protocol/chainlib/rest.go +++ b/protocol/chainlib/rest.go @@ -331,7 +331,7 @@ func (rcp *RestChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{}, return nil, "", nil, utils.LavaFormatError("Subscribe is not allowed on rest", nil) } httpClient := http.Client{ - Timeout: LocalNodeTimePerCu(chainMessage.GetServiceApi().ComputeUnits), + Timeout: common.LocalNodeTimePerCu(chainMessage.GetServiceApi().ComputeUnits), } rpcInputMessage := chainMessage.GetRPCMessage() @@ -349,7 +349,7 @@ func (rcp *RestChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{}, msgBuffer := bytes.NewBuffer(nodeMessage.Msg) url := rcp.NodeUrl.Url + nodeMessage.Path - relayTimeout := LocalNodeTimePerCu(chainMessage.GetServiceApi().ComputeUnits) + relayTimeout := common.LocalNodeTimePerCu(chainMessage.GetServiceApi().ComputeUnits) // check if this API is hanging (waiting for block confirmation) if chainMessage.GetInterface().Category.HangingApi { relayTimeout += rcp.averageBlockTime diff --git a/protocol/chainlib/tendermintRPC.go b/protocol/chainlib/tendermintRPC.go index a617b5ca22..38327b51aa 100644 --- a/protocol/chainlib/tendermintRPC.go +++ b/protocol/chainlib/tendermintRPC.go @@ -478,14 +478,14 @@ func (cp *tendermintRpcChainProxy) SendURI(ctx context.Context, nodeMessage *rpc // create a new http client with a timeout set by the getTimePerCu function httpClient := http.Client{ - Timeout: LocalNodeTimePerCu(chainMessage.GetServiceApi().ComputeUnits), + Timeout: 
common.LocalNodeTimePerCu(chainMessage.GetServiceApi().ComputeUnits), } // construct the url by concatenating the node url with the path variable url := cp.httpNodeUrl.Url + "/" + nodeMessage.Path // create context - relayTimeout := LocalNodeTimePerCu(chainMessage.GetServiceApi().ComputeUnits) + relayTimeout := common.LocalNodeTimePerCu(chainMessage.GetServiceApi().ComputeUnits) // check if this API is hanging (waiting for block confirmation) if chainMessage.GetInterface().Category.HangingApi { relayTimeout += cp.averageBlockTime @@ -559,7 +559,7 @@ func (cp *tendermintRpcChainProxy) SendRPC(ctx context.Context, nodeMessage *rpc sub, rpcMessage, err = rpc.Subscribe(context.Background(), nodeMessage.ID, nodeMessage.Method, ch, nodeMessage.Params) } else { // create a context with a timeout set by the LocalNodeTimePerCu function - relayTimeout := LocalNodeTimePerCu(chainMessage.GetServiceApi().ComputeUnits) + relayTimeout := common.LocalNodeTimePerCu(chainMessage.GetServiceApi().ComputeUnits) // check if this API is hanging (waiting for block confirmation) if chainMessage.GetInterface().Category.HangingApi { relayTimeout += cp.averageBlockTime diff --git a/protocol/chaintracker/chain_tracker.go b/protocol/chaintracker/chain_tracker.go index acce0afa8e..b38d7353ce 100644 --- a/protocol/chaintracker/chain_tracker.go +++ b/protocol/chaintracker/chain_tracker.go @@ -329,7 +329,7 @@ func (cs *ChainTracker) fetchInitDataWithRetry(ctx context.Context) (err error) } _, err = cs.fetchAllPreviousBlocks(ctx, newLatestBlock) for idx := 0; idx < initRetriesCount && err != nil; idx++ { - utils.LavaFormatDebug("failed fetching data on chain tracker init, retry", utils.Attribute{Key: "retry Num", Value: idx}, utils.Attribute{Key: "endpoint", Value: cs.endpoint}) + utils.LavaFormatDebug("failed fetching data on chain tracker init, retry", utils.Attribute{Key: "retry Num", Value: idx}, utils.Attribute{Key: "endpoint", Value: cs.endpoint.String()}) _, err = cs.fetchAllPreviousBlocks(ctx, newLatestBlock) } if err != nil { diff --git a/protocol/lavaprotocol/request_builder.go b/protocol/lavaprotocol/request_builder.go index 279e4a6431..d4cbd51a3e 100644 --- a/protocol/lavaprotocol/request_builder.go +++ b/protocol/lavaprotocol/request_builder.go @@ -5,10 +5,8 @@ import ( "context" "encoding/binary" "strconv" - "time" "github.com/btcsuite/btcd/btcec" - "github.com/lavanet/lava/protocol/chainlib" "github.com/lavanet/lava/protocol/lavasession" "github.com/lavanet/lava/utils" "github.com/lavanet/lava/utils/sigs" @@ -114,10 +112,6 @@ func ConstructRelayRequest(ctx context.Context, privKey *btcec.PrivateKey, lavaC return relayRequest, nil } -func GetTimePerCu(cu uint64) time.Duration { - return chainlib.LocalNodeTimePerCu(cu) + chainlib.MinimumTimePerRelayDelay -} - func UpdateRequestedBlock(request *pairingtypes.RelayPrivateData, response *pairingtypes.RelayReply) { // since sometimes the user is sending requested block that is a magic like latest, or earliest we need to specify to the reliability what it is request.RequestBlock = ReplaceRequestedBlock(request.RequestBlock, response.LatestBlock) diff --git a/protocol/lavasession/common.go b/protocol/lavasession/common.go index 1feccee222..b56335284a 100644 --- a/protocol/lavasession/common.go +++ b/protocol/lavasession/common.go @@ -21,9 +21,10 @@ const ( GeolocationFlag = "geolocation" TendermintUnsubscribeAll = "unsubscribe_all" IndexNotFound = -15 - AverageWorldLatency = 300 * time.Millisecond MinValidAddressesForBlockingProbing = 2 BACKOFF_TIME_ON_FAILURE = 3 * 
time.Second + BLOCKING_PROBE_SLEEP_TIME = 1000 * time.Millisecond // maximum amount of time to sleep before triggering probe, to scatter probes uniformly across chains + BLOCKING_PROBE_TIMEOUT = time.Minute // maximum time to wait for probe to complete before updating pairing ) var AvailabilityPercentage sdk.Dec = sdk.NewDecWithPrec(5, 2) // TODO move to params pairing diff --git a/protocol/lavasession/consumer_session_manager.go b/protocol/lavasession/consumer_session_manager.go index bd1e513f81..c41a88ec44 100644 --- a/protocol/lavasession/consumer_session_manager.go +++ b/protocol/lavasession/consumer_session_manager.go @@ -10,6 +10,7 @@ import ( sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" "github.com/gogo/status" + "github.com/lavanet/lava/protocol/common" "github.com/lavanet/lava/utils" "google.golang.org/grpc/codes" "google.golang.org/protobuf/types/known/wrapperspb" @@ -39,18 +40,15 @@ func (csm *ConsumerSessionManager) RPCEndpoint() RPCEndpoint { return *csm.rpcEndpoint } -// Update the provider pairing list for the ConsumerSessionManager func (csm *ConsumerSessionManager) UpdateAllProviders(epoch uint64, pairingList map[uint64]*ConsumerSessionsWithProvider) error { pairingListLength := len(pairingList) - // if csm.validAddressesLen() > MinValidAddressesForBlockingProbing { - // // we have enough valid providers, probe before updating the pairing - // csm.probeProviders(pairingList, epoch) // probe providers to eliminate offline ones from affecting relays, pairingList is thread safe it's members are as long as it's blocking - // } else { - // } + // TODO: we can block updating until some of the probing is done, this can prevent failed attempts on epoch change when we have no information on the providers, + // and all of them are new (less effective on big pairing lists or a process that runs for a few epochs) defer func() { // run this after done updating pairing time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond) // sleep up to 500ms in order to scatter different chains probe triggers - go csm.probeProviders(pairingList, epoch) // probe providers to eliminate offline ones from affecting relays, pairingList is thread safe it's members are not (accessed through csm.pairing) + ctx := context.Background() + go csm.probeProviders(ctx, pairingList, epoch) // probe providers to eliminate offline ones from affecting relays, pairingList is thread safe, its members are not (accessed through csm.pairing) }() csm.lock.Lock() // start by locking the class lock. defer csm.lock.Unlock() // we defer here so in case we return an error it will unlock automatically. 
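Editor's note on the next hunk: the reworked probeProviders now returns an error and selects on ctx.Done(), but UpdateAllProviders above passes it context.Background(), whose Done channel is nil, so the timeout branch appears unreachable as wired. A minimal sketch of how the new BLOCKING_PROBE_TIMEOUT constant (added in lavasession/common.go above) could bound the call, assuming that constant is intended for this path; names are taken from this diff and the snippet is illustrative, not part of the patch:

    // inside the deferred func of UpdateAllProviders, replacing ctx := context.Background()
    ctx, cancel := context.WithTimeout(context.Background(), BLOCKING_PROBE_TIMEOUT)
    go func() {
        defer cancel() // release the timer once probing finishes or times out
        _ = csm.probeProviders(ctx, pairingList, epoch)
    }()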
@@ -101,38 +99,63 @@ func (csm *ConsumerSessionManager) validAddressesLen() int { return len(csm.validAddresses) } -func (csm *ConsumerSessionManager) probeProviders(pairingList map[uint64]*ConsumerSessionsWithProvider, epoch uint64) { - ctx := context.Background() +func (csm *ConsumerSessionManager) probeProviders(ctx context.Context, pairingList map[uint64]*ConsumerSessionsWithProvider, epoch uint64) error { guid := utils.GenerateUniqueIdentifier() ctx = utils.AppendUniqueIdentifier(ctx, guid) utils.LavaFormatInfo("providers probe initiated", utils.Attribute{Key: "endpoint", Value: csm.rpcEndpoint}, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "epoch", Value: epoch}) + // Create a wait group to synchronize the goroutines + wg := sync.WaitGroup{} + wg.Add(len(pairingList)) // increment by the whole count and not by 1 per goroutine, so a finishing goroutine can't race the WaitGroup before the next invocation for _, consumerSessionWithProvider := range pairingList { - // consumerSessionWithProvider is thread safe since it's unreachable yet on other threads - latency, providerAddress, err := csm.probeProvider(ctx, consumerSessionWithProvider, epoch) - failure := err != nil // if failure then regard it in availability - csm.providerOptimizer.AppendRelayData(providerAddress, latency, failure) + // Start a new goroutine for each provider + go func(consumerSessionsWithProvider *ConsumerSessionsWithProvider) { + // Call the probeProvider function and defer the WaitGroup Done call + defer wg.Done() + latency, providerAddress, err := csm.probeProvider(ctx, consumerSessionsWithProvider, epoch) + success := err == nil // a failure is counted against availability + csm.providerOptimizer.AppendProbeRelayData(providerAddress, latency, success) + }(consumerSessionWithProvider) + } + done := make(chan struct{}) + go func() { + defer close(done) + wg.Wait() + }() + + select { + case <-done: + // all probes finished in time + utils.LavaFormatDebug("providers probe done", utils.Attribute{Key: "endpoint", Value: csm.rpcEndpoint}, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "epoch", Value: epoch}) + return nil + case <-ctx.Done(): + utils.LavaFormatWarning("providers probe ran out of time", nil, utils.Attribute{Key: "endpoint", Value: csm.rpcEndpoint}, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "epoch", Value: epoch}) + // ran out of time + return ctx.Err() + } } +// this code needs to be thread safe func (csm *ConsumerSessionManager) probeProvider(ctx context.Context, consumerSessionsWithProvider *ConsumerSessionsWithProvider, epoch uint64) (latency time.Duration, providerAddress string, err error) { // TODO: fetch all endpoints not just one connected, endpoint, providerAddress, err := consumerSessionsWithProvider.fetchEndpointConnectionFromConsumerSessionWithProvider(ctx) if err != nil || !connected { return 0, providerAddress, err } - if endpoint.Client == nil { - consumerSessionsWithProvider.Lock.Lock() - defer consumerSessionsWithProvider.Lock.Unlock() - return 0, providerAddress, utils.LavaFormatError("returned nil client in endpoint", nil, utils.Attribute{Key: "consumerSessionWithProvider", Value: consumerSessionsWithProvider}) - } + relaySentTime := time.Now() - connectCtx, cancel := context.WithTimeout(ctx, AverageWorldLatency) + connectCtx, cancel := context.WithTimeout(ctx, common.AverageWorldLatency) defer cancel() guid, found := utils.GetUniqueIdentifier(connectCtx) if !found { return 0, providerAddress, utils.LavaFormatError("probeProvider 
failed fetching unique identifier from context when it's set", nil) } - probeResp, err := (*endpoint.Client).Probe(ctx, &wrapperspb.UInt64Value{Value: guid}) + if endpoint.Client == nil { + consumerSessionsWithProvider.Lock.Lock() + defer consumerSessionsWithProvider.Lock.Unlock() + return 0, providerAddress, utils.LavaFormatError("returned nil client in endpoint", nil, utils.Attribute{Key: "consumerSessionWithProvider", Value: consumerSessionsWithProvider}) + } + client := *endpoint.Client + probeResp, err := client.Probe(connectCtx, &wrapperspb.UInt64Value{Value: guid}) relayLatency := time.Since(relaySentTime) if err != nil { return 0, providerAddress, utils.LavaFormatError("probe call error", err, utils.Attribute{Key: "provider", Value: providerAddress}) @@ -140,6 +163,7 @@ func (csm *ConsumerSessionManager) probeProvider(ctx context.Context, consumerSe if probeResp.Value != guid { return 0, providerAddress, utils.LavaFormatWarning("mismatch probe response", nil) } + // public lava address is a value that is not changing, so it's thread safe utils.LavaFormatDebug("Probed provider successfully", utils.Attribute{Key: "latency", Value: relayLatency}, utils.Attribute{Key: "provider", Value: consumerSessionsWithProvider.PublicLavaAddress}) return relayLatency, providerAddress, nil } diff --git a/protocol/lavasession/consumer_session_manager_test.go b/protocol/lavasession/consumer_session_manager_test.go index 424263772b..3ab69643a7 100644 --- a/protocol/lavasession/consumer_session_manager_test.go +++ b/protocol/lavasession/consumer_session_manager_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/lavanet/lava/protocol/common" "github.com/lavanet/lava/protocol/provideroptimizer" "github.com/lavanet/lava/utils" pairingtypes "github.com/lavanet/lava/x/pairing/types" @@ -37,7 +38,8 @@ const ( func CreateConsumerSessionManager() *ConsumerSessionManager { rand.Seed(time.Now().UnixNano()) - return NewConsumerSessionManager(&RPCEndpoint{"stub", "stub", "stub", 0}, provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_QOS)) + baseLatency := common.AverageWorldLatency / 2 // we want performance to be half our timeout or better + return NewConsumerSessionManager(&RPCEndpoint{"stub", "stub", "stub", 0}, provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 3)) } func createGRPCServer(t *testing.T) *grpc.Server { diff --git a/protocol/lavasession/consumer_types.go b/protocol/lavasession/consumer_types.go index 854be5274b..d251d1e56e 100644 --- a/protocol/lavasession/consumer_types.go +++ b/protocol/lavasession/consumer_types.go @@ -18,7 +18,7 @@ import ( ) type ProviderOptimizer interface { - AppendRelayData(providerAddress string, latency time.Duration, failure bool) + AppendProbeRelayData(providerAddress string, latency time.Duration, success bool) } type ignoredProviders struct { @@ -294,7 +294,6 @@ func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSes } else if endpoint.connection.GetState() == connectivity.Shutdown { // connection was shut down, so we need to create a new one endpoint.connection.Close() - endpoint.Client = nil connected_ := connectEndpoint(cswp, ctx, endpoint) if !connected_ { continue diff --git a/protocol/lavasession/provider_types.go b/protocol/lavasession/provider_types.go index 083bc226cc..a5dbe12658 100644 --- a/protocol/lavasession/provider_types.go +++ b/protocol/lavasession/provider_types.go @@ -34,7 +34,7 @@ func (endpoint *RPCProviderEndpoint) UrlsString() string { return 
strings.Join(st_urls, ", ") } -func (endpoint *RPCProviderEndpoint) String() (retStr string) { +func (endpoint *RPCProviderEndpoint) String() string { return endpoint.ChainID + ":" + endpoint.ApiInterface + " Network Address:" + endpoint.NetworkAddress + " Node: " + endpoint.UrlsString() + " Geolocation:" + strconv.FormatUint(endpoint.Geolocation, 10) } diff --git a/protocol/provideroptimizer/provider_optimizer.go b/protocol/provideroptimizer/provider_optimizer.go index 23c490a960..dccb9f49e0 100644 --- a/protocol/provideroptimizer/provider_optimizer.go +++ b/protocol/provideroptimizer/provider_optimizer.go @@ -1,23 +1,406 @@ package provideroptimizer -import "time" +import ( + "math" + "math/rand" + "sync" + "time" + + "github.com/dgraph-io/ristretto" + "github.com/lavanet/lava/protocol/common" + "github.com/lavanet/lava/utils" + "github.com/lavanet/lava/utils/score" + "gonum.org/v1/gonum/mathext" +) + +const ( + debug = false + CacheMaxCost = 2000 // each item cost would be 1 + CacheNumCounters = 20000 // expect 2000 items + INITIAL_DATA_STALENESS = 24 + HALF_LIFE_TIME = time.Hour + MAX_HALF_TIME = 14 * 24 * time.Hour + PROBE_UPDATE_WEIGHT = 0.25 + RELAY_UPDATE_WEIGHT = 1 + DEFAULT_EXPLORATION_CHANCE = 0.1 + COST_EXPLORATION_CHANCE = 0.01 +) + +type ConcurrentBlockStore struct { + Lock sync.Mutex + Time time.Time + Block uint64 +} type ProviderOptimizer struct { - strategy Strategy + strategy Strategy + providersStorage *ristretto.Cache + providerRelayStats *ristretto.Cache // used to decide on the half time of the decay + averageBlockTime time.Duration + baseWorldLatency time.Duration + wantedNumProvidersInConcurrency int + latestSyncData ConcurrentBlockStore +} + +type ProviderData struct { + Availability score.ScoreStore // will be used to calculate the probability of error + Latency score.ScoreStore // will be used to calculate the latency score + Sync score.ScoreStore // will be used to calculate the sync score for spectypes.LATEST_BLOCK/spectypes.NOT_APPLICABLE requests + SyncBlock uint64 // will be used to calculate the probability of block error } type Strategy int const ( - STRATEGY_QOS Strategy = iota + STRATEGY_BALANCED Strategy = iota + STRATEGY_LATENCY + STRATEGY_SYNC_FRESHNESS STRATEGY_COST STRATEGY_PRIVACY STRATEGY_ACCURACY ) -func (po *ProviderOptimizer) AppendRelayData(providerAddress string, latency time.Duration, failure bool) { +func (po *ProviderOptimizer) AppendRelayData(providerAddress string, latency time.Duration, isHangingApi bool, success bool, cu uint64, syncBlock uint64) { + latestSync, timeSync := po.updateLatestSyncData(syncBlock) + providerData, _ := po.getProviderData(providerAddress) + halfTime := po.calculateHalfTime(providerAddress) + providerData = po.updateProbeEntryAvailability(providerData, success, RELAY_UPDATE_WEIGHT, halfTime) + if success { + if latency > 0 { + baseLatency := po.baseWorldLatency + common.BaseTimePerCU(cu) + if isHangingApi { + baseLatency += po.averageBlockTime // hanging apis take longer + } + providerData = po.updateProbeEntryLatency(providerData, latency, baseLatency, RELAY_UPDATE_WEIGHT, halfTime) + } + if syncBlock > providerData.SyncBlock { + // do not allow providers to go back + providerData.SyncBlock = syncBlock + } + syncLag := po.calculateSyncLag(latestSync, timeSync, providerData.SyncBlock) + providerData = po.updateProbeEntrySync(providerData, syncLag, po.averageBlockTime, halfTime) + } + po.providersStorage.Set(providerAddress, providerData, 1) + po.updateRelayTime(providerAddress) +} + +func (po 
*ProviderOptimizer) AppendProbeRelayData(providerAddress string, latency time.Duration, success bool) { + providerData, _ := po.getProviderData(providerAddress) + halfTime := po.calculateHalfTime(providerAddress) + providerData = po.updateProbeEntryAvailability(providerData, success, PROBE_UPDATE_WEIGHT, halfTime) + if success && latency > 0 { + // base latency for a probe is the world latency + providerData = po.updateProbeEntryLatency(providerData, latency, po.baseWorldLatency, PROBE_UPDATE_WEIGHT, halfTime) + } + po.providersStorage.Set(providerAddress, providerData, 1) +} + +// returns a sub set of selected providers according to their scores, perturbation factor will be added to each score in order to randomly select providers that are not always on top +func (po *ProviderOptimizer) ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64, perturbationPercentage float64) (addresses []string) { + returnedProviders := make([]string, 1) // location 0 is always the best score + latencyScore := math.MaxFloat64 // smaller = better i.e less latency + syncScore := math.MaxFloat64 // smaller = better i.e less sync lag + for _, providerAddress := range allAddresses { + if _, ok := ignoredProviders[providerAddress]; ok { + // ignored provider, skip it + continue + } + providerData, _ := po.getProviderData(providerAddress) + // latency score + latencyScoreCurrent := po.calculateLatencyScore(providerData, cu, requestedBlock) // smaller == better i.e less latency + // latency perturbation + latencyScoreCurrent = pertrubWithNormalGaussian(latencyScoreCurrent, perturbationPercentage) + + // sync score + syncScoreCurrent := float64(0) + if requestedBlock < 0 { + // means user didn't ask for a specific block and we want to give him the best + syncScoreCurrent = po.calculateSyncScore(providerData.Sync) // smaller == better i.e less sync lag + // sync perturbation + syncScoreCurrent = pertrubWithNormalGaussian(syncScoreCurrent, perturbationPercentage) + } + + // we want the minimum latency and sync diff + if po.isBetterProviderScore(latencyScore, latencyScoreCurrent, syncScore, syncScoreCurrent) || len(returnedProviders) == 0 { + if len(returnedProviders) > 0 && po.shouldExplore(len(returnedProviders)) { + // we are about to overwrite position 0, and this provider needs a chance to be in exploration + returnedProviders = append(returnedProviders, returnedProviders[0]) + } + returnedProviders[0] = providerAddress // best provider is always on position 0 + latencyScore = latencyScoreCurrent + syncScore = syncScoreCurrent + continue + } + if po.shouldExplore(len(returnedProviders)) { + returnedProviders = append(returnedProviders, providerAddress) + } + } + + return returnedProviders +} + +// calculate the expected average time until this provider catches up with the given latestSync block +func (po *ProviderOptimizer) calculateSyncLag(latestSync uint64, timeSync time.Time, providerBlock uint64) time.Duration { + if latestSync < providerBlock { + return 0 + } + timeLag := time.Since(timeSync) // received the latest block at time X, this provider provided the entry at time Y, which is X-Y time after + blocksGap := time.Duration(latestSync-providerBlock) * po.averageBlockTime // the provider is behind by X blocks, so is expected to catch up in averageBlockTime * X + timeLag += blocksGap + return timeLag +} + +func (po *ProviderOptimizer) updateLatestSyncData(providerLatestBlock uint64) (uint64, time.Time) { + po.latestSyncData.Lock.Lock() + defer 
po.latestSyncData.Lock.Unlock() + latestBlock := po.latestSyncData.Block + if latestBlock < providerLatestBlock { + // saved latest block is older, so update + po.latestSyncData.Block = providerLatestBlock + po.latestSyncData.Time = time.Now() + } + return po.latestSyncData.Block, po.latestSyncData.Time +} + +func (po *ProviderOptimizer) shouldExplore(currentNumProvders int) bool { + if currentNumProvders >= po.wantedNumProvidersInConcurrency { + return false + } + explorationChance := DEFAULT_EXPLORATION_CHANCE + switch po.strategy { + case STRATEGY_LATENCY: + return true // we want a lot of parallel tries on latency + case STRATEGY_ACCURACY: + return true + case STRATEGY_COST: + explorationChance = COST_EXPLORATION_CHANCE + case STRATEGY_PRIVACY: + return false // only one at a time + } + return rand.Float64() < explorationChance +} + +func (po *ProviderOptimizer) isBetterProviderScore(latencyScore float64, latencyScoreCurrent float64, syncScore float64, syncScoreCurrent float64) bool { + var latencyWeight float64 + switch po.strategy { + case STRATEGY_LATENCY: + latencyWeight = 0.9 + case STRATEGY_SYNC_FRESHNESS: + latencyWeight = 0.2 + case STRATEGY_PRIVACY: + // pick at random regardless of score + if rand.Intn(2) == 0 { + return true + } + default: + latencyWeight = 0.8 + } + if debug { + utils.LavaFormatDebug("total scores", utils.Attribute{Key: "latencyScoreCurrent", Value: latencyScoreCurrent}, utils.Attribute{Key: "syncScoreCurrent", Value: syncScoreCurrent}, utils.Attribute{Key: "total", Value: latencyScoreCurrent*latencyWeight + syncScoreCurrent*(1-latencyWeight)}) + } + if syncScoreCurrent == 0 { + return latencyScore > latencyScoreCurrent + } + return latencyScore*latencyWeight+syncScore*(1-latencyWeight) > latencyScoreCurrent*latencyWeight+syncScoreCurrent*(1-latencyWeight) +} + +func (po *ProviderOptimizer) calculateSyncScore(syncScore score.ScoreStore) float64 { + var historicalSyncLatency time.Duration + if syncScore.Denom == 0 { + historicalSyncLatency = 0 + } else { + historicalSyncLatency = time.Duration(syncScore.Num / syncScore.Denom * float64(po.averageBlockTime)) // give it units of block time + } + return historicalSyncLatency.Seconds() +} + +func (po *ProviderOptimizer) calculateLatencyScore(providerData ProviderData, cu uint64, requestedBlock int64) float64 { + baseLatency := po.baseWorldLatency + common.BaseTimePerCU(cu)/2 // divide by two because the returned time is for timeout not for average + timeoutDuration := common.GetTimePerCu(cu) + var historicalLatency time.Duration + if providerData.Latency.Denom == 0 { + historicalLatency = baseLatency + } else { + historicalLatency = time.Duration(float64(baseLatency) * providerData.Latency.Num / providerData.Latency.Denom) + } + if historicalLatency > timeoutDuration { + // can't have a bigger latency than timeout + historicalLatency = timeoutDuration + } + probabilityBlockError := po.CalculateProbabilityOfBlockError(requestedBlock, providerData) + probabilityOfTimeout := po.CalculateProbabilityOfTimeout(providerData.Availability) + probabilityOfSuccess := (1 - probabilityBlockError) * (1 - probabilityOfTimeout) + + // base latency is how much time it would cost to an average performing provider + // timeoutDuration is the extra time we pay for a non responsive provider + // historicalLatency is how much we are paying for the processing of this provider + + // in case of block error we are paying the time cost of this provider and the time cost of the next provider on retry + costBlockError := 
historicalLatency.Seconds() + baseLatency.Seconds() + // in case of a time out we are paying the time cost of a timeout and the time cost of the next provider on retry + costTimeout := timeoutDuration.Seconds() + baseLatency.Seconds() + // on success we are paying the time cost of this provider + costSuccess := historicalLatency.Seconds() + if debug { + utils.LavaFormatDebug("latency calculation breakdown", + utils.Attribute{Key: "probabilityBlockError", Value: probabilityBlockError}, + utils.Attribute{Key: "costBlockError", Value: costBlockError}, + utils.Attribute{Key: "probabilityOfTimeout", Value: probabilityOfTimeout}, + utils.Attribute{Key: "costTimeout", Value: costTimeout}, + utils.Attribute{Key: "probabilityOfSuccess", Value: probabilityOfSuccess}, + utils.Attribute{Key: "costSuccess", Value: costSuccess}, + ) + } + return probabilityBlockError*costBlockError + probabilityOfTimeout*costTimeout + probabilityOfSuccess*costSuccess +} + +func (po *ProviderOptimizer) CalculateProbabilityOfTimeout(availabilityScore score.ScoreStore) float64 { + probabilityTimeout := float64(0) + if availabilityScore.Denom > 0 { // shouldn't happen since we have default values but protect just in case + mean := availabilityScore.Num / availabilityScore.Denom + // bernoulli distribution assumption means probability of '1' is the mean, success is 1 + return 1 - mean + } + return probabilityTimeout +} + +func (po *ProviderOptimizer) CalculateProbabilityOfBlockError(requestedBlock int64, providerData ProviderData) float64 { + probabilityBlockError := float64(0) + if requestedBlock > 0 && providerData.SyncBlock < uint64(requestedBlock) { + // requested a specific block, so calculate a probability of provider having that block + averageBlockTime := po.averageBlockTime.Seconds() + blockDistanceRequired := uint64(requestedBlock) - providerData.SyncBlock + if blockDistanceRequired > 0 { + timeSinceSyncReceived := time.Since(providerData.Sync.Time).Seconds() + eventRate := timeSinceSyncReceived / averageBlockTime // a new block every average block time, numerator is time passed, gamma=rt + // probValueAfterRepetitions(k,lambda) calculates the probability for k events or less meaning p(x<=k), + // an error occurs if we didn't have enough blocks, so the chance of error is p(x halfTime { + halfTime = relaysHalfTime + } + if halfTime > MAX_HALF_TIME { + halfTime = MAX_HALF_TIME + } + return halfTime +} + +func (po *ProviderOptimizer) getRelayStatsTimeDiff(providerAddress string) time.Duration { + times := po.getRelayStatsTimes(providerAddress) + if len(times) == 0 { + return 0 + } + return time.Since(times[(len(times)-1)/2]) +} + +func (po *ProviderOptimizer) getRelayStatsTimes(providerAddress string) []time.Time { + storedVal, found := po.providerRelayStats.Get(providerAddress) + if found { + times, ok := storedVal.([]time.Time) + if !ok { + utils.LavaFormatFatal("invalid usage of optimizer relay stats cache", nil, utils.Attribute{Key: "storedVal", Value: storedVal}) + } + return times + } + return nil +} + +func NewProviderOptimizer(strategy Strategy, averageBlockTIme time.Duration, baseWorldLatency time.Duration, wantedNumProvidersInConcurrency int) *ProviderOptimizer { + cache, err := ristretto.NewCache(&ristretto.Config{NumCounters: CacheNumCounters, MaxCost: CacheMaxCost, BufferItems: 64, IgnoreInternalCost: true}) + if err != nil { + utils.LavaFormatFatal("failed setting up cache for queries", err) + } + relayCache, err := ristretto.NewCache(&ristretto.Config{NumCounters: CacheNumCounters, MaxCost: 
CacheMaxCost, BufferItems: 64, IgnoreInternalCost: true}) + if err != nil { + utils.LavaFormatFatal("failed setting up cache for queries", err) + } + if strategy == STRATEGY_PRIVACY { + // overwrite + wantedNumProvidersInConcurrency = 1 + } + return &ProviderOptimizer{strategy: strategy, providersStorage: cache, averageBlockTime: averageBlockTIme, baseWorldLatency: baseWorldLatency, providerRelayStats: relayCache, wantedNumProvidersInConcurrency: wantedNumProvidersInConcurrency} +} + +// calculate the probability of a random variable with a Poisson distribution +// the Poisson distribution gives the probability of K events, in this case the probability that enough blocks pass and the request will be accessible in the block + +func cumulativeProbabilityFunctionForPoissonDist(k_events uint64, lambda float64) float64 { + // calculate the cumulative probability of observing k events or fewer, i.e. p(x <= k): + // GammaIncReg is the regularized lower incomplete gamma function GammaIncReg(a,x) = (1/ Γ(a)) \int_0^x e^{-t} t^{a-1} dt + // the CDF for k events (less than or equal to k) is the regularized upper incomplete gamma function Q(k+1, lambda) + // so to get the CDF we need to return 1 - prob + prob := mathext.GammaIncReg(float64(k_events+1), lambda) + return 1 - prob } -func NewProviderOptimizer(strategy Strategy) *ProviderOptimizer { - return &ProviderOptimizer{strategy: strategy} +func pertrubWithNormalGaussian(orig float64, percentage float64) float64 { + perturb := rand.NormFloat64() * percentage * orig + return orig + perturb } diff --git a/protocol/provideroptimizer/provider_optimizer_test.go b/protocol/provideroptimizer/provider_optimizer_test.go new file mode 100644 index 0000000000..73ca9e2100 --- /dev/null +++ b/protocol/provideroptimizer/provider_optimizer_test.go @@ -0,0 +1,397 @@ +package provideroptimizer + +import ( + "math/rand" + "strconv" + "testing" + "time" + + "github.com/lavanet/lava/utils" + spectypes "github.com/lavanet/lava/x/spec/types" + "github.com/stretchr/testify/require" +) + +const ( + TEST_AVERAGE_BLOCK_TIME = 10 * time.Second + TEST_BASE_WORLD_LATENCY = 150 * time.Millisecond +) + +func setupProviderOptimizer(maxProvidersCount int) *ProviderOptimizer { + averageBlockTIme := TEST_AVERAGE_BLOCK_TIME + baseWorldLatency := TEST_BASE_WORLD_LATENCY + return NewProviderOptimizer(STRATEGY_BALANCED, averageBlockTIme, baseWorldLatency, 1) +} + +type providersGenerator struct { + providersAddresses []string +} + +func (pg *providersGenerator) setupProvidersForTest(count int) *providersGenerator { + pg.providersAddresses = make([]string, count) + for i := range pg.providersAddresses { + pg.providersAddresses[i] = "lava@test_" + strconv.Itoa(i) + } + return pg +} + +func TestProbabilitiesCalculations(t *testing.T) { + value := cumulativeProbabilityFunctionForPoissonDist(1, 10) + value2 := cumulativeProbabilityFunctionForPoissonDist(10, 10) + require.Greater(t, value2, value) +} + +func TestProviderOptimizerSetGet(t *testing.T) { + providerOptimizer := setupProviderOptimizer(1) + providersGen := (&providersGenerator{}).setupProvidersForTest(1) + providerAddress := providersGen.providersAddresses[0] + for i := 0; i < 100; i++ { + providerData := ProviderData{SyncBlock: uint64(i)} + address := providerAddress + strconv.Itoa(i) + set := providerOptimizer.providersStorage.Set(address, providerData, 1) + if set == false { + utils.LavaFormatWarning("set in cache dropped", nil) + } else { + utils.LavaFormatDebug("successfully set", utils.Attribute{Key: "entry", Value: address}) + } + } + time.Sleep(2 * time.Millisecond) 
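+ // ristretto applies Set asynchronously through an internal buffer and may drop writes under pressure, hence the tolerated set == false above and the short sleep before reading the entries back below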
+	for i := 0; i < 100; i++ {
+		address := providerAddress + strconv.Itoa(i)
+		providerData, found := providerOptimizer.getProviderData(address)
+		require.Equal(t, uint64(i), providerData.SyncBlock, "failed getting entry %s", address)
+		require.True(t, found)
+	}
+}
+
+func TestProviderOptimizerBasic(t *testing.T) {
+	providerOptimizer := setupProviderOptimizer(1)
+	providersGen := (&providersGenerator{}).setupProvidersForTest(10)
+
+	requestCU := uint64(10)
+	requestBlock := int64(1000)
+	perturbationPercentage := 0.0
+
+	returnedProviders := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, perturbationPercentage)
+	require.Equal(t, 1, len(returnedProviders))
+	providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[1], TEST_BASE_WORLD_LATENCY*3, true)
+	time.Sleep(2 * time.Millisecond)
+	returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, perturbationPercentage)
+	require.Equal(t, 1, len(returnedProviders))
+	require.NotEqual(t, returnedProviders[0], providersGen.providersAddresses[1]) // we shouldn't pick the wrong provider
+	providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[0], TEST_BASE_WORLD_LATENCY/2, true)
+	time.Sleep(2 * time.Millisecond)
+	returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, perturbationPercentage)
+	require.Equal(t, 1, len(returnedProviders))
+	require.Equal(t, providersGen.providersAddresses[0], returnedProviders[0]) // we should pick the best provider
+}
+
+func TestProviderOptimizerBasicRelayData(t *testing.T) {
+	providerOptimizer := setupProviderOptimizer(1)
+	providersGen := (&providersGenerator{}).setupProvidersForTest(10)
+
+	requestCU := uint64(1)
+	requestBlock := int64(1000)
+	perturbationPercentage := 0.0
+	syncBlock := uint64(requestBlock)
+
+	providerOptimizer.AppendRelayData(providersGen.providersAddresses[1], TEST_BASE_WORLD_LATENCY*4, false, true, requestCU, syncBlock)
+	returnedProviders := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, perturbationPercentage)
+	require.Equal(t, 1, len(returnedProviders))
+	require.NotEqual(t, returnedProviders[0], providersGen.providersAddresses[1]) // we shouldn't pick the wrong provider
+	providerOptimizer.AppendRelayData(providersGen.providersAddresses[0], TEST_BASE_WORLD_LATENCY/4, false, true, requestCU, syncBlock)
+	returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, perturbationPercentage)
+	require.Equal(t, 1, len(returnedProviders))
+	require.Equal(t, providersGen.providersAddresses[0], returnedProviders[0]) // we should pick the best provider
+}
+
+func TestProviderOptimizerAvailability(t *testing.T) {
+	providerOptimizer := setupProviderOptimizer(1)
+	providersCount := 100
+	providersGen := (&providersGenerator{}).setupProvidersForTest(providersCount)
+
+	requestCU := uint64(10)
+	requestBlock := int64(1000)
+	perturbationPercentage := 0.0
+	skipIndex := rand.Intn(providersCount)
+	for i := range providersGen.providersAddresses {
+		// give all but one provider a worse availability score
+		if i == skipIndex {
+			// skip one provider
+			continue
+		}
+		providerOptimizer.AppendProbeRelayData(providersGen.providersAddresses[i], TEST_BASE_WORLD_LATENCY, false)
+	}
+	time.Sleep(2 * time.Millisecond)
+	returnedProviders := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, perturbationPercentage)
+	require.Equal(t, 1, len(returnedProviders))
+	require.Equal(t, providersGen.providersAddresses[skipIndex], returnedProviders[0])
+	returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, map[string]struct{}{providersGen.providersAddresses[skipIndex]: {}}, requestCU, requestBlock, perturbationPercentage)
+	require.Equal(t, 1, len(returnedProviders))
+	require.NotEqual(t, providersGen.providersAddresses[skipIndex], returnedProviders[0])
+}
+
+func TestProviderOptimizerAvailabilityRelayData(t *testing.T) {
+	providerOptimizer := setupProviderOptimizer(1)
+	providersCount := 100
+	providersGen := (&providersGenerator{}).setupProvidersForTest(providersCount)
+	requestCU := uint64(10)
+	requestBlock := int64(1000)
+	perturbationPercentage := 0.0
+	syncBlock := uint64(requestBlock)
+	skipIndex := rand.Intn(providersCount)
+	for i := range providersGen.providersAddresses {
+		// give all but one provider a worse availability score
+		if i == skipIndex {
+			// skip one provider
+			continue
+		}
+		providerOptimizer.AppendRelayData(providersGen.providersAddresses[i], TEST_BASE_WORLD_LATENCY, false, false, requestCU, syncBlock)
+	}
+	time.Sleep(2 * time.Millisecond)
+	returnedProviders := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, perturbationPercentage)
+	require.Equal(t, 1, len(returnedProviders))
+	require.Equal(t, providersGen.providersAddresses[skipIndex], returnedProviders[0])
+	returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, map[string]struct{}{providersGen.providersAddresses[skipIndex]: {}}, requestCU, requestBlock, perturbationPercentage)
+	require.Equal(t, 1, len(returnedProviders))
+	require.NotEqual(t, providersGen.providersAddresses[skipIndex], returnedProviders[0])
+}
+
+func TestProviderOptimizerAvailabilityBlockError(t *testing.T) {
+	providerOptimizer := setupProviderOptimizer(1)
+	providersCount := 10
+	providersGen := (&providersGenerator{}).setupProvidersForTest(providersCount)
+
+	requestCU := uint64(10)
+	requestBlock := int64(1000)
+	perturbationPercentage := 0.0
+	syncBlock := uint64(requestBlock)
+	chosenIndex := rand.Intn(providersCount)
+	for i := range providersGen.providersAddresses {
+		time.Sleep(2 * time.Millisecond)
+		// only the chosen provider gets the latest block, the rest fall one block behind
+		if i == chosenIndex {
+			// give better syncBlock, worse latency by a little
+			providerOptimizer.AppendRelayData(providersGen.providersAddresses[i], TEST_BASE_WORLD_LATENCY+10*time.Millisecond, false, true, requestCU, syncBlock)
+			continue
+		}
+		providerOptimizer.AppendRelayData(providersGen.providersAddresses[i], TEST_BASE_WORLD_LATENCY, false, true, requestCU, syncBlock-1) // mark that this provider doesn't have the latest requested block
+	}
+	time.Sleep(2 * time.Millisecond)
+	returnedProviders := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, perturbationPercentage)
+	require.Equal(t, 1, len(returnedProviders))
+	require.Equal(t, providersGen.providersAddresses[chosenIndex], returnedProviders[0])
+	// now try to get a previous block, our chosenIndex should be inferior in latency and blockError chance should be the same
+	returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock-1, perturbationPercentage)
+	require.Equal(t, 1, len(returnedProviders))
+	require.NotEqual(t, providersGen.providersAddresses[chosenIndex], returnedProviders[0])
+}
+
+func TestProviderOptimizerUpdatingLatency(t *testing.T) {
+	providerOptimizer := setupProviderOptimizer(1)
+	providersCount := 2
+	providersGen := (&providersGenerator{}).setupProvidersForTest(providersCount)
+	providerAddress := providersGen.providersAddresses[0]
+	requestCU := uint64(10)
+	requestBlock := int64(1000)
+	syncBlock := uint64(requestBlock)
+	// in this test we are repeatedly adding better results, and the latency score should improve
+	for i := 0; i < 10; i++ {
+		providerData, _ := providerOptimizer.getProviderData(providerAddress)
+		currentLatencyScore := providerOptimizer.calculateLatencyScore(providerData, requestCU, requestBlock)
+		providerOptimizer.AppendProbeRelayData(providerAddress, TEST_BASE_WORLD_LATENCY, true)
+		time.Sleep(2 * time.Millisecond)
+		providerData, found := providerOptimizer.getProviderData(providerAddress)
+		require.True(t, found)
+		newLatencyScore := providerOptimizer.calculateLatencyScore(providerData, requestCU, requestBlock)
+		require.Greater(t, currentLatencyScore, newLatencyScore, i)
+	}
+	providerAddress = providersGen.providersAddresses[1]
+	for i := 0; i < 10; i++ {
+		providerData, _ := providerOptimizer.getProviderData(providerAddress)
+		currentLatencyScore := providerOptimizer.calculateLatencyScore(providerData, requestCU, requestBlock)
+		providerOptimizer.AppendRelayData(providerAddress, TEST_BASE_WORLD_LATENCY, false, true, requestCU, syncBlock)
+		time.Sleep(2 * time.Millisecond)
+		providerData, found := providerOptimizer.getProviderData(providerAddress)
+		require.True(t, found)
+		newLatencyScore := providerOptimizer.calculateLatencyScore(providerData, requestCU, requestBlock)
+		require.Greater(t, currentLatencyScore, newLatencyScore, i)
+	}
+}
+
+func TestProviderOptimizerStrategiesProviderCount(t *testing.T) {
+	providerOptimizer := setupProviderOptimizer(3)
+	providersCount := 5
+	providersGen := (&providersGenerator{}).setupProvidersForTest(providersCount)
+	requestCU := uint64(10)
+	requestBlock := int64(1000)
+	syncBlock := uint64(requestBlock)
+	perturbationPercentage := 0.0
+	// set a basic state for all of them
+	for i := 0; i < 10; i++ {
+		for _, address := range providersGen.providersAddresses {
+			providerOptimizer.AppendRelayData(address, TEST_BASE_WORLD_LATENCY*2, false, true, requestCU, syncBlock)
+		}
+		time.Sleep(2 * time.Millisecond)
+	}
+	testProvidersCount := func(iterations int) float64 {
+		exploration := 0.0
+		for i := 0; i < iterations; i++ {
+			returnedProviders := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, perturbationPercentage)
+			if len(returnedProviders) > 1 {
+				exploration++
+			}
+		}
+		return exploration
+	}
+
+	// with a cost strategy we expect only one provider, two with a chance of 1/100
+	providerOptimizer.strategy = STRATEGY_COST
+	providerOptimizer.wantedNumProvidersInConcurrency = 2
+	iterations := 10000
+	exploration := testProvidersCount(iterations)
+	require.Less(t, exploration, float64(1.3)*float64(iterations*providersCount)*COST_EXPLORATION_CHANCE) // allow mistake buffer of 30% because of randomness
+
+	// with a balanced strategy we expect mostly one provider, two with a chance of 10/100
+	providerOptimizer.strategy = STRATEGY_BALANCED
+	exploration = testProvidersCount(iterations)
+	require.Greater(t, exploration, float64(1.3)*float64(iterations*providersCount)/100.0)
+	require.Less(t, exploration, float64(1.3)*float64(iterations*providersCount)*DEFAULT_EXPLORATION_CHANCE) // allow mistake buffer of 30% because of randomness
+
+	providerOptimizer.strategy = STRATEGY_PRIVACY
+	exploration = testProvidersCount(iterations)
+	require.Equal(t, exploration, float64(0))
+}
+
+func TestProviderOptimizerSyncScore(t *testing.T) {
+	providerOptimizer := setupProviderOptimizer(1)
+	providersGen := (&providersGenerator{}).setupProvidersForTest(10)
+
+	requestCU := uint64(10)
+	requestBlock := spectypes.LATEST_BLOCK
+	perturbationPercentage := 0.0
+	syncBlock := uint64(1000)
+
+	chosenIndex := rand.Intn(len(providersGen.providersAddresses))
+	for i := range providersGen.providersAddresses {
+		if i == chosenIndex {
+			// give better syncBlock, latency is a tiny bit worse for the second check
+			providerOptimizer.AppendRelayData(providersGen.providersAddresses[i], TEST_BASE_WORLD_LATENCY*2+1*time.Microsecond, false, true, requestCU, syncBlock+5)
+			continue
+		}
+		providerOptimizer.AppendRelayData(providersGen.providersAddresses[i], TEST_BASE_WORLD_LATENCY*2, false, true, requestCU, syncBlock) // the rest stay a few blocks behind the chosen provider
+	}
+	time.Sleep(2 * time.Millisecond)
+	returnedProviders := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, perturbationPercentage)
+	require.Equal(t, 1, len(returnedProviders))
+	require.Equal(t, providersGen.providersAddresses[chosenIndex], returnedProviders[0]) // we should pick the best sync score
+
+	returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, int64(syncBlock), perturbationPercentage)
+	require.Equal(t, 1, len(returnedProviders))
+	require.NotEqual(t, providersGen.providersAddresses[chosenIndex], returnedProviders[0]) // sync score doesn't matter now
+}
+
+func TestProviderOptimizerStrategiesScoring(t *testing.T) {
+	providerOptimizer := setupProviderOptimizer(1)
+	providersCount := 5
+	providersGen := (&providersGenerator{}).setupProvidersForTest(providersCount)
+	requestCU := uint64(10)
+	requestBlock := spectypes.LATEST_BLOCK
+	syncBlock := uint64(1000)
+	perturbationPercentage := 0.0
+	// set a basic state for all of them
+	for i := 0; i < 10; i++ {
+		for _, address := range providersGen.providersAddresses {
+			providerOptimizer.AppendRelayData(address, TEST_BASE_WORLD_LATENCY*2, false, true, requestCU, syncBlock)
+		}
+		time.Sleep(2 * time.Millisecond)
+	}
+	time.Sleep(2 * time.Millisecond)
+	// provider 2 doesn't get a probe availability hit, this is the most meaningful factor
+	for idx, address := range providersGen.providersAddresses {
+		if idx != 2 {
+			providerOptimizer.AppendProbeRelayData(address, TEST_BASE_WORLD_LATENCY*2, false)
+			time.Sleep(2 * time.Millisecond)
+		}
+		providerOptimizer.AppendProbeRelayData(address, TEST_BASE_WORLD_LATENCY*2, true)
+		time.Sleep(2 * time.Millisecond)
+		providerOptimizer.AppendProbeRelayData(address, TEST_BASE_WORLD_LATENCY*2, false)
+		time.Sleep(2 * time.Millisecond)
+		providerOptimizer.AppendProbeRelayData(address, TEST_BASE_WORLD_LATENCY*2, true)
+		time.Sleep(2 * time.Millisecond)
+		providerOptimizer.AppendProbeRelayData(address, TEST_BASE_WORLD_LATENCY*2, true)
+		time.Sleep(2 * time.Millisecond)
+	}
+
+	// provider 0 gets a good latency
+	providerOptimizer.AppendRelayData(providersGen.providersAddresses[0], TEST_BASE_WORLD_LATENCY/1000, false, true, requestCU, syncBlock)
+	// providers 3,4 get a regular entry
+	providerOptimizer.AppendRelayData(providersGen.providersAddresses[3], TEST_BASE_WORLD_LATENCY*2, false, true, requestCU, syncBlock)
+	providerOptimizer.AppendRelayData(providersGen.providersAddresses[4], TEST_BASE_WORLD_LATENCY*2, false, true, requestCU, syncBlock)
+	// provider 1 gets a good sync
+	providerOptimizer.AppendRelayData(providersGen.providersAddresses[1], TEST_BASE_WORLD_LATENCY*2, false, true, requestCU, syncBlock+100)
+
+	time.Sleep(2 * time.Millisecond)
+	providerOptimizer.strategy = STRATEGY_BALANCED
+	// a balanced strategy should pick provider 2, the only one without an availability hit
+	returnedProviders := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, perturbationPercentage)
+	require.Equal(t, 1, len(returnedProviders))
+	require.Equal(t, providersGen.providersAddresses[2], returnedProviders[0])
+
+	providerOptimizer.strategy = STRATEGY_COST
+	// with a cost strategy we expect the same as balanced
+	returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, perturbationPercentage)
+	require.Equal(t, 1, len(returnedProviders))
+	require.Equal(t, providersGen.providersAddresses[2], returnedProviders[0])
+
+	providerOptimizer.strategy = STRATEGY_LATENCY
+	// latency strategy should pick the best latency (provider 0)
+	returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, map[string]struct{}{providersGen.providersAddresses[2]: {}}, requestCU, requestBlock, perturbationPercentage)
+	require.Equal(t, 1, len(returnedProviders))
+	require.Equal(t, providersGen.providersAddresses[0], returnedProviders[0])
+
+	providerOptimizer.strategy = STRATEGY_SYNC_FRESHNESS
+	returnedProviders = providerOptimizer.ChooseProvider(providersGen.providersAddresses, map[string]struct{}{providersGen.providersAddresses[2]: {}}, requestCU, requestBlock, perturbationPercentage)
+	require.Equal(t, 1, len(returnedProviders))
+	require.Equal(t, providersGen.providersAddresses[1], returnedProviders[0])
+}
+
+func TestProviderOptimizerPerturbation(t *testing.T) {
+	providerOptimizer := setupProviderOptimizer(1)
+	providersCount := 10
+	providersGen := (&providersGenerator{}).setupProvidersForTest(providersCount)
+	requestCU := uint64(10)
+	requestBlock := spectypes.LATEST_BLOCK
+	syncBlock := uint64(1000)
+	perturbationPercentage := 0.05
+	// set a basic state for all of them
+	for i := 0; i < 10; i++ {
+		for idx, address := range providersGen.providersAddresses {
+			if idx < len(providersGen.providersAddresses)/2 {
+				// first half are good
+				providerOptimizer.AppendRelayData(address, TEST_BASE_WORLD_LATENCY, false, true, requestCU, syncBlock)
+			} else {
+				// second half are bad
+				providerOptimizer.AppendRelayData(address, TEST_BASE_WORLD_LATENCY*10, false, true, requestCU, syncBlock)
+			}
+		}
+	}
+	seed := time.Now().UnixNano()
+	rand.Seed(seed)
+	utils.LavaFormatDebug("rand seed", utils.Attribute{Key: "seed", Value: seed})
+	same := 0
+	chosenProvider := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, 0)[0]
+	runs := 1000
+	for i := 0; i < runs; i++ {
+		returnedProviders := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock, perturbationPercentage)
+		require.Equal(t, 1, len(returnedProviders))
+		if chosenProvider == returnedProviders[0] {
+			same++
+		}
+		for idx, address := range providersGen.providersAddresses {
+			if address == returnedProviders[0] {
+				require.Less(t, idx, len(providersGen.providersAddresses)/2, returnedProviders[0])
+			}
+		}
+	}
+	require.Less(t, same, runs/2)
+}
diff --git a/protocol/rpcconsumer/rpcconsumer.go b/protocol/rpcconsumer/rpcconsumer.go
index cae4003f24..25107809db 100644
--- a/protocol/rpcconsumer/rpcconsumer.go
+++ b/protocol/rpcconsumer/rpcconsumer.go
@@ -76,7 +76,12 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, txFactory tx.Factory, client
 	if err != nil {
 		utils.LavaFormatFatal("failed unmarshaling public address", err, utils.Attribute{Key: "keyName", Value: keyName}, utils.Attribute{Key: "pubkey", Value: clientKey.GetPubKey().Address()})
 	}
-
+	// we want one provider optimizer per chain, so we store them for reuse across rpcEndpoints
+	chainMutexes := map[string]*sync.Mutex{}
+	for _, endpoint := range rpcEndpoints {
+		chainMutexes[endpoint.ChainID] = &sync.Mutex{} // create a mutex per chain for shared resources
+	}
+	var optimizers sync.Map
 	var wg sync.WaitGroup
 	parallelJobs := len(rpcEndpoints)
 	wg.Add(parallelJobs)
@@ -87,22 +92,51 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, txFactory tx.Factory, client
 	for _, rpcEndpoint := range rpcEndpoints {
 		go func(rpcEndpoint *lavasession.RPCEndpoint) error {
 			defer wg.Done()
-			strategy := provideroptimizer.STRATEGY_QOS
-			optimizer := provideroptimizer.NewProviderOptimizer(strategy)
-			consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer)
-			rpcc.consumerStateTracker.RegisterConsumerSessionManagerForPairingUpdates(ctx, consumerSessionManager)
 			chainParser, err := chainlib.NewChainParser(rpcEndpoint.ApiInterface)
 			if err != nil {
 				err = utils.LavaFormatError("failed creating chain parser", err, utils.Attribute{Key: "endpoint", Value: rpcEndpoint})
 				errCh <- err
 				return err
 			}
-			err = consumerStateTracker.RegisterChainParserForSpecUpdates(ctx, chainParser, rpcEndpoint.ChainID)
+			chainID := rpcEndpoint.ChainID
+			err = consumerStateTracker.RegisterChainParserForSpecUpdates(ctx, chainParser, chainID)
 			if err != nil {
 				err = utils.LavaFormatError("failed registering for spec updates", err, utils.Attribute{Key: "endpoint", Value: rpcEndpoint})
 				errCh <- err
 				return err
 			}
+			_, averageBlockTime, _, _ := chainParser.ChainBlockStats()
+			var optimizer *provideroptimizer.ProviderOptimizer
+
+			getOrCreateOptimizer := func() error {
+				// locked so we don't race on optimizer creation
+				chainMutexes[chainID].Lock()
+				defer chainMutexes[chainID].Unlock()
+				value, exists := optimizers.Load(chainID)
+				if !exists {
+					// doesn't exist for this chain, create a new one
+					strategy := provideroptimizer.STRATEGY_BALANCED
+					baseLatency := commonlib.AverageWorldLatency / 2 // we want performance to be half our timeout or better
+					optimizer = provideroptimizer.NewProviderOptimizer(strategy, averageBlockTime, baseLatency, 3)
+					optimizers.Store(chainID, optimizer)
+				} else {
+					var ok bool
+					optimizer, ok = value.(*provideroptimizer.ProviderOptimizer)
+					if !ok {
+						err = utils.LavaFormatError("failed loading optimizer, value is of the wrong type", err, utils.Attribute{Key: "endpoint", Value: rpcEndpoint.Key()})
+						return err
+					}
+				}
+				return nil
+			}
+			err = getOrCreateOptimizer()
+			if err != nil {
+				errCh <- err
+				return err
+			}
+			consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer)
+			rpcc.consumerStateTracker.RegisterConsumerSessionManagerForPairingUpdates(ctx, consumerSessionManager)
+
 			finalizationConsensus := &lavaprotocol.FinalizationConsensus{}
 			consumerStateTracker.RegisterFinalizationConsensusForUpdates(ctx, finalizationConsensus)
 			rpcConsumerServer := &RPCConsumerServer{}
diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go
index 031835d024..7bb591f658 100644
--- a/protocol/rpcconsumer/rpcconsumer_server.go
+++ b/protocol/rpcconsumer/rpcconsumer_server.go
@@ -232,7 +232,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider(
 	if chainMessage.GetInterface().Category.HangingApi {
 		_, extraRelayTimeout, _, _ = rpccs.chainParser.ChainBlockStats()
 	}
-	relayTimeout := extraRelayTimeout + lavaprotocol.GetTimePerCu(singleConsumerSession.LatestRelayCu) + lavasession.AverageWorldLatency
+	relayTimeout := extraRelayTimeout + common.GetTimePerCu(singleConsumerSession.LatestRelayCu) + common.AverageWorldLatency
 	relayResult, relayLatency, err, backoff := rpccs.relayInner(ctx, singleConsumerSession, relayResult, relayTimeout)
 	if err != nil {
 		failRelaySession := func(origErr error, backoff_ bool) {
@@ -259,7 +259,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider(
 	// set cache in a non blocking call
 	go func() {
 		new_ctx := context.Background()
-		new_ctx, cancel := context.WithTimeout(new_ctx, chainlib.DataReliabilityTimeoutIncrease)
+		new_ctx, cancel := context.WithTimeout(new_ctx, common.DataReliabilityTimeoutIncrease)
 		defer cancel()
 		err2 := rpccs.cache.SetEntry(new_ctx, relayRequest, chainMessage.GetInterface().Interface, nil, chainID, dappID, relayResult.Reply, relayResult.Finalized) // caching in the portal doesn't care about hashes
 		if err2 != nil && !performance.NotInitialisedError.Is(err2) {
@@ -412,7 +412,7 @@ func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context
 		return nil, utils.LavaFormatError("failed creating data reliability relay", err, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "relayRequestData", Value: relayResult.Request.RelayData})
 	}
 	relayResult = &lavaprotocol.RelayResult{Request: reliabilityRequest, ProviderAddress: providerAddress, Finalized: false}
-	relayTimeout := lavaprotocol.GetTimePerCu(singleConsumerSession.LatestRelayCu) + lavasession.AverageWorldLatency + chainlib.DataReliabilityTimeoutIncrease
+	relayTimeout := common.GetTimePerCu(singleConsumerSession.LatestRelayCu) + common.DataReliabilityTimeoutIncrease
 	relayResult, dataReliabilityLatency, err, backoff := rpccs.relayInner(ctx, singleConsumerSession, relayResult, relayTimeout)
 	if err != nil {
 		failRelaySession := func(origErr error, backoff_ bool) {
@@ -428,7 +428,7 @@ func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context
 			}
 		}
 		go failRelaySession(err, backoff)
-		return nil, utils.LavaFormatError("sendReliabilityRelay Could not get reply to reliability relay from provider", err, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "Address", Value: providerAddress})
+		return nil, utils.LavaFormatError("sendDataReliabilityRelayIfApplicable: could not get reply to reliability relay from provider", err, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "Address", Value: providerAddress})
 	}
 
 	expectedBH, numOfProviders := rpccs.finalizationConsensus.ExpectedBlockHeight(rpccs.chainParser)
diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go
index 44a7423a56..5d47a2c65c 100644
--- a/protocol/rpcprovider/rpcprovider.go
+++ b/protocol/rpcprovider/rpcprovider.go
@@ -139,7 +139,6 @@ func (rpcp *RPCProvider) Start(ctx context.Context, txFactory tx.Factory, client
 		return utils.LavaFormatError("panic severity critical error, aborting support for chain api due to invalid chain parser, continuing with others", err, utils.Attribute{Key: "endpoint", Value: rpcProviderEndpoint.String()})
 	}
 	providerStateTracker.RegisterChainParserForSpecUpdates(ctx, chainParser, chainID)
-	chainProxy, err := chainlib.GetChainProxy(ctx, parallelConnections, rpcProviderEndpoint, chainParser)
 	if err != nil {
 		disabledEndpoints <- rpcProviderEndpoint
@@ -183,6 +182,7 @@ func (rpcp *RPCProvider) Start(ctx context.Context, txFactory tx.Factory, client
 		disabledEndpoints <- rpcProviderEndpoint
 		return err
 	}
+
 	reliabilityManager := reliabilitymanager.NewReliabilityManager(chainTracker, providerStateTracker, addr.String(), chainProxy, chainParser)
 	providerStateTracker.RegisterReliabilityManagerForVoteUpdates(ctx, reliabilityManager, rpcProviderEndpoint)
diff --git a/utils/lavalog.go b/utils/lavalog.go
index b33d799ce8..74e8b85e70 100644
--- a/utils/lavalog.go
+++ b/utils/lavalog.go
@@ -140,7 +140,6 @@ func LavaFormatLog(description string, err error, attributes []Attribute, severi
 		st_val = strconv.FormatUint(value, 10)
 	case error:
 		st_val = value.Error()
-
 	case fmt.Stringer:
 		st_val = value.String()
 	// needs to come after stringer so byte inheriting objects will use their string method if implemented (like AccAddress)
diff --git a/utils/score/decay_score.go b/utils/score/decay_score.go
new file mode 100644
index 0000000000..cdf183b241
--- /dev/null
+++ b/utils/score/decay_score.go
@@ -0,0 +1,40 @@
+package score
+
+import (
+	"math"
+	"time"
+)
+
+type ScoreStore struct {
+	Num   float64 // float64 is used instead of math/big rational arithmetic for performance
+	Denom float64
+	Time  time.Time
+}
+
+func NewScoreStore(num float64, denom float64, inpTime time.Time) ScoreStore {
+	return ScoreStore{Num: num, Denom: denom, Time: inpTime}
+}
+
+// CalculateTimeDecayFunctionUpdate calculates the time decayed score update between two ScoreStore entries.
+// It uses an exponential decay function with a half life of halfLife to factor in the time elapsed since each score was recorded.
+// The numerators and denominators of both the old and the new score are decayed by this factor.
+// Additionally, the newScore is factored by a weight of updateWeight.
+// The function returns a new ScoreStore entry with the updated numerator, denominator, and current time.
+//
+// The mathematical equation used to calculate the update is:
+//
+//	updatedNum = oldScore.Num*exp(-ln2*(now-oldScore.Time)/halfLife) + newScore.Num*exp(-ln2*(now-newScore.Time)/halfLife)*updateWeight
+//	updatedDenom = oldScore.Denom*exp(-ln2*(now-oldScore.Time)/halfLife) + newScore.Denom*exp(-ln2*(now-newScore.Time)/halfLife)*updateWeight
+//
+// where now is the current time.
+//
+// Note that the returned ScoreStore has a new Time field set to the current time.
+func CalculateTimeDecayFunctionUpdate(oldScore ScoreStore, newScore ScoreStore, halfLife time.Duration, updateWeight float64) ScoreStore {
+	oldDecayExponent := math.Ln2 * time.Since(oldScore.Time).Seconds() / halfLife.Seconds()
+	oldDecayFactor := math.Exp(-oldDecayExponent)
+	newDecayExponent := math.Ln2 * time.Since(newScore.Time).Seconds() / halfLife.Seconds()
+	newDecayFactor := math.Exp(-newDecayExponent)
+	updatedNum := oldScore.Num*oldDecayFactor + newScore.Num*newDecayFactor*updateWeight
+	updatedDenom := oldScore.Denom*oldDecayFactor + newScore.Denom*newDecayFactor*updateWeight
+	return NewScoreStore(updatedNum, updatedDenom, time.Now())
+}
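
A note on the new utils/score/decay_score.go helper above: the standalone sketch below shows how a caller might blend an old aggregated latency score with a fresh sample via CalculateTimeDecayFunctionUpdate. It is illustrative only; the concrete numbers, the one-hour half life, and the import path github.com/lavanet/lava/utils/score are assumptions, not part of this diff.

package main

import (
	"fmt"
	"time"

	"github.com/lavanet/lava/utils/score" // assumed module path, adjust to the repo's go.mod
)

func main() {
	// an aggregated score recorded one half-life ago: 10 samples summing to 2s of latency
	old := score.NewScoreStore(2.0, 10.0, time.Now().Add(-time.Hour))
	// a fresh sample: a single relay that took 150ms
	fresh := score.NewScoreStore(0.15, 1.0, time.Now())

	halfLife := time.Hour
	updateWeight := 1.0

	updated := score.CalculateTimeDecayFunctionUpdate(old, fresh, halfLife, updateWeight)
	// the old numerator and denominator are halved (one half-life elapsed),
	// the fresh sample is added at roughly full weight, so the blended average
	// moves from the old 200ms toward the recent 150ms measurement
	fmt.Println("blended latency:", updated.Num/updated.Denom)
}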
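The rpcconsumer.go hunk earlier in this diff reuses one ProviderOptimizer per chain by guarding a sync.Map with a per-chain mutex. The sketch below reproduces that get-or-create pattern in isolation; the identifiers (optimizer, newOptimizer, getOrCreate) are hypothetical stand-ins and not taken from the codebase.

package main

import (
	"fmt"
	"sync"
)

type optimizer struct{ chainID string }

func newOptimizer(chainID string) *optimizer { return &optimizer{chainID: chainID} }

func main() {
	chainIDs := []string{"ETH1", "LAV1", "ETH1"} // two endpoints share ETH1
	// one mutex per chain so concurrent endpoints of the same chain don't race on creation
	chainMutexes := map[string]*sync.Mutex{}
	for _, id := range chainIDs {
		chainMutexes[id] = &sync.Mutex{}
	}
	var optimizers sync.Map

	getOrCreate := func(chainID string) *optimizer {
		chainMutexes[chainID].Lock()
		defer chainMutexes[chainID].Unlock()
		if value, exists := optimizers.Load(chainID); exists {
			// an endpoint for this chain already created the shared instance
			return value.(*optimizer)
		}
		opt := newOptimizer(chainID)
		optimizers.Store(chainID, opt)
		return opt
	}

	var wg sync.WaitGroup
	for _, id := range chainIDs {
		wg.Add(1)
		go func(chainID string) {
			defer wg.Done()
			opt := getOrCreate(chainID)
			fmt.Printf("endpoint for %s uses optimizer %p\n", chainID, opt)
		}(id)
	}
	wg.Wait()
}

Run as written, the two ETH1 endpoints print the same optimizer pointer while LAV1 gets its own, mirroring the intent of the getOrCreateOptimizer closure in the diff.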