forked from tranquilit/WAPT
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwaptcrypto.py
2421 lines (1984 loc) · 89.5 KB
/
waptcrypto.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/python
# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------
# This file is part of WAPT
# Copyright (C) 2013 Tranquil IT Systems http://www.tranquil.it
# WAPT aims to help Windows systems administrators to deploy
# setup and update applications on users PC.
#
# WAPT is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# WAPT is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with WAPT. If not, see <http://www.gnu.org/licenses/>.
#
# -----------------------------------------------------------------------
from __future__ import absolute_import
from waptutils import __version__
import os
import sys
import codecs
import base64
import hashlib
import glob
import subprocess
import logging
import time
import urlparse
import datetime
from cryptography import x509
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization,hashes
from cryptography.hazmat.primitives.asymmetric import padding,utils,rsa,AsymmetricVerificationContext,AsymmetricVerificationContext
from cryptography.x509.extensions import ExtensionNotFound,AccessDescription,DistributionPoint
from cryptography.x509.verification import CertificateVerificationContext, InvalidCertificate, InvalidSigningCertificate
from cryptography.x509.verification import CertificateRevocationListVerificationContext, InvalidCertificateRevocationList
from cryptography.fernet import Fernet
from OpenSSL import crypto
from OpenSSL import SSL
import certifi
import ssl
from waptutils import BaseObjectClass,wgets,jsondump,ensure_unicode,ensure_list,isodate2datetime
logger = logging.getLogger()
class EWaptCryptoException(Exception):
    """Root of the exception hierarchy defined in this module."""


class SSLVerifyException(EWaptCryptoException):
    """Verification failure; caught by SSLCABundle.check_certificates_chain."""


class EWaptEmptyPassword(EWaptCryptoException):
    """Raised by SSLPrivateKey.rsa when the private key could not be loaded."""


class EWaptMissingPrivateKey(EWaptCryptoException):
    """Crypto error signalling a missing private key."""


class EWaptMissingCertificate(EWaptCryptoException):
    """Crypto error signalling a missing certificate."""


class EWaptBadCertificate(EWaptCryptoException):
    """Base class of the certificate-specific errors below."""


class EWaptCertificateBadSignature(EWaptBadCertificate):
    """Certificate error: bad signature."""


class EWaptCertificateUnknownIssuer(EWaptBadCertificate):
    """Raised by check_certificates_chain when no issuer can be found."""


class EWaptCertificateUntrustedIssuer(EWaptBadCertificate):
    """Raised by check_certificates_chain when the chain is not trusted."""


class EWaptCertificateExpired(EWaptBadCertificate):
    """Raised by check_certificates_chain when a certificate is not valid."""


class EWaptCertificateRevoked(EWaptBadCertificate):
    """Raised by SSLCABundle.check_if_revoked for a revoked certificate."""


class EWaptBadKeyPassword(EWaptCryptoException):
    """Crypto error signalling a wrong private key password."""
def check_key_password(key_filename,password=None):
    """Tell whether a PEM private key file can be read with a password.

    Args:
        key_filename (str): path to the PEM encoded private key.
        password (str): password to try; None (or empty) if the key is
                        not encrypted. unicode is encoded to utf8 first.

    Returns:
        bool: True if the key loaded, False if loading raised TypeError
              or ValueError.
    """
    try:
        if isinstance(password,unicode):
            password = password.encode('utf8')
        with open(key_filename,'rb') as key_pem:
            pem_bytes = key_pem.read()
            # an empty password is handed over as "no password"
            serialization.load_pem_private_key(pem_bytes,password or None,default_backend())
    except (TypeError,ValueError):
        return False
    else:
        return True
def read_in_chunks(f, chunk_size=1024*128):
    """Generator yielding the content of file object f piece by piece.

    Args:
        f: object with a read(size) method.
        chunk_size (int): maximum size of each piece (default 128k).

    Yields:
        successive non empty results of f.read(chunk_size).
    """
    chunk = f.read(chunk_size)
    while chunk:
        yield chunk
        chunk = f.read(chunk_size)
def hexdigest_for_file(fname, block_size=2**20,md='sha256'):
    """Return the hex encoded digest of a file content.

    Args:
        fname (str): path of the file to hash.
        block_size (int): size of the reads done on the file.
        md (str): hashlib algorithm name.

    Returns:
        str: hexadecimal digest.
    """
    hasher = hashlib.new(md)
    with open(fname,'rb') as stream:
        block = stream.read(block_size)
        while block:
            hasher.update(block)
            block = stream.read(block_size)
    return hasher.hexdigest()
def hash_for_file(fname, block_size=2**20,md='sha256'):
    """Hash a file content and return the hex encoded digest.

    Args:
        fname (str): path of the file to hash.
        block_size (int): size of the reads done on the file.
        md (str): hashlib algorithm name.

    Returns:
        str: hexadecimal digest.
    """
    digest = hashlib.new(md)
    with open(fname,'rb') as f:
        # read() on a binary file returns an empty bytes string at EOF
        for data in iter(lambda: f.read(block_size), b''):
            digest.update(data)
    return digest.hexdigest()
def sha1_for_file(fname, block_size=2**20):
    """Return the hex encoded sha1 digest of file fname.

    Args:
        fname (str): path of the file to hash.
        block_size (int): size of the reads done on the file. It is now
                          forwarded instead of being silently replaced by
                          the hard coded 2**20.

    Returns:
        str: hexadecimal sha1 digest.
    """
    return hexdigest_for_file(fname, block_size=block_size,md='sha1')
def sha256_for_file(fname, block_size=2**20):
    """Return the hex encoded sha256 digest of file fname.

    Args:
        fname (str): path of the file to hash.
        block_size (int): size of the reads done on the file. It is now
                          forwarded instead of being silently replaced by
                          the hard coded 2**20.

    Returns:
        str: hexadecimal sha256 digest.
    """
    return hexdigest_for_file(fname, block_size=block_size,md='sha256')
def hexdigest_for_data(data,md='sha256'):
    """Return the hex encoded digest of an in-memory str.

    Args:
        data (str): content to hash; anything else trips the assertion.
        md (str): hashlib algorithm name.

    Returns:
        str: hexadecimal digest.
    """
    hasher = hashlib.new(md)
    assert isinstance(data,str)
    hasher.update(data)
    return hasher.hexdigest()
def sha256_for_data(data):
    """Return the hex encoded sha256 digest of data (see hexdigest_for_data)."""
    algorithm = 'sha256'
    return hexdigest_for_data(data,md=algorithm)
def sha1_for_data(data):
    """Return the hex encoded sha1 digest of data (see hexdigest_for_data)."""
    algorithm = 'sha1'
    return hexdigest_for_data(data,md=algorithm)
def serialize_content_for_signature(content,pre_py3=False):
    """Turn content into the form which is signed.

    Args:
        content: unicode is encoded to utf8, list and dict are dumped with
                 jsondump, anything else is returned untouched.
        pre_py3 (bool): if True, list/dict are dumped with jsondump defaults;
                        otherwise with sorted keys and compact separators.

    Returns:
        the serialized content.
    """
    if isinstance(content,unicode):
        return content.encode('utf8')
    if not isinstance(content,(list,dict)):
        return content
    if pre_py3:
        return jsondump(content)
    return jsondump(content,sort_keys=True,separators=(',', ':'))
def default_pwd_callback(*args):
    """Default password callback for opening private keys.

    Prompts on the console with getpass. The first positional argument, if
    any and not empty, is displayed in the prompt.

    Returns:
        str: typed password encoded as ascii, or None if nothing was typed.
    """
    import getpass
    key_label = args and args[0] or ''
    print('Please type the password to decrypt the private key %s' % (key_label,))
    typed = getpass.getpass().encode('ascii')
    if not typed:
        return None
    return typed
def NOPASSWORD_CALLBACK(*args):
    """Password callback which never supplies a password: always returns None."""
    return None
def get_hash_algo(md='sha256'):
    """Return a cryptography hash algorithm instance for a digest name.

    Args:
        md (str): 'sha1' or 'sha256'. Any other name gives SHA256.

    Returns:
        hashes.SHA1 or hashes.SHA256 instance.
    """
    factories = {
        'sha1': hashes.SHA1,
        'sha256': hashes.SHA256,
    }
    factory = factories.get(md,hashes.SHA256)
    return factory()
class SSLCABundle(BaseObjectClass):
# PEM armor lines. add_certificates_from_pem compares each input line with
# these markers (exact equality) to split a bundle into keys, certificates
# and CRLs.
# NOTE(review): only the "ENCRYPTED PRIVATE KEY" armor is listed for keys, so
# keys using another armor are presumably not picked up - confirm.
BEGIN_KEY = '-----BEGIN ENCRYPTED PRIVATE KEY-----'
END_KEY = '-----END ENCRYPTED PRIVATE KEY-----'
BEGIN_CERTIFICATE = '-----BEGIN CERTIFICATE-----'
END_CERTIFICATE = '-----END CERTIFICATE-----'
BEGIN_CRL = '-----BEGIN X509 CRL-----'
END_CRL = '-----END X509 CRL-----'
# message digest name; not read by the methods visible in this part of the file
md = 'sha256'
def __init__(self,cert_pattern_or_dir=None,callback=None,certificates=None):
    """Bundle of trusted certificates, CRLs and (optionally) keys.

    Args:
        cert_pattern_or_dir (str): directory or file pattern (like
                 c:/wapt/ssl/*.crt) of the PEM files to load with add_pems
                 (keys are not loaded).
        callback (func): password callback used to decrypt keys found in
                 PEM data. Defaults to default_pwd_callback.
        certificates (list): *trusted* SSLCertificate to add to the bundle.
    """
    # content of the bundle
    self._keys = []
    self._certificates = []
    self.crls = []

    # lookup indexes on certificates, filled by add_certificates
    self._certs_subject_hash_idx = {}
    self._certs_fingerprint_idx = {}

    # url -> datetime of the last failed CRL download
    self._crls_negative_cache = {}

    # cache of check_certificates_chain results, with expiration
    self._cert_chains_cache = {}
    self.check_cache_ttl = 10 # minutes

    self.callback = default_pwd_callback if callback is None else callback

    if cert_pattern_or_dir is not None:
        self.add_pems(cert_pattern_or_dir,load_keys=False)
    if certificates is not None:
        self.add_certificates(certificates)
def clear(self):
    """Empty the bundle in place: keys, certificates, CRLs, indexes and caches.

    The list and dict objects themselves are kept.
    """
    for content in (self._keys,self._certificates,self.crls):
        del content[:]
    for mapping in (self._certs_subject_hash_idx,
                    self._certs_fingerprint_idx,
                    self._crls_negative_cache,
                    self._cert_chains_cache):
        mapping.clear()
def add_pems(self,cert_pattern_or_dir=u'*.crt',load_keys=False):
    """Load PEM files from a directory or matching a file pattern.

    Args:
        cert_pattern_or_dir (str): if it is a directory, its *.crt then
                 *.pem files are loaded. Otherwise it is used as a glob
                 pattern. Nothing is loaded if empty.
        load_keys (bool): forwarded to add_certificates_from_pem.

    Returns:
        SSLCABundle: self
    """
    if not cert_pattern_or_dir:
        return self

    if os.path.isdir(cert_pattern_or_dir):
        pem_filenames = []
        for extension in (u'*.crt',u'*.pem'):
            pem_filenames.extend(glob.glob(os.path.join(cert_pattern_or_dir,extension)))
    else:
        pem_filenames = glob.glob(cert_pattern_or_dir)

    for pem_filename in pem_filenames:
        self.add_certificates_from_pem(pem_filename = pem_filename,load_keys=load_keys)
    return self
def add_certificates(self,certificates):
    """Add certificates to the bundle and index them.

    Certificates whose fingerprint is already known are skipped. The cache
    of certificate chain checks is reset.

    Args:
        certificates (list): SSLCertificate instances to add. A tuple is
                 accepted too, and a single certificate is wrapped in a list.

    Returns:
        list of SSLCertificates actually added
    """
    self._cert_chains_cache.clear()
    # a tuple used to be mistaken for one single certificate
    if not isinstance(certificates,(list,tuple)):
        certificates = [certificates]
    result = []
    for cert in certificates:
        try:
            if cert.fingerprint not in self._certs_fingerprint_idx:
                self._certs_subject_hash_idx[cert.subject_hash] = cert
                self._certs_fingerprint_idx[cert.fingerprint] = cert
                self._certificates.append(cert)
                result.append(cert)
        except Exception as e:
            # getattr: the faulty object may not even have a subject, and the
            # error report itself must not raise
            logger.warning(u'Error adding certificate %s: %s' % (getattr(cert,'subject',cert),e))
    return result
def add_certificates_from_pem(self,pem_data=None,load_keys=False,pem_filename=None):
    """Parse a PEM encoded bundle with multiple certificates, CRL and keys.

    If key needs to be decrypted, password callback property must be assigned.

    Args:
        pem_data (str): PEM content. If None, content of pem_filename is used.
        load_keys (bool): if True, private keys found in the PEM are loaded
                 as SSLPrivateKey and added to the keys of the bundle.
        pem_filename (str): file to read when pem_data is None. It is also
                 recorded as the origin of the loaded objects.

    Returns:
        SSLCABundle : self

    Raises:
        EWaptCryptoException: pem_data is None and pem_filename is not
                 supplied or is not an existing file.
    """
    if pem_data is None:
        # a missing filename is reported like a missing file
        if not pem_filename or not os.path.isfile(pem_filename):
            raise EWaptCryptoException(u'PEM file %s does not exist'%pem_filename)
        with open(pem_filename,'rb') as pem_file:
            pem_data = pem_file.read()

    inkey = False
    incert = False
    incrl = False
    # lines of the PEM section being read
    tmplines = []
    result = []
    keys = []
    crls = []
    for line in pem_data.splitlines():
        if line == self.BEGIN_CERTIFICATE:
            tmplines = [line]
            incert = True
        elif line == self.END_CERTIFICATE:
            tmplines.append(line)
            cert = SSLCertificate(crt_string = str('\n'.join(tmplines)))
            cert._public_cert_filename = pem_filename
            result.append(cert)
            incert = False
            tmplines = []
        elif line == self.BEGIN_CRL:
            tmplines = [line]
            incrl = True
        elif line == self.END_CRL:
            tmplines.append(line)
            crl = SSLCRL (pem_data = str('\n'.join(tmplines)))
            crl.filename = pem_filename
            crls.append(crl)
            incrl = False
            tmplines = []
        elif line == self.BEGIN_KEY:
            tmplines = [line]
            inkey = True
        elif line == self.END_KEY:
            tmplines.append(line)
            if load_keys:
                key_pem_data = str('\n'.join(tmplines))
                key = SSLPrivateKey(pem_data = key_pem_data,callback=self.callback)
                key.private_key_filename = pem_filename
                keys.append(key)
            inkey = False
            tmplines = []
        elif inkey or incert or incrl:
            # body line of the current section; anything outside is ignored
            tmplines.append(line)

    self.add_certificates(result)
    for crl in crls:
        self.add_crl(crl)
    self._keys.extend(keys)
    return self
def key(self,modulus,password):
    """Return the first key of the bundle with the given modulus.

    Args:
        modulus: modulus to look for.
        password: not used.

    Returns:
        matching key or None
    """
    return next((candidate for candidate in self._keys if candidate.modulus == modulus),None)
def certificate(self,fingerprint):
    """Lookup a certificate of the bundle by its fingerprint.

    Args:
        fingerprint (str): hex encoded sha256 certificate fingerprint to lookup

    Returns:
        SSLCertificate: or None if fingerprint is unknown.

    Raises:
        EWaptCryptoException: fingerprint is neither str nor unicode.
    """
    if isinstance(fingerprint,(str,unicode)):
        return self._certs_fingerprint_idx.get(fingerprint,None)
    raise EWaptCryptoException(u'A certificate fingerprint as bytes str is expected, %s supplied' % fingerprint)
def certificate_for_cn(self,cn):
    """Return the first certificate whose cn equals, or as a wildcard
    pattern matches, the supplied cn.

    Returns:
        SSLCertificate: or None if no certificate matches.
    """
    for candidate in self._certificates:
        pattern = candidate.cn
        if pattern == cn:
            return candidate
        # the cn of the certificate is the pattern (like *.domain.lan)
        if cn and pattern and glob.fnmatch.fnmatch(cn,pattern):
            return candidate
    return None
def certificate_for_subject_key_identifier(self,subject_key_identifier):
    """Return the first certificate of the bundle with this subject key
    identifier, or None.
    """
    matching = (crt for crt in self._certificates
                if crt.subject_key_identifier == subject_key_identifier)
    return next(matching,None)
def certificate_for_subject_hash(self,subject_hash):
    """Return the certificate indexed under subject_hash, or None."""
    index = self._certs_subject_hash_idx
    return index.get(subject_hash)
def keys(self):
    """Return the list of keys held by the bundle (the internal list, not a copy)."""
    loaded_keys = self._keys
    return loaded_keys
def certificates(self,valid_only=False):
    """Return a new list of the certificates of the bundle.

    Args:
        valid_only (bool): if True, keep only certificates for which
                 is_valid() is true.
    """
    if not valid_only:
        return list(self._certificates)
    selected = []
    for crt in self._certificates:
        if crt.is_valid():
            selected.append(crt)
    return selected
def matching_certs(self,key,ca=None,code_signing=None,valid=True):
    """Return the certificates of the bundle matching a key.

    Args:
        key: key handed over to match_key() of each certificate.
        ca (bool): if not None, is_ca of the certificate must equal it.
        code_signing (bool): if not None, is_code_signing must equal it.
        valid (bool): if not None, is_valid() must equal it.

    Returns:
        list: certificates passing all the filters.
    """
    selected = []
    for crt in self._certificates:
        if valid is not None and crt.is_valid() != valid:
            continue
        if code_signing is not None and crt.is_code_signing != code_signing:
            continue
        if ca is not None and crt.is_ca != ca:
            continue
        if crt.match_key(key):
            selected.append(crt)
    return selected
def certificate_chain(self,certificate=None,fingerprint=None):
    """Return certificates chain from certificate, without checking
    certificate signatures and validity.

    Args:
        certificate (SSLCertificate): starting point of the chain.
        fingerprint (str): used to lookup the starting certificate in the
                 bundle when certificate is not supplied.

    Returns:
        list: list of certificates starting with leaf up to root CA.
              Empty if the issuer is unknown and the certificate itself
              is not in the bundle.

    Raises:
        EWaptCryptoException: no certificate supplied or found.
    """
    result = []
    if not certificate and fingerprint:
        certificate = self.certificate(fingerprint = fingerprint)
    if not certificate:
        raise EWaptCryptoException('certificate_chain: certificate not found')

    issuer_cert = self.issuer_cert_for(certificate)
    # The certificate is included in the chain if it is itself in the bundle,
    # even if its issuer has not been found. The lookup uses the fingerprint
    # of the certificate: the fingerprint argument is None when the caller
    # supplies the certificate, and certificate(None) raises.
    if issuer_cert or self.certificate(fingerprint = certificate.fingerprint):
        result.append(certificate)

    # issuers already walked, to stop on issuer loops (A issued by B issued by A)
    walked = []
    while issuer_cert:
        # TODO : verify certificate.signature with issuercert public key
        if not issuer_cert.is_ca:
            logger.debug(u'Certificate %s issued by non CA certificate %s' % (certificate,issuer_cert))
            break
        if any(issuer_cert is known for known in walked):
            break
        walked.append(issuer_cert)
        result.append(issuer_cert)
        # halt on top self signed certificate
        if issuer_cert.subject_hash == issuer_cert.issuer_subject_hash:
            break
        issuer_cert = self.issuer_cert_for(issuer_cert)
    return result
def is_known_issuer(self,certificate,include_self=True):
    """Check if certificate is issued by one of this certificate bundle CA
    and check certificate signature. Return top most CA.
    Top most CA should be trusted somewhere...

    Args:
        certificate: certificate to check
        include_self: if certificate is in bundle, accept it (pinning)

    Returns:
        SSLCertificate: the certificate itself if pinned, else the last
              certificate of the chain returned by
              certificate.verify_signature_with(self), or None.
    """
    pinned = (include_self
              and isinstance(certificate,SSLCertificate)
              and certificate.fingerprint in self._certs_fingerprint_idx)
    if pinned:
        return certificate

    trusted_chain = certificate.verify_signature_with(self)
    if not trusted_chain:
        return None
    return trusted_chain[-1]
def is_valid_certificate(self,certificate,check_revoke=True):
    """Check if certificate is valid using an OpenSSL X509 store filled
    with the valid certificates and the CRLs of this bundle.

    Args:
        certificate: certificate to check
        check_revoke (bool): if True and certificate has CRL urls, ask the
                 store to check the CRL too.

    Returns:
        bool: True if the verification succeeded.

    Raises:
        crypto.X509StoreContextError: verification failed (logged first).
    """
    flags = (crypto.X509StoreFlags.CB_ISSUER_CHECK |
        crypto.X509StoreFlags.CHECK_SS_SIGNATURE
        )
    if check_revoke and certificate.crl_urls():
        flags = flags | crypto.X509StoreFlags.CRL_CHECK

    store = crypto.X509Store()
    store.set_flags(flags)
    for trusted_cert in self._certificates:
        if trusted_cert.is_valid():
            store.add_cert(trusted_cert.as_X509())
    for crl in self.crls:
        crlcert = crypto.load_crl(crypto.FILETYPE_ASN1,crl.as_der())
        store.add_crl(crlcert)

    # Verify the certificate supplied by the caller. The previous code used
    # the leftover variable of the loop above, i.e. the last certificate of
    # the bundle (or an undefined name for an empty bundle).
    store_ctx = crypto.X509StoreContext(store,certificate.as_X509())
    try:
        store_ctx.verify_certificate()
    except crypto.X509StoreContextError as e:
        logger.critical(u'Error for certificate %s. Faulty certificate is %s: %s' % (certificate,e.certificate.get_subject(),e))
        raise
    return True
def check_certificates_chain(self,cert_chain,verify_expiry=True,verify_revoke=True,allow_pinned=True):
"""Check if first certificate in cert_chain is approved
by one of the CA certificate from this bundle.
If intermediate issuers can not be found in this ca bundle, try to get them from
supplied cert_chain.
Results (positive and negative) are cached in self._cert_chains_cache for
self.check_cache_ttl minutes, keyed on the fingerprint of the first certificate
and the three verify_expiry / verify_revoke / allow_pinned flags.
Args:
cert_chain (list) : list of certificates. first one is starting point. The other ones are used if None can be found in cabundle
A SSLCABundle (its certificates are used) or a single SSLCertificate are accepted too.
verify_expiry (bool) : Check if certificates expiry dates are okay relative to today.
verify_revoke (bool) : Check if certificate is not in the CRLs (if certificate contains crl location URL)
CRL must have been already retrieved using update_crl.
allow_pinned (bool) : If True, accept certificate if it is in trusted certificates, even if we don't know the issuer.
Returns:
(list) : SSLCertificate chain of trusted cert
Raises:
Exception: cert_chain is empty.
EWaptCertificateExpired: verify_expiry is set and a checked certificate is not valid.
EWaptCertificateUnknownIssuer: no usable issuer in the bundle nor in cert_chain.
EWaptCertificateUntrustedIssuer: raised by the final statement of the method.
Whatever check_if_revoked raises when verify_revoke is set.
"""
# helper: expiry and revocation checks of one certificate, as requested by the flags
def check_cert(cert):
if verify_expiry and not cert.is_valid():
raise EWaptCertificateExpired(u'Certificate %s is expired' % cert)
if verify_revoke:
self.check_if_revoked(cert)
return cert
# helper: store (expiration timestamp, chain, reason) in the cache; ttl is in minutes
def add_chain_cache(cache_key,chain,reason):
logger.debug('Stores cert chain check in cache')
self._cert_chains_cache[cache_key] = (time.time() + self.check_cache_ttl * 60,chain,reason)
# normalize the input to a list of certificates
if isinstance(cert_chain,SSLCABundle):
cert_chain = cert_chain._certificates
if isinstance(cert_chain,SSLCertificate):
cert_chain = [cert_chain]
if not cert_chain:
raise Exception('No certificates to check')
cert = cert_chain[0]
# try to get a cached result
cache_key = (cert.fingerprint,verify_expiry,verify_revoke,allow_pinned)
(cache_expiration_date,cached_chain,reason) = self._cert_chains_cache.get(cache_key,(None,None,None))
# no cache entry, or entry expired: do the actual check
if not cache_expiration_date or cache_expiration_date < time.time():
# build an index of certificates in chain for intermediates CA
idx = dict([crt.subject_key_identifier,crt] for crt in cert_chain)
check_cert(cert)
result= [cert]
while cert:
try:
# trust the cert itself if it is the bundle, even if issuer is unknown at this stage.
if allow_pinned and cert in self._certificates:
reason = u'Certificate "%s" is trusted by himself' % cert.cn
add_chain_cache(cache_key,result,reason)
return result
# append chain of trusted upstream CA certificates
issuer_chain = cert.verify_signature_with(self)
for issuer in issuer_chain:
# a pinned issuer ends the walk up the chain
if allow_pinned and issuer in self._certificates:
result.append(issuer)
break
issuer.verify_signature_with(self)
result.append(issuer)
reason = u'Certificate "%s" is trusted' % cert.cn
add_chain_cache(cache_key,result,reason)
return result
except SSLVerifyException as e:
# try to use intermediate from supplied list
issuer = idx.get(cert.authority_key_identifier,None)
reason = u'None of certificates (%s) are trusted.' % (','.join(['"%s"' % c.cn for c in cert_chain]))
# no issuer in the supplied list: negative result is cached before raising
if not issuer:
add_chain_cache(cache_key,[],reason)
raise EWaptCertificateUnknownIssuer(reason)
# the supplied issuer is the certificate itself: nothing more to walk
if issuer == cert:
add_chain_cache(cache_key,[],reason)
raise EWaptCertificateUnknownIssuer(reason)
# the signature is good: carry on from the intermediate issuer
if cert.verify_signature_with(issuer):
check_cert(issuer)
if cert != issuer:
result.append(issuer)
cert = issuer
# return cached checked chain
elif cached_chain:
return cached_chain
#reason = u'None of certificates (%s) are trusted.' % (','.join(['"%s"' % c.cn for c in cert_chain]))
# store negative caching
# when this point is reached from an unexpired cache entry, cached_chain is its
# (empty) chain and reason its stored reason
if cached_chain is None:
add_chain_cache(cache_key,[],reason)
raise EWaptCertificateUntrustedIssuer(reason)
def add_crl(self,crl):
    """Add a CRL to the bundle, or replace the CRL of the same issuer.

    The CRL of the same issuer is looked up by authority key identifier,
    then by issuer subject hash. It is replaced only if the new crl
    compares greater than it. The chain check cache is reset in all cases.
    """
    self._cert_chains_cache.clear()

    previous = self.crl_for_authority_key_identifier(crl.authority_key_identifier)
    if previous is None:
        # check with alternative method
        previous = self.crl_for_issuer_subject_hash(crl.issuer_subject_hash)

    if not previous:
        self.crls.append(crl)
    elif crl > previous:
        self.crls.remove(previous)
        self.crls.append(crl)
def crl_for_authority_key_identifier(self,authority_key_identifier):
    """Return the first CRL of the bundle with this authority key
    identifier, or None.
    """
    matching = (known_crl for known_crl in self.crls
                if known_crl.authority_key_identifier == authority_key_identifier)
    return next(matching,None)
def crl_for_issuer_subject_hash(self,issuer_subject_hash):
    """Return the first CRL of the bundle with this issuer subject hash,
    or None.
    """
    matching = (known_crl for known_crl in self.crls
                if known_crl.issuer_subject_hash == issuer_subject_hash)
    return next(matching,None)
def download_issuer_certs(self,force=False,for_certificates=None):
"""Download and add CA certs using authorityInfoAccess access_location
No check is attempted on cert signatures.
Args:
force (bool): not used by this method.
for_certificates (list): certificates for which issuers are looked up.
A single SSLCertificate is accepted. Defaults to the certificates of the bundle.
Returns:
list: of missing downloaded SSLCertificates
"""
result = []
if for_certificates is None:
for_certificates = self._certificates
if isinstance(for_certificates,SSLCertificate):
for_certificates = [for_certificates]
for cert in for_certificates:
# only certificates whose issuer is not yet in the bundle trigger a download
issuer_cert = self.issuer_cert_for(cert)
if not issuer_cert:
issuer_urls = cert.issuer_cert_urls()
for url in issuer_urls:
try:
logger.debug(u'Download certificate %s' % (url,))
cert_data = wgets(url,timeout=(0.3,2.0))
issuer_cert = SSLCertificate(crt_string = cert_data)
self.add_certificates(issuer_cert)
result.append(issuer_cert)
# the issuer of the downloaded certificate is unknown too: recurse to get it
if self.issuer_cert_for(issuer_cert) is None:
result.extend(self.download_issuer_certs(force=False,for_certificates=issuer_cert))
break
except Exception as e:
# best effort: the failure is only logged
logger.warning(u'Unable to download certificate from %s: %s' % (url,repr(e)))
pass
return result
def issuer_cert_for(self,certificate):
    """Return the certificate of the bundle which issued certificate.

    Lookup is done on the authority key identifier first, then on the
    issuer subject hash.

    Returns:
        SSLCertificate: or None if the issuer is not in the bundle.
    """
    by_key_identifier = self.certificate_for_subject_key_identifier(certificate.authority_key_identifier)
    if by_key_identifier:
        return by_key_identifier
    return self.certificate_for_subject_hash(certificate.issuer_subject_hash)
def update_crl(self,force=False,for_certificates=None,cache_dir=None,timeout=2.0,proxies=None):
"""Download and update all crls for certificates in this bundle or
for certificates in for_certificates list
Args:
force (bool): refresh even if the known CRL has not reached its next_update,
and ignore the negative cache of failed urls.
for_certificates (list): certificates to process. A single SSLCertificate is accepted.
Defaults to the certificates of the bundle.
cache_dir (str): directory where a CRL file named like the last path component
of the url is looked for (DER) before downloading.
This method only reads from cache_dir.
timeout: passed to wgets.
proxies: passed to wgets.
Returns:
list: list of downloaded / updated CRL
"""
# TODO : to be moved to an abstracted wapt https client
result = []
if for_certificates is None:
for_certificates = self._certificates
if isinstance(for_certificates,SSLCertificate):
for_certificates = [ for_certificates ]
for cert in for_certificates:
crl_urls = cert.crl_urls()
for url in crl_urls:
# CRL currently known for the issuer of cert, if any
ssl_crl = self.crl_for_authority_key_identifier(cert.authority_key_identifier)
if ssl_crl is None:
# check with alternative method
ssl_crl = self.crl_for_issuer_subject_hash(cert.issuer_subject_hash)
# refresh when forced, when no CRL is known or when the known one is outdated
if force or not ssl_crl or ssl_crl.next_update < datetime.datetime.utcnow():
try:
# raises if url failed recently, which skips the download
if not force:
self._check_url_in_negative_cache(url)
logger.debug(u'Download CRL %s' % (url,))
if cache_dir:
crl_filename = os.path.join(cache_dir,urlparse.urlparse(url).path.split('/')[-1])
else:
crl_filename = None
# try to find CRL in cache dir
crl_data = None
if cache_dir and os.path.isfile(crl_filename):
try:
# NOTE(review): the file object is not explicitly closed
crl_data = open(crl_filename,'rb').read()
ssl_crl = SSLCRL(der_data = crl_data)
except Exception as e:
# unreadable cached file: fall back to download
crl_data = None
ssl_crl = None
# get it from remote location
if not crl_data:
crl_data = wgets(url,timeout=timeout,proxies=proxies)
try:
ssl_crl = SSLCRL(der_data = crl_data)
except Exception as e:
logger.debug('trying PEM format...')
ssl_crl = SSLCRL(pem_data = crl_data)
# the CRL signature is checked against this bundle before it is added
ssl_crl.verify_signature_with(self)
self.add_crl(ssl_crl)
result.append(ssl_crl)
except Exception as e:
# any failure (including a hit in the negative cache) records the url as failed now
self._crls_negative_cache[url] = datetime.datetime.utcnow()
logger.warning(u'Unable to download CRL from %s: %s' % (url,repr(e)))
pass
elif ssl_crl:
logger.debug(u'CRL %s does not yet need to be refreshed from location %s' % (ssl_crl,url))
return result
def _check_url_in_negative_cache(self,url):
    """Raise if a download from url has failed less than one hour ago.

    An older entry is removed from the negative cache.

    Raises:
        Exception: url is in the negative cache since less than one hour.
    """
    failed_on = self._crls_negative_cache.get(url,None)
    if not failed_on:
        return
    elapsed = datetime.datetime.utcnow() - failed_on
    if elapsed < datetime.timedelta(hours = 1):
        raise Exception('Url in negative cache')
    del self._crls_negative_cache[url]
def check_if_revoked(self,cert):
"""Raise exception if certificate has been revoked before now
The CRL of the issuer is looked up in this bundle by authority key identifier,
then by issuer subject hash. Nothing is downloaded here.
Returns:
False from the trailing else branch; the other non raising paths return None.
Raises:
Exception: the CRL next_update is in the past.
EWaptCertificateRevoked: the CRL reports a revocation date before now.
"""
crl = self.crl_for_authority_key_identifier(cert.authority_key_identifier)
if crl is None:
# check with alternative method
crl = self.crl_for_issuer_subject_hash(cert.issuer_subject_hash)
if crl:
# an outdated CRL is an error, not a pass
if crl.next_update < datetime.datetime.utcnow():
raise Exception(u'CRL is too old, revoke test failed for %s'% cert)
# compared with a datetime below; falsy when the CRL does not list the certificate
revoked_on = crl.is_revoked(cert)
if revoked_on and revoked_on < datetime.datetime.utcnow():
raise EWaptCertificateRevoked(u'Certificate %s has been revoked on %s' % (cert.cn,revoked_on))
else:
return False
def as_pem(self,with_keys=True,password=None):
    """Return the content of the bundle as a PEM encoded string.

    Keys come first, then certificates in reverse order of insertion, then
    CRLs. Certificates and CRLs are preceded by comment lines with their CN.

    Args:
        with_keys (bool): if False, private keys are left out. This flag
                 used to be ignored and keys were always included.
        password (str): password used to encrypt the keys. If it is not a
                 bytes str, it is encoded in utf8 first.

    Returns:
        str: PEM encoded keys, certificates and CRLs.
    """
    if password is not None and not isinstance(password,bytes):
        password = password.encode('utf8')

    keys_pem = []
    if with_keys:
        keys_pem = [key.as_pem(password=password) for key in self._keys]
    certificates_pem = ["# CN: %s\n# Issuer CN: %s\n%s" % (crt.cn,crt.issuer_cn,crt.as_pem()) for crt in reversed(self._certificates)]
    crls_pem = ["# CRL Issuer CN: %s\n%s" % (crl.issuer_cn,crl.as_pem()) for crl in self.crls]

    return " \n".join(keys_pem) + " \n".join(certificates_pem) + " \n".join(crls_pem)
def save_as_pem(self,filename,with_keys=True,password=None):
    """Write the PEM encoded content of the bundle (see as_pem) into a file.

    Args:
        filename (str) : filename of pem file to create.
        with_keys (bool) : passed to as_pem.
        password (str) : passed to as_pem.
    """
    # build the content before opening the file, so that the file is not
    # overwritten if the pem data can not be produced
    content = self.as_pem(with_keys=with_keys,password=password)
    pem_file = open(filename,'wb')
    try:
        pem_file.write(content)
    finally:
        pem_file.close()
def __repr__(self):
    """Details of certificates for less than 20 certificates, else only counts."""
    certificates_count = len(self._certificates)
    if certificates_count >= 20:
        return "<SSLCABundle %s certificates, %s crls>" % (certificates_count,len(self.crls))
    return "<SSLCABundle %s crls:%s>" % (repr(self._certificates),self.crls)
def __add__(self,otherbundle):
    """Return a new bundle built from the certificates of both bundles."""
    combined = self._certificates+otherbundle._certificates
    return SSLCABundle(certificates = combined)
def __substract__(self,otherbundle):
    """Return a new bundle with the certificates of this bundle which are
    not in otherbundle (compared on fingerprints).

    This bundle is left untouched. The previous implementation appended
    certificates of otherbundle directly to self._certificates (without
    updating the indexes) and returned a union instead of a difference.

    NOTE: with this name, the method is not bound to the - operator
    (that would be __sub__); it must be called explicitly.
    """
    excluded = otherbundle._certs_fingerprint_idx
    remaining = [cert for cert in self._certificates if cert.fingerprint not in excluded]
    return SSLCABundle(certificates=remaining)
def certificates_sha256_fingerprints(self):
    """Return the fingerprints of the certificates of the bundle as a
    comma separated string.

    Returns:
        str
    """
    fingerprints = (crt.fingerprint for crt in self.certificates())
    return ','.join(fingerprints)
def get_peer_cert_chain_from_server(url):
    """Returns list of SSLCertificates from initial handshake of https server

    First certificate is certificate for URL's FQDN, next are intermediate ones.
    The peer is not verified (SSL.VERIFY_NONE): the certificates are only
    retrieved. The connection is closed before returning.

    Args:
        url (str): url of the server. Port defaults to 443.

    Returns:
        list: SSLCertificate of the chain sent by the server (empty if the
              server sent none).
    """
    def verify_cb(conn, cert, errnum, depth, ok):
        return ok

    url = str(url)
    location = urlparse.urlparse(url)
    client_ctx = SSL.Context(SSL.SSLv23_METHOD)
    client_ctx.set_verify(SSL.VERIFY_NONE, verify_cb)
    client = SSL.Connection(client_ctx, SSL.socket.socket())
    try:
        client.set_connect_state()
        # for SNI
        client.set_tlsext_host_name(location.hostname)
        client.connect((location.hostname,location.port or 443))
        client.do_handshake()
        # get_peer_cert_chain gives None when the peer sent no certificate
        chain = client.get_peer_cert_chain() or []
        return [SSLCertificate(crt_string=crypto.dump_certificate(crypto.FILETYPE_PEM,cert)) for cert in chain]
    finally:
        # the socket used to be left open
        client.close()
def get_pem_server_certificate(url,save_to_file=None):
    """Retrieve single certificate from ssl server for further checks

    Args:
        url (str): https url of the server. Port defaults to 443.
        save_to_file (str): if set, the pem data is written to this file too.

    Returns:
        str: pem encoded data, or None if url scheme is not https.
    """
    url = str(url)
    url = urlparse.urlparse(url)
    if url.scheme != 'https':
        return None

    # try a connection to get server certificate
    pem_data = str(ssl.get_server_certificate((url.hostname, url.port or 443)))
    if save_to_file:
        # with: the file used to be left to the garbage collector to close
        with open(save_to_file,'wb') as pem_file:
            pem_file.write(pem_data)
    return pem_data
def get_cert_chain_as_pem(certificates_chain):
    """Build a x509 encoded PEM string from a list of certificates

    Each certificate is preceded by two comment lines with its CN and the
    CN of its issuer.

    Args:
        certificates_chain (list) : list of SSLCertificates

    Returns
        str: x509 pem encoded (utf8), or None if certificates_chain is None
    """
    if certificates_chain is None:
        return None
    sections = []
    for crt in certificates_chain:
        sections.append("# CN: %s\n# Issuer CN: %s\n%s" % (crt.cn,crt.issuer_cn,crt.as_pem()))
    pem_text = u" \n".join(sections)
    return pem_text.encode('utf-8')
class SSLPrivateKey(BaseObjectClass):
def __init__(self,filename=None,pem_data=None,callback=None,password = None):
    """Args:
        filename (str) : Filename Path to PEM encoded Private Key
        pem_data (str) : PEM encoded key. If not supplied, it is read from
                         filename when this file exists.
        callback (func) : Called to provide password for the key if needed.
                          If password is set (not None), this parameter is ignored
                          else if None, default is default_pwd_callback.
        password (str) : passphrase to decrypt private key.
                         If '', no decryption and no password is asked. RSA key loading will fail.

    The key is loaded immediately when the pem data is available and either
    is not encrypted or a password is supplied. Otherwise it is loaded on
    first access to the rsa property.
    """
    self.private_key_filename = filename
    if password == '':
        callback = NOPASSWORD_CALLBACK
    elif password is None and callback is None:
        callback = default_pwd_callback
    self.password_callback = callback
    if isinstance(password,unicode):
        password = password.encode('utf8')
    self._rsa = None
    self.pem_data = pem_data
    if not self.pem_data and self.private_key_filename and os.path.isfile(self.private_key_filename):
        self._load_pem_data_from_file()

    # decrypt immediately if possible...
    # _is_encrypted is a method: it must be called. The bare attribute is a
    # bound method, always true, so unencrypted keys were never loaded here.
    if self.pem_data and (not self._is_encrypted() or password is not None):
        self.load_key_data(self.pem_data,password)
def create(self,bits=2048):
    """Generate a new RSA key pair (public exponent 65537) into this object.

    Args:
        bits (int): size of the key.

    Returns:
        SSLPrivateKey: self
    """
    new_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=bits,
        backend=default_backend())
    self._rsa = new_key
    return self
def _is_encrypted(self):
    """Tell whether the pem data contains the word ENCRYPTED."""
    marker = 'ENCRYPTED'
    return marker in self.pem_data
def as_pem(self,password=None):
    """Return private key as a PEM str (TraditionalOpenSSL format)

    Args:
        password (str): password to use to encrypt the key. If None, the
                        key is not encrypted. unicode is encoded in utf8.

    Returns:
        str: pem encoded RSA Private key.
    """
    if isinstance(password,unicode):
        password = password.encode('utf8')
    if password is None:
        encryption = serialization.NoEncryption()
    else:
        encryption = serialization.BestAvailableEncryption(password)
    return self.rsa.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=encryption,
        )
def save_as_pem(self,filename=None,password=None):
    """Save the RSA private key as a PEM encoded file
    Optionally, encrypt the key with a password.

    The filename becomes the private_key_filename of this key.

    Args:
        filename (str) : filename of pem file to create. If not provided
                         use the filename from self.
        password (str) : password. If None, don't encrypt the key.
                         if password is unicode, it is encoded in utf8 first.
    """
    target = self.private_key_filename if filename is None else filename
    if isinstance(password,unicode):
        password = password.encode('utf8')
    # build the content before opening the file, so that the file is not
    # overwritten if the key can not be serialized
    content = self.as_pem(password=password)
    pem_file = open(target,'wb')
    try:
        pem_file.write(content)
    finally:
        pem_file.close()
    self.private_key_filename = target
def load_key_data(self,pem_data,password=None):
    """Load RSA structure with the provided pem_data

    On a password related failure, password_callback is asked for a
    password, up to 3 times. If all the attempts fail, the method returns
    with the key not loaded.

    Args:
        pem_data (str) : base64 PEM style encoded RSA private key
        password (str) : try with this password first. If fails to decrypt, use password_callback if provided

    Returns:
        None

    Raises:
        TypeError, ValueError: the failure is not password related, or no
              password_callback is available to ask for another password.
    """
    retry_cnt=3
    while retry_cnt>0:
        try:
            self._rsa = serialization.load_pem_private_key(
                str(pem_data),
                password = password,
                backend = default_backend())
            break
        except (TypeError,ValueError) as e:
            message = str(e)
            password_issue = ("Password was not given but private key is encrypted" in message or
                "Bad decrypt. Incorrect key passphrase ?" in message)
            # Both kinds of password issue need a callback. Because "and"
            # binds tighter than "or", the callback check used to apply to
            # the second message only and a None callback could be called.
            if not password_issue or self.password_callback is None:
                raise
            retry_cnt -= 1
            password = self.password_callback(self.private_key_filename)
            if password == '':
                password = None
            if isinstance(password,unicode):
                password = password.encode('utf8')
def _load_pem_data_from_file(self):
    """Read the raw content of private_key_filename into pem_data."""
    with open(self.private_key_filename,'rb') as key_file:
        raw_pem = key_file.read()
        self.pem_data = raw_pem
@property
def rsa(self):
    """Access to the RSA key, loading it on first use.

    If the key is not loaded yet, the pem data is read from
    private_key_filename if needed (and if the file exists), then loaded
    with load_key_data, which may call the password callback.

    Returns:
        the loaded RSA private key object.

    Raises:
        EWaptEmptyPassword: the key is still not loaded after load_key_data.
    """
    if self._rsa:
        return self._rsa

    pem_file_available = self.private_key_filename and os.path.isfile(self.private_key_filename)
    if not self.pem_data and pem_file_available:
        self._load_pem_data_from_file()
    self.load_key_data(self.pem_data)
    if not self._rsa:
        raise EWaptEmptyPassword(u'Unable to load key %s'%self.private_key_filename)
    return self._rsa
def sign_content(self,content,md='sha256',block_size=2**20,pre_py3=True):
""" Sign content with the private_key, return the signature
If content is not a raw string, it is first encoded in json or utf8
Args:
content (str, list or dict): content to sign
md (str): lessage digest type to use
clock_size (int) : unused
pre_py3 (bool) : if True serialization is not compatible with python3
(keys are sorted in the undeterministic python2 order
and there are spaces in json seperators)
Returns:
bytes: signature
"""
#apadding = padding.PSS(
# mgf=padding.MGF1(hashes.SHA256()),
# salt_length=padding.PSS.MAX_LENGTH)
apadding = padding.PKCS1v15()
algo = get_hash_algo(md)
content = serialize_content_for_signature(content,pre_py3=pre_py3)