Skip to content

Commit

Permalink
If an intermediate CA is trusted, don't force to distribute the full …
Browse files Browse the repository at this point in the history
…chain on client computers.

Changed:in check_control_signature: renamed cabundle to trusted_bundle for clarity
Added: SSLCertificateSigningRequest class
  • Loading branch information
htouvet committed Jan 8, 2018
1 parent d38f70f commit 92d03ce
Show file tree
Hide file tree
Showing 5 changed files with 255 additions and 19 deletions.
4 changes: 0 additions & 4 deletions common.py
Original file line number Diff line number Diff line change
Expand Up @@ -3641,10 +3641,6 @@ def in_futures(pe):
def update(self,force=False,register=True,filter_on_host_cap=True):
"""Update local database with packages definition from repositories
.. versionchanged:: 1.3.11
If self.strict_control_signature is False, control signatures are not checked else
package entries with no matching certificates are discarded.
Args:
force (boolean): update even if Packages index on repository has not been
updated since last update (based on http headers)
Expand Down
52 changes: 51 additions & 1 deletion tests/tests_waptpackage.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
# along with WAPT. If not, see <http://www.gnu.org/licenses/>.
#
# -----------------------------------------------------------------------
__version__ = "1.5.1.8"
__version__ = "1.5.1.14"
import logging
import sys
import tempfile
Expand Down Expand Up @@ -901,10 +901,60 @@ def test_host_repo():
print repo.packages


def test_inter():
# test when inter ca is deployed on host, and packages signed with gest
caroottest_key = SSLPrivateKey('c:/private/caroottest.pem')
caroottest_key.create()
caroottest_key.save_as_pem()

caroottest_cert = caroottest_key.build_sign_certificate(cn='caroottest',is_ca=True)
caroottest_cert.save_as_pem('c:/private/caroottest.crt')

# inter CA
inter1test_key = SSLPrivateKey('c:/private/inter1test.pem')
inter1test_key.create()
inter1test_key.save_as_pem()

inter1test_cert = inter1test_key.build_sign_certificate(caroottest_key,caroottest_cert,cn='inter1test',is_ca=True)
inter1test_cert.save_as_pem('c:/private/inter1test.crt')

# leaf dev cert
devtest_key = SSLPrivateKey('c:/private/devtest.pem')
devtest_key.create()
devtest_key.save_as_pem()

devtest_cert = devtest_key.build_sign_certificate(inter1test_key,inter1test_cert,cn='devtest',is_code_signing=True)
devtest_cert.save_as_pem('c:/private/devtest.crt')

catestbundle = SSLCABundle(certificates = [caroottest_cert,inter1test_cert,devtest_cert])
catestbundle.save_as_pem('c:/private/catest-full.crt')

trusted = SSLCABundle(certificates = [inter1test_cert])

pe = PackageEntry('test')
pe.build_management_package()
pe.sign_package(devtest_cert,devtest_key)
print pe.check_control_signature(trusted)
print trusted.check_certificates_chain(pe.package_certificate())

print('OK')

csr = devtest_key.build_csr(cn='test2')
print csr

test2_cert = inter1test_cert.build_certificate_from_csr(csr,inter1test_key)
print test2_cert
print catestbundle.check_certificates_chain(test2_cert)






if __name__ == '__main__':
setup_test()
test_inter()

test_host_repo()
test_personalchain()
test_newcrypto()
Expand Down
2 changes: 1 addition & 1 deletion wapt-signpackages.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def main():

if options.if_needed:
try:
pe.check_control_signature(cabundle=signers_bundle,signers_bundle=signers_bundle)
pe.check_control_signature(trusted_bundle=signers_bundle,signers_bundle=signers_bundle)
for md in ensure_list(options.md):
if not pe.has_file(pe.get_signature_filename(md)):
raise Exception('Missing signature for md %s' % md)
Expand Down
193 changes: 190 additions & 3 deletions waptcrypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,9 @@ def add_chain_cache(cache_key,chain):
# append chain of trusted upstream CA certificates
issuer_chain = cert.verify_signature_with(self)
for issuer in issuer_chain:
if allow_pinned and issuer in self._certificates:
result.append(issuer)
break
issuer.verify_signature_with(self)
result.append(issuer)

Expand Down Expand Up @@ -673,12 +676,35 @@ def check_if_revoked(self,cert):
else:
return False

def as_pem(self):
return " \n".join([key.as_pem() for key in self._keys]) + \
" \n".join(["# CN: %s\n# Issuer CN: %s\n%s" % (crt.cn,crt.issuer_cn,crt.as_pem()) for crt in self._certificates]) + \
def as_pem(self,with_keys=True,password=None):
if isinstance(password,unicode):
password = password.encode('utf8')
# reorder by longest path to have leaf first
roots = [crt for crt in self._certificates]
return " \n".join([key.as_pem(password=password) for key in self._keys]) + \
" \n".join(["# CN: %s\n# Issuer CN: %s\n%s" % (crt.cn,crt.issuer_cn,crt.as_pem()) for crt in reversed(self._certificates)]) + \
" \n".join(["# CRL Issuer CN: %s\n%s" % (crl.issuer_cn,crl.as_pem()) for crl in self.crls])


def save_as_pem(self,filename,with_keys=True,password=None):
"""Save the RSA private key as a PEM encoded file
Optionnaly, encypt the key with a password.
Args:
filename (str) : filename of pem file to create. If not provided
use the filename from self.
password (str) : password. If None, don't encrypt the key.
if password is unicode, it is encoded in utf8 first.
"""
# get before opening file to be sure to not overwrite a file if pem data can not decrypted...

pem_data = self.as_pem(with_keys=with_keys,password=password)
with open(filename,'wb') as f:
f.write(pem_data)


def __repr__(self):
if len(self._certificates)<20:
return "<SSLCABundle %s crls:%s>" % (repr(self._certificates),self.crls)
Expand Down Expand Up @@ -1179,12 +1205,107 @@ def build_sign_certificate(self,
crypto_crt = builder.sign(ca_signing_key.rsa,algorithm=hashes.SHA256(), backend=default_backend())
return SSLCertificate(crt = crypto_crt)

def build_csr(self,
cn=None,
organizational_unit=None,
organization=None,
locality=None,
country=None,
dnsname=None,
email=None,
is_ca=False,
is_code_signing=None,
key_usages=['digital_signature','content_commitment','key_cert_sign','data_encipherment']
):
"""Build a certificate signing request with self public key and supplied attributes,
Args:
is_ca (bool) : certificate is a CA root or intermediate or self-signed
if None, default to True is ca_signing_cert is None
is_code_signing (bool): subject can sign code
if None, default to (not is_ca)
dnsname (str): Witll be added as an DNS SubjectAlternativeName.
key_usages (list of str) : list of certificate / key usage targets.
Returns:
SSLCertificateSigningRequest
"""

if is_code_signing is None:
is_code_signing = not is_ca

map = [
[x509.NameOID.COUNTRY_NAME,country or None],
[x509.NameOID.LOCALITY_NAME,locality or None],
[x509.NameOID.ORGANIZATION_NAME,organization or None],
[x509.NameOID.COMMON_NAME,cn or None],
[x509.NameOID.EMAIL_ADDRESS,email or None],
[x509.NameOID.ORGANIZATIONAL_UNIT_NAME,organizational_unit or None],
]
att = []
for (oid,value) in map:
if value is not None:
att.append(x509.NameAttribute(oid,ensure_unicode(value)))

subject = x509.Name(att)

extensions = []

extensions.append(dict(
extension=x509.BasicConstraints(ca=is_ca,path_length=None),
critical=True))

if is_ca and not 'crl_sign' in key_usages:
key_usages.append('crl_sign')

if is_code_signing:
extensions.append(dict(
extension=x509.ExtendedKeyUsage([x509.OID_CODE_SIGNING]),
critical=True))

extensions.append(dict(
extension=x509.SubjectKeyIdentifier.from_public_key(self.public_key()),
critical = False))


if dnsname is not None:
extensions.append(dict(
extension=x509.SubjectAlternativeName([x509.DNSName(ensure_unicode(dnsname))]),
critical=False))

for key_usage in key_usages:
kwargs = {}
for key in [ 'content_commitment','crl_sign','data_encipherment','decipher_only',
'digital_signature', 'encipher_only', 'key_agreement', 'key_cert_sign',
'key_encipherment']:
kwargs[key] = key in key_usages

extensions.append(dict(
extension=x509.KeyUsage(**kwargs),
critical=True))

public_key = self.public_key()

if not isinstance(public_key,rsa.RSAPublicKey):
raise TypeError('public_key must be an instance of rsa.RSAPublicKey')

builder = x509.CertificateSigningRequestBuilder(subject_name=subject)
for ext in extensions:
builder = builder.add_extension(
ext.get('extension'), ext.get('critical')
)

crypto_csr = builder.sign(self.rsa,algorithm=hashes.SHA256(), backend=default_backend())
return SSLCertificateSigningRequest(csr=crypto_csr)


def public_key(self):
"""Return the RSA public key object
Returns:
RSAPublicKey
"""

return self.rsa.public_key()

def public_key_as_pem(self):
Expand All @@ -1205,6 +1326,25 @@ def public_key_as_openssh(self):
pem = self.public_key().public_bytes(encoding=serialization.Encoding.OpenSSH,format=serialization.PublicFormat.OpenSSH)
return pem


class SSLCertificateSigningRequest(BaseObjectClass):
def __init__(self,csr=None,csr_filename=None,csr_pem_string=None):
self.csr_filename = csr_filename
if csr:
self.csr = csr
elif csr_pem_string:
self.csr = x509.load_pem_x509_csr(str(csr_pem_string),default_backend())
elif csr_filename:
with open(csr_filename, "rb") as f:
self.csr = x509.load_pem_x509_csr(f.read(),default_backend())

def as_pem(self):
return self.csr.public_bytes(serialization.Encoding.PEM)

def save_as_pem(self,filename):
with open(filename, "wb") as f:
f.write(self.as_pem())

class SSLCertificate(BaseObjectClass):
"""Hold a X509 public certificate"""
def __init__(self,crt_filename=None,crt=None,crt_string=None,ignore_validity_checks=False):
Expand Down Expand Up @@ -1735,6 +1875,53 @@ def verify_claim(self,claim,max_age_secs=None,required_attributes=[]):
verified_by=self.cn,
)

def build_certificate_from_csr(self,csr,ca_signing_key,validity_duration=365):
"""
Args:
csr (SSLCertificateSigningRequest):
ca_signing_key (SSLPrivateKey): sign the resulting certificate with this CA Key
Returns:
SSLCertificate
"""
if not csr.csr.is_signature_valid:
raise EWaptCryptoException('CSR signature check failed')

extensions = []

issuer = self.crt.subject
extensions.append(
dict(extension=x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
self.crt.extensions.get_extension_for_oid(x509.OID_SUBJECT_KEY_IDENTIFIER)),
critical=False))

serial_number = x509.random_serial_number()

builder = x509.CertificateBuilder().serial_number(
serial_number
).issuer_name(
issuer
).subject_name(
csr.csr.subject
).public_key(
csr.csr.public_key()
).not_valid_before(
datetime.datetime.utcnow(),
).not_valid_after(
datetime.datetime.utcnow()+datetime.timedelta(days=validity_duration)
)

for ext in csr.csr.extensions:
builder = builder.add_extension(ext.value, ext.critical)

for ext in extensions:
builder = builder.add_extension(
ext.get('extension'), ext.get('critical')
)

crypto_crt = builder.sign(ca_signing_key.rsa,algorithm=hashes.SHA256(), backend=default_backend())
return SSLCertificate(crt = crypto_crt)

class SSLCRL(BaseObjectClass):
def __init__(self,filename=None,pem_data=None,der_data=None):
self._crl = None
Expand Down
23 changes: 13 additions & 10 deletions waptpackage.py
Original file line number Diff line number Diff line change
Expand Up @@ -1006,17 +1006,17 @@ def _sign_control(self,private_key,certificate):
private_key.sign_content(self._signed_content(),self._md or self._default_md))
return self.get_default_signed_attributes()

def check_control_signature(self,cabundle,signers_bundle=None):
def check_control_signature(self,trusted_bundle,signers_bundle=None):
"""Check in memory control signature against a list of public certificates
Args:
cabundle (SSLCABundle): Trusted certificates. : packages certificates must be signed by a one of this bundle.
trusted_bundle (SSLCABundle): Trusted certificates. : packages certificates must be signed by a one of this bundle.
signers_bundle : Optional. List of potential packages signers certificates chains.
When checking Packages index, actual
packages are not available, only certificates embedded in Packages index.
Package signature are checked againt these certificates
looking here for potential intermediate CA too.
and matching certificate is checked against cabundle.
and matching certificate is checked against trusted_bundle.
Returns:
SSLCertificate : matching trusted package's signers SSLCertificate
Expand All @@ -1036,20 +1036,23 @@ def check_control_signature(self,cabundle,signers_bundle=None):
"""
if not self.signature:
raise EWaptNotSigned('Package control %s on repo %s is not signed' % (self.asrequirement(),self.repo))
assert(isinstance(trusted_bundle,SSLCABundle))

assert(isinstance(cabundle,SSLCABundle))

signed_content = self._signed_content()
signature_raw = self.signature.decode('base64')
certs = self.package_certificate()
if certs is None and signers_bundle is not None:
certs = signers_bundle.certificate_chain(fingerprint = self.signer_fingerprint)
if not certs and trusted_bundle:
certs = trusted_bundle.certificate_chain(fingerprint = self.signer_fingerprint)
if not certs:
raise EWaptMissingCertificate('Control %s data has no matching certificate in Packages index or Package, please rescan your Packages index.' % self.asrequirement())

issued_by = cabundle.check_certificates_chain(certs)[-1]
#append trusted to ca

issued_by = trusted_bundle.check_certificates_chain(certs)[-1]
#logger.debug('Certificate %s is trusted by root CA %s' % (cert.subject,issued_by.subject))

signed_content = self._signed_content()
signature_raw = self.signature.decode('base64')
if certs[0].verify_content(signed_content,signature_raw,md=self._default_md):
self._md = self._default_md
return certs[0]
Expand Down Expand Up @@ -1159,7 +1162,7 @@ def sign_package(self,certificate,private_key=None,password_callback=None,privat
str: signature
"""
if not os.path.isfile(self.localpath) and not os.path.isdir(self.localpath):
if not self.localpath or (not os.path.isfile(self.localpath) and not os.path.isdir(self.localpath)):
raise Exception(u"%s is not a Wapt package" % self.localpath)

if isinstance(certificate,list):
Expand Down Expand Up @@ -2363,7 +2366,7 @@ def add(start,end):

try:
if self.cabundle is not None:
package.check_control_signature(cabundle=self.cabundle,signers_bundle = signer_certificates)
package.check_control_signature(cabundle=self.cabundle,signers_bundle = signer_certificates)
new_packages.append(package)
if package.package not in self._index or self._index[package.package] < package:
self._index[package.package] = package
Expand Down

0 comments on commit 92d03ce

Please sign in to comment.