Skip to content

Commit

Permalink
when processing RPC requests, do not create new requests and response…
Browse files Browse the repository at this point in the history
… objects
  • Loading branch information
hakulala authored and gabrielcorado committed Jul 2, 2021
1 parent 4d35ad8 commit 98f25e2
Showing 1 changed file with 14 additions and 13 deletions.
27 changes: 14 additions & 13 deletions cluster/nats_rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type NatsRPCServer struct {
subChan chan *nats.Msg // subChan is the channel used by the server to receive network messages addressed to itself
bindingsChan chan *nats.Msg // bindingsChan receives notify from other servers on every user bind to session
unhandledReqCh chan *protos.Request
responses []*protos.Response
requests []*protos.Request
userPushCh chan *protos.Push
userKickCh chan *protos.KickMsg
sub *nats.Subscription
Expand Down Expand Up @@ -257,23 +259,20 @@ func (ns *NatsRPCServer) marshalResponse(res *protos.Response) ([]byte, error) {
}

func (ns *NatsRPCServer) processMessages(threadID int) {
for req := range ns.GetUnhandledRequestsChannel() {
logger.Log.Debugf("(%d) processing message %v", threadID, req.GetMsg().GetId())
reply := req.GetMsg().GetReply()
var response *protos.Response
ctx, err := util.GetContextFromRequest(req, ns.server.ID)
for ns.requests[threadID] = range ns.GetUnhandledRequestsChannel() {
logger.Log.Debugf("(%d) processing message %v", threadID, ns.requests[threadID].GetMsg().GetId())
ctx, err := util.GetContextFromRequest(ns.requests[threadID], ns.server.ID)
if err != nil {
response = &protos.Response{
Error: &protos.Error{
Code: e.ErrInternalCode,
Msg: err.Error(),
},
ns.responses[threadID].Error = &protos.Error{
Code: e.ErrInternalCode,
Msg: err.Error(),
}
ns.responses[threadID].Data = nil
} else {
response, _ = ns.pitayaServer.Call(ctx, req)
ns.responses[threadID], _ = ns.pitayaServer.Call(ctx, ns.requests[threadID])
}
p, err := ns.marshalResponse(response)
err = ns.conn.Publish(reply, p)
p, err := ns.marshalResponse(ns.responses[threadID])
err = ns.conn.Publish(ns.requests[threadID].GetMsg().GetReply(), p)
if err != nil {
logger.Log.Error("error sending message response")
}
Expand Down Expand Up @@ -314,6 +313,8 @@ func (ns *NatsRPCServer) processKick() {

// Init inits nats rpc server
func (ns *NatsRPCServer) Init() error {
ns.responses = make([]*protos.Response, ns.config.GetInt("pitaya.concurrency.remote.service"))
ns.requests = make([]*protos.Request, ns.config.GetInt("pitaya.concurrency.remote.service"))
// TODO should we have concurrency here? it feels like we should
go ns.handleMessages()

Expand Down

0 comments on commit 98f25e2

Please sign in to comment.