mitmproxy/netlib/certutils.py

285 lines
8.5 KiB
Python
Raw Normal View History

import os, ssl, hashlib, socket, time, datetime, tempfile, shutil
2012-06-27 04:42:00 +00:00
from pyasn1.type import univ, constraint, char, namedtype, tag
from pyasn1.codec.der.decoder import decode
from pyasn1.error import PyAsn1Error
2012-06-27 04:42:00 +00:00
import OpenSSL
import tcp
2012-06-27 04:42:00 +00:00
CERT_SLEEP_TIME = 1
CERT_EXPIRY = str(365 * 3)
def create_ca():
key = OpenSSL.crypto.PKey()
key.generate_key(OpenSSL.crypto.TYPE_RSA, 1024)
ca = OpenSSL.crypto.X509()
ca.set_serial_number(int(time.time()*10000))
ca.set_version(2)
ca.get_subject().CN = "mitmproxy"
ca.get_subject().O = "mitmproxy"
ca.gmtime_adj_notBefore(0)
ca.gmtime_adj_notAfter(24 * 60 * 60 * 720)
ca.set_issuer(ca.get_subject())
ca.set_pubkey(key)
ca.add_extensions([
OpenSSL.crypto.X509Extension("basicConstraints", True,
"CA:TRUE"),
OpenSSL.crypto.X509Extension("nsCertType", True,
"sslCA"),
OpenSSL.crypto.X509Extension("extendedKeyUsage", True,
"serverAuth,clientAuth,emailProtection,timeStamping,msCodeInd,msCodeCom,msCTLSign,msSGC,msEFS,nsSGC"
),
OpenSSL.crypto.X509Extension("keyUsage", False,
"keyCertSign, cRLSign"),
OpenSSL.crypto.X509Extension("subjectKeyIdentifier", False, "hash",
subject=ca),
])
ca.sign(key, "sha1")
return key, ca
def dummy_ca(path):
dirname = os.path.dirname(path)
if not os.path.exists(dirname):
os.makedirs(dirname)
if path.endswith(".pem"):
basename, _ = os.path.splitext(path)
2012-07-11 09:09:41 +00:00
basename = os.path.basename(basename)
2012-06-27 04:42:00 +00:00
else:
2012-07-20 02:45:58 +00:00
basename = os.path.basename(path)
2012-06-27 04:42:00 +00:00
key, ca = create_ca()
# Dump the CA plus private key
f = open(path, "w")
f.write(OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key))
f.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, ca))
f.close()
# Dump the certificate in PEM format
f = open(os.path.join(dirname, basename + "-cert.pem"), "w")
f.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, ca))
f.close()
# Create a .cer file with the same contents for Android
f = open(os.path.join(dirname, basename + "-cert.cer"), "w")
f.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, ca))
f.close()
# Dump the certificate in PKCS12 format for Windows devices
f = open(os.path.join(dirname, basename + "-cert.p12"), "w")
p12 = OpenSSL.crypto.PKCS12()
p12.set_certificate(ca)
p12.set_privatekey(key)
f.write(p12.export())
f.close()
return True
def dummy_cert(fp, ca, commonname, sans):
2012-06-27 04:42:00 +00:00
"""
Generates and writes a certificate to fp.
2012-06-27 04:42:00 +00:00
ca: Path to the certificate authority file, or None.
commonname: Common name for the generated certificate.
sans: A list of Subject Alternate Names.
2012-06-27 04:42:00 +00:00
Returns cert path if operation succeeded, None if not.
"""
ss = []
for i in sans:
ss.append("DNS: %s"%i)
ss = ", ".join(ss)
raw = file(ca, "r").read()
ca = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, raw)
key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, raw)
2012-06-27 04:42:00 +00:00
req = OpenSSL.crypto.X509Req()
subj = req.get_subject()
subj.CN = commonname
req.set_pubkey(ca.get_pubkey())
req.sign(key, "sha1")
if ss:
req.add_extensions([OpenSSL.crypto.X509Extension("subjectAltName", True, ss)])
cert = OpenSSL.crypto.X509()
cert.gmtime_adj_notBefore(-3600)
2012-06-27 04:42:00 +00:00
cert.gmtime_adj_notAfter(60 * 60 * 24 * 30)
cert.set_issuer(ca.get_subject())
cert.set_subject(req.get_subject())
cert.set_serial_number(int(time.time()*10000))
if ss:
cert.add_extensions([OpenSSL.crypto.X509Extension("subjectAltName", True, ss)])
cert.set_pubkey(req.get_pubkey())
cert.sign(key, "sha1")
fp.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert))
fp.close()
2012-06-27 04:42:00 +00:00
class CertStore:
"""
Implements an on-disk certificate store.
"""
def __init__(self, certdir=None):
"""
certdir: The certificate store directory. If None, a temporary
directory will be created, and destroyed when the .cleanup() method
is called.
"""
if certdir:
self.remove = False
self.certdir = certdir
else:
self.remove = True
self.certdir = tempfile.mkdtemp(prefix="certstore")
2013-01-05 12:34:39 +00:00
def check_domain(self, commonname):
try:
commonname.decode("idna")
commonname.decode("ascii")
except:
return False
if ".." in commonname:
return False
if "/" in commonname:
return False
return True
def get_cert(self, commonname, sans, cacert=False):
"""
Returns the path to a certificate.
commonname: Common name for the generated certificate. Must be a
valid, plain-ASCII, IDNA-encoded domain name.
sans: A list of Subject Alternate Names.
cacert: An optional path to a CA certificate. If specified, the
cert is created if it does not exist, else return None.
2013-01-05 12:34:39 +00:00
Return None if the certificate could not be found or generated.
"""
2013-01-05 12:34:39 +00:00
if not self.check_domain(commonname):
return None
certpath = os.path.join(self.certdir, commonname + ".pem")
if os.path.exists(certpath):
return certpath
elif cacert:
f = open(certpath, "w")
dummy_cert(f, cacert, commonname, sans)
return certpath
def cleanup(self):
if self.remove:
shutil.rmtree(self.certdir)
2012-06-27 04:42:00 +00:00
class _GeneralName(univ.Choice):
# We are only interested in dNSNames. We use a default handler to ignore
# other types.
componentType = namedtype.NamedTypes(
namedtype.NamedType('dNSName', char.IA5String().subtype(
implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2)
)
),
)
class _GeneralNames(univ.SequenceOf):
componentType = _GeneralName()
sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, 1024)
class SSLCert:
2012-06-27 10:11:58 +00:00
def __init__(self, cert):
2012-06-27 04:42:00 +00:00
"""
Returns a (common name, [subject alternative names]) tuple.
"""
2012-06-27 10:11:58 +00:00
self.x509 = cert
@classmethod
def from_pem(klass, txt):
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, txt)
return klass(x509)
2012-06-27 04:42:00 +00:00
@classmethod
def from_der(klass, der):
pem = ssl.DER_cert_to_PEM_cert(der)
2012-06-27 10:11:58 +00:00
return klass.from_pem(pem)
2012-06-27 04:42:00 +00:00
def to_pem(self):
return OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, self.x509)
2012-06-27 04:42:00 +00:00
def digest(self, name):
2012-06-27 10:11:58 +00:00
return self.x509.digest(name)
2012-06-27 04:42:00 +00:00
@property
def issuer(self):
2012-06-27 10:11:58 +00:00
return self.x509.get_issuer().get_components()
2012-06-27 04:42:00 +00:00
@property
def notbefore(self):
2012-06-27 10:11:58 +00:00
t = self.x509.get_notBefore()
2012-06-27 04:42:00 +00:00
return datetime.datetime.strptime(t, "%Y%m%d%H%M%SZ")
@property
def notafter(self):
2012-06-27 10:11:58 +00:00
t = self.x509.get_notAfter()
2012-06-27 04:42:00 +00:00
return datetime.datetime.strptime(t, "%Y%m%d%H%M%SZ")
@property
def has_expired(self):
2012-06-27 10:11:58 +00:00
return self.x509.has_expired()
2012-06-27 04:42:00 +00:00
@property
def subject(self):
2012-06-27 10:11:58 +00:00
return self.x509.get_subject().get_components()
2012-06-27 04:42:00 +00:00
@property
def serial(self):
2012-06-27 10:11:58 +00:00
return self.x509.get_serial_number()
2012-06-27 04:42:00 +00:00
@property
def keyinfo(self):
2012-06-27 10:11:58 +00:00
pk = self.x509.get_pubkey()
2012-06-27 04:42:00 +00:00
types = {
OpenSSL.crypto.TYPE_RSA: "RSA",
OpenSSL.crypto.TYPE_DSA: "DSA",
}
return (
types.get(pk.type(), "UNKNOWN"),
pk.bits()
)
@property
def cn(self):
c = None
2012-06-27 04:42:00 +00:00
for i in self.subject:
if i[0] == "CN":
c = i[1]
return c
2012-06-27 04:42:00 +00:00
@property
def altnames(self):
altnames = []
2012-06-27 10:11:58 +00:00
for i in range(self.x509.get_extension_count()):
ext = self.x509.get_extension(i)
2012-06-27 04:42:00 +00:00
if ext.get_short_name() == "subjectAltName":
try:
dec = decode(ext.get_data(), asn1Spec=_GeneralNames())
except PyAsn1Error:
continue
2012-06-27 04:42:00 +00:00
for i in dec[0]:
altnames.append(i[0].asOctets())
return altnames
def get_remote_cert(host, port, sni):
c = tcp.TCPClient(host, port)
c.connect()
c.convert_to_ssl(sni=sni)
return c.cert