Skip to content

Commit

Permalink
兼容macos
Browse files Browse the repository at this point in the history
  • Loading branch information
ikilobyte committed Jan 24, 2022
1 parent 3e32f8b commit 756bb0f
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 34 deletions.
8 changes: 8 additions & 0 deletions iface/iacceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package iface

type IAcceptor interface {
Run(fd int, loop IEventLoop) error
Exit()
IncrementID() int
Close()
}
141 changes: 141 additions & 0 deletions server/acceptor_darwin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// +build darwin freebsd dragonfly

package server

import (
"log"
"syscall"

"github.com/ikilobyte/netman/util"

"golang.org/x/sys/unix"

"github.com/ikilobyte/netman/eventloop"
"github.com/ikilobyte/netman/iface"
)

//acceptor 统一处理用来处理新连接
type acceptor struct {
packer iface.IPacker
connectMgr iface.IConnectManager
poller *eventloop.Poller
eventfd int
eventbuff []byte
connID int
}

//func (a *acceptor) Run(fd int, loop iface.IEventLoop) error {
// panic("implement me")
//}

func newAcceptor(packer iface.IPacker, connectMgr iface.IConnectManager) iface.IAcceptor {

poller, err := eventloop.NewPoller(connectMgr)
if err != nil {
log.Panicln(err)
}

return &acceptor{
packer: packer,
poller: poller,
connectMgr: connectMgr,
eventfd: 0,
eventbuff: []byte{},
connID: -1,
}
}

//Run 启动
func (a *acceptor) Run(listenerFd int, loop iface.IEventLoop) error {

// 添加event
if _, err := unix.Kevent(a.poller.Epfd, []unix.Kevent_t{
{Ident: 0, Filter: unix.EVFILT_USER, Flags: unix.EV_ADD | unix.EV_CLEAR},
}, nil, nil); err != nil {
return err
}

// 添加listener fd
if err := a.poller.AddRead(listenerFd, 0); err != nil {
return err
}

for {

n, err := unix.Kevent(a.poller.Epfd, nil, a.poller.Events, nil)
if err != nil {
if err == unix.EAGAIN || err == unix.EINTR {
continue
}
return err
}

for i := 0; i < n; i++ {
event := a.poller.Events[i]
eventFd := int(event.Ident)

if eventFd == a.eventfd {
_, _ = unix.Read(eventFd, a.eventbuff)
a.Close()
return nil
}

connFd, sa, err := unix.Accept(eventFd)
if err != nil {
if err == syscall.Errno(9) {
a.Close()
return nil
}
util.Logger.Errorf("acceptor error: %v", err)
continue
}

// 设置非阻塞
if err := unix.SetNonblock(connFd, true); err != nil {
_ = unix.Close(connFd)
continue
}

// 设置不延迟
if err := unix.SetsockoptInt(connFd, syscall.IPPROTO_TCP, syscall.TCP_NODELAY, 1); err != nil {
_ = unix.Close(connFd)
continue
}

connect := newConnect(
a.IncrementID(),
connFd,
util.SockaddrToTCPOrUnixAddr(sa),
a.packer,
)

// 添加事件循环
if err := loop.AddRead(connect); err != nil {
_ = connect.Close()
continue
}

// 添加到这里
a.connectMgr.Add(connect)
}
}
}

func (a *acceptor) IncrementID() int {
a.connID += 1
return a.connID
}

func (a *acceptor) Close() {
_ = a.poller.Remove(a.eventfd)
_ = unix.Close(a.eventfd)
_ = a.poller.Close()
}

func (a *acceptor) Exit() {
_, _ = unix.Kevent(a.poller.Epfd, []unix.Kevent_t{{
Ident: 0,
Filter: unix.EVFILT_USER,
Fflags: unix.NOTE_TRIGGER,
}}, nil, nil)
}
6 changes: 3 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Server struct {
status serverStatus // 状态
options *Options // serve启动可选项参数
socket *socket // 直接系统调用的方式监听TCP端口,不使用官方的net包
acceptor *acceptor // 处理新连接
acceptor iface.IAcceptor // 处理新连接
eventloop iface.IEventLoop // 事件循环管理
connectMgr iface.IConnectManager // 所有的连接管理
packer iface.IPacker // 负责封包解包
Expand Down Expand Up @@ -113,7 +113,7 @@ func (s *Server) Start() {
}
s.status = started

if err := s.acceptor.Start(s.socket.fd, s.eventloop); err != nil {
if err := s.acceptor.Run(s.socket.fd, s.eventloop); err != nil {
util.Logger.Errorf("server start error:%v", err)
}
}
Expand All @@ -125,5 +125,5 @@ func (s *Server) Stop() {
s.eventloop.Stop()
close(s.emitCh)
_ = unix.Close(s.socket.fd)
_, _ = unix.Write(s.acceptor.eventfd, s.acceptor.eventbuff)
s.acceptor.Exit()
}
75 changes: 75 additions & 0 deletions server/socket_darwin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
//+build darwin

package server

import (
"log"
"net"
"time"

"github.com/ikilobyte/netman/util"

"golang.org/x/sys/unix"
)

type socket struct {
fd int
socketId int
}

//newSocket 使用系统调用创建socket,不使用net包,net包未暴露fd的相关接口,只能通过反射获取,效率不高
func createSocket(address string, duration time.Duration) *socket {

// 创建
fd, err := unix.Socket(unix.AF_INET, unix.SOCK_STREAM, unix.IPPROTO_TCP)
if err != nil {
log.Panicln(err)
}

// 设置属性
if secs := int(duration / time.Second); secs >= 1 {
if err := setKeepAlive(fd, secs); err != nil {
log.Panicln(err)
}
}

// 绑定
tcpAddr, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
log.Panicln(err)
}

// 绑定端口
if err := unix.Bind(fd, &unix.SockaddrInet4{Port: tcpAddr.Port}); err != nil {
log.Panicln(err)
}

// 监听端口
if err := unix.Listen(fd, util.MaxListenerBacklog()); err != nil {
log.Panicln(err)
}

return &socket{
fd: fd,
socketId: -1,
}
}

//setKeepAlive 设置tcp属性
func setKeepAlive(fd, secs int) error {
if secs <= 0 {
return nil
}

if err := unix.SetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_KEEPALIVE, 1); err != nil {
return err
}

switch err := unix.SetsockoptInt(fd, unix.IPPROTO_TCP, unix.TCP_KEEPINTVL, secs); err {
case nil, unix.ENOPROTOOPT: // OS X 10.7 and earlier don't support this option
default:
return err
}

return unix.SetsockoptInt(fd, unix.IPPROTO_TCP, unix.TCP_KEEPALIVE, secs)
}
33 changes: 2 additions & 31 deletions server/socket.go → server/socket_linux.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
//+build linux

package server

import (
"log"
"net"
"syscall"
"time"

"github.com/ikilobyte/netman/util"

"github.com/ikilobyte/netman/iface"

"golang.org/x/sys/unix"
)

Expand Down Expand Up @@ -72,31 +71,3 @@ func setKeepAlive(fd, secs int) error {

return unix.SetsockoptInt(fd, unix.IPPROTO_TCP, unix.TCP_KEEPIDLE, secs)
}

//Accept 处理新连接
func (s *socket) Accept(packer iface.IPacker) (iface.IConnect, error) {

connFd, sa, err := unix.Accept(s.fd)
if err != nil {
return nil, err
}

// 设置非阻塞
if err := unix.SetNonblock(connFd, true); err != nil {
return nil, err
}

// 设置不延迟
if err := unix.SetsockoptInt(connFd, syscall.IPPROTO_TCP, syscall.TCP_NODELAY, 1); err != nil {
return nil, err
}

// 创建一个连接实例
conn := newConnect(s.incrementID(), connFd, util.SockaddrToTCPOrUnixAddr(sa), packer)
return conn, nil
}

func (s *socket) incrementID() int {
s.socketId += 1
return s.socketId
}

0 comments on commit 756bb0f

Please sign in to comment.