diff --git a/acme/acme/challenges_test.py b/acme/acme/challenges_test.py index 27976931a49..dfd40ebdb4d 100644 --- a/acme/acme/challenges_test.py +++ b/acme/acme/challenges_test.py @@ -11,7 +11,6 @@ from acme import jose from acme import test_util - CERT = test_util.load_comparable_cert('cert.pem') KEY = jose.JWKRSA(key=test_util.load_rsa_private_key('rsa512_key.pem')) @@ -77,6 +76,20 @@ def test_verify_wrong_form(self): self.assertFalse(response.verify(self.chall, KEY.public_key())) +def dns_available(): + """Checks if dns can be imported. + + :rtype: bool + :returns: ``True`` if dns can be imported, otherwise, ``False`` + + """ + try: + import dns # pylint: disable=unused-variable + except ImportError: # pragma: no cover + return False + return True # pragma: no cover + + class DNS01ResponseTest(unittest.TestCase): # pylint: disable=too-many-instance-attributes @@ -92,6 +105,7 @@ def setUp(self): from acme.challenges import DNS01 self.chall = DNS01(token=(b'x' * 16)) self.response = self.chall.response(KEY) + self.records_for_name_path = "acme.dns_resolver.txt_records_for_name" def test_to_partial_json(self): self.assertEqual(self.jmsg, self.msg.to_partial_json()) @@ -108,28 +122,35 @@ def test_simple_verify_bad_key_authorization(self): key2 = jose.JWKRSA.load(test_util.load_vector('rsa256_key.pem')) self.response.simple_verify(self.chall, "local", key2.public_key()) - @mock.patch("acme.dns_resolver.txt_records_for_name") - def test_simple_verify_good_validation(self, mock_resolver): - mock_resolver.return_value = [self.chall.validation(KEY.public_key())] - self.assertTrue(self.response.simple_verify( - self.chall, "local", KEY.public_key())) - mock_resolver.assert_called_once_with( - self.chall.validation_domain_name("local")) - - @mock.patch("acme.dns_resolver.txt_records_for_name") - def test_simple_verify_good_validation_multiple_txts(self, mock_resolver): - mock_resolver.return_value = [ - "!", self.chall.validation(KEY.public_key())] - self.assertTrue(self.response.simple_verify( - self.chall, "local", KEY.public_key())) - mock_resolver.assert_called_once_with( - self.chall.validation_domain_name("local")) - - @mock.patch("acme.dns_resolver.txt_records_for_name") - def test_simple_verify_bad_validation(self, mock_dns): - mock_dns.return_value = ["!"] - self.assertFalse(self.response.simple_verify( - self.chall, "local", KEY.public_key())) + @test_util.skip_unless(dns_available(), + "optional dependency dnspython is not available") + def test_simple_verify_good_validation(self): # pragma: no cover + with mock.patch(self.records_for_name_path) as mock_resolver: + mock_resolver.return_value = [ + self.chall.validation(KEY.public_key())] + self.assertTrue(self.response.simple_verify( + self.chall, "local", KEY.public_key())) + mock_resolver.assert_called_once_with( + self.chall.validation_domain_name("local")) + + @test_util.skip_unless(dns_available(), + "optional dependency dnspython is not available") + def test_simple_verify_good_validation_multitxts(self): # pragma: no cover + with mock.patch(self.records_for_name_path) as mock_resolver: + mock_resolver.return_value = [ + "!", self.chall.validation(KEY.public_key())] + self.assertTrue(self.response.simple_verify( + self.chall, "local", KEY.public_key())) + mock_resolver.assert_called_once_with( + self.chall.validation_domain_name("local")) + + @test_util.skip_unless(dns_available(), + "optional dependency dnspython is not available") + def test_simple_verify_bad_validation(self): # pragma: no cover + with mock.patch(self.records_for_name_path) as mock_resolver: + mock_resolver.return_value = ["!"] + self.assertFalse(self.response.simple_verify( + self.chall, "local", KEY.public_key())) class DNS01Test(unittest.TestCase): diff --git a/acme/acme/dns_resolver_test.py b/acme/acme/dns_resolver_test.py index 53fc0cc77f5..03f1b3a9372 100644 --- a/acme/acme/dns_resolver_test.py +++ b/acme/acme/dns_resolver_test.py @@ -1,13 +1,17 @@ """Tests for acme.dns_resolver.""" +import sys import unittest + import mock -from acme import dns_resolver +from acme import test_util + try: import dns + DNS_AVAILABLE = True # pragma: no cover except ImportError: # pragma: no cover - dns = None + DNS_AVAILABLE = False def create_txt_response(name, txt_records): @@ -21,33 +25,56 @@ def create_txt_response(name, txt_records): return dns.rrset.from_text_list(name, 60, "IN", "TXT", txt_records) -class TxtRecordsForNameTest(unittest.TestCase): +@test_util.skip_unless(DNS_AVAILABLE, + "optional dependency dnspython is not available") +class DnsResolverTestWithDns(unittest.TestCase): + """Tests for acme.dns_resolver when dns is available.""" + @classmethod + def _call(cls, name): + from acme import dns_resolver + return dns_resolver.txt_records_for_name(name) @mock.patch("acme.dns_resolver.dns.resolver.query") def test_txt_records_for_name_with_single_response(self, mock_dns): mock_dns.return_value = create_txt_response('name', ['response']) - self.assertEqual(['response'], - dns_resolver.txt_records_for_name('name')) + self.assertEqual(['response'], self._call('name')) @mock.patch("acme.dns_resolver.dns.resolver.query") def test_txt_records_for_name_with_multiple_responses(self, mock_dns): mock_dns.return_value = create_txt_response( 'name', ['response1', 'response2']) - self.assertEqual(['response1', 'response2'], - dns_resolver.txt_records_for_name('name')) + self.assertEqual(['response1', 'response2'], self._call('name')) @mock.patch("acme.dns_resolver.dns.resolver.query") def test_txt_records_for_name_domain_not_found(self, mock_dns): mock_dns.side_effect = dns.resolver.NXDOMAIN - self.assertEquals([], dns_resolver.txt_records_for_name('name')) + self.assertEquals([], self._call('name')) @mock.patch("acme.dns_resolver.dns.resolver.query") def test_txt_records_for_name_domain_other_error(self, mock_dns): mock_dns.side_effect = dns.exception.DNSException - self.assertEquals([], dns_resolver.txt_records_for_name('name')) + self.assertEquals([], self._call('name')) + + +class DnsResolverTestWithoutDns(unittest.TestCase): + """Tests for acme.dns_resolver when dns is unavailable.""" + def setUp(self): + self.dns_module = sys.modules['dns'] if 'dns' in sys.modules else None + + if DNS_AVAILABLE: + sys.modules['dns'] = None # pragma: no cover + + def tearDown(self): + if self.dns_module is not None: + sys.modules['dns'] = self.dns_module # pragma: no cover + + @classmethod + def _import_dns(cls): + import dns as failed_dns_import # pylint: disable=unused-variable + + def test_import_error_is_raised(self): + self.assertRaises(ImportError, self._import_dns) + - def run(self, result=None): - if dns is None: # pragma: no cover - print(self, "... SKIPPING, no dnspython available") - return - super(TxtRecordsForNameTest, self).run(result) +if __name__ == '__main__': + unittest.main() # pragma: no cover diff --git a/acme/acme/test_util.py b/acme/acme/test_util.py index 24eceff5ae5..0f5763682c2 100644 --- a/acme/acme/test_util.py +++ b/acme/acme/test_util.py @@ -5,6 +5,7 @@ """ import os import pkg_resources +import unittest from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization @@ -73,3 +74,24 @@ def load_pyopenssl_private_key(*names): loader = _guess_loader( names[-1], OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1) return OpenSSL.crypto.load_privatekey(loader, load_vector(*names)) + + +def skip_unless(condition, reason): # pragma: no cover + """Skip tests unless a condition holds. + + This implements the basic functionality of unittest.skipUnless + which is only available on Python 2.7+. + + :param bool condition: If ``False``, the test will be skipped + :param str reason: the reason for skipping the test + + :rtype: callable + :returns: decorator that hides tests unless condition is ``True`` + + """ + if hasattr(unittest, "skipUnless"): + return unittest.skipUnless(condition, reason) + elif condition: + return lambda cls: cls + else: + return lambda cls: None diff --git a/certbot/plugins/util_test.py b/certbot/plugins/util_test.py index 27ede653302..71fb2a023dc 100644 --- a/certbot/plugins/util_test.py +++ b/certbot/plugins/util_test.py @@ -6,6 +6,8 @@ import mock from six.moves import reload_module # pylint: disable=import-error +from certbot.tests import test_util + class PathSurgeryTest(unittest.TestCase): """Tests for certbot.plugins.path_surgery.""" @@ -89,28 +91,8 @@ def psutil_available(): return True -def skipUnless(condition, reason): - """Skip tests unless a condition holds. - - This implements the basic functionality of unittest.skipUnless - which is only available on Python 2.7+. - - :param bool condition: If ``False``, the test will be skipped - :param str reason: the reason for skipping the test - - :rtype: callable - :returns: decorator that hides tests unless condition is ``True`` - - """ - if hasattr(unittest, "skipUnless"): - return unittest.skipUnless(condition, reason) - elif condition: - return lambda cls: cls - else: - return lambda cls: None - - -@skipUnless(psutil_available(), "optional dependency psutil is not available") +@test_util.skip_unless(psutil_available(), + "optional dependency psutil is not available") class AlreadyListeningTestPsutil(unittest.TestCase): """Tests for certbot.plugins.already_listening.""" def _call(self, *args, **kwargs): diff --git a/certbot/tests/test_util.py b/certbot/tests/test_util.py index 24eceff5ae5..0f5763682c2 100644 --- a/certbot/tests/test_util.py +++ b/certbot/tests/test_util.py @@ -5,6 +5,7 @@ """ import os import pkg_resources +import unittest from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization @@ -73,3 +74,24 @@ def load_pyopenssl_private_key(*names): loader = _guess_loader( names[-1], OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1) return OpenSSL.crypto.load_privatekey(loader, load_vector(*names)) + + +def skip_unless(condition, reason): # pragma: no cover + """Skip tests unless a condition holds. + + This implements the basic functionality of unittest.skipUnless + which is only available on Python 2.7+. + + :param bool condition: If ``False``, the test will be skipped + :param str reason: the reason for skipping the test + + :rtype: callable + :returns: decorator that hides tests unless condition is ``True`` + + """ + if hasattr(unittest, "skipUnless"): + return unittest.skipUnless(condition, reason) + elif condition: + return lambda cls: cls + else: + return lambda cls: None