Skip to content

Commit

Permalink
Port X.509 test parser to Rust (#5979)
Browse files Browse the repository at this point in the history
  • Loading branch information
alex authored Apr 22, 2021
1 parent 09a5fb4 commit 4447e1b
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 85 deletions.
7 changes: 7 additions & 0 deletions src/cryptography/hazmat/bindings/_rust/asn1.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@ import typing

from cryptography.x509 import TLSFeature, PrecertPoison

class TestCertificate:
not_after_tag: int
not_before_tag: int
issuer_value_tags: typing.List[int]
subject_value_tags: typing.List[int]

def decode_dss_signature(signature: bytes) -> typing.Tuple[int, int]: ...
def encode_dss_signature(r: int, s: int) -> bytes: ...
def encode_tls_feature(ext: TLSFeature) -> bytes: ...
def parse_tls_feature(data: bytes) -> TLSFeature: ...
def encode_precert_poison(ext: PrecertPoison) -> bytes: ...
def parse_precert_poison(data: bytes) -> PrecertPoison: ...
def parse_spki_for_data(data: bytes) -> bytes: ...
def test_parse_certificate(data: bytes) -> TestCertificate: ...
6 changes: 5 additions & 1 deletion src/rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ edition = "2018"
publish = false

[dependencies]
pyo3 = { version = "0.13.1", features = ["extension-module"] }
pyo3 = { version = "0.13.1" }
asn1 = { version = "0.3.6", default-features = false }

[features]
extension-module = ["pyo3/extension-module"]
default = ["extension-module"]

[lib]
name = "cryptography_rust"
crate-type = ["cdylib"]
Expand Down
89 changes: 89 additions & 0 deletions src/rust/src/asn1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,93 @@ fn encode_dss_signature(
Ok(pyo3::types::PyBytes::new(py, &result).to_object(py))
}

#[pyo3::prelude::pyclass]
struct TestCertificate {
#[pyo3(get)]
not_before_tag: u8,
#[pyo3(get)]
not_after_tag: u8,
#[pyo3(get)]
issuer_value_tags: Vec<u8>,
#[pyo3(get)]
subject_value_tags: Vec<u8>,
}

fn parse_name_value_tags(p: &mut asn1::Parser) -> asn1::ParseResult<Vec<u8>> {
let mut tags = vec![];
for rdn in p.read_element::<asn1::SequenceOf<asn1::SetOf<asn1::Sequence>>>()? {
let mut attributes = rdn?.collect::<asn1::ParseResult<Vec<_>>>()?;
assert_eq!(attributes.len(), 1);

let tag = attributes
.pop()
.unwrap()
.parse::<_, asn1::ParseError, _>(|p| {
p.read_element::<asn1::ObjectIdentifier>()?;
let tlv = p.read_element::<asn1::Tlv>()?;
Ok(tlv.tag())
})?;
tags.push(tag);
}
Ok(tags)
}

#[pyo3::prelude::pyfunction]
fn test_parse_certificate(data: &[u8]) -> pyo3::PyResult<TestCertificate> {
let result = asn1::parse::<_, PyAsn1Error, _>(data, |p| {
// Outer SEQUENCE
p.read_element::<asn1::Sequence>()?.parse(|p| {
// TBS certificate
let result = p
.read_element::<asn1::Sequence>()?
.parse::<_, PyAsn1Error, _>(|p| {
// Version
p.read_optional_explicit_element::<u8>(0)?;
// Serial number
p.read_element::<asn1::BigUint>()?;
// Inner signature algorithm
p.read_element::<asn1::Sequence>()?;

// Issuer
let issuer_value_tags = parse_name_value_tags(p)?;
// Validity
let (not_before_tag, not_after_tag) = p
.read_element::<asn1::Sequence>()?
.parse::<_, asn1::ParseError, _>(|p| {
let not_before_tag = p.read_element::<asn1::Tlv>()?.tag();
let not_after_tag = p.read_element::<asn1::Tlv>()?.tag();
Ok((not_before_tag, not_after_tag))
})?;
// Subject
let subject_value_tags = parse_name_value_tags(p)?;

// Subject public key info
p.read_element::<asn1::Sequence>()?;
// Issuer unique ID - never used in the real world
p.read_optional_implicit_element::<asn1::BitString>(1)?;
// Subject unique ID - never used in the real world
p.read_optional_implicit_element::<asn1::BitString>(2)?;
// Extensions
p.read_optional_explicit_element::<asn1::Sequence>(3)?;

Ok(TestCertificate {
not_before_tag,
not_after_tag,
issuer_value_tags,
subject_value_tags,
})
})?;
// Outer signature algorithm
p.read_element::<asn1::Sequence>()?;
// Signature
p.read_element::<asn1::BitString>()?;
Ok(result)
})
})?;

Ok(result)
}

pub(crate) fn create_submodule(py: pyo3::Python) -> pyo3::PyResult<&pyo3::prelude::PyModule> {
let submod = pyo3::prelude::PyModule::new(py, "asn1")?;
submod.add_wrapped(pyo3::wrap_pyfunction!(encode_tls_feature))?;
Expand All @@ -190,5 +277,7 @@ pub(crate) fn create_submodule(py: pyo3::Python) -> pyo3::PyResult<&pyo3::prelud
submod.add_wrapped(pyo3::wrap_pyfunction!(decode_dss_signature))?;
submod.add_wrapped(pyo3::wrap_pyfunction!(encode_dss_signature))?;

submod.add_wrapped(pyo3::wrap_pyfunction!(test_parse_certificate))?;

Ok(submod)
}
104 changes: 21 additions & 83 deletions tests/x509/test_x509.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@


import binascii
import collections
import copy
import datetime
import ipaddress
Expand All @@ -18,19 +17,7 @@

from cryptography import utils, x509
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat._der import (
BIT_STRING,
CONSTRUCTED,
CONTEXT_SPECIFIC,
DERReader,
GENERALIZED_TIME,
INTEGER,
OBJECT_IDENTIFIER,
PRINTABLE_STRING,
SEQUENCE,
SET,
UTC_TIME,
)
from cryptography.hazmat.bindings._rust import asn1
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import (
dh,
Expand Down Expand Up @@ -82,52 +69,6 @@ def _load_cert(filename, loader, backend):
return cert


ParsedCertificate = collections.namedtuple(
"ParsedCertificate",
["not_before_tag", "not_after_tag", "issuer", "subject"],
)


def _parse_cert(der):
# See the Certificate structured, defined in RFC 5280.
with DERReader(der).read_single_element(SEQUENCE) as cert:
tbs_cert = cert.read_element(SEQUENCE)
# Skip outer signature algorithm
_ = cert.read_element(SEQUENCE)
# Skip signature
_ = cert.read_element(BIT_STRING)

with tbs_cert:
# Skip version
_ = tbs_cert.read_optional_element(CONTEXT_SPECIFIC | CONSTRUCTED | 0)
# Skip serialNumber
_ = tbs_cert.read_element(INTEGER)
# Skip inner signature algorithm
_ = tbs_cert.read_element(SEQUENCE)
issuer = tbs_cert.read_element(SEQUENCE)
validity = tbs_cert.read_element(SEQUENCE)
subject = tbs_cert.read_element(SEQUENCE)
# Skip subjectPublicKeyInfo
_ = tbs_cert.read_element(SEQUENCE)
# Skip issuerUniqueID
_ = tbs_cert.read_optional_element(CONTEXT_SPECIFIC | 1)
# Skip subjectUniqueID
_ = tbs_cert.read_optional_element(CONTEXT_SPECIFIC | 2)
# Skip extensions
_ = tbs_cert.read_optional_element(CONTEXT_SPECIFIC | CONSTRUCTED | 3)

with validity:
not_before_tag, _ = validity.read_any_element()
not_after_tag, _ = validity.read_any_element()

return ParsedCertificate(
not_before_tag=not_before_tag,
not_after_tag=not_after_tag,
issuer=issuer,
subject=subject,
)


class TestCertificateRevocationList(object):
def test_load_pem_crl(self, backend):
crl = _load_cert(
Expand Down Expand Up @@ -1788,29 +1729,19 @@ def test_build_cert_printable_string_country_name(self, backend):

cert = builder.sign(issuer_private_key, hashes.SHA256(), backend)

parsed = _parse_cert(cert.public_bytes(serialization.Encoding.DER))
subject = parsed.subject
issuer = parsed.issuer

def read_next_rdn_value_tag(reader):
# Assume each RDN has a single attribute.
with reader.read_element(SET) as rdn:
attribute = rdn.read_element(SEQUENCE)

with attribute:
_ = attribute.read_element(OBJECT_IDENTIFIER)
tag, value = attribute.read_any_element()
return tag
parsed = asn1.test_parse_certificate(
cert.public_bytes(serialization.Encoding.DER)
)

# Check that each value was encoded as an ASN.1 PRINTABLESTRING.
assert read_next_rdn_value_tag(subject) == PRINTABLE_STRING
assert read_next_rdn_value_tag(issuer) == PRINTABLE_STRING
assert parsed.issuer_value_tags[0] == 0x13
assert parsed.subject_value_tags[0] == 0x13
if (
# This only works correctly in OpenSSL 1.1.0f+
backend._lib.CRYPTOGRAPHY_OPENSSL_110F_OR_GREATER
):
assert read_next_rdn_value_tag(subject) == PRINTABLE_STRING
assert read_next_rdn_value_tag(issuer) == PRINTABLE_STRING
assert parsed.issuer_value_tags[1] == 0x13
assert parsed.subject_value_tags[1] == 0x13


class TestCertificateBuilder(object):
Expand Down Expand Up @@ -1971,9 +1902,13 @@ def test_extreme_times(self, not_valid_before, not_valid_after, backend):
cert = builder.sign(private_key, hashes.SHA256(), backend)
assert cert.not_valid_before == not_valid_before
assert cert.not_valid_after == not_valid_after
parsed = _parse_cert(cert.public_bytes(serialization.Encoding.DER))
assert parsed.not_before_tag == UTC_TIME
assert parsed.not_after_tag == GENERALIZED_TIME
parsed = asn1.test_parse_certificate(
cert.public_bytes(serialization.Encoding.DER)
)
# UTC TIME
assert parsed.not_before_tag == 0x17
# GENERALIZED TIME
assert parsed.not_after_tag == 0x18

def test_no_subject_name(self, backend):
subject_private_key = RSA_KEY_2048.private_key(backend)
Expand Down Expand Up @@ -2237,9 +2172,12 @@ def test_earliest_time(self, backend):
cert = cert_builder.sign(private_key, hashes.SHA256(), backend)
assert cert.not_valid_before == time
assert cert.not_valid_after == time
parsed = _parse_cert(cert.public_bytes(serialization.Encoding.DER))
assert parsed.not_before_tag == UTC_TIME
assert parsed.not_after_tag == UTC_TIME
parsed = asn1.test_parse_certificate(
cert.public_bytes(serialization.Encoding.DER)
)
# UTC TIME
assert parsed.not_before_tag == 0x17
assert parsed.not_after_tag == 0x17

def test_invalid_not_valid_after(self):
with pytest.raises(TypeError):
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ allowlist_externals =
commands =
cargo fmt --all -- --check
cargo clippy -- -D warnings
cargo test
cargo test --no-default-features

[flake8]
ignore = E203,E211,W503,W504
Expand Down

0 comments on commit 4447e1b

Please sign in to comment.