Skip to content

Commit

Permalink
adding header prints (lavanet#1195)
Browse files Browse the repository at this point in the history
  • Loading branch information
ranlavanet authored Feb 5, 2024
1 parent 5de850d commit dd6eb64
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 11 deletions.
12 changes: 12 additions & 0 deletions protocol/chainlib/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,3 +327,15 @@ func createAndSetupBaseAppListener(cmdFlags common.ConsumerCmdFlags, healthCheck

return app
}

func truncateAndPadString(s string, maxLength int) string {
// Truncate to a maximum length
if len(s) > maxLength {
s = s[:maxLength]
}

// Pad with empty strings if the length is less than the specified maximum length
s = fmt.Sprintf("%-*s", maxLength, s)

return s
}
22 changes: 16 additions & 6 deletions protocol/chainlib/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ import (
"sync"
"time"

"github.com/gogo/protobuf/jsonpb"
"github.com/lavanet/lava/protocol/chainlib/extensionslib"
"github.com/lavanet/lava/protocol/chainlib/grpcproxy"
dyncodec "github.com/lavanet/lava/protocol/chainlib/grpcproxy/dyncodec"
"github.com/lavanet/lava/protocol/parser"

"github.com/gogo/protobuf/jsonpb"
"github.com/lavanet/lava/protocol/chainlib/grpcproxy"
protocoltypes "github.com/lavanet/lava/x/protocol/types"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -283,8 +283,6 @@ func (apil *GrpcChainListener) Serve(ctx context.Context, cmdFlags common.Consum
return
}

utils.LavaFormatInfo("gRPC PortalStart")

lis := GetListenerWithRetryGrpc("tcp", apil.endpoint.NetworkAddress)
apiInterface := apil.endpoint.ApiInterface
sendRelayCallback := func(ctx context.Context, method string, reqBody []byte) ([]byte, metadata.MD, error) {
Expand All @@ -297,7 +295,11 @@ func (apil *GrpcChainListener) Serve(ctx context.Context, cmdFlags common.Consum
dappID := extractDappIDFromGrpcHeader(metadataValues)

grpcHeaders := convertToMetadataMapOfSlices(metadataValues)
utils.LavaFormatInfo("GRPC Got Relay ", utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "method", Value: method})
utils.LavaFormatInfo("in <<< GRPC Relay ",
utils.LogAttr("GUID", ctx),
utils.LogAttr("method", method),
utils.LogAttr("headers", grpcHeaders),
)
metricsData := metrics.NewRelayAnalytics(dappID, apil.endpoint.ChainID, apiInterface)
consumerIp := common.GetIpFromGrpcContext(ctx)
relayResult, err := apil.relaySender.SendRelay(ctx, method, string(reqBody), "", dappID, consumerIp, metricsData, grpcHeaders)
Expand Down Expand Up @@ -345,6 +347,14 @@ func (apil *GrpcChainListener) Serve(ctx context.Context, cmdFlags common.Consum
serveExecutor = func() error { return httpServer.Serve(lis) }
}

fmt.Printf(fmt.Sprintf(`
┌───────────────────────────────────────────────────┐
│ Lava's Grpc Server │
│ %s│
│ Lavap Version: %s│
└───────────────────────────────────────────────────┘
`, truncateAndPadString(apil.endpoint.NetworkAddress, 36), truncateAndPadString(protocoltypes.DefaultVersion.ConsumerTarget, 21)))
if err := serveExecutor(); !errors.Is(err, http.ErrServerClosed) {
utils.LavaFormatFatal("Portal failed to serve", err, utils.Attribute{Key: "Address", Value: lis.Addr()}, utils.Attribute{Key: "ChainID", Value: apil.endpoint.ChainID})
}
Expand Down
8 changes: 7 additions & 1 deletion protocol/chainlib/jsonRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,14 +405,20 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context, cmdFlags common.Con
guid := utils.GenerateUniqueIdentifier()
ctx = utils.WithUniqueIdentifier(ctx, guid)
msgSeed := strconv.FormatUint(guid, 10)
utils.LavaFormatInfo("in <<<", utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "seed", Value: msgSeed}, utils.Attribute{Key: "msg", Value: fiberCtx.Body()}, utils.Attribute{Key: "dappID", Value: dappID})
if test_mode {
apil.logger.LogTestMode(fiberCtx)
}

consumerIp := fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP())
metadataValues := fiberCtx.GetReqHeaders()
headers := convertToMetadataMap(metadataValues)
utils.LavaFormatInfo("in <<<",
utils.LogAttr("GUID", ctx),
utils.LogAttr("seed", msgSeed),
utils.LogAttr("msg", fiberCtx.Body()),
utils.LogAttr("dappID", dappID),
utils.LogAttr("headers", headers),
)
relayResult, err := apil.relaySender.SendRelay(ctx, "", string(fiberCtx.Body()), http.MethodPost, dappID, consumerIp, metricsData, headers)
reply := relayResult.GetReply()
go apil.logger.AddMetricForHttp(metricsData, err, fiberCtx.GetReqHeaders())
Expand Down
16 changes: 14 additions & 2 deletions protocol/chainlib/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,13 @@ func (apil *RestChainListener) Serve(ctx context.Context, cmdFlags common.Consum
// contentType := string(c.Context().Request.Header.ContentType())
dappID := extractDappIDFromFiberContext(fiberCtx)
analytics := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
utils.LavaFormatInfo("in <<<", utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "path", Value: path}, utils.Attribute{Key: "dappID", Value: dappID}, utils.Attribute{Key: "msgSeed", Value: msgSeed})
utils.LavaFormatInfo("in <<<",
utils.LogAttr("GUID", ctx),
utils.LogAttr("path", path),
utils.LogAttr("dappID", dappID),
utils.LogAttr("msgSeed", msgSeed),
utils.LogAttr("headers", restHeaders),
)
requestBody := string(fiberCtx.Body())
relayResult, err := apil.relaySender.SendRelay(ctx, path+query, requestBody, http.MethodPost, dappID, fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP()), analytics, restHeaders)
reply := relayResult.GetReply()
Expand Down Expand Up @@ -346,7 +352,13 @@ func (apil *RestChainListener) Serve(ctx context.Context, cmdFlags common.Consum
msgSeed = strconv.FormatUint(guid, 10)
}
defer cancel() // incase there's a problem make sure to cancel the connection
utils.LavaFormatInfo("in <<<", utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "path", Value: path}, utils.Attribute{Key: "dappID", Value: dappID}, utils.Attribute{Key: "msgSeed", Value: msgSeed})
utils.LavaFormatInfo("in <<<",
utils.LogAttr("GUID", ctx),
utils.LogAttr("path", path),
utils.LogAttr("dappID", dappID),
utils.LogAttr("msgSeed", msgSeed),
utils.LogAttr("headers", restHeaders),
)
relayResult, err := apil.relaySender.SendRelay(ctx, path+query, "", fiberCtx.Method(), dappID, fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP()), analytics, restHeaders)
reply := relayResult.GetReply()
go apil.logger.AddMetricForHttp(analytics, err, fiberCtx.GetReqHeaders())
Expand Down
15 changes: 13 additions & 2 deletions protocol/chainlib/tendermintRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,9 +432,15 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context, cmdFlags comm
ctx = utils.WithUniqueIdentifier(ctx, guid)
defer cancel() // incase there's a problem make sure to cancel the connection
msgSeed := strconv.FormatUint(guid, 10)
utils.LavaFormatInfo("in <<<", utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "seed", Value: msgSeed}, utils.Attribute{Key: "msg", Value: fiberCtx.Body()}, utils.Attribute{Key: "dappID", Value: dappID})
metadataValues := fiberCtx.GetReqHeaders()
headers := convertToMetadataMap(metadataValues)
utils.LavaFormatInfo("in <<<",
utils.LogAttr("GUID", ctx),
utils.LogAttr("seed", msgSeed),
utils.LogAttr("msg", fiberCtx.Body()),
utils.LogAttr("dappID", dappID),
utils.LogAttr("headers", headers),
)
relayResult, err := apil.relaySender.SendRelay(ctx, "", string(fiberCtx.Body()), "", dappID, fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP()), metricsData, headers)
reply := relayResult.GetReply()
go apil.logger.AddMetricForHttp(metricsData, err, fiberCtx.GetReqHeaders())
Expand Down Expand Up @@ -482,10 +488,15 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context, cmdFlags comm
guid := utils.GenerateUniqueIdentifier()
ctx = utils.WithUniqueIdentifier(ctx, guid)
defer cancel() // incase there's a problem make sure to cancel the connection
utils.LavaFormatInfo("urirpc in <<<", utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "msg", Value: path}, utils.Attribute{Key: "dappID", Value: dappID})
metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
metadataValues := fiberCtx.GetReqHeaders()
headers := convertToMetadataMap(metadataValues)
utils.LavaFormatInfo("urirpc in <<<",
utils.LogAttr("GUID", ctx),
utils.LogAttr("msg", path),
utils.LogAttr("dappID", dappID),
utils.LogAttr("headers", headers),
)
relayResult, err := apil.relaySender.SendRelay(ctx, path+query, "", "", dappID, fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP()), metricsData, headers)
msgSeed := strconv.FormatUint(guid, 10)
reply := relayResult.GetReply()
Expand Down
1 change: 1 addition & 0 deletions protocol/rpcconsumer/rpcconsumer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,7 @@ func (rpccs *RPCConsumerServer) HandleDirectiveHeadersForMessage(chainMessage ch
timeout, err := time.ParseDuration(timeoutStr)
if err == nil {
// set an override timeout
utils.LavaFormatDebug("User indicated to set the timeout using flag", utils.LogAttr("timeout", timeoutStr))
chainMessage.TimeoutOverride(timeout)
}
}
Expand Down

0 comments on commit dd6eb64

Please sign in to comment.