Skip to content

Commit

Permalink
remove deadline, let conn break by user :)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ehco1996 committed Jul 10, 2020
1 parent 56e9842 commit 49ba510
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 121 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,5 +111,5 @@ iperf3 -c 0.0.0.0 -p 1234 -u -b 1G --length 1024

| iperf | raw | relay(raw) | relay(ws) |relay(wss) | relay(mwss)|
| ---- | ---- | ---- | ---- | ---- | ---- |
| tcp | 62.6 Gbits/sec | 23.9 Gbits/sec | 4.65 Gbits/sec | 4.22 Gbits/sec | 1.77 Gbits/sec |
| tcp | 62.6 Gbits/sec | 23.9 Gbits/sec | 14.65 Gbits/sec | 4.22 Gbits/sec | 2.43 Gbits/sec |
| udp | 2.2 Gbits/sec | 2.2 Gbits/sec | 暂不支持 | 暂不支持 | 暂不支持 |
51 changes: 7 additions & 44 deletions internal/relay/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,80 +2,43 @@ package relay

import (
"net"
"time"

"github.com/xtaci/smux"
)

// Deadliner is a wrapper around net.Conn that sets read/write deadlines before
// every Read() or Write() call.
// TODO check deadline not work
type Deadliner struct {
net.Conn
t time.Duration
}

func (d Deadliner) Write(p []byte) (int, error) {
if err := d.Conn.SetWriteDeadline(time.Now().Add(d.t)); err != nil {
return 0, err
}
return d.Conn.Write(p)
}

func (d Deadliner) Read(p []byte) (int, error) {
if err := d.Conn.SetReadDeadline(time.Now().Add(d.t)); err != nil {
return 0, err
}
return d.Conn.Read(p)
}

func NewDeadLinerConn(c net.Conn, t time.Duration) *Deadliner {
return &Deadliner{c, t}
}

// mux steam whit deadline
type muxDeadlineStreamConn struct {
type muxConn struct {
net.Conn
stream *smux.Stream
t time.Duration
}

func newMuxDeadlineStreamConn(
conn net.Conn, stream *smux.Stream, t time.Duration) *muxDeadlineStreamConn {
return &muxDeadlineStreamConn{Conn: conn, stream: stream, t: t}
func newMuxConn(conn net.Conn, stream *smux.Stream) *muxConn {
return &muxConn{Conn: conn, stream: stream}
}

func (c *muxDeadlineStreamConn) Read(b []byte) (n int, err error) {
if err := c.stream.SetReadDeadline(time.Now().Add(c.t)); err != nil {
return 0, err
}
func (c *muxConn) Read(b []byte) (n int, err error) {
return c.stream.Read(b)
}

func (c *muxDeadlineStreamConn) Write(b []byte) (n int, err error) {
if err := c.stream.SetWriteDeadline(time.Now().Add(c.t)); err != nil {
return 0, err
}
func (c *muxConn) Write(b []byte) (n int, err error) {
return c.stream.Write(b)
}

func (c *muxDeadlineStreamConn) Close() error {
func (c *muxConn) Close() error {
return c.stream.Close()
}

type muxSession struct {
conn net.Conn
session *smux.Session
maxStreamCnt int
t time.Duration
}

func (session *muxSession) GetConn() (net.Conn, error) {
stream, err := session.session.OpenStream()
if err != nil {
return nil, err
}
return newMuxDeadlineStreamConn(session.conn, stream, session.t), nil
return newMuxConn(session.conn, stream), nil
}

func (session *muxSession) Close() error {
Expand Down
26 changes: 15 additions & 11 deletions internal/relay/mwss.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ func (tr *mwssTransporter) initSession(addr string) (*muxSession, error) {
return nil, err
}
Logger.Infof("[mwss] Init new session %s", session.RemoteAddr())
return &muxSession{
conn: rc, session: session, maxStreamCnt: MaxMWSSStreamCnt, t: WsDeadline}, nil
return &muxSession{conn: rc, session: session, maxStreamCnt: MaxMWSSStreamCnt}, nil
}

func (r *Relay) RunLocalMWSSServer() error {
Expand Down Expand Up @@ -152,7 +151,7 @@ func (s *MWSSServer) upgrade(w http.ResponseWriter, r *http.Request) {
Logger.Info(err)
return
}
s.mux(NewDeadLinerConn(conn, WsDeadline))
s.mux(conn)
}

func (s *MWSSServer) mux(conn net.Conn) {
Expand All @@ -173,7 +172,7 @@ func (s *MWSSServer) mux(conn net.Conn) {
Logger.Infof("[mwss] accept stream err: %s", err)
break
}
cc := newMuxDeadlineStreamConn(conn, stream, WsDeadline)
cc := newMuxConn(conn, stream)
select {
case s.connChan <- cc:
default:
Expand All @@ -200,8 +199,7 @@ func (s *MWSSServer) Addr() string {
}

func (r *Relay) handleTcpOverMWSS(c *net.TCPConn) error {
dc := NewDeadLinerConn(c, TcpDeadline)
defer dc.Close()
defer c.Close()

addr := r.RemoteTCPAddr + "/tcp/"
wsc, err := r.mwssTSP.Dial(addr)
Expand All @@ -210,7 +208,11 @@ func (r *Relay) handleTcpOverMWSS(c *net.TCPConn) error {
}
defer wsc.Close()
Logger.Infof("handleTcpOverMWSS from:%s to:%s", c.RemoteAddr(), wsc.RemoteAddr())
Logger.Info(transport(wsc, dc))

err = transport(wsc, c)
if err != nil {
Logger.Infof("handleTcpOverMWSS transport err: %s", err)
}
return nil
}

Expand All @@ -221,9 +223,11 @@ func (r *Relay) handleMWSSConnToTcp(c net.Conn) {
Logger.Infof("dial error: %s", err)
return
}
drc := NewDeadLinerConn(rc, TcpDeadline)
defer drc.Close()

defer rc.Close()
Logger.Infof("handleMWSSConnToTcp from:%s to:%s", c.RemoteAddr(), rc.RemoteAddr())
Logger.Info((transport(drc, c)))

err = transport(rc, c)
if err != nil {
Logger.Infof("handleMWSSConnToTcp transport err: %s", err)
}
}
16 changes: 2 additions & 14 deletions internal/relay/raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,12 @@ import (
)

func (r *Relay) handleTCPConn(c *net.TCPConn) error {
dc := NewDeadLinerConn(c, TcpDeadline)

rc, err := net.Dial("tcp", r.RemoteTCPAddr)
if err != nil {
return err
}
drc := NewDeadLinerConn(rc, TcpDeadline)
defer drc.Close()

transport(dc, drc)
defer rc.Close()
transport(c, rc)
return nil
}

Expand Down Expand Up @@ -43,10 +39,6 @@ func (r *Relay) handleOneUDPConn(addr string, ubc *udpBufferCh) {
Logger.Info(err, 1)
break
}
if err := r.keepAliveAndSetNextTimeout(rc); err != nil {
Logger.Info(err)
break
}
if _, err := r.UDPConn.WriteToUDP(buf[0:i], uaddr); err != nil {
Logger.Info(err)
break
Expand All @@ -61,10 +53,6 @@ func (r *Relay) handleOneUDPConn(addr string, ubc *udpBufferCh) {
Logger.Info(err)
break
}
if err := r.keepAliveAndSetNextTimeout(rc); err != nil {
Logger.Info(err)
break
}
}
wg.Wait()
}
33 changes: 4 additions & 29 deletions internal/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,10 @@ import (
"time"
)

var (
TcpDeadline = 60 * time.Second
UdpDeadline = 6 * time.Second
WsDeadline = 60 * time.Second
const (
MaxMWSSStreamCnt = 10
DialTimeOut = 10 * time.Second

DialTimeOut = 10 * time.Second
)

const (
Listen_RAW = "raw"
Listen_WS = "ws"
Listen_WSS = "wss"
Expand Down Expand Up @@ -153,9 +147,9 @@ func (r *Relay) RunLocalUDPServer() error {
return err
}
defer r.UDPConn.Close()

buf := inboundBufferPool.Get().([]byte)
defer inboundBufferPool.Put(buf)
for {
buf := inboundBufferPool.Get().([]byte)
n, addr, err := r.UDPConn.ReadFromUDP(buf)
if err != nil {
return err
Expand All @@ -175,26 +169,7 @@ func (r *Relay) RunLocalUDPServer() error {
go r.handleOneUDPConn(addr.String(), ubc)
}
}
inboundBufferPool.Put(buf)
}
}

func (r *Relay) keepAliveAndSetNextTimeout(conn interface{}) error {
switch c := conn.(type) {
case *net.TCPConn:
if err := c.SetDeadline(time.Now().Add(TcpDeadline)); err != nil {
Logger.Info("keep alive error", err.Error())
return err
}
case *net.UDPConn:
if err := c.SetDeadline(time.Now().Add(UdpDeadline)); err != nil {
Logger.Info("keep alive error", err.Error())
return err
}
default:
return nil
}
return nil
}

// NOTE not thread safe
Expand Down
17 changes: 6 additions & 11 deletions internal/relay/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,34 +34,29 @@ func (relay *Relay) RunLocalWSServer() error {
}

func (relay *Relay) handleWsToTcp(w http.ResponseWriter, r *http.Request) {
c, _, _, err := ws.UpgradeHTTP(r, w)
wsc, _, _, err := ws.UpgradeHTTP(r, w)
if err != nil {
return
}
wsc := NewDeadLinerConn(c, WsDeadline)
defer wsc.Close()

rc, err := net.Dial("tcp", relay.RemoteTCPAddr)
if err != nil {
Logger.Infof("dial error: %s", err)
return
}
drc := NewDeadLinerConn(rc, TcpDeadline)
defer drc.Close()
defer rc.Close()
Logger.Infof("handleWsToTcp from:%s to:%s", wsc.RemoteAddr(), rc.RemoteAddr())
transport(drc, wsc)
transport(rc, wsc)
}

func (relay *Relay) handleTcpOverWs(c *net.TCPConn) error {
dc := NewDeadLinerConn(c, TcpDeadline)
defer dc.Close()

rc, _, _, err := ws.Dial(context.TODO(), relay.RemoteTCPAddr+"/ws/tcp/")
defer c.Close()
wsc, _, _, err := ws.Dial(context.TODO(), relay.RemoteTCPAddr+"/ws/tcp/")
if err != nil {
return err
}
wsc := NewDeadLinerConn(rc, WsDeadline)
defer wsc.Close()
transport(dc, wsc)
transport(c, wsc)
return nil
}
17 changes: 6 additions & 11 deletions internal/relay/wss.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,37 +31,32 @@ func (relay *Relay) RunLocalWSSServer() error {
}

func (relay *Relay) handleWssToTcp(w http.ResponseWriter, r *http.Request) {
c, _, _, err := ws.UpgradeHTTP(r, w)
wsc, _, _, err := ws.UpgradeHTTP(r, w)
if err != nil {
return
}
wsc := NewDeadLinerConn(c, WsDeadline)
defer wsc.Close()

rc, err := net.Dial("tcp", relay.RemoteTCPAddr)
if err != nil {
Logger.Infof("dial error: %s", err)
return
}
drc := NewDeadLinerConn(rc, TcpDeadline)
defer drc.Close()
defer rc.Close()
Logger.Infof("handleWssToTcp from:%s to:%s", wsc.RemoteAddr(), rc.RemoteAddr())
transport(drc, wsc)
transport(rc, wsc)
}

func (relay *Relay) handleTcpOverWss(c *net.TCPConn) error {
dc := NewDeadLinerConn(c, TcpDeadline)
defer dc.Close()
defer c.Close()

d := ws.Dialer{TLSConfig: DefaultTLSConfig}
rc, _, _, err := d.Dial(context.TODO(), relay.RemoteTCPAddr+"/tcp/")
wsc, _, _, err := d.Dial(context.TODO(), relay.RemoteTCPAddr+"/tcp/")
if err != nil {
return err
}

wsc := NewDeadLinerConn(rc, WsDeadline)
defer wsc.Close()
transport(dc, wsc)
transport(c, wsc)
return nil
}

Expand Down

0 comments on commit 49ba510

Please sign in to comment.