Skip to content

Commit

Permalink
Allow tests to pass without dnspython (certbot#3581)
Browse files Browse the repository at this point in the history
* move skipUnless to test_util

* add skip_unless to acme test_util

* Make dns_resolver_tests work with and without dnspython

* make acme.challenges_test pass when dns is unavailable
  • Loading branch information
bmw authored Oct 4, 2016
1 parent 3c85ecb commit da22e64
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 59 deletions.
67 changes: 44 additions & 23 deletions acme/acme/challenges_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from acme import jose
from acme import test_util


CERT = test_util.load_comparable_cert('cert.pem')
KEY = jose.JWKRSA(key=test_util.load_rsa_private_key('rsa512_key.pem'))

Expand Down Expand Up @@ -77,6 +76,20 @@ def test_verify_wrong_form(self):
self.assertFalse(response.verify(self.chall, KEY.public_key()))


def dns_available():
"""Checks if dns can be imported.
:rtype: bool
:returns: ``True`` if dns can be imported, otherwise, ``False``
"""
try:
import dns # pylint: disable=unused-variable
except ImportError: # pragma: no cover
return False
return True # pragma: no cover


class DNS01ResponseTest(unittest.TestCase):
# pylint: disable=too-many-instance-attributes

Expand All @@ -92,6 +105,7 @@ def setUp(self):
from acme.challenges import DNS01
self.chall = DNS01(token=(b'x' * 16))
self.response = self.chall.response(KEY)
self.records_for_name_path = "acme.dns_resolver.txt_records_for_name"

def test_to_partial_json(self):
self.assertEqual(self.jmsg, self.msg.to_partial_json())
Expand All @@ -108,28 +122,35 @@ def test_simple_verify_bad_key_authorization(self):
key2 = jose.JWKRSA.load(test_util.load_vector('rsa256_key.pem'))
self.response.simple_verify(self.chall, "local", key2.public_key())

@mock.patch("acme.dns_resolver.txt_records_for_name")
def test_simple_verify_good_validation(self, mock_resolver):
mock_resolver.return_value = [self.chall.validation(KEY.public_key())]
self.assertTrue(self.response.simple_verify(
self.chall, "local", KEY.public_key()))
mock_resolver.assert_called_once_with(
self.chall.validation_domain_name("local"))

@mock.patch("acme.dns_resolver.txt_records_for_name")
def test_simple_verify_good_validation_multiple_txts(self, mock_resolver):
mock_resolver.return_value = [
"!", self.chall.validation(KEY.public_key())]
self.assertTrue(self.response.simple_verify(
self.chall, "local", KEY.public_key()))
mock_resolver.assert_called_once_with(
self.chall.validation_domain_name("local"))

@mock.patch("acme.dns_resolver.txt_records_for_name")
def test_simple_verify_bad_validation(self, mock_dns):
mock_dns.return_value = ["!"]
self.assertFalse(self.response.simple_verify(
self.chall, "local", KEY.public_key()))
@test_util.skip_unless(dns_available(),
"optional dependency dnspython is not available")
def test_simple_verify_good_validation(self): # pragma: no cover
with mock.patch(self.records_for_name_path) as mock_resolver:
mock_resolver.return_value = [
self.chall.validation(KEY.public_key())]
self.assertTrue(self.response.simple_verify(
self.chall, "local", KEY.public_key()))
mock_resolver.assert_called_once_with(
self.chall.validation_domain_name("local"))

@test_util.skip_unless(dns_available(),
"optional dependency dnspython is not available")
def test_simple_verify_good_validation_multitxts(self): # pragma: no cover
with mock.patch(self.records_for_name_path) as mock_resolver:
mock_resolver.return_value = [
"!", self.chall.validation(KEY.public_key())]
self.assertTrue(self.response.simple_verify(
self.chall, "local", KEY.public_key()))
mock_resolver.assert_called_once_with(
self.chall.validation_domain_name("local"))

@test_util.skip_unless(dns_available(),
"optional dependency dnspython is not available")
def test_simple_verify_bad_validation(self): # pragma: no cover
with mock.patch(self.records_for_name_path) as mock_resolver:
mock_resolver.return_value = ["!"]
self.assertFalse(self.response.simple_verify(
self.chall, "local", KEY.public_key()))


class DNS01Test(unittest.TestCase):
Expand Down
55 changes: 41 additions & 14 deletions acme/acme/dns_resolver_test.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
"""Tests for acme.dns_resolver."""
import sys
import unittest

import mock

from acme import dns_resolver
from acme import test_util


try:
import dns
DNS_AVAILABLE = True # pragma: no cover
except ImportError: # pragma: no cover
dns = None
DNS_AVAILABLE = False


def create_txt_response(name, txt_records):
Expand All @@ -21,33 +25,56 @@ def create_txt_response(name, txt_records):
return dns.rrset.from_text_list(name, 60, "IN", "TXT", txt_records)


class TxtRecordsForNameTest(unittest.TestCase):
@test_util.skip_unless(DNS_AVAILABLE,
"optional dependency dnspython is not available")
class DnsResolverTestWithDns(unittest.TestCase):
"""Tests for acme.dns_resolver when dns is available."""
@classmethod
def _call(cls, name):
from acme import dns_resolver
return dns_resolver.txt_records_for_name(name)

@mock.patch("acme.dns_resolver.dns.resolver.query")
def test_txt_records_for_name_with_single_response(self, mock_dns):
mock_dns.return_value = create_txt_response('name', ['response'])
self.assertEqual(['response'],
dns_resolver.txt_records_for_name('name'))
self.assertEqual(['response'], self._call('name'))

@mock.patch("acme.dns_resolver.dns.resolver.query")
def test_txt_records_for_name_with_multiple_responses(self, mock_dns):
mock_dns.return_value = create_txt_response(
'name', ['response1', 'response2'])
self.assertEqual(['response1', 'response2'],
dns_resolver.txt_records_for_name('name'))
self.assertEqual(['response1', 'response2'], self._call('name'))

@mock.patch("acme.dns_resolver.dns.resolver.query")
def test_txt_records_for_name_domain_not_found(self, mock_dns):
mock_dns.side_effect = dns.resolver.NXDOMAIN
self.assertEquals([], dns_resolver.txt_records_for_name('name'))
self.assertEquals([], self._call('name'))

@mock.patch("acme.dns_resolver.dns.resolver.query")
def test_txt_records_for_name_domain_other_error(self, mock_dns):
mock_dns.side_effect = dns.exception.DNSException
self.assertEquals([], dns_resolver.txt_records_for_name('name'))
self.assertEquals([], self._call('name'))


class DnsResolverTestWithoutDns(unittest.TestCase):
"""Tests for acme.dns_resolver when dns is unavailable."""
def setUp(self):
self.dns_module = sys.modules['dns'] if 'dns' in sys.modules else None

if DNS_AVAILABLE:
sys.modules['dns'] = None # pragma: no cover

def tearDown(self):
if self.dns_module is not None:
sys.modules['dns'] = self.dns_module # pragma: no cover

@classmethod
def _import_dns(cls):
import dns as failed_dns_import # pylint: disable=unused-variable

def test_import_error_is_raised(self):
self.assertRaises(ImportError, self._import_dns)


def run(self, result=None):
if dns is None: # pragma: no cover
print(self, "... SKIPPING, no dnspython available")
return
super(TxtRecordsForNameTest, self).run(result)
if __name__ == '__main__':
unittest.main() # pragma: no cover
22 changes: 22 additions & 0 deletions acme/acme/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""
import os
import pkg_resources
import unittest

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
Expand Down Expand Up @@ -73,3 +74,24 @@ def load_pyopenssl_private_key(*names):
loader = _guess_loader(
names[-1], OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1)
return OpenSSL.crypto.load_privatekey(loader, load_vector(*names))


def skip_unless(condition, reason): # pragma: no cover
"""Skip tests unless a condition holds.
This implements the basic functionality of unittest.skipUnless
which is only available on Python 2.7+.
:param bool condition: If ``False``, the test will be skipped
:param str reason: the reason for skipping the test
:rtype: callable
:returns: decorator that hides tests unless condition is ``True``
"""
if hasattr(unittest, "skipUnless"):
return unittest.skipUnless(condition, reason)
elif condition:
return lambda cls: cls
else:
return lambda cls: None
26 changes: 4 additions & 22 deletions certbot/plugins/util_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import mock
from six.moves import reload_module # pylint: disable=import-error

from certbot.tests import test_util


class PathSurgeryTest(unittest.TestCase):
"""Tests for certbot.plugins.path_surgery."""
Expand Down Expand Up @@ -89,28 +91,8 @@ def psutil_available():
return True


def skipUnless(condition, reason):
"""Skip tests unless a condition holds.
This implements the basic functionality of unittest.skipUnless
which is only available on Python 2.7+.
:param bool condition: If ``False``, the test will be skipped
:param str reason: the reason for skipping the test
:rtype: callable
:returns: decorator that hides tests unless condition is ``True``
"""
if hasattr(unittest, "skipUnless"):
return unittest.skipUnless(condition, reason)
elif condition:
return lambda cls: cls
else:
return lambda cls: None


@skipUnless(psutil_available(), "optional dependency psutil is not available")
@test_util.skip_unless(psutil_available(),
"optional dependency psutil is not available")
class AlreadyListeningTestPsutil(unittest.TestCase):
"""Tests for certbot.plugins.already_listening."""
def _call(self, *args, **kwargs):
Expand Down
22 changes: 22 additions & 0 deletions certbot/tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""
import os
import pkg_resources
import unittest

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
Expand Down Expand Up @@ -73,3 +74,24 @@ def load_pyopenssl_private_key(*names):
loader = _guess_loader(
names[-1], OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1)
return OpenSSL.crypto.load_privatekey(loader, load_vector(*names))


def skip_unless(condition, reason): # pragma: no cover
"""Skip tests unless a condition holds.
This implements the basic functionality of unittest.skipUnless
which is only available on Python 2.7+.
:param bool condition: If ``False``, the test will be skipped
:param str reason: the reason for skipping the test
:rtype: callable
:returns: decorator that hides tests unless condition is ``True``
"""
if hasattr(unittest, "skipUnless"):
return unittest.skipUnless(condition, reason)
elif condition:
return lambda cls: cls
else:
return lambda cls: None

0 comments on commit da22e64

Please sign in to comment.