Skip to content

Commit

Permalink
Added Wake to connection
Browse files Browse the repository at this point in the history
  • Loading branch information
tidwall committed Oct 17, 2018
1 parent 3db0020 commit 3a190d6
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 6 deletions.
2 changes: 2 additions & 0 deletions evio.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type Conn interface {
LocalAddr() net.Addr
// RemoteAddr is the connection's remote peer address.
RemoteAddr() net.Addr
// Wake triggers a Data event for this connection.
Wake()
}

// LoadBalance sets the load balancing method.
Expand Down
11 changes: 11 additions & 0 deletions evio_std.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (c *stdudpconn) SetContext(ctx interface{}) {}
func (c *stdudpconn) AddrIndex() int { return c.addrIndex }
func (c *stdudpconn) LocalAddr() net.Addr { return c.localAddr }
func (c *stdudpconn) RemoteAddr() net.Addr { return c.remoteAddr }
func (c *stdudpconn) Wake() {}

type stdloop struct {
idx int // loop index
Expand All @@ -59,16 +60,22 @@ type stdconn struct {
done int32 // 0: attached, 1: closed, 2: detached
}

type wakeReq struct {
c *stdconn
}

func (c *stdconn) Context() interface{} { return c.ctx }
func (c *stdconn) SetContext(ctx interface{}) { c.ctx = ctx }
func (c *stdconn) AddrIndex() int { return c.addrIndex }
func (c *stdconn) LocalAddr() net.Addr { return c.localAddr }
func (c *stdconn) RemoteAddr() net.Addr { return c.remoteAddr }
func (c *stdconn) Wake() { c.loop.ch <- wakeReq{c} }

type stdin struct {
c *stdconn
in []byte
}

type stderr struct {
c *stdconn
err error
Expand Down Expand Up @@ -267,6 +274,8 @@ func stdloopRun(s *stdserver, l *stdloop) {
err = stdloopReadUDP(s, l, v)
case *stderr:
err = stdloopError(s, l, v.c, v.err)
case wakeReq:
err = stdloopRead(s, l, v.c, nil)
}
}
if err != nil {
Expand Down Expand Up @@ -360,6 +369,8 @@ func (c *stddetachedConn) Close() error {
return c.conn.Close()
}

func (c *stddetachedConn) Wake() {}

func stdloopRead(s *stdserver, l *stdloop, c *stdconn, in []byte) error {
if atomic.LoadInt32(&c.done) == 2 {
// should not ignore reads for detached connections
Expand Down
39 changes: 33 additions & 6 deletions evio_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,28 @@ import (
type conn struct {
fd int // file descriptor
lnidx int // listener index in the server lns list
loopidx int // owner loop
out []byte // write buffer
sa syscall.Sockaddr // remote socket address
reuse bool // should reuse input buffer
opened bool // connection opened event fired
action Action // next user action
ctx interface{} // user-defined context
addrIndex int
localAddr net.Addr
remoteAddr net.Addr
addrIndex int // index of listening address
localAddr net.Addr // local addre
remoteAddr net.Addr // remote addr
loop *loop // connected loop
}

func (c *conn) Context() interface{} { return c.ctx }
func (c *conn) SetContext(ctx interface{}) { c.ctx = ctx }
func (c *conn) AddrIndex() int { return c.addrIndex }
func (c *conn) LocalAddr() net.Addr { return c.localAddr }
func (c *conn) RemoteAddr() net.Addr { return c.remoteAddr }
func (c *conn) Wake() {
if c.loop != nil {
c.loop.poll.Trigger(c)
}
}

type server struct {
events Events // user events
Expand Down Expand Up @@ -198,6 +203,12 @@ func loopNote(s *server, l *loop, note interface{}) error {
s.tch <- delay
case error: // shutdown
err = v
case *conn:
// Wake called for connection
if l.fdconns[v.fd] != v {
return nil // ignore stale wakes
}
return loopWake(s, l, v)
}
return err
}
Expand Down Expand Up @@ -258,7 +269,8 @@ func loopAccept(s *server, l *loop, fd int) error {
}
}
case RoundRobin:
if int(atomic.LoadUintptr(&s.accepted))%len(s.loops) != l.idx {
idx := int(atomic.LoadUintptr(&s.accepted)) % len(s.loops)
if idx != l.idx {
return nil // do not accept
}
atomic.AddUintptr(&s.accepted, 1)
Expand All @@ -277,7 +289,7 @@ func loopAccept(s *server, l *loop, fd int) error {
if err := syscall.SetNonblock(nfd, true); err != nil {
return err
}
c := &conn{fd: nfd, sa: sa, lnidx: i}
c := &conn{fd: nfd, sa: sa, lnidx: i, loop: l}
l.fdconns[c.fd] = c
l.poll.AddReadWrite(c.fd)
atomic.AddInt32(&l.count, 1)
Expand Down Expand Up @@ -391,6 +403,21 @@ func loopAction(s *server, l *loop, c *conn) error {
return nil
}

func loopWake(s *server, l *loop, c *conn) error {
if s.events.Data == nil {
return nil
}
out, action := s.events.Data(c, nil)
c.action = action
if len(out) > 0 {
c.out = append([]byte{}, out...)
}
if len(c.out) != 0 || c.action != None {
l.poll.ModReadWrite(c.fd)
}
return nil
}

func loopRead(s *server, l *loop, c *conn) error {
var in []byte
n, err := syscall.Read(c.fd, l.packet)
Expand Down
10 changes: 10 additions & 0 deletions examples/redis-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,20 @@ func main() {
return
}
events.Opened = func(ec evio.Conn) (out []byte, opts evio.Options, action evio.Action) {
//fmt.Printf("opened: %v\n", ec.RemoteAddr())
ec.SetContext(&conn{})
return
}
events.Closed = func(ec evio.Conn, err error) (action evio.Action) {
// fmt.Printf("closed: %v\n", ec.RemoteAddr())
return
}

events.Data = func(ec evio.Conn, in []byte) (out []byte, action evio.Action) {
if in == nil {
log.Printf("wake from %s\n", ec.RemoteAddr())
return nil, evio.Close
}
c := ec.Context().(*conn)
data := c.is.Begin(in)
var n int
Expand Down Expand Up @@ -94,6 +101,9 @@ func main() {
} else {
out = redcon.AppendString(out, "PONG")
}
case "WAKE":
go ec.Wake()
out = redcon.AppendString(out, "OK")
case "ECHO":
if len(args) != 2 {
out = redcon.AppendError(out, "ERR wrong number of arguments for '"+string(args[0])+"' command")
Expand Down

0 comments on commit 3a190d6

Please sign in to comment.