Skip to content

Commit

Permalink
Remove Data Reliability check on Subscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
kajeagentspi committed Oct 26, 2022
1 parent 10b4d1d commit 7a683af
Showing 1 changed file with 91 additions and 150 deletions.
241 changes: 91 additions & 150 deletions relayer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,184 +823,125 @@ func (s *relayServer) RelaySubscribe(request *pairingtypes.RelayRequest, srv pai
}
return authorisedUserResponse, nodeMsg, nil
}
var authorisedUserResponse *pairingtypes.QueryVerifyPairingResponse
authorisedUserResponse, nodeMsg, err = authorizeAndParseMessage(context.Background(), userAddr, request, uint64(request.BlockHeight))
_, nodeMsg, err = authorizeAndParseMessage(context.Background(), userAddr, request, uint64(request.BlockHeight))
if err != nil {
utils.LavaFormatError("failed authorizing user request", nil, nil)
return err
}

if request.DataReliability != nil {
userSessions := getOrCreateUserSessions(userAddr.String())
vrf_pk, maxcuRes, err := g_sentry.GetVrfPkAndMaxCuForUser(context.Background(), userAddr.String(), request.ChainID, request.BlockHeight)
if err != nil {
return utils.LavaFormatError("failed to get vrfpk and maxCURes for data reliability!", err, &map[string]string{
"userAddr": userAddr.String(),
})
}

userSessions.Lock.Lock()
if epochData, ok := userSessions.dataByEpoch[uint64(request.BlockHeight)]; ok {
//data reliability message
if epochData.DataReliability != nil {
userSessions.Lock.Unlock()
return utils.LavaFormatError("dataReliability can only be used once per client per epoch", nil,
&map[string]string{"requested epoch": strconv.FormatInt(request.BlockHeight, 10), "userAddr": userAddr.String(), "dataReliability": fmt.Sprintf("%v", epochData.DataReliability)})
}
}
userSessions.Lock.Unlock()
// data reliability is not session dependant, its always sent with sessionID 0 and if not we don't care
if vrf_pk == nil {
return utils.LavaFormatError("dataReliability Triggered with vrf_pk == nil", nil,
&map[string]string{"requested epoch": strconv.FormatInt(request.BlockHeight, 10), "userAddr": userAddr.String()})
}
// verify the providerSig is ineed a signature by a valid provider on this query
valid, err := s.VerifyReliabilityAddressSigning(context.Background(), userAddr, request)
if err != nil {
return utils.LavaFormatError("VerifyReliabilityAddressSigning invalid", err,
&map[string]string{"requested epoch": strconv.FormatInt(request.BlockHeight, 10), "userAddr": userAddr.String(), "dataReliability": fmt.Sprintf("%v", request.DataReliability)})
}
if !valid {
return utils.LavaFormatError("invalid DataReliability Provider signing", nil,
&map[string]string{"requested epoch": strconv.FormatInt(request.BlockHeight, 10), "userAddr": userAddr.String(), "dataReliability": fmt.Sprintf("%v", request.DataReliability)})
}
//verify data reliability fields correspond to the right vrf
valid = utils.VerifyVrfProof(request, *vrf_pk, uint64(request.BlockHeight))
if !valid {
return utils.LavaFormatError("invalid DataReliability fields, VRF wasn't verified with provided proof", nil,
&map[string]string{"requested epoch": strconv.FormatInt(request.BlockHeight, 10), "userAddr": userAddr.String(), "dataReliability": fmt.Sprintf("%v", request.DataReliability)})
}

vrfIndex := utils.GetIndexForVrf(request.DataReliability.VrfValue, uint32(g_sentry.GetProvidersCount()), g_sentry.GetReliabilityThreshold())
if authorisedUserResponse.Index != vrfIndex {
return utils.LavaFormatError("Provider identified invalid vrfIndex in data reliability request, the given index and self index are different", nil,
&map[string]string{"requested epoch": strconv.FormatInt(request.BlockHeight, 10), "userAddr": userAddr.String(),
"dataReliability": fmt.Sprintf("%+v", request.DataReliability), "relayEpochStart": strconv.FormatInt(request.BlockHeight, 10),
"vrfIndex": strconv.FormatInt(vrfIndex, 10),
"self Index": strconv.FormatInt(authorisedUserResponse.Index, 10)})
}
utils.LavaFormatInfo("server got valid DataReliability request", nil)

userSessions.Lock.Lock()
getOrCreateDataByEpoch(userSessions, uint64(request.BlockHeight), maxcuRes, vrf_pk, userAddr.String())
userSessions.dataByEpoch[uint64(request.BlockHeight)].DataReliability = request.DataReliability
userSessions.Lock.Unlock()
} else {
relaySession, err := getOrCreateSession(context.Background(), userAddr.String(), request)
if err != nil {
return err
}
relaySession, err := getOrCreateSession(context.Background(), userAddr.String(), request)
if err != nil {
return err
}

relaySession.Lock.Lock()
pairingEpoch := relaySession.GetPairingEpoch()
relaySession.Lock.Lock()
pairingEpoch := relaySession.GetPairingEpoch()

if request.BlockHeight != int64(pairingEpoch) {
relaySession.Lock.Unlock()
return utils.LavaFormatError("request blockheight mismatch to session epoch", nil,
&map[string]string{"pairingEpoch": strconv.FormatUint(pairingEpoch, 10), "userAddr": userAddr.String(),
"relay blockheight": strconv.FormatInt(request.BlockHeight, 10)})
}

userSessions := relaySession.userSessionsParent
if request.BlockHeight != int64(pairingEpoch) {
relaySession.Lock.Unlock()
return utils.LavaFormatError("request blockheight mismatch to session epoch", nil,
&map[string]string{"pairingEpoch": strconv.FormatUint(pairingEpoch, 10), "userAddr": userAddr.String(),
"relay blockheight": strconv.FormatInt(request.BlockHeight, 10)})
}

// Validate
if request.SessionId == 0 {
return utils.LavaFormatError("SessionID cannot be 0 for non-data reliability requests", nil,
&map[string]string{"pairingEpoch": strconv.FormatUint(pairingEpoch, 10), "userAddr": userAddr.String(),
"relay request": fmt.Sprintf("%v", request)})
}
userSessions := relaySession.userSessionsParent
relaySession.Lock.Unlock()

// Update session
err = updateSessionCu(relaySession, userSessions, nodeMsg.GetServiceApi(), request, pairingEpoch)
if err != nil {
return err
}
// Validate
if request.SessionId == 0 {
return utils.LavaFormatError("SessionID cannot be 0 for non-data reliability requests", nil,
&map[string]string{"pairingEpoch": strconv.FormatUint(pairingEpoch, 10), "userAddr": userAddr.String(),
"relay request": fmt.Sprintf("%v", request)})
}

relaySession.Lock.Lock()
relaySession.Proof = request
relaySession.Lock.Unlock()
// Update session
err = updateSessionCu(relaySession, userSessions, nodeMsg.GetServiceApi(), request, pairingEpoch)
if err != nil {
return err
}

var reply *pairingtypes.RelayReply
var clientSub *rpcclient.ClientSubscription
var subscriptionID string
repliesChan := make(chan interface{})
reply, subscriptionID, clientSub, err = nodeMsg.Send(context.Background(), repliesChan)
if err != nil {
return utils.LavaFormatError("Subscription failed", err, nil)
}
relaySession.Lock.Lock()
relaySession.Proof = request
relaySession.Lock.Unlock()

relaySession.Lock.Lock()
relaySession.Subs[subscriptionID] = &subscription{
id: subscriptionID,
sub: clientSub,
repliesChan: repliesChan,
}
relaySession.Lock.Unlock()
var reply *pairingtypes.RelayReply
var clientSub *rpcclient.ClientSubscription
var subscriptionID string
repliesChan := make(chan interface{})
reply, subscriptionID, clientSub, err = nodeMsg.Send(context.Background(), repliesChan)
if err != nil {
return utils.LavaFormatError("Subscription failed", err, nil)
}

err = srv.Send(reply) //this reply contains the RPC ID
if err != nil {
utils.LavaFormatError("Error getting RPC ID", err, nil)
}
relaySession.Lock.Lock()
relaySession.Subs[subscriptionID] = &subscription{
id: subscriptionID,
sub: clientSub,
repliesChan: repliesChan,
}
relaySession.Lock.Unlock()

for {
select {
case <-clientSub.Err():
utils.LavaFormatError("client sub", err, nil)
// delete this connection from the subs map
err = srv.Send(reply) //this reply contains the RPC ID
if err != nil {
utils.LavaFormatError("Error getting RPC ID", err, nil)
}

for {
select {
case <-clientSub.Err():
utils.LavaFormatError("client sub", err, nil)
// delete this connection from the subs map
relaySession.Lock.Lock()
if sub, ok := relaySession.Subs[subscriptionID]; ok {
sub.disconnect()
delete(relaySession.Subs, subscriptionID)
}
relaySession.Lock.Unlock()
return err
case reply := <-repliesChan:
data, err := json.Marshal(reply)
if err != nil {
utils.LavaFormatError("client sub unmarshal", err, nil)
relaySession.Lock.Lock()
if sub, ok := relaySession.Subs[subscriptionID]; ok {
sub.disconnect()
delete(relaySession.Subs, subscriptionID)
}
relaySession.Lock.Unlock()
return err
case reply := <-repliesChan:
data, err := json.Marshal(reply)
if err != nil {
utils.LavaFormatError("client sub unmarshal", err, nil)
relaySession.Lock.Lock()
if sub, ok := relaySession.Subs[subscriptionID]; ok {
sub.disconnect()
delete(relaySession.Subs, subscriptionID)
}
relaySession.Lock.Unlock()
return err
}
}

err = srv.Send(
&pairingtypes.RelayReply{
Data: data,
},
)
if err != nil {
// usually triggered when client closes connection
utils.LavaFormatError("client sub send", err, nil)
relaySession.Lock.Lock()
if sub, ok := relaySession.Subs[subscriptionID]; ok {
sub.disconnect()
delete(relaySession.Subs, subscriptionID)
}
relaySession.Lock.Unlock()
return err
err = srv.Send(
&pairingtypes.RelayReply{
Data: data,
},
)
if err != nil {
// usually triggered when client closes connection
utils.LavaFormatError("client sub send", err, nil)
relaySession.Lock.Lock()
if sub, ok := relaySession.Subs[subscriptionID]; ok {
sub.disconnect()
delete(relaySession.Subs, subscriptionID)
}
// performCUUpdate
// TODO Update payments
err = updateSessionCuSubscribe(relaySession, userSessions, nodeMsg.GetServiceApi(), request, pairingEpoch)
if err != nil {
relaySession.Lock.Lock()
if sub, ok := relaySession.Subs[subscriptionID]; ok {
sub.disconnect()
delete(relaySession.Subs, subscriptionID)
}
relaySession.Lock.Unlock()
return err
relaySession.Lock.Unlock()
return err
}
// performCUUpdate
// TODO Update payments
err = updateSessionCuSubscribe(relaySession, userSessions, nodeMsg.GetServiceApi(), request, pairingEpoch)
if err != nil {
relaySession.Lock.Lock()
if sub, ok := relaySession.Subs[subscriptionID]; ok {
sub.disconnect()
delete(relaySession.Subs, subscriptionID)
}
utils.LavaFormatInfo("Sending data", &map[string]string{"data": string(data)})
relaySession.Lock.Unlock()
return err
}
utils.LavaFormatInfo("Sending data", &map[string]string{"data": string(data)})
}
}
return nil
}

func (relayServ *relayServer) VerifyReliabilityAddressSigning(ctx context.Context, consumer sdk.AccAddress, request *pairingtypes.RelayRequest) (valid bool, err error) {
Expand Down

0 comments on commit 7a683af

Please sign in to comment.