Skip to content

Commit

Permalink
Migrating pr 225 (topfreegames#232)
Browse files Browse the repository at this point in the history
* fix(client): fix when routes have no return (output)

We're also improving some errors to be more descritive on the client.

* chore(makefile): add a target to compile examples protos

* chore(examples): update cluster example to use only protobuf

We're also adding the missing descriptor handler which is used by the
pitaya-cli.

Co-authored-by: gabrielcorado <[email protected]>
  • Loading branch information
henriqueoelze and gabrielcorado authored Aug 10, 2021
1 parent 067f6e6 commit 56d2c1f
Show file tree
Hide file tree
Showing 7 changed files with 754 additions and 131 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ run-custom-metrics-example:
run-rate-limiting-example:
@go run examples/demo/rate_limiting/main.go

protos-compile-demo:
@protoc -I examples/demo/protos examples/demo/protos/*.proto --go_out=.

protos-compile:
@cd benchmark/testdata && ./gen_proto.sh
@protoc -I pitaya-protos/ pitaya-protos/*.proto --go_out=plugins=grpc:protos
Expand Down
31 changes: 18 additions & 13 deletions client/protoclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"strings"
"time"
Expand Down Expand Up @@ -144,6 +145,11 @@ func getOutputInputNames(command map[string]interface{}) (string, string, error)

out := command["output"]
outputDocsArr := out.([]interface{})
// we can have handlers that have no return specified.
if len(outputDocsArr) == 0 {
return inputName, "", nil
}

outputDocs, ok := outputDocsArr[0].(map[string]interface{})
if ok {
for k := range outputDocs {
Expand Down Expand Up @@ -211,7 +217,7 @@ func (pc *ProtoClient) getDescriptors(data string) error {
cmdInfo := v.(map[string]interface{})
in, out, err := getOutputInputNames(cmdInfo)
if err != nil {
return err
return fmt.Errorf("failed to get output and input names for '%s' handler: %w", k, err)
}

var command Command
Expand Down Expand Up @@ -250,33 +256,33 @@ func (pc *ProtoClient) getDescriptors(data string) error {

encodedNames, err := proto.Marshal(protname)
if err != nil {
return err
return fmt.Errorf("failed to encode proto names: %w", err)
}
_, err = pc.SendRequest(pc.descriptorsRoute, encodedNames)
if err != nil {
return err
return fmt.Errorf("failed to send proto descriptors request: %w", err)
}

response := <-pc.Client.IncomingMsgChan
descriptors := &protos.ProtoDescriptors{}
if err := proto.Unmarshal(response.Data, descriptors); err != nil {
return err
return fmt.Errorf("failed to unmarshal proto descriptors response: %w", err)
}

// get all proto types
descriptorArray := make([]*protobuf.FileDescriptorProto, 0)
for i := range descriptors.Desc {
fileDescriptorProto, err := unpackDescriptor(descriptors.Desc[i])
if err != nil {
return err
return fmt.Errorf("failed to unpack descriptor: %w", err)
}

descriptorArray = append(descriptorArray, fileDescriptorProto)
pc.descriptorsNames[names[i]] = true
}

if err = pc.buildProtosFromDescriptor(descriptorArray); err != nil {
return err
return fmt.Errorf("failed to build proto from descriptor: %w", err)
}

return nil
Expand Down Expand Up @@ -341,11 +347,11 @@ func (pc *ProtoClient) LoadServerInfo(addr string) error {

docs := &protos.Doc{}
if err := proto.Unmarshal(response.Data, docs); err != nil {
return err
return fmt.Errorf("failed to unmarshal docs route response: %w", err)
}

if err := pc.getDescriptors(docs.Doc); err != nil {
return err
return fmt.Errorf("failed to read proto descriptors: %w", err)
}

pc.Disconnect()
Expand All @@ -369,7 +375,6 @@ func (pc *ProtoClient) waitForData() {
for {
select {
case response := <-pc.Client.IncomingMsgChan:

inputMsg := dynamic.NewMessage(pc.expectedInputDescriptor)

msg, ok := pc.info.Commands[response.Route]
Expand All @@ -388,27 +393,27 @@ func (pc *ProtoClient) waitForData() {
}
response.Data, err = json.Marshal(errMsg)
if err != nil {
logger.Log.Errorf("Erro encode error to json: %s", string(response.Data))
logger.Log.Errorf("error encode error to json: %s", string(response.Data))
continue
}
pc.IncomingMsgChan <- response
continue
}

if inputMsg == nil {
logger.Log.Errorf("Not expected data: %s", string(response.Data))
logger.Log.Errorf("not expected data: %s", string(response.Data))
continue
}

err := inputMsg.Unmarshal(response.Data)
if err != nil {
logger.Log.Errorf("Erro decode data: %s", string(response.Data))
logger.Log.Errorf("error decode data: %s", string(response.Data))
continue
}

data, err2 := inputMsg.MarshalJSON()
if err2 != nil {
logger.Log.Errorf("Erro encode data to json: %s", string(response.Data))
logger.Log.Errorf("error encode data to json: %s", string(response.Data))
continue
}

Expand Down
3 changes: 3 additions & 0 deletions examples/demo/cluster/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func configureFrontend(port int) {
component.WithName("connector"),
component.WithNameFunc(strings.ToLower),
)

app.RegisterRemote(services.NewConnectorRemote(app),
component.WithName("connectorremote"),
component.WithNameFunc(strings.ToLower),
Expand Down Expand Up @@ -87,6 +88,8 @@ func main() {
builder.Groups = groups.NewMemoryGroupService(*config.NewDefaultMemoryGroupConfig())
app = builder.Build()

//TODO: Oelze pitaya.SetSerializer(protobuf.NewSerializer())

defer app.Shutdown()

if !*isFrontend {
Expand Down
24 changes: 20 additions & 4 deletions examples/demo/cluster/services/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/topfreegames/pitaya/v2"
"github.com/topfreegames/pitaya/v2/component"
"github.com/topfreegames/pitaya/v2/examples/demo/protos"
pitayaprotos "github.com/topfreegames/pitaya/v2/protos"
)

// ConnectorRemote is a remote that will receive rpc's
Expand Down Expand Up @@ -43,8 +44,8 @@ func NewConnectorRemote(app pitaya.Pitaya) *ConnectorRemote {
return &ConnectorRemote{app: app}
}

func reply(code int32, msg string) (*Response, error) {
res := &Response{
func reply(code int32, msg string) (*protos.Response, error) {
res := &protos.Response{
Code: code,
Msg: msg,
}
Expand Down Expand Up @@ -88,7 +89,7 @@ func (c *ConnectorRemote) RemoteFunc(ctx context.Context, msg *protos.RPCMsg) (*
}

// Docs returns documentation
func (c *ConnectorRemote) Docs(ctx context.Context, ddd *protos.Doc) (*protos.Doc, error) {
func (c *ConnectorRemote) Docs(ctx context.Context, ddd *pitayaprotos.Doc) (*protos.Doc, error) {
d, err := c.app.Documentation(true)
if err != nil {
return nil, err
Expand All @@ -99,5 +100,20 @@ func (c *ConnectorRemote) Docs(ctx context.Context, ddd *protos.Doc) (*protos.Do
return nil, err
}

return &protos.Doc{Doc: string(doc)}, nil
return &pitayaprotos.Doc{Doc: string(doc)}, nil
}

func (c *ConnectorRemote) Descriptor(ctx context.Context, names *pitayaprotos.ProtoNames) (*pitayaprotos.ProtoDescriptors, error) {
descriptors := make([][]byte, len(names.Name))

for i, protoName := range names.Name {
desc, err := pitaya.Descriptor(protoName)
if err != nil {
return nil, fmt.Errorf("failed to get descriptor for '%s': %w", protoName, err)
}

descriptors[i] = desc
}

return &pitayaprotos.ProtoDescriptors{Desc: descriptors}, nil
}
32 changes: 16 additions & 16 deletions examples/demo/cluster/services/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type (
component.Base
timer *timer.Timer
app pitaya.Pitaya
Stats *Stats
Stats *protos.Stats
}

// UserMessage represents a message that user sent
Expand Down Expand Up @@ -66,7 +66,7 @@ type (
func NewRoom(app pitaya.Pitaya) *Room {
return &Room{
app: app,
Stats: &Stats{},
Stats: &protos.Stats{},
}
}

Expand All @@ -80,23 +80,23 @@ func (r *Room) AfterInit() {
r.timer = pitaya.NewTimer(time.Minute, func() {
count, err := r.app.GroupCountMembers(context.Background(), "room")
println("UserCount: Time=>", time.Now().String(), "Count=>", count, "Error=>", err)
println("OutboundBytes", r.Stats.outboundBytes)
println("InboundBytes", r.Stats.outboundBytes)
println("OutboundBytes", r.Stats.OutboundBytes)
println("InboundBytes", r.Stats.OutboundBytes)
})
}

// Entry is the entrypoint
func (r *Room) Entry(ctx context.Context, msg []byte) (*JoinResponse, error) {
func (r *Room) Entry(ctx context.Context, msg []byte) (*protos.JoinResponse, error) {
logger := pitaya.GetDefaultLoggerFromCtx(ctx) // The default logger contains a requestId, the route being executed and the sessionId
s := r.app.GetSessionFromCtx(ctx)

err := s.Bind(ctx, "helroow")
err := s.Bind(ctx, "banana")
if err != nil {
logger.Error("Failed to bind session")
logger.Error(err)
return nil, pitaya.Error(err, "RH-000", map[string]string{"failed": "bind"})
}
return &JoinResponse{Result: "ok"}, nil
return &protos.JoinResponse{Result: "ok"}, nil
}

// GetSessionData gets the session data
Expand Down Expand Up @@ -125,7 +125,7 @@ func (r *Room) SetSessionData(ctx context.Context, data *SessionData) ([]byte, e
}

// Join room
func (r *Room) Join(ctx context.Context) (*JoinResponse, error) {
func (r *Room) Join(ctx context.Context) (*protos.JoinResponse, error) {
logger := pitaya.GetDefaultLoggerFromCtx(ctx)
s := r.app.GetSessionFromCtx(ctx)
err := r.app.GroupAddMember(ctx, "room", s.UID())
Expand All @@ -140,18 +140,18 @@ func (r *Room) Join(ctx context.Context) (*JoinResponse, error) {
logger.Error(err)
return nil, err
}
s.Push("onMembers", &AllMembers{Members: members})
err = r.app.GroupBroadcast(ctx, "connector", "room", "onNewUser", &NewUser{Content: fmt.Sprintf("New user: %d", s.ID())})
s.Push("onMembers", &protos.AllMembers{Members: members})
err = r.app.GroupBroadcast(ctx, "connector", "room", "onNewUser", &protos.NewUser{Content: fmt.Sprintf("New user: %d", s.ID())})
if err != nil {
logger.Error("Failed to broadcast onNewUser")
logger.Error(err)
return nil, err
}
return &JoinResponse{Result: "success"}, nil
return &protos.JoinResponse{Result: "success"}, nil
}

// Message sync last message to all members
func (r *Room) Message(ctx context.Context, msg *UserMessage) {
func (r *Room) Message(ctx context.Context, msg *protos.UserMessage) {
logger := pitaya.GetDefaultLoggerFromCtx(ctx)
err := r.app.GroupBroadcast(ctx, "connector", "room", "onMessage", msg)
if err != nil {
Expand All @@ -161,19 +161,19 @@ func (r *Room) Message(ctx context.Context, msg *UserMessage) {
}

// SendRPC sends rpc
func (r *Room) SendRPC(ctx context.Context, msg *SendRPCMsg) (*protos.RPCRes, error) {
func (r *Room) SendRPC(ctx context.Context, msg *protos.SendRPCMsg) (*protos.RPCRes, error) {
logger := pitaya.GetDefaultLoggerFromCtx(ctx)
ret := &protos.RPCRes{}
err := r.app.RPCTo(ctx, msg.ServerID, msg.Route, ret, &protos.RPCMsg{Msg: msg.Msg})
err := r.app.RPCTo(ctx, msg.ServerId, msg.Route, ret, &protos.RPCMsg{Msg: msg.Msg})
if err != nil {
logger.Errorf("Failed to execute RPCTo %s - %s", msg.ServerID, msg.Route)
logger.Errorf("Failed to execute RPCTo %s - %s", msg.ServerId, msg.Route)
logger.Error(err)
return nil, pitaya.Error(err, "RPC-000")
}
return ret, nil
}

// MessageRemote just echoes the given message
func (r *Room) MessageRemote(ctx context.Context, msg *UserMessage, b bool, s string) (*UserMessage, error) {
func (r *Room) MessageRemote(ctx context.Context, msg *protos.UserMessage, b bool, s string) (*protos.UserMessage, error) {
return msg, nil
}
Loading

0 comments on commit 56d2c1f

Please sign in to comment.