Skip to content

Commit

Permalink
local and remote addresses
Browse files Browse the repository at this point in the history
  • Loading branch information
tidwall committed Oct 29, 2017
1 parent 3c262df commit 912b6ea
Show file tree
Hide file tree
Showing 5 changed files with 336 additions and 326 deletions.
320 changes: 10 additions & 310 deletions evio.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import (
"io"
"net"
"os"
"sort"
"strings"
"sync"
"time"
)

Expand All @@ -31,6 +29,12 @@ type Options struct {
TCPKeepAlive time.Duration
}

type Addr struct {
Index int
Local net.Addr
Remote net.Addr
}

// Events represents server events
type Events struct {
// Serving fires when the server can accept connections.
Expand All @@ -39,7 +43,7 @@ type Events struct {
Serving func(wake func(id int) bool) (action Action)
// Opened fires when a new connection has opened.
// Use the out return value to write data to the connection.
Opened func(id int, addr net.Addr) (out []byte, opts Options, action Action)
Opened func(id int, addr Addr) (out []byte, opts Options, action Action)
// Opened fires when a connection is closed
Closed func(id int) (action Action)
// Detached fires when a connection has been previously detached.
Expand Down Expand Up @@ -83,9 +87,9 @@ func Serve(events Events, addr ...string) error {
} else if !strings.Contains(addr, ":") {
ln.network = "unix"
}
if strings.HasSuffix(ln.network, "-stdlib") {
if strings.HasSuffix(ln.network, "-net") {
stdlib = true
ln.network = ln.network[:len(ln.network)-7]
ln.network = ln.network[:len(ln.network)-4]
}
if ln.network == "unix" {
os.RemoveAll(ln.addr)
Expand All @@ -103,7 +107,7 @@ func Serve(events Events, addr ...string) error {
lns = append(lns, &ln)
}
if stdlib {
return servestdlib(events, lns)
return servenet(events, lns)
}
return serve(events, lns)
}
Expand Down Expand Up @@ -144,307 +148,3 @@ type listener struct {
network string
addr string
}

// servestdlib uses the stdlib net package instead of syscalls.
func servestdlib(events Events, lns []*listener) error {
type cconn struct {
id int
conn net.Conn
outbuf []byte
outpos int
action Action
wake bool
detached bool
}
type fail struct{ err error }
type accept struct {
lnidx int
conn net.Conn
}
type read struct {
conn net.Conn
packet []byte
}
type tick struct{}
type write struct{ conn net.Conn }
type close struct{ conn net.Conn }
var cond = sync.NewCond(&sync.Mutex{})
lock := func() { cond.L.Lock() }
unlock := func() { cond.L.Unlock() }
var evs []interface{}
send := func(ev interface{}, broadcast bool) {
if broadcast {
lock()
}
if _, ok := ev.(fail); ok {
evs = evs[:0]
}
evs = append(evs, ev)
if broadcast {
cond.Broadcast()
unlock()
}
}
for i, ln := range lns {
go func(lnidx int, ln net.Listener) {
for {
conn, err := ln.Accept()
if err != nil {
send(fail{err}, true)
return
}
send(accept{lnidx, conn}, true)
go func(conn net.Conn) {
defer send(close{conn}, true)
var packet [0xFFFF]byte
for {
n, err := conn.Read(packet[:])
if err != nil && !istimeout(err) {
return
} else if n > 0 {
send(read{conn, append([]byte{}, packet[:n]...)}, true)
}
}
}(conn)
}
}(i, ln.ln)
}
var id int
var connconn = make(map[net.Conn]*cconn)
var idconn = make(map[int]*cconn)
wake := func(id int) bool {
var ok = true
var err error
lock()
c := idconn[id]
if c == nil {
ok = false
} else if !c.wake {
c.wake = true
send(read{c.conn, nil}, false)
cond.Broadcast()
}
unlock()
if err != nil {
panic(err)
}
return ok

}
if events.Serving != nil {
if events.Serving(wake) == Shutdown {
return nil
}
}
defer func() {
lock()
type connid struct {
conn net.Conn
id int
}
var connids []connid
for id, conn := range idconn {
connids = append(connids, connid{conn.conn, id})
}
sort.Slice(connids, func(i, j int) bool {
return connids[j].id < connids[i].id
})
for _, connid := range connids {
connid.conn.Close()
if events.Closed != nil {
unlock()
events.Closed(connid.id)
lock()
}
}
unlock()
}()
var tickerDelay time.Duration
if events.Tick != nil {
go func() {
for {
send(tick{}, true)
lock()
d := tickerDelay
unlock()
time.Sleep(d)
}
}()
}
lock()
again:
for {
for i := 0; i < len(evs); i++ {
ev := evs[i]
switch ev := ev.(type) {
case nil:
case tick:
if events.Tick != nil {
unlock()
delay, action := events.Tick()
lock()
tickerDelay = delay
if action == Shutdown {
send(fail{nil}, false)
continue again
}

}
case accept:
id++
c := &cconn{id: id, conn: ev.conn}
connconn[ev.conn] = c
idconn[id] = c
if events.Opened != nil {
unlock()
out, opts, action := events.Opened(id, ev.lnidx, ev.conn.LocalAddr(), ev.conn.RemoteAddr())
lock()
if opts.TCPKeepAlive > 0 {
if conn, ok := ev.conn.(*net.TCPConn); ok {
if err := conn.SetKeepAlive(true); err != nil {
send(fail{err}, false)
continue again
}
if err := conn.SetKeepAlivePeriod(opts.TCPKeepAlive); err != nil {
send(fail{err}, false)
continue again
}
}
}
if len(out) > 0 {
c.outbuf = append(c.outbuf, out...)
}
c.action = action
if c.action != None || len(c.outbuf) > 0 {
send(write{c.conn}, false)
}
}
case read:
var out []byte
var in []byte
c := connconn[ev.conn]
if c == nil {
continue
}
if c.action != None {
send(write{c.conn}, false)
continue
}
if c.wake {
c.wake = false
} else {
in = ev.packet
}
if events.Data != nil {
unlock()
out, c.action = events.Data(c.id, in)
lock()
}
if len(out) > 0 {
c.outbuf = append(c.outbuf, out...)
}
if c.action != None || len(c.outbuf) > 0 {
send(write{c.conn}, false)
}
case write:
c := connconn[ev.conn]
if c == nil {
continue
}
if len(c.outbuf)-c.outpos > 0 {
if events.Prewrite != nil {
unlock()
action := events.Prewrite(c.id, len(c.outbuf[c.outpos:]))
lock()
if action > c.action {
c.action = action
}
}
c.conn.SetWriteDeadline(time.Now().Add(time.Millisecond))
n, err := c.conn.Write(c.outbuf[c.outpos:])
if events.Postwrite != nil {
amount := n
if amount < 0 {
amount = 0
}
unlock()
action := events.Postwrite(c.id, amount, len(c.outbuf)-c.outpos-amount)
lock()
if action > c.action {
c.action = action
}
}
if err != nil {
if c.action == Shutdown {
send(close{c.conn}, false)
} else if istimeout(err) {
send(write{c.conn}, false)
} else {
send(close{c.conn}, false)
}
continue
}
c.outpos += n
if len(c.outbuf)-c.outpos == 0 {
c.outbuf = c.outbuf[:0]
c.outpos = 0
}
}
if c.action == Shutdown {
send(close{c.conn}, false)
} else if len(c.outbuf)-c.outpos == 0 {
if c.action != None {
send(close{c.conn}, false)
}
} else {
send(write{c.conn}, false)
}
case close:
c := connconn[ev.conn]
if c == nil {
continue
}
delete(connconn, c.conn)
delete(idconn, c.id)
if c.action == Detach {
c.detached = true
if events.Detached != nil {
unlock()
c.action = events.Detached(c.id, c.conn)
lock()
if c.action == Shutdown {
send(fail{nil}, false)
continue again
}
continue
}
}
c.conn.Close()
if events.Closed != nil {
unlock()
action := events.Closed(c.id)
lock()
if action == Shutdown {
c.action = Shutdown
}
}
if c.action == Shutdown {
send(fail{nil}, false)
continue again
}
case fail:
unlock()
return ev.err
}
}
evs = evs[:0]
cond.Wait()
}
}

func istimeout(err error) bool {
if err, ok := err.(net.Error); ok && err.Timeout() {
return true
}
return false
}
Loading

0 comments on commit 912b6ea

Please sign in to comment.