mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-25 09:37:37 +00:00
move cert serialization to cryptography
This commit is contained in:
parent
3fe29b27be
commit
48b166ab57
@ -38,7 +38,7 @@ If you depend on these features, please raise your voice in
|
||||
### Full Changelog
|
||||
|
||||
* New Proxy Core based on sans-io pattern (@mhils)
|
||||
* Use cryptography to generate certificates, not pyOpenSSL (@mhils)
|
||||
* Use pyca/cryptography to generate certificates, not pyOpenSSL (@mhils)
|
||||
* Remove the legacy protocol stack (@Kriechi)
|
||||
* Remove all deprecated pathod and pathoc tools and modules (@Kriechi)
|
||||
* --- TODO: add new PRs above this line ---
|
||||
|
@ -4,18 +4,20 @@ import ipaddress
|
||||
import os
|
||||
import ssl
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Tuple, Optional, Union, Dict, List
|
||||
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
from cryptography.hazmat.primitives.serialization import pkcs12
|
||||
from cryptography.x509 import NameOID, ExtendedKeyUsageOID
|
||||
from pyasn1.codec.der.decoder import decode
|
||||
from pyasn1.error import PyAsn1Error
|
||||
from pyasn1.type import univ, constraint, char, namedtype, tag
|
||||
|
||||
import OpenSSL
|
||||
from cryptography.x509 import NameOID, ExtendedKeyUsageOID
|
||||
from mitmproxy.coretypes import serializable
|
||||
|
||||
# Default expiry must not be too long: https://github.com/mitmproxy/mitmproxy/issues/815
|
||||
@ -40,13 +42,17 @@ rD693XKIHUCWOjMh1if6omGXKHH40QuME2gNa50+YPn1iYDl88uDbbMCAQI=
|
||||
"""
|
||||
|
||||
|
||||
def create_ca(organization: str, cn: str, key_size: int) -> Tuple[OpenSSL.crypto.PKey, OpenSSL.crypto.X509]:
|
||||
def create_ca(
|
||||
organization: str,
|
||||
cn: str,
|
||||
key_size: int,
|
||||
) -> Tuple[rsa.RSAPrivateKeyWithSerialization, x509.Certificate]:
|
||||
now = datetime.datetime.now()
|
||||
|
||||
private_key = rsa.generate_private_key(
|
||||
public_exponent=65537,
|
||||
key_size=key_size,
|
||||
)
|
||||
) # type: ignore
|
||||
name = x509.Name([
|
||||
x509.NameAttribute(NameOID.COMMON_NAME, cn),
|
||||
x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization)
|
||||
@ -73,8 +79,8 @@ def create_ca(organization: str, cn: str, key_size: int) -> Tuple[OpenSSL.crypto
|
||||
decipher_only=False,
|
||||
), critical=True)
|
||||
builder = builder.add_extension(x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()), critical=False)
|
||||
cert = builder.sign(private_key=private_key, algorithm=hashes.SHA256())
|
||||
return OpenSSL.crypto.PKey.from_cryptography_key(private_key), OpenSSL.crypto.X509.from_cryptography(cert)
|
||||
cert = builder.sign(private_key=private_key, algorithm=hashes.SHA256()) # type: ignore
|
||||
return private_key, cert
|
||||
|
||||
|
||||
def dummy_cert(
|
||||
@ -99,27 +105,31 @@ def dummy_cert(
|
||||
XX_cacert: x509.Certificate = cacert.to_cryptography()
|
||||
XX_commonname: Optional[str] = commonname.decode("idna") if commonname else None
|
||||
XX_organization: Optional[str] = organization.decode() if organization else None
|
||||
XX_sans: Optional[List[str]] = [x.decode("ascii") for x in sans]
|
||||
|
||||
now = datetime.datetime.now()
|
||||
XX_sans: List[str] = [x.decode("ascii") for x in sans]
|
||||
|
||||
builder = x509.CertificateBuilder()
|
||||
builder = builder.issuer_name(XX_cacert.subject)
|
||||
builder = builder.add_extension(x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]), critical=False)
|
||||
builder = builder.public_key(XX_cacert.public_key())
|
||||
|
||||
now = datetime.datetime.now()
|
||||
builder = builder.not_valid_before(now - datetime.timedelta(days=2))
|
||||
builder = builder.not_valid_after(now + CERT_EXPIRY)
|
||||
builder = builder.issuer_name(XX_cacert.subject)
|
||||
|
||||
subject = []
|
||||
is_valid_commonname = (
|
||||
commonname is not None and len(commonname) < 64
|
||||
XX_commonname is not None and len(XX_commonname) < 64
|
||||
)
|
||||
if is_valid_commonname:
|
||||
assert XX_commonname is not None
|
||||
subject.append(x509.NameAttribute(NameOID.COMMON_NAME, XX_commonname))
|
||||
if organization is not None:
|
||||
if XX_organization is not None:
|
||||
assert XX_organization is not None
|
||||
subject.append(x509.NameAttribute(NameOID.ORGANIZATION_NAME, XX_organization))
|
||||
builder = builder.subject_name(x509.Name(subject))
|
||||
builder = builder.serial_number(x509.random_serial_number())
|
||||
|
||||
ss = []
|
||||
ss: List[x509.GeneralName] = []
|
||||
for x in XX_sans:
|
||||
try:
|
||||
ip = ipaddress.ip_address(x)
|
||||
@ -129,18 +139,17 @@ def dummy_cert(
|
||||
ss.append(x509.IPAddress(ip))
|
||||
# RFC 5280 §4.2.1.6: subjectAltName is critical if subject is empty.
|
||||
builder = builder.add_extension(x509.SubjectAlternativeName(ss), critical=not is_valid_commonname)
|
||||
builder = builder.add_extension(x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]), critical=False)
|
||||
builder = builder.public_key(XX_cacert.public_key())
|
||||
cert = builder.sign(private_key=XX_privkey, algorithm=hashes.SHA256())
|
||||
cert = builder.sign(private_key=XX_privkey, algorithm=hashes.SHA256()) # type: ignore
|
||||
return Cert(OpenSSL.crypto.X509.from_cryptography(cert))
|
||||
|
||||
|
||||
@dataclass
|
||||
class CertStoreEntry:
|
||||
|
||||
def __init__(self, cert, privatekey, chain_file):
|
||||
self.cert = cert
|
||||
self.privatekey = privatekey
|
||||
self.chain_file = chain_file
|
||||
cert: OpenSSL.crypto.X509
|
||||
# cert: x509.Certificate
|
||||
privatekey: OpenSSL.crypto.PKey
|
||||
# privatekey: rsa.RSAPrivateKey
|
||||
chain_file: str
|
||||
|
||||
|
||||
TCustomCertId = bytes # manually provided certs (e.g. mitmproxy's --certs)
|
||||
@ -240,44 +249,45 @@ class CertStore:
|
||||
organization = organization or basename
|
||||
cn = cn or basename
|
||||
|
||||
key: rsa.RSAPrivateKeyWithSerialization
|
||||
ca: x509.Certificate
|
||||
key, ca = create_ca(organization=organization, cn=cn, key_size=key_size)
|
||||
# Dump the CA plus private key
|
||||
with CertStore.umask_secret(), (path / f"{basename}-ca.pem").open("wb") as f:
|
||||
f.write(
|
||||
OpenSSL.crypto.dump_privatekey(
|
||||
OpenSSL.crypto.FILETYPE_PEM,
|
||||
key))
|
||||
f.write(
|
||||
OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM,
|
||||
ca))
|
||||
f.write(key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
encryption_algorithm=serialization.NoEncryption(),
|
||||
))
|
||||
f.write(ca.public_bytes(serialization.Encoding.PEM))
|
||||
|
||||
# Dump the certificate in PEM format
|
||||
with (path / f"{basename}-ca-cert.pem").open("wb") as f:
|
||||
f.write(
|
||||
OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM,
|
||||
ca))
|
||||
f.write(ca.public_bytes(serialization.Encoding.PEM))
|
||||
|
||||
# Create a .cer file with the same contents for Android
|
||||
with (path / f"{basename}-ca-cert.cer").open("wb") as f:
|
||||
f.write(
|
||||
OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM,
|
||||
ca))
|
||||
f.write(ca.public_bytes(serialization.Encoding.PEM))
|
||||
|
||||
# Dump the certificate in PKCS12 format for Windows devices
|
||||
with (path / f"{basename}-ca-cert.p12").open("wb") as f:
|
||||
p12 = OpenSSL.crypto.PKCS12()
|
||||
p12.set_certificate(ca)
|
||||
f.write(p12.export())
|
||||
f.write(pkcs12.serialize_key_and_certificates( # type: ignore
|
||||
name=basename.encode(),
|
||||
key=None,
|
||||
cert=ca,
|
||||
cas=None,
|
||||
encryption_algorithm=serialization.NoEncryption(),
|
||||
))
|
||||
|
||||
# Dump the certificate and key in a PKCS12 format for Windows devices
|
||||
with CertStore.umask_secret(), (path / f"{basename}-ca.p12").open("wb") as f:
|
||||
p12 = OpenSSL.crypto.PKCS12()
|
||||
p12.set_certificate(ca)
|
||||
p12.set_privatekey(key)
|
||||
f.write(p12.export())
|
||||
f.write(pkcs12.serialize_key_and_certificates( # type: ignore
|
||||
name=basename.encode(),
|
||||
key=key,
|
||||
cert=ca,
|
||||
cas=None,
|
||||
encryption_algorithm=serialization.NoEncryption(),
|
||||
))
|
||||
|
||||
with (path / f"{basename}-dhparam.pem").open("wb") as f:
|
||||
f.write(DEFAULT_DHPARAM)
|
||||
@ -386,8 +396,10 @@ class _GeneralName(univ.Choice):
|
||||
|
||||
class _GeneralNames(univ.SequenceOf):
|
||||
componentType = _GeneralName()
|
||||
sizeSpec = univ.SequenceOf.sizeSpec + \
|
||||
sizeSpec = (
|
||||
univ.SequenceOf.sizeSpec +
|
||||
constraint.ValueSizeConstraint(1, 1024)
|
||||
)
|
||||
|
||||
|
||||
class Cert(serializable.Serializable):
|
||||
|
@ -1,6 +1,4 @@
|
||||
import gc
|
||||
import os
|
||||
import secrets
|
||||
|
||||
import pytest
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user