Skip to content

Commit

Permalink
feat: PRT-adding relay debug headers (lavanet#1487)
Browse files Browse the repository at this point in the history
* adding relay debug headers

* adding reported providers and errored providers to debug relays header

* ai fix

* lint

* fixing buffer race issue for emergency mode test.

* for the foundation
  • Loading branch information
ranlavanet authored Jun 10, 2024
1 parent 278244e commit 39bcdb5
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 24 deletions.
4 changes: 3 additions & 1 deletion protocol/common/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ const (
RETRY_COUNT_HEADER_NAME = "Lava-Retries"
PROVIDER_LATEST_BLOCK_HEADER_NAME = "Provider-Latest-Block"
GUID_HEADER_NAME = "Lava-Guid"
ERRORED_PROVIDERS_HEADER_NAME = "Lava-Errored-Providers"
REPORTED_PROVIDERS_HEADER_NAME = "Lava-Reported-Providers"
// these headers need to be lowercase
BLOCK_PROVIDERS_ADDRESSES_HEADER_NAME = "lava-providers-block"
RELAY_TIMEOUT_HEADER_NAME = "lava-relay-timeout"
EXTENSION_OVERRIDE_HEADER_NAME = "lava-extension"
FORCE_CACHE_REFRESH_HEADER_NAME = "lava-force-cache-refresh"
LAVA_DEBUG = "lava-debug"
LAVA_DEBUG_RELAY = "lava-debug-relay"
// send http request to /lava/health to see if the process is up - (ret code 200)
DEFAULT_HEALTH_PATH = "/lava/health"
MAXIMUM_ALLOWED_TIMEOUT_EXTEND_MULTIPLIER_BY_THE_CONSUMER = 4
Expand Down
13 changes: 12 additions & 1 deletion protocol/lavasession/used_providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ func NewUsedProviders(directiveHeaders map[string]string) *UsedProviders {
}
}
}
return &UsedProviders{providers: map[string]struct{}{}, unwantedProviders: unwantedProviders, blockOnSyncLoss: map[string]struct{}{}}
return &UsedProviders{providers: map[string]struct{}{}, unwantedProviders: unwantedProviders, blockOnSyncLoss: map[string]struct{}{}, erroredProviders: map[string]struct{}{}}
}

type UsedProviders struct {
lock sync.RWMutex
providers map[string]struct{}
selecting bool
unwantedProviders map[string]struct{}
erroredProviders map[string]struct{} // providers who returned protocol errors (used to debug relays for now)
blockOnSyncLoss map[string]struct{}
sessionsLatestBatch int
}
Expand Down Expand Up @@ -100,6 +101,7 @@ func (up *UsedProviders) RemoveUsed(provider string, err error) {
up.lock.Lock()
defer up.lock.Unlock()
if err != nil {
up.erroredProviders[provider] = struct{}{}
if shouldRetryWithThisError(err) {
_, ok := up.blockOnSyncLoss[provider]
if !ok && IsSessionSyncLoss(err) {
Expand Down Expand Up @@ -185,6 +187,15 @@ func (up *UsedProviders) tryLockSelection() bool {
return false
}

func (up *UsedProviders) GetErroredProviders() map[string]struct{} {
if up == nil {
return map[string]struct{}{}
}
up.lock.RLock()
defer up.lock.RUnlock()
return up.erroredProviders
}

func (up *UsedProviders) GetUnwantedProvidersToSend() map[string]struct{} {
if up == nil {
return map[string]struct{}{}
Expand Down
41 changes: 38 additions & 3 deletions protocol/rpcconsumer/rpcconsumer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rpcconsumer
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -316,7 +317,7 @@ func (rpccs *RPCConsumerServer) SendRelay(
}

returnedResult, err := relayProcessor.ProcessingResult()
rpccs.appendHeadersToRelayResult(ctx, returnedResult, relayProcessor.ProtocolErrors())
rpccs.appendHeadersToRelayResult(ctx, returnedResult, relayProcessor.ProtocolErrors(), relayProcessor, directiveHeaders)
if err != nil {
return returnedResult, utils.LavaFormatError("failed processing responses from providers", err, utils.Attribute{Key: "GUID", Value: ctx}, utils.LogAttr("endpoint", rpccs.listenEndpoint.Key()))
}
Expand Down Expand Up @@ -975,7 +976,7 @@ func (rpccs *RPCConsumerServer) LavaDirectiveHeaders(metadata []pairingtypes.Met
case common.RELAY_TIMEOUT_HEADER_NAME:
case common.EXTENSION_OVERRIDE_HEADER_NAME:
case common.FORCE_CACHE_REFRESH_HEADER_NAME:
case common.LAVA_DEBUG:
case common.LAVA_DEBUG_RELAY:
headerDirectives[name] = metaElement.Value
default:
metadataRet = append(metadataRet, metaElement)
Expand Down Expand Up @@ -1014,7 +1015,7 @@ func (rpccs *RPCConsumerServer) HandleDirectiveHeadersForMessage(chainMessage ch
chainMessage.SetForceCacheRefresh(ok)
}

func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context, relayResult *common.RelayResult, protocolErrors uint64) {
func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context, relayResult *common.RelayResult, protocolErrors uint64, relayProcessor *RelayProcessor, directiveHeaders map[string]string) {
if relayResult == nil {
return
}
Expand Down Expand Up @@ -1068,6 +1069,40 @@ func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context,
}
relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, extensionMD)
}

_, debugRelays := directiveHeaders[common.LAVA_DEBUG_RELAY]
if debugRelays {
erroredProviders := relayProcessor.GetUsedProviders().GetErroredProviders()
if len(erroredProviders) > 0 {
erroredProvidersArray := make([]string, len(erroredProviders))
idx := 0
for providerAddress := range erroredProviders {
erroredProvidersArray[idx] = providerAddress
idx++
}
erroredProvidersString := fmt.Sprintf("%v", erroredProvidersArray)
erroredProvidersMD := pairingtypes.Metadata{
Name: common.ERRORED_PROVIDERS_HEADER_NAME,
Value: erroredProvidersString,
}
relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, erroredProvidersMD)
}

currentReportedProviders := rpccs.consumerSessionManager.GetReportedProviders(uint64(relayResult.Request.RelaySession.Epoch))
if len(currentReportedProviders) > 0 {
reportedProvidersArray := make([]string, len(currentReportedProviders))
for idx, providerAddress := range currentReportedProviders {
reportedProvidersArray[idx] = providerAddress.Address
}
reportedProvidersString := fmt.Sprintf("%v", reportedProvidersArray)
reportedProvidersMD := pairingtypes.Metadata{
Name: common.REPORTED_PROVIDERS_HEADER_NAME,
Value: reportedProvidersString,
}
relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, reportedProvidersMD)
}
}

relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, metadataReply...)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ $EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer3 --chain

wait_next_block

# screen -d -m -S consumers bash -c "source ~/.bashrc; lavap rpcconsumer \
# 127.0.0.1:3360 LAV1 rest 127.0.0.1:3361 LAV1 tendermintrpc 127.0.0.1:3362 LAV1 grpc \
# $EXTRA_PORTAL_FLAGS --geolocation 1 --log_level debug --from user1 --chain-id lava --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25
screen -d -m -S consumers bash -c "source ~/.bashrc; lavap rpcconsumer \
127.0.0.1:3360 LAV1 rest 127.0.0.1:3361 LAV1 tendermintrpc 127.0.0.1:3362 LAV1 grpc \
$EXTRA_PORTAL_FLAGS --geolocation 1 --log_level debug --from user1 --chain-id lava --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25

echo "--- setting up screens done ---"
screen -ls
Expand Down
5 changes: 3 additions & 2 deletions testutil/e2e/allowedErrorList.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ var allowedErrors = map[string]string{
}

var allowedErrorsDuringEmergencyMode = map[string]string{
"connection refused": "Connection to tendermint port sometimes can happen as we shut down the node and we try to fetch info during emergency mode",
"connection reset by peer": "Connection to tendermint port sometimes can happen as we shut down the node and we try to fetch info during emergency mode",
"connection refused": "Connection to tendermint port sometimes can happen as we shut down the node and we try to fetch info during emergency mode",
"connection reset by peer": "Connection to tendermint port sometimes can happen as we shut down the node and we try to fetch info during emergency mode",
"Failed Querying EpochDetails": "Connection to tendermint port sometimes can happen as we shut down the node and we try to fetch info during emergency mode",
}

var allowedErrorsPaymentE2E = map[string]string{
Expand Down
4 changes: 2 additions & 2 deletions testutil/e2e/paymentE2E.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package e2e

import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -16,6 +15,7 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/lavanet/lava/cmd/lavad/cmd"
commonconsts "github.com/lavanet/lava/testutil/common/consts"
e2esdk "github.com/lavanet/lava/testutil/e2e/sdk"
"github.com/lavanet/lava/utils"
dualstakingTypes "github.com/lavanet/lava/x/dualstaking/types"
epochStorageTypes "github.com/lavanet/lava/x/epochstorage/types"
Expand Down Expand Up @@ -283,7 +283,7 @@ func runPaymentE2E(timeout time.Duration) {
protocolPath: gopath + lavapPath,
lavadArgs: "--geolocation 1 --log_level debug",
consumerArgs: " --allow-insecure-provider-dialing",
logs: make(map[string]*bytes.Buffer),
logs: make(map[string]*e2esdk.SafeBuffer),
commands: make(map[string]*exec.Cmd),
providerType: make(map[string][]epochStorageTypes.Endpoint),
logPath: protocolLogsFolder,
Expand Down
41 changes: 34 additions & 7 deletions testutil/e2e/protocolE2E.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
commonconsts "github.com/lavanet/lava/testutil/common/consts"
"github.com/lavanet/lava/testutil/e2e/sdk"
"github.com/lavanet/lava/utils"
epochStorageTypes "github.com/lavanet/lava/x/epochstorage/types"
pairingTypes "github.com/lavanet/lava/x/pairing/types"
Expand Down Expand Up @@ -64,7 +65,7 @@ type lavaTest struct {
protocolPath string
lavadArgs string
consumerArgs string
logs map[string]*bytes.Buffer
logs map[string]*sdk.SafeBuffer
commands map[string]*exec.Cmd
providerType map[string][]epochStorageTypes.Endpoint
wg sync.WaitGroup
Expand All @@ -86,7 +87,7 @@ func init() {

func (lt *lavaTest) execCommandWithRetry(ctx context.Context, funcName string, logName string, command string) {
utils.LavaFormatDebug("Executing command " + command)
lt.logs[logName] = new(bytes.Buffer)
lt.logs[logName] = &sdk.SafeBuffer{}

cmd := exec.CommandContext(ctx, "", "")
cmd.Args = strings.Fields(command)
Expand Down Expand Up @@ -129,7 +130,7 @@ func (lt *lavaTest) execCommand(ctx context.Context, funcName string, logName st
}
}()

lt.logs[logName] = new(bytes.Buffer)
lt.logs[logName] = &sdk.SafeBuffer{}

cmd := exec.CommandContext(ctx, "", "")
utils.LavaFormatInfo("Executing Command: " + command)
Expand Down Expand Up @@ -806,14 +807,16 @@ func (lt *lavaTest) saveLogs() {

lines := strings.Split(logBuffer.String(), "\n")
errorLines := []string{}
for _, line := range lines {
for idx, line := range lines {
if fileName == "00_StartLava" { // TODO remove this and solve the errors
break
}
if strings.Contains(line, EmergencyModeStartLine) {
utils.LavaFormatInfo("Found Emergency start line", utils.LogAttr("file", fileName), utils.LogAttr("Line index", idx))
reachedEmergencyModeLine = true
}
if strings.Contains(line, EmergencyModeEndLine) {
utils.LavaFormatInfo("Found Emergency end line", utils.LogAttr("file", fileName), utils.LogAttr("Line index", idx))
reachedEmergencyModeLine = false
}
if strings.Contains(line, " ERR ") || strings.Contains(line, "[Error]" /* sdk errors*/) {
Expand Down Expand Up @@ -885,7 +888,7 @@ func (lt *lavaTest) checkQoS() error {
for provider := range providerCU {
// Get sequence number of provider
logNameAcc := "8_authAccount" + fmt.Sprintf("%02d", providerIdx)
lt.logs[logNameAcc] = new(bytes.Buffer)
lt.logs[logNameAcc] = &sdk.SafeBuffer{}

fetchAccCommand := lt.lavadPath + " query account " + provider + " --output=json"
cmdAcc := exec.CommandContext(context.Background(), "", "")
Expand Down Expand Up @@ -921,7 +924,7 @@ func (lt *lavaTest) checkQoS() error {
sequence = strconv.Itoa(int(sequenceInt))
//
logName := "9_QoS_" + fmt.Sprintf("%02d", providerIdx)
lt.logs[logName] = new(bytes.Buffer)
lt.logs[logName] = &sdk.SafeBuffer{}

txQueryCommand := lt.lavadPath + " query tx --type=acc_seq " + provider + "/" + sequence

Expand Down Expand Up @@ -993,6 +996,30 @@ func (lt *lavaTest) markEmergencyModeLogsStart() {
if err != nil {
utils.LavaFormatError("Failed Writing to buffer", err, utils.LogAttr("key", log))
}

// Verify that the EmergencyModeStartLine is in the last 20 lines
contents := buffer.String()
lines := strings.Split(contents, "\n")
start := len(lines) - 21 // -21 because we want to check the last 20 lines, and -1 for 0-indexing
if start < 0 {
start = 0
}
last20Lines := lines[start : len(lines)-1] // Exclude the last empty string after the final split
// Check if EmergencyModeStartLine is present in the last 20 lines
found := false
indexFound := 0
for idx, line := range last20Lines {
if strings.Contains(line, EmergencyModeStartLine) {
found = true
indexFound = idx
break
}
}
if found {
utils.LavaFormatInfo("Successfully verified EmergencyMode Start Line in the last 20 lines", utils.LogAttr("log_name", log), utils.LogAttr("line", indexFound))
} else {
utils.LavaFormatError("Verification failed for EmergencyMode Start Line in the last 20 lines", nil, utils.LogAttr("log_name", log))
}
}
}

Expand Down Expand Up @@ -1191,7 +1218,7 @@ func runProtocolE2E(timeout time.Duration) {
protocolPath: gopath + "/bin/lavap",
lavadArgs: "--geolocation 1 --log_level debug",
consumerArgs: " --allow-insecure-provider-dialing",
logs: make(map[string]*bytes.Buffer),
logs: make(map[string]*sdk.SafeBuffer),
commands: make(map[string]*exec.Cmd),
providerType: make(map[string][]epochStorageTypes.Endpoint),
logPath: protocolLogsFolder,
Expand Down
34 changes: 32 additions & 2 deletions testutil/e2e/sdk/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,43 @@ import (
"os/exec"
"path/filepath"
"strings"
"sync"

"github.com/lavanet/lava/utils"
pairingTypes "github.com/lavanet/lava/x/pairing/types"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
)

type SafeBuffer struct {
buffer bytes.Buffer
mu sync.Mutex
}

func (sb *SafeBuffer) WriteString(s string) (int, error) {
sb.mu.Lock()
defer sb.mu.Unlock()
return sb.buffer.WriteString(s)
}

func (sb *SafeBuffer) String() string {
sb.mu.Lock()
defer sb.mu.Unlock()
return sb.buffer.String()
}

func (sb *SafeBuffer) Write(p []byte) (int, error) {
sb.mu.Lock()
defer sb.mu.Unlock()
return sb.buffer.Write(p)
}

func (sb *SafeBuffer) Bytes() []byte {
sb.mu.Lock()
defer sb.mu.Unlock()
return sb.buffer.Bytes()
}

// PairingList struct is used to store seed provider information for lavaOverLava
type PairingList struct {
TestNet Geolocations `json:"testnet"`
Expand All @@ -33,7 +63,7 @@ type Pair struct {
PublicAddress string `json:"publicAddress"`
}

func RunSDKTest(testFile string, privateKey string, publicKey string, logs *bytes.Buffer, badgePort string) error {
func RunSDKTest(testFile string, privateKey string, publicKey string, logs *SafeBuffer, badgePort string) error {
utils.LavaFormatInfo("[+] Starting SDK test", utils.LogAttr("test_file", testFile))
// Prepare command for running test
cmd := exec.Command("ts-node", testFile)
Expand Down Expand Up @@ -69,7 +99,7 @@ func RunSDKTest(testFile string, privateKey string, publicKey string, logs *byte
return nil
}

func RunSDKTests(ctx context.Context, grpcConn *grpc.ClientConn, privateKey string, publicKey string, logs *bytes.Buffer, badgePort string) {
func RunSDKTests(ctx context.Context, grpcConn *grpc.ClientConn, privateKey string, publicKey string, logs *SafeBuffer, badgePort string) {
defer func() {
// Delete the file directly without checking if it exists
os.Remove("testutil/e2e/sdk/pairingList.json")
Expand Down
5 changes: 2 additions & 3 deletions testutil/e2e/sdkE2E.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package e2e

import (
"bytes"
"context"
"fmt"
"go/build"
Expand Down Expand Up @@ -96,7 +95,7 @@ func runSDKE2E(timeout time.Duration) {
protocolPath: gopath + "/bin/lavap",
lavadArgs: "--geolocation 1 --log_level debug",
consumerArgs: " --allow-insecure-provider-dialing",
logs: make(map[string]*bytes.Buffer),
logs: make(map[string]*sdk.SafeBuffer),
commands: make(map[string]*exec.Cmd),
providerType: make(map[string][]epochStorageTypes.Endpoint),
logPath: sdkLogsFolder,
Expand Down Expand Up @@ -148,7 +147,7 @@ func runSDKE2E(timeout time.Duration) {
lt.startLavaProviders(ctx)

// Test SDK
lt.logs["01_sdkTest"] = new(bytes.Buffer)
lt.logs["01_sdkTest"] = &sdk.SafeBuffer{}
sdk.RunSDKTests(ctx, grpcConn, privateKey, publicKey, lt.logs["01_sdkTest"], "7070")

// Emergency mode tests
Expand Down

0 comments on commit 39bcdb5

Please sign in to comment.