Skip to content

Commit

Permalink
rpc: support subscriptions under custom namespaces
Browse files Browse the repository at this point in the history
  • Loading branch information
bas-vk committed Apr 25, 2017
1 parent ba3bcd1 commit 37e3f56
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 55 deletions.
35 changes: 19 additions & 16 deletions rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"net/url"
"reflect"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -373,14 +374,14 @@ func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...
return nil, ErrNotificationsUnsupported
}

msg, err := c.newMessage(subscribeMethod, args...)
msg, err := c.newMessage("eth"+subscribeMethodSuffix, args...)
if err != nil {
return nil, err
}
op := &requestOp{
ids: []json.RawMessage{msg.ID},
resp: make(chan *jsonrpcMessage),
sub: newClientSubscription(c, chanVal),
sub: newClientSubscription(c, "eth", chanVal),
}

// Send the subscription request.
Expand Down Expand Up @@ -575,7 +576,7 @@ func (c *Client) closeRequestOps(err error) {
}

func (c *Client) handleNotification(msg *jsonrpcMessage) {
if msg.Method != notificationMethod {
if !strings.HasSuffix(msg.Method, notificationMethodSuffix) {
log.Debug(fmt.Sprint("dropping non-subscription message: ", msg))
return
}
Expand Down Expand Up @@ -653,26 +654,28 @@ func (c *Client) read(conn net.Conn) error {

// A ClientSubscription represents a subscription established through EthSubscribe.
type ClientSubscription struct {
client *Client
etype reflect.Type
channel reflect.Value
subid string
in chan json.RawMessage
client *Client
etype reflect.Type
channel reflect.Value
namespace string
subid string
in chan json.RawMessage

quitOnce sync.Once // ensures quit is closed once
quit chan struct{} // quit is closed when the subscription exits
errOnce sync.Once // ensures err is closed once
err chan error
}

func newClientSubscription(c *Client, channel reflect.Value) *ClientSubscription {
func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription {
sub := &ClientSubscription{
client: c,
etype: channel.Type().Elem(),
channel: channel,
quit: make(chan struct{}),
err: make(chan error, 1),
in: make(chan json.RawMessage),
client: c,
namespace: namespace,
etype: channel.Type().Elem(),
channel: channel,
quit: make(chan struct{}),
err: make(chan error, 1),
in: make(chan json.RawMessage),
}
return sub
}
Expand Down Expand Up @@ -774,5 +777,5 @@ func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, e

func (sub *ClientSubscription) requestUnsubscribe() error {
var result interface{}
return sub.client.Call(&result, unsubscribeMethod, sub.subid)
return sub.client.Call(&result, sub.namespace+unsubscribeMethodSuffix, sub.subid)
}
36 changes: 17 additions & 19 deletions rpc/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ import (
)

const (
jsonrpcVersion = "2.0"
serviceMethodSeparator = "_"
subscribeMethod = "eth_subscribe"
unsubscribeMethod = "eth_unsubscribe"
notificationMethod = "eth_subscription"
jsonrpcVersion = "2.0"
serviceMethodSeparator = "_"
subscribeMethodSuffix = "_subscribe"
unsubscribeMethodSuffix = "_unsubscribe"
notificationMethodSuffix = "_subscription"
)

type jsonRequest struct {
Expand Down Expand Up @@ -164,7 +164,7 @@ func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error) {
}

// subscribe are special, they will always use `subscribeMethod` as first param in the payload
if in.Method == subscribeMethod {
if strings.HasSuffix(in.Method, subscribeMethodSuffix) {
reqs := []rpcRequest{{id: &in.Id, isPubSub: true}}
if len(in.Payload) > 0 {
// first param must be subscription name
Expand All @@ -174,17 +174,16 @@ func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error) {
return nil, false, &invalidRequestError{"Unable to parse subscription request"}
}

// all subscriptions are made on the eth service
reqs[0].service, reqs[0].method = "eth", subscribeMethod[0]
reqs[0].service, reqs[0].method = strings.TrimSuffix(in.Method, subscribeMethodSuffix), subscribeMethod[0]
reqs[0].params = in.Payload
return reqs, false, nil
}
return nil, false, &invalidRequestError{"Unable to parse subscription request"}
}

if in.Method == unsubscribeMethod {
if strings.HasSuffix(in.Method, unsubscribeMethodSuffix) {
return []rpcRequest{{id: &in.Id, isPubSub: true,
method: unsubscribeMethod, params: in.Payload}}, false, nil
method: in.Method, params: in.Payload}}, false, nil
}

elems := strings.Split(in.Method, serviceMethodSeparator)
Expand Down Expand Up @@ -216,8 +215,8 @@ func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error)

id := &in[i].Id

// subscribe are special, they will always use `subscribeMethod` as first param in the payload
if r.Method == subscribeMethod {
// subscribe are special, they will always use `subscriptionMethod` as first param in the payload
if strings.HasSuffix(r.Method, subscribeMethodSuffix) {
requests[i] = rpcRequest{id: id, isPubSub: true}
if len(r.Payload) > 0 {
// first param must be subscription name
Expand All @@ -227,17 +226,16 @@ func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error)
return nil, false, &invalidRequestError{"Unable to parse subscription request"}
}

// all subscriptions are made on the eth service
requests[i].service, requests[i].method = "eth", subscribeMethod[0]
requests[i].service, requests[i].method = strings.TrimSuffix(r.Method, subscribeMethodSuffix), subscribeMethod[0]
requests[i].params = r.Payload
continue
}

return nil, true, &invalidRequestError{"Unable to parse (un)subscribe request arguments"}
}

if r.Method == unsubscribeMethod {
requests[i] = rpcRequest{id: id, isPubSub: true, method: unsubscribeMethod, params: r.Payload}
if strings.HasSuffix(r.Method, unsubscribeMethodSuffix) {
requests[i] = rpcRequest{id: id, isPubSub: true, method: r.Method, params: r.Payload}
continue
}

Expand Down Expand Up @@ -325,13 +323,13 @@ func (c *jsonCodec) CreateErrorResponseWithInfo(id interface{}, err Error, info
}

// CreateNotification will create a JSON-RPC notification with the given subscription id and event as params.
func (c *jsonCodec) CreateNotification(subid string, event interface{}) interface{} {
func (c *jsonCodec) CreateNotification(subid, namespace string, event interface{}) interface{} {
if isHexNum(reflect.TypeOf(event)) {
return &jsonNotification{Version: jsonrpcVersion, Method: notificationMethod,
return &jsonNotification{Version: jsonrpcVersion, Method: namespace + notificationMethodSuffix,
Params: jsonSubscription{Subscription: subid, Result: fmt.Sprintf(`%#x`, event)}}
}

return &jsonNotification{Version: jsonrpcVersion, Method: notificationMethod,
return &jsonNotification{Version: jsonrpcVersion, Method: namespace + notificationMethodSuffix,
Params: jsonSubscription{Subscription: subid, Result: event}}
}

Expand Down
17 changes: 8 additions & 9 deletions rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"reflect"
"runtime"
"strings"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -96,32 +97,30 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error {
return fmt.Errorf("%s is not exported", reflect.Indirect(rcvrVal).Type().Name())
}

methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ)

// already a previous service register under given sname, merge methods/subscriptions
if regsvc, present := s.services[name]; present {
methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ)
if len(methods) == 0 && len(subscriptions) == 0 {
return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
}

for _, m := range methods {
regsvc.callbacks[formatName(m.method.Name)] = m
}
for _, s := range subscriptions {
regsvc.subscriptions[formatName(s.method.Name)] = s
}

return nil
}

svc.name = name
svc.callbacks, svc.subscriptions = suitableCallbacks(rcvrVal, svc.typ)
svc.callbacks, svc.subscriptions = methods, subscriptions

if len(svc.callbacks) == 0 && len(svc.subscriptions) == 0 {
return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
}

s.services[svc.name] = svc

return nil
}

Expand Down Expand Up @@ -303,7 +302,7 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque
// active the subscription after the sub id was successfully sent to the client
activateSub := func() {
notifier, _ := NotifierFromContext(ctx)
notifier.activate(subid)
notifier.activate(subid, req.svcname)
}

return codec.CreateResponse(req.id, subid), activateSub
Expand Down Expand Up @@ -383,7 +382,7 @@ func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*s
codec.Close()
}

// when request holds one of more subscribe requests this allows these subscriptions to be actived
// when request holds one of more subscribe requests this allows these subscriptions to be activated
for _, c := range callbacks {
c()
}
Expand All @@ -410,7 +409,7 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error)
continue
}

if r.isPubSub && r.method == unsubscribeMethod {
if r.isPubSub && strings.HasSuffix(r.method, unsubscribeMethodSuffix) {
requests[i] = &serverRequest{id: r.id, isUnsubscribe: true}
argTypes := []reflect.Type{reflect.TypeOf("")} // expect subscription id as first arg
if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil {
Expand Down Expand Up @@ -439,7 +438,7 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error)
}
}
} else {
requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{subscribeMethod, r.method}}
requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.method, r.method}}
}
continue
}
Expand Down
14 changes: 8 additions & 6 deletions rpc/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ type ID string
// a Subscription is created by a notifier and tight to that notifier. The client can use
// this subscription to wait for an unsubscribe request for the client, see Err().
type Subscription struct {
ID ID
err chan error // closed on unsubscribe
ID ID
namespace string
err chan error // closed on unsubscribe
}

// Err returns a channel that is closed when the client send an unsubscribe request.
Expand Down Expand Up @@ -78,7 +79,7 @@ func NotifierFromContext(ctx context.Context) (*Notifier, bool) {
// are dropped until the subscription is marked as active. This is done
// by the RPC server after the subscription ID is send to the client.
func (n *Notifier) CreateSubscription() *Subscription {
s := &Subscription{NewID(), make(chan error)}
s := &Subscription{ID: NewID(), err: make(chan error)}
n.subMu.Lock()
n.inactive[s.ID] = s
n.subMu.Unlock()
Expand All @@ -91,9 +92,9 @@ func (n *Notifier) Notify(id ID, data interface{}) error {
n.subMu.RLock()
defer n.subMu.RUnlock()

_, active := n.active[id]
sub, active := n.active[id]
if active {
notification := n.codec.CreateNotification(string(id), data)
notification := n.codec.CreateNotification(string(id), sub.namespace, data)
if err := n.codec.Write(notification); err != nil {
n.codec.Close()
return err
Expand Down Expand Up @@ -124,10 +125,11 @@ func (n *Notifier) unsubscribe(id ID) error {
// notifications are dropped. This method is called by the RPC server after
// the subscription ID was sent to client. This prevents notifications being
// send to the client before the subscription ID is send to the client.
func (n *Notifier) activate(id ID) {
func (n *Notifier) activate(id ID, namespace string) {
n.subMu.Lock()
defer n.subMu.Unlock()
if sub, found := n.inactive[id]; found {
sub.namespace = namespace
n.active[id] = sub
delete(n.inactive, id)
}
Expand Down
Loading

0 comments on commit 37e3f56

Please sign in to comment.