Skip to content

Commit

Permalink
Major bug fixes and refactor 💥
Browse files Browse the repository at this point in the history
- benchmark tests
- fix bugs that could cause pitaya to be slow and now scallable
  • Loading branch information
felipejfc committed Apr 19, 2018
1 parent c78380f commit 6f62be2
Show file tree
Hide file tree
Showing 30 changed files with 428 additions and 320 deletions.
8 changes: 3 additions & 5 deletions acceptor/tcp_acceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import (
"github.com/topfreegames/pitaya/logger"
)

var log = logger.Log

// TCPAcceptor struct
type TCPAcceptor struct {
addr string
Expand Down Expand Up @@ -69,7 +67,7 @@ func (a *TCPAcceptor) Stop() {
func (a *TCPAcceptor) ListenAndServe() {
listener, err := net.Listen("tcp", a.addr)
if err != nil {
log.Fatal(err)
logger.Log.Fatal(err)
}
a.listener = listener
a.running = true
Expand All @@ -80,7 +78,7 @@ func (a *TCPAcceptor) ListenAndServe() {
func (a *TCPAcceptor) ListenAndServeTLS(cert, key string) {
crt, err := tls.LoadX509KeyPair(cert, key)
if err != nil {
log.Fatal(err)
logger.Log.Fatal(err)
}

tlsCfg := &tls.Config{Certificates: []tls.Certificate{crt}}
Expand All @@ -96,7 +94,7 @@ func (a *TCPAcceptor) serve() {
for a.running {
conn, err := a.listener.Accept()
if err != nil {
log.Error(err.Error())
logger.Log.Error(err.Error())
continue
}
a.connChan <- conn
Expand Down
10 changes: 5 additions & 5 deletions acceptor/ws_acceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (h *connHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {

c, err := newWSConn(conn)
if err != nil {
log.Error(err)
logger.Log.Error(err)
return
}
h.connChan <- c
Expand All @@ -89,7 +89,7 @@ func (w *WSAcceptor) ListenAndServe() {

listener, err := net.Listen("tcp", w.addr)
if err != nil {
log.Fatal(err)
logger.Log.Fatal(err)
}
w.listener = listener

Expand All @@ -105,13 +105,13 @@ func (w *WSAcceptor) ListenAndServeTLS(cert, key string) {

crt, err := tls.LoadX509KeyPair(cert, key)
if err != nil {
log.Fatal(err)
logger.Log.Fatal(err)
}

tlsCfg := &tls.Config{Certificates: []tls.Certificate{crt}}
listener, err := tls.Listen("tcp", w.addr, tlsCfg)
if err != nil {
log.Fatal(err)
logger.Log.Fatal(err)
}
w.listener = listener
w.serve(&upgrader)
Expand All @@ -130,7 +130,7 @@ func (w *WSAcceptor) serve(upgrader *websocket.Upgrader) {
func (w *WSAcceptor) Stop() {
err := w.listener.Close()
if err != nil {
log.Error(err)
logger.Log.Error(err)
}
}

Expand Down
36 changes: 15 additions & 21 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
)

var (
log = logger.Log
// hbd contains the heartbeat packet data
hbd []byte
// hrd contains the handshake response data
Expand All @@ -60,7 +59,6 @@ type (
chSend chan pendingMessage // push message queue
chStopHeartbeat chan struct{} // stop heartbeats
chStopWrite chan struct{} // stop writing messages
chWrite chan []byte // write message to the clients
conn net.Conn // low-level conn fd
decoder codec.PacketDecoder // binary decoder
encoder codec.PacketEncoder // binary encoder
Expand Down Expand Up @@ -102,7 +100,6 @@ func NewAgent(
messagesBufferSize: messagesBufferSize,
chStopHeartbeat: make(chan struct{}),
chStopWrite: make(chan struct{}),
chWrite: make(chan []byte, messagesBufferSize),
conn: conn,
decoder: packetDecoder,
encoder: packetEncoder,
Expand Down Expand Up @@ -137,7 +134,8 @@ func (a *Agent) Push(route string, v interface{}) error {
}

if len(a.chSend) >= a.messagesBufferSize {
return constants.ErrBufferExceed
// TODO monitorar
logger.Log.Warnf("chSend is at maximum capacity, channel len: %d", len(a.chSend))
}

switch d := v.(type) {
Expand Down Expand Up @@ -169,7 +167,7 @@ func (a *Agent) ResponseMID(mid uint, v interface{}, isError ...bool) error {

if len(a.chSend) >= a.messagesBufferSize {
// TODO monitorar
return constants.ErrBufferExceed
logger.Log.Warnf("chSend is at maximum capacity, channel len: %d", len(a.chSend))
}

switch d := v.(type) {
Expand All @@ -192,7 +190,7 @@ func (a *Agent) Close() error {
}
a.SetStatus(constants.StatusClosed)

log.Debugf("Session closed, ID=%d, UID=%d, IP=%s",
logger.Log.Debugf("Session closed, ID=%d, UID=%d, IP=%s",
a.Session.ID(), a.Session.UID(), a.conn.RemoteAddr())

// prevent closing closed channel
Expand Down Expand Up @@ -239,7 +237,7 @@ func (a *Agent) SetStatus(state int32) {
func (a *Agent) Handle() {
defer func() {
a.Close()
log.Debugf("Session handle goroutine exit, SessionID=%d, UID=%d", a.Session.ID(), a.Session.UID())
logger.Log.Debugf("Session handle goroutine exit, SessionID=%d, UID=%d", a.Session.ID(), a.Session.UID())
}()

go a.write()
Expand All @@ -261,7 +259,7 @@ func (a *Agent) heartbeat() {
case <-ticker.C:
deadline := time.Now().Add(-2 * a.heartbeatTimeout).Unix()
if a.lastAt < deadline {
log.Debugf("Session heartbeat timeout, LastTime=%d, Deadline=%d", a.lastAt, deadline)
logger.Log.Debugf("Session heartbeat timeout, LastTime=%d, Deadline=%d", a.lastAt, deadline)
close(a.chDie)
return
}
Expand Down Expand Up @@ -301,25 +299,17 @@ func (a *Agent) write() {
// clean func
defer func() {
close(a.chSend)
close(a.chWrite)
}()

for {
select {
case data := <-a.chWrite:
// close agent if low-level Conn broken
if _, err := a.conn.Write(data); err != nil {
logger.Log.Error(err.Error())
return
}

case data := <-a.chSend:
payload, err := util.SerializeOrRaw(a.serializer, data.payload)
if err != nil {
log.Error(err.Error())
logger.Log.Error(err.Error())
payload, err = util.GetErrorPayload(a.serializer, err)
if err != nil {
log.Error("cannot serialize message and respond to the client ", err.Error())
logger.Log.Error("cannot serialize message and respond to the client ", err.Error())
break
}
}
Expand All @@ -344,7 +334,11 @@ func (a *Agent) write() {
logger.Log.Error(err)
break
}
a.chWrite <- p
// close agent if low-level Conn broken
if _, err := a.conn.Write(p); err != nil {
logger.Log.Error(err.Error())
return
}

case <-a.chStopWrite:
return
Expand All @@ -361,12 +355,12 @@ func (a *Agent) SendRequest(serverID, route string, v interface{}) (*protos.Resp
func (a *Agent) AnswerWithError(mid uint, err error) {
p, e := util.GetErrorPayload(a.serializer, err)
if e != nil {
log.Error("error answering the player with an error: ", e.Error())
logger.Log.Error("error answering the player with an error: ", e.Error())
return
}
e = a.Session.ResponseMID(mid, p, true)
if e != nil {
log.Error("error answering the player with an error: ", e.Error())
logger.Log.Error("error answering the player with an error: ", e.Error())
}
}

Expand Down
36 changes: 20 additions & 16 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ var (
running: false,
router: router.New(),
}
log = logger.Log

remoteService *service.RemoteService
handlerService *service.HandlerService
Expand All @@ -119,7 +118,7 @@ func Configure(
cfgs ...*viper.Viper,
) {
if app.configured {
log.Warn("pitaya configured twice!")
logger.Log.Warn("pitaya configured twice!")
}
app.config = config.NewConfig(cfgs...)
if app.heartbeat == time.Duration(0) {
Expand All @@ -135,7 +134,7 @@ func Configure(
// AddAcceptor adds a new acceptor to app
func AddAcceptor(ac acceptor.Acceptor) {
if !app.server.Frontend {
log.Error("tried to add an acceptor to a backend server, skipping")
logger.Log.Error("tried to add an acceptor to a backend server, skipping")
return
}
app.acceptors = append(app.acceptors, ac)
Expand All @@ -161,6 +160,11 @@ func SetHeartbeatTime(interval time.Duration) {
app.heartbeat = interval
}

// SetLogger logger setter
func SetLogger(l logger.Logger) {
logger.Log = l
}

// SetRPCServer to be used
func SetRPCServer(s cluster.RPCServer) {
app.rpcServer = s
Expand Down Expand Up @@ -204,7 +208,7 @@ func startDefaultSD() {
app.server,
)
if err != nil {
log.Fatalf("error starting cluster service discovery component: %s", err.Error())
logger.Log.Fatalf("error starting cluster service discovery component: %s", err.Error())
}
}

Expand All @@ -215,7 +219,7 @@ func startDefaultRPCServer() {
app.server,
)
if err != nil {
log.Fatalf("error starting cluster rpc server component: %s", err.Error())
logger.Log.Fatalf("error starting cluster rpc server component: %s", err.Error())
}
SetRPCServer(rpcServer)
}
Expand All @@ -224,7 +228,7 @@ func startDefaultRPCClient() {
// initialize default rpc client
rpcClient, err := cluster.NewNatsRPCClient(app.config, app.server)
if err != nil {
log.Fatalf("error starting cluster rpc client component: %s", err.Error())
logger.Log.Fatalf("error starting cluster rpc client component: %s", err.Error())
}
SetRPCClient(rpcClient)
}
Expand All @@ -241,26 +245,26 @@ func initSysRemotes() {
// Start starts the app
func Start() {
if !app.configured {
log.Fatal("starting app without configuring it first! call pitaya.Configure()")
logger.Log.Fatal("starting app without configuring it first! call pitaya.Configure()")
}

if !app.server.Frontend && len(app.acceptors) > 0 {
log.Fatal("acceptors are not allowed on backend servers")
logger.Log.Fatal("acceptors are not allowed on backend servers")
}

if app.serverMode == Cluster {
if app.serviceDiscovery == nil {
log.Warn("creating default service discovery because cluster mode is enabled, " +
logger.Log.Warn("creating default service discovery because cluster mode is enabled, " +
"if you want to specify yours, use pitaya.SetServiceDiscoveryClient")
startDefaultSD()
}
if app.rpcServer == nil {
log.Warn("creating default rpc server because cluster mode is enabled, " +
logger.Log.Warn("creating default rpc server because cluster mode is enabled, " +
"if you want to specify yours, use pitaya.SetRPCServer")
startDefaultRPCServer()
}
if app.rpcClient == nil {
log.Warn("creating default rpc client because cluster mode is enabled, " +
logger.Log.Warn("creating default rpc client because cluster mode is enabled, " +
"if you want to specify yours, use pitaya.SetRPCClient")
startDefaultRPCClient()
RegisterModule(app.serviceDiscovery, "serviceDiscovery")
Expand Down Expand Up @@ -307,12 +311,12 @@ func Start() {
// stop server
select {
case <-app.dieChan:
log.Warn("The app will shutdown in a few seconds")
logger.Log.Warn("The app will shutdown in a few seconds")
case s := <-sg:
log.Warn("got signal", s)
logger.Log.Warn("got signal", s)
}

log.Warn("server is stopping...")
logger.Log.Warn("server is stopping...")

shutdownModules()
shutdownComponents()
Expand All @@ -324,7 +328,7 @@ func listen() {
// by SetTimerPrecision
timer.GlobalTicker = time.NewTicker(timer.Precision)

log.Infof("starting server %s:%s", app.server.Type, app.server.ID)
logger.Log.Infof("starting server %s:%s", app.server.Type, app.server.ID)
for i := 0; i < app.config.GetInt("pitaya.concurrency.handler.dispatch"); i++ {
go handlerService.Dispatch(i)
}
Expand All @@ -340,7 +344,7 @@ func listen() {
a.ListenAndServe()
}()

log.Infof("listening with acceptor %s on addr %s", reflect.TypeOf(a), a.GetAddr())
logger.Log.Infof("listening with acceptor %s on addr %s", reflect.TypeOf(a), a.GetAddr())
}

startModules()
Expand Down
8 changes: 8 additions & 0 deletions app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/topfreegames/pitaya/acceptor"
Expand All @@ -39,6 +40,7 @@ import (
"github.com/topfreegames/pitaya/helpers"
"github.com/topfreegames/pitaya/internal/codec"
"github.com/topfreegames/pitaya/internal/message"
"github.com/topfreegames/pitaya/logger"
"github.com/topfreegames/pitaya/route"
"github.com/topfreegames/pitaya/router"
"github.com/topfreegames/pitaya/serialize/json"
Expand Down Expand Up @@ -141,6 +143,12 @@ func TestSetDebug(t *testing.T) {
assert.Equal(t, false, app.debug)
}

func TestSetLogger(t *testing.T) {
l := logrus.New()
SetLogger(l)
assert.Equal(t, l, logger.Log)
}

func TestSetPacketDecoder(t *testing.T) {
d := codec.NewPomeloPacketDecoder()
SetPacketDecoder(d)
Expand Down
Loading

0 comments on commit 6f62be2

Please sign in to comment.