[sans-io] tls: handle untrusted mitmproxy certs

This commit is contained in:
Maximilian Hils 2019-11-24 17:39:53 +01:00
parent b075b7fc15
commit 9e6548e581
2 changed files with 115 additions and 79 deletions

View File

@ -92,6 +92,9 @@ HTTP_ALPNS = (b"h2", b"http/1.1", b"http/1.0", b"http/0.9")
class EstablishServerTLS(commands.ConnectionCommand):
"""Establish TLS on the given connection.
If TLS establishment fails, the connection will automatically be closed by the TLS layer."""
connection: context.Server
blocking = True
@ -177,15 +180,20 @@ class _TLSLayer(layer.Layer):
return False, None
except SSL.Error as e:
# provide more detailed information for some errors.
last_err = e.args[0][-1]
last_err = e.args and isinstance(e.args[0], list) and e.args[0] and e.args[0][-1]
if last_err == ('SSL routines', 'tls_process_server_certificate', 'certificate verify failed'):
verify_result = SSL._lib.SSL_get_verify_result(self.tls[conn]._ssl)
error = SSL._ffi.string(SSL._lib.X509_verify_cert_error_string(verify_result)).decode()
return False, f"Certificate verify failed: {error}"
elif last_err == ('SSL routines', 'ssl3_read_bytes', 'tlsv1 alert unknown ca'):
return False, "TLS Alert: Unknown CA"
err = f"Certificate verify failed: {error}"
elif last_err in [
('SSL routines', 'ssl3_read_bytes', 'tlsv1 alert unknown ca'),
('SSL routines', 'ssl3_read_bytes', 'sslv3 alert bad certificate')
]:
err = last_err[2]
else:
return False, repr(e)
err = repr(e)
yield from self.on_handshake_error(conn, err)
return False, err
else:
# Get all peer certificates.
# https://www.openssl.org/docs/man1.1.1/man3/SSL_get_peer_cert_chain.html
@ -197,7 +205,6 @@ class _TLSLayer(layer.Layer):
if cert:
all_certs.insert(0, cert)
conn.tls_established = True
conn.sni = self.tls[conn].get_servername()
conn.alpn = self.tls[conn].get_alpn_proto_negotiated()
@ -250,15 +257,20 @@ class _TLSLayer(layer.Layer):
yield from self.negotiate(event.connection, event.data)
else:
yield from self.receive(event.connection, event.data)
elif (
isinstance(event, events.ConnectionClosed) and
event.connection in self.tls and
self.tls[event.connection].get_shutdown() & SSL.RECEIVED_SHUTDOWN
):
pass # We have already dispatched a ConnectionClosed to the child layer.
elif isinstance(event, events.ConnectionClosed) and event.connection in self.tls:
if event.connection.tls_established:
if self.tls[event.connection].get_shutdown() & SSL.RECEIVED_SHUTDOWN:
pass # We have already dispatched a ConnectionClosed to the child layer.
else:
yield from self.event_to_child(event)
else:
yield from self.on_handshake_error(event.connection, "connection closed without notice")
else:
yield from self.event_to_child(event)
def on_handshake_error(self, conn: context.Connection, err: str) -> commands.TCommandGenerator:
yield commands.CloseConnection(conn)
class ServerTLSLayer(_TLSLayer):
"""
@ -287,6 +299,13 @@ class ServerTLSLayer(_TLSLayer):
else:
yield command
def on_handshake_error(self, conn: context.Connection, err: str) -> commands.TCommandGenerator:
yield commands.Log(
f"Server TLS handshake failed. {err}",
level="warn"
)
yield from super().on_handshake_error(conn, err)
class ClientTLSLayer(_TLSLayer):
"""
@ -320,13 +339,13 @@ class ClientTLSLayer(_TLSLayer):
self._handle_event = self.state_wait_for_clienthello
yield from ()
def state_wait_for_clienthello(self, event: events.Event):
def state_wait_for_clienthello(self, event: events.Event) -> commands.TCommandGenerator:
client = self.context.client
if isinstance(event, events.DataReceived) and event.connection == client:
self.recv_buffer.extend(event.data)
try:
client_hello = parse_client_hello(self.recv_buffer)
except ValueError as e:
except ValueError:
yield commands.Log(f"Cannot parse ClientHello: {self.recv_buffer.hex()}")
yield commands.CloseConnection(client)
return
@ -373,21 +392,18 @@ class ClientTLSLayer(_TLSLayer):
)
return err
def negotiate(self, conn: context.Connection, data: bytes) -> Generator[commands.Command, Any, bool]:
done, err = yield from super().negotiate(conn, data)
if err:
if self.context.client.sni:
dest = self.context.client.sni.decode("idna")
else:
dest = human.format_address(self.context.server.address)
if "Unknown CA" in err:
keyword = "does not"
else:
keyword = "may not"
yield commands.Log(
f"Client TLS Handshake failed. "
f"The client {keyword} trust the proxy's certificate for {dest} ({err}).",
level="warn"
)
yield commands.CloseConnection(self.context.client)
return done
def on_handshake_error(self, conn: context.Connection, err: str) -> commands.TCommandGenerator:
if conn.sni:
dest = conn.sni.decode("idna")
else:
dest = human.format_address(self.context.server.address)
if "unknown ca" in err or "bad certificate" in err:
keyword = "does not"
else:
keyword = "may not"
yield commands.Log(
f"Client TLS handshake failed. "
f"The client {keyword} trust the proxy's certificate for {dest} ({err})",
level="warn"
)
yield from super().on_handshake_error(conn, err)

View File

@ -276,12 +276,17 @@ class TestServerTLS:
assert (
playbook
>> events.DataReceived(tctx.server, tssl.out.read())
<< commands.Log("Server TLS handshake failed. Certificate verify failed: Hostname mismatch","warn")
<< commands.CloseConnection(tctx.server)
<< commands.SendData(tctx.client, b"server-tls-failed: Certificate verify failed: Hostname mismatch")
)
assert not tctx.server.tls_established
def _make_client_tls_layer(tctx: context.Context) -> typing.Tuple[tutils.Playbook, tls.ClientTLSLayer]:
def make_client_tls_layer(
tctx: context.Context,
**kwargs
) -> typing.Tuple[tutils.Playbook, tls.ClientTLSLayer, SSLTest]:
# This is a bit contrived as the client layer expects a server layer as parent.
# We also set child layers manually to avoid NextLayer noise.
server_layer = tls.ServerTLSLayer(tctx)
@ -289,18 +294,13 @@ def _make_client_tls_layer(tctx: context.Context) -> typing.Tuple[tutils.Playboo
server_layer.child_layer = client_layer
client_layer.child_layer = TlsEchoLayer(tctx)
playbook = tutils.Playbook(server_layer)
return playbook, client_layer
def _test_tls_client_server(tctx: context.Context, **kwargs) -> typing.Tuple[
tutils.Playbook, tls.ClientTLSLayer, SSLTest]:
playbook, client_layer = _make_client_tls_layer(tctx)
tctx.server.tls = True
# Add some server config, this is needed anyways.
tctx.server.address = ("example.mitmproxy.org", 443)
tctx.server.sni = b"example.mitmproxy.org"
tssl_client = SSLTest(**kwargs)
# Send ClientHello
tssl_client = SSLTest(**kwargs)
# Start handshake.
with pytest.raises(ssl.SSLWantReadError):
tssl_client.obj.do_handshake()
@ -310,69 +310,43 @@ def _test_tls_client_server(tctx: context.Context, **kwargs) -> typing.Tuple[
class TestClientTLS:
def test_client_only(self, tctx: context.Context):
"""Test TLS with client only"""
playbook, client_layer = _make_client_tls_layer(tctx)
tssl = SSLTest()
playbook, client_layer, tssl_client = make_client_tls_layer(tctx)
assert not tctx.client.tls_established
# Start Handshake, send ClientHello and ServerHello
with pytest.raises(ssl.SSLWantReadError):
tssl.obj.do_handshake()
# Send ClientHello, receive ServerHello
data = tutils.Placeholder()
assert (
playbook
>> events.DataReceived(tctx.client, tssl.out.read())
>> events.DataReceived(tctx.client, tssl_client.out.read())
<< commands.Hook("tls_clienthello", tutils.Placeholder())
>> tutils.reply()
<< commands.Hook("tls_start", tutils.Placeholder())
>> reply_tls_start()
<< commands.SendData(tctx.client, data)
)
tssl.inc.write(data())
tssl.obj.do_handshake()
tssl_client.inc.write(data())
tssl_client.obj.do_handshake()
# Finish Handshake
interact(playbook, tctx.client, tssl)
interact(playbook, tctx.client, tssl_client)
assert tssl.obj.getpeercert(True)
assert tssl_client.obj.getpeercert(True)
assert tctx.client.tls_established
# Echo
_test_echo(playbook, tssl, tctx.client)
_test_echo(playbook, tssl_client, tctx.client)
assert (
playbook
>> events.DataReceived(tctx.server, b"Plaintext")
<< commands.SendData(tctx.server, b"plaintext")
)
def test_server_not_required(self, tctx):
"""
Here we test the scenario where a server connection is _not_ required
to establish TLS with the client. After determining this when parsing the ClientHello,
we only establish a connection with the client. The server connection may ultimately
be established when OpenConnection is called.
"""
playbook, client_layer, tssl = _test_tls_client_server(tctx)
data = tutils.Placeholder()
assert (
playbook
>> events.DataReceived(tctx.client, tssl.out.read())
<< commands.Hook("tls_clienthello", tutils.Placeholder())
>> tutils.reply()
<< commands.Hook("tls_start", tutils.Placeholder())
>> reply_tls_start()
<< commands.SendData(tctx.client, data)
)
tssl.inc.write(data())
tssl.obj.do_handshake()
interact(playbook, tctx.client, tssl)
assert tctx.client.tls_established
def test_server_required(self, tctx):
"""
Here we test the scenario where a server connection is required (for example, because SNI is missing)
Test the scenario where a server connection is required (for example, because of an unknown ALPN)
to establish TLS with the client.
"""
tssl_server = SSLTest(server_side=True, alpn=["quux"])
playbook, client_layer, tssl_client = _test_tls_client_server(tctx, alpn=["quux"])
playbook, client_layer, tssl_client = make_client_tls_layer(tctx, alpn=["quux"])
# We should now get instructed to open a server connection.
data = tutils.Placeholder()
@ -430,8 +404,8 @@ class TestClientTLS:
_test_echo(playbook, tssl_client, tctx.client)
def test_cannot_parse_clienthello(self, tctx: context.Context):
"""We have a client layer, but we only receive garbage."""
playbook, client_layer = _make_client_tls_layer(tctx)
"""Test the scenario where we cannot parse the ClientHello"""
playbook, client_layer, tssl_client = make_client_tls_layer(tctx)
invalid = b"\x16\x03\x01\x00\x00"
@ -442,3 +416,49 @@ class TestClientTLS:
<< commands.CloseConnection(tctx.client)
)
assert not tctx.client.tls_established
def test_mitmproxy_ca_is_untrusted(self, tctx: context.Context):
"""Test the scenario where the client doesn't trust the mitmproxy CA."""
playbook, client_layer, tssl_client = make_client_tls_layer(tctx, sni=b"wrong.host.mitmproxy.org")
data = tutils.Placeholder()
assert (
playbook
>> events.DataReceived(tctx.client, tssl_client.out.read())
<< commands.Hook("tls_clienthello", tutils.Placeholder())
>> tutils.reply()
<< commands.Hook("tls_start", tutils.Placeholder())
>> reply_tls_start()
<< commands.SendData(tctx.client, data)
)
tssl_client.inc.write(data())
with pytest.raises(ssl.SSLCertVerificationError):
tssl_client.obj.do_handshake()
# Finish Handshake
assert (
playbook
>> events.DataReceived(tctx.client, tssl_client.out.read())
<< commands.Log("Client TLS handshake failed. The client does not trust the proxy's certificate "
"for wrong.host.mitmproxy.org (sslv3 alert bad certificate)", "warn")
<< commands.CloseConnection(tctx.client)
)
assert not tctx.client.tls_established
def test_mitmproxy_ca_is_untrusted_immediate_disconnect(self, tctx: context.Context):
"""Test the scenario where the client doesn't trust the mitmproxy CA."""
playbook, client_layer, tssl_client = make_client_tls_layer(tctx, sni=b"wrong.host.mitmproxy.org")
data = tutils.Placeholder()
assert (
playbook
>> events.DataReceived(tctx.client, tssl_client.out.read())
<< commands.Hook("tls_clienthello", tutils.Placeholder())
>> tutils.reply()
<< commands.Hook("tls_start", tutils.Placeholder())
>> reply_tls_start()
<< commands.SendData(tctx.client, data)
>> events.ConnectionClosed(tctx.client)
<< commands.Log("Client TLS handshake failed. The client may not trust the proxy's certificate "
"for wrong.host.mitmproxy.org (connection closed without notice)", "warn")
<< commands.CloseConnection(tctx.client)
)