diff --git a/proxy/dokodemo/dokodemo.go b/proxy/dokodemo/dokodemo.go index 3b3ea0db82..9cb75f3b01 100644 --- a/proxy/dokodemo/dokodemo.go +++ b/proxy/dokodemo/dokodemo.go @@ -118,7 +118,7 @@ func (this *DokodemoDoor) ListenTCP(port v2net.Port) error { return nil } -func (this *DokodemoDoor) HandleTCPConnection(conn *hub.TCPConn) { +func (this *DokodemoDoor) HandleTCPConnection(conn hub.Connection) { defer conn.Close() ray := this.packetDispatcher.DispatchToOutbound(v2net.TCPDestination(this.address, this.port)) diff --git a/proxy/http/http.go b/proxy/http/http.go index 48e6ee2984..31bf787f7a 100644 --- a/proxy/http/http.go +++ b/proxy/http/http.go @@ -95,7 +95,7 @@ func parseHost(rawHost string, defaultPort v2net.Port) (v2net.Destination, error return v2net.TCPDestination(v2net.DomainAddress(host), port), nil } -func (this *HttpProxyServer) handleConnection(conn *hub.TCPConn) { +func (this *HttpProxyServer) handleConnection(conn hub.Connection) { defer conn.Close() reader := bufio.NewReader(conn) diff --git a/proxy/shadowsocks/shadowsocks.go b/proxy/shadowsocks/shadowsocks.go index 441d713a54..9fc48f036e 100644 --- a/proxy/shadowsocks/shadowsocks.go +++ b/proxy/shadowsocks/shadowsocks.go @@ -156,7 +156,7 @@ func (this *Shadowsocks) handlerUDPPayload(payload *alloc.Buffer, source v2net.D }) } -func (this *Shadowsocks) handleConnection(conn *hub.TCPConn) { +func (this *Shadowsocks) handleConnection(conn hub.Connection) { defer conn.Close() buffer := alloc.NewSmallBuffer() diff --git a/proxy/socks/server.go b/proxy/socks/server.go index 26657d4d97..86fc41a98c 100644 --- a/proxy/socks/server.go +++ b/proxy/socks/server.go @@ -92,7 +92,7 @@ func (this *Server) Listen(port v2net.Port) error { return nil } -func (this *Server) handleConnection(connection *hub.TCPConn) { +func (this *Server) handleConnection(connection hub.Connection) { defer connection.Close() timedReader := v2net.NewTimeOutReader(120, connection) diff --git a/proxy/vmess/inbound/inbound.go b/proxy/vmess/inbound/inbound.go index 9ea6942c9a..6ac3c8d601 100644 --- a/proxy/vmess/inbound/inbound.go +++ b/proxy/vmess/inbound/inbound.go @@ -114,7 +114,7 @@ func (this *VMessInboundHandler) Listen(port v2net.Port) error { return nil } -func (this *VMessInboundHandler) HandleConnection(connection *hub.TCPConn) { +func (this *VMessInboundHandler) HandleConnection(connection hub.Connection) { defer connection.Close() connReader := v2net.NewTimeOutReader(16, connection) diff --git a/transport/hub/connection.go b/transport/hub/connection.go new file mode 100644 index 0000000000..5842424925 --- /dev/null +++ b/transport/hub/connection.go @@ -0,0 +1,97 @@ +package hub + +import ( + "net" + "time" + + "github.com/v2ray/v2ray-core/common" +) + +type ConnectionHandler func(Connection) + +type Connection interface { + common.Releasable + + Read([]byte) (int, error) + Write([]byte) (int, error) + Close() error + LocalAddr() net.Addr + RemoteAddr() net.Addr + SetDeadline(t time.Time) error + SetReadDeadline(t time.Time) error + SetWriteDeadline(t time.Time) error + CloseRead() error + CloseWrite() error +} + +type TCPConnection struct { + conn *net.TCPConn + listener *TCPHub +} + +func (this *TCPConnection) Read(b []byte) (int, error) { + if this == nil || this.conn == nil { + return 0, ErrorClosedConnection + } + + return this.conn.Read(b) +} + +func (this *TCPConnection) Write(b []byte) (int, error) { + if this == nil || this.conn == nil { + return 0, ErrorClosedConnection + } + return this.conn.Write(b) +} + +func (this *TCPConnection) Close() error { + if this == nil || this.conn == nil { + return ErrorClosedConnection + } + err := this.conn.Close() + return err +} + +func (this *TCPConnection) Release() { + if this == nil || this.listener == nil { + return + } + + this.Close() + this.conn = nil + this.listener = nil +} + +func (this *TCPConnection) LocalAddr() net.Addr { + return this.conn.LocalAddr() +} + +func (this *TCPConnection) RemoteAddr() net.Addr { + return this.conn.RemoteAddr() +} + +func (this *TCPConnection) SetDeadline(t time.Time) error { + return this.conn.SetDeadline(t) +} + +func (this *TCPConnection) SetReadDeadline(t time.Time) error { + return this.conn.SetReadDeadline(t) +} + +func (this *TCPConnection) SetWriteDeadline(t time.Time) error { + return this.conn.SetWriteDeadline(t) +} + +func (this *TCPConnection) CloseRead() error { + if this == nil || this.conn == nil { + return nil + } + return this.conn.CloseRead() +} + +func (this *TCPConnection) CloseWrite() error { + if this == nil || this.conn == nil { + return nil + } + return this.conn.CloseWrite() +} diff --git a/transport/hub/tcp.go b/transport/hub/tcp.go index c69805ecb8..54cda0e9e7 100644 --- a/transport/hub/tcp.go +++ b/transport/hub/tcp.go @@ -3,7 +3,6 @@ package hub import ( "errors" "net" - "time" "github.com/v2ray/v2ray-core/common/log" v2net "github.com/v2ray/v2ray-core/common/net" @@ -13,84 +12,13 @@ var ( ErrorClosedConnection = errors.New("Connection already closed.") ) -type TCPConn struct { - conn *net.TCPConn - listener *TCPHub -} - -func (this *TCPConn) Read(b []byte) (int, error) { - if this == nil || this.conn == nil { - return 0, ErrorClosedConnection - } - return this.conn.Read(b) -} - -func (this *TCPConn) Write(b []byte) (int, error) { - if this == nil || this.conn == nil { - return 0, ErrorClosedConnection - } - return this.conn.Write(b) -} - -func (this *TCPConn) Close() error { - if this == nil || this.conn == nil { - return ErrorClosedConnection - } - err := this.conn.Close() - return err -} - -func (this *TCPConn) Release() { - if this == nil || this.listener == nil { - return - } - - this.Close() - this.conn = nil - this.listener = nil -} - -func (this *TCPConn) LocalAddr() net.Addr { - return this.conn.LocalAddr() -} - -func (this *TCPConn) RemoteAddr() net.Addr { - return this.conn.RemoteAddr() -} - -func (this *TCPConn) SetDeadline(t time.Time) error { - return this.conn.SetDeadline(t) -} - -func (this *TCPConn) SetReadDeadline(t time.Time) error { - return this.conn.SetReadDeadline(t) -} - -func (this *TCPConn) SetWriteDeadline(t time.Time) error { - return this.conn.SetWriteDeadline(t) -} - -func (this *TCPConn) CloseRead() error { - if this == nil || this.conn == nil { - return nil - } - return this.conn.CloseRead() -} - -func (this *TCPConn) CloseWrite() error { - if this == nil || this.conn == nil { - return nil - } - return this.conn.CloseWrite() -} - type TCPHub struct { listener *net.TCPListener - connCallback func(*TCPConn) + connCallback ConnectionHandler accepting bool } -func ListenTCP(port v2net.Port, callback func(*TCPConn)) (*TCPHub, error) { +func ListenTCP(port v2net.Port, callback ConnectionHandler) (*TCPHub, error) { listener, err := net.ListenTCP("tcp", &net.TCPAddr{ IP: []byte{0, 0, 0, 0}, Port: int(port), @@ -123,7 +51,7 @@ func (this *TCPHub) start() { } continue } - go this.connCallback(&TCPConn{ + go this.connCallback(&TCPConnection{ conn: conn, listener: this, })