Skip to content

Commit

Permalink
Gate support Websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
name5566 committed Oct 21, 2015
1 parent 3222761 commit 6b00e6f
Showing 1 changed file with 95 additions and 40 deletions.
135 changes: 95 additions & 40 deletions gate/gate.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,58 +8,98 @@ import (
"github.com/name5566/leaf/network/json"
"github.com/name5566/leaf/network/protobuf"
"reflect"
"time"
)

type TCPGate struct {
Addr string
type Gate struct {
MaxConnNum int
PendingWriteNum int
LenMsgLen int
MinMsgLen uint32
MaxMsgLen uint32
LittleEndian bool
JSONProcessor *json.Processor
ProtobufProcessor *protobuf.Processor
AgentChanRPC *chanrpc.Server

// websocket
WSAddr string
HTTPTimeout time.Duration

// tcp
TCPAddr string
LenMsgLen int
LittleEndian bool
}

func (gate *TCPGate) Run(closeSig chan bool) {
server := new(network.TCPServer)
server.Addr = gate.Addr
server.MaxConnNum = gate.MaxConnNum
server.PendingWriteNum = gate.PendingWriteNum
server.LenMsgLen = gate.LenMsgLen
server.MinMsgLen = gate.MinMsgLen
server.MaxMsgLen = gate.MaxMsgLen
server.LittleEndian = gate.LittleEndian
server.NewAgent = func(conn *network.TCPConn) network.Agent {
a := new(TCPAgent)
a.conn = conn
a.gate = gate

if gate.AgentChanRPC != nil {
gate.AgentChanRPC.Go("NewAgent", a)
func (gate *Gate) Run(closeSig chan bool) {
var wsServer *network.WSServer
if gate.WSAddr != "" {
wsServer = new(network.WSServer)
wsServer.Addr = gate.WSAddr
wsServer.MaxConnNum = gate.MaxConnNum
wsServer.PendingWriteNum = gate.PendingWriteNum
wsServer.MaxMsgLen = gate.MaxMsgLen
wsServer.HTTPTimeout = gate.HTTPTimeout
wsServer.NewAgent = func(conn *network.WSConn) network.Agent {
a := &agent{wsConn: conn, gate: gate}
if gate.AgentChanRPC != nil {
gate.AgentChanRPC.Go("NewAgent", a)
}
return a
}
}

return a
var tcpServer *network.TCPServer
if gate.TCPAddr != "" {
tcpServer = new(network.TCPServer)
tcpServer.Addr = gate.TCPAddr
tcpServer.MaxConnNum = gate.MaxConnNum
tcpServer.PendingWriteNum = gate.PendingWriteNum
tcpServer.LenMsgLen = gate.LenMsgLen
tcpServer.MaxMsgLen = gate.MaxMsgLen
tcpServer.LittleEndian = gate.LittleEndian
tcpServer.NewAgent = func(conn *network.TCPConn) network.Agent {
a := &agent{tcpConn: conn, gate: gate}
if gate.AgentChanRPC != nil {
gate.AgentChanRPC.Go("NewAgent", a)
}
return a
}
}

server.Start()
if wsServer != nil {
wsServer.Start()
}
if tcpServer != nil {
tcpServer.Start()
}
<-closeSig
server.Close()
if wsServer != nil {
wsServer.Close()
}
if tcpServer != nil {
tcpServer.Close()
}
}

func (gate *TCPGate) OnDestroy() {}
func (gate *Gate) OnDestroy() {}

type TCPAgent struct {
conn *network.TCPConn
gate *TCPGate
type agent struct {
wsConn *network.WSConn
tcpConn *network.TCPConn
gate *Gate
userData interface{}
}

func (a *TCPAgent) Run() {
func (a *agent) Run() {
for {
data, err := a.conn.ReadMsg()
var (
data []byte
err error
)
if a.wsConn != nil {
data, err = a.wsConn.ReadMsg()
} else if a.tcpConn != nil {
data, err = a.tcpConn.ReadMsg()
}
if err != nil {
log.Debug("read message: %v", err)
break
Expand All @@ -72,7 +112,7 @@ func (a *TCPAgent) Run() {
log.Debug("unmarshal json error: %v", err)
break
}
err = a.gate.JSONProcessor.Route(msg, Agent(a))
err = a.gate.JSONProcessor.Route(msg, a)
if err != nil {
log.Debug("route message error: %v", err)
break
Expand All @@ -84,7 +124,7 @@ func (a *TCPAgent) Run() {
log.Debug("unmarshal protobuf error: %v", err)
break
}
err = a.gate.ProtobufProcessor.Route(msg, Agent(a))
err = a.gate.ProtobufProcessor.Route(msg, a)
if err != nil {
log.Debug("route message error: %v", err)
break
Expand All @@ -93,7 +133,7 @@ func (a *TCPAgent) Run() {
}
}

func (a *TCPAgent) OnClose() {
func (a *agent) OnClose() {
if a.gate.AgentChanRPC != nil {
err := a.gate.AgentChanRPC.Open(0).Call0("CloseAgent", a)
if err != nil {
Expand All @@ -102,34 +142,49 @@ func (a *TCPAgent) OnClose() {
}
}

func (a *TCPAgent) WriteMsg(msg interface{}) {
func (a *agent) WriteMsg(msg interface{}) {
if a.gate.JSONProcessor != nil {
// json
data, err := a.gate.JSONProcessor.Marshal(msg)
if err != nil {
log.Error("marshal json %v error: %v", reflect.TypeOf(msg), err)
return
}
a.conn.WriteMsg(data)
if a.wsConn != nil {
a.wsConn.WriteMsg(data)
} else if a.tcpConn != nil {
a.tcpConn.WriteMsg(data)
}
} else if a.gate.ProtobufProcessor != nil {
// protobuf
id, data, err := a.gate.ProtobufProcessor.Marshal(msg.(proto.Message))
if err != nil {
log.Error("marshal protobuf %v error: %v", reflect.TypeOf(msg), err)
return
}
a.conn.WriteMsg(id, data)
if a.wsConn != nil {
b := make([]byte, len(id)+len(data))
copy(b, id)
copy(b[len(id):], data)
a.wsConn.WriteMsg(b)
} else if a.tcpConn != nil {
a.tcpConn.WriteMsg(id, data)
}
}
}

func (a *TCPAgent) Close() {
a.conn.Close()
func (a *agent) Close() {
if a.wsConn != nil {
a.wsConn.Close()
} else if a.tcpConn != nil {
a.tcpConn.Close()
}
}

func (a *TCPAgent) UserData() interface{} {
func (a *agent) UserData() interface{} {
return a.userData
}

func (a *TCPAgent) SetUserData(data interface{}) {
func (a *agent) SetUserData(data interface{}) {
a.userData = data
}

0 comments on commit 6b00e6f

Please sign in to comment.