From 6b00e6faa60d5dca6917ecf167d8d1fdcb346f7a Mon Sep 17 00:00:00 2001 From: name5566 Date: Wed, 21 Oct 2015 11:13:33 +0800 Subject: [PATCH] Gate support Websocket --- gate/gate.go | 135 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 95 insertions(+), 40 deletions(-) diff --git a/gate/gate.go b/gate/gate.go index f33c30c9..d260a167 100644 --- a/gate/gate.go +++ b/gate/gate.go @@ -8,58 +8,98 @@ import ( "github.com/name5566/leaf/network/json" "github.com/name5566/leaf/network/protobuf" "reflect" + "time" ) -type TCPGate struct { - Addr string +type Gate struct { MaxConnNum int PendingWriteNum int - LenMsgLen int - MinMsgLen uint32 MaxMsgLen uint32 - LittleEndian bool JSONProcessor *json.Processor ProtobufProcessor *protobuf.Processor AgentChanRPC *chanrpc.Server + + // websocket + WSAddr string + HTTPTimeout time.Duration + + // tcp + TCPAddr string + LenMsgLen int + LittleEndian bool } -func (gate *TCPGate) Run(closeSig chan bool) { - server := new(network.TCPServer) - server.Addr = gate.Addr - server.MaxConnNum = gate.MaxConnNum - server.PendingWriteNum = gate.PendingWriteNum - server.LenMsgLen = gate.LenMsgLen - server.MinMsgLen = gate.MinMsgLen - server.MaxMsgLen = gate.MaxMsgLen - server.LittleEndian = gate.LittleEndian - server.NewAgent = func(conn *network.TCPConn) network.Agent { - a := new(TCPAgent) - a.conn = conn - a.gate = gate - - if gate.AgentChanRPC != nil { - gate.AgentChanRPC.Go("NewAgent", a) +func (gate *Gate) Run(closeSig chan bool) { + var wsServer *network.WSServer + if gate.WSAddr != "" { + wsServer = new(network.WSServer) + wsServer.Addr = gate.WSAddr + wsServer.MaxConnNum = gate.MaxConnNum + wsServer.PendingWriteNum = gate.PendingWriteNum + wsServer.MaxMsgLen = gate.MaxMsgLen + wsServer.HTTPTimeout = gate.HTTPTimeout + wsServer.NewAgent = func(conn *network.WSConn) network.Agent { + a := &agent{wsConn: conn, gate: gate} + if gate.AgentChanRPC != nil { + gate.AgentChanRPC.Go("NewAgent", a) + } + return a } + } - return a + var tcpServer *network.TCPServer + if gate.TCPAddr != "" { + tcpServer = new(network.TCPServer) + tcpServer.Addr = gate.TCPAddr + tcpServer.MaxConnNum = gate.MaxConnNum + tcpServer.PendingWriteNum = gate.PendingWriteNum + tcpServer.LenMsgLen = gate.LenMsgLen + tcpServer.MaxMsgLen = gate.MaxMsgLen + tcpServer.LittleEndian = gate.LittleEndian + tcpServer.NewAgent = func(conn *network.TCPConn) network.Agent { + a := &agent{tcpConn: conn, gate: gate} + if gate.AgentChanRPC != nil { + gate.AgentChanRPC.Go("NewAgent", a) + } + return a + } } - server.Start() + if wsServer != nil { + wsServer.Start() + } + if tcpServer != nil { + tcpServer.Start() + } <-closeSig - server.Close() + if wsServer != nil { + wsServer.Close() + } + if tcpServer != nil { + tcpServer.Close() + } } -func (gate *TCPGate) OnDestroy() {} +func (gate *Gate) OnDestroy() {} -type TCPAgent struct { - conn *network.TCPConn - gate *TCPGate +type agent struct { + wsConn *network.WSConn + tcpConn *network.TCPConn + gate *Gate userData interface{} } -func (a *TCPAgent) Run() { +func (a *agent) Run() { for { - data, err := a.conn.ReadMsg() + var ( + data []byte + err error + ) + if a.wsConn != nil { + data, err = a.wsConn.ReadMsg() + } else if a.tcpConn != nil { + data, err = a.tcpConn.ReadMsg() + } if err != nil { log.Debug("read message: %v", err) break @@ -72,7 +112,7 @@ func (a *TCPAgent) Run() { log.Debug("unmarshal json error: %v", err) break } - err = a.gate.JSONProcessor.Route(msg, Agent(a)) + err = a.gate.JSONProcessor.Route(msg, a) if err != nil { log.Debug("route message error: %v", err) break @@ -84,7 +124,7 @@ func (a *TCPAgent) Run() { log.Debug("unmarshal protobuf error: %v", err) break } - err = a.gate.ProtobufProcessor.Route(msg, Agent(a)) + err = a.gate.ProtobufProcessor.Route(msg, a) if err != nil { log.Debug("route message error: %v", err) break @@ -93,7 +133,7 @@ func (a *TCPAgent) Run() { } } -func (a *TCPAgent) OnClose() { +func (a *agent) OnClose() { if a.gate.AgentChanRPC != nil { err := a.gate.AgentChanRPC.Open(0).Call0("CloseAgent", a) if err != nil { @@ -102,7 +142,7 @@ func (a *TCPAgent) OnClose() { } } -func (a *TCPAgent) WriteMsg(msg interface{}) { +func (a *agent) WriteMsg(msg interface{}) { if a.gate.JSONProcessor != nil { // json data, err := a.gate.JSONProcessor.Marshal(msg) @@ -110,7 +150,11 @@ func (a *TCPAgent) WriteMsg(msg interface{}) { log.Error("marshal json %v error: %v", reflect.TypeOf(msg), err) return } - a.conn.WriteMsg(data) + if a.wsConn != nil { + a.wsConn.WriteMsg(data) + } else if a.tcpConn != nil { + a.tcpConn.WriteMsg(data) + } } else if a.gate.ProtobufProcessor != nil { // protobuf id, data, err := a.gate.ProtobufProcessor.Marshal(msg.(proto.Message)) @@ -118,18 +162,29 @@ func (a *TCPAgent) WriteMsg(msg interface{}) { log.Error("marshal protobuf %v error: %v", reflect.TypeOf(msg), err) return } - a.conn.WriteMsg(id, data) + if a.wsConn != nil { + b := make([]byte, len(id)+len(data)) + copy(b, id) + copy(b[len(id):], data) + a.wsConn.WriteMsg(b) + } else if a.tcpConn != nil { + a.tcpConn.WriteMsg(id, data) + } } } -func (a *TCPAgent) Close() { - a.conn.Close() +func (a *agent) Close() { + if a.wsConn != nil { + a.wsConn.Close() + } else if a.tcpConn != nil { + a.tcpConn.Close() + } } -func (a *TCPAgent) UserData() interface{} { +func (a *agent) UserData() interface{} { return a.userData } -func (a *TCPAgent) SetUserData(data interface{}) { +func (a *agent) SetUserData(data interface{}) { a.userData = data }