forked from pubsubhubbub/PubSubHubbub
-
Notifications
You must be signed in to change notification settings - Fork 0
/
feed_identifier.py
137 lines (104 loc) · 3.52 KB
/
feed_identifier.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!/usr/bin/env python
#
# Copyright 2009 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Atom/RSS feed parser that determines a feed's canonical ID."""

# The docstring above must be the first statement in the module; placed after
# __author__ it was a bare expression and the module's __doc__ stayed None.
# NOTE(review): the address below looks like an e-mail-obfuscation
# placeholder; confirm the real value against upstream before changing it.
__author__ = '[email protected] (Brett Slatkin)'
import cStringIO
import logging
import re
import xml.sax
import xml.sax.handler
import xml.sax.saxutils
# Set to True to log every element start/end seen by the feed handlers (via
# logging.debug) while a document is being parsed.
DEBUG = False
class TrivialEntityResolver(xml.sax.handler.EntityResolver):
    """Entity resolver that answers every request with an empty stream.

    Whatever public/system identifier the parser asks for, it is handed a
    fresh, empty in-memory stream; this class opens nothing itself.
    """

    def resolveEntity(self, publicId, systemId):
        """Returns a new, empty cStringIO stream; both identifiers are ignored.

        Args:
          publicId: Public identifier of the requested entity (unused).
          systemId: System identifier of the requested entity (unused).

        Returns:
          A newly created, empty cStringIO.StringIO instance.
        """
        empty_stream = cStringIO.StringIO()
        return empty_stream
class FeedIdentifier(xml.sax.handler.ContentHandler):
    """Base SAX content handler for identifying feeds.

    The handler tracks the stack of currently open element names. When the
    stack has the same depth as ``target_tag_stack`` and every predicate
    accepts the name at its level, character data is captured until an
    element closes. Once any text has been captured the handler stops
    tracking the document, so only the first hit is reported.

    Subclasses must set ``target_tag_stack``.
    """

    # One callable per nesting level, outermost element first; each receives an
    # element name and returns a truthy value when the name is acceptable.
    # None on this base class, so the base class cannot be used directly.
    target_tag_stack = None

    def __init__(self, parser):
        """Initializer.

        Args:
          parser: Instance of the xml.sax parser being used with this handler.
        """
        # Explicit base call (rather than super()) so this also works where
        # ContentHandler is an old-style class.
        xml.sax.handler.ContentHandler.__init__(self)
        self.parser = parser
        self.link = []  # Captured text chunks, joined by get_link().
        self.tag_stack = []  # Names of the currently open elements.
        self.capture_next_element = False

    # SAX methods
    def startElement(self, name, attrs):
        """Pushes the element and enables capture if the stack hits the target."""
        if self.link:
            # Text was already captured; the stack is no longer maintained.
            return
        if DEBUG: logging.debug('Start stack level for %r', name)
        self.tag_stack.append(name)
        if len(self.tag_stack) != len(self.target_tag_stack):
            return
        if all(predicate(tag) for tag, predicate
               in zip(self.tag_stack, self.target_tag_stack)):
            self.capture_next_element = True

    def endElement(self, name):
        """Pops the element and stops capturing when appropriate."""
        if self.link:
            self.capture_next_element = False
            return
        if DEBUG: logging.debug('End stack level %r', name)
        if len(self.tag_stack) == len(self.target_tag_stack):
            # An element at the target depth is closing without any text
            # having been captured (e.g. "<id></id>"). Capture must end here,
            # otherwise the text of a later, unrelated element would be
            # reported as the feed's ID.
            self.capture_next_element = False
        self.tag_stack.pop()

    def characters(self, content):
        """Accumulates character data while capture is enabled."""
        if self.capture_next_element:
            self.link.append(content)

    def get_link(self):
        """Returns the captured text, stripped, or None if nothing was captured."""
        if not self.link:
            return None
        return ''.join(self.link).strip()
class AtomFeedIdentifier(FeedIdentifier):
    """SAX content handler that locates the ID element of an Atom feed.

    The target is an "id" element directly inside a root "feed" element;
    at both levels the name may carry a namespace prefix ("prefix:name").
    """

    target_tag_stack = [
        re.compile('([^:]+:)?feed$').match,
        re.compile('([^:]+:)?id$').match,
    ]
class RssFeedIdentifier(FeedIdentifier):
    """SAX content handler for identifying RSS feeds.

    The target is root > channel > link, where the root element is named
    "rss" or has a name ending in "rdf" (e.g. "rdf:RDF"), compared
    case-insensitively.
    """

    target_tag_stack = [
        # The previous pattern, '^(?i)(rss)|(.*rdf)$', had two problems: the
        # inline flag was not at the start of the expression (an error on
        # Python 3.11+), and the alternation split it into "starts with rss"
        # OR "ends with rdf" instead of matching the whole name.
        re.compile(r'(rss|.*rdf)$', re.IGNORECASE).match,
        # Anchored at the end so that only the exact names match; .match()
        # already anchors at the start.
        re.compile(r'channel$').match,
        re.compile(r'link$').match,
    ]
def identify(data, format):
    """Identifies a feed.

    Args:
      data: String containing the data of the XML feed to parse.
      format: String naming the format of the data. Should be 'rss' or 'atom'.

    Returns:
      The text of the feed's identifying element with surrounding whitespace
      stripped, or None if no such element with text was found.

    Raises:
      xml.sax.SAXException on parse errors.
      AssertionError if format is neither 'atom' nor 'rss'.
    """
    data_stream = cStringIO.StringIO(data)
    parser = xml.sax.make_parser()

    if format == 'atom':
        handler_class = AtomFeedIdentifier
    elif format == 'rss':
        handler_class = RssFeedIdentifier
    else:
        # Raised explicitly instead of "assert False": assert statements are
        # removed under -O, which would leave the handler undefined below.
        # The exception type is unchanged for existing callers.
        raise AssertionError('Invalid feed format "%s"' % (format,))

    handler = handler_class(parser)
    parser.setContentHandler(handler)
    # Entities are resolved by TrivialEntityResolver rather than the parser's
    # default resolver.
    parser.setEntityResolver(TrivialEntityResolver())
    parser.parse(data_stream)
    return handler.get_link()
# Names exported by a star-import of this module.
__all__ = ['identify', 'DEBUG']