Skip to content

Commit

Permalink
implement WSClient
Browse files Browse the repository at this point in the history
  • Loading branch information
name5566 committed Oct 20, 2015
1 parent f3fb664 commit 5e8ec9c
Showing 1 changed file with 124 additions and 0 deletions.
124 changes: 124 additions & 0 deletions network/ws_client.go
Original file line number Diff line number Diff line change
@@ -1 +1,125 @@
package network

import (
"github.com/gorilla/websocket"
"github.com/name5566/leaf/log"
"sync"
"time"
)

type WSClient struct {
sync.Mutex
Addr string
ConnNum int
ConnectInterval time.Duration
PendingWriteNum int
MaxMsgLen int
HandshakeTimeout time.Duration
NewAgent func(*WSConn) Agent
dialer websocket.Dialer
conns WebsocketConnSet
wg sync.WaitGroup
closeFlag bool
}

func (client *WSClient) Start() {
client.init()

for i := 0; i < client.ConnNum; i++ {
client.wg.Add(1)
go client.connect()
}
}

func (client *WSClient) init() {
client.Lock()
defer client.Unlock()

if client.ConnNum <= 0 {
client.ConnNum = 1
log.Release("invalid ConnNum, reset to %v", client.ConnNum)
}
if client.ConnectInterval <= 0 {
client.ConnectInterval = 3 * time.Second
log.Release("invalid ConnectInterval, reset to %v", client.ConnectInterval)
}
if client.PendingWriteNum <= 0 {
client.PendingWriteNum = 100
log.Release("invalid PendingWriteNum, reset to %v", client.PendingWriteNum)
}
if client.MaxMsgLen <= 0 {
client.MaxMsgLen = 4096
log.Release("invalid MaxMsgLen, reset to %v", client.MaxMsgLen)
}
if client.HandshakeTimeout <= 0 {
client.HandshakeTimeout = 10 * time.Second
log.Release("invalid HandshakeTimeout, reset to %v", client.HandshakeTimeout)
}
if client.NewAgent == nil {
log.Fatal("NewAgent must not be nil")
}
if client.conns != nil {
log.Fatal("client is running")
}

client.conns = make(WebsocketConnSet)
client.closeFlag = false
client.dialer = websocket.Dialer{
HandshakeTimeout: client.HandshakeTimeout,
}
}

func (client *WSClient) dial() *websocket.Conn {
for {
conn, _, err := client.dialer.Dial(client.Addr, nil)
if err == nil || client.closeFlag {
return conn
}

log.Release("connect to %v error: %v", client.Addr, err)
time.Sleep(client.ConnectInterval)
continue
}
}

func (client *WSClient) connect() {
defer client.wg.Done()

conn := client.dial()
if conn == nil {
return
}
conn.SetReadLimit(int64(client.MaxMsgLen))

client.Lock()
if client.closeFlag {
client.Unlock()
conn.Close()
return
}
client.conns[conn] = struct{}{}
client.Unlock()

wsConn := newWSConn(conn, client.PendingWriteNum)
agent := client.NewAgent(wsConn)
agent.Run()

// cleanup
wsConn.Close()
client.Lock()
delete(client.conns, conn)
client.Unlock()
agent.OnClose()
}

func (client *WSClient) Close() {
client.Lock()
client.closeFlag = true
for conn := range client.conns {
conn.Close()
}
client.conns = nil
client.Unlock()

client.wg.Wait()
}

0 comments on commit 5e8ec9c

Please sign in to comment.