mitmproxy/libmproxy/certutils.py
Aldo Cortesi 22d4559a7a Use PyOpenSSL for certificate generation.
We no longer call external OpenSSL commands at all.
2012-03-11 14:34:17 +13:00

267 lines
7.7 KiB
Python

import subprocess, os, ssl, hashlib, socket
from pyasn1.type import univ, constraint, char, namedtype, tag
from pyasn1.codec.der.decoder import decode
import OpenSSL
import utils
# NOTE(review): not referenced anywhere in the visible part of this file;
# presumably a delay in seconds used by callers elsewhere - confirm.
CERT_SLEEP_TIME = 1
# Certificate lifetime in days (three 365-day years). Kept as a string because
# dummy_cert_ passes it verbatim as the "-days" argument of the openssl command.
CERT_EXPIRY = str(365 * 3)
def create_ca():
    """
        Generate a new self-signed certificate authority.

        Returns a (key, ca) tuple: the freshly generated RSA private key as
        an OpenSSL.crypto.PKey and the CA certificate, signed with that key,
        as an OpenSSL.crypto.X509.
    """
    # NOTE(review): 1024-bit RSA and SHA-1 are weak by current standards.
    # They are left unchanged here because previously generated CA files
    # and their consumers may depend on them.
    key = OpenSSL.crypto.PKey()
    key.generate_key(OpenSSL.crypto.TYPE_RSA, 1024)

    ca = OpenSSL.crypto.X509()
    # The version field is zero-based: 2 encodes X509v3, the version
    # required for a certificate that carries extensions. The value 3 that
    # was used before does not correspond to any defined X509 version.
    ca.set_version(2)
    ca.set_serial_number(1)
    ca.get_subject().CN = "mitmproxy"
    ca.get_subject().OU = "mitmproxy"
    ca.gmtime_adj_notBefore(0)
    # Valid for 720 days, expressed in seconds from now.
    ca.gmtime_adj_notAfter(24 * 60 * 60 * 720)
    # Self-signed: the issuer is the subject itself.
    ca.set_issuer(ca.get_subject())
    ca.set_pubkey(key)
    ca.add_extensions([
        OpenSSL.crypto.X509Extension(
            "basicConstraints", True, "CA:TRUE"
        ),
        OpenSSL.crypto.X509Extension(
            "nsCertType", True, "sslCA"
        ),
        OpenSSL.crypto.X509Extension(
            "extendedKeyUsage", True,
            "serverAuth,clientAuth,emailProtection,timeStamping,msCodeInd,msCodeCom,msCTLSign,msSGC,msEFS,nsSGC"
        ),
        OpenSSL.crypto.X509Extension(
            "keyUsage", True, "keyCertSign, cRLSign"
        ),
        # The key identifier is derived from the public key set above, so
        # set_pubkey must already have been called on the subject.
        OpenSSL.crypto.X509Extension(
            "subjectKeyIdentifier", False, "hash", subject=ca
        ),
    ])
    ca.sign(key, "sha1")
    return key, ca
def dummy_ca(path):
    """
        Generate a new CA and write it to disk in three forms.

        path: Destination of the combined file holding the private key
        followed by the certificate, both PEM encoded. Missing parent
        directories are created.

        Next to that file, "<name>-cert.pem" (certificate only, PEM) and
        "<name>-cert.p12" (certificate only, PKCS12) are written, where
        <name> is the file name of path with a trailing ".pem" removed.

        Returns True.
    """
    dirname = os.path.dirname(path)
    # A bare file name has an empty dirname, which os.makedirs rejects.
    if dirname and not os.path.exists(dirname):
        os.makedirs(dirname)

    # Work with the file name only. Keeping the directory part here would
    # duplicate it in the os.path.join calls below for relative paths.
    basename = os.path.basename(path)
    if basename.endswith(".pem"):
        basename = os.path.splitext(basename)[0]

    key, ca = create_ca()
    pem_key = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
    pem_cert = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, ca)

    # Dump the CA plus private key
    with open(path, "wb") as f:
        f.write(pem_key)
        f.write(pem_cert)

    # Dump the certificate in PEM format
    with open(os.path.join(dirname, basename + "-cert.pem"), "wb") as f:
        f.write(pem_cert)

    # Dump the certificate in PKCS12 format for Windows devices. PKCS12 is
    # a binary format, so the file must not be opened in text mode.
    p12 = OpenSSL.crypto.PKCS12()
    p12.set_certificate(ca)
    with open(os.path.join(dirname, basename + "-cert.p12"), "wb") as f:
        f.write(p12.export())
    return True
def dummy_cert(certdir, ca, commonname, sans):
    """
        Generate a certificate for commonname, or reuse a cached one.

        certdir: Certificate directory. Certificates are stored here under
        the SHA-256 hex digest of commonname, with a ".pem" suffix.
        ca: Path to the certificate authority file (PEM certificate plus
        private key), or None to sign with a freshly generated CA.
        commonname: Common name for the generated certificate.
        sans: Iterable of DNS names to list in the subjectAltName extension.

        Returns the path of the certificate file.
    """
    namehash = hashlib.sha256(commonname).hexdigest()
    certpath = os.path.join(certdir, namehash + ".pem")
    if os.path.exists(certpath):
        return certpath

    # Imported here so this function does not depend on a module-level
    # import that the rest of the file does not otherwise need.
    import time

    ss = ", ".join("DNS: %s" % i for i in sans)

    if ca:
        with open(ca, "r") as f:
            raw = f.read()
        ca = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, raw)
        key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, raw)
    else:
        # NOTE(review): the key of this throwaway CA is not written
        # anywhere, so only the certificate itself survives this call.
        key, ca = create_ca()

    # The generated certificate uses the same key pair as the signing CA.
    req = OpenSSL.crypto.X509Req()
    subj = req.get_subject()
    subj.CN = commonname
    req.set_pubkey(ca.get_pubkey())
    if ss:
        req.add_extensions([OpenSSL.crypto.X509Extension("subjectAltName", True, ss)])
    # Sign last, so the signature covers the finished request.
    req.sign(key, "sha1")

    cert = OpenSSL.crypto.X509()
    # Zero-based version field: 2 is X509v3, required for extensions.
    cert.set_version(2)
    # Certificates issued by one CA need distinct serial numbers; without
    # this call every generated certificate would carry serial 0.
    cert.set_serial_number(int(time.time() * 10000))
    cert.gmtime_adj_notBefore(0)
    # Valid for 30 days, expressed in seconds from now.
    cert.gmtime_adj_notAfter(60 * 60 * 24 * 30)
    cert.set_issuer(ca.get_subject())
    cert.set_subject(req.get_subject())
    if ss:
        cert.add_extensions([OpenSSL.crypto.X509Extension("subjectAltName", True, ss)])
    cert.set_pubkey(req.get_pubkey())
    cert.sign(key, "sha1")

    with open(certpath, "w") as f:
        f.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert))
    return certpath
def _call_quietly(cmd):
    """
        Run cmd (an argument list), discarding everything it prints.

        Returns the exit status of the process.
    """
    proc = subprocess.Popen(
        cmd,
        stderr=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stdin=subprocess.PIPE
    )
    # communicate() closes stdin and drains both output pipes. Waiting for
    # the process without reading them can block forever once the child
    # has filled a pipe buffer.
    proc.communicate()
    return proc.returncode


def dummy_cert_(certdir, ca, commonname, sans):
    """
        Generate a certificate by invoking the external openssl command,
        or reuse a cached one.

        certdir: Certificate directory. The certificate, the generated
        openssl config and the signing request are stored here under the
        SHA-256 hex digest of commonname.
        ca: Path to the certificate authority file, or None to create a
        self-signed certificate with a new key.
        commonname: Common name for the generated certificate.
        sans: Sequence of DNS names for the subjectAltName section.

        Returns cert path if operation succeeded, None if not.
    """
    namehash = hashlib.sha256(commonname).hexdigest()
    certpath = os.path.join(certdir, namehash + ".pem")
    if os.path.exists(certpath):
        return certpath

    confpath = os.path.join(certdir, namehash + ".cnf")
    reqpath = os.path.join(certdir, namehash + ".req")

    with open(utils.pkg_data.path("resources/cert.cnf")) as f:
        template = f.read()

    # Entries of the [alt_names] config section are numbered from 1.
    ss = "\n".join(
        "DNS.%s = %s" % (i + 1, v) for i, v in enumerate(sans)
    )
    with open(confpath, "w") as f:
        f.write(
            template % (
                dict(
                    commonname=commonname,
                    sans=ss,
                    altnames="subjectAltName = @alt_names" if ss else ""
                )
            )
        )

    if ca:
        # Create a dummy signed certificate. Uses same key as the signing CA
        commands = [
            [
                "openssl",
                "req",
                "-new",
                "-config", confpath,
                "-out", reqpath,
                "-key", ca,
            ],
            [
                "openssl",
                "x509",
                "-req",
                "-in", reqpath,
                "-days", CERT_EXPIRY,
                "-out", certpath,
                "-CA", ca,
                "-CAcreateserial",
                "-extfile", confpath,
                "-extensions", "v3_cert_req",
            ],
        ]
    else:
        # Create a new selfsigned certificate + key
        commands = [
            [
                "openssl",
                "req",
                "-new",
                "-x509",
                "-config", confpath,
                "-nodes",
                "-days", CERT_EXPIRY,
                "-out", certpath,
                "-newkey", "rsa:1024",
                "-keyout", certpath,
            ],
        ]

    # Stop at the first command that exits with a non-zero status.
    for cmd in commands:
        if _call_quietly(cmd):
            return None
    return certpath
class _GeneralName(univ.Choice):
    # ASN.1 CHOICE that models a single alternative, dNSName: an IA5String
    # with implicit context-specific tag 2. DNS names are the only kind of
    # name this module wants from a subjectAltName.
    # NOTE(review): no handling for the other name alternatives is visible
    # in this block - confirm how such entries are treated when decoding.
    componentType = namedtype.NamedTypes(
        namedtype.NamedType(
            "dNSName",
            char.IA5String().subtype(
                implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2)
            )
        ),
    )
class _GeneralNames(univ.SequenceOf):
    # ASN.1 SEQUENCE OF _GeneralName, constrained to between 1 and 1024
    # entries on top of the size constraint inherited from SequenceOf.
    componentType = _GeneralName()
    sizeSpec = (
        univ.SequenceOf.sizeSpec
        + constraint.ValueSizeConstraint(1, 1024)
    )
class SSLCert:
    """
        Wrapper around a PEM-encoded X509 certificate that exposes its
        common name and its DNS subject alternative names.
    """
    def __init__(self, pemtxt):
        """
            pemtxt: The certificate in PEM format.
        """
        self.cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pemtxt)

    @property
    def cn(self):
        """
            The CN component of the certificate subject, or None if the
            subject has none. If there are several, the last one is used.
        """
        cn = None
        for name, value in self.cert.get_subject().get_components():
            if name == "CN":
                cn = value
        return cn

    @property
    def altnames(self):
        """
            List of the DNS names found in the certificate's subjectAltName
            extensions. Empty if there are none.
        """
        altnames = []
        for idx in range(self.cert.get_extension_count()):
            ext = self.cert.get_extension(idx)
            if ext.get_short_name() != "subjectAltName":
                continue
            # Imported only when needed, so this class adds no module-level
            # import to the file.
            from pyasn1.error import PyAsn1Error
            try:
                dec = decode(ext.get_data(), asn1Spec=_GeneralNames())
            except PyAsn1Error:
                # The spec describes dNSName entries only. An extension
                # holding any other kind of name cannot be decoded against
                # it and is skipped rather than failing the whole lookup.
                continue
            for entry in dec[0]:
                altnames.append(entry[0])
        return altnames
def get_remote_cert(host, port):
    """
        Fetch the certificate served at host:port.

        host is resolved to an address with socket.gethostbyname first, and
        the connection is made to that address.

        Returns an SSLCert wrapping the certificate that was retrieved.
    """
    endpoint = (socket.gethostbyname(host), port)
    return SSLCert(ssl.get_server_certificate(endpoint))