Skip to content

Commit

Permalink
Merge pull request skynetservices#192 from smarinskaya/master
Browse files Browse the repository at this point in the history
New Callback methods
  • Loading branch information
erikstmartin committed Mar 4, 2013
2 parents 35616cf + ebf47af commit 6d8b22c
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 24 deletions.
3 changes: 3 additions & 0 deletions cmd/skydaemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ func (sd *SkynetDaemon) Stopped(s *service.Service) {
sd.StopAllSubServices(&skynet.RequestInfo{}, daemon.StopAllSubServicesRequest{}, &daemon.StopAllSubServicesResponse{})
}

func (sd *SkynetDaemon) MethodCalled(method string) {}
func (sd *SkynetDaemon) MethodCompleted(method string, duration int64, err error) {}

func (s *SkynetDaemon) Deploy(requestInfo *skynet.RequestInfo, in daemon.DeployRequest, out *daemon.DeployResponse) (err error) {
out.UUID = skynet.UUID()

Expand Down
4 changes: 3 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func FlagsForClient(ccfg *ClientConfig, flagset *flag.FlagSet) {
if ccfg.DoozerConfig == nil {
ccfg.DoozerConfig = &DoozerConfig{}
}

FlagsForDoozer(ccfg.DoozerConfig, flagset)
if ccfg.MongoConfig == nil {
ccfg.MongoConfig = &MongoConfig{}
Expand All @@ -171,7 +172,7 @@ func FlagsForClient(ccfg *ClientConfig, flagset *flag.FlagSet) {
flagset.IntVar(&ccfg.IdleConnectionsToInstance, "maxidle", DefaultIdleConnectionsToInstance, "maximum number of idle connections to a particular instance")
flagset.IntVar(&ccfg.MaxConnectionsToInstance, "maxconns", DefaultMaxConnectionsToInstance, "maximum number of concurrent connections to a particular instance")
flagset.StringVar(&ccfg.Region, "region", GetDefaultEnvVar("SKYNET_REGION", DefaultRegion), "region client is located in")
flagset.StringVar(&ccfg.Region, "host", GetDefaultEnvVar("SKYNET_HOST", DefaultRegion), "host client is located in")
flagset.StringVar(&ccfg.Host, "host", GetDefaultEnvVar("SKYNET_HOST", DefaultRegion), "host client is located in")
}

func GetClientConfig() (config *ClientConfig, args []string) {
Expand Down Expand Up @@ -203,6 +204,7 @@ func FlagsForService(scfg *ServiceConfig, flagset *flag.FlagSet) {
if scfg.DoozerConfig == nil {
scfg.DoozerConfig = &DoozerConfig{}
}

FlagsForDoozer(scfg.DoozerConfig, flagset)
if scfg.MongoConfig == nil {
scfg.MongoConfig = &MongoConfig{}
Expand Down
3 changes: 3 additions & 0 deletions examples/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ func (s *TestService) Stopped(service *service.Service) {
s.Log.Trace("Stopped")
}

func (s *TestService) MethodCalled(method string) {}
func (s *TestService) MethodCompleted(method string, duration int64, err error) {}

func NewTestService() *TestService {
r := &TestService{}
return r
Expand Down
10 changes: 6 additions & 4 deletions examples/testing/fibonacci/fibservice/fibservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ func NewFibonacci() (f *Fibonacci) {
return
}

func (f *Fibonacci) Registered(s *service.Service) {}
func (f *Fibonacci) Unregistered(s *service.Service) {}
func (f *Fibonacci) Started(s *service.Service) {}
func (f *Fibonacci) Stopped(s *service.Service) {}
func (f *Fibonacci) Registered(s *service.Service) {}
func (f *Fibonacci) Unregistered(s *service.Service) {}
func (f *Fibonacci) Started(s *service.Service) {}
func (f *Fibonacci) Stopped(s *service.Service) {}
func (f *Fibonacci) MethodCalled(method string) {}
func (f *Fibonacci) MethodCompleted(method string, duration int64, err error) {}

func (f *Fibonacci) Index(ri *skynet.RequestInfo, req fibonacci.Request,
resp *fibonacci.Response) (err error) {
Expand Down
10 changes: 6 additions & 4 deletions examples/testing/sleeper/sleepservice/sleepservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ func NewSleeper() (f *Sleeper) {
return
}

func (f *Sleeper) Registered(s *service.Service) {}
func (f *Sleeper) Unregistered(s *service.Service) {}
func (f *Sleeper) Started(s *service.Service) {}
func (f *Sleeper) Stopped(s *service.Service) {}
func (f *Sleeper) Registered(s *service.Service) {}
func (f *Sleeper) Unregistered(s *service.Service) {}
func (f *Sleeper) Started(s *service.Service) {}
func (f *Sleeper) Stopped(s *service.Service) {}
func (f *Sleeper) MethodCalled(method string) {}
func (f *Sleeper) MethodCompleted(method string, duration int64, err error) {}

func (f *Sleeper) Sleep(ri *skynet.RequestInfo, req sleeper.Request,
resp *sleeper.Response) (err error) {
Expand Down
11 changes: 6 additions & 5 deletions examples/tutorial/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import (
type TutorialService struct {
}

func (s *TutorialService) Registered(service *service.Service) {}
func (s *TutorialService) Unregistered(service *service.Service) {}
func (s *TutorialService) Started(service *service.Service) {}
func (s *TutorialService) Stopped(service *service.Service) {
}
func (s *TutorialService) Registered(service *service.Service) {}
func (s *TutorialService) Unregistered(service *service.Service) {}
func (s *TutorialService) Started(service *service.Service) {}
func (s *TutorialService) Stopped(service *service.Service) {}
func (s *TutorialService) MethodCalled(method string) {}
func (s *TutorialService) MethodCompleted(method string, duration int64, err error) {}

type TutorialRequest struct {
Value int
Expand Down
10 changes: 6 additions & 4 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type ServiceDelegate interface {
Stopped(s *Service)
Registered(s *Service)
Unregistered(s *Service)
MethodCalled(method string)
MethodCompleted(method string, duration int64, err error)
}

type ClientInfo struct {
Expand Down Expand Up @@ -61,7 +63,7 @@ type Service struct {
updateTicker *time.Ticker

clientMutex sync.Mutex
clientInfo map[string]ClientInfo
ClientInfo map[string]ClientInfo

shuttingDown bool
}
Expand All @@ -78,7 +80,7 @@ func CreateService(sd ServiceDelegate, c *skynet.ServiceConfig) (s *Service) {
registeredChan: make(chan bool),
doozerChan: make(chan interface{}),
updateTicker: time.NewTicker(c.DoozerUpdateInterval),
clientInfo: make(map[string]ClientInfo),
ClientInfo: make(map[string]ClientInfo),
shuttingDown: false,
}

Expand Down Expand Up @@ -137,7 +139,7 @@ loop:
clientID := skynet.UUID()

s.clientMutex.Lock()
s.clientInfo[clientID] = ClientInfo{
s.ClientInfo[clientID] = ClientInfo{
Address: conn.RemoteAddr(),
}
s.clientMutex.Unlock()
Expand Down Expand Up @@ -254,7 +256,7 @@ func (s *Service) getClientInfo(clientID string) (ci ClientInfo, ok bool) {
s.clientMutex.Lock()
defer s.clientMutex.Unlock()

ci, ok = s.clientInfo[clientID]
ci, ok = s.ClientInfo[clientID]
return
}

Expand Down
4 changes: 4 additions & 0 deletions service/servicerpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ func (srpc *ServiceRPC) Forward(in skynet.ServiceRPCIn, out *skynet.ServiceRPCOu
srpc.service.activeRequests.Add(1)
defer srpc.service.activeRequests.Done()

srpc.service.Delegate.MethodCalled(in.Method)

clientInfo, ok := srpc.service.getClientInfo(in.ClientID)
if !ok {
err = errors.New("did not provide the ClientID")
Expand Down Expand Up @@ -192,10 +194,12 @@ func (srpc *ServiceRPC) Forward(in skynet.ServiceRPCIn, out *skynet.ServiceRPCOu
}

erri := returns[0].Interface()
var rerr error
if erri != nil {
rerr, _ := erri.(error)
out.ErrString = rerr.Error()
}
srpc.service.Delegate.MethodCompleted(in.Method, duration, rerr)

return
}
14 changes: 8 additions & 6 deletions service/servicerpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ type M map[string]interface{}
type EchoRPC struct {
}

func (e EchoRPC) Started(s *Service) {}
func (e EchoRPC) Stopped(s *Service) {}
func (e EchoRPC) Registered(s *Service) {}
func (e EchoRPC) Unregistered(s *Service) {}
func (e EchoRPC) Started(s *Service) {}
func (e EchoRPC) Stopped(s *Service) {}
func (e EchoRPC) Registered(s *Service) {}
func (e EchoRPC) Unregistered(s *Service) {}
func (e EchoRPC) MethodCalled(method string) {}
func (e EchoRPC) MethodCompleted(method string, duration int64, err error) {}

func (e EchoRPC) Foo(rinfo *skynet.RequestInfo, in M, out *M) (err error) {
*out = M{
Expand All @@ -31,14 +33,14 @@ func TestServiceRPCBasic(t *testing.T) {

config := &skynet.ServiceConfig{}
service := CreateService(EchoRPC{}, config)
service.clientInfo = make(map[string]ClientInfo, 1)
service.ClientInfo = make(map[string]ClientInfo, 1)

addr = &net.TCPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: 123,
}

service.clientInfo["123"] = ClientInfo{
service.ClientInfo["123"] = ClientInfo{
Address: addr,
}

Expand Down

0 comments on commit 6d8b22c

Please sign in to comment.