Skip to content

Commit

Permalink
Add support for pubsub subscription identifiers
Browse files Browse the repository at this point in the history
  • Loading branch information
ralphm committed Jan 5, 2010
1 parent 6930427 commit 2035d28
Show file tree
Hide file tree
Showing 2 changed files with 494 additions and 32 deletions.
136 changes: 106 additions & 30 deletions wokkel/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,20 +102,56 @@ class Subscription(object):
"""
A subscription to a node.
@ivar nodeIdentifier: The identifier of the node subscribed to.
The root node is denoted by C{None}.
@ivar nodeIdentifier: The identifier of the node subscribed to. The root
node is denoted by C{None}.
@type nodeIdentifier: C{unicode}
@ivar subscriber: The subscribing entity.
@type subscriber: L{jid.JID}
@ivar state: The subscription state. One of C{'subscribed'}, C{'pending'},
C{'unconfigured'}.
@type state: C{unicode}
@ivar options: Optional list of subscription options.
@type options: C{dict}.
@type options: C{dict}
@ivar subscriptionIdentifier: Optional subscription identifier.
@type subscriptionIdentifier: C{unicode}
"""

def __init__(self, nodeIdentifier, subscriber, state, options=None):
def __init__(self, nodeIdentifier, subscriber, state, options=None,
subscriptionIdentifier=None):
self.nodeIdentifier = nodeIdentifier
self.subscriber = subscriber
self.state = state
self.options = options or {}
self.subscriptionIdentifier = subscriptionIdentifier


@staticmethod
def fromElement(element):
return Subscription(
element.getAttribute('node'),
jid.JID(element.getAttribute('jid')),
element.getAttribute('subscription'),
subscriptionIdentifier=element.getAttribute('subid'))


def toElement(self):
"""
Return the DOM representation of this subscription.
@rtype: L{domish.Element}
"""
element = domish.Element((None, 'subscription'))
if self.nodeIdentifier:
element['node'] = self.nodeIdentifier
element['jid'] = unicode(self.subscriber)
element['subscription'] = self.state
if self.subscriptionIdentifier:
element['subid'] = self.subscriptionIdentifier
return element



Expand Down Expand Up @@ -229,16 +265,16 @@ class PubSubRequest(generic.Stanza):
_parameters = {
'publish': ['node', 'items'],
'subscribe': ['nodeOrEmpty', 'jid', 'optionsWithSubscribe'],
'unsubscribe': ['nodeOrEmpty', 'jid'],
'optionsGet': ['nodeOrEmpty', 'jid'],
'optionsSet': ['nodeOrEmpty', 'jid', 'options'],
'unsubscribe': ['nodeOrEmpty', 'jid', 'subidOrNone'],
'optionsGet': ['nodeOrEmpty', 'jid', 'subidOrNone'],
'optionsSet': ['nodeOrEmpty', 'jid', 'options', 'subidOrNone'],
'subscriptions': [],
'affiliations': [],
'create': ['nodeOrNone', 'configureOrNone'],
'default': ['default'],
'configureGet': ['nodeOrEmpty'],
'configureSet': ['nodeOrEmpty', 'configure'],
'items': ['node', 'maxItems', 'itemIdentifiers'],
'items': ['node', 'maxItems', 'itemIdentifiers', 'subidOrNone'],
'retract': ['node', 'itemIdentifiers'],
'purge': ['node'],
'delete': ['node'],
Expand Down Expand Up @@ -429,12 +465,27 @@ def _parse_maxItems(self, verbElement):

def _render_maxItems(self, verbElement):
"""
Parse maximum items into an items request.
Render maximum items into an items request.
"""
if self.maxItems:
verbElement['max_items'] = unicode(self.maxItems)


def _parse_subidOrNone(self, verbElement):
"""
Parse subscription identifier out of a request.
"""
self.subscriptionIdentifier = verbElement.getAttribute("subid")


def _render_subidOrNone(self, verbElement):
"""
Render subscription identifier into a request.
"""
if self.subscriptionIdentifier:
verbElement['subid'] = self.subscriptionIdentifier


def _parse_options(self, verbElement):
"""
Parse options form out of a subscription options request.
Expand Down Expand Up @@ -729,13 +780,20 @@ def subscribe(self, service, nodeIdentifier, subscriber,
@param service: The publish subscribe service that keeps the node.
@type service: L{JID}
@param nodeIdentifier: The identifier of the node.
@type nodeIdentifier: C{unicode}
@param subscriber: The entity to subscribe to the node. This entity
will get notifications of new published items.
will get notifications of new published items.
@type subscriber: L{JID}
@param options: Subscription options.
@type options: C{dict}.
@type options: C{dict}
@return: Deferred that fires with L{Subscription} or errbacks with
L{SubscriptionPending} or L{SubscriptionUnconfigured}.
@rtype: L{defer.Deferred}
"""
request = PubSubRequest('subscribe')
request.recipient = service
Expand All @@ -750,38 +808,45 @@ def subscribe(self, service, nodeIdentifier, subscriber,
request.options = form

def cb(iq):
subscription = iq.pubsub.subscription["subscription"]
subscription = Subscription.fromElement(iq.pubsub.subscription)

if subscription == 'pending':
raise SubscriptionPending
elif subscription == 'unconfigured':
raise SubscriptionUnconfigured
if subscription.state == 'pending':
raise SubscriptionPending()
elif subscription.state == 'unconfigured':
raise SubscriptionUnconfigured()
else:
# we assume subscription == 'subscribed'
# any other value would be invalid, but that should have
# yielded a stanza error.
return None
return subscription

d = request.send(self.xmlstream)
d.addCallback(cb)
return d


def unsubscribe(self, service, nodeIdentifier, subscriber, sender=None):
def unsubscribe(self, service, nodeIdentifier, subscriber,
subscriptionIdentifier=None, sender=None):
"""
Unsubscribe from a publish subscribe node.
@param service: The publish subscribe service that keeps the node.
@type service: L{JID}
@param nodeIdentifier: The identifier of the node.
@type nodeIdentifier: C{unicode}
@param subscriber: The entity to unsubscribe from the node.
@type subscriber: L{JID}
@param subscriptionIdentifier: Optional subscription identifier.
@type subscriptionIdentifier: C{unicode}
"""
request = PubSubRequest('unsubscribe')
request.recipient = service
request.nodeIdentifier = nodeIdentifier
request.subscriber = subscriber
request.subscriptionIdentifier = subscriptionIdentifier
request.sender = sender
return request.send(self.xmlstream)

Expand All @@ -805,22 +870,31 @@ def publish(self, service, nodeIdentifier, items=None, sender=None):
return request.send(self.xmlstream)


def items(self, service, nodeIdentifier, maxItems=None, sender=None):
def items(self, service, nodeIdentifier, maxItems=None,
subscriptionIdentifier=None, sender=None):
"""
Retrieve previously published items from a publish subscribe node.
@param service: The publish subscribe service that keeps the node.
@type service: L{JID}
@param nodeIdentifier: The identifier of the node.
@type nodeIdentifier: C{unicode}
@param maxItems: Optional limit on the number of retrieved items.
@type maxItems: C{int}
@param subscriptionIdentifier: Optional subscription identifier. In
case the node has been subscribed to multiple times, this narrows
the results to the specific subscription.
@type subscriptionIdentifier: C{unicode}
"""
request = PubSubRequest('items')
request.recipient = service
request.nodeIdentifier = nodeIdentifier
if maxItems:
request.maxItems = str(int(maxItems))
request.subscriptionIdentifier = subscriptionIdentifier
request.sender = sender

def cb(iq):
Expand All @@ -835,7 +909,8 @@ def cb(iq):
return d


def getOptions(self, service, nodeIdentifier, subscriber, sender=None):
def getOptions(self, service, nodeIdentifier, subscriber,
subscriptionIdentifier=None, sender=None):
"""
Get subscription options.
Expand All @@ -848,12 +923,16 @@ def getOptions(self, service, nodeIdentifier, subscriber, sender=None):
@param subscriber: The entity subscribed to the node.
@type subscriber: L{JID}
@param subscriptionIdentifier: Optional subscription identifier.
@type subscriptionIdentifier: C{unicode}
@rtype: L{data_form.Form}
"""
request = PubSubRequest('optionsGet')
request.recipient = service
request.nodeIdentifier = nodeIdentifier
request.subscriber = subscriber
request.subscriptionIdentifier = subscriptionIdentifier
request.sender = sender

def cb(iq):
Expand All @@ -868,7 +947,7 @@ def cb(iq):


def setOptions(self, service, nodeIdentifier, subscriber,
options, sender=None):
options, subscriptionIdentifier=None, sender=None):
"""
Set subscription options.
Expand All @@ -883,11 +962,15 @@ def setOptions(self, service, nodeIdentifier, subscriber,
@param options: Subscription options.
@type options: C{dict}.
@param subscriptionIdentifier: Optional subscription identifier.
@type subscriptionIdentifier: C{unicode}
"""
request = PubSubRequest('optionsSet')
request.recipient = service
request.nodeIdentifier = nodeIdentifier
request.subscriber = subscriber
request.subscriptionIdentifier = subscriptionIdentifier
request.sender = sender

form = data_form.Form(formType='submit',
Expand Down Expand Up @@ -1092,22 +1175,15 @@ def _onPubSubRequest(self, iq):

def _toResponse_subscribe(self, result, resource, request):
response = domish.Element((NS_PUBSUB, "pubsub"))
subscription = response.addElement("subscription")
if result.nodeIdentifier:
subscription["node"] = result.nodeIdentifier
subscription["jid"] = result.subscriber.full()
subscription["subscription"] = result.state
subscription = response.addChild(result.toElement())
return response


def _toResponse_subscriptions(self, result, resource, request):
response = domish.Element((NS_PUBSUB, 'pubsub'))
subscriptions = response.addElement('subscriptions')
for subscription in result:
item = subscriptions.addElement('subscription')
item['node'] = subscription.nodeIdentifier
item['jid'] = subscription.subscriber.full()
item['subscription'] = subscription.state
subscriptions.addChild(subscription.toElement())
return response


Expand Down
Loading

0 comments on commit 2035d28

Please sign in to comment.