Skip to content

Commit

Permalink
Pipeline now receives the decoded arguments insted of raw data (topfr…
Browse files Browse the repository at this point in the history
  • Loading branch information
rcmgleite authored and cscatolini committed Aug 16, 2018
1 parent 4114def commit 7465ba3
Show file tree
Hide file tree
Showing 11 changed files with 160 additions and 54 deletions.
28 changes: 28 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 0 additions & 23 deletions examples/demo/chat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ type (
component.Base
group *pitaya.Group
timer *timer.Timer
stats *stats
}

// UserMessage represents a message that user sent
Expand All @@ -49,37 +48,19 @@ type (
Code int `json:"code"`
Result string `json:"result"`
}

stats struct {
outboundBytes int
inboundBytes int
}
)

func (stats *stats) outbound(ctx context.Context, in []byte) ([]byte, error) {
stats.outboundBytes += len(in)
return in, nil
}

func (stats *stats) inbound(ctx context.Context, in []byte) ([]byte, error) {
stats.inboundBytes += len(in)
return in, nil
}

// NewRoom returns a new room
func NewRoom() *Room {
return &Room{
group: pitaya.NewGroup("room"),
stats: &stats{},
}
}

// AfterInit component lifetime callback
func (r *Room) AfterInit() {
r.timer = pitaya.NewTimer(time.Minute, func() {
println("UserCount: Time=>", time.Now().String(), "Count=>", r.group.Count())
println("OutboundBytes", r.stats.outboundBytes)
println("InboundBytes", r.stats.outboundBytes)
})
}

Expand Down Expand Up @@ -129,10 +110,6 @@ func main() {
component.WithNameFunc(strings.ToLower),
)

// traffic stats
pitaya.AfterHandler(room.stats.outbound)
pitaya.BeforeHandler(room.stats.inbound)

log.SetFlags(log.LstdFlags | log.Llongfile)

http.Handle("/web/", http.StripPrefix("/web/", http.FileServer(http.Dir("web"))))
Expand Down
4 changes: 0 additions & 4 deletions examples/demo/cluster/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ func configureBackend() {
component.WithName("room"),
component.WithNameFunc(strings.ToLower),
)

// traffic stats
pitaya.AfterHandler(room.Stats.Outbound)
pitaya.BeforeHandler(room.Stats.Inbound)
}

func configureFrontend(port int) {
Expand Down
4 changes: 0 additions & 4 deletions examples/demo/cluster_grpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ func configureBackend() {
component.WithName("room"),
component.WithNameFunc(strings.ToLower),
)

// traffic stats
pitaya.AfterHandler(room.Stats.Outbound)
pitaya.BeforeHandler(room.Stats.Inbound)
}

func configureFrontend(port int) {
Expand Down
4 changes: 0 additions & 4 deletions examples/demo/cluster_protobuf/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ func configureBackend() {
component.WithName("room"),
component.WithNameFunc(strings.ToLower),
)

// traffic stats
pitaya.AfterHandler(room.Stats.Outbound)
pitaya.BeforeHandler(room.Stats.Inbound)
}

func configureFrontend(port int) {
Expand Down
111 changes: 111 additions & 0 deletions examples/demo/pipeline/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package main

import (
"context"
"flag"
"fmt"

"github.com/sirupsen/logrus"
"github.com/topfreegames/pitaya"
"github.com/topfreegames/pitaya/acceptor"
"github.com/topfreegames/pitaya/component"
"github.com/topfreegames/pitaya/serialize/json"
validator "gopkg.in/go-playground/validator.v9"
)

// MetagameServer ...
type MetagameServer struct {
component.Base
Logger logrus.FieldLogger
}

// NewMetagameMock ...
func NewMetagameMock() *MetagameServer {
return &MetagameServer{
Logger: logrus.New(),
}
}

// CreatePlayerCheatArgs ...
type CreatePlayerCheatArgs struct {
Name string `json:"name"`
Email string `json:"email" validate:"email"`
SoftCurrency int `json:"softCurrency" validate:"gte=0,lte=1000"`
HardCurrency int `json:"hardCurrency" validate:"gte=0,lte=200"`
}

// CreatePlayerCheatResponse ...
type CreatePlayerCheatResponse struct {
Msg string `json:"msg"`
}

// CreatePlayerCheat ...
func (g *MetagameServer) CreatePlayerCheat(ctx context.Context, args *CreatePlayerCheatArgs) (*CreatePlayerCheatResponse, error) {
// Do nothing. This is just an example of how pipelines can be helpful
return &CreatePlayerCheatResponse{
Msg: "ok",
}, nil
}

// This is a beforeHandler that validates the handler argument based on the struct tags.
// As for this example, the CreatePlayerCheatArgs has the 'validate' tags for email,
// softCurrency and hardCurrency. If any of the validations fail an error will be returned
func handlerParamsValidator(ctx context.Context, in interface{}) (interface{}, error) {
var validate *validator.Validate
validate = validator.New()

if err := validate.Struct(in); err != nil {
return nil, err
}

return in, nil
}

// Simple example of a before pipeline that actually asserts the type of the
// in parameter.
// IMPORTANT: that this kind of pipeline will be hard to exist in real code
// as a pipeline function executes for every handler and each of them
// most probably have different parameter types.
func (g *MetagameServer) simpleBefore(ctx context.Context, in interface{}) (interface{}, error) {
g.Logger.Info("Simple Before exec")
createPlayerArgs := in.(*CreatePlayerCheatArgs)

g.Logger.Infof("Name: %s", createPlayerArgs.Name)
g.Logger.Infof("Email: %s", createPlayerArgs.Email)
g.Logger.Infof("SoftCurrency: %d", createPlayerArgs.SoftCurrency)
g.Logger.Infof("HardCurrency: %d", createPlayerArgs.HardCurrency)

return in, nil
}

// Simple example of an after pipeline. The 2nd argument is the handler response.
func (g *MetagameServer) simpleAfter(ctx context.Context, resp interface{}) (interface{}, error) {
g.Logger.Info("Simple After exec - response:", resp)

return resp, nil
}

func main() {
svType := flag.String("type", "metagameDemo", "the server type")
isFrontend := flag.Bool("frontend", true, "if server is frontend")
flag.Parse()

defer pitaya.Shutdown()

metagameServer := NewMetagameMock()
pitaya.SetSerializer(json.NewSerializer())
pitaya.Register(metagameServer,
component.WithName("metagameHandler"),
)

// Pipelines registration
pitaya.BeforeHandler(handlerParamsValidator)
pitaya.BeforeHandler(metagameServer.simpleBefore)
pitaya.AfterHandler(metagameServer.simpleAfter)

port := 3251
tcp := acceptor.NewTCPAcceptor(fmt.Sprintf(":%d", port))
pitaya.AddAcceptor(tcp)
pitaya.Configure(*isFrontend, *svType, pitaya.Cluster, map[string]string{})
pitaya.Start()
}
2 changes: 1 addition & 1 deletion pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var (
type (
// Handler is a function that has the same signature as a handler and will
// be called before or after handler methods
Handler func(ctx context.Context, in []byte) (out []byte, err error)
Handler func(ctx context.Context, in interface{}) (out interface{}, err error)

pipelineChannel struct {
Handlers []Handler
Expand Down
4 changes: 2 additions & 2 deletions pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import (
)

var (
handler1 = func(ctx context.Context, in []byte) ([]byte, error) {
handler1 = func(ctx context.Context, in interface{}) (interface{}, error) {
return in, errors.New("ohno")
}
handler2 = func(ctx context.Context, in []byte) ([]byte, error) {
handler2 = func(ctx context.Context, in interface{}) (interface{}, error) {
return nil, nil
}
p = &pipelineChannel{}
Expand Down
2 changes: 1 addition & 1 deletion pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func resetPipelines() {
pipeline.AfterHandler.Handlers = make([]pipeline.Handler, 0)
}

var myHandler = func(ctx context.Context, in []byte) ([]byte, error) {
var myHandler = func(ctx context.Context, in interface{}) (interface{}, error) {
return []byte("test"), nil
}

Expand Down
16 changes: 9 additions & 7 deletions service/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func getMsgType(msgTypeIface interface{}) (message.Type, error) {
return msgType, nil
}

func executeBeforePipeline(ctx context.Context, data []byte) ([]byte, error) {
func executeBeforePipeline(ctx context.Context, data interface{}) (interface{}, error) {
var err error
res := data
if len(pipeline.BeforeHandler.Handlers) > 0 {
Expand All @@ -114,7 +114,7 @@ func executeBeforePipeline(ctx context.Context, data []byte) ([]byte, error) {
return res, nil
}

func executeAfterPipeline(ctx context.Context, ser serialize.Serializer, res []byte) []byte {
func executeAfterPipeline(ctx context.Context, ser serialize.Serializer, res interface{}) interface{} {
var err error
ret := res
if len(pipeline.AfterHandler.Handlers) > 0 {
Expand Down Expand Up @@ -170,15 +170,17 @@ func processHandlerMessage(
logger.Log.Warn(err.Error())
}

if data, err = executeBeforePipeline(ctx, data); err != nil {
return nil, err
}

// First unmarshal the handler arg that will be passed to
// both handler and pipeline functions
arg, err := unmarshalHandlerArg(h, serializer, data)
if err != nil {
return nil, e.NewError(err, e.ErrBadRequestCode)
}

if arg, err = executeBeforePipeline(ctx, arg); err != nil {
return nil, err
}

logger.Log.Debugf("SID=%d, Data=%s", session.ID(), data)
args := []reflect.Value{h.Receiver, reflect.ValueOf(ctx)}
if arg != nil {
Expand All @@ -199,11 +201,11 @@ func processHandlerMessage(
resp = []byte("ack")
}

resp = executeAfterPipeline(ctx, serializer, resp)
ret, err := serializeReturn(serializer, resp)
if err != nil {
return nil, err
}
ret = executeAfterPipeline(ctx, serializer, ret)

return ret, nil
}
Loading

0 comments on commit 7465ba3

Please sign in to comment.