Skip to content

Commit

Permalink
PRT-add-flag-to-export-jail-reports (lavanet#1226)
Browse files Browse the repository at this point in the history
* wip

* add reports for conflicts

* added tests and nil safety

* added aggregation, and added referrer client aggregation

* lint

* send referer only when a relay succeeds

* fix dup ports in full_consumer_example

* add nil reference support

* check for nil just in case

* added debug prints

* added logging for health check
  • Loading branch information
omerlavanet authored Feb 18, 2024
1 parent 95633aa commit 9faee81
Show file tree
Hide file tree
Showing 24 changed files with 691 additions and 220 deletions.
8 changes: 5 additions & 3 deletions config/consumer_examples/full_consumer_example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,6 @@ endpoints:
- chain-id: AGR
api-interface: grpc
network-address: 127.0.0.1:3387
- chain-id: KOIIT
api-interface: jsonrpc
network-address: 127.0.0.1:3388
- chain-id: AGR
api-interface: tendermintrpc
network-address: 127.0.0.1:3388
Expand All @@ -149,4 +146,9 @@ endpoints:
- chain-id: AGRT
api-interface: tendermintrpc
network-address: 127.0.0.1:3391
- chain-id: KOIIT
api-interface: jsonrpc
network-address: 127.0.0.1:3392
metrics-listen-address: ":7779"
# referer-be-address: "http://127.0.0.1:6500"
# reports-be-address: "http://127.0.0.1:6501"
2 changes: 0 additions & 2 deletions config/health_examples/health_template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ consumer_endpoints:
- chain-id: ETH1
api-interface: jsonrpc
network-address: http://127.0.0.1:3333
addons:
- debug
- chain-id: LAV1
api-interface: rest
network-address: http://127.0.0.1:3360
Expand Down
28 changes: 7 additions & 21 deletions protocol/chainlib/common.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package chainlib

import (
"bytes"
"encoding/json"
"fmt"
"net"
Expand Down Expand Up @@ -374,33 +373,20 @@ func ValidateNilResponse(responseString string) error {
}

type RefererData struct {
Address string
Marker string
Address string
Marker string
ReferrerClient *metrics.ConsumerReferrerClient
}

func (rd *RefererData) SendReferer(refererMatchString string) error {
if rd == nil || rd.Address == "" {
return nil
}
utils.LavaFormatDebug("referer detected", utils.LogAttr("referer", refererMatchString))
payload := map[string]interface{}{}
payload["referer-id"] = refererMatchString
payloadBytes, err := json.Marshal(payload)
if err != nil {
return utils.LavaFormatError("failed marshaling payload for Referer", err)
}
req, err := http.NewRequest("POST", rd.Address, bytes.NewBuffer(payloadBytes))
if err != nil {
return utils.LavaFormatError("failed building a request for referer", err)
if rd.ReferrerClient == nil {
return nil
}
req.Header.Set("Content-Type", "application/json")

// Make the HTTP request
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return utils.LavaFormatDebug("failed sending http request", utils.LogAttr("error", err))
}
defer resp.Body.Close()
utils.LavaFormatDebug("referer detected", utils.LogAttr("referer", refererMatchString))
rd.ReferrerClient.AppendReferrer(metrics.NewReferrerRequest(refererMatchString))
return nil
}
10 changes: 5 additions & 5 deletions protocol/chainlib/jsonRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,6 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context, cmdFlags common.Con
apil.logger.AnalyzeWebSocketErrorAndWriteMessage(websockConn, messageType, nil, msgSeed, []byte("Unable to extract dappID"), spectypes.APIInterfaceJsonRPC, time.Since(startTime))
}
refererMatch, ok := websockConn.Locals(refererMatchString).(string)
if ok && refererMatch != "" && apil.refererData != nil {
go apil.refererData.SendReferer(refererMatch)
}
ctx, cancel := context.WithCancel(context.Background())
guid := utils.GenerateUniqueIdentifier()
ctx = utils.WithUniqueIdentifier(ctx, guid)
Expand All @@ -350,6 +347,9 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context, cmdFlags common.Con
utils.LavaFormatDebug("ws in <<<", utils.Attribute{Key: "seed", Value: msgSeed}, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "msg", Value: msg}, utils.Attribute{Key: "dappID", Value: dappID})
metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
relayResult, err := apil.relaySender.SendRelay(ctx, "", string(msg), http.MethodPost, dappID, websockConn.RemoteAddr().String(), metricsData, nil)
if ok && refererMatch != "" && apil.refererData != nil && err == nil {
go apil.refererData.SendReferer(refererMatch)
}
reply := relayResult.GetReply()
replyServer := relayResult.GetReplyServer()
go apil.logger.AddMetricForWebSocket(metricsData, err, websockConn)
Expand Down Expand Up @@ -428,10 +428,10 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context, cmdFlags common.Con
utils.LogAttr("headers", headers),
)
refererMatch := fiberCtx.Params(refererMatchString, "")
if refererMatch != "" && apil.refererData != nil {
relayResult, err := apil.relaySender.SendRelay(ctx, "", string(fiberCtx.Body()), http.MethodPost, dappID, consumerIp, metricsData, headers)
if refererMatch != "" && apil.refererData != nil && err == nil {
go apil.refererData.SendReferer(refererMatch)
}
relayResult, err := apil.relaySender.SendRelay(ctx, "", string(fiberCtx.Body()), http.MethodPost, dappID, consumerIp, metricsData, headers)
reply := relayResult.GetReply()
go apil.logger.AddMetricForHttp(metricsData, err, fiberCtx.GetReqHeaders())
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions protocol/chainlib/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,11 +301,11 @@ func (apil *RestChainListener) Serve(ctx context.Context, cmdFlags common.Consum
utils.LogAttr("headers", restHeaders),
)
refererMatch := fiberCtx.Params(refererMatchString, "")
if refererMatch != "" && apil.refererData != nil {
go apil.refererData.SendReferer(refererMatch)
}
requestBody := string(fiberCtx.Body())
relayResult, err := apil.relaySender.SendRelay(ctx, path+query, requestBody, http.MethodPost, dappID, fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP()), analytics, restHeaders)
if refererMatch != "" && apil.refererData != nil && err == nil {
go apil.refererData.SendReferer(refererMatch)
}
reply := relayResult.GetReply()
go apil.logger.AddMetricForHttp(analytics, err, fiberCtx.GetReqHeaders())
if err != nil {
Expand Down Expand Up @@ -367,10 +367,10 @@ func (apil *RestChainListener) Serve(ctx context.Context, cmdFlags common.Consum
utils.LogAttr("headers", restHeaders),
)
refererMatch := fiberCtx.Params(refererMatchString, "")
if refererMatch != "" && apil.refererData != nil {
relayResult, err := apil.relaySender.SendRelay(ctx, path+query, "", fiberCtx.Method(), dappID, fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP()), analytics, restHeaders)
if refererMatch != "" && apil.refererData != nil && err == nil {
go apil.refererData.SendReferer(refererMatch)
}
relayResult, err := apil.relaySender.SendRelay(ctx, path+query, "", fiberCtx.Method(), dappID, fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP()), analytics, restHeaders)
reply := relayResult.GetReply()
go apil.logger.AddMetricForHttp(analytics, err, fiberCtx.GetReqHeaders())
if err != nil {
Expand Down
13 changes: 8 additions & 5 deletions protocol/chainlib/tendermintRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,11 +374,11 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context, cmdFlags comm
utils.LavaFormatInfo("ws in <<<", utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "seed", Value: msgSeed}, utils.Attribute{Key: "msg", Value: msg}, utils.Attribute{Key: "dappID", Value: dappID})
msgSeed = strconv.FormatUint(guid, 10)
refererMatch, ok := websocketConn.Locals(refererMatchString).(string)
if ok && refererMatch != "" && apil.refererData != nil {
go apil.refererData.SendReferer(refererMatch)
}
metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
relayResult, err := apil.relaySender.SendRelay(ctx, "", string(msg), "", dappID, websocketConn.RemoteAddr().String(), metricsData, nil)
if ok && refererMatch != "" && apil.refererData != nil && err == nil {
go apil.refererData.SendReferer(refererMatch)
}
reply := relayResult.GetReply()
replyServer := relayResult.GetReplyServer()
go apil.logger.AddMetricForWebSocket(metricsData, err, websocketConn)
Expand Down Expand Up @@ -451,10 +451,10 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context, cmdFlags comm
utils.LogAttr("headers", headers),
)
refererMatch := fiberCtx.Params(refererMatchString, "")
if refererMatch != "" && apil.refererData != nil {
relayResult, err := apil.relaySender.SendRelay(ctx, "", string(fiberCtx.Body()), "", dappID, fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP()), metricsData, headers)
if refererMatch != "" && apil.refererData != nil && err == nil {
go apil.refererData.SendReferer(refererMatch)
}
relayResult, err := apil.relaySender.SendRelay(ctx, "", string(fiberCtx.Body()), "", dappID, fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP()), metricsData, headers)
reply := relayResult.GetReply()
go apil.logger.AddMetricForHttp(metricsData, err, fiberCtx.GetReqHeaders())

Expand Down Expand Up @@ -515,6 +515,9 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context, cmdFlags comm
go apil.refererData.SendReferer(refererMatch)
}
relayResult, err := apil.relaySender.SendRelay(ctx, path+query, "", "", dappID, fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP()), metricsData, headers)
if refererMatch != "" && apil.refererData != nil && err == nil {
go apil.refererData.SendReferer(refererMatch)
}
msgSeed := strconv.FormatUint(guid, 10)
reply := relayResult.GetReply()
go apil.logger.AddMetricForHttp(metricsData, err, fiberCtx.GetReqHeaders())
Expand Down
38 changes: 18 additions & 20 deletions protocol/lavaprotocol/response_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,15 @@ func TestSignAndExtractResponse(t *testing.T) {
specId := "LAV1"
epoch := int64(100)
singleConsumerSession := &lavasession.SingleConsumerSession{
CuSum: 20,
LatestRelayCu: 10, // set by GetSessions cuNeededForSession
QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}},
SessionId: 123,
Parent: nil,
RelayNum: 1,
LatestBlock: epoch,
Endpoint: nil,
BlockListed: false, // if session lost sync we blacklist it.
ConsecutiveNumberOfFailures: 0, // number of times this session has failed
CuSum: 20,
LatestRelayCu: 10, // set by GetSessions cuNeededForSession
QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}},
SessionId: 123,
Parent: nil,
RelayNum: 1,
LatestBlock: epoch,
Endpoint: nil,
BlockListed: false, // if session lost sync we blacklist it.
}
metadataValue := make([]pairingtypes.Metadata, 1)
metadataValue[0] = pairingtypes.Metadata{
Expand Down Expand Up @@ -75,16 +74,15 @@ func TestSignAndExtractResponseLatest(t *testing.T) {
testSpecId := "BLAV1"
epoch := int64(100)
singleConsumerSession := &lavasession.SingleConsumerSession{
CuSum: 20,
LatestRelayCu: 10, // set by GetSessions cuNeededForSession
QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}},
SessionId: 123,
Parent: nil,
RelayNum: 1,
LatestBlock: epoch,
Endpoint: nil,
BlockListed: false, // if session lost sync we blacklist it.
ConsecutiveNumberOfFailures: 0, // number of times this session has failed
CuSum: 20,
LatestRelayCu: 10, // set by GetSessions cuNeededForSession
QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}},
SessionId: 123,
Parent: nil,
RelayNum: 1,
LatestBlock: epoch,
Endpoint: nil,
BlockListed: false, // if session lost sync we blacklist it.
}
metadataValue := make([]pairingtypes.Metadata, 1)
metadataValue[0] = pairingtypes.Metadata{
Expand Down
19 changes: 9 additions & 10 deletions protocol/lavaprotocol/reuqest_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@ func TestSignAndExtract(t *testing.T) {
specId := "LAV1"
epoch := int64(100)
singleConsumerSession := &lavasession.SingleConsumerSession{
CuSum: 20,
LatestRelayCu: 10, // set by GetSessions cuNeededForSession
QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}},
SessionId: 123,
Parent: nil,
RelayNum: 1,
LatestBlock: epoch,
Endpoint: nil,
BlockListed: false, // if session lost sync we blacklist it.
ConsecutiveNumberOfFailures: 0, // number of times this session has failed
CuSum: 20,
LatestRelayCu: 10, // set by GetSessions cuNeededForSession
QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}},
SessionId: 123,
Parent: nil,
RelayNum: 1,
LatestBlock: epoch,
Endpoint: nil,
BlockListed: false, // if session lost sync we blacklist it.
}
metadataValue := make([]pairingtypes.Metadata, 1)
metadataValue[0] = pairingtypes.Metadata{
Expand Down
Loading

0 comments on commit 9faee81

Please sign in to comment.