Skip to content

Commit

Permalink
修复Websocket服务器重启后重连问题
Browse files Browse the repository at this point in the history
  • Loading branch information
davyxu committed Mar 30, 2019
1 parent 472fc9f commit 49c004e
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 102 deletions.
49 changes: 0 additions & 49 deletions codec/wsjson/json.go

This file was deleted.

36 changes: 18 additions & 18 deletions examples/websocket/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,37 @@

console.log("hello")

var ws = new WebSocket("ws://127.0.0.1:18802/echo");
let ws = new WebSocket("ws://127.0.0.1:18802/echo");
ws.binaryType = "arraybuffer";

ws.onopen = function(evt) {
console.log("Connection open ...");

var msgBody = {
let msgBody = {
Msg: "鲍勃",
Value: 331,
}

// 消息体编码
// 注意:需要对字符串做url编码, 否则中文乱码。该问题仅限于json传输模式
// cellnet接收时,必须使用wsjson编码处理
var jsonBody = encodeURI(JSON.stringify(msgBody),"utf-8")
let msgData = JSON.stringify(msgBody)

let encoder = new TextEncoder('utf8')
let jsonBody= encoder.encode( msgData)

// TODO 实现消息ID与消息体绑定
var msgid = 1234
let msgid = 1234

var pkt = new ArrayBuffer( 2+ jsonBody.length)
var dv = new DataView(pkt)
let pkt = new ArrayBuffer( 2+ jsonBody.length)
let dv = new DataView(pkt)

// 写入消息号
dv.setUint16(0, msgid, true)

// 这里body使用的是Json编码
for(var i = 0;i <jsonBody.length;i++){
dv.setUint8(2+i, jsonBody.charCodeAt(i))
for(let i = 0;i <jsonBody.length;i++){
dv.setUint8(2+i, jsonBody[i])
}

// 发送
Expand All @@ -50,24 +53,21 @@

if (evt.data instanceof ArrayBuffer ){

var dv = new DataView(evt.data);
let dv = new DataView(evt.data);

// TODO 消息号验证
var msgid = dv.getUint16(0, true)
let msgid = dv.getUint16(0, true)

// 包体
var msgBody = evt.data.slice(2)
let msgBody = evt.data.slice(2)

// 这里使用Json的包体
var jsonBody = ""
for(var i = 0;i <msgBody.byteLength;i++){
jsonBody += String.fromCharCode(dv.getUint8(i+2))
}
let decoder = new TextDecoder('utf8')
let jsonBody = decoder.decode(msgBody)

// 解码包体
var msgBody = JSON.parse(jsonBody)
let msg = JSON.parse(jsonBody)

console.log( "Received Message: " , msgBody);
console.log( "Received Message: " , msg);


}else{
Expand Down
69 changes: 65 additions & 4 deletions examples/websocket/main.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package main

import (
"flag"
"github.com/davyxu/cellnet"
"github.com/davyxu/cellnet/peer"
"github.com/davyxu/cellnet/proc"
"github.com/davyxu/golog"
"time"

"fmt"
"github.com/davyxu/cellnet/codec"
_ "github.com/davyxu/cellnet/codec/wsjson"
_ "github.com/davyxu/cellnet/codec/json"
_ "github.com/davyxu/cellnet/peer/gorillaws"
_ "github.com/davyxu/cellnet/proc/gorillaws"
"reflect"
Expand All @@ -26,7 +28,7 @@ func (self *TestEchoACK) String() string { return fmt.Sprintf("%+v", *self) }
// 将消息注册到系统
func init() {
cellnet.RegisterMessageMeta(&cellnet.MessageMeta{
Codec: codec.MustGetCodec("wsjson"),
Codec: codec.MustGetCodec("json"),
Type: reflect.TypeOf((*TestEchoACK)(nil)).Elem(),
ID: 1234,
})
Expand All @@ -35,12 +37,59 @@ func init() {
// 运行服务器, 在浏览器(Chrome)中打开index.html, F12打开调试窗口->Console标签 查看命令行输出
// 注意:日志中的http://127.0.0.1:18802/echo链接是api地址,不是网页地址,直接打开无法正常工作
// 注意:如果http代理/VPN在运行时可能会导致无法连接, 请关闭
func main() {

var (
flagClient = flag.Bool("client", false, "client mode")
)

const (
TestAddress = "http://127.0.0.1:18802/echo"
)

func client() {
// 创建一个事件处理队列,整个服务器只有这一个队列处理事件
queue := cellnet.NewEventQueue()

p := peer.NewGenericPeer("gorillaws.Connector", "client", TestAddress, queue)
p.(cellnet.WSConnector).SetReconnectDuration(time.Second)

proc.BindProcessorHandler(p, "gorillaws.ltv", func(ev cellnet.Event) {

switch msg := ev.Message().(type) {

case *cellnet.SessionConnected:
log.Debugln("server connected")

ev.Session().Send(&TestEchoACK{
Msg: "鲍勃",
Value: 331,
})
// 有连接断开
case *cellnet.SessionClosed:
log.Debugln("session closed: ", ev.Session().ID())
case *TestEchoACK:

log.Debugf("recv: %+v %v", msg, []byte("鲍勃"))

}
})

// 开始侦听
p.Start()

// 事件队列开始循环
queue.StartLoop()

// 阻塞等待事件队列结束退出( 在另外的goroutine调用queue.StopLoop() )
queue.Wait()
}

func server() {
// 创建一个事件处理队列,整个服务器只有这一个队列处理事件,服务器属于单线程服务器
queue := cellnet.NewEventQueue()

p := peer.NewGenericPeer("gorillaws.Acceptor", "server", "http://127.0.0.1:18802~18803/echo", queue)
// 侦听在18802端口
p := peer.NewGenericPeer("gorillaws.Acceptor", "server", TestAddress, queue)

proc.BindProcessorHandler(p, "gorillaws.ltv", func(ev cellnet.Event) {

Expand Down Expand Up @@ -72,3 +121,15 @@ func main() {
queue.Wait()

}

func main() {

flag.Parse()

if *flagClient {
client()
} else {
server()
}

}
25 changes: 22 additions & 3 deletions peer/gorillaws/connector.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package gorillaws

import (
"fmt"
"github.com/davyxu/cellnet"
"github.com/davyxu/cellnet/peer"
"github.com/davyxu/cellnet/util"
"github.com/gorilla/websocket"
"net"
"net/http"
"strings"
"sync"
"time"
)
Expand Down Expand Up @@ -44,6 +46,14 @@ func (self *wsConnector) Session() cellnet.Session {
return self.defaultSes
}

func (self *wsConnector) Port() int {
if self.defaultSes.conn == nil {
return 0
}

return self.defaultSes.conn.LocalAddr().(*net.TCPAddr).Port
}

func (self *wsConnector) SetSessionManager(raw interface{}) {
self.CoreSessionManager = raw.(peer.CoreSessionManager)
}
Expand Down Expand Up @@ -79,19 +89,28 @@ func (self *wsConnector) SetReconnectDuration(v time.Duration) {
const reportConnectFailedLimitTimes = 3

func (self *wsConnector) connect(address string) {

self.SetRunning(true)

for {
self.tryConnTimes++

dialer := websocket.Dialer{}
dialer.Proxy = http.ProxyFromEnvironment
dialer.HandshakeTimeout = 45 * time.Second

addrObj, err := util.ParseAddress(address)
if err != nil {
log.Errorf("invalid address: %s", address)
break
}

// 处理非法路径问题
var finalAddress string
if strings.HasPrefix(address, "ws://") {
if addrObj.Scheme == "ws" || addrObj.Scheme == "wss" {
finalAddress = address
} else {
finalAddress = "ws://" + address
finalAddress = "ws://" + fmt.Sprintf("%s:%d%s", addrObj.Host, addrObj.MinPort, addrObj.Path)
}

conn, _, err := dialer.Dial(finalAddress, nil)
Expand Down
22 changes: 7 additions & 15 deletions peer/gorillaws/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ func (self *wsSession) recvLoop() {
msg, err := self.ReadMessage(self)

if err != nil {

log.Debugln(err)

if !util.IsEOFOrNetReadError(err) {
log.Errorln("session closed:", err)
}
Expand All @@ -70,7 +73,10 @@ func (self *wsSession) recvLoop() {
self.ProcEvent(&cellnet.RecvMsgEvent{Ses: self, Msg: msg})
}

self.cleanup()
self.Close()

// 通知完成
self.exitSync.Done()
}

// 发送循环
Expand All @@ -94,25 +100,12 @@ func (self *wsSession) sendLoop() {
}
}

self.cleanup()
}

// 清理资源
func (self *wsSession) cleanup() {

self.cleanupGuard.Lock()

defer self.cleanupGuard.Unlock()

// 关闭连接
if self.conn != nil {
self.conn.Close()
self.conn = nil
}

// pal301x: websocket 客服端关闭服务端无法释放 issue 60
self.Close()

// 通知完成
self.exitSync.Done()
}
Expand All @@ -127,7 +120,6 @@ func (self *wsSession) Start() {
self.exitSync.Add(2)

go func() {

// 等待2个任务结束
self.exitSync.Wait()

Expand Down
13 changes: 0 additions & 13 deletions peer_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,6 @@ import (
"net/http"
)

// Websocket接受器,具备会话访问
type WSAcceptor interface {
GenericPeer

SetHttps(certfile, keyfile string)

// 设置升级器
SetUpgrader(upgrader interface{})

// 访问会话
SessionAccessor
}

type HTTPAcceptor interface {
GenericPeer

Expand Down
Loading

0 comments on commit 49c004e

Please sign in to comment.