|
18 | 18 | from cryptography import x509 |
19 | 19 | from cryptography.hazmat.backends import default_backend |
20 | 20 | from cryptography.hazmat.primitives.asymmetric import ec as cryptography_ec |
| 21 | +from cryptography.hazmat.primitives.asymmetric import padding |
21 | 22 | from cryptography.hazmat.primitives.asymmetric.utils import encode_dss_signature |
22 | 23 | from cryptography.hazmat.primitives.hashes import SHA256, SHA384 |
23 | 24 | from cryptography.x509.oid import NameOID |
@@ -154,6 +155,19 @@ def detail(self): |
154 | 155 | ) |
155 | 156 |
|
156 | 157 |
|
| 158 | +class CertificateChainBroken(BadCertificate): |
| 159 | + def __init__(self, previous_cert, next_cert): |
| 160 | + self.previous_cert = previous_cert |
| 161 | + self.next_cert = next_cert |
| 162 | + |
| 163 | + @property |
| 164 | + def detail(self): |
| 165 | + return ( |
| 166 | + "Certificate chain is not continuous. " |
| 167 | + f"Expected {self.previous_cert!r} to sign {self.next_cert!r}" |
| 168 | + ) |
| 169 | + |
| 170 | + |
157 | 171 | class BadSignature(Exception): |
158 | 172 | detail = "Unknown signature problem" |
159 | 173 |
|
@@ -274,6 +288,19 @@ async def verify_x5u(self, url): |
274 | 288 | if root_hash != self.root_hash: |
275 | 289 | raise CertificateHasWrongRoot(expected=self.root_hash, actual=root_hash) |
276 | 290 |
|
| 291 | + current_cert = chain[0] |
| 292 | + for next_cert in chain[1:]: |
| 293 | + try: |
| 294 | + current_cert.public_key().verify( |
| 295 | + next_cert.signature, |
| 296 | + next_cert.tbs_certificate_bytes, |
| 297 | + padding.PKCS1v15(), |
| 298 | + next_cert.signature_hash_algorithm, |
| 299 | + ) |
| 300 | + except cryptography.exceptions.InvalidSignature: |
| 301 | + raise CertificateChainBroken(current_cert, next_cert) |
| 302 | + current_cert = next_cert |
| 303 | + |
277 | 304 | leaf_subject_name = ( |
278 | 305 | certs[0].subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value |
279 | 306 | ) |
|
0 commit comments