[sans-io] tlsconfig: generate proper certificates

This commit is contained in:
Maximilian Hils 2020-11-02 03:12:55 +01:00
parent decd2b8c80
commit 0234a44c84
3 changed files with 84 additions and 23 deletions

View File

@ -1,19 +1,24 @@
import os
from typing import Optional, Tuple, cast
from typing import List, Optional, Tuple, TypedDict, cast
from OpenSSL import SSL, crypto
from mitmproxy import certs, ctx, exceptions
from mitmproxy.net import tls as net_tls
from mitmproxy.options import CONF_BASENAME
from mitmproxy.proxy.protocol.tls import CIPHER_ID_NAME_MAP, DEFAULT_CLIENT_CIPHERS
from mitmproxy.proxy.protocol.tls import DEFAULT_CLIENT_CIPHERS
from mitmproxy.proxy2 import context
from mitmproxy.proxy2.layers import tls
def alpn_select_callback(conn: SSL.Connection, options):
server_alpn = conn.get_app_data()["server_alpn"]
http2 = conn.get_app_data()["http2"]
class AppData(TypedDict):
server_alpn: Optional[bytes]
http2: bool
def alpn_select_callback(conn: SSL.Connection, options: List[bytes]):
app_data: AppData = conn.get_app_data()
server_alpn = app_data["server_alpn"]
http2 = app_data["http2"]
if server_alpn and server_alpn in options:
return server_alpn
http_alpns = tls.HTTP_ALPNS if http2 else tls.HTTP1_ALPNS
@ -25,16 +30,57 @@ def alpn_select_callback(conn: SSL.Connection, options):
class TlsConfig:
"""
This addon supplies the proxy core with the desired OpenSSL connection objects to negotiate TLS.
"""
certstore: certs.CertStore = None
# TODO: We should support configuring TLS 1.3 cipher suites (https://github.com/mitmproxy/mitmproxy/issues/4260)
# TODO: We should re-use SSL.Context options here, if only for TLS session resumption.
# This may require patches to pyOpenSSL, as some functionality is only exposed on contexts.
# TODO: This addon should manage the following options itself, which are current defined in mitmproxy/options.py:
# - upstream_cert
# - add_upstream_certs_to_client_chain
# - ssl_version_client
# - ssl_version_server
# - ciphers_client
# - ciphers_server
# - key_size
# - certs
def get_cert(self, context: context.Context) -> Tuple[certs.Cert, SSL.PKey, str]:
# FIXME
return self.certstore.get_cert(
context.client.sni or b"localhost", [context.client.sni or b"localhost"]
)
"""
This function determines the Common Name (CN), Subject Alternative Names (SANs) and Organization Name
our certificate should have and then fetches a matching cert from the certstore.
"""
altnames: List[bytes] = []
organization: Optional[str] = None
# Use upstream certificate if available.
if context.server.certificate_list:
upstream_cert = context.server.certificate_list[0]
if upstream_cert.cn:
altnames.append(upstream_cert.cn)
altnames.extend(upstream_cert.altnames)
if upstream_cert.organization:
organization = upstream_cert.organization
# Add SNI. If not available, try the server address as well.
if context.client.sni:
altnames.append(context.client.sni)
elif context.server.address:
altnames.append(context.server.address[0])
# As a last resort, add *something* so that we have a certificate to serve.
if not altnames:
altnames.append(b"mitmproxy")
# only keep first occurrence of each hostname
altnames = list(dict.fromkeys(altnames))
# RFC 2818: If a subjectAltName extension of type dNSName is present, that MUST be used as the identity.
# In other words, the Common Name is irrelevant then.
return self.certstore.get_cert(altnames[0], altnames, organization)
def tls_clienthello(self, tls_clienthello: tls.ClientHelloData):
context = tls_clienthello.context
@ -61,9 +107,9 @@ class TlsConfig:
tls_method, tls_options = net_tls.VERSION_CHOICES[ctx.options.ssl_version_client]
cert, key, chain_file = self.get_cert(tls_start.context)
if ctx.options.add_upstream_certs_to_client_chain:
raise NotImplementedError()
extra_chain_certs = tls_start.context.server.certificate_list
else:
extra_chain_certs = None
extra_chain_certs = ()
ssl_ctx = net_tls.create_server_context(
cert=cert,
key=key,
@ -76,10 +122,10 @@ class TlsConfig:
extra_chain_certs=extra_chain_certs,
)
tls_start.ssl_conn = SSL.Connection(ssl_ctx)
tls_start.ssl_conn.set_app_data({
"server_alpn": tls_start.context.server.alpn,
"http2": ctx.options.http2,
})
tls_start.ssl_conn.set_app_data(AppData(
server_alpn=tls_start.context.server.alpn,
http2=ctx.options.http2,
))
tls_start.ssl_conn.set_accept_state()
def create_proxy_server_ssl_conn(self, tls_start: tls.TlsStartData) -> None:
@ -104,10 +150,10 @@ class TlsConfig:
if ctx.options.ciphers_server:
server.cipher_list = ctx.options.ciphers_server.split(":")
elif client.cipher_list:
server.cipher_list = [
x for x in client.cipher_list
if x in CIPHER_ID_NAME_MAP
]
# We used to filter for known ciphers here, but that doesn't seem to make sense.
# According to OpenSSL docs, the control string str should be universally
# usable and not depend on details of the library configuration (ciphers compiled in).
server.cipher_list = list(client.cipher_list)
args = net_tls.client_arguments_from_options(ctx.options)

View File

@ -25,7 +25,22 @@ class Connection:
tls: bool = False
tls_established: bool = False
certificate_chain: Optional[Sequence[certs.Cert]] = None
certificate_list: Optional[Sequence[certs.Cert]] = None
"""
The TLS certificate list as sent by the peer.
The first certificate is the end-entity certificate.
[RFC 8446] Prior to TLS 1.3, "certificate_list" ordering required each
certificate to certify the one immediately preceding it; however,
some implementations allowed some flexibility. Servers sometimes
send both a current and deprecated intermediate for transitional
purposes, and others are simply configured incorrectly, but these
cases can nonetheless be validated properly. For maximum
compatibility, all implementations SHOULD be prepared to handle
potentially extraneous certificates and arbitrary orderings from any
TLS version, with the exception of the end-entity certificate which
MUST be first.
"""
alpn: Optional[bytes] = None
alpn_offers: Sequence[bytes] = ()
cipher_list: Sequence[bytes] = ()

View File

@ -190,7 +190,7 @@ class _TLSLayer(tunnel.TunnelLayer):
self.conn.tls_established = True
self.conn.sni = self.tls.get_servername()
self.conn.alpn = self.tls.get_alpn_proto_negotiated()
self.conn.certificate_chain = [certs.Cert(x) for x in all_certs]
self.conn.certificate_list = [certs.Cert(x) for x in all_certs]
self.conn.cipher_list = self.tls.get_cipher_list()
self.conn.tls_version = self.tls.get_protocol_version_name()
self.conn.timestamp_tls_setup = time.time()