Skip to content

Commit

Permalink
Raise an error if GetCapabilities requests don't return XML (geopytho…
Browse files Browse the repository at this point in the history
…n#861)

* Raise an error if responses aren't in XML

* Updates

* Update all service calls to use util function and add tests

* Allow multiple mime types

* Add another xml type

* Lint fixes

* Add pytest
  • Loading branch information
geographika authored Oct 19, 2024
1 parent d817969 commit 373d682
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 15 deletions.
4 changes: 2 additions & 2 deletions owslib/coverage/wcsBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from urllib.parse import urlencode, parse_qsl
from owslib.etree import etree
from owslib.util import Authentication, openURL
from owslib.util import Authentication, openURL, getXMLTree


class ServiceException(Exception):
Expand Down Expand Up @@ -118,7 +118,7 @@ def read(self, service_url, timeout=30):
"""
request = self.capabilities_url(service_url)
u = openURL(request, timeout=timeout, cookies=self.cookies, auth=self.auth, headers=self.headers)
return etree.fromstring(u.read())
return getXMLTree(u)

def readString(self, st):
"""Parse a WCS capabilities document, returning an
Expand Down
4 changes: 2 additions & 2 deletions owslib/feature/common.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from owslib.etree import etree
from owslib.util import Authentication, openURL
from owslib.util import Authentication, openURL, getXMLTree

from urllib.parse import urlencode, parse_qsl

Expand Down Expand Up @@ -52,7 +52,7 @@ def read(self, url, timeout=30):
"""
request = self.capabilities_url(url)
u = openURL(request, timeout=timeout, headers=self.headers, auth=self.auth)
return etree.fromstring(u.read())
return getXMLTree(u)

def readString(self, st):
"""Parse a WFS capabilities document, returning an
Expand Down
6 changes: 2 additions & 4 deletions owslib/map/common.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from urllib.parse import urlencode, parse_qsl

from owslib.etree import etree
from owslib.util import strip_bom, Authentication, openURL
from owslib.util import strip_bom, Authentication, openURL, getXMLTree


class WMSCapabilitiesReader(object):
Expand Down Expand Up @@ -64,9 +64,7 @@ def read(self, service_url, timeout=30):
spliturl = self.request.split('?')
u = openURL(spliturl[0], spliturl[1], method='Get',
timeout=timeout, headers=self.headers, auth=self.auth)

raw_text = strip_bom(u.read())
return etree.fromstring(raw_text)
return getXMLTree(u)

def readString(self, st):
"""Parse a WMS capabilities document, returning an elementtree instance.
Expand Down
4 changes: 2 additions & 2 deletions owslib/swe/observation/sos100.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from owslib import ows
from owslib.crs import Crs
from owslib.fes import FilterCapabilities
from owslib.util import openURL, testXMLValue, nspath_eval, nspath, extract_time
from owslib.util import openURL, testXMLValue, nspath_eval, nspath, extract_time, getXMLTree
from owslib.namespaces import Namespaces


Expand Down Expand Up @@ -314,7 +314,7 @@ def read(self, service_url):
getcaprequest = self.capabilities_url(service_url)
spliturl = getcaprequest.split('?')
u = openURL(spliturl[0], spliturl[1], method='Get', username=self.username, password=self.password)
return etree.fromstring(u.read())
return getXMLTree(u)

def read_string(self, st):
"""
Expand Down
4 changes: 2 additions & 2 deletions owslib/swe/observation/sos200.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from owslib import ows
from owslib.crs import Crs
from owslib.fes2 import FilterCapabilities
from owslib.util import openURL, testXMLValue, testXMLAttribute, nspath_eval, extract_time
from owslib.util import openURL, testXMLValue, testXMLAttribute, nspath_eval, extract_time, getXMLTree
from owslib.namespaces import Namespaces
from owslib.swe.observation.om import MeasurementObservation
from owslib.swe.observation.waterml2 import MeasurementTimeseriesObservation
Expand Down Expand Up @@ -331,7 +331,7 @@ def read(self, service_url):
getcaprequest = self.capabilities_url(service_url)
spliturl = getcaprequest.split('?')
u = openURL(spliturl[0], spliturl[1], method='Get', username=self.username, password=self.password)
return etree.fromstring(u.read())
return getXMLTree(u)

def read_string(self, st):
"""
Expand Down
34 changes: 34 additions & 0 deletions owslib/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,40 @@ def getXMLInteger(elem, tag):
return int(e.text.strip())


def getXMLTree(rsp: ResponseWrapper) -> etree:
"""
Parse a response into an XML elementtree instance
and raise a ValueError if the server returns a
non-XML response. The response may contain a useful
error message from the server.
Parameters
----------
@param rsp: the ResponseWrapper for the XML request
"""

raw_text = strip_bom(rsp.read())
et = etree.fromstring(raw_text)

# check for response type - if it is not xml then raise an error
content_type = rsp.info()['Content-Type']
url = rsp.geturl()

xml_types = ['text/xml', 'application/xml', 'application/vnd.ogc.wms_xml']
if not any(xt in content_type.lower() for xt in xml_types):
html_body = et.find('BODY') # note this is case-sensitive
if html_body is not None and len(html_body.text) > 0:
response_text = html_body.text.strip("\n")
else:
response_text = raw_text

raise ValueError("%s responded with Content-Type '%s': '%s'" %
(url, content_type, response_text))

return et


def testXMLValue(val, attrib=False):
"""
Expand Down
4 changes: 2 additions & 2 deletions owslib/wmts.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from urllib.parse import (urlencode, urlparse, urlunparse, parse_qs,
ParseResult)
from .etree import etree
from .util import clean_ows_url, testXMLValue, getXMLInteger, Authentication, openURL
from .util import clean_ows_url, testXMLValue, getXMLInteger, Authentication, openURL, getXMLTree
from .fgdc import Metadata
from .iso import MD_Metadata
from .ows import ServiceProvider, ServiceIdentification, OperationsMetadata
Expand Down Expand Up @@ -933,7 +933,7 @@ def read(self, service_url, vendor_kwargs=None):
# now split it up again to use the generic openURL function...
spliturl = getcaprequest.split('?')
u = openURL(spliturl[0], spliturl[1], method='Get', headers=self.headers, auth=self.auth)
return etree.fromstring(u.read())
return getXMLTree(u)

def readString(self, st):
"""Parse a WMTS capabilities document, returning an elementtree instance
Expand Down
33 changes: 32 additions & 1 deletion tests/test_util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# -*- coding: UTF-8 -*-
import codecs
from owslib.util import clean_ows_url, build_get_url, strip_bom, extract_time
from unittest import mock
import pytest
from owslib.util import clean_ows_url, build_get_url, strip_bom, extract_time, ResponseWrapper, getXMLTree
from owslib.etree import etree
from datetime import datetime, timezone

Expand Down Expand Up @@ -56,6 +58,35 @@ def test_build_get_url_overwrite():
'http://example.org/ows?SERVICE=WMS'


def test_getXMLTree_valid():

mock_resp = mock.Mock()
mock_resp.url = 'http:///example.org/?service=WFS&request=GetCapabilities&version=2.0.0'
mock_resp.content = b'<?xml version="1.0" encoding="UTF-8"?>\n<WFS_Capabilities><ServiceIdentification>' \
b'<Title>Example</Title></ServiceIdentification></WFS_Capabilities>'
mock_resp.headers = {'Content-Type': 'text/xml; charset=UTF-8'}
resp_wrap = ResponseWrapper(mock_resp)

et = getXMLTree(resp_wrap)
assert et.find('.//Title').text == "Example"


def test_getXMLTree_invalid():

mock_resp = mock.Mock()
mock_resp.url = 'http:///example.org/?service=WFS&request=GetCapabilities&version=2.0.0'
mock_resp.content = b'<HTML><HEAD></HEAD><BODY BGCOLOR="#FFFFFF">\nmsCGILoadMap(): Web application error. ' \
b'CGI variable &quot;map&quot; is not set.\n</BODY></HTML>'
mock_resp.headers = {'Content-Type': 'text/html'}
resp_wrap = ResponseWrapper(mock_resp)

with pytest.raises(ValueError) as ex:
getXMLTree(resp_wrap)

assert str(ex.value) == 'http:///example.org/?service=WFS&request=GetCapabilities&version=2.0.0 responded with Content-Type \'text/html\'' \
': \'msCGILoadMap(): Web application error. CGI variable \"map\" is not set.\''


def test_time_zone_utc():
now = datetime.utcnow()
as_utc = now.replace(tzinfo=timezone.utc)
Expand Down

0 comments on commit 373d682

Please sign in to comment.