-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathepoll_unix.go
120 lines (109 loc) · 2.19 KB
/
epoll_unix.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
/**
* @Author: llh
* @Date: 2019-06-01 15:08:12
* @Last Modified by: llh
*/
// +build darwin netbsd freebsd openbsd dragonfly linux
package tfg
import (
"golang.org/x/sys/unix"
"sync/atomic"
)
// pollEvent is a single event loop: it owns one poller and dispatches the
// readiness events that poller reports to accept/read/write handling on
// behalf of the server.
type pollEvent struct {
	// connCount is the number of connections accepted by this loop. It is
	// read and written with sync/atomic, so it must stay the first field:
	// on 32-bit platforms only the first word of an allocated struct is
	// guaranteed to be 64-bit aligned, and misaligned 64-bit atomics panic.
	connCount int64

	// id is this loop's index; accept compares it against the round-robin
	// slot and stores it on every connection it accepts.
	id int

	// poll is the poller this loop waits on and registers new fds with.
	poll *poll

	// s is the server this loop belongs to.
	s *server
}
// incConnCount atomically adds one to this loop's connection counter.
func (e *pollEvent) incConnCount() {
	const delta int64 = 1
	atomic.AddInt64(&e.connCount, delta)
}
// decConnCount atomically subtracts one from this loop's connection counter.
func (e *pollEvent) decConnCount() {
	const delta int64 = -1
	atomic.AddInt64(&e.connCount, delta)
}
// run blocks in the poller's wait call, dispatching every reported event.
// When wait returns, the deferred cleanup signals server shutdown and then
// marks this loop as finished on the server's wait group.
func (e *pollEvent) run() {
	defer func() {
		e.s.signalShutdown()
		e.s.wg.Done()
	}()

	// dispatch handles one readiness event. mode is 'r', 'w', or the sum
	// 'r'+'w' when the descriptor is both readable and writable.
	dispatch := func(fd int, mode int32) error {
		c, _ := e.s.connManager.get(fd)
		if c == nil {
			// No connection is registered for fd, so the event is handed
			// to accept (presumably fd is the listening socket).
			return e.accept(fd)
		}

		switch mode {
		case 'r':
			e.read(c)
		case 'w':
			e.write()
		case 'r' + 'w':
			e.read(c)
			e.write()
		}
		return nil
	}

	e.poll.wait(dispatch)
}
// opened marks c as open and then invokes the server's PreOpen hook,
// if one has been configured.
func (e *pollEvent) opened(c *conn) {
	c.setConnOpened()

	if e.s.handleConn.PreOpen == nil {
		return
	}
	e.s.handleConn.PreOpen(c)
}
// write reacts to a writable event by telling the server that writing is
// available again.
func (e *pollEvent) write() {
	srv := e.s
	srv.setAvailableWrite()
}
// accept drains pending connections from the listening descriptor fd and
// registers each accepted connection with this event loop.
//
// When the server runs more than one event loop, the balancing strategy is
// re-checked before every accept call:
//   - RoundRobin: only the loop whose id equals the server-wide connection
//     count modulo numPollEvent accepts; the others return nil.
//   - LeastConn: the loop returns nil if any loop currently holds fewer
//     connections than it does.
//
// It returns nil once there is nothing left to accept (EAGAIN) or when it is
// not this loop's turn. Any other accept error, or a failure to switch the
// new descriptor to non-blocking mode, is returned to the caller.
func (e *pollEvent) accept(fd int) error {
	for {
		if len(e.s.pollEvents) > 1 {
			switch e.s.acceptBalance {
			case RoundRobin:
				if e.id != int(atomic.LoadInt64(&e.s.connManager.connCount))%e.s.numPollEvent {
					return nil
				}
			case LeastConn:
				count := atomic.LoadInt64(&e.connCount)
				for _, event := range e.s.pollEvents {
					if count > atomic.LoadInt64(&event.connCount) {
						return nil
					}
				}
			}
		}

		nfd, sa, err := unix.Accept(fd)
		if err != nil {
			switch err {
			case unix.EAGAIN:
				// Backlog is empty; wait for the next readiness event.
				return nil
			case unix.EINTR, unix.ECONNABORTED:
				// Transient: interrupted by a signal, or the peer gave up
				// before we accepted. Neither should stop the event loop.
				continue
			}
			return err
		}

		if err := unix.SetNonblock(nfd, true); err != nil {
			// The descriptor was never registered anywhere, so close it
			// here to avoid leaking it. The SetNonblock error is the one
			// worth reporting; a close failure adds nothing actionable.
			_ = unix.Close(nfd)
			return err
		}

		c := e.s.connManager.connCache.Get().(*conn)
		c.fd = nfd
		c.sa = sa
		c.laddr = e.s.ln.lnaddr
		c.s = e.s
		c.indexPollEvent = e.id
		c.raddr = c.saToAddr(sa)

		e.s.connManager.add(c.fd, c)
		e.poll.addFd(c.fd)
		e.incConnCount()
		e.s.connManager.incConnCount()
		e.opened(c)
	}
}
// read drains all currently available data from c. Each successful read
// fills a pooled connWorker's buffer and hands that worker to the server's
// handler pool. The loop stops when the socket would block (EAGAIN), or
// marks c as needing close when the peer has closed the connection
// (zero-byte read) or a read error occurs.
func (e *pollEvent) read(c *conn) {
	for {
		cw := e.s.connManager.inCache.Get().(*connWorker)

		n, err := unix.Read(c.fd, cw.in)
		for err == unix.EINTR {
			// Interrupted by a signal before any data arrived; retrying is
			// the correct response, not closing the connection.
			n, err = unix.Read(c.fd, cw.in)
		}

		if n == 0 || err != nil {
			// The worker was never handed off, so give it back instead of
			// dropping it.
			// NOTE(review): assumes inCache is a sync.Pool (Get/Put) — confirm.
			e.s.connManager.inCache.Put(cw)
			if err != unix.EAGAIN {
				c.setConnNeedClosed()
			}
			return
		}

		// Ownership of cw passes to the handler pool from here on.
		cw.conn = c
		cw.n = n
		c.s.poolHandle.handleConn(cw)
	}
}