Skip to content

Commit

Permalink
Move handlers and remotes static variables to new HandlerPool and Rem…
Browse files Browse the repository at this point in the history
…oteService
  • Loading branch information
Felippe Durán committed Jul 24, 2020
1 parent a0cb576 commit 001aeee
Show file tree
Hide file tree
Showing 10 changed files with 416 additions and 227 deletions.
3 changes: 3 additions & 0 deletions builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (builder *Builder) AddAcceptor(ac acceptor.Acceptor) {

// Build returns a valid App instance
func (builder *Builder) Build() Pitaya {
handlerPool := service.NewHandlerPool()
var remoteService *service.RemoteService
if builder.ServerMode == Standalone {
if builder.ServiceDiscovery != nil || builder.RPCClient != nil || builder.RPCServer != nil {
Expand All @@ -149,6 +150,7 @@ func (builder *Builder) Build() Pitaya {
builder.Server,
builder.SessionPool,
builder.HandlerHooks,
handlerPool,
)

builder.RPCServer.SetPitayaServer(remoteService)
Expand Down Expand Up @@ -177,6 +179,7 @@ func (builder *Builder) Build() Pitaya {
agentFactory,
builder.MetricsReporters,
builder.HandlerHooks,
handlerPool,
)

return NewApp(
Expand Down
3 changes: 2 additions & 1 deletion rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ func TestDoSendRPC(t *testing.T) {
messageEncoder := message.NewMessagesEncoder(false)
sessionPool := sessionmocks.NewMockSessionPool(ctrl)
router := router.New()
svc := service.NewRemoteService(mockRPCClient, mockRPCServer, mockSD, packetEncoder, mockSerializer, router, messageEncoder, &cluster.Server{}, sessionPool, pipeline.NewHandlerHooks())
handlerPool := service.NewHandlerPool()
svc := service.NewRemoteService(mockRPCClient, mockRPCServer, mockSD, packetEncoder, mockSerializer, router, messageEncoder, &cluster.Server{}, sessionPool, pipeline.NewHandlerHooks(), handlerPool)
assert.NotNil(t, svc)
app.remoteService = svc
app.server.ID = "notmyserver"
Expand Down
11 changes: 8 additions & 3 deletions service/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ import (
)

var (
handlers = make(map[string]*component.Handler) // all handler method
handlerType = "handler"
)

Expand All @@ -69,6 +68,8 @@ type (
services map[string]*component.Service // all registered service
metricsReporters []metrics.Reporter
agentFactory agent.AgentFactory
handlerPool *HandlerPool
handlers map[string]*component.Handler // all handler method
}

unhandledMessage struct {
Expand All @@ -90,6 +91,7 @@ func NewHandlerService(
agentFactory agent.AgentFactory,
metricsReporters []metrics.Reporter,
handlerHooks *pipeline.HandlerHooks,
handlerPool *HandlerPool,
) *HandlerService {
h := &HandlerService{
services: make(map[string]*component.Service),
Expand All @@ -101,6 +103,8 @@ func NewHandlerService(
remoteService: remoteService,
agentFactory: agentFactory,
metricsReporters: metricsReporters,
handlerPool: handlerPool,
handlers: make(map[string]*component.Handler),
}

h.handlerHooks = handlerHooks
Expand Down Expand Up @@ -151,7 +155,7 @@ func (h *HandlerService) Register(comp component.Component, opts []component.Opt
// register all handlers
h.services[s.Name] = s
for name, handler := range s.Handlers {
handlers[fmt.Sprintf("%s.%s", s.Name, name)] = handler
h.handlerPool.Register(s.Name, name, handler)
}
return nil
}
Expand Down Expand Up @@ -304,7 +308,7 @@ func (h *HandlerService) localProcess(ctx context.Context, a agent.Agent, route
mid = 0
}

ret, err := processHandlerMessage(ctx, route, h.serializer, h.handlerHooks, a.GetSession(), msg.Data, msg.Type, false)
ret, err := h.handlerPool.ProcessHandlerMessage(ctx, route, h.serializer, h.handlerHooks, a.GetSession(), msg.Data, msg.Type, false)
if msg.Type != message.Notify {
if err != nil {
logger.Log.Errorf("Failed to process handler message: %s", err.Error())
Expand All @@ -324,6 +328,7 @@ func (h *HandlerService) localProcess(ctx context.Context, a agent.Agent, route

// DumpServices outputs all registered services
func (h *HandlerService) DumpServices() {
handlers := h.handlerPool.GetHandlers()
for name := range handlers {
logger.Log.Infof("registered handler %s, isRawArg: %s", name, handlers[name].IsRawArg)
}
Expand Down
125 changes: 125 additions & 0 deletions service/handler_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package service

import (
"context"
"fmt"
"reflect"

"github.com/topfreegames/pitaya/component"
"github.com/topfreegames/pitaya/conn/message"
"github.com/topfreegames/pitaya/constants"
e "github.com/topfreegames/pitaya/errors"
"github.com/topfreegames/pitaya/logger"
"github.com/topfreegames/pitaya/pipeline"
"github.com/topfreegames/pitaya/route"
"github.com/topfreegames/pitaya/serialize"
"github.com/topfreegames/pitaya/session"
"github.com/topfreegames/pitaya/util"
)

// HandlerPool ...
type HandlerPool struct {
handlers map[string]*component.Handler // all handler method
}

// NewHandlerPool ...
func NewHandlerPool() *HandlerPool {
return &HandlerPool{
handlers: make(map[string]*component.Handler),
}
}

// Register ...
func (h *HandlerPool) Register(serviceName string, name string, handler *component.Handler) {
h.handlers[fmt.Sprintf("%s.%s", serviceName, name)] = handler
}

// GetHandlers ...
func (h *HandlerPool) GetHandlers() map[string]*component.Handler {
return h.handlers
}

// ProcessHandlerMessage ...
func (h *HandlerPool) ProcessHandlerMessage(
ctx context.Context,
rt *route.Route,
serializer serialize.Serializer,
handlerHooks *pipeline.HandlerHooks,
session session.Session,
data []byte,
msgTypeIface interface{},
remote bool,
) ([]byte, error) {
if ctx == nil {
ctx = context.Background()
}
ctx = context.WithValue(ctx, constants.SessionCtxKey, session)
ctx = util.CtxWithDefaultLogger(ctx, rt.String(), session.UID())

handler, err := h.getHandler(rt)
if err != nil {
return nil, e.NewError(err, e.ErrNotFoundCode)
}

msgType, err := getMsgType(msgTypeIface)
if err != nil {
return nil, e.NewError(err, e.ErrInternalCode)
}

logger := ctx.Value(constants.LoggerCtxKey).(logger.Logger)
exit, err := handler.ValidateMessageType(msgType)
if err != nil && exit {
return nil, e.NewError(err, e.ErrBadRequestCode)
} else if err != nil {
logger.Warnf("invalid message type, error: %s", err.Error())
}

// First unmarshal the handler arg that will be passed to
// both handler and pipeline functions
arg, err := unmarshalHandlerArg(handler, serializer, data)
if err != nil {
return nil, e.NewError(err, e.ErrBadRequestCode)
}

if arg, err = handlerHooks.BeforeHandler.ExecuteBeforePipeline(ctx, arg); err != nil {
return nil, err
}

logger.Debugf("SID=%d, Data=%s", session.ID(), data)
args := []reflect.Value{handler.Receiver, reflect.ValueOf(ctx)}
if arg != nil {
args = append(args, reflect.ValueOf(arg))
}

resp, err := util.Pcall(handler.Method, args)
if remote && msgType == message.Notify {
// This is a special case and should only happen with nats rpc client
// because we used nats request we have to answer to it or else a timeout
// will happen in the caller server and will be returned to the client
// the reason why we don't just Publish is to keep track of failed rpc requests
// with timeouts, maybe we can improve this flow
resp = []byte("ack")
}

resp, err = handlerHooks.AfterHandler.ExecuteAfterPipeline(ctx, resp, err)
if err != nil {
return nil, err
}

ret, err := serializeReturn(serializer, resp)
if err != nil {
return nil, err
}

return ret, nil
}

func (h *HandlerPool) getHandler(rt *route.Route) (*component.Handler, error) {
handler, ok := h.handlers[rt.Short()]
if !ok {
e := fmt.Errorf("pitaya/handler: %s not found", rt.String())
return nil, e
}
return handler, nil

}
Loading

0 comments on commit 001aeee

Please sign in to comment.