mitmproxy/libmproxy/certutils.py

211 lines
6.1 KiB
Python
Raw Normal View History

import subprocess, os, ssl, hashlib, socket
from pyasn1.type import univ, constraint, char, namedtype, tag
from pyasn1.codec.der.decoder import decode
import OpenSSL
import utils
CERT_SLEEP_TIME = 1
CERT_EXPIRY = str(365 * 3)
def dummy_ca(path):
dirname = os.path.dirname(path)
if not os.path.exists(dirname):
os.makedirs(dirname)
if path.endswith(".pem"):
basename, _ = os.path.splitext(path)
else:
basename = path
key = OpenSSL.crypto.PKey()
key.generate_key(OpenSSL.crypto.TYPE_RSA, 1024)
ca = OpenSSL.crypto.X509()
ca.set_version(3)
ca.set_serial_number(1)
ca.get_subject().CN = "mitmproxy"
ca.get_subject().OU = "mitmproxy"
ca.gmtime_adj_notBefore(0)
ca.gmtime_adj_notAfter(24 * 60 * 60 * 720)
ca.set_issuer(ca.get_subject())
ca.set_pubkey(key)
ca.add_extensions([
OpenSSL.crypto.X509Extension("basicConstraints", True,
"CA:TRUE"),
OpenSSL.crypto.X509Extension("nsCertType", True,
"sslCA"),
OpenSSL.crypto.X509Extension("extendedKeyUsage", True,
"serverAuth,clientAuth,emailProtection,timeStamping,msCodeInd,msCodeCom,msCTLSign,msSGC,msEFS,nsSGC"
),
OpenSSL.crypto.X509Extension("keyUsage", True,
"keyCertSign, cRLSign"),
OpenSSL.crypto.X509Extension("subjectKeyIdentifier", False, "hash",
subject=ca),
])
ca.sign(key, "sha1")
# Dump the CA plus private key
f = open(path, "w")
f.write(OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key))
f.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, ca))
f.close()
# Dump the certificate in PEM format
f = open(os.path.join(dirname, basename + "-cert.pem"), "w")
f.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, ca))
f.close()
# Dump the certificate in PKCS12 format for Windows devices
f = open(os.path.join(dirname, basename + "-cert.p12"), "w")
p12 = OpenSSL.crypto.PKCS12()
p12.set_certificate(ca)
f.write(p12.export())
f.close()
return True
def dummy_cert(certdir, ca, commonname, sans):
"""
certdir: Certificate directory.
ca: Path to the certificate authority file, or None.
commonname: Common name for the generated certificate.
Returns cert path if operation succeeded, None if not.
"""
namehash = hashlib.sha256(commonname).hexdigest()
certpath = os.path.join(certdir, namehash + ".pem")
if os.path.exists(certpath):
return certpath
confpath = os.path.join(certdir, namehash + ".cnf")
reqpath = os.path.join(certdir, namehash + ".req")
template = open(utils.pkg_data.path("resources/cert.cnf")).read()
ss = []
for i, v in enumerate(sans):
ss.append("DNS.%s = %s"%(i+1, v))
ss = "\n".join(ss)
f = open(confpath, "w")
f.write(
template%(
dict(
commonname=commonname,
sans=ss,
altnames="subjectAltName = @alt_names" if ss else ""
)
)
)
f.close()
if ca:
# Create a dummy signed certificate. Uses same key as the signing CA
cmd = [
"openssl",
"req",
"-new",
"-config", confpath,
"-out", reqpath,
"-key", ca,
]
ret = subprocess.call(
cmd,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
stdin=subprocess.PIPE
)
if ret: return None
cmd = [
"openssl",
"x509",
"-req",
"-in", reqpath,
"-days", CERT_EXPIRY,
"-out", certpath,
"-CA", ca,
"-CAcreateserial",
"-extfile", confpath,
"-extensions", "v3_cert_req",
]
ret = subprocess.call(
cmd,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
stdin=subprocess.PIPE
)
if ret: return None
else:
# Create a new selfsigned certificate + key
cmd = [
"openssl",
"req",
"-new",
"-x509",
"-config", confpath,
"-nodes",
"-days", CERT_EXPIRY,
"-out", certpath,
"-newkey", "rsa:1024",
"-keyout", certpath,
]
ret = subprocess.call(
cmd,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
stdin=subprocess.PIPE
)
if ret: return None
return certpath
class _GeneralName(univ.Choice):
# We are only interested in dNSNames. We use a default handler to ignore
2012-03-04 21:22:47 +00:00
# other types.
componentType = namedtype.NamedTypes(
namedtype.NamedType('dNSName', char.IA5String().subtype(
implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2)
)
),
)
2012-03-04 21:22:47 +00:00
class _GeneralNames(univ.SequenceOf):
componentType = _GeneralName()
sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, 1024)
2012-03-04 21:22:47 +00:00
class SSLCert:
def __init__(self, pemtxt):
"""
Returns a (common name, [subject alternative names]) tuple.
"""
self.cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pemtxt)
@property
def cn(self):
cn = None
for i in self.cert.get_subject().get_components():
if i[0] == "CN":
cn = i[1]
return cn
@property
def altnames(self):
altnames = []
for i in range(self.cert.get_extension_count()):
ext = self.cert.get_extension(i)
if ext.get_short_name() == "subjectAltName":
dec = decode(ext.get_data(), asn1Spec=_GeneralNames())
2012-03-04 21:22:47 +00:00
for i in dec[0]:
altnames.append(i[0])
return altnames
def get_remote_cert(host, port):
addr = socket.gethostbyname(host)
s = ssl.get_server_certificate((addr, port))
return SSLCert(s)