Skip to content

Commit

Permalink
fix benchmark and heartbeat
Browse files Browse the repository at this point in the history
  • Loading branch information
Tony committed Dec 30, 2018
1 parent cc2d775 commit 3109085
Show file tree
Hide file tree
Showing 17 changed files with 203 additions and 177 deletions.
214 changes: 125 additions & 89 deletions api/logic/grpc/api.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions api/logic/grpc/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ message ConnectReply {
string key = 2;
string roomID = 3;
repeated int32 accepts = 4;
int64 heartbeat = 5;
}

message DisconnectReq {
Expand Down
75 changes: 38 additions & 37 deletions benchmarks/client/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package main

// Start Commond eg: ./client 1 5000 localhost:8080
// Start Commond eg: ./client 1 1000 localhost:3101
// first parameter:beginning userId
// second parameter: amount of clients
// third parameter: comet server ip
Expand All @@ -11,6 +11,7 @@ import (
"encoding/json"
"flag"
"fmt"
"math/rand"
"net"
"os"
"runtime"
Expand All @@ -30,7 +31,7 @@ const (

const (
rawHeaderLen = uint16(16)
heart = 30 * time.Second
heart = 240 * time.Second
)

// Proto proto.
Expand All @@ -45,15 +46,16 @@ type Proto struct {

// AuthToken auth token.
type AuthToken struct {
Mid int64
Key string
RoomID string
Platform string
Accepts []int32
Mid int64 `json:"mid"`
Key string `json:"key"`
RoomID string `json:"room_id"`
Platform string `json:"platform"`
Accepts []int32 `json:"accepts"`
}

var (
countDown int64
countDown int64
aliveCount int64
)

func main() {
Expand All @@ -63,51 +65,50 @@ func main() {
if err != nil {
panic(err)
}

num, err := strconv.Atoi(os.Args[2])
if err != nil {
panic(err)
}

go result()

for i := begin; i < begin+num; i++ {
go client(fmt.Sprintf("%d", i))
go client(int64(i))
}

// signal
var exit chan bool
<-exit
}

func result() {
var (
lastTimes int64
diff int64
nowCount int64
timer = int64(30)
interval = int64(5)
)

for {
nowCount = atomic.LoadInt64(&countDown)
diff = nowCount - lastTimes
nowCount := atomic.LoadInt64(&countDown)
nowAlive := atomic.LoadInt64(&aliveCount)
diff := nowCount - lastTimes
lastTimes = nowCount
fmt.Println(fmt.Sprintf("%s down:%d down/s:%d", time.Now().Format("2006-01-02 15:04:05"), nowCount, diff/timer))
time.Sleep(time.Duration(timer) * time.Second)
fmt.Println(fmt.Sprintf("%s alive:%d down:%d down/s:%d", time.Now().Format("2006-01-02 15:04:05"), nowAlive, nowCount, diff/interval))
time.Sleep(time.Second * time.Duration(interval))
}
}

func client(key string) {
func client(mid int64) {
for {
startClient(key)
time.Sleep(3 * time.Second)
startClient(mid)
time.Sleep(time.Duration(rand.Intn(10)) * time.Second)
}
}

func startClient(key string) {
//time.Sleep(time.Duration(mrand.Intn(30)) * time.Second)
func startClient(key int64) {
time.Sleep(time.Duration(rand.Intn(120)) * time.Second)
atomic.AddInt64(&aliveCount, 1)
quit := make(chan bool, 1)
defer close(quit)

defer func() {
close(quit)
atomic.AddInt64(&aliveCount, -1)
}()
// connnect to server
conn, err := net.Dial("tcp", os.Args[3])
if err != nil {
log.Errorf("net.Dial(%s) error(%v)", os.Args[3], err)
Expand All @@ -117,9 +118,9 @@ func startClient(key string) {
wr := bufio.NewWriter(conn)
rd := bufio.NewReader(conn)
authToken := &AuthToken{
123,
key,
"",
"test://1000",
"test://1",
"ios",
[]int32{1000, 1001, 1002},
}
Expand All @@ -136,7 +137,7 @@ func startClient(key string) {
log.Errorf("tcpReadProto() error(%v)", err)
return
}
log.Infof("key:%s auth ok, proto: %v", key, proto)
log.Infof("key:%d auth ok, proto: %v", key, proto)
seq++
// writer
go func() {
Expand All @@ -147,10 +148,10 @@ func startClient(key string) {
hbProto.Seq = seq
hbProto.Body = nil
if err = tcpWriteProto(wr, hbProto); err != nil {
log.Errorf("key:%s tcpWriteProto() error(%v)", key, err)
log.Errorf("key:%d tcpWriteProto() error(%v)", key, err)
return
}
log.Infof("key:%s Write heartbeat", key)
log.Infof("key:%d Write heartbeat", key)
time.Sleep(heart)
seq++
select {
Expand All @@ -163,21 +164,21 @@ func startClient(key string) {
// reader
for {
if err = tcpReadProto(rd, proto); err != nil {
log.Errorf("key:%s tcpReadProto() error(%v)", key, err)
log.Errorf("key:%d tcpReadProto() error(%v)", key, err)
quit <- true
return
}
if proto.Operation == opAuthReply {
log.Infof("key:%s auth success", key)
log.Infof("key:%d auth success", key)
} else if proto.Operation == opHeartbeatReply {
log.Infof("key:%s receive heartbeat", key)
log.Infof("key:%d receive heartbeat", key)
if err = conn.SetReadDeadline(time.Now().Add(heart + 60*time.Second)); err != nil {
log.Errorf("conn.SetReadDeadline() error(%v)", err)
quit <- true
return
}
} else {
log.Infof("key:%s op:%d msg: %s", key, proto.Operation, string(proto.Body))
log.Infof("key:%d op:%d msg: %s", key, proto.Operation, string(proto.Body))
atomic.AddInt64(&countDown, 1)
}
}
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/multi_push/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func startPush(b, e int) {
panic(err)
}
for {
resp, err := httpPost(fmt.Sprintf("http://%s/1/pushs", os.Args[3]), "application/x-www-form-urlencoded", bytes.NewBuffer(body))
resp, err := httpPost(fmt.Sprintf("http://%s/goim/push/mids=%d", os.Args[3], b), "application/x-www-form-urlencoded", bytes.NewBuffer(body))
if err != nil {
lg.Printf("post error (%v)", err)
continue
Expand Down
19 changes: 6 additions & 13 deletions benchmarks/push/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
)

var (
lg *log.Logger
httpClient *http.Client
t int
)
Expand Down Expand Up @@ -51,12 +50,6 @@ func init() {

func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
infoLogfi, err := os.OpenFile("./pushs.log", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
panic(err)
}
lg = log.New(infoLogfi, "", log.LstdFlags|log.Lshortfile)

begin, err := strconv.Atoi(os.Args[1])
if err != nil {
panic(err)
Expand All @@ -72,7 +65,7 @@ func main() {
}

num := runtime.NumCPU() * 2
lg.Printf("start routine num:%d", num)
log.Printf("start routine num:%d", num)

l := length / num
b, e := begin, begin+l
Expand All @@ -94,7 +87,7 @@ func stop() {
}

func startPush(b, e int) {
lg.Printf("start Push from %d to %d", b, e)
log.Printf("start Push from %d to %d", b, e)
bodys := make([][]byte, e-b)
for i := 0; i < e-b; i++ {
msg := &pushBodyMsg{Msg: json.RawMessage(testContent), UserID: int64(b)}
Expand All @@ -107,20 +100,20 @@ func startPush(b, e int) {

for {
for i := 0; i < len(bodys); i++ {
resp, err := httpPost(fmt.Sprintf("http://%s/1/pushs", os.Args[3]), "application/x-www-form-urlencoded", bytes.NewBuffer(bodys[i]))
resp, err := httpPost(fmt.Sprintf("http://%s/goim/push/mids?operation=1000&mids=%d", os.Args[3], b), "application/x-www-form-urlencoded", bytes.NewBuffer(bodys[i]))
if err != nil {
lg.Printf("post error (%v)", err)
log.Printf("post error (%v)", err)
continue
}

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
lg.Printf("post error (%v)", err)
log.Printf("post error (%v)", err)
return
}
resp.Body.Close()

lg.Printf("response %s", string(body))
log.Printf("response %s", string(body))
//time.Sleep(50 * time.Millisecond)
}
}
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/push_room/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package main

// Start Commond eg: ./push_room 1 20 localhost:7172
// Start Commond eg: ./push_room 1 20 localhost:3111
// first parameter: room id
// second parameter: num per seconds
// third parameter: logic server ip
Expand Down
18 changes: 5 additions & 13 deletions benchmarks/push_rooms/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
)

var (
lg *log.Logger
httpClient *http.Client
)

Expand All @@ -34,7 +33,6 @@ func init() {
if err != nil {
return nil, err
}

_ = c.SetDeadline(deadline)
return c, nil
},
Expand All @@ -47,12 +45,6 @@ func init() {

func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
infoLogfi, err := os.OpenFile("./push_rooms.log", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
panic(err)
}
lg = log.New(infoLogfi, "", log.LstdFlags|log.Lshortfile)

begin, err := strconv.Atoi(os.Args[1])
if err != nil {
panic(err)
Expand All @@ -69,7 +61,7 @@ func main() {
delay := (1000 * time.Millisecond) / time.Duration(num)

routines := runtime.NumCPU() * 2
lg.Printf("start routine num:%d", routines)
log.Printf("start routine num:%d", routines)

l := length / routines
b, e := begin, begin+l
Expand All @@ -86,24 +78,24 @@ func main() {
}

func startPush(b, e int, delay time.Duration) {
lg.Printf("start Push from %d to %d", b, e)
log.Printf("start Push from %d to %d", b, e)

for {
for i := b; i < e; i++ {
resp, err := http.Post(fmt.Sprintf("http://%s/goim/push/room?operation=1000&type=test&room=%d", os.Args[3], i), "application/json", bytes.NewBufferString(testContent))
if err != nil {
lg.Printf("post error (%v)", err)
log.Printf("post error (%v)", err)
continue
}

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
lg.Printf("post error (%v)", err)
log.Printf("post error (%v)", err)
return
}
resp.Body.Close()

lg.Printf("push room:%d response %s", i, string(body))
log.Printf("push room:%d response %s", i, string(body))
time.Sleep(delay)
}
}
Expand Down
1 change: 0 additions & 1 deletion cmd/comet/comet-example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
svrProto = 10
cliProto = 5
handshakeTimeout = "8s"
heartbeatTimeout = "120s"

[whitelist]
Whitelist = [123]
Expand Down
1 change: 1 addition & 0 deletions cmd/comet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func main() {
}
rand.Seed(time.Now().UTC().UnixNano())
runtime.GOMAXPROCS(runtime.NumCPU())
println(conf.Conf.Debug)
log.Infof("goim-comet [version: %s env: %+v] start", ver, conf.Conf.Env)
// register discovery
dis := naming.New(conf.Conf.Discovery)
Expand Down
6 changes: 3 additions & 3 deletions cmd/logic/logic-example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@
network = "tcp"
addr = "127.0.0.1:6379"
active = 60000
idle = 32
dialTimeout = "50ms"
idle = 1024
dialTimeout = "200ms"
readTimeout = "500ms"
writeTimeout = "500ms"
idleTimeout = "30s"
idleTimeout = "120s"
expire = "30m"
2 changes: 0 additions & 2 deletions internal/comet/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ func Default() *Config {
CliProto: 5,
SvrProto: 10,
HandshakeTimeout: xtime.Duration(time.Second * 5),
HeartbeatTimeout: xtime.Duration(time.Second * 120),
},
Bucket: &Bucket{
Size: 32,
Expand Down Expand Up @@ -179,7 +178,6 @@ type Protocol struct {
SvrProto int
CliProto int
HandshakeTimeout xtime.Duration
HeartbeatTimeout xtime.Duration
}

// Bucket is bucket config.
Expand Down
Loading

0 comments on commit 3109085

Please sign in to comment.