forked from name5566/leaf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathws_conn.go
138 lines (112 loc) · 2.36 KB
/
ws_conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package network
import (
"errors"
"github.com/gorilla/websocket"
"github.com/name5566/leaf/log"
"net"
"sync"
)
// WebsocketConnSet is a set of websocket connections: each key is a
// connection pointer and the empty-struct value carries no data.
type WebsocketConnSet map[*websocket.Conn]struct{}
// WSConn wraps a websocket connection together with a buffered queue of
// outgoing messages. The embedded mutex is taken by the methods that read or
// change closeFlag and by those that enqueue onto or close writeChan.
type WSConn struct {
	sync.Mutex
	// conn is the wrapped websocket connection.
	conn *websocket.Conn
	// writeChan holds messages waiting to be written to conn; a nil entry
	// tells the writer goroutine to stop.
	writeChan chan []byte
	// maxMsgLen is the largest total message size, in bytes, that WriteMsg
	// accepts.
	maxMsgLen uint32
	// closeFlag is true once the connection has been closed or asked to close.
	closeFlag bool
}
// newWSConn wraps conn in a WSConn and starts the goroutine that drains the
// outgoing queue onto the socket.
//
// pendingWriteNum is the capacity of the outgoing queue and maxMsgLen is the
// largest message WriteMsg will accept. The writer goroutine stops when the
// queue is closed, when it receives a nil message, or when a write fails; it
// then closes conn and marks the WSConn as closed.
func newWSConn(conn *websocket.Conn, pendingWriteNum int, maxMsgLen uint32) *WSConn {
	wsConn := &WSConn{
		conn:      conn,
		writeChan: make(chan []byte, pendingWriteNum),
		maxMsgLen: maxMsgLen,
	}

	go func() {
		for msg := range wsConn.writeChan {
			// A nil message is the stop signal; a failed write also ends the loop.
			if msg == nil || conn.WriteMessage(websocket.BinaryMessage, msg) != nil {
				break
			}
		}

		conn.Close()

		wsConn.Lock()
		wsConn.closeFlag = true
		wsConn.Unlock()
	}()

	return wsConn
}
// doDestroy tears the connection down immediately: it closes the websocket
// connection and, unless the WSConn is already flagged as closed, closes the
// outgoing queue and sets the flag. The callers in this file invoke it with
// the lock held.
//
// When the underlying connection is a *net.TCPConn, linger is set to zero
// first so that unsent data is discarded on close instead of being flushed.
func (wsConn *WSConn) doDestroy() {
	// The underlying connection is not guaranteed to be a *net.TCPConn (a TLS
	// connection, for instance, is a different type), so the assertion has to
	// be checked; an unchecked assertion would panic here.
	if tcpConn, ok := wsConn.conn.UnderlyingConn().(*net.TCPConn); ok {
		// Best effort: failing to set the option must not prevent the close.
		_ = tcpConn.SetLinger(0)
	}
	wsConn.conn.Close()

	// closeFlag doubles as the "queue already closed or abandoned" marker, so
	// the channel is closed at most once.
	if !wsConn.closeFlag {
		close(wsConn.writeChan)
		wsConn.closeFlag = true
	}
}
// Destroy takes the lock and tears the connection down via doDestroy. The
// lock is released when the call returns.
func (wsConn *WSConn) Destroy() {
	wsConn.Lock()
	defer wsConn.Unlock()

	wsConn.doDestroy()
}
// Close requests a shutdown of the connection. Under the lock, and only if
// the connection is not already flagged as closed, it hands a nil message to
// doWrite and sets closeFlag. Calling Close again afterwards does nothing.
func (wsConn *WSConn) Close() {
	wsConn.Lock()
	defer wsConn.Unlock()

	if !wsConn.closeFlag {
		// nil is the stop signal for the writer; it is queued behind any
		// messages that are already pending.
		wsConn.doWrite(nil)
		wsConn.closeFlag = true
	}
}
// doWrite appends b to the outgoing queue when there is room. When the queue
// is already at capacity it logs a debug message and destroys the connection
// instead; b is dropped in that case. The callers in this file invoke it with
// the lock held.
func (wsConn *WSConn) doWrite(b []byte) {
	if len(wsConn.writeChan) < cap(wsConn.writeChan) {
		wsConn.writeChan <- b
		return
	}

	// Queue is full: give up on the connection instead of blocking the caller.
	log.Debug("close conn: channel full")
	wsConn.doDestroy()
}
// LocalAddr returns the local network address reported by the wrapped
// websocket connection.
func (wsConn *WSConn) LocalAddr() net.Addr {
	return wsConn.conn.LocalAddr()
}
// RemoteAddr returns the peer's network address as reported by the wrapped
// websocket connection.
func (wsConn *WSConn) RemoteAddr() net.Addr {
	return wsConn.conn.RemoteAddr()
}
// ReadMsg reads the next message from the wrapped websocket connection and
// returns its payload together with any read error; the message type is
// discarded.
//
// It is not safe to call from multiple goroutines at once.
func (wsConn *WSConn) ReadMsg() ([]byte, error) {
	_, payload, err := wsConn.conn.ReadMessage()
	return payload, err
}
// WriteMsg queues the concatenation of args as one outgoing message.
//
// It returns an error when the combined length of args is zero or exceeds
// maxMsgLen. When the connection is already flagged as closed the message is
// dropped and nil is returned. A nil result only means the message was handed
// to doWrite; it does not report whether it reached the peer.
//
// When exactly one slice is passed it is queued as is, without copying.
// args must not be modified by other goroutines.
func (wsConn *WSConn) WriteMsg(args ...[]byte) error {
	wsConn.Lock()
	defer wsConn.Unlock()

	if wsConn.closeFlag {
		return nil
	}

	// Sum in uint64 and stop as soon as the limit is exceeded, so that a
	// combined length above the uint32 range cannot wrap around and slip past
	// the check.
	limit := uint64(wsConn.maxMsgLen)
	var msgLen uint64
	for _, arg := range args {
		msgLen += uint64(len(arg))
		if msgLen > limit {
			return errors.New("message too long")
		}
	}
	if msgLen < 1 {
		return errors.New("message too short")
	}

	// A single slice is queued directly; no copy is made.
	if len(args) == 1 {
		wsConn.doWrite(args[0])
		return nil
	}

	// Merge the pieces into one contiguous buffer.
	msg := make([]byte, 0, msgLen)
	for _, arg := range args {
		msg = append(msg, arg...)
	}
	wsConn.doWrite(msg)
	return nil
}