diff --git a/README.md b/README.md index 32229b4..ff48586 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,7 @@ The reason I wrote this framework is so I can build certain network services tha - Fallback for non-epoll/kqueue operating systems by simulating events with the [net](https://golang.org/pkg/net/) package - Ability to [wake up](#wake-up) connections from long running background operations - [Dial](#dial-out) an outbound connection and process/proxy on the event loop +- [SO_REUSEPORT](#so_reuseport) socket option ## Getting Started @@ -205,7 +206,7 @@ events = evio.Translate(events, nil, }, ) -log.Fatal(Serve(events, "tcp://0.0.0.0:443")) +log.Fatal(evio.Serve(events, "tcp://0.0.0.0:443")) ``` Here we wrapped the event with a TLS translator. The `evio.NopConn` function is used to converts the `ReadWriter` a `net.Conn` so the `tls.Server()` call will work. @@ -235,6 +236,16 @@ The `Serve` function can bind to UDP addresses. - The `Wake` and `Dial` operations are not available to UDP connections. - All incoming and outgoing packets are not buffered and sent individually. +## SO_REUSEPORT + +Servers can utilize the [SO_REUSEPORT](https://lwn.net/Articles/542629/) option which allows multiple sockets on the same host to bind to the same port. + +Just provide `reuseport=true` to an address: + +```go +evio.Serve(events, "tcp://0.0.0.0:1234?reuseport=true")) +``` + ## More examples Please check out the [examples](examples) subdirectory for a simplified [redis](examples/redis-server/main.go) clone, an [echo](examples/echo-server/main.go) server, and a very basic [http](examples/http-server/main.go) server with TLS support. diff --git a/evio.go b/evio.go index 3400683..78843ab 100644 --- a/evio.go +++ b/evio.go @@ -10,6 +10,8 @@ import ( "os" "strings" "time" + + "github.com/kavu/go_reuseport" ) // Action is an action that occurs after the completion of an event. @@ -140,7 +142,7 @@ func Serve(events Events, addr ...string) error { for _, addr := range addr { var ln listener var stdlibt bool - ln.network, ln.addr, stdlibt = parseAddr(addr) + ln.network, ln.addr, ln.opts, stdlibt = parseAddr(addr) if stdlibt { stdlib = true } @@ -149,9 +151,17 @@ func Serve(events Events, addr ...string) error { } var err error if ln.network == "udp" { - ln.pconn, err = net.ListenPacket(ln.network, ln.addr) + if ln.opts.reusePort() { + ln.pconn, err = reuseport.ListenPacket(ln.network, ln.addr) + } else { + ln.pconn, err = net.ListenPacket(ln.network, ln.addr) + } } else { - ln.ln, err = net.Listen(ln.network, ln.addr) + if ln.opts.reusePort() { + ln.ln, err = reuseport.Listen(ln.network, ln.addr) + } else { + ln.ln, err = net.Listen(ln.network, ln.addr) + } } if err != nil { return err @@ -204,15 +214,27 @@ type listener struct { ln net.Listener lnaddr net.Addr pconn net.PacketConn + opts addrOpts f *os.File fd int network string addr string } -func parseAddr(addr string) (network, address string, stdlib bool) { +type addrOpts map[string]string + +func (opts addrOpts) reusePort() bool { + switch opts["reuseport"] { + case "yes", "true", "1": + return true + } + return false +} + +func parseAddr(addr string) (network, address string, opts addrOpts, stdlib bool) { network = "tcp" address = addr + opts = make(map[string]string) if strings.Contains(address, "://") { network = strings.Split(address, "://")[0] address = strings.Split(address, "://")[1] @@ -221,5 +243,15 @@ func parseAddr(addr string) (network, address string, stdlib bool) { stdlib = true network = network[:len(network)-4] } + q := strings.Index(address, "?") + if q != -1 { + for _, part := range strings.Split(address[q+1:], "&") { + kv := strings.Split(part, "=") + if len(kv) == 2 { + opts[kv[0]] = kv[1] + } + } + address = address[:q] + } return } diff --git a/evio_loop.go b/evio_loop.go index 283b178..adc17c9 100644 --- a/evio_loop.go +++ b/evio_loop.go @@ -322,7 +322,7 @@ func serve(events Events, lns []*listener) error { }() var rsa syscall.Sockaddr var sa6 syscall.SockaddrInet6 - + var detached []int var packet [0xFFFF]byte var evs = internal.MakeEvents(64) nextTicker := time.Now() @@ -394,6 +394,7 @@ func serve(events Events, lns []*listener) error { continue } } + detached = detached[:0] lock() for i := 0; i < pn; i++ { var in []byte @@ -416,7 +417,16 @@ func serve(events Events, lns []*listener) error { ln = nil c = fdconn[fd] if c == nil { - syscall.Close(fd) + var found bool + for _, dfd := range detached { + if dfd == fd { + found = true + break + } + } + if !found { + syscall.Close(fd) + } goto next } if c.opening { @@ -665,6 +675,13 @@ func serve(events Events, lns []*listener) error { delete(idconn, c.id) if c.action == Detach { if events.Detached != nil { + if err = internal.DelRead(p, c.fd, &c.readon, &c.writeon); err != nil { + goto fail + } + if err = internal.DelWrite(p, c.fd, &c.readon, &c.writeon); err != nil { + goto fail + } + detached = append(detached, c.fd) c.detached = true if len(c.outbuf)-c.outpos > 0 { c.outbuf = append(c.outbuf[:0], c.outbuf[c.outpos:]...) @@ -708,7 +725,7 @@ func serve(events Events, lns []*listener) error { // resolve resolves an evio address and retuns a sockaddr for socket // connection to external servers. func resolve(addr string) (sa syscall.Sockaddr, err error) { - network, address, _ := parseAddr(addr) + network, address, _, _ := parseAddr(addr) var taddr net.Addr switch network { default: diff --git a/evio_net.go b/evio_net.go index bba3f4e..b776bcd 100644 --- a/evio_net.go +++ b/evio_net.go @@ -275,7 +275,7 @@ func servenet(events Events, lns []*listener) error { } id := int(atomic.AddInt64(&idc, 1)) go func() { - network, address, _ := parseAddr(addr) + network, address, _, _ := parseAddr(addr) var conn net.Conn var err error if timeout > 0 { diff --git a/evio_other.go b/evio_other.go index 2706d2b..58674cb 100644 --- a/evio_other.go +++ b/evio_other.go @@ -20,7 +20,7 @@ func (ln *listener) close() { } } -func (ln *listener) system() error { +func (ln *listener) system(opts map[string]string) error { return nil } diff --git a/examples/echo-server/main.go b/examples/echo-server/main.go index 82d30fb..7d24a2f 100644 --- a/examples/echo-server/main.go +++ b/examples/echo-server/main.go @@ -8,6 +8,7 @@ import ( "flag" "fmt" "log" + "strings" "github.com/tidwall/evio" ) @@ -15,13 +16,20 @@ import ( func main() { var port int var udp bool + var trace bool + var reuseport bool flag.IntVar(&port, "port", 5000, "server port") flag.BoolVar(&udp, "udp", false, "listen on udp") + flag.BoolVar(&reuseport, "reuseport", false, "reuseport (SO_REUSEPORT)") + flag.BoolVar(&trace, "trace", false, "print packets to console") flag.Parse() var events evio.Events events.Serving = func(srv evio.Server) (action evio.Action) { log.Printf("echo server started on port %d", port) + if reuseport { + log.Printf("reuseport") + } return } events.Opened = func(id int, info evio.Info) (out []byte, opts evio.Options, action evio.Action) { @@ -33,6 +41,9 @@ func main() { return } events.Data = func(id int, in []byte) (out []byte, action evio.Action) { + if trace { + log.Printf("%s", strings.TrimSpace(string(in))) + } out = in return } @@ -40,5 +51,5 @@ func main() { if udp { scheme = "udp" } - log.Fatal(evio.Serve(events, fmt.Sprintf("%s://:%d", scheme, port))) + log.Fatal(evio.Serve(events, fmt.Sprintf("%s://:%d?reuseport=%t", scheme, port, reuseport))) } diff --git a/vendor/.stub b/vendor/.stub new file mode 100644 index 0000000..39802f6 --- /dev/null +++ b/vendor/.stub @@ -0,0 +1 @@ +stub diff --git a/vendor/github.com/kavu/go_reuseport/LICENSE b/vendor/github.com/kavu/go_reuseport/LICENSE new file mode 100644 index 0000000..5f25159 --- /dev/null +++ b/vendor/github.com/kavu/go_reuseport/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2014 Max Riveiro + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/vendor/github.com/kavu/go_reuseport/Makefile b/vendor/github.com/kavu/go_reuseport/Makefile new file mode 100644 index 0000000..4aa3d2b --- /dev/null +++ b/vendor/github.com/kavu/go_reuseport/Makefile @@ -0,0 +1,9 @@ +lint: + @gometalinter \ + --disable=errcheck \ + --disable=dupl \ + --min-const-length=5 \ + --min-confidence=0.25 \ + --cyclo-over=20 \ + --enable=unused \ + --deadline=100s diff --git a/vendor/github.com/kavu/go_reuseport/README.md b/vendor/github.com/kavu/go_reuseport/README.md new file mode 100644 index 0000000..9e9726b --- /dev/null +++ b/vendor/github.com/kavu/go_reuseport/README.md @@ -0,0 +1,48 @@ +# GO_REUSEPORT + +[![Build Status](https://travis-ci.org/kavu/go_reuseport.png?branch=master)](https://travis-ci.org/kavu/go_reuseport) +[![codecov](https://codecov.io/gh/kavu/go_reuseport/branch/master/graph/badge.svg)](https://codecov.io/gh/kavu/go_reuseport) +[![GoDoc](https://godoc.org/github.com/kavu/go_reuseport?status.png)](https://godoc.org/github.com/kavu/go_reuseport) + +**GO_REUSEPORT** is a little expirement to create a `net.Listener` that supports [SO_REUSEPORT](http://lwn.net/Articles/542629/) socket option. + +For now, Darwin and Linux (from 3.9) systems are supported. I'll be pleased if you'll test other systems and tell me the results. + documentation on [godoc.org](http://godoc.org/github.com/kavu/go_reuseport "go_reuseport documentation"). + +## Example ## + +```go +package main + +import ( + "fmt" + "html" + "net/http" + "os" + "runtime" + "github.com/kavu/go_reuseport" +) + +func main() { + listener, err := reuseport.Listen("tcp", "localhost:8881") + if err != nil { + panic(err) + } + defer listener.Close() + + server := &http.Server{} + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + fmt.Println(os.Getgid()) + fmt.Fprintf(w, "Hello, %q\n", html.EscapeString(r.URL.Path)) + }) + + panic(server.Serve(listener)) +} +``` + +Now you can run several instances of this tiny server without `Address already in use` errors. + +## Thanks + +Inspired by [Artur Siekielski](https://github.com/aartur) [post](http://freeprogrammersblog.vhex.net/post/linux-39-introdued-new-way-of-writing-socket-servers/2) about `SO_REUSEPORT`. + diff --git a/vendor/github.com/kavu/go_reuseport/reuseport.go b/vendor/github.com/kavu/go_reuseport/reuseport.go new file mode 100644 index 0000000..ea4c7c4 --- /dev/null +++ b/vendor/github.com/kavu/go_reuseport/reuseport.go @@ -0,0 +1,50 @@ +// +build linux darwin dragonfly freebsd netbsd openbsd + +// Copyright (C) 2017 Max Riveiro +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +// Package reuseport provides a function that returns a net.Listener powered +// by a net.FileListener with a SO_REUSEPORT option set to the socket. +package reuseport + +import ( + "errors" + "fmt" + "net" + "os" + "syscall" +) + +const fileNameTemplate = "reuseport.%d.%s.%s" + +var errUnsupportedProtocol = errors.New("only tcp, tcp4, tcp6, udp, udp4, udp6 are supported") + +// getSockaddr parses protocol and address and returns implementor +// of syscall.Sockaddr: syscall.SockaddrInet4 or syscall.SockaddrInet6. +func getSockaddr(proto, addr string) (sa syscall.Sockaddr, soType int, err error) { + switch proto { + case "tcp", "tcp4", "tcp6": + return getTCPSockaddr(proto, addr) + case "udp", "udp4", "udp6": + return getUDPSockaddr(proto, addr) + default: + return nil, -1, errUnsupportedProtocol + } +} + +func getSocketFileName(proto, addr string) string { + return fmt.Sprintf(fileNameTemplate, os.Getpid(), proto, addr) +} + +// Listen function is an alias for NewReusablePortListener. +func Listen(proto, addr string) (l net.Listener, err error) { + return NewReusablePortListener(proto, addr) +} + +// ListenPacket is an alias for NewReusablePortPacketConn. +func ListenPacket(proto, addr string) (l net.PacketConn, err error) { + return NewReusablePortPacketConn(proto, addr) +} diff --git a/vendor/github.com/kavu/go_reuseport/reuseport_bsd.go b/vendor/github.com/kavu/go_reuseport/reuseport_bsd.go new file mode 100644 index 0000000..19000e8 --- /dev/null +++ b/vendor/github.com/kavu/go_reuseport/reuseport_bsd.go @@ -0,0 +1,44 @@ +// +build darwin dragonfly freebsd netbsd openbsd + +// Copyright (C) 2017 Ma Weiwei, Max Riveiro +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +package reuseport + +import ( + "runtime" + "syscall" +) + +var reusePort = syscall.SO_REUSEPORT + +func maxListenerBacklog() int { + var ( + n uint32 + err error + ) + + switch runtime.GOOS { + case "darwin", "freebsd": + n, err = syscall.SysctlUint32("kern.ipc.somaxconn") + case "netbsd": + // NOTE: NetBSD has no somaxconn-like kernel state so far + case "openbsd": + n, err = syscall.SysctlUint32("kern.somaxconn") + } + + if n == 0 || err != nil { + return syscall.SOMAXCONN + } + + // FreeBSD stores the backlog in a uint16, as does Linux. + // Assume the other BSDs do too. Truncate number to avoid wrapping. + // See issue 5030. + if n > 1<<16-1 { + n = 1<<16 - 1 + } + return int(n) +} diff --git a/vendor/github.com/kavu/go_reuseport/reuseport_linux.go b/vendor/github.com/kavu/go_reuseport/reuseport_linux.go new file mode 100644 index 0000000..f6f85a4 --- /dev/null +++ b/vendor/github.com/kavu/go_reuseport/reuseport_linux.go @@ -0,0 +1,52 @@ +// +build linux + +// Copyright (C) 2017 Ma Weiwei, Max Riveiro +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +package reuseport + +import ( + "bufio" + "os" + "strconv" + "strings" + "syscall" +) + +var reusePort = 0x0F + +func maxListenerBacklog() int { + fd, err := os.Open("/proc/sys/net/core/somaxconn") + if err != nil { + return syscall.SOMAXCONN + } + defer fd.Close() + + rd := bufio.NewReader(fd) + line, err := rd.ReadString('\n') + if err != nil { + return syscall.SOMAXCONN + } + + f := strings.Fields(line) + if len(f) < 1 { + return syscall.SOMAXCONN + } + + n, err := strconv.Atoi(f[0]) + if err != nil || n == 0 { + return syscall.SOMAXCONN + } + + // Linux stores the backlog in a uint16. + // Truncate number to avoid wrapping. + // See issue 5030. + if n > 1<<16-1 { + n = 1<<16 - 1 + } + + return n +} diff --git a/vendor/github.com/kavu/go_reuseport/reuseport_windows.go b/vendor/github.com/kavu/go_reuseport/reuseport_windows.go new file mode 100644 index 0000000..e1e90df --- /dev/null +++ b/vendor/github.com/kavu/go_reuseport/reuseport_windows.go @@ -0,0 +1,19 @@ +// +build windows + +// Copyright (C) 2017 Ma Weiwei, Max Riveiro +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +package reuseport + +import "net" + +func NewReusablePortListener(proto, addr string) (net.Listener, error) { + return net.Listen(proto, addr) +} + +func NewReusablePortPacketConn(proto, addr string) (net.PacketConn, error) { + return net.ListenPacket(proto, addr) +} diff --git a/vendor/github.com/kavu/go_reuseport/tcp.go b/vendor/github.com/kavu/go_reuseport/tcp.go new file mode 100644 index 0000000..76540a1 --- /dev/null +++ b/vendor/github.com/kavu/go_reuseport/tcp.go @@ -0,0 +1,143 @@ +// +build linux darwin dragonfly freebsd netbsd openbsd + +// Copyright (C) 2017 Max Riveiro +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +package reuseport + +import ( + "errors" + "net" + "os" + "syscall" +) + +var ( + listenerBacklogMaxSize = maxListenerBacklog() + errUnsupportedTCPProtocol = errors.New("only tcp, tcp4, tcp6 are supported") +) + +func getTCPSockaddr(proto, addr string) (sa syscall.Sockaddr, soType int, err error) { + var tcp *net.TCPAddr + + tcp, err = net.ResolveTCPAddr(proto, addr) + if err != nil && tcp.IP != nil { + return nil, -1, err + } + + tcpVersion, err := determineTCPProto(proto, tcp) + if err != nil { + return nil, -1, err + } + + switch tcpVersion { + case "tcp": + return &syscall.SockaddrInet4{Port: tcp.Port}, syscall.AF_INET, nil + case "tcp4": + sa := &syscall.SockaddrInet4{Port: tcp.Port} + + if tcp.IP != nil { + copy(sa.Addr[:], tcp.IP[12:16]) // copy last 4 bytes of slice to array + } + + return sa, syscall.AF_INET, nil + case "tcp6": + sa := &syscall.SockaddrInet6{Port: tcp.Port} + + if tcp.IP != nil { + copy(sa.Addr[:], tcp.IP) // copy all bytes of slice to array + } + + if tcp.Zone != "" { + iface, err := net.InterfaceByName(tcp.Zone) + if err != nil { + return nil, -1, err + } + + sa.ZoneId = uint32(iface.Index) + } + + return sa, syscall.AF_INET6, nil + } + + return nil, -1, errUnsupportedProtocol +} + +func determineTCPProto(proto string, ip *net.TCPAddr) (string, error) { + // If the protocol is set to "tcp", we try to determine the actual protocol + // version from the size of the resolved IP address. Otherwise, we simple use + // the protcol given to us by the caller. + + if ip.IP.To4() != nil { + return "tcp4", nil + } + + if ip.IP.To16() != nil { + return "tcp6", nil + } + + switch proto { + case "tcp", "tcp4", "tcp6": + return proto, nil + } + + return "", errUnsupportedTCPProtocol +} + +// NewReusablePortListener returns net.FileListener that created from +// a file discriptor for a socket with SO_REUSEPORT option. +func NewReusablePortListener(proto, addr string) (l net.Listener, err error) { + var ( + soType, fd int + file *os.File + sockaddr syscall.Sockaddr + ) + + if sockaddr, soType, err = getSockaddr(proto, addr); err != nil { + return nil, err + } + + syscall.ForkLock.RLock() + if fd, err = syscall.Socket(soType, syscall.SOCK_STREAM, syscall.IPPROTO_TCP); err != nil { + syscall.ForkLock.RUnlock() + + return nil, err + } + syscall.ForkLock.RUnlock() + + if err = syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1); err != nil { + syscall.Close(fd) + return nil, err + } + + if err = syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, reusePort, 1); err != nil { + syscall.Close(fd) + return nil, err + } + + if err = syscall.Bind(fd, sockaddr); err != nil { + syscall.Close(fd) + return nil, err + } + + // Set backlog size to the maximum + if err = syscall.Listen(fd, listenerBacklogMaxSize); err != nil { + syscall.Close(fd) + return nil, err + } + + file = os.NewFile(uintptr(fd), getSocketFileName(proto, addr)) + if l, err = net.FileListener(file); err != nil { + file.Close() + return nil, err + } + + if err = file.Close(); err != nil { + return nil, err + } + + return l, err +} diff --git a/vendor/github.com/kavu/go_reuseport/tcp_test.go b/vendor/github.com/kavu/go_reuseport/tcp_test.go new file mode 100644 index 0000000..1620f9d --- /dev/null +++ b/vendor/github.com/kavu/go_reuseport/tcp_test.go @@ -0,0 +1,218 @@ +// +build linux darwin dragonfly freebsd netbsd openbsd + +// Copyright (C) 2017 Max Riveiro +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +package reuseport + +import ( + "fmt" + "html" + "io/ioutil" + "net/http" + "net/http/httptest" + "os" + "testing" +) + +const ( + httpServerOneResponse = "1" + httpServerTwoResponse = "2" +) + +var ( + httpServerOne = NewHTTPServer(httpServerOneResponse) + httpServerTwo = NewHTTPServer(httpServerTwoResponse) +) + +func NewHTTPServer(resp string) *httptest.Server { + return httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprint(w, resp) + })) +} +func TestNewReusablePortListener(t *testing.T) { + listenerOne, err := NewReusablePortListener("tcp4", "localhost:10081") + if err != nil { + t.Error(err) + } + defer listenerOne.Close() + + listenerTwo, err := NewReusablePortListener("tcp", "127.0.0.1:10081") + if err != nil { + t.Error(err) + } + defer listenerTwo.Close() + + listenerThree, err := NewReusablePortListener("tcp6", "[::1]:10081") + if err != nil { + t.Error(err) + } + defer listenerThree.Close() + + listenerFour, err := NewReusablePortListener("tcp6", ":10081") + if err != nil { + t.Error(err) + } + defer listenerFour.Close() + + listenerFive, err := NewReusablePortListener("tcp4", ":10081") + if err != nil { + t.Error(err) + } + defer listenerFive.Close() + + listenerSix, err := NewReusablePortListener("tcp", ":10081") + if err != nil { + t.Error(err) + } + defer listenerSix.Close() +} + +func TestListen(t *testing.T) { + listenerOne, err := Listen("tcp4", "localhost:10081") + if err != nil { + t.Error(err) + } + defer listenerOne.Close() + + listenerTwo, err := Listen("tcp", "127.0.0.1:10081") + if err != nil { + t.Error(err) + } + defer listenerTwo.Close() + + listenerThree, err := Listen("tcp6", "[::1]:10081") + if err != nil { + t.Error(err) + } + defer listenerThree.Close() + + listenerFour, err := Listen("tcp6", ":10081") + if err != nil { + t.Error(err) + } + defer listenerFour.Close() + + listenerFive, err := Listen("tcp4", ":10081") + if err != nil { + t.Error(err) + } + defer listenerFive.Close() + + listenerSix, err := Listen("tcp", ":10081") + if err != nil { + t.Error(err) + } + defer listenerSix.Close() +} + +func TestNewReusablePortServers(t *testing.T) { + listenerOne, err := NewReusablePortListener("tcp4", "localhost:10081") + if err != nil { + t.Error(err) + } + defer listenerOne.Close() + + listenerTwo, err := NewReusablePortListener("tcp6", ":10081") + if err != nil { + t.Error(err) + } + defer listenerTwo.Close() + + httpServerOne.Listener = listenerOne + httpServerTwo.Listener = listenerTwo + + httpServerOne.Start() + httpServerTwo.Start() + + // Server One — First Response + resp1, err := http.Get(httpServerOne.URL) + if err != nil { + t.Error(err) + } + body1, err := ioutil.ReadAll(resp1.Body) + resp1.Body.Close() + if err != nil { + t.Error(err) + } + if string(body1) != httpServerOneResponse && string(body1) != httpServerTwoResponse { + t.Errorf("Expected %#v or %#v, got %#v.", httpServerOneResponse, httpServerTwoResponse, string(body1)) + } + + // Server Two — First Response + resp2, err := http.Get(httpServerTwo.URL) + if err != nil { + t.Error(err) + } + body2, err := ioutil.ReadAll(resp2.Body) + resp1.Body.Close() + if err != nil { + t.Error(err) + } + if string(body2) != httpServerOneResponse && string(body2) != httpServerTwoResponse { + t.Errorf("Expected %#v or %#v, got %#v.", httpServerOneResponse, httpServerTwoResponse, string(body2)) + } + + httpServerTwo.Close() + + // Server One — Second Response + resp3, err := http.Get(httpServerOne.URL) + if err != nil { + t.Error(err) + } + body3, err := ioutil.ReadAll(resp3.Body) + resp1.Body.Close() + if err != nil { + t.Error(err) + } + if string(body3) != httpServerOneResponse { + t.Errorf("Expected %#v, got %#v.", httpServerOneResponse, string(body3)) + } + + // Server One — Third Response + resp5, err := http.Get(httpServerOne.URL) + if err != nil { + t.Error(err) + } + body5, err := ioutil.ReadAll(resp5.Body) + resp1.Body.Close() + if err != nil { + t.Error(err) + } + if string(body5) != httpServerOneResponse { + t.Errorf("Expected %#v, got %#v.", httpServerOneResponse, string(body5)) + } + + httpServerOne.Close() +} + +func BenchmarkNewReusablePortListener(b *testing.B) { + for i := 0; i < b.N; i++ { + listener, err := NewReusablePortListener("tcp", ":10081") + + if err != nil { + b.Error(err) + } else { + listener.Close() + } + } +} + +func ExampleNewReusablePortListener() { + listener, err := NewReusablePortListener("tcp", ":8881") + if err != nil { + panic(err) + } + defer listener.Close() + + server := &http.Server{} + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + fmt.Println(os.Getgid()) + fmt.Fprintf(w, "Hello, %q\n", html.EscapeString(r.URL.Path)) + }) + + panic(server.Serve(listener)) +} diff --git a/vendor/github.com/kavu/go_reuseport/test.bash b/vendor/github.com/kavu/go_reuseport/test.bash new file mode 100755 index 0000000..a57c012 --- /dev/null +++ b/vendor/github.com/kavu/go_reuseport/test.bash @@ -0,0 +1,22 @@ +#!/bin/bash + +set -e + +# Thanks to IPFS team +if [[ "$TRAVIS_OS_NAME" == "linux" ]]; then + if [[ "$TRAVIS_SUDO" == true ]]; then + # Ensure that IPv6 is enabled. + # While this is unsupported by TravisCI, it still works for localhost. + sudo sysctl -w net.ipv6.conf.lo.disable_ipv6=0 + sudo sysctl -w net.ipv6.conf.default.disable_ipv6=0 + sudo sysctl -w net.ipv6.conf.all.disable_ipv6=0 + fi +else + # OSX has a default file limit of 256, for some tests we need a + # maximum of 8192. + ulimit -Sn 8192 +fi + +go test -v -cover ./... +go test -v -cover -race ./... -coverprofile=coverage.txt -covermode=atomic +go test -v -cover -race -benchmem -benchtime=5s -bench=. \ No newline at end of file diff --git a/vendor/github.com/kavu/go_reuseport/udp.go b/vendor/github.com/kavu/go_reuseport/udp.go new file mode 100644 index 0000000..9b30c1b --- /dev/null +++ b/vendor/github.com/kavu/go_reuseport/udp.go @@ -0,0 +1,139 @@ +// +build linux darwin dragonfly freebsd netbsd openbsd + +// Copyright (C) 2017 Max Riveiro +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +package reuseport + +import ( + "errors" + "net" + "os" + "syscall" +) + +var errUnsupportedUDPProtocol = errors.New("only udp, udp4, udp6 are supported") + +func getUDPSockaddr(proto, addr string) (sa syscall.Sockaddr, soType int, err error) { + var udp *net.UDPAddr + + udp, err = net.ResolveUDPAddr(proto, addr) + if err != nil && udp.IP != nil { + return nil, -1, err + } + + udpVersion, err := determineUDPProto(proto, udp) + if err != nil { + return nil, -1, err + } + + switch udpVersion { + case "udp": + return &syscall.SockaddrInet4{Port: udp.Port}, syscall.AF_INET, nil + case "udp4": + sa := &syscall.SockaddrInet4{Port: udp.Port} + + if udp.IP != nil { + copy(sa.Addr[:], udp.IP[12:16]) // copy last 4 bytes of slice to array + } + + return sa, syscall.AF_INET, nil + case "udp6": + sa := &syscall.SockaddrInet6{Port: udp.Port} + + if udp.IP != nil { + copy(sa.Addr[:], udp.IP) // copy all bytes of slice to array + } + + if udp.Zone != "" { + iface, err := net.InterfaceByName(udp.Zone) + if err != nil { + return nil, -1, err + } + + sa.ZoneId = uint32(iface.Index) + } + + return sa, syscall.AF_INET6, nil + } + + return nil, -1, errUnsupportedProtocol +} + +func determineUDPProto(proto string, ip *net.UDPAddr) (string, error) { + // If the protocol is set to "udp", we try to determine the actual protocol + // version from the size of the resolved IP address. Otherwise, we simple use + // the protcol given to us by the caller. + + if ip.IP.To4() != nil { + return "udp4", nil + } + + if ip.IP.To16() != nil { + return "udp6", nil + } + + switch proto { + case "udp", "udp4", "udp6": + return proto, nil + } + + return "", errUnsupportedUDPProtocol +} + +// NewReusablePortPacketConn returns net.FilePacketConn that created from +// a file discriptor for a socket with SO_REUSEPORT option. +func NewReusablePortPacketConn(proto, addr string) (l net.PacketConn, err error) { + var ( + soType, fd int + file *os.File + sockaddr syscall.Sockaddr + ) + + if sockaddr, soType, err = getSockaddr(proto, addr); err != nil { + return nil, err + } + + syscall.ForkLock.RLock() + fd, err = syscall.Socket(soType, syscall.SOCK_DGRAM, syscall.IPPROTO_UDP) + if err == nil { + syscall.CloseOnExec(fd) + } + syscall.ForkLock.RUnlock() + if err != nil { + syscall.Close(fd) + return nil, err + } + + defer func() { + if err != nil { + syscall.Close(fd) + } + }() + + if err = syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1); err != nil { + return nil, err + } + + if err = syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, reusePort, 1); err != nil { + return nil, err + } + + if err = syscall.Bind(fd, sockaddr); err != nil { + return nil, err + } + + file = os.NewFile(uintptr(fd), getSocketFileName(proto, addr)) + if l, err = net.FilePacketConn(file); err != nil { + return nil, err + } + + if err = file.Close(); err != nil { + return nil, err + } + + return l, err +} diff --git a/vendor/github.com/kavu/go_reuseport/udp_test.go b/vendor/github.com/kavu/go_reuseport/udp_test.go new file mode 100644 index 0000000..d6550e3 --- /dev/null +++ b/vendor/github.com/kavu/go_reuseport/udp_test.go @@ -0,0 +1,99 @@ +// +build linux darwin dragonfly freebsd netbsd openbsd + +// Copyright (C) 2017 Max Riveiro +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +package reuseport + +import "testing" + +func TestNewReusablePortPacketConn(t *testing.T) { + listenerOne, err := NewReusablePortPacketConn("udp4", "localhost:10082") + if err != nil { + t.Error(err) + } + defer listenerOne.Close() + + listenerTwo, err := NewReusablePortPacketConn("udp", "127.0.0.1:10082") + if err != nil { + t.Error(err) + } + defer listenerTwo.Close() + + listenerThree, err := NewReusablePortPacketConn("udp6", "[::1]:10082") + if err != nil { + t.Error(err) + } + defer listenerThree.Close() + + listenerFour, err := NewReusablePortListener("udp6", ":10081") + if err != nil { + t.Error(err) + } + defer listenerFour.Close() + + listenerFive, err := NewReusablePortListener("udp4", ":10081") + if err != nil { + t.Error(err) + } + defer listenerFive.Close() + + listenerSix, err := NewReusablePortListener("udp", ":10081") + if err != nil { + t.Error(err) + } + defer listenerSix.Close() +} + +func TestListenPacket(t *testing.T) { + listenerOne, err := ListenPacket("udp4", "localhost:10082") + if err != nil { + t.Error(err) + } + defer listenerOne.Close() + + listenerTwo, err := ListenPacket("udp", "127.0.0.1:10082") + if err != nil { + t.Error(err) + } + defer listenerTwo.Close() + + listenerThree, err := ListenPacket("udp6", "[::1]:10082") + if err != nil { + t.Error(err) + } + defer listenerThree.Close() + + listenerFour, err := ListenPacket("udp6", ":10081") + if err != nil { + t.Error(err) + } + defer listenerFour.Close() + + listenerFive, err := ListenPacket("udp4", ":10081") + if err != nil { + t.Error(err) + } + defer listenerFive.Close() + + listenerSix, err := ListenPacket("udp", ":10081") + if err != nil { + t.Error(err) + } + defer listenerSix.Close() +} + +func BenchmarkNewReusableUDPPortListener(b *testing.B) { + for i := 0; i < b.N; i++ { + listener, err := NewReusablePortPacketConn("udp4", "localhost:10082") + + if err != nil { + b.Error(err) + } else { + listener.Close() + } + } +}