Skip to content

Commit

Permalink
Merge pull request topfreegames#218 from topfreegames/feature/port-pr…
Browse files Browse the repository at this point in the history
…-200-tov2

Port PR topfreegames#200 to V2
  • Loading branch information
gabrielcorado authored Jul 2, 2021
2 parents 4d35ad8 + 0f53a67 commit 9dd5e98
Showing 1 changed file with 11 additions and 9 deletions.
20 changes: 11 additions & 9 deletions cluster/nats_rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type NatsRPCServer struct {
subChan chan *nats.Msg // subChan is the channel used by the server to receive network messages addressed to itself
bindingsChan chan *nats.Msg // bindingsChan receives notify from other servers on every user bind to session
unhandledReqCh chan *protos.Request
responses []*protos.Response
requests []*protos.Request
userPushCh chan *protos.Push
userKickCh chan *protos.KickMsg
sub *nats.Subscription
Expand Down Expand Up @@ -110,6 +112,8 @@ func (ns *NatsRPCServer) configure(config config.NatsRPCServerConfig) error {
// blocking producers on a massive push
ns.userPushCh = make(chan *protos.Push, ns.pushBufferSize)
ns.userKickCh = make(chan *protos.KickMsg, ns.messagesBufferSize)
ns.responses = make([]*protos.Response, ns.service)
ns.requests = make([]*protos.Request, ns.service)
return nil
}

Expand Down Expand Up @@ -257,23 +261,21 @@ func (ns *NatsRPCServer) marshalResponse(res *protos.Response) ([]byte, error) {
}

func (ns *NatsRPCServer) processMessages(threadID int) {
for req := range ns.GetUnhandledRequestsChannel() {
logger.Log.Debugf("(%d) processing message %v", threadID, req.GetMsg().GetId())
reply := req.GetMsg().GetReply()
var response *protos.Response
ctx, err := util.GetContextFromRequest(req, ns.server.ID)
for ns.requests[threadID] = range ns.GetUnhandledRequestsChannel() {
logger.Log.Debugf("(%d) processing message %v", threadID, ns.requests[threadID].GetMsg().GetId())
ctx, err := util.GetContextFromRequest(ns.requests[threadID], ns.server.ID)
if err != nil {
response = &protos.Response{
ns.responses[threadID] = &protos.Response{
Error: &protos.Error{
Code: e.ErrInternalCode,
Msg: err.Error(),
},
}
} else {
response, _ = ns.pitayaServer.Call(ctx, req)
ns.responses[threadID], _ = ns.pitayaServer.Call(ctx, ns.requests[threadID])
}
p, err := ns.marshalResponse(response)
err = ns.conn.Publish(reply, p)
p, err := ns.marshalResponse(ns.responses[threadID])
err = ns.conn.Publish(ns.requests[threadID].GetMsg().GetReply(), p)
if err != nil {
logger.Log.Error("error sending message response")
}
Expand Down

0 comments on commit 9dd5e98

Please sign in to comment.