mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2025-01-30 14:58:38 +00:00
split TLS parts from net.tcp into net.tls
This commit is contained in:
parent
781369a326
commit
2b4f58eb44
@ -5,15 +5,13 @@ import sys
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
import binascii
|
||||
from ssl import match_hostname
|
||||
from ssl import CertificateError
|
||||
|
||||
from typing import Optional # noqa
|
||||
|
||||
from mitmproxy.utils import strutils
|
||||
from mitmproxy.net import tls
|
||||
|
||||
import certifi
|
||||
from OpenSSL import SSL
|
||||
|
||||
from mitmproxy import certs
|
||||
@ -28,90 +26,6 @@ IPPROTO_IPV6 = getattr(socket, "IPPROTO_IPV6", 41)
|
||||
|
||||
EINTR = 4
|
||||
|
||||
# To enable all SSL methods use: SSLv23
|
||||
# then add options to disable certain methods
|
||||
# https://bugs.launchpad.net/pyopenssl/+bug/1020632/comments/3
|
||||
SSL_BASIC_OPTIONS = (
|
||||
SSL.OP_CIPHER_SERVER_PREFERENCE
|
||||
)
|
||||
if hasattr(SSL, "OP_NO_COMPRESSION"):
|
||||
SSL_BASIC_OPTIONS |= SSL.OP_NO_COMPRESSION
|
||||
|
||||
SSL_DEFAULT_METHOD = SSL.SSLv23_METHOD
|
||||
SSL_DEFAULT_OPTIONS = (
|
||||
SSL.OP_NO_SSLv2 |
|
||||
SSL.OP_NO_SSLv3 |
|
||||
SSL_BASIC_OPTIONS
|
||||
)
|
||||
if hasattr(SSL, "OP_NO_COMPRESSION"):
|
||||
SSL_DEFAULT_OPTIONS |= SSL.OP_NO_COMPRESSION
|
||||
|
||||
"""
|
||||
Map a reasonable SSL version specification into the format OpenSSL expects.
|
||||
Don't ask...
|
||||
https://bugs.launchpad.net/pyopenssl/+bug/1020632/comments/3
|
||||
"""
|
||||
sslversion_choices = {
|
||||
"all": (SSL.SSLv23_METHOD, SSL_BASIC_OPTIONS),
|
||||
# SSLv23_METHOD + NO_SSLv2 + NO_SSLv3 == TLS 1.0+
|
||||
# TLSv1_METHOD would be TLS 1.0 only
|
||||
"secure": (SSL.SSLv23_METHOD, (SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3 | SSL_BASIC_OPTIONS)),
|
||||
"SSLv2": (SSL.SSLv2_METHOD, SSL_BASIC_OPTIONS),
|
||||
"SSLv3": (SSL.SSLv3_METHOD, SSL_BASIC_OPTIONS),
|
||||
"TLSv1": (SSL.TLSv1_METHOD, SSL_BASIC_OPTIONS),
|
||||
"TLSv1_1": (SSL.TLSv1_1_METHOD, SSL_BASIC_OPTIONS),
|
||||
"TLSv1_2": (SSL.TLSv1_2_METHOD, SSL_BASIC_OPTIONS),
|
||||
}
|
||||
|
||||
ssl_method_names = {
|
||||
SSL.SSLv2_METHOD: "SSLv2",
|
||||
SSL.SSLv3_METHOD: "SSLv3",
|
||||
SSL.SSLv23_METHOD: "SSLv23",
|
||||
SSL.TLSv1_METHOD: "TLSv1",
|
||||
SSL.TLSv1_1_METHOD: "TLSv1.1",
|
||||
SSL.TLSv1_2_METHOD: "TLSv1.2",
|
||||
}
|
||||
|
||||
|
||||
class SSLKeyLogger:
|
||||
|
||||
def __init__(self, filename):
|
||||
self.filename = filename
|
||||
self.f = None
|
||||
self.lock = threading.Lock()
|
||||
|
||||
# required for functools.wraps, which pyOpenSSL uses.
|
||||
__name__ = "SSLKeyLogger"
|
||||
|
||||
def __call__(self, connection, where, ret):
|
||||
if where == SSL.SSL_CB_HANDSHAKE_DONE and ret == 1:
|
||||
with self.lock:
|
||||
if not self.f:
|
||||
d = os.path.dirname(self.filename)
|
||||
if not os.path.isdir(d):
|
||||
os.makedirs(d)
|
||||
self.f = open(self.filename, "ab")
|
||||
self.f.write(b"\r\n")
|
||||
client_random = binascii.hexlify(connection.client_random())
|
||||
masterkey = binascii.hexlify(connection.master_key())
|
||||
self.f.write(b"CLIENT_RANDOM %s %s\r\n" % (client_random, masterkey))
|
||||
self.f.flush()
|
||||
|
||||
def close(self):
|
||||
with self.lock:
|
||||
if self.f:
|
||||
self.f.close()
|
||||
|
||||
@staticmethod
|
||||
def create_logfun(filename):
|
||||
if filename:
|
||||
return SSLKeyLogger(filename)
|
||||
return False
|
||||
|
||||
|
||||
log_ssl_key = SSLKeyLogger.create_logfun(
|
||||
os.getenv("MITMPROXY_SSLKEYLOGFILE") or os.getenv("SSLKEYLOGFILE"))
|
||||
|
||||
|
||||
class _FileLike:
|
||||
BLOCKSIZE = 1024 * 32
|
||||
@ -422,107 +336,6 @@ class _Connection:
|
||||
except SSL.Error:
|
||||
pass
|
||||
|
||||
def _create_ssl_context(self,
|
||||
method=SSL_DEFAULT_METHOD,
|
||||
options=SSL_DEFAULT_OPTIONS,
|
||||
verify_options=SSL.VERIFY_NONE,
|
||||
ca_path=None,
|
||||
ca_pemfile=None,
|
||||
cipher_list=None,
|
||||
alpn_protos=None,
|
||||
alpn_select=None,
|
||||
alpn_select_callback=None,
|
||||
sni=None,
|
||||
):
|
||||
"""
|
||||
Creates an SSL Context.
|
||||
|
||||
:param method: One of SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD, TLSv1_1_METHOD, or TLSv1_2_METHOD
|
||||
:param options: A bit field consisting of OpenSSL.SSL.OP_* values
|
||||
:param verify_options: A bit field consisting of OpenSSL.SSL.VERIFY_* values
|
||||
:param ca_path: Path to a directory of trusted CA certificates prepared using the c_rehash tool
|
||||
:param ca_pemfile: Path to a PEM formatted trusted CA certificate
|
||||
:param cipher_list: A textual OpenSSL cipher list, see https://www.openssl.org/docs/apps/ciphers.html
|
||||
:rtype : SSL.Context
|
||||
"""
|
||||
try:
|
||||
context = SSL.Context(method)
|
||||
except ValueError as e:
|
||||
method_name = ssl_method_names.get(method, "unknown")
|
||||
raise exceptions.TlsException(
|
||||
"SSL method \"%s\" is most likely not supported "
|
||||
"or disabled (for security reasons) in your libssl. "
|
||||
"Please refer to https://github.com/mitmproxy/mitmproxy/issues/1101 "
|
||||
"for more details." % method_name
|
||||
)
|
||||
|
||||
# Options (NO_SSLv2/3)
|
||||
if options is not None:
|
||||
context.set_options(options)
|
||||
|
||||
# Verify Options (NONE/PEER and trusted CAs)
|
||||
if verify_options is not None:
|
||||
def verify_cert(conn, x509, errno, err_depth, is_cert_verified):
|
||||
if not is_cert_verified:
|
||||
self.ssl_verification_error = exceptions.InvalidCertificateException(
|
||||
"Certificate Verification Error for {}: {} (errno: {}, depth: {})".format(
|
||||
sni,
|
||||
strutils.always_str(SSL._ffi.string(SSL._lib.X509_verify_cert_error_string(errno)), "utf8"),
|
||||
errno,
|
||||
err_depth
|
||||
)
|
||||
)
|
||||
return is_cert_verified
|
||||
|
||||
context.set_verify(verify_options, verify_cert)
|
||||
if ca_path is None and ca_pemfile is None:
|
||||
ca_pemfile = certifi.where()
|
||||
try:
|
||||
context.load_verify_locations(ca_pemfile, ca_path)
|
||||
except SSL.Error:
|
||||
raise exceptions.TlsException(
|
||||
"Cannot load trusted certificates ({}, {}).".format(
|
||||
ca_pemfile, ca_path
|
||||
)
|
||||
)
|
||||
|
||||
# Workaround for
|
||||
# https://github.com/pyca/pyopenssl/issues/190
|
||||
# https://github.com/mitmproxy/mitmproxy/issues/472
|
||||
# Options already set before are not cleared.
|
||||
context.set_mode(SSL._lib.SSL_MODE_AUTO_RETRY)
|
||||
|
||||
# Cipher List
|
||||
if cipher_list:
|
||||
try:
|
||||
context.set_cipher_list(cipher_list.encode())
|
||||
except SSL.Error as v:
|
||||
raise exceptions.TlsException("SSL cipher specification error: %s" % str(v))
|
||||
|
||||
# SSLKEYLOGFILE
|
||||
if log_ssl_key:
|
||||
context.set_info_callback(log_ssl_key)
|
||||
|
||||
if alpn_protos is not None:
|
||||
# advertise application layer protocols
|
||||
context.set_alpn_protos(alpn_protos)
|
||||
elif alpn_select is not None and alpn_select_callback is None:
|
||||
# select application layer protocol
|
||||
def alpn_select_callback(conn_, options):
|
||||
if alpn_select in options:
|
||||
return bytes(alpn_select)
|
||||
else: # pragma: no cover
|
||||
return options[0]
|
||||
context.set_alpn_select_callback(alpn_select_callback)
|
||||
elif alpn_select_callback is not None and alpn_select is None:
|
||||
if not callable(alpn_select_callback):
|
||||
raise exceptions.TlsException("ALPN error: alpn_select_callback must be a function.")
|
||||
context.set_alpn_select_callback(alpn_select_callback)
|
||||
elif alpn_select_callback is not None and alpn_select is not None:
|
||||
raise exceptions.TlsException("ALPN error: only define alpn_select (string) OR alpn_select_callback (function).")
|
||||
|
||||
return context
|
||||
|
||||
|
||||
class ConnectionCloser:
|
||||
def __init__(self, conn):
|
||||
@ -567,18 +380,14 @@ class TCPClient(_Connection):
|
||||
else:
|
||||
close_socket(self.connection)
|
||||
|
||||
def create_ssl_context(self, cert=None, alpn_protos=None, **sslctx_kwargs):
|
||||
context = self._create_ssl_context(
|
||||
alpn_protos=alpn_protos,
|
||||
**sslctx_kwargs)
|
||||
# Client Certs
|
||||
if cert:
|
||||
try:
|
||||
context.use_privatekey_file(cert)
|
||||
context.use_certificate_file(cert)
|
||||
except SSL.Error as v:
|
||||
raise exceptions.TlsException("SSL client certificate error: %s" % str(v))
|
||||
return context
|
||||
def create_ssl_context(self, **sslctx_kwargs):
|
||||
def store_err(e):
|
||||
self.ssl_verification_error = e
|
||||
|
||||
return tls.create_client_context(
|
||||
verify_error_callback=store_err,
|
||||
**sslctx_kwargs,
|
||||
)
|
||||
|
||||
def convert_to_ssl(self, sni=None, alpn_protos=None, **sslctx_kwargs):
|
||||
"""
|
||||
@ -729,77 +538,30 @@ class BaseHandler(_Connection):
|
||||
self.server = server
|
||||
self.clientcert = None
|
||||
|
||||
def create_ssl_context(self,
|
||||
cert, key,
|
||||
handle_sni=None,
|
||||
request_client_cert=None,
|
||||
chain_file=None,
|
||||
dhparams=None,
|
||||
extra_chain_certs=None,
|
||||
**sslctx_kwargs):
|
||||
"""
|
||||
cert: A certs.SSLCert object or the path to a certificate
|
||||
chain file.
|
||||
def create_ssl_context(self, **kwargs):
|
||||
if kwargs.get("request_client_cert", None) is True:
|
||||
def store_clientcert(cert):
|
||||
self.clientcert = cert
|
||||
|
||||
handle_sni: SNI handler, should take a connection object. Server
|
||||
name can be retrieved like this:
|
||||
kwargs["request_client_cert"] = store_clientcert
|
||||
|
||||
connection.get_servername()
|
||||
def store_err(e):
|
||||
self.ssl_verification_error = e
|
||||
|
||||
And you can specify the connection keys as follows:
|
||||
|
||||
new_context = Context(TLSv1_METHOD)
|
||||
new_context.use_privatekey(key)
|
||||
new_context.use_certificate(cert)
|
||||
connection.set_context(new_context)
|
||||
|
||||
The request_client_cert argument requires some explanation. We're
|
||||
supposed to be able to do this with no negative effects - if the
|
||||
client has no cert to present, we're notified and proceed as usual.
|
||||
Unfortunately, Android seems to have a bug (tested on 4.2.2) - when
|
||||
an Android client is asked to present a certificate it does not
|
||||
have, it hangs up, which is frankly bogus. Some time down the track
|
||||
we may be able to make the proper behaviour the default again, but
|
||||
until then we're conservative.
|
||||
"""
|
||||
|
||||
context = self._create_ssl_context(ca_pemfile=chain_file, **sslctx_kwargs)
|
||||
|
||||
context.use_privatekey(key)
|
||||
if isinstance(cert, certs.SSLCert):
|
||||
context.use_certificate(cert.x509)
|
||||
else:
|
||||
context.use_certificate_chain_file(cert)
|
||||
|
||||
if extra_chain_certs:
|
||||
for i in extra_chain_certs:
|
||||
context.add_extra_chain_cert(i.x509)
|
||||
|
||||
if handle_sni:
|
||||
# SNI callback happens during do_handshake()
|
||||
context.set_tlsext_servername_callback(handle_sni)
|
||||
|
||||
if request_client_cert:
|
||||
def save_cert(conn_, cert, errno_, depth_, preverify_ok_):
|
||||
self.clientcert = certs.SSLCert(cert)
|
||||
# Return true to prevent cert verification error
|
||||
return True
|
||||
context.set_verify(SSL.VERIFY_PEER, save_cert)
|
||||
|
||||
if dhparams:
|
||||
SSL._lib.SSL_CTX_set_tmp_dh(context._context, dhparams)
|
||||
|
||||
return context
|
||||
return tls.create_server_context(
|
||||
**kwargs,
|
||||
verify_error_callback=store_err,
|
||||
)
|
||||
|
||||
def convert_to_ssl(self, cert, key, **sslctx_kwargs):
|
||||
"""
|
||||
Convert connection to SSL.
|
||||
For a list of parameters, see BaseHandler._create_ssl_context(...)
|
||||
For a list of parameters, see tls.create_server_context(...)
|
||||
"""
|
||||
|
||||
context = self.create_ssl_context(
|
||||
cert,
|
||||
key,
|
||||
cert=cert,
|
||||
key=key,
|
||||
**sslctx_kwargs)
|
||||
self.connection = SSL.Connection(context, self.connection)
|
||||
self.connection.set_accept_state()
|
||||
|
276
mitmproxy/net/tls.py
Normal file
276
mitmproxy/net/tls.py
Normal file
@ -0,0 +1,276 @@
|
||||
# To enable all SSL methods use: SSLv23
|
||||
# then add options to disable certain methods
|
||||
# https://bugs.launchpad.net/pyopenssl/+bug/1020632/comments/3
|
||||
import binascii
|
||||
import os
|
||||
import threading
|
||||
import typing
|
||||
|
||||
import certifi
|
||||
from OpenSSL import SSL
|
||||
|
||||
from mitmproxy import exceptions, certs
|
||||
|
||||
BASIC_OPTIONS = (
|
||||
SSL.OP_CIPHER_SERVER_PREFERENCE
|
||||
)
|
||||
if hasattr(SSL, "OP_NO_COMPRESSION"):
|
||||
BASIC_OPTIONS |= SSL.OP_NO_COMPRESSION
|
||||
|
||||
DEFAULT_METHOD = SSL.SSLv23_METHOD
|
||||
DEFAULT_OPTIONS = (
|
||||
SSL.OP_NO_SSLv2 |
|
||||
SSL.OP_NO_SSLv3 |
|
||||
BASIC_OPTIONS
|
||||
)
|
||||
|
||||
"""
|
||||
Map a reasonable SSL version specification into the format OpenSSL expects.
|
||||
Don't ask...
|
||||
https://bugs.launchpad.net/pyopenssl/+bug/1020632/comments/3
|
||||
"""
|
||||
VERSION_CHOICES = {
|
||||
"all": (SSL.SSLv23_METHOD, BASIC_OPTIONS),
|
||||
# SSLv23_METHOD + NO_SSLv2 + NO_SSLv3 == TLS 1.0+
|
||||
# TLSv1_METHOD would be TLS 1.0 only
|
||||
"secure": (DEFAULT_METHOD, DEFAULT_OPTIONS),
|
||||
"SSLv2": (SSL.SSLv2_METHOD, BASIC_OPTIONS),
|
||||
"SSLv3": (SSL.SSLv3_METHOD, BASIC_OPTIONS),
|
||||
"TLSv1": (SSL.TLSv1_METHOD, BASIC_OPTIONS),
|
||||
"TLSv1_1": (SSL.TLSv1_1_METHOD, BASIC_OPTIONS),
|
||||
"TLSv1_2": (SSL.TLSv1_2_METHOD, BASIC_OPTIONS),
|
||||
}
|
||||
|
||||
METHOD_NAMES = {
|
||||
SSL.SSLv2_METHOD: "SSLv2",
|
||||
SSL.SSLv3_METHOD: "SSLv3",
|
||||
SSL.SSLv23_METHOD: "SSLv23",
|
||||
SSL.TLSv1_METHOD: "TLSv1",
|
||||
SSL.TLSv1_1_METHOD: "TLSv1.1",
|
||||
SSL.TLSv1_2_METHOD: "TLSv1.2",
|
||||
}
|
||||
|
||||
|
||||
class MasterSecretLogger:
|
||||
def __init__(self, filename):
|
||||
self.filename = filename
|
||||
self.f = None
|
||||
self.lock = threading.Lock()
|
||||
|
||||
# required for functools.wraps, which pyOpenSSL uses.
|
||||
__name__ = "MasterSecretLogger"
|
||||
|
||||
def __call__(self, connection, where, ret):
|
||||
if where == SSL.SSL_CB_HANDSHAKE_DONE and ret == 1:
|
||||
with self.lock:
|
||||
if not self.f:
|
||||
d = os.path.dirname(self.filename)
|
||||
if not os.path.isdir(d):
|
||||
os.makedirs(d)
|
||||
self.f = open(self.filename, "ab")
|
||||
self.f.write(b"\r\n")
|
||||
client_random = binascii.hexlify(connection.client_random())
|
||||
masterkey = binascii.hexlify(connection.master_key())
|
||||
self.f.write(b"CLIENT_RANDOM %s %s\r\n" % (client_random, masterkey))
|
||||
self.f.flush()
|
||||
|
||||
def close(self):
|
||||
with self.lock:
|
||||
if self.f:
|
||||
self.f.close()
|
||||
|
||||
@staticmethod
|
||||
def create_logfun(filename):
|
||||
if filename:
|
||||
return MasterSecretLogger(filename)
|
||||
return None
|
||||
|
||||
|
||||
log_master_secret = MasterSecretLogger.create_logfun(
|
||||
os.getenv("MITMPROXY_SSLKEYLOGFILE") or os.getenv("SSLKEYLOGFILE")
|
||||
)
|
||||
|
||||
|
||||
def _create_ssl_context(
|
||||
method: int = DEFAULT_METHOD,
|
||||
options: int = DEFAULT_OPTIONS,
|
||||
verify_options: int = SSL.VERIFY_NONE,
|
||||
ca_path: str = None,
|
||||
ca_pemfile: str = None,
|
||||
cipher_list: str = None,
|
||||
alpn_protos: typing.Iterable[bytes] = None,
|
||||
alpn_select=None,
|
||||
alpn_select_callback: typing.Callable[[typing.Any, typing.Any], bytes] = None,
|
||||
sni=None,
|
||||
verify_error_callback: typing.Callable[
|
||||
[exceptions.InvalidCertificateException], None] = None,
|
||||
) -> SSL.Context:
|
||||
"""
|
||||
Creates an SSL Context.
|
||||
|
||||
:param method: One of SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD, TLSv1_1_METHOD, or TLSv1_2_METHOD
|
||||
:param options: A bit field consisting of OpenSSL.SSL.OP_* values
|
||||
:param verify_options: A bit field consisting of OpenSSL.SSL.VERIFY_* values
|
||||
:param ca_path: Path to a directory of trusted CA certificates prepared using the c_rehash tool
|
||||
:param ca_pemfile: Path to a PEM formatted trusted CA certificate
|
||||
:param cipher_list: A textual OpenSSL cipher list, see https://www.openssl.org/docs/apps/ciphers.html
|
||||
:rtype : SSL.Context
|
||||
"""
|
||||
try:
|
||||
context = SSL.Context(method)
|
||||
except ValueError:
|
||||
method_name = METHOD_NAMES.get(method, "unknown")
|
||||
raise exceptions.TlsException(
|
||||
"SSL method \"%s\" is most likely not supported "
|
||||
"or disabled (for security reasons) in your libssl. "
|
||||
"Please refer to https://github.com/mitmproxy/mitmproxy/issues/1101 "
|
||||
"for more details." % method_name
|
||||
)
|
||||
|
||||
# Options (NO_SSLv2/3)
|
||||
if options is not None:
|
||||
context.set_options(options)
|
||||
|
||||
# Verify Options (NONE/PEER and trusted CAs)
|
||||
if verify_options is not None:
|
||||
def verify_cert(conn, x509, errno, err_depth, is_cert_verified):
|
||||
if not is_cert_verified:
|
||||
if verify_error_callback:
|
||||
e = exceptions.InvalidCertificateException(
|
||||
"Certificate Verification Error for {}: {} (errno: {}, depth: {})".format(
|
||||
sni,
|
||||
SSL._ffi.string(SSL._lib.X509_verify_cert_error_string(errno)).decode(),
|
||||
errno,
|
||||
err_depth
|
||||
)
|
||||
)
|
||||
verify_error_callback(e)
|
||||
return is_cert_verified
|
||||
|
||||
context.set_verify(verify_options, verify_cert)
|
||||
if ca_path is None and ca_pemfile is None:
|
||||
ca_pemfile = certifi.where()
|
||||
try:
|
||||
context.load_verify_locations(ca_pemfile, ca_path)
|
||||
except SSL.Error:
|
||||
raise exceptions.TlsException(
|
||||
"Cannot load trusted certificates ({}, {}).".format(
|
||||
ca_pemfile, ca_path
|
||||
)
|
||||
)
|
||||
|
||||
# Workaround for
|
||||
# https://github.com/pyca/pyopenssl/issues/190
|
||||
# https://github.com/mitmproxy/mitmproxy/issues/472
|
||||
# Options already set before are not cleared.
|
||||
context.set_mode(SSL._lib.SSL_MODE_AUTO_RETRY)
|
||||
|
||||
# Cipher List
|
||||
if cipher_list:
|
||||
try:
|
||||
context.set_cipher_list(cipher_list.encode())
|
||||
except SSL.Error as v:
|
||||
raise exceptions.TlsException("SSL cipher specification error: %s" % str(v))
|
||||
|
||||
# SSLKEYLOGFILE
|
||||
if log_master_secret:
|
||||
context.set_info_callback(log_master_secret)
|
||||
|
||||
if alpn_protos is not None:
|
||||
# advertise application layer protocols
|
||||
context.set_alpn_protos(alpn_protos)
|
||||
elif alpn_select is not None and alpn_select_callback is None:
|
||||
# select application layer protocol
|
||||
def alpn_select_callback(conn_, options):
|
||||
if alpn_select in options:
|
||||
return bytes(alpn_select)
|
||||
else: # pragma: no cover
|
||||
return options[0]
|
||||
|
||||
context.set_alpn_select_callback(alpn_select_callback)
|
||||
elif alpn_select_callback is not None and alpn_select is None:
|
||||
if not callable(alpn_select_callback):
|
||||
raise exceptions.TlsException("ALPN error: alpn_select_callback must be a function.")
|
||||
context.set_alpn_select_callback(alpn_select_callback)
|
||||
elif alpn_select_callback is not None and alpn_select is not None:
|
||||
raise exceptions.TlsException(
|
||||
"ALPN error: only define alpn_select (string) OR alpn_select_callback (function).")
|
||||
|
||||
return context
|
||||
|
||||
|
||||
def create_client_context(
|
||||
cert: str = None,
|
||||
**sslctx_kwargs
|
||||
) -> SSL.Context:
|
||||
context = _create_ssl_context(
|
||||
**sslctx_kwargs
|
||||
)
|
||||
# Client Certs
|
||||
if cert:
|
||||
try:
|
||||
context.use_privatekey_file(cert)
|
||||
context.use_certificate_file(cert)
|
||||
except SSL.Error as v:
|
||||
raise exceptions.TlsException("SSL client certificate error: %s" % str(v))
|
||||
return context
|
||||
|
||||
|
||||
def create_server_context(
|
||||
cert: typing.Union[certs.SSLCert, str],
|
||||
key: SSL.PKey,
|
||||
handle_sni: typing.Optional[typing.Callable[[SSL.Connection], None]] = None,
|
||||
request_client_cert: typing.Optional[typing.Callable[[certs.SSLCert], None]] = None,
|
||||
chain_file=None,
|
||||
dhparams=None,
|
||||
extra_chain_certs: typing.Iterable[certs.SSLCert] = None,
|
||||
**sslctx_kwargs
|
||||
) -> SSL.Context:
|
||||
"""
|
||||
cert: A certs.SSLCert object or the path to a certificate
|
||||
chain file.
|
||||
|
||||
handle_sni: SNI handler, should take a connection object. Server
|
||||
name can be retrieved like this:
|
||||
|
||||
connection.get_servername()
|
||||
|
||||
The request_client_cert argument requires some explanation. We're
|
||||
supposed to be able to do this with no negative effects - if the
|
||||
client has no cert to present, we're notified and proceed as usual.
|
||||
Unfortunately, Android seems to have a bug (tested on 4.2.2) - when
|
||||
an Android client is asked to present a certificate it does not
|
||||
have, it hangs up, which is frankly bogus. Some time down the track
|
||||
we may be able to make the proper behaviour the default again, but
|
||||
until then we're conservative.
|
||||
"""
|
||||
|
||||
context = _create_ssl_context(ca_pemfile=chain_file, **sslctx_kwargs)
|
||||
|
||||
context.use_privatekey(key)
|
||||
if isinstance(cert, certs.SSLCert):
|
||||
context.use_certificate(cert.x509)
|
||||
else:
|
||||
context.use_certificate_chain_file(cert)
|
||||
|
||||
if extra_chain_certs:
|
||||
for i in extra_chain_certs:
|
||||
context.add_extra_chain_cert(i.x509)
|
||||
|
||||
if handle_sni:
|
||||
# SNI callback happens during do_handshake()
|
||||
context.set_tlsext_servername_callback(handle_sni)
|
||||
|
||||
if request_client_cert:
|
||||
def save_cert(conn_, x509, errno_, depth_, preverify_ok_):
|
||||
cert = certs.SSLCert(x509)
|
||||
request_client_cert(cert)
|
||||
# Return true to prevent cert verification error
|
||||
return True
|
||||
|
||||
context.set_verify(SSL.VERIFY_PEER, save_cert)
|
||||
|
||||
if dhparams:
|
||||
SSL._lib.SSL_CTX_set_tmp_dh(context._context, dhparams)
|
||||
|
||||
return context
|
@ -2,7 +2,7 @@ from typing import Optional, Sequence
|
||||
|
||||
from mitmproxy import optmanager
|
||||
from mitmproxy import contentviews
|
||||
from mitmproxy.net import tcp
|
||||
from mitmproxy.net import tls
|
||||
|
||||
log_verbosity = [
|
||||
"error",
|
||||
@ -408,7 +408,7 @@ class Options(optmanager.OptManager):
|
||||
Set supported SSL/TLS versions for client connections. SSLv2, SSLv3
|
||||
and 'all' are INSECURE. Defaults to secure, which is TLS1.0+.
|
||||
""",
|
||||
choices=list(tcp.sslversion_choices.keys()),
|
||||
choices=list(tls.VERSION_CHOICES.keys()),
|
||||
)
|
||||
self.add_option(
|
||||
"ssl_version_server", str, "secure",
|
||||
@ -416,7 +416,7 @@ class Options(optmanager.OptManager):
|
||||
Set supported SSL/TLS versions for server connections. SSLv2, SSLv3
|
||||
and 'all' are INSECURE. Defaults to secure, which is TLS1.0+.
|
||||
""",
|
||||
choices=list(tcp.sslversion_choices.keys()),
|
||||
choices=list(tls.VERSION_CHOICES.keys()),
|
||||
)
|
||||
self.add_option(
|
||||
"ssl_insecure", bool, False,
|
||||
|
@ -7,7 +7,7 @@ from OpenSSL import SSL, crypto
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import options as moptions
|
||||
from mitmproxy import certs
|
||||
from mitmproxy.net import tcp
|
||||
from mitmproxy.net import tls
|
||||
from mitmproxy.net import server_spec
|
||||
|
||||
CONF_BASENAME = "mitmproxy"
|
||||
@ -65,9 +65,9 @@ class ProxyConfig:
|
||||
self.check_tcp = HostMatcher(options.tcp_hosts)
|
||||
|
||||
self.openssl_method_client, self.openssl_options_client = \
|
||||
tcp.sslversion_choices[options.ssl_version_client]
|
||||
tls.VERSION_CHOICES[options.ssl_version_client]
|
||||
self.openssl_method_server, self.openssl_options_server = \
|
||||
tcp.sslversion_choices[options.ssl_version_server]
|
||||
tls.VERSION_CHOICES[options.ssl_version_server]
|
||||
|
||||
certstore_path = os.path.expanduser(options.cadir)
|
||||
if not os.path.exists(os.path.dirname(certstore_path)):
|
||||
|
@ -13,7 +13,7 @@ import logging
|
||||
|
||||
from mitmproxy import certs
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy.net import tcp
|
||||
from mitmproxy.net import tcp, tls
|
||||
from mitmproxy.net import websockets
|
||||
from mitmproxy.net import socks
|
||||
from mitmproxy.net import http as net_http
|
||||
@ -158,8 +158,8 @@ class Pathoc(tcp.TCPClient):
|
||||
# SSL
|
||||
ssl=None,
|
||||
sni=None,
|
||||
ssl_version=tcp.SSL_DEFAULT_METHOD,
|
||||
ssl_options=tcp.SSL_DEFAULT_OPTIONS,
|
||||
ssl_version=tls.DEFAULT_METHOD,
|
||||
ssl_options=tls.DEFAULT_OPTIONS,
|
||||
clientcert=None,
|
||||
ciphers=None,
|
||||
|
||||
|
@ -3,7 +3,7 @@ import argparse
|
||||
import os
|
||||
import os.path
|
||||
|
||||
from mitmproxy.net import tcp
|
||||
from mitmproxy.net import tls
|
||||
from mitmproxy import version
|
||||
from mitmproxy.net.http import user_agents
|
||||
from . import pathoc, language
|
||||
@ -111,7 +111,7 @@ def args_pathoc(argv, stdout=sys.stdout, stderr=sys.stderr):
|
||||
)
|
||||
group.add_argument(
|
||||
"--ssl-version", dest="ssl_version", type=str, default="secure",
|
||||
choices=tcp.sslversion_choices.keys(),
|
||||
choices=tls.VERSION_CHOICES.keys(),
|
||||
help="Set supported SSL/TLS versions. "
|
||||
"SSLv2, SSLv3 and 'all' are INSECURE. Defaults to secure, which is TLS1.0+."
|
||||
)
|
||||
@ -162,7 +162,7 @@ def args_pathoc(argv, stdout=sys.stdout, stderr=sys.stderr):
|
||||
|
||||
args = parser.parse_args(argv[1:])
|
||||
|
||||
args.ssl_version, args.ssl_options = tcp.sslversion_choices[args.ssl_version]
|
||||
args.ssl_version, args.ssl_options = tls.VERSION_CHOICES[args.ssl_version]
|
||||
|
||||
args.port = None
|
||||
if ":" in args.host:
|
||||
|
@ -3,7 +3,7 @@ import logging
|
||||
import os
|
||||
import sys
|
||||
import threading
|
||||
from mitmproxy.net import tcp
|
||||
from mitmproxy.net import tcp, tls
|
||||
from mitmproxy import certs as mcerts
|
||||
from mitmproxy.net import websockets
|
||||
from mitmproxy import version
|
||||
@ -37,8 +37,8 @@ class SSLOptions:
|
||||
sans=(),
|
||||
not_after_connect=None,
|
||||
request_client_cert=False,
|
||||
ssl_version=tcp.SSL_DEFAULT_METHOD,
|
||||
ssl_options=tcp.SSL_DEFAULT_OPTIONS,
|
||||
ssl_version=tls.DEFAULT_METHOD,
|
||||
ssl_options=tls.DEFAULT_OPTIONS,
|
||||
ciphers=None,
|
||||
certs=None,
|
||||
alpn_select=b'h2',
|
||||
|
@ -4,7 +4,7 @@ import os
|
||||
import os.path
|
||||
import re
|
||||
|
||||
from mitmproxy.net import tcp
|
||||
from mitmproxy.net import tls
|
||||
from mitmproxy.utils import human
|
||||
from mitmproxy import version
|
||||
from . import pathod
|
||||
@ -143,7 +143,7 @@ def args_pathod(argv, stdout_=sys.stdout, stderr_=sys.stderr):
|
||||
)
|
||||
group.add_argument(
|
||||
"--ssl-version", dest="ssl_version", type=str, default="secure",
|
||||
choices=tcp.sslversion_choices.keys(),
|
||||
choices=tls.VERSION_CHOICES.keys(),
|
||||
help="Set supported SSL/TLS versions. "
|
||||
"SSLv2, SSLv3 and 'all' are INSECURE. Defaults to secure, which is TLS1.0+."
|
||||
)
|
||||
@ -182,7 +182,7 @@ def args_pathod(argv, stdout_=sys.stdout, stderr_=sys.stderr):
|
||||
|
||||
args = parser.parse_args(argv[1:])
|
||||
|
||||
args.ssl_version, args.ssl_options = tcp.sslversion_choices[args.ssl_version]
|
||||
args.ssl_version, args.ssl_options = tls.VERSION_CHOICES[args.ssl_version]
|
||||
|
||||
certs = []
|
||||
for i in args.ssl_certs:
|
||||
|
@ -789,40 +789,6 @@ class TestPeekSSL(TestPeek):
|
||||
return conn.pop()
|
||||
|
||||
|
||||
class TestSSLKeyLogger(tservers.ServerTestBase):
|
||||
handler = EchoHandler
|
||||
ssl = dict(
|
||||
cipher_list="AES256-SHA"
|
||||
)
|
||||
|
||||
def test_log(self, tmpdir):
|
||||
testval = b"echo!\n"
|
||||
_logfun = tcp.log_ssl_key
|
||||
|
||||
logfile = str(tmpdir.join("foo", "bar", "logfile"))
|
||||
tcp.log_ssl_key = tcp.SSLKeyLogger(logfile)
|
||||
|
||||
c = tcp.TCPClient(("127.0.0.1", self.port))
|
||||
with c.connect():
|
||||
c.convert_to_ssl()
|
||||
c.wfile.write(testval)
|
||||
c.wfile.flush()
|
||||
assert c.rfile.readline() == testval
|
||||
c.finish()
|
||||
|
||||
tcp.log_ssl_key.close()
|
||||
with open(logfile, "rb") as f:
|
||||
assert f.read().count(b"CLIENT_RANDOM") == 2
|
||||
|
||||
tcp.log_ssl_key = _logfun
|
||||
|
||||
def test_create_logfun(self):
|
||||
assert isinstance(
|
||||
tcp.SSLKeyLogger.create_logfun("test"),
|
||||
tcp.SSLKeyLogger)
|
||||
assert not tcp.SSLKeyLogger.create_logfun(False)
|
||||
|
||||
|
||||
class TestSSLInvalid(tservers.ServerTestBase):
|
||||
handler = EchoHandler
|
||||
ssl = True
|
||||
|
38
test/mitmproxy/net/test_tls.py
Normal file
38
test/mitmproxy/net/test_tls.py
Normal file
@ -0,0 +1,38 @@
|
||||
from mitmproxy.net import tls
|
||||
from mitmproxy.net.tcp import TCPClient
|
||||
from test.mitmproxy.net.test_tcp import EchoHandler
|
||||
from . import tservers
|
||||
|
||||
|
||||
class TestSSLKeyLogger(tservers.ServerTestBase):
|
||||
handler = EchoHandler
|
||||
ssl = dict(
|
||||
cipher_list="AES256-SHA"
|
||||
)
|
||||
|
||||
def test_log(self, tmpdir):
|
||||
testval = b"echo!\n"
|
||||
_logfun = tls.log_master_secret
|
||||
|
||||
logfile = str(tmpdir.join("foo", "bar", "logfile"))
|
||||
tls.log_master_secret = tls.MasterSecretLogger(logfile)
|
||||
|
||||
c = TCPClient(("127.0.0.1", self.port))
|
||||
with c.connect():
|
||||
c.convert_to_ssl()
|
||||
c.wfile.write(testval)
|
||||
c.wfile.flush()
|
||||
assert c.rfile.readline() == testval
|
||||
c.finish()
|
||||
|
||||
tls.log_master_secret.close()
|
||||
with open(logfile, "rb") as f:
|
||||
assert f.read().count(b"CLIENT_RANDOM") == 2
|
||||
|
||||
tls.log_master_secret = _logfun
|
||||
|
||||
def test_create_logfun(self):
|
||||
assert isinstance(
|
||||
tls.MasterSecretLogger.create_logfun("test"),
|
||||
tls.MasterSecretLogger)
|
||||
assert not tls.MasterSecretLogger.create_logfun(False)
|
Loading…
Reference in New Issue
Block a user