forked from tidwall/evio
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathevio.go
174 lines (162 loc) · 5.27 KB
/
evio.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
// Copyright 2017 Joshua J Baker. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
package evio
import (
"io"
"net"
"os"
"strings"
"time"
)
// Action is an action that occurs after the completion of an event.
// Every event callback in Events returns an Action value.
type Action int
const (
// None indicates that no action should occur following an event.
// It is the zero value of Action.
None Action = iota
// Detach detaches the client.
Detach
// Close closes the client.
Close
// Shutdown shuts down the server.
Shutdown
)
// Options are set when the client opens.
// A value of this type is returned as the opts result of the
// Events.Opened callback.
type Options struct {
// TCPKeepAlive (SO_KEEPALIVE) socket option.
// NOTE(review): presumably a zero duration leaves keep-alive
// unchanged; confirm against the event loop implementation.
TCPKeepAlive time.Duration
}
// Addr represents the connection's remote and local addresses.
// It is passed to the Events.Opened callback.
type Addr struct {
// Index is the index of the server address that was passed to the
// Serve call.
Index int
// Local is the connection's local socket address.
Local net.Addr
// Remote is the connection's remote peer address.
Remote net.Addr
}
// Events represents the server events for the Serve call.
// Each event has an Action return value that is used to manage the state
// of the connection and server.
type Events struct {
// Serving fires when the server can accept connections.
// The wake parameter is a goroutine-safe function that triggers
// a Data event (with a nil `in` parameter) for the specified id.
// The addrs parameter is a slice of listening addresses that align
// with the addr strings passed to the Serve function.
Serving func(wake func(id int) bool, addrs []net.Addr) (action Action)
// Opened fires when a new connection has opened.
// The addr parameter is the connection's local and remote addresses.
// Use the out return value to write data to the connection.
// The opts return value is used to set connection options.
Opened func(id int, addr Addr) (out []byte, opts Options, action Action)
// Closed fires when a connection has closed.
// The err parameter is the last known connection error, usually nil.
Closed func(id int, err error) (action Action)
// Detached fires when a connection has been previously detached.
// Once detached it's up to the receiver of this event to manage the
// state of the connection. The Closed event will not be called for
// this connection.
// The rwc parameter is a ReadWriteCloser that represents the
// underlying socket connection. It can be freely used in goroutines
// and should be closed when it's no longer needed.
Detached func(id int, rwc io.ReadWriteCloser) (action Action)
// Data fires when a connection sends the server data.
// The in parameter is the incoming data.
// Use the out return value to write data to the connection.
Data func(id int, in []byte) (out []byte, action Action)
// Prewrite fires prior to every write attempt.
// The amount parameter is the number of bytes that will be attempted
// to be written to the connection.
Prewrite func(id int, amount int) (action Action)
// Postwrite fires immediately after every write attempt.
// The amount parameter is the number of bytes that were written to the
// connection.
// The remaining parameter is the number of bytes that still remain in
// the buffer scheduled to be written.
Postwrite func(id int, amount, remaining int) (action Action)
// Tick fires immediately after the server starts and will fire again
// following the duration specified by the delay return value.
Tick func() (delay time.Duration, action Action)
}
// Serve starts handling events for the specified addresses.
//
// Addresses should use a scheme prefix and be formatted
// like `tcp://192.168.0.10:9851` or `unix://socket`.
// Valid network schemes:
//
//	tcp   - bind to both IPv4 and IPv6
//	tcp4  - IPv4
//	tcp6  - IPv6
//	unix  - Unix Domain Socket
//
// The "tcp" network scheme is assumed when one is not specified.
// Only the first "://" separates the scheme from the address; the
// remainder of the string is used as the address verbatim.
//
// Appending "-net" to a scheme (for example `tcp-net://localhost:9851`)
// makes the whole call run through servenet instead of serve.
// NOTE(review): once one address carries the "-net" suffix, the
// system() step is skipped for every address that follows it, while
// addresses listed before it have already gone through system().
//
// Serve returns the first error encountered while opening a listener;
// otherwise it returns the result of serve or servenet. Every listener
// that was opened is closed before Serve returns.
func Serve(events Events, addr ...string) error {
	var lns []*listener
	defer func() {
		for _, ln := range lns {
			ln.close()
		}
	}()
	var stdlib bool
	for _, a := range addr {
		ln := &listener{network: "tcp", addr: a}
		// Split on the first "://" only so the address part is never
		// truncated when it happens to contain the separator again.
		if i := strings.Index(a, "://"); i >= 0 {
			ln.network, ln.addr = a[:i], a[i+len("://"):]
		}
		if strings.HasSuffix(ln.network, "-net") {
			stdlib = true
			ln.network = strings.TrimSuffix(ln.network, "-net")
		}
		if ln.network == "unix" {
			// Best effort removal of whatever currently exists at the
			// socket path; a real problem will surface from net.Listen.
			os.RemoveAll(ln.addr)
		}
		var err error
		ln.ln, err = net.Listen(ln.network, ln.addr)
		if err != nil {
			return err
		}
		// Register the listener for deferred cleanup as soon as it is
		// open, so it is not leaked if system() fails below. close() is
		// already invoked on listeners that never ran system() (the
		// "-net" path), so it has to tolerate that state.
		lns = append(lns, ln)
		ln.naddr = ln.ln.Addr()
		if !stdlib {
			if err := ln.system(); err != nil {
				return err
			}
		}
	}
	if stdlib {
		return servenet(events, lns)
	}
	return serve(events, lns)
}
// InputStream is a helper type for managing input streams inside the
// Data event. It carries bytes that were left unprocessed by one call
// over to the next one. The zero value is ready to use.
type InputStream struct {
	b []byte // unprocessed bytes retained between Begin/End pairs
}

// Begin accepts a new packet and returns a working sequence of
// unprocessed bytes. When nothing is buffered the packet itself is
// returned; otherwise the packet is appended to the buffered bytes and
// the combined buffer is returned.
func (is *InputStream) Begin(packet []byte) (data []byte) {
	if len(is.b) == 0 {
		return packet
	}
	is.b = append(is.b, packet...)
	return is.b
}

// End shifts the stream to match the unprocessed data. The data
// argument is whatever remains of the slice returned by Begin after
// the caller has consumed what it could.
func (is *InputStream) End(data []byte) {
	switch {
	case len(data) == 0:
		// Everything was consumed: drop the buffered bytes but keep
		// the backing array for reuse.
		is.b = is.b[:0]
	case len(data) != len(is.b):
		// Keep only the remainder. An equal length is treated as
		// "nothing consumed" and the buffer is left untouched.
		is.b = append(is.b[:0], data...)
	}
}
// listener holds the state for one address passed to Serve.
type listener struct {
// ln is the listener returned by net.Listen for network and addr.
ln net.Listener
// f is not assigned in Serve.
// NOTE(review): presumably the listener's underlying file, set by
// system(); confirm against that method.
f *os.File
// fd is not assigned in Serve.
// NOTE(review): presumably the file descriptor of f, set by
// system(); confirm against that method.
fd int
// network is the scheme portion of the address with any "-net"
// suffix removed; it defaults to "tcp" when no scheme is given.
network string
// addr is the portion of the address after the "://" separator, or
// the whole address string when no scheme is given.
addr string
// naddr is the bound address reported by ln.Addr().
naddr net.Addr
}