improve invalid certificate ux

This commit is contained in:
Maximilian Hils 2016-07-27 21:01:28 -07:00
parent 17fdb841f0
commit 8b325fd65a
9 changed files with 70 additions and 50 deletions

View File

@ -277,11 +277,11 @@ class ConsoleMaster(flow.FlowMaster):
if self.options.verbosity < utils.log_tier(level): if self.options.verbosity < utils.log_tier(level):
return return
if level == "error": if level in ("error", "warn"):
signals.status_message.send( signals.status_message.send(
message = "Error: %s" % str(e) message = "{}: {}".format(level.title(), e)
) )
e = urwid.Text(("error", str(e))) e = urwid.Text((level, str(e)))
else: else:
e = urwid.Text(str(e)) e = urwid.Text(str(e))
self.logbuffer.append(e) self.logbuffer.append(e)

View File

@ -8,7 +8,6 @@ from mitmproxy.console import grideditor
from mitmproxy.console import palettes from mitmproxy.console import palettes
from mitmproxy.console import select from mitmproxy.console import select
from mitmproxy.console import signals from mitmproxy.console import signals
from OpenSSL import SSL
footer = [ footer = [
('heading_key', "enter/space"), ":toggle ", ('heading_key', "enter/space"), ":toggle ",

View File

@ -104,7 +104,7 @@ class DumpMaster(flow.FlowMaster):
click.secho( click.secho(
e, e,
file=self.options.tfile, file=self.options.tfile,
fg="red" if level == "error" else None, fg=dict(error="red", warn="yellow").get(level),
dim=(level == "debug"), dim=(level == "debug"),
err=(level == "error") err=(level == "error")
) )

View File

@ -44,6 +44,12 @@ class ClientHandshakeException(TlsProtocolException):
self.server = server self.server = server
class InvalidServerCertificate(TlsProtocolException):
def __repr__(self):
# In contrast to most others, this is a user-facing error which needs to look good.
return str(self)
class Socks5ProtocolException(ProtocolException): class Socks5ProtocolException(ProtocolException):
pass pass

View File

@ -225,7 +225,7 @@ class HTTPFlow(Flow):
def make_error_response(status_code, message, headers=None): def make_error_response(status_code, message, headers=None):
response = status_codes.RESPONSES.get(status_code, "Unknown").encode() response = status_codes.RESPONSES.get(status_code, "Unknown")
body = """ body = """
<html> <html>
<head> <head>

View File

@ -543,25 +543,12 @@ class TlsLayer(base.Layer):
) )
tls_cert_err = self.server_conn.ssl_verification_error tls_cert_err = self.server_conn.ssl_verification_error
if tls_cert_err is not None: if tls_cert_err is not None:
self.log( self.log(str(tls_cert_err), "warn")
"TLS verification failed for upstream server at depth %s with error: %s" % self.log("Ignoring server verification error, continuing with connection", "warn")
(tls_cert_err['depth'], tls_cert_err['errno']),
"error")
self.log("Ignoring server verification error, continuing with connection", "error")
except netlib.exceptions.InvalidCertificateException as e: except netlib.exceptions.InvalidCertificateException as e:
tls_cert_err = self.server_conn.ssl_verification_error
self.log(
"TLS verification failed for upstream server at depth %s with error: %s" %
(tls_cert_err['depth'], tls_cert_err['errno']),
"error")
self.log("Aborting connection attempt", "error")
six.reraise( six.reraise(
exceptions.TlsProtocolException, exceptions.InvalidServerCertificate,
exceptions.TlsProtocolException("Cannot establish TLS with {address} (sni: {sni}): {e}".format( exceptions.InvalidServerCertificate(str(e)),
address=repr(self.server_conn.address),
sni=self.server_sni,
e=repr(e),
)),
sys.exc_info()[2] sys.exc_info()[2]
) )
except netlib.exceptions.TlsException as e: except netlib.exceptions.TlsException as e:

View File

@ -125,11 +125,14 @@ class ConnectionHandler(object):
self.log( self.log(
"Client Handshake failed. " "Client Handshake failed. "
"The client may not trust the proxy's certificate for {}.".format(e.server), "The client may not trust the proxy's certificate for {}.".format(e.server),
"error" "warn"
) )
self.log(repr(e), "debug") self.log(repr(e), "debug")
elif isinstance(e, exceptions.InvalidServerCertificate):
self.log(str(e), "warn")
self.log("Invalid certificate, closing connection. Pass --insecure to disable validation.", "warn")
else: else:
self.log(repr(e), "info") self.log(repr(e), "warn")
self.log(traceback.format_exc(), "debug") self.log(traceback.format_exc(), "debug")
# If an error propagates to the topmost level, # If an error propagates to the topmost level,

View File

@ -8,6 +8,10 @@ import time
import traceback import traceback
import binascii import binascii
from typing import Optional # noqa
from netlib import strutils
from six.moves import range from six.moves import range
import certifi import certifi
@ -35,7 +39,7 @@ EINTR = 4
if os.environ.get("NO_ALPN"): if os.environ.get("NO_ALPN"):
HAS_ALPN = False HAS_ALPN = False
else: else:
HAS_ALPN = OpenSSL._util.lib.Cryptography_HAS_ALPN HAS_ALPN = SSL._lib.Cryptography_HAS_ALPN
# To enable all SSL methods use: SSLv23 # To enable all SSL methods use: SSLv23
# then add options to disable certain methods # then add options to disable certain methods
@ -287,16 +291,7 @@ class Reader(_FileLike):
raise exceptions.TcpException(repr(e)) raise exceptions.TcpException(repr(e))
elif isinstance(self.o, SSL.Connection): elif isinstance(self.o, SSL.Connection):
try: try:
if tuple(int(x) for x in OpenSSL.__version__.split(".")[:2]) > (0, 15):
return self.o.recv(length, socket.MSG_PEEK) return self.o.recv(length, socket.MSG_PEEK)
else:
# TODO: remove once a new version is released
# Polyfill for pyOpenSSL <= 0.15.1
# Taken from https://github.com/pyca/pyopenssl/commit/1d95dea7fea03c7c0df345a5ea30c12d8a0378d2
buf = SSL._ffi.new("char[]", length)
result = SSL._lib.SSL_peek(self.o._ssl, buf, length)
self.o._raise_ssl_error(self.o._ssl, result)
return SSL._ffi.buffer(buf, result)[:]
except SSL.Error as e: except SSL.Error as e:
six.reraise(exceptions.TlsException, exceptions.TlsException(str(e)), sys.exc_info()[2]) six.reraise(exceptions.TlsException, exceptions.TlsException(str(e)), sys.exc_info()[2])
else: else:
@ -436,6 +431,23 @@ def close_socket(sock):
sock.close() sock.close()
class SSLVerificationError:
def __init__(self, errno, depth, message=None):
self.errno = errno
self.depth = depth
if message:
self.message = message
else:
self.message = strutils.native(SSL._ffi.string(SSL._lib.X509_verify_cert_error_string(errno)), "utf8")
def __str__(self):
return "Certificate Verification Error: {} (errno: {}, depth: {})".format(
self.message,
self.errno,
self.depth
)
class _Connection(object): class _Connection(object):
rbufsize = -1 rbufsize = -1
@ -511,6 +523,7 @@ class _Connection(object):
alpn_protos=None, alpn_protos=None,
alpn_select=None, alpn_select=None,
alpn_select_callback=None, alpn_select_callback=None,
sni=None,
): ):
""" """
Creates an SSL Context. Creates an SSL Context.
@ -532,8 +545,14 @@ class _Connection(object):
if verify_options is not None: if verify_options is not None:
def verify_cert(conn, x509, errno, err_depth, is_cert_verified): def verify_cert(conn, x509, errno, err_depth, is_cert_verified):
if not is_cert_verified: if not is_cert_verified:
self.ssl_verification_error = dict(errno=errno, self.ssl_verification_error = exceptions.InvalidCertificateException(
depth=err_depth) "Certificate Verification Error for {}: {} (errno: {}, depth: {})".format(
sni,
strutils.native(SSL._ffi.string(SSL._lib.X509_verify_cert_error_string(errno)), "utf8"),
errno,
err_depth
)
)
return is_cert_verified return is_cert_verified
context.set_verify(verify_options, verify_cert) context.set_verify(verify_options, verify_cert)
@ -609,7 +628,7 @@ class TCPClient(_Connection):
self.source_address = source_address self.source_address = source_address
self.cert = None self.cert = None
self.server_certs = [] self.server_certs = []
self.ssl_verification_error = None self.ssl_verification_error = None # type: Optional[SSLVerificationError]
self.sni = None self.sni = None
@property @property
@ -671,6 +690,7 @@ class TCPClient(_Connection):
context = self.create_ssl_context( context = self.create_ssl_context(
alpn_protos=alpn_protos, alpn_protos=alpn_protos,
sni=sni,
**sslctx_kwargs **sslctx_kwargs
) )
self.connection = SSL.Connection(context, self.connection) self.connection = SSL.Connection(context, self.connection)
@ -682,14 +702,14 @@ class TCPClient(_Connection):
self.connection.do_handshake() self.connection.do_handshake()
except SSL.Error as v: except SSL.Error as v:
if self.ssl_verification_error: if self.ssl_verification_error:
raise exceptions.InvalidCertificateException("SSL handshake error: %s" % repr(v)) raise self.ssl_verification_error
else: else:
raise exceptions.TlsException("SSL handshake error: %s" % repr(v)) raise exceptions.TlsException("SSL handshake error: %s" % repr(v))
else: else:
# Fix for pre v1.0 OpenSSL, which doesn't throw an exception on # Fix for pre v1.0 OpenSSL, which doesn't throw an exception on
# certificate validation failure # certificate validation failure
if verification_mode == SSL.VERIFY_PEER and self.ssl_verification_error is not None: if verification_mode == SSL.VERIFY_PEER and self.ssl_verification_error:
raise exceptions.InvalidCertificateException("SSL handshake error: certificate verify failed") raise self.ssl_verification_error
self.cert = certutils.SSLCert(self.connection.get_peer_certificate()) self.cert = certutils.SSLCert(self.connection.get_peer_certificate())
@ -710,9 +730,14 @@ class TCPClient(_Connection):
hostname = "no-hostname" hostname = "no-hostname"
ssl_match_hostname.match_hostname(crt, hostname) ssl_match_hostname.match_hostname(crt, hostname)
except (ValueError, ssl_match_hostname.CertificateError) as e: except (ValueError, ssl_match_hostname.CertificateError) as e:
self.ssl_verification_error = dict(depth=0, errno="Invalid Hostname") self.ssl_verification_error = exceptions.InvalidCertificateException(
"Certificate Verification Error for {}: {}".format(
sni or repr(self.address),
str(e)
)
)
if verification_mode == SSL.VERIFY_PEER: if verification_mode == SSL.VERIFY_PEER:
raise exceptions.InvalidCertificateException("Presented certificate for {} is not valid: {}".format(sni, str(e))) raise self.ssl_verification_error
self.ssl_established = True self.ssl_established = True
self.rfile.set_descriptor(self.connection) self.rfile.set_descriptor(self.connection)

View File

@ -213,7 +213,7 @@ class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase):
# Verification errors should be saved even if connection isn't aborted # Verification errors should be saved even if connection isn't aborted
# aborted # aborted
assert c.ssl_verification_error is not None assert c.ssl_verification_error
testval = b"echo!\n" testval = b"echo!\n"
c.wfile.write(testval) c.wfile.write(testval)
@ -226,7 +226,7 @@ class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase):
c.convert_to_ssl(verify_options=SSL.VERIFY_NONE) c.convert_to_ssl(verify_options=SSL.VERIFY_NONE)
# Verification errors should be saved even if connection isn't aborted # Verification errors should be saved even if connection isn't aborted
assert c.ssl_verification_error is not None assert c.ssl_verification_error
testval = b"echo!\n" testval = b"echo!\n"
c.wfile.write(testval) c.wfile.write(testval)
@ -243,11 +243,11 @@ class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase):
ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt") ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt")
) )
assert c.ssl_verification_error is not None assert c.ssl_verification_error
# Unknown issuing certificate authority for first certificate # Unknown issuing certificate authority for first certificate
assert c.ssl_verification_error['errno'] == 18 assert "errno: 18" in str(c.ssl_verification_error)
assert c.ssl_verification_error['depth'] == 0 assert "depth: 0" in str(c.ssl_verification_error)
class TestSSLUpstreamCertVerificationWBadHostname(tservers.ServerTestBase): class TestSSLUpstreamCertVerificationWBadHostname(tservers.ServerTestBase):
@ -276,7 +276,7 @@ class TestSSLUpstreamCertVerificationWBadHostname(tservers.ServerTestBase):
verify_options=SSL.VERIFY_PEER, verify_options=SSL.VERIFY_PEER,
ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt") ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt")
) )
assert c.ssl_verification_error is not None assert c.ssl_verification_error
class TestSSLUpstreamCertVerificationWValidCertChain(tservers.ServerTestBase): class TestSSLUpstreamCertVerificationWValidCertChain(tservers.ServerTestBase):