Skip to content

Commit

Permalink
PRT-686 returning headers back from the node (lavanet#520)
Browse files Browse the repository at this point in the history
* returning headers

* adding md to grpc tests

* adding seed
  • Loading branch information
ranlavanet authored May 29, 2023
1 parent ee274cb commit 0d4075a
Show file tree
Hide file tree
Showing 13 changed files with 373 additions and 149 deletions.
113 changes: 83 additions & 30 deletions docs/static/openapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33125,6 +33125,45 @@ paths:
type: string
tags:
- Query
'/lavanet/lava/subscription/list_projects/{subscription}':
get:
summary: Queries a list of ListProjects items.
operationId: LavanetLavaSubscriptionListProjects
responses:
'200':
description: A successful response.
schema:
type: object
properties:
projects:
type: array
items:
type: string
default:
description: An unexpected error response.
schema:
type: object
properties:
code:
type: integer
format: int32
message:
type: string
details:
type: array
items:
type: object
properties:
'@type':
type: string
additionalProperties: {}
parameters:
- name: subscription
in: path
required: true
type: string
tags:
- Query
/lavanet/lava/subscription/params:
get:
summary: Parameters queries the parameters of the module.
Expand Down Expand Up @@ -54409,6 +54448,15 @@ definitions:
title: >-
sign
latest_block+finalized_blocks_hashes+session_id+block_height+relay_num
metadata:
type: array
items:
type: object
properties:
name:
type: string
value:
type: string
lavanet.lava.pairing.RelayRequest:
type: object
properties:
Expand Down Expand Up @@ -54558,6 +54606,15 @@ definitions:
salt:
type: string
format: byte
metadata:
type: array
items:
type: object
properties:
name:
type: string
value:
type: string
lavanet.lava.pairing.VRFData:
type: object
properties:
Expand Down Expand Up @@ -54993,6 +55050,13 @@ definitions:
type: array
items:
type: string
lavanet.lava.pairing.Metadata:
type: object
properties:
name:
type: string
value:
type: string
lavanet.lava.pairing.MsgFreezeProviderResponse:
type: object
lavanet.lava.pairing.MsgRelayPaymentResponse:
Expand Down Expand Up @@ -55554,6 +55618,15 @@ definitions:
salt:
type: string
format: byte
metadata:
type: array
items:
type: object
properties:
name:
type: string
value:
type: string
lavanet.lava.pairing.RelaySession:
type: object
properties:
Expand Down Expand Up @@ -55961,17 +56034,6 @@ definitions:
kinds:
type: integer
format: int64
types:
type: array
items:
type: string
enum:
- NONE
- ADMIN
- DEVELOPER
default: NONE
title: 'bitmap, must only be power of 2'
title: 'the key type, determines the privilages of the key'
lavanet.lava.projects.QueryDeveloperResponse:
type: object
properties:
Expand Down Expand Up @@ -57358,17 +57420,9 @@ definitions:
key:
type: string
title: the address of the project key
types:
type: array
items:
type: string
enum:
- NONE
- ADMIN
- DEVELOPER
default: NONE
title: 'bitmap, must only be power of 2'
title: 'the key type, determines the privilages of the key'
kinds:
type: integer
format: int64
policy:
type: object
properties:
Expand Down Expand Up @@ -57397,14 +57451,6 @@ definitions:
format: uint64
title: 'protobuf expected in YAML format: used "moretags" to simplify parsing'
title: used as a container struct for the subscription module
lavanet.lava.projects.ProjectKey.KEY_TYPE:
type: string
enum:
- NONE
- ADMIN
- DEVELOPER
default: NONE
title: 'bitmap, must only be power of 2'
lavanet.lava.subscription.MsgAddProjectResponse:
type: object
lavanet.lava.subscription.MsgBuyResponse:
Expand Down Expand Up @@ -57463,6 +57509,13 @@ definitions:
type: string
format: uint64
title: CU remaining for previous month
lavanet.lava.subscription.QueryListProjectsResponse:
type: object
properties:
projects:
type: array
items:
type: string
lavanet.lava.subscription.QueryParamsResponse:
type: object
properties:
Expand Down
1 change: 1 addition & 0 deletions proto/pairing/relay.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ message RelayReply {
int64 latest_block = 4;
bytes finalized_blocks_hashes = 5;
bytes sig_blocks = 6; //sign latest_block+finalized_blocks_hashes+session_id+block_height+relay_num
repeated Metadata metadata = 7 [(gogoproto.nullable) = false];
}

message QualityOfServiceReport{
Expand Down
32 changes: 32 additions & 0 deletions protocol/chainlib/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import (
"github.com/lavanet/lava/protocol/metrics"
"github.com/lavanet/lava/protocol/parser"
"github.com/lavanet/lava/utils"
pairingtypes "github.com/lavanet/lava/x/pairing/types"
spectypes "github.com/lavanet/lava/x/spec/types"
"google.golang.org/grpc/metadata"
)

const (
Expand Down Expand Up @@ -246,3 +248,33 @@ type CraftData struct {
func CraftChainMessage(serviceApi spectypes.ServiceApi, chainParser ChainParser, craftData *CraftData) (ChainMessageForSend, error) {
return chainParser.CraftMessage(serviceApi, craftData)
}

// rest request headers are formatted like map[string]string
func convertToMetadataMap(md map[string]string) []pairingtypes.Metadata {
metadata := make([]pairingtypes.Metadata, len(md))
indexer := 0
for k, v := range md {
metadata[indexer] = pairingtypes.Metadata{Name: k, Value: v}
indexer += 1
}
return metadata
}

// rest response headers / grpc headers are formatted like map[string][]string
func convertToMetadataMapOfSlices(md map[string][]string) []pairingtypes.Metadata {
metadata := make([]pairingtypes.Metadata, len(md))
indexer := 0
for k, v := range md {
metadata[indexer] = pairingtypes.Metadata{Name: k, Value: v[0]}
indexer += 1
}
return metadata
}

func convertRelayMetaDataToMDMetaData(md []pairingtypes.Metadata) metadata.MD {
responseMetaData := make(metadata.MD)
for _, v := range md {
responseMetaData[v.Name] = append(responseMetaData[v.Name], v.Value)
}
return responseMetaData
}
27 changes: 10 additions & 17 deletions protocol/chainlib/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/gogo/protobuf/jsonpb"
"github.com/lavanet/lava/protocol/chainlib/grpcproxy"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"github.com/fullstorydev/grpcurl"
Expand Down Expand Up @@ -242,11 +243,11 @@ func (apil *GrpcChainListener) Serve(ctx context.Context) {

lis := GetListenerWithRetryGrpc("tcp", apil.endpoint.NetworkAddress)
apiInterface := apil.endpoint.ApiInterface
sendRelayCallback := func(ctx context.Context, method string, reqBody []byte) ([]byte, error) {
sendRelayCallback := func(ctx context.Context, method string, reqBody []byte) ([]byte, metadata.MD, error) {
ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())
msgSeed := apil.logger.GetMessageSeed()
metadataValues, _ := metadata.FromIncomingContext(ctx)
grpcHeaders := convertToMetadataGrpc(metadataValues)
grpcHeaders := convertToMetadataMapOfSlices(metadataValues)
utils.LavaFormatInfo("GRPC Got Relay ", utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "method", Value: method})
var relayReply *pairingtypes.RelayReply
metricsData := metrics.NewRelayAnalytics("NoDappID", apil.endpoint.ChainID, apiInterface)
Expand All @@ -256,10 +257,10 @@ func (apil *GrpcChainListener) Serve(ctx context.Context) {
if err != nil {
errMasking := apil.logger.GetUniqueGuidResponseForError(err, msgSeed)
apil.logger.LogRequestAndResponse("http in/out", true, method, string(reqBody), "", errMasking, msgSeed, err)
return nil, utils.LavaFormatError("Failed to SendRelay", fmt.Errorf(errMasking))
return nil, nil, utils.LavaFormatError("Failed to SendRelay", fmt.Errorf(errMasking))
}
apil.logger.LogRequestAndResponse("http in/out", false, method, string(reqBody), "", "", msgSeed, nil)
return relayReply.Data, nil
return relayReply.Data, convertRelayMetaDataToMDMetaData(relayReply.Metadata), nil
}

_, httpServer, err := grpcproxy.NewGRPCProxy(sendRelayCallback)
Expand Down Expand Up @@ -400,8 +401,10 @@ func (cp *GrpcChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{},
}
}

var respHeaders metadata.MD
response := msgFactory.NewMessage(methodDescriptor.GetOutputType())
err = conn.Invoke(connectCtx, "/"+nodeMessage.Path, msg, response)
err = conn.Invoke(connectCtx, "/"+nodeMessage.Path, msg, response, grpc.Header(&respHeaders))

if err != nil {
if connectCtx.Err() == context.DeadlineExceeded {
// Not an rpc error, return provider error without disclosing the endpoint address
Expand All @@ -417,22 +420,12 @@ func (cp *GrpcChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{},
}

reply := &pairingtypes.RelayReply{
Data: respBytes,
Data: respBytes,
Metadata: convertToMetadataMapOfSlices(respHeaders),
}
return reply, "", nil, nil
}

func convertToMetadataGrpc(md map[string][]string) []pairingtypes.Metadata {
metadata := make([]pairingtypes.Metadata, len(md))
indexer := 0
for k, v := range md {
metadata[indexer] = pairingtypes.Metadata{Name: k, Value: v[0]}
indexer += 1
}
fmt.Println("metadata: ", metadata)
return metadata
}

func marshalJSON(msg proto.Message) ([]byte, error) {
if dyn, ok := msg.(*dynamic.Message); ok {
return dyn.MarshalJSON()
Expand Down
3 changes: 2 additions & 1 deletion protocol/chainlib/grpcproxy/dyncodec/remote_relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ func (r RelayerRemote) sendReq(req *grpc_reflection_v1alpha.ServerReflectionRequ
if err != nil {
return nil, err
}
respBytes, err := r.relay(context.Background(), "grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo", reqBytes)

respBytes, _, err := r.relay(context.Background(), "grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo", reqBytes)
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions protocol/chainlib/grpcproxy/dyncodec/remotes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/lavanet/lava/protocol/chainlib/grpcproxy/testproto"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
"google.golang.org/protobuf/encoding/protojson"
Expand Down Expand Up @@ -74,10 +75,10 @@ func TestRemotes(t *testing.T) {
})

t.Run("test relayer remote", func(t *testing.T) {
remote := NewRelayerRemote(func(ctx context.Context, method string, req []byte) ([]byte, error) {
remote := NewRelayerRemote(func(ctx context.Context, method string, req []byte) ([]byte, metadata.MD, error) {
var resp []byte
err := conn.Invoke(ctx, method, req, &resp, grpc.CustomCodecCallOption{Codec: grpcproxy.RawBytesCodec{}})
return resp, err
return resp, make(metadata.MD), err
})
testRemote(remote)
})
Expand Down
6 changes: 4 additions & 2 deletions protocol/chainlib/grpcproxy/grpcproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import (
"golang.org/x/net/http2/h2c"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

type ProxyCallBack = func(ctx context.Context, method string, reqBody []byte) ([]byte, error)
type ProxyCallBack = func(ctx context.Context, method string, reqBody []byte) ([]byte, metadata.MD, error)

func NewGRPCProxy(cb ProxyCallBack) (*grpc.Server, *http.Server, error) {
s := grpc.NewServer(grpc.UnknownServiceHandler(makeProxyFunc(cb)), grpc.ForceServerCodec(RawBytesCodec{}))
Expand Down Expand Up @@ -44,10 +45,11 @@ func makeProxyFunc(callBack ProxyCallBack) grpc.StreamHandler {
if err != nil {
return err
}
respBytes, err := callBack(stream.Context(), methodName[1:], reqBytes) // strip first '/' of the method name
respBytes, md, err := callBack(stream.Context(), methodName[1:], reqBytes) // strip first '/' of the method name
if err != nil {
return err
}
stream.SetHeader(md)
return stream.SendMsg(respBytes)
}
}
Expand Down
7 changes: 5 additions & 2 deletions protocol/chainlib/grpcproxy/grpcproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,20 @@ import (

"github.com/lavanet/lava/protocol/chainlib/grpcproxy/testproto"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/metadata"
)

func TestGRPCProxy(t *testing.T) {
proxyGRPCSrv, _, err := NewGRPCProxy(func(ctx context.Context, method string, reqBody []byte) ([]byte, error) {
proxyGRPCSrv, _, err := NewGRPCProxy(func(ctx context.Context, method string, reqBody []byte) ([]byte, metadata.MD, error) {
// the callback function just does echo proxying
req := new(testproto.TestRequest)
err := req.Unmarshal(reqBody)
require.NoError(t, err)
respBytes, err := (&testproto.TestResponse{Response: req.Request + "-callback"}).Marshal()
require.NoError(t, err)
return respBytes, nil
responseHeaders := make(metadata.MD)
responseHeaders["test-headers"] = append(responseHeaders["test-headers"], "55")
return respBytes, responseHeaders, nil
})
require.NoError(t, err)

Expand Down
Loading

0 comments on commit 0d4075a

Please sign in to comment.