Skip to content

Commit

Permalink
将chanrpc的异步调用回调驱动可以设置成由模块自动来驱动;
Browse files Browse the repository at this point in the history
  • Loading branch information
heyilin416 committed Jul 22, 2016
1 parent ff5c5f3 commit f23cb12
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 22 deletions.
23 changes: 8 additions & 15 deletions chanrpc/chanrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Server struct {
// func(args []interface{}) []interface{}
functions map[interface{}]interface{}
ChanCall chan *CallInfo
ChanAsynRet chan *RetInfo
}

type CallInfo struct {
Expand All @@ -44,13 +45,13 @@ type Client struct {
s *Server
chanSyncRet chan *RetInfo
ChanAsynRet chan *RetInfo
pendingAsynCall int
}

func NewServer(l int) *Server {
s := new(Server)
s.functions = make(map[interface{}]interface{})
s.ChanCall = make(chan *CallInfo, l)
s.ChanAsynRet = make(chan *RetInfo, l)
return s
}

Expand Down Expand Up @@ -146,11 +147,13 @@ func (s *Server) Close() {
}

// goroutine safe
func (s *Server) Open(l int) *Client {
func (s *Server) Open(chanAsynRet chan *RetInfo) *Client {
c := new(Client)
c.s = s
c.chanSyncRet = make(chan *RetInfo, 1)
c.ChanAsynRet = make(chan *RetInfo, l)
if chanAsynRet != nil {
c.ChanAsynRet = chanAsynRet
}
return c
}

Expand Down Expand Up @@ -270,8 +273,6 @@ func (c *Client) asynCall(id interface{}, args []interface{}, cb interface{}, n
if err != nil {
return err
}

c.pendingAsynCall++
return nil
}

Expand Down Expand Up @@ -309,7 +310,7 @@ func (c *Client) AsynCall(id interface{}, _args ...interface{}) {
}
}

func (c *Client) Cb(ri *RetInfo) {
func Cb(ri *RetInfo) {
switch ri.cb.(type) {
case func(error):
ri.cb.(func(error))(ri.err)
Expand All @@ -320,12 +321,4 @@ func (c *Client) Cb(ri *RetInfo) {
default:
panic("bug")
}

c.pendingAsynCall--
}

func (c *Client) Close() {
for c.pendingAsynCall > 0 {
c.Cb(<-c.ChanAsynRet)
}
}
}
11 changes: 6 additions & 5 deletions chanrpc/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ func Example() {
wg.Add(1)

// goroutine 2
chanAsynRet := make(chan *chanrpc.RetInfo, 10)
go func() {
c := s.Open(10)
c := s.Open(chanAsynRet)

// sync
err := c.Call0("f0")
Expand Down Expand Up @@ -107,10 +108,10 @@ func Example() {
}
})

c.Cb(<-c.ChanAsynRet)
c.Cb(<-c.ChanAsynRet)
c.Cb(<-c.ChanAsynRet)
c.Cb(<-c.ChanAsynRet)
chanrpc.Cb(<-chanAsynRet)
chanrpc.Cb(<-chanAsynRet)
chanrpc.Cb(<-chanAsynRet)
chanrpc.Cb(<-chanAsynRet)

// go
s.Go("f0")
Expand Down
2 changes: 1 addition & 1 deletion console/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (c *ExternalCommand) run(_args []string) string {
args[i] = v
}

ret, err := c.server.Open(0).Call1(c._name, args...)
ret, err := c.server.Open(nil).Call1(c._name, args...)
if err != nil {
return err.Error()
}
Expand Down
2 changes: 1 addition & 1 deletion gate/gate.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (a *agent) Run() {

func (a *agent) OnClose() {
if a.gate.AgentChanRPC != nil {
err := a.gate.AgentChanRPC.Open(0).Call0("CloseAgent", a)
err := a.gate.AgentChanRPC.Open(nil).Call0("CloseAgent", a)
if err != nil {
log.Error("chanrpc error: %v", err)
}
Expand Down
2 changes: 2 additions & 0 deletions module/skeleton.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ func (s *Skeleton) Run(closeSig chan bool) {
if err != nil {
log.Error("%v", err)
}
case ri := <-s.server.ChanAsynRet:
chanrpc.Cb(ri)
case ci := <-s.commandServer.ChanCall:
err := s.commandServer.Exec(ci)
if err != nil {
Expand Down

0 comments on commit f23cb12

Please sign in to comment.