Skip to content

Commit

Permalink
implement websocket server
Browse files Browse the repository at this point in the history
  • Loading branch information
name5566 committed Oct 19, 2015
1 parent 122d177 commit 859af0b
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 2 deletions.
98 changes: 96 additions & 2 deletions network/ws_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,105 @@ package network

import (
"github.com/gorilla/websocket"
_ "github.com/name5566/leaf/log"
_ "sync"
"github.com/name5566/leaf/log"
"net"
"sync"
)

type WebsocketConnSet map[*websocket.Conn]struct{}

type WSConn struct {
sync.Mutex
conn *websocket.Conn
writeChan chan []byte
closeFlag bool
}

func newWSConn(conn *websocket.Conn, pendingWriteNum int) *WSConn {
wsConn := new(WSConn)
wsConn.conn = conn
wsConn.writeChan = make(chan []byte, pendingWriteNum)

go func() {
for b := range wsConn.writeChan {
if b == nil {
break
}

err := conn.WriteMessage(websocket.BinaryMessage, b)
if err != nil {
break
}
}

conn.Close()
wsConn.Lock()
wsConn.closeFlag = true
wsConn.Unlock()
}()

return wsConn
}

func (wsConn *WSConn) doDestroy() {
wsConn.conn.UnderlyingConn().(*net.TCPConn).SetLinger(0)
wsConn.conn.Close()
close(wsConn.writeChan)
wsConn.closeFlag = true
}

func (wsConn *WSConn) Destroy() {
wsConn.Lock()
defer wsConn.Unlock()
if wsConn.closeFlag {
return
}

wsConn.doDestroy()
}

func (wsConn *WSConn) Close() {
wsConn.Lock()
defer wsConn.Unlock()
if wsConn.closeFlag {
return
}

wsConn.doWrite(nil)
wsConn.closeFlag = true
}

func (wsConn *WSConn) doWrite(b []byte) {
if len(wsConn.writeChan) == cap(wsConn.writeChan) {
log.Debug("close conn: channel full")
wsConn.doDestroy()
return
}

wsConn.writeChan <- b
}

func (wsConn *WSConn) LocalAddr() net.Addr {
return wsConn.conn.LocalAddr()
}

func (wsConn *WSConn) RemoteAddr() net.Addr {
return wsConn.conn.RemoteAddr()
}

// goroutine not safe
func (wsConn *WSConn) ReadMsg() ([]byte, error) {
_, b, err := wsConn.conn.ReadMessage()
return b, err
}

// b must not be modified by other goroutines
func (wsConn *WSConn) WriteMsg(b []byte) {
wsConn.Lock()
defer wsConn.Unlock()
if wsConn.closeFlag || b == nil {
return
}

wsConn.doWrite(b)
}
13 changes: 13 additions & 0 deletions network/ws_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type WSServer struct {
type WSHandler struct {
maxConnNum int
pendingWriteNum int
newAgent func(*WSConn) Agent
upgrader websocket.Upgrader
conns WebsocketConnSet
mutexConns sync.Mutex
Expand Down Expand Up @@ -56,6 +57,17 @@ func (handler *WSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
handler.conns[conn] = struct{}{}
handler.mutexConns.Unlock()

wsConn := newWSConn(conn, handler.pendingWriteNum)
agent := handler.newAgent(wsConn)
agent.Run()

// cleanup
wsConn.Close()
handler.mutexConns.Lock()
delete(handler.conns, conn)
handler.mutexConns.Unlock()
agent.OnClose()
}

func (server *WSServer) Start() {
Expand Down Expand Up @@ -84,6 +96,7 @@ func (server *WSServer) Start() {
server.handler = &WSHandler{
maxConnNum: server.MaxConnNum,
pendingWriteNum: server.PendingWriteNum,
newAgent: server.NewAgent,
conns: make(WebsocketConnSet),
upgrader: websocket.Upgrader{
HandshakeTimeout: server.HTTPTimeout,
Expand Down

0 comments on commit 859af0b

Please sign in to comment.