Skip to content

Commit

Permalink
add call sending event to client
Browse files Browse the repository at this point in the history
  • Loading branch information
googollee committed Jul 13, 2013
1 parent a176364 commit c01577d
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 16 deletions.
2 changes: 1 addition & 1 deletion events.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func safeCall(fn reflect.Value, args []reflect.Value, callback func([]interface{
}
}()
ret := fn.Call(args)
if ret != nil {
if len(ret) > 0 {
retArgs := make([]interface{}, len(ret))
for i, arg := range ret {
retArgs[i] = arg.Interface()
Expand Down
88 changes: 74 additions & 14 deletions namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,30 @@ package socketio

import (
"encoding/json"
"errors"
"reflect"
"sync"
"time"
)

type NameSpace struct {
*EventEmitter
Name string
session *Session
onMessage func([]byte) interface{}
ackPackets int
acks map[int]func([]byte)
Name string
session *Session
id int
waitingLock sync.Mutex
waiting map[int]chan []byte
onMessage func([]byte) interface{}
}

func NewNameSpace(session *Session, name string, ee *EventEmitter) *NameSpace {
ret := &NameSpace{session: session, Name: name, EventEmitter: ee}
ret := &NameSpace{
EventEmitter: ee,
Name: name,
session: session,
id: 1,
waiting: make(map[int]chan []byte),
}
if ret.EventEmitter == nil {
ret.EventEmitter = NewEventEmitter()
}
Expand All @@ -26,6 +36,64 @@ func (ns *NameSpace) Of(name string) *NameSpace {
return ns.session.Of(name)
}

func (ns *NameSpace) Call(name string, reply []interface{}, args ...interface{}) error {
var c chan []byte

pack := new(eventPacket)
pack.endPoint = ns.Name
pack.name = name
if len(reply) > 0 {
c = make(chan []byte)
ns.waitingLock.Lock()
id := ns.id
ns.id++
ns.waiting[id] = c
ns.waitingLock.Unlock()

pack.id = id
pack.ack = true
}
var err error
pack.args, err = json.Marshal(args)
if err != nil {
return err
}

err = ns.sendPacket(pack)
if err != nil {
return err
}

if len(reply) > 0 {
select {
case replyRaw := <-c:
err := json.Unmarshal(replyRaw, reply)
if err != nil {
return err
}
case <-time.After(time.Duration(ns.session.server.heartbeatTimeout) * time.Second):
return errors.New("time out")
}
}

return nil
}

func (ns *NameSpace) onAckPacket(packet *ackPacket) {
c := func() chan []byte {
ns.waitingLock.Lock()
defer ns.waitingLock.Unlock()
if c, ok := ns.waiting[packet.Id()]; ok {
delete(ns.waiting, packet.Id())
return c
}
return nil
}()
if c != nil {
c <- []byte(packet.args)
}
}

func (ns *NameSpace) onMessagePacket(packet messageMix) {
message, ok := packet.(messageMix)
if !ok {
Expand Down Expand Up @@ -75,14 +143,6 @@ func (ns *NameSpace) onEventPacket(packet *eventPacket) {
if packet.Id() == 0 {
callback = nil
}
if !packet.Ack() {
callback = nil
ack := new(ackPacket)
ack.ackId = packet.Id()
ack.args = nil
ack.endPoint = ns.Name
ns.sendPacket(ack)
}
ns.emitRaw(packet.name, ns, callback, packet.args)
}

Expand Down
2 changes: 1 addition & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func (srv *SocketIOServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, "invalid session id", 400)
return
}
srv.emit("connect", session.Of(""), nil)
session.serve(transportId, w, r)
}

Expand Down Expand Up @@ -120,7 +121,6 @@ func (srv *SocketIOServer) handShake(w http.ResponseWriter, r *http.Request) {
strings.Join(transportNames, ","))
session := NewSession(srv, sessionId)
srv.addSession(session)
srv.emit("connect", session.Of(""), nil)
}

func (srv *SocketIOServer) addSession(ss *Session) {
Expand Down

0 comments on commit c01577d

Please sign in to comment.