From c01577d055128f92cb235299a7528e03ea245923 Mon Sep 17 00:00:00 2001 From: Googol Lee Date: Sat, 13 Jul 2013 21:02:27 +0800 Subject: [PATCH] add call sending event to client --- events.go | 2 +- namespace.go | 88 +++++++++++++++++++++++++++++++++++++++++++--------- server.go | 2 +- 3 files changed, 76 insertions(+), 16 deletions(-) diff --git a/events.go b/events.go index f2207260..23da11e8 100644 --- a/events.go +++ b/events.go @@ -149,7 +149,7 @@ func safeCall(fn reflect.Value, args []reflect.Value, callback func([]interface{ } }() ret := fn.Call(args) - if ret != nil { + if len(ret) > 0 { retArgs := make([]interface{}, len(ret)) for i, arg := range ret { retArgs[i] = arg.Interface() diff --git a/namespace.go b/namespace.go index 14e07803..97590ea4 100644 --- a/namespace.go +++ b/namespace.go @@ -2,20 +2,30 @@ package socketio import ( "encoding/json" + "errors" "reflect" + "sync" + "time" ) type NameSpace struct { *EventEmitter - Name string - session *Session - onMessage func([]byte) interface{} - ackPackets int - acks map[int]func([]byte) + Name string + session *Session + id int + waitingLock sync.Mutex + waiting map[int]chan []byte + onMessage func([]byte) interface{} } func NewNameSpace(session *Session, name string, ee *EventEmitter) *NameSpace { - ret := &NameSpace{session: session, Name: name, EventEmitter: ee} + ret := &NameSpace{ + EventEmitter: ee, + Name: name, + session: session, + id: 1, + waiting: make(map[int]chan []byte), + } if ret.EventEmitter == nil { ret.EventEmitter = NewEventEmitter() } @@ -26,6 +36,64 @@ func (ns *NameSpace) Of(name string) *NameSpace { return ns.session.Of(name) } +func (ns *NameSpace) Call(name string, reply []interface{}, args ...interface{}) error { + var c chan []byte + + pack := new(eventPacket) + pack.endPoint = ns.Name + pack.name = name + if len(reply) > 0 { + c = make(chan []byte) + ns.waitingLock.Lock() + id := ns.id + ns.id++ + ns.waiting[id] = c + ns.waitingLock.Unlock() + + pack.id = id + pack.ack = true + } + var err error + pack.args, err = json.Marshal(args) + if err != nil { + return err + } + + err = ns.sendPacket(pack) + if err != nil { + return err + } + + if len(reply) > 0 { + select { + case replyRaw := <-c: + err := json.Unmarshal(replyRaw, reply) + if err != nil { + return err + } + case <-time.After(time.Duration(ns.session.server.heartbeatTimeout) * time.Second): + return errors.New("time out") + } + } + + return nil +} + +func (ns *NameSpace) onAckPacket(packet *ackPacket) { + c := func() chan []byte { + ns.waitingLock.Lock() + defer ns.waitingLock.Unlock() + if c, ok := ns.waiting[packet.Id()]; ok { + delete(ns.waiting, packet.Id()) + return c + } + return nil + }() + if c != nil { + c <- []byte(packet.args) + } +} + func (ns *NameSpace) onMessagePacket(packet messageMix) { message, ok := packet.(messageMix) if !ok { @@ -75,14 +143,6 @@ func (ns *NameSpace) onEventPacket(packet *eventPacket) { if packet.Id() == 0 { callback = nil } - if !packet.Ack() { - callback = nil - ack := new(ackPacket) - ack.ackId = packet.Id() - ack.args = nil - ack.endPoint = ns.Name - ns.sendPacket(ack) - } ns.emitRaw(packet.name, ns, callback, packet.args) } diff --git a/server.go b/server.go index 684900b8..e92a2d75 100644 --- a/server.go +++ b/server.go @@ -84,6 +84,7 @@ func (srv *SocketIOServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, "invalid session id", 400) return } + srv.emit("connect", session.Of(""), nil) session.serve(transportId, w, r) } @@ -120,7 +121,6 @@ func (srv *SocketIOServer) handShake(w http.ResponseWriter, r *http.Request) { strings.Join(transportNames, ",")) session := NewSession(srv, sessionId) srv.addSession(session) - srv.emit("connect", session.Of(""), nil) } func (srv *SocketIOServer) addSession(ss *Session) {