Skip to content

Commit

Permalink
Migrate primary functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterKale committed Oct 14, 2021
1 parent 69893d7 commit fe889eb
Show file tree
Hide file tree
Showing 44 changed files with 4,027 additions and 1 deletion.
2 changes: 1 addition & 1 deletion webauthn/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '1.0.0'
__version__ = "1.0.0"
4 changes: 4 additions & 0 deletions webauthn/authentication/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .generate_authentication_options import ( # noqa: F401
generate_authentication_options,
)
from .verify_authentication_response import verify_authentication_response # noqa: F401
48 changes: 48 additions & 0 deletions webauthn/authentication/generate_authentication_options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from typing import List, Optional

from ..helpers import generate_challenge
from ..helpers.structs import (
PublicKeyCredentialDescriptor,
PublicKeyCredentialRequestOptions,
UserVerificationRequirement,
)


def generate_authentication_options(
*,
rp_id: str,
challenge: Optional[bytes] = None,
timeout: int = 60000,
allow_credentials: Optional[List[PublicKeyCredentialDescriptor]] = None,
user_verification: UserVerificationRequirement = UserVerificationRequirement.PREFERRED,
) -> PublicKeyCredentialRequestOptions:
"""Generate options for retrieving a credential via navigator.credentials.get()
Args:
`rp_id`: The Relying Party's unique identifier as specified in attestations.
(optional) `challenge`: A byte sequence for the authenticator to return back in its response. If no value is specified then a sequence of random bytes will be generated.
(optional) `timeout`: How long in milliseconds the browser should give the user to choose an authenticator. This value is a *hint* and may be ignored by the browser.
(optional) `allow_credentials`: A list of credentials registered to the user.
(optional) `user_verification`: The RP's preference for the authenticator's enforcement of the "user verified" flag.
Returns:
Authentication options ready for the browser. Consider using `helpers.options_to_json()` in this library to quickly convert the options to JSON.
"""

########
# Set defaults for required values
########

if not challenge:
challenge = generate_challenge()

if not allow_credentials:
allow_credentials = []

return PublicKeyCredentialRequestOptions(
rp_id=rp_id,
challenge=challenge,
timeout=timeout,
allow_credentials=allow_credentials,
user_verification=user_verification,
)
163 changes: 163 additions & 0 deletions webauthn/authentication/verify_authentication_response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
import hashlib
from typing import List, Union

from cryptography.exceptions import InvalidSignature

from ..helpers import (
bytes_to_base64url,
decode_credential_public_key,
decoded_public_key_to_cryptography,
parse_authenticator_data,
parse_client_data_json,
verify_signature,
)
from ..helpers.exceptions import InvalidAuthenticationResponse
from ..helpers.structs import (
AuthenticationCredential,
ClientDataType,
PublicKeyCredentialType,
TokenBindingStatus,
WebAuthnBaseModel,
)


class VerifiedAuthentication(WebAuthnBaseModel):
"""
Information about a verified authentication of which an RP can make use
"""

credential_id: bytes
new_sign_count: int


expected_token_binding_statuses = [
TokenBindingStatus.SUPPORTED,
TokenBindingStatus.PRESENT,
]


def verify_authentication_response(
*,
credential: AuthenticationCredential,
expected_challenge: bytes,
expected_rp_id: str,
expected_origin: Union[str, List[str]],
credential_public_key: bytes,
credential_current_sign_count: int,
require_user_verification: bool = False,
) -> VerifiedAuthentication:
"""Verify a response from navigator.credentials.get()
Args:
`credential`: The value returned from `navigator.credentials.create()`.
`expected_challenge`: The challenge passed to the authenticator within the preceding authentication options.
`expected_rp_id`: The Relying Party's unique identifier as specified in the precending authentication options.
`expected_origin`: The domain, with HTTP protocol (e.g. "https://domain.here"), on which the authentication ceremony should have occurred.
`credential_public_key`: The public key for the credential's ID as provided in a preceding authenticator registration ceremony.
`credential_current_sign_count`: The current known number of times the authenticator was used.
(optional) `require_user_verification`: Whether or not to require that the authenticator verified the user.
Returns:
Information about the authenticator
Raises:
`helpers.exceptions.InvalidAuthenticationResponse` if the response cannot be verified
"""

# FIDO-specific check
if bytes_to_base64url(credential.raw_id) != credential.id:
raise InvalidAuthenticationResponse("id and raw_id were not equivalent")

# FIDO-specific check
if credential.type != PublicKeyCredentialType.PUBLIC_KEY:
raise InvalidAuthenticationResponse(
f'Unexpected credential type "{credential.type}", expected "public-key"'
)

response = credential.response

client_data = parse_client_data_json(response.client_data_json)

if client_data.type != ClientDataType.WEBAUTHN_GET:
raise InvalidAuthenticationResponse(
f'Unexpected client data type "{client_data.type}", expected "{ClientDataType.WEBAUTHN_GET}"'
)

if expected_challenge != client_data.challenge:
raise InvalidAuthenticationResponse(
"Client data challenge was not expected challenge"
)

if isinstance(expected_origin, str):
if expected_origin != client_data.origin:
raise InvalidAuthenticationResponse(
f'Unexpected client data origin "{client_data.origin}", expected "{expected_origin}"'
)
else:
try:
expected_origin.index(client_data.origin)
except ValueError:
raise InvalidAuthenticationResponse(
f'Unexpected client data origin "{client_data.origin}", expected one of {expected_origin}'
)

if client_data.token_binding:
status = client_data.token_binding.status
if status not in expected_token_binding_statuses:
raise InvalidAuthenticationResponse(
f'Unexpected token_binding status of "{status}", expected one of "{",".join(expected_token_binding_statuses)}"'
)

auth_data = parse_authenticator_data(response.authenticator_data)

# Generate a hash of the expected RP ID for comparison
expected_rp_id_hash = hashlib.sha256()
expected_rp_id_hash.update(expected_rp_id.encode("utf-8"))
expected_rp_id_hash = expected_rp_id_hash.digest()

if auth_data.rp_id_hash != expected_rp_id_hash:
raise InvalidAuthenticationResponse("Unexpected RP ID hash")

if not auth_data.flags.up:
raise InvalidAuthenticationResponse(
"User was not present during authentication"
)

if require_user_verification and not auth_data.flags.uv:
raise InvalidAuthenticationResponse(
"User verification is required but user was not verified during authentication"
)

if (
auth_data.sign_count > 0 or credential_current_sign_count > 0
) and auth_data.sign_count <= credential_current_sign_count:
# Require the sign count to have been incremented over what was reported by the
# authenticator the last time this credential was used, otherwise this might be
# a replay attack
raise InvalidAuthenticationResponse(
f"Response sign count of {auth_data.sign_count} was not greater than current count of {credential_current_sign_count}"
)

client_data_hash = hashlib.sha256()
client_data_hash.update(response.client_data_json)
client_data_hash = client_data_hash.digest()

signature_base = response.authenticator_data + client_data_hash

try:
decoded_public_key = decode_credential_public_key(credential_public_key)
crypto_public_key = decoded_public_key_to_cryptography(decoded_public_key)

verify_signature(
public_key=crypto_public_key,
signature_alg=decoded_public_key.alg,
signature=response.signature,
data=signature_base,
)
except InvalidSignature:
raise InvalidAuthenticationResponse("Could not verify authentication signature")

return VerifiedAuthentication(
credential_id=credential.raw_id,
new_sign_count=auth_data.sign_count,
)
16 changes: 16 additions & 0 deletions webauthn/helpers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from .aaguid_to_string import aaguid_to_string # noqa: F401
from .base64url_to_bytes import base64url_to_bytes # noqa: F401
from .bytes_to_base64url import bytes_to_base64url # noqa: F401
from .decode_credential_public_key import decode_credential_public_key # noqa: F401
from .decoded_public_key_to_cryptography import ( # noqa: F401
decoded_public_key_to_cryptography,
)
from .generate_challenge import generate_challenge # noqa: F401
from .generate_user_handle import generate_user_handle # noqa: F401
from .hash_by_alg import hash_by_alg # noqa: F401
from .options_to_json import options_to_json # noqa: F401
from .parse_attestation_object import parse_attestation_object # noqa: F401
from .parse_authenticator_data import parse_authenticator_data # noqa: F401
from .parse_client_data_json import parse_client_data_json # noqa: F401
from .validate_certificate_chain import validate_certificate_chain # noqa: F401
from .verify_signature import verify_signature # noqa: F401
27 changes: 27 additions & 0 deletions webauthn/helpers/aaguid_to_string.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import codecs


def aaguid_to_string(val: bytes) -> str:
"""
Take aaguid bytes and convert them to a GUID string
"""
if len(val) != 16:
raise ValueError(f"AAGUID was {len(val)} bytes, expected 16 bytes")

# Convert to a hexadecimal string representation
to_hex = codecs.encode(val, encoding="hex").decode("utf-8")

# Split up the hex string into segments
# 8 chars
seg_1 = to_hex[0:8]
# 4 chars
seg_2 = to_hex[8:12]
# 4 chars
seg_3 = to_hex[12:16]
# 4 chars
seg_4 = to_hex[16:20]
# 12 chars
seg_5 = to_hex[20:32]

# "00000000-0000-0000-0000-000000000000"
return f"{seg_1}-{seg_2}-{seg_3}-{seg_4}-{seg_5}"
91 changes: 91 additions & 0 deletions webauthn/helpers/algorithms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from cryptography.hazmat.primitives.asymmetric.ec import (
ECDSA,
SECP256R1,
SECP384R1,
SECP521R1,
EllipticCurve,
EllipticCurveSignatureAlgorithm,
)
from cryptography.hazmat.primitives.hashes import (
SHA1,
SHA256,
SHA384,
SHA512,
HashAlgorithm,
)

from .cose import COSECRV, COSEAlgorithmIdentifier
from .exceptions import UnsupportedAlgorithm, UnsupportedEC2Curve


def is_rsa_pkcs(alg_id: COSEAlgorithmIdentifier) -> bool:
"""Determine if the specified COSE algorithm ID denotes an RSA PKCSv1 public key"""
return alg_id in (
COSEAlgorithmIdentifier.RSASSA_PKCS1_v1_5_SHA_1,
COSEAlgorithmIdentifier.RSASSA_PKCS1_v1_5_SHA_256,
COSEAlgorithmIdentifier.RSASSA_PKCS1_v1_5_SHA_384,
COSEAlgorithmIdentifier.RSASSA_PKCS1_v1_5_SHA_512,
)


def is_rsa_pss(alg_id: COSEAlgorithmIdentifier) -> bool:
"""Determine if the specified COSE algorithm ID denotes an RSA PSS public key"""
return alg_id in (
COSEAlgorithmIdentifier.RSASSA_PSS_SHA_256,
COSEAlgorithmIdentifier.RSASSA_PSS_SHA_384,
COSEAlgorithmIdentifier.RSASSA_PSS_SHA_512,
)


def get_ec2_sig_alg(alg_id: COSEAlgorithmIdentifier) -> EllipticCurveSignatureAlgorithm:
"""Turn an "ECDSA" COSE algorithm identifier into a corresponding signature
algorithm
"""
if alg_id == COSEAlgorithmIdentifier.ECDSA_SHA_256:
return ECDSA(SHA256())
if alg_id == COSEAlgorithmIdentifier.ECDSA_SHA_512:
return ECDSA(SHA512())

raise UnsupportedAlgorithm(f"Unrecognized EC2 signature alg {alg_id}")


def get_ec2_curve(crv_id: COSECRV) -> EllipticCurve:
"""Turn an EC2 COSE crv identifier into a corresponding curve"""
if crv_id == COSECRV.P256:
return SECP256R1()
elif crv_id == COSECRV.P384:
return SECP384R1()
elif crv_id == COSECRV.P521:
return SECP521R1()

raise UnsupportedEC2Curve(f"Unrecognized EC2 curve {crv_id}")


def get_rsa_pkcs1_sig_alg(alg_id: COSEAlgorithmIdentifier) -> HashAlgorithm:
"""Turn an "RSASSA_PKCS1" COSE algorithm identifier into a corresponding signature
algorithm
"""
if alg_id == COSEAlgorithmIdentifier.RSASSA_PKCS1_v1_5_SHA_1:
return SHA1()
if alg_id == COSEAlgorithmIdentifier.RSASSA_PKCS1_v1_5_SHA_256:
return SHA256()
if alg_id == COSEAlgorithmIdentifier.RSASSA_PKCS1_v1_5_SHA_384:
return SHA384()
if alg_id == COSEAlgorithmIdentifier.RSASSA_PKCS1_v1_5_SHA_512:
return SHA512()

raise UnsupportedAlgorithm(f"Unrecognized RSA PKCS1 signature alg {alg_id}")


def get_rsa_pss_sig_alg(alg_id: COSEAlgorithmIdentifier) -> HashAlgorithm:
"""Turn an "RSASSA_PSS" COSE algorithm identifier into a corresponding signature
algorithm
"""
if alg_id == COSEAlgorithmIdentifier.RSASSA_PSS_SHA_256:
return SHA256()
if alg_id == COSEAlgorithmIdentifier.RSASSA_PSS_SHA_384:
return SHA384()
if alg_id == COSEAlgorithmIdentifier.RSASSA_PSS_SHA_512:
return SHA512()

raise UnsupportedAlgorithm(f"Unrecognized RSA PSS signature alg {alg_id}")
Empty file.
Loading

0 comments on commit fe889eb

Please sign in to comment.