Skip to content

Commit

Permalink
Merging v2 and resolving conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
Henrique Oelze committed Jul 28, 2021
2 parents 9621045 + 67305a9 commit 5e03566
Show file tree
Hide file tree
Showing 57 changed files with 1,242 additions and 380 deletions.
98 changes: 98 additions & 0 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
name: Tests

on:
push:
branches:
- v2
pull_request:
branches:
- v2

jobs:
deps:
name: Dependencies
runs-on: ubuntu-latest
steps:
- name: Set up Go
uses: actions/setup-go@v1
with:
go-version: '1.13'
- name: Checkout
uses: actions/checkout@v2
- name: Restore cache
uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
- name: Download dependencies
run: go mod download
unit-test:
name: Unit Test
runs-on: ubuntu-latest
needs: deps
steps:
- name: Set up Go
uses: actions/setup-go@v1
with:
go-version: '1.13'
- name: Checkout
uses: actions/checkout@v2
- name: Restore cache
uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
- name: Setup dependencies
env:
GO111MODULE: off
run: make setup-ci
- name: Run tests
run: make test-coverage
- name: Send coverage
env:
COVERALLS_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: ~/go/bin/goveralls -coverprofile=coverprofile.out -service=github
e2e-test-nats:
name: Nats Test End to End
runs-on: ubuntu-latest
needs: deps
steps:
- name: Set up Go
uses: actions/setup-go@v1
with:
go-version: '1.13'
- name: Checkout
uses: actions/checkout@v2
- name: Restore cache
uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
- name: Run tests
run: make e2e-test-nats
e2e-test-grpc:
name: GRPC Test End to End
runs-on: ubuntu-latest
needs: deps
steps:
- name: Set up Go
uses: actions/setup-go@v1
with:
go-version: '1.13'
- name: Checkout
uses: actions/checkout@v2
- name: Restore cache
uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
- name: Run tests
run: make e2e-test-grpc
9 changes: 0 additions & 9 deletions .travis.yml

This file was deleted.

4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
[4]: https://goreportcard.com/report/github.com/topfreegames/pitaya
[5]: https://img.shields.io/badge/license-MIT-blue.svg
[6]: LICENSE
[7]: https://travis-ci.org/topfreegames/pitaya.svg?branch=master
[8]: https://travis-ci.org/topfreegames/pitaya
[7]: https://github.com/topfreegames/pitaya/actions/workflows/tests.yaml/badge.svg
[8]: https://github.com/topfreegames/pitaya/actions/workflows/tests.yaml
[9]: https://coveralls.io/repos/github/topfreegames/pitaya/badge.svg?branch=master
[10]: https://coveralls.io/github/topfreegames/pitaya?branch=master
[11]: https://readthedocs.org/projects/pitaya/badge/?version=latest
Expand Down
7 changes: 7 additions & 0 deletions acceptor/tcp_acceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func (t *tcpPlayerConn) GetNextMessage() (b []byte, err error) {
if err != nil {
return nil, err
}
// if the header has no data, we can consider it as a closed connection
if len(header) == 0 {
return nil, constants.ErrConnectionClosed
}
msgSize, _, err := codec.ParseHeader(header)
if err != nil {
return nil, err
Expand Down Expand Up @@ -134,6 +138,9 @@ func (a *TCPAcceptor) ListenAndServeTLS(cert, key string) {
tlsCfg := &tls.Config{Certificates: []tls.Certificate{crt}}

listener, err := tls.Listen("tcp", a.addr, tlsCfg)
if err != nil {
logger.Log.Fatalf("Failed to listen: %s", err.Error())
}
a.listener = listener
a.running = true
a.serve()
Expand Down
23 changes: 23 additions & 0 deletions acceptor/tcp_acceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,30 @@ func TestGetNextMessageEOF(t *testing.T) {

_, err = playerConn.GetNextMessage()
assert.EqualError(t, err, constants.ErrReceivedMsgSmallerThanExpected.Error())
}

func TestGetNextMessageEmptyEOF(t *testing.T) {
a := NewTCPAcceptor("0.0.0.0:0")
go a.ListenAndServe()
defer a.Stop()
c := a.GetConnChan()
// should be able to connect within 100 milliseconds
var conn net.Conn
var err error
helpers.ShouldEventuallyReturn(t, func() error {
conn, err = net.Dial("tcp", a.GetAddr())
return err
}, nil, 10*time.Millisecond, 100*time.Millisecond)

playerConn := helpers.ShouldEventuallyReceive(t, c, 100*time.Millisecond).(PlayerConn)

go func() {
time.Sleep(100 * time.Millisecond)
conn.Close()
}()

_, err = playerConn.GetNextMessage()
assert.EqualError(t, err, constants.ErrConnectionClosed.Error())
}

func TestGetNextMessageInParts(t *testing.T) {
Expand Down
9 changes: 4 additions & 5 deletions acceptorwrapper/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"container/list"
"time"

"github.com/topfreegames/pitaya/v2"
"github.com/topfreegames/pitaya/v2/acceptor"
"github.com/topfreegames/pitaya/v2/constants"
"github.com/topfreegames/pitaya/v2/logger"
Expand All @@ -42,7 +41,7 @@ import (
// be prepared to handle it.
type RateLimiter struct {
acceptor.PlayerConn
app pitaya.Pitaya
reporters []metrics.Reporter
limit int
interval time.Duration
times list.List
Expand All @@ -51,15 +50,15 @@ type RateLimiter struct {

// NewRateLimiter returns an initialized *RateLimiting
func NewRateLimiter(
app pitaya.Pitaya,
reporters []metrics.Reporter,
conn acceptor.PlayerConn,
limit int,
interval time.Duration,
forceDisable bool,
) *RateLimiter {
r := &RateLimiter{
PlayerConn: conn,
app: app,
reporters: reporters,
limit: limit,
interval: interval,
forceDisable: forceDisable,
Expand All @@ -85,7 +84,7 @@ func (r *RateLimiter) GetNextMessage() (msg []byte, err error) {
now := time.Now()
if r.shouldRateLimit(now) {
logger.Log.Errorf("Data=%s, Error=%s", msg, constants.ErrRateLimitExceeded)
metrics.ReportExceededRateLimiting(r.app.GetMetricsReporters())
metrics.ReportExceededRateLimiting(r.reporters)
continue
}

Expand Down
9 changes: 2 additions & 7 deletions acceptorwrapper/rate_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,8 @@ func TestRateLimiterGetNextMessage(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockConn = mocks.NewMockPlayerConn(ctrl)
mockedApp := mocks.NewMockPitaya(ctrl)
mockedApp.EXPECT().GetMetricsReporters().Return([]metrics.Reporter{}).AnyTimes()

r = NewRateLimiter(mockedApp, mockConn, limit, interval, table.forceDisable)
r = NewRateLimiter([]metrics.Reporter{}, mockConn, limit, interval, table.forceDisable)

table.mock()
buf, err := r.GetNextMessage()
Expand Down Expand Up @@ -165,10 +163,7 @@ func TestRateLimiterShouldRateLimit(t *testing.T) {

for name, table := range tables {
t.Run(name, func(t *testing.T) {
ctrl := gomock.NewController(t)
mockedApp := mocks.NewMockPitaya(ctrl)
mockedApp.EXPECT().GetMetricsReporters().Return([]metrics.Reporter{}).AnyTimes()
r = NewRateLimiter(mockedApp, nil, limit, interval, false)
r = NewRateLimiter([]metrics.Reporter{}, nil, limit, interval, false)

table.before()
should := r.shouldRateLimit(now)
Expand Down
6 changes: 3 additions & 3 deletions acceptorwrapper/rate_limiting_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
package acceptorwrapper

import (
"github.com/topfreegames/pitaya/v2"
"github.com/topfreegames/pitaya/v2/acceptor"
"github.com/topfreegames/pitaya/v2/config"
"github.com/topfreegames/pitaya/v2/metrics"
)

// RateLimitingWrapper rate limits for each connection
Expand All @@ -33,11 +33,11 @@ type RateLimitingWrapper struct {
}

// NewRateLimitingWrapper returns an instance of *RateLimitingWrapper
func NewRateLimitingWrapper(app pitaya.Pitaya, c config.RateLimitingConfig) *RateLimitingWrapper {
func NewRateLimitingWrapper(reporters []metrics.Reporter, c config.RateLimitingConfig) *RateLimitingWrapper {
r := &RateLimitingWrapper{}

r.BaseWrapper = NewBaseWrapper(func(conn acceptor.PlayerConn) acceptor.PlayerConn {
return NewRateLimiter(app, conn, c.Limit, c.Interval, c.ForceDisable)
return NewRateLimiter(reporters, conn, c.Limit, c.Interval, c.ForceDisable)
})

return r
Expand Down
10 changes: 3 additions & 7 deletions acceptorwrapper/rate_limiting_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,17 @@ import (
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/topfreegames/pitaya/v2/config"
"github.com/topfreegames/pitaya/v2/metrics"
"github.com/topfreegames/pitaya/v2/mocks"
)

func TestNewRateLimitingWrapper(t *testing.T) {
t.Parallel()

ctrl := gomock.NewController(t)
mockedApp := mocks.NewMockPitaya(ctrl)
mockedApp.EXPECT().GetMetricsReporters().Return([]metrics.Reporter{}).AnyTimes()
reporters := []metrics.Reporter{}

rateLimitingWrapper := NewRateLimitingWrapper(mockedApp, *config.NewDefaultRateLimitingConfig())
expected := NewRateLimiter(mockedApp, nil, 20, time.Second, false)
rateLimitingWrapper := NewRateLimitingWrapper(reporters, *config.NewDefaultRateLimitingConfig())
expected := NewRateLimiter(reporters, nil, 20, time.Second, false)
assert.Equal(t, expected, rateLimitingWrapper.wrapConn(nil))
}
32 changes: 20 additions & 12 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,11 @@ func (a *agentImpl) send(pendingMsg pendingMessage) (err error) {
pWrite.err = util.GetErrorFromPayload(a.serializer, m.Data)
}

a.chSend <- pWrite
// chSend is never closed so we need this to don't block if agent is already closed
select {
case a.chSend <- pWrite:
case <-a.chDie:
}
return
}

Expand All @@ -288,10 +292,10 @@ func (a *agentImpl) Push(route string, v interface{}) error {

switch d := v.(type) {
case []byte:
logger.Log.Debugf("Type=Push, ID=%d, UID=%d, Route=%s, Data=%dbytes",
logger.Log.Debugf("Type=Push, ID=%d, UID=%s, Route=%s, Data=%dbytes",
a.Session.ID(), a.Session.UID(), route, len(d))
default:
logger.Log.Debugf("Type=Push, ID=%d, UID=%d, Route=%s, Data=%+v",
logger.Log.Debugf("Type=Push, ID=%d, UID=%s, Route=%s, Data=%+v",
a.Session.ID(), a.Session.UID(), route, v)
}
return a.send(pendingMessage{typ: message.Push, route: route, payload: v})
Expand All @@ -314,10 +318,10 @@ func (a *agentImpl) ResponseMID(ctx context.Context, mid uint, v interface{}, is

switch d := v.(type) {
case []byte:
logger.Log.Debugf("Type=Response, ID=%d, UID=%d, MID=%d, Data=%dbytes",
logger.Log.Debugf("Type=Response, ID=%d, UID=%s, MID=%d, Data=%dbytes",
a.Session.ID(), a.Session.UID(), mid, len(d))
default:
logger.Log.Infof("Type=Response, ID=%d, UID=%d, MID=%d, Data=%+v",
logger.Log.Infof("Type=Response, ID=%d, UID=%s, MID=%d, Data=%+v",
a.Session.ID(), a.Session.UID(), mid, v)
}

Expand Down Expand Up @@ -394,15 +398,12 @@ func (a *agentImpl) SetStatus(state int32) {
func (a *agentImpl) Handle() {
defer func() {
a.Close()
logger.Log.Debugf("Session handle goroutine exit, SessionID=%d, UID=%d", a.Session.ID(), a.Session.UID())
logger.Log.Debugf("Session handle goroutine exit, SessionID=%d, UID=%s", a.Session.ID(), a.Session.UID())
}()

go a.write()
go a.heartbeat()
select {
case <-a.chDie: // agent closed signal
return
}
<-a.chDie // agent closed signal
}

// IPVersion returns the remote address ip version.
Expand Down Expand Up @@ -438,7 +439,15 @@ func (a *agentImpl) heartbeat() {
logger.Log.Debugf("Session heartbeat timeout, LastTime=%d, Deadline=%d", atomic.LoadInt64(&a.lastAt), deadline)
return
}
a.chSend <- pendingWrite{data: hbd}

// chSend is never closed so we need this to don't block if agent is already closed
select {
case a.chSend <- pendingWrite{data: hbd}:
case <-a.chDie:
return
case <-a.chStopHeartbeat:
return
}
case <-a.chDie:
return
case <-a.chStopHeartbeat:
Expand Down Expand Up @@ -472,7 +481,6 @@ func (a *agentImpl) SendHandshakeResponse() error {
func (a *agentImpl) write() {
// clean func
defer func() {
close(a.chSend)
a.Close()
}()

Expand Down
4 changes: 2 additions & 2 deletions agent/agent_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@ func (a *Remote) Push(route string, v interface{}) error {
}
switch d := v.(type) {
case []byte:
logger.Log.Debugf("Type=Push, ID=%d, UID=%d, Route=%s, Data=%dbytes",
logger.Log.Debugf("Type=Push, ID=%d, UID=%s, Route=%s, Data=%dbytes",
a.Session.ID(), a.Session.UID(), route, len(d))
default:
logger.Log.Debugf("Type=Push, ID=%d, UID=%d, Route=%s, Data=%+v",
logger.Log.Debugf("Type=Push, ID=%d, UID=%s, Route=%s, Data=%+v",
a.Session.ID(), a.Session.UID(), route, v)
}

Expand Down
Loading

0 comments on commit 5e03566

Please sign in to comment.