Skip to content

Commit

Permalink
Major refactor for using protobuf only in RPC calls
Browse files Browse the repository at this point in the history
This will make it possible to create cluster and RPC libs for other
programming languages
  • Loading branch information
felipejfc committed Jun 4, 2018
1 parent 77a9c4c commit c79d9fa
Show file tree
Hide file tree
Showing 49 changed files with 2,383 additions and 348 deletions.
4 changes: 3 additions & 1 deletion agent/agent_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ func (a *Remote) Kick(ctx context.Context) error {
if a.Session.UID() == "" {
return constants.ErrNoUIDBind
}
b, err := util.GobEncode([]byte(a.Session.UID()))
b, err := proto.Marshal(&protos.KickMsg{
UserID: a.Session.UID(),
})
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion agent/agent_remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestNewRemoteFailsIfFailedToSetEncodedData(t *testing.T) {
ss := &protos.Session{Data: []byte("invalid")}

remote, err := NewRemote(ss, "", nil, nil, nil, nil, "", nil)
assert.Equal(t, errors.New("unexpected EOF"), err)
assert.Equal(t, errors.New("invalid character 'i' looking for beginning of value").Error(), err.Error())
assert.Nil(t, remote)
}

Expand Down
1 change: 0 additions & 1 deletion app.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,6 @@ func startDefaultRPCClient() {
}

func initSysRemotes() {
gob.Register(&session.Data{})
gob.Register(map[string]interface{}{})
sys := &remote.Sys{}
RegisterRemote(sys,
Expand Down
2 changes: 1 addition & 1 deletion cluster/nats_rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (ns *NatsRPCClient) buildRequest(
if err != nil {
logger.Log.Errorf("failed to inject span: %s", err)
}
ctx = pcontext.AddToPropagateCtx(ctx, constants.PeerIdKey, ns.server.ID)
ctx = pcontext.AddToPropagateCtx(ctx, constants.PeerIDKey, ns.server.ID)
ctx = pcontext.AddToPropagateCtx(ctx, constants.PeerServiceKey, ns.server.Type)
req.Metadata, err = pcontext.Encode(ctx)
if err != nil {
Expand Down
30 changes: 23 additions & 7 deletions component/method.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ import (
"unicode"
"unicode/utf8"

"github.com/gogo/protobuf/proto"
"github.com/topfreegames/pitaya/internal/message"
)

var (
typeOfError = reflect.TypeOf((*error)(nil)).Elem()
typeOfBytes = reflect.TypeOf(([]byte)(nil))
typeOfContext = reflect.TypeOf(new(context.Context)).Elem()
typeOfError = reflect.TypeOf((*error)(nil)).Elem()
typeOfBytes = reflect.TypeOf(([]byte)(nil))
typeOfContext = reflect.TypeOf(new(context.Context)).Elem()
typeOfProtoMsg = reflect.TypeOf(new(proto.Message)).Elem()
)

func isExported(name string) bool {
Expand All @@ -50,20 +52,30 @@ func isRemoteMethod(method reflect.Method) bool {
}

// Method needs at least two ins: receiver and context.Context
if mt.NumIn() < 2 {
if mt.NumIn() != 2 && mt.NumIn() != 3 {
return false
}

if t1 := mt.In(1); !t1.Implements(typeOfContext) {
return false
}

// Method needs two outs: interface{}(or []byte), error
if mt.NumIn() == 3 {
if t2 := mt.In(2); !t2.Implements(typeOfProtoMsg) {
return false
}
}

// Method needs two outs: interface{}(that implements proto.Message), error
if mt.NumOut() != 2 {
return false
}

if (mt.Out(0).Kind() != reflect.Ptr && mt.Out(0) != typeOfBytes) || mt.Out(1) != typeOfError {
if (mt.Out(0).Kind() != reflect.Ptr) || mt.Out(1) != typeOfError {
return false
}

if o0 := mt.Out(0); !o0.Implements(typeOfProtoMsg) {
return false
}

Expand Down Expand Up @@ -107,6 +119,7 @@ func suitableRemoteMethods(typ reflect.Type, nameFunc func(string) string) map[s
methods := make(map[string]*Remote)
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
mt := method.Type
mn := method.Name
if isRemoteMethod(method) {
// rewrite remote name
Expand All @@ -115,7 +128,10 @@ func suitableRemoteMethods(typ reflect.Type, nameFunc func(string) string) map[s
}
methods[mn] = &Remote{
Method: method,
HasArgs: method.Type.NumIn() > 2,
HasArgs: method.Type.NumIn() == 3,
}
if mt.NumIn() == 3 {
methods[mn].Type = mt.In(2)
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions component/method_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/topfreegames/pitaya/protos/test"
)

type TestType struct {
Expand All @@ -42,10 +43,10 @@ func (t *TestType) ExportedHandlerWithSessionAndPointerWithRawOut(ctx context.Co
func (t *TestType) ExportedHandlerWithSessionAndPointerWithPointerOut(ctx context.Context, tt *TestType) (*TestType, error) {
return nil, nil
}
func (t *TestType) ExportedRemoteRawOut(ctx context.Context) ([]byte, error) {
func (t *TestType) ExportedRemoteRawOut(ctx context.Context) (*test.SomeStruct, error) {
return nil, nil
}
func (t *TestType) ExportedRemotePointerOut(ctx context.Context) (*TestType, error) {
func (t *TestType) ExportedRemotePointerOut(ctx context.Context) (*test.SomeStruct, error) {
return nil, nil
}

Expand Down
1 change: 1 addition & 0 deletions component/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type (
Receiver reflect.Value // receiver of method
Method reflect.Method // method stub
HasArgs bool // if remote has no args we won't try to serialize received data into arguments
Type reflect.Type // low-level type of method
}

// Service implements a specific service, some of it's methods will be
Expand Down
4 changes: 2 additions & 2 deletions constants/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ var PropagateCtxKey = propagateKey{}
// the propagate key
var SpanPropagateCtxKey = "opentracing-span"

// PeerIdKey is the key holding the peer id to be sent over the context
var PeerIdKey = "peer.id"
// PeerIDKey is the key holding the peer id to be sent over the context
var PeerIDKey = "peer.id"

// PeerServiceKey is the key holding the peer service to be sent over the context
var PeerServiceKey = "peer.service"
Expand Down
12 changes: 5 additions & 7 deletions context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@
package context

import (
"bytes"
"context"
"encoding/gob"
"encoding/json"

"github.com/topfreegames/pitaya/constants"
"github.com/topfreegames/pitaya/util"
)

// AddToPropagateCtx adds a key and value that will be propagated through RPC calls
Expand Down Expand Up @@ -63,7 +61,7 @@ func FromMap(val map[string]interface{}) context.Context {
func Encode(ctx context.Context) ([]byte, error) {
m := ToMap(ctx)
if len(m) > 0 {
return util.GobEncode(m)
return json.Marshal(m)
}
return nil, nil
}
Expand All @@ -74,10 +72,10 @@ func Decode(m []byte) (context.Context, error) {
// TODO maybe return an error
return nil, nil
}
args := make([]interface{}, 0)
err := gob.NewDecoder(bytes.NewReader(m)).Decode(&args)
mp := make(map[string]interface{}, 0)
err := json.Unmarshal(m, &mp)
if err != nil {
return nil, err
}
return FromMap(args[0].(map[string]interface{})), nil
return FromMap(mp), nil
}
9 changes: 3 additions & 6 deletions context/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package context
import (
"context"
"encoding/gob"
"encoding/json"
"errors"
"flag"
"os"
Expand All @@ -33,7 +34,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/topfreegames/pitaya/constants"
"github.com/topfreegames/pitaya/helpers"
"github.com/topfreegames/pitaya/util"
)

var update = flag.Bool("update", false, "update .golden files")
Expand Down Expand Up @@ -166,8 +166,6 @@ func TestEncode(t *testing.T) {
}{
{"no_elements", map[string]interface{}{}, nil},
{"one_element", map[string]interface{}{"key1": "val1"}, nil},
{"unregistered_struct", map[string]interface{}{"key1": &unregisteredStruct{}}, errors.New("gob: type not registered for interface: context.unregisteredStruct")},
{"registered_struct", map[string]interface{}{"key1": &registeredStruct{}}, nil},
}

for _, table := range tables {
Expand All @@ -177,7 +175,7 @@ func TestEncode(t *testing.T) {
if len(table.items) > 0 && table.err == nil {
gp := filepath.Join("fixtures", table.name+".golden")
if *update {
b, err := util.GobEncode(table.items)
b, err := json.Marshal(table.items)
require.NoError(t, err)
t.Log("updating golden file")
helpers.WriteFile(t, gp, b)
Expand All @@ -202,7 +200,6 @@ func TestDecode(t *testing.T) {
items map[string]interface{}
}{
{"one_element", map[string]interface{}{"key1": "val1"}},
{"registered_struct", map[string]interface{}{"key1": &registeredStruct{}}},
}

for _, table := range tables {
Expand All @@ -221,7 +218,7 @@ func TestDecode(t *testing.T) {

func TestDecodeFailsIfBadEncodedData(t *testing.T) {
decoded, err := Decode([]byte("oh noes"))
assert.Equal(t, errors.New("unexpected EOF"), err)
assert.Equal(t, errors.New("invalid character 'o' looking for beginning of value").Error(), err.Error())
assert.Nil(t, decoded)
}

Expand Down
Binary file modified context/fixtures/one_element.golden
Binary file not shown.
Binary file modified context/fixtures/registered_struct.golden
Binary file not shown.
22 changes: 11 additions & 11 deletions e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ func TestHandlerCallToFront(t *testing.T) {
data []byte
resp []byte
}{
{"connector.testsvc.testrequestonlysessionreturnsptr", []byte(``), []byte(`{"code":200,"msg":"hello"}`)},
{"connector.testsvc.testrequestonlysessionreturnsptr", []byte(``), []byte(`{"Code":200,"Msg":"hello"}`)},
{"connector.testsvc.testrequestonlysessionreturnsptrnil", []byte(``), []byte(`{"code":"PIT-000","msg":"reply must not be null"}`)},
{"connector.testsvc.testrequestonlysessionreturnsrawnil", []byte(``), []byte(`{"code":"PIT-000","msg":"reply must not be null"}`)},
{"connector.testsvc.testrequestreturnsptr", []byte(`{"msg":"good"}`), []byte(`{"code":200,"msg":"good"}`)},
{"connector.testsvc.testrequestreturnsptr", []byte(`{"msg":"good"}`), []byte(`{"Code":200,"Msg":"good"}`)},
{"connector.testsvc.testrequestreturnsraw", []byte(`{"msg":"good"}`), []byte(`good`)},
{"connector.testsvc.testrequestreceivereturnsraw", []byte(`woow`), []byte(`woow`)},
{"connector.testsvc.nonexistenthandler", []byte(`woow`), []byte(`{"code":"PIT-404","msg":"pitaya/handler: connector.testsvc.nonexistenthandler not found"}`)},
Expand Down Expand Up @@ -120,7 +120,7 @@ func TestGroupFront(t *testing.T) {
data []byte
}{
{"connector.testsvc.testsendgroupmsg", []byte("testing group")},
{"connector.testsvc.testsendgroupmsgptr", []byte(`{"msg":"hellow"}`)},
{"connector.testsvc.testsendgroupmsgptr", []byte(`{"Msg":"hellow"}`)},
}

for _, table := range tables {
Expand Down Expand Up @@ -357,8 +357,8 @@ func TestForwardToBackend(t *testing.T) {
data []byte
resp []byte
}{
{"game.testsvc.testrequestonlysessionreturnsptr", []byte(``), []byte(`{"code":200,"msg":"hello"}`)},
{"game.testsvc.testrequestreturnsptr", []byte(`{"msg":"good"}`), []byte(`{"code":200,"msg":"good"}`)},
{"game.testsvc.testrequestonlysessionreturnsptr", []byte(``), []byte(`{"Code":200,"Msg":"hello"}`)},
{"game.testsvc.testrequestreturnsptr", []byte(`{"msg":"good"}`), []byte(`{"Code":200,"Msg":"good"}`)},
{"game.testsvc.testrequestreturnsraw", []byte(`{"msg":"good"}`), []byte(`good`)},
{"game.testsvc.testrequestreceivereturnsraw", []byte(`woow`), []byte(`woow`)},
{"game.testsvc.nonexistenthandler", []byte(`woow`), []byte(`{"code":"PIT-404","msg":"pitaya/handler: game.testsvc.nonexistenthandler not found"}`)},
Expand Down Expand Up @@ -418,7 +418,7 @@ func TestGroupBack(t *testing.T) {
data []byte
}{
{"game.testsvc.testsendgroupmsg", []byte("testing group")},
{"game.testsvc.testsendgroupmsgptr", []byte(`{"msg":"hellow"}`)},
{"game.testsvc.testsendgroupmsgptr", []byte(`{"Msg":"hellow"}`)},
}

for _, table := range tables {
Expand Down Expand Up @@ -452,14 +452,14 @@ func TestUserRPC(t *testing.T) {
data []byte
res []byte
}{
{"front_to_back", "connector.testsvc.testsendrpc", []byte(`{"route":"game.testremotesvc.rpctestrawptrreturnsptr","data":"thisthis"}`), []byte(`{"code":200,"msg":"got thisthis"}`)},
{"back_to_front", "game.testsvc.testsendrpc", []byte(`{"route":"connector.testremotesvc.rpctestrawptrreturnsptr","data":"thisthis"}`), []byte(`{"code":200,"msg":"got thisthis"}`)},
{"front_to_back", "connector.testsvc.testsendrpc", []byte(`{"route":"game.testremotesvc.rpctestrawptrreturnsptr","data":"thisthis"}`), []byte(`{"Code":200,"Msg":"got thisthis"}`)},
{"back_to_front", "game.testsvc.testsendrpc", []byte(`{"route":"connector.testremotesvc.rpctestrawptrreturnsptr","data":"thisthis"}`), []byte(`{"Code":200,"Msg":"got thisthis"}`)},
{"front_to_back_error", "connector.testsvc.testsendrpc", []byte(`{"route":"game.testremotesvc.rpctestreturnserror","data":"thisthis"}`), []byte(`{"code":"PIT-433","msg":"test error","metadata":{"some":"meta"}}`)},
{"back_to_front_error", "game.testsvc.testsendrpc", []byte(`{"route":"connector.testremotesvc.rpctestreturnserror","data":"thisthis"}`), []byte(`{"code":"PIT-433","msg":"test error","metadata":{"some":"meta"}}`)},
{"same_server", "connector.testsvc.testsendrpc", []byte(`{"route":"connector.testremotesvc.rpctestrawptrreturnsptr","data":"thisthis"}`), []byte(`{"code":"PIT-000","msg":"you are making a rpc that may be processed locally, either specify a different server type or specify a server id"}`)},
{"front_to_back_ptr", "connector.testsvc.testsendrpcpointer", []byte(`{"route":"game.testremotesvc.rpctestptrreturnsptr","data":"thisthis"}`), []byte(`{"code":200,"msg":"got thisthis"}`)},
{"no_args", "connector.testsvc.testsendrpcnoargs", []byte(`{"route":"game.testremotesvc.rpctestnoargs"}`), []byte(`{"code":200,"msg":"got nothing"}`)},
{"not_found", "connector.testsvc.testsendrpcpointer", []byte(`{"route":"game.testremotesvc.rpctestnotfound","data":"thisthis"}`), []byte(`{"code":"PIT-404","msg":"route not found","metadata":{"route":"testremotesvc.rpctestnotfound"}}`)},
{"front_to_back_ptr", "connector.testsvc.testsendrpc", []byte(`{"route":"game.testremotesvc.rpctestptrreturnsptr","data":"thisthis"}`), []byte(`{"Code":200,"Msg":"got thisthis"}`)},
{"no_args", "connector.testsvc.testsendrpcnoargs", []byte(`{"route":"game.testremotesvc.rpctestnoargs"}`), []byte(`{"Code":200,"Msg":"got nothing"}`)},
{"not_found", "connector.testsvc.testsendrpc", []byte(`{"route":"game.testremotesvc.rpctestnotfound","data":"thisthis"}`), []byte(`{"code":"PIT-404","msg":"route not found","metadata":{"route":"testremotesvc.rpctestnotfound"}}`)},
}

for _, table := range tables {
Expand Down
4 changes: 1 addition & 3 deletions examples/demo/cluster/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ func configureBackend() {

func configureFrontend(port int) {
configureJaeger("connector")
ws := acceptor.NewWSAcceptor(fmt.Sprintf(":%d", port))
tcp := acceptor.NewTCPAcceptor(fmt.Sprintf(":%d", port+1))
tcp := acceptor.NewTCPAcceptor(fmt.Sprintf(":%d", port))

pitaya.Register(&services.Connector{},
component.WithName("connector"),
Expand Down Expand Up @@ -90,7 +89,6 @@ func configureFrontend(port int) {
fmt.Printf("error setting route dictionary %s\n", err.Error())
}

pitaya.AddAcceptor(ws)
pitaya.AddAcceptor(tcp)
}

Expand Down
Loading

0 comments on commit c79d9fa

Please sign in to comment.