Skip to content

Commit

Permalink
Fully type certbot-ci module (certbot#9120)
Browse files Browse the repository at this point in the history
* Fully type certbot-ci module

* Fix lint, focus lint

* Add trailing comma

* Remove unused private function

* Type properly for future usages

* Update certbot-ci/certbot_integration_tests/utils/acme_server.py

Co-authored-by: alexzorin <[email protected]>

* Cleanup files

* Fix import

* Fix mypy and lint

Co-authored-by: alexzorin <[email protected]>
  • Loading branch information
adferrand and alexzorin authored Nov 29, 2021
1 parent 0d10a44 commit aeb7beb
Show file tree
Hide file tree
Showing 26 changed files with 269 additions and 186 deletions.
4 changes: 3 additions & 1 deletion certbot-ci/certbot_integration_tests/assets/hook.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#!/usr/bin/env python
"""A Certbot hook for probing."""
import os
import sys

hook_script_type = os.path.basename(os.path.dirname(sys.argv[1]))
if hook_script_type == 'deploy' and ('RENEWED_DOMAINS' not in os.environ or 'RENEWED_LINEAGE' not in os.environ):
if hook_script_type == 'deploy' and ('RENEWED_DOMAINS' not in os.environ
or 'RENEWED_LINEAGE' not in os.environ):
sys.stderr.write('Environment variables not properly set!\n')
sys.exit(1)

Expand Down
63 changes: 29 additions & 34 deletions certbot-ci/certbot_integration_tests/certbot_tests/assertions.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
"""This module contains advanced assertions for the certbot integration tests."""
import io
import os
from typing import Type

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey, EllipticCurve
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
from cryptography.hazmat.primitives.serialization import load_pem_private_key

try:
import grp
POSIX_MODE = True
except ImportError:
import win32api
import win32security
import ntsecuritycon
POSIX_MODE = False
Expand All @@ -21,11 +21,11 @@
ADMINS_SID = 'S-1-5-32-544'


def assert_elliptic_key(key, curve):
def assert_elliptic_key(key: str, curve: Type[EllipticCurve]) -> None:
"""
Asserts that the key at the given path is an EC key using the given curve.
:param key: path to key
:param curve: name of the expected elliptic curve
:param EllipticCurve curve: name of the expected elliptic curve
"""
with open(key, 'rb') as file:
privkey1 = file.read()
Expand All @@ -36,10 +36,10 @@ def assert_elliptic_key(key, curve):
assert isinstance(key.curve, curve)


def assert_rsa_key(key):
def assert_rsa_key(key: str) -> None:
"""
Asserts that the key at the given path is an RSA key.
:param key: path to key
:param str key: path to key
"""
with open(key, 'rb') as file:
privkey1 = file.read()
Expand All @@ -48,11 +48,11 @@ def assert_rsa_key(key):
assert isinstance(key, RSAPrivateKey)


def assert_hook_execution(probe_path, probe_content):
def assert_hook_execution(probe_path: str, probe_content: str) -> None:
"""
Assert that a certbot hook has been executed
:param probe_path: path to the file that received the hook output
:param probe_content: content expected when the hook is executed
:param str probe_path: path to the file that received the hook output
:param str probe_content: content expected when the hook is executed
"""
encoding = 'utf-8' if POSIX_MODE else 'utf-16'
with io.open(probe_path, 'rt', encoding=encoding) as file:
Expand All @@ -62,34 +62,34 @@ def assert_hook_execution(probe_path, probe_content):
assert probe_content in lines


def assert_saved_renew_hook(config_dir, lineage):
def assert_saved_renew_hook(config_dir: str, lineage: str) -> None:
"""
Assert that the renew hook configuration of a lineage has been saved.
:param config_dir: location of the certbot configuration
:param lineage: lineage domain name
:param str config_dir: location of the certbot configuration
:param str lineage: lineage domain name
"""
with open(os.path.join(config_dir, 'renewal', '{0}.conf'.format(lineage))) as file_h:
assert 'renew_hook' in file_h.read()


def assert_cert_count_for_lineage(config_dir, lineage, count):
def assert_cert_count_for_lineage(config_dir: str, lineage: str, count: int) -> None:
"""
Assert the number of certificates generated for a lineage.
:param config_dir: location of the certbot configuration
:param lineage: lineage domain name
:param count: number of expected certificates
:param str config_dir: location of the certbot configuration
:param str lineage: lineage domain name
:param int count: number of expected certificates
"""
archive_dir = os.path.join(config_dir, 'archive')
lineage_dir = os.path.join(archive_dir, lineage)
certs = [file for file in os.listdir(lineage_dir) if file.startswith('cert')]
assert len(certs) == count


def assert_equals_group_permissions(file1, file2):
def assert_equals_group_permissions(file1: str, file2: str) -> None:
"""
Assert that two files have the same permissions for group owner.
:param file1: first file path to compare
:param file2: second file path to compare
:param str file1: first file path to compare
:param str file2: second file path to compare
"""
# On Windows there is no group, so this assertion does nothing on this platform
if POSIX_MODE:
Expand All @@ -99,11 +99,11 @@ def assert_equals_group_permissions(file1, file2):
assert mode_file1 == mode_file2


def assert_equals_world_read_permissions(file1, file2):
def assert_equals_world_read_permissions(file1: str, file2: str) -> None:
"""
Assert that two files have the same read permissions for everyone.
:param file1: first file path to compare
:param file2: second file path to compare
:param str file1: first file path to compare
:param str file2: second file path to compare
"""
if POSIX_MODE:
mode_file1 = os.stat(file1).st_mode & 0o004
Expand Down Expand Up @@ -134,11 +134,11 @@ def assert_equals_world_read_permissions(file1, file2):
assert mode_file1 == mode_file2


def assert_equals_group_owner(file1, file2):
def assert_equals_group_owner(file1: str, file2: str) -> None:
"""
Assert that two files have the same group owner.
:param file1: first file path to compare
:param file2: second file path to compare
:param str file1: first file path to compare
:param str file2: second file path to compare
"""
# On Windows there is no group, so this assertion does nothing on this platform
if POSIX_MODE:
Expand All @@ -148,10 +148,10 @@ def assert_equals_group_owner(file1, file2):
assert group_owner_file1 == group_owner_file2


def assert_world_no_permissions(file):
def assert_world_no_permissions(file: str) -> None:
"""
Assert that the given file is not world-readable.
:param file: path of the file to check
:param str file: path of the file to check
"""
if POSIX_MODE:
mode_file_all = os.stat(file).st_mode & 0o007
Expand All @@ -168,10 +168,10 @@ def assert_world_no_permissions(file):
assert not mode


def assert_world_read_permissions(file):
def assert_world_read_permissions(file: str) -> None:
"""
Assert that the given file is world-readable, but not world-writable or world-executable.
:param file: path of the file to check
:param str file: path of the file to check
"""
if POSIX_MODE:
mode_file_all = os.stat(file).st_mode & 0o007
Expand All @@ -188,8 +188,3 @@ def assert_world_read_permissions(file):
assert not mode & ntsecuritycon.FILE_GENERIC_WRITE
assert not mode & ntsecuritycon.FILE_GENERIC_EXECUTE
assert mode & ntsecuritycon.FILE_GENERIC_READ == ntsecuritycon.FILE_GENERIC_READ


def _get_current_user():
account_name = win32api.GetUserNameEx(win32api.NameSamCompatible)
return win32security.LookupAccountName(None, account_name)[0]
24 changes: 15 additions & 9 deletions certbot-ci/certbot_integration_tests/certbot_tests/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,25 @@
import shutil
import sys
import tempfile
from typing import Iterable
from typing import Tuple

import pytest

from certbot_integration_tests.utils import certbot_call


class IntegrationTestsContext:
"""General fixture describing a certbot integration tests context"""
def __init__(self, request):
def __init__(self, request: pytest.FixtureRequest) -> None:
self.request = request

if hasattr(request.config, 'workerinput'): # Worker node
self.worker_id = request.config.workerinput['workerid']
acme_xdist = request.config.workerinput['acme_xdist']
self.worker_id = request.config.workerinput['workerid'] # type: ignore[attr-defined]
acme_xdist = request.config.workerinput['acme_xdist'] # type: ignore[attr-defined]
else: # Primary node
self.worker_id = 'primary'
acme_xdist = request.config.acme_xdist
acme_xdist = request.config.acme_xdist # type: ignore[attr-defined]

self.acme_server = acme_xdist['acme_server']
self.directory_url = acme_xdist['directory_url']
Expand Down Expand Up @@ -52,31 +56,33 @@ def __init__(self, request):
'"'
).format(sys.executable, self.challtestsrv_port)

def cleanup(self):
def cleanup(self) -> None:
"""Cleanup the integration test context."""
shutil.rmtree(self.workspace)

def certbot(self, args, force_renew=True):
def certbot(self, args: Iterable[str], force_renew: bool = True) -> Tuple[str, str]:
"""
Execute certbot with given args, not renewing certificates by default.
:param args: args to pass to certbot
:param force_renew: set to False to not renew by default
:param bool force_renew: set to False to not renew by default
:return: stdout and stderr from certbot execution
:rtype: Tuple of `str`
"""
command = ['--authenticator', 'standalone', '--installer', 'null']
command.extend(args)
return certbot_call.certbot_test(
command, self.directory_url, self.http_01_port, self.tls_alpn_01_port,
self.config_dir, self.workspace, force_renew=force_renew)

def get_domain(self, subdomain='le'):
def get_domain(self, subdomain: str = 'le') -> str:
"""
Generate a certificate domain name suitable for distributed certbot integration tests.
This is a requirement to let the distribution know how to redirect the challenge check
from the ACME server to the relevant pytest-xdist worker. This resolution is done by
appending the pytest worker id to the subdomain, using this pattern:
{subdomain}.{worker_id}.wtf
:param subdomain: the subdomain to use in the generated domain (default 'le')
:param str subdomain: the subdomain to use in the generated domain (default 'le')
:return: the well-formed domain suitable for redirection on
:rtype: str
"""
return '{0}.{1}.wtf'.format(subdomain, self.worker_id)
Loading

0 comments on commit aeb7beb

Please sign in to comment.