tls: add tls_handshake, ignore-after-clienthello

this fixes #4702
This commit is contained in:
Maximilian Hils 2021-09-04 00:24:33 +02:00
parent 0437d2935e
commit 98a3e33477
7 changed files with 160 additions and 125 deletions

View File

@ -3,11 +3,13 @@
## Unreleased: mitmproxy next
* Support proxy authentication for SOCKS v5 mode (@starplanet)
* Make it possible to ignore connections in the tls_clienthello event hook (@mhils)
* Add `tls_handshake` event hook to record negotiation success/failure (@mhils)
* fix some responses not being decoded properly if the encoding was uppercase #4735 (@Mattwmaster58)
* Expose TLS 1.0 as possible minimum version on older pyOpenSSL releases
* Improve error message on TLS version mismatch.
* Expose TLS 1.0 as possible minimum version on older pyOpenSSL releases (@mhils)
* Improve error message on TLS version mismatch. (@mhils)
* Windows: Switch to Python's default asyncio event loop, which increases the number of sockets
that can be processed simultaneously.
that can be processed simultaneously (@mhils)
## 4 August 2021: mitmproxy 7.0.2

View File

@ -1,8 +1,5 @@
# FIXME: This addon is currently not compatible with mitmproxy 7 and above.
"""
This inline script allows conditional TLS Interception based
on a user-defined strategy.
This addon allows conditional TLS Interception based on a user-defined strategy.
Example:
@ -11,138 +8,103 @@ Example:
1. curl --proxy http://localhost:8080 https://example.com --insecure
// works - we'll also see the contents in mitmproxy
2. curl --proxy http://localhost:8080 https://example.com --insecure
// still works - we'll also see the contents in mitmproxy
3. curl --proxy http://localhost:8080 https://example.com
2. curl --proxy http://localhost:8080 https://example.com
// fails with a certificate error, which we will also see in mitmproxy
4. curl --proxy http://localhost:8080 https://example.com
3. curl --proxy http://localhost:8080 https://example.com
// works again, but mitmproxy does not intercept and we do *not* see the contents
Authors: Maximilian Hils, Matthew Tuusberg
"""
import collections
import random
from abc import ABC, abstractmethod
from enum import Enum
import mitmproxy
from mitmproxy import ctx
from mitmproxy.exceptions import TlsProtocolException
from mitmproxy.proxy.protocol import TlsLayer, RawTCPLayer
from mitmproxy import connection, ctx
from mitmproxy.proxy.layers import tls
from mitmproxy.utils import human
class InterceptionResult(Enum):
success = True
failure = False
skipped = None
SUCCESS = 1
FAILURE = 2
SKIPPED = 3
class _TlsStrategy:
"""
Abstract base class for interception strategies.
"""
class TlsStrategy(ABC):
def __init__(self):
# A server_address -> interception results mapping
self.history = collections.defaultdict(lambda: collections.deque(maxlen=200))
def should_intercept(self, server_address):
"""
Returns:
True, if we should attempt to intercept the connection.
False, if we want to employ pass-through instead.
"""
@abstractmethod
def should_intercept(self, server_address: connection.Address) -> bool:
raise NotImplementedError()
def record_success(self, server_address):
self.history[server_address].append(InterceptionResult.success)
self.history[server_address].append(InterceptionResult.SUCCESS)
def record_failure(self, server_address):
self.history[server_address].append(InterceptionResult.failure)
self.history[server_address].append(InterceptionResult.FAILURE)
def record_skipped(self, server_address):
self.history[server_address].append(InterceptionResult.skipped)
self.history[server_address].append(InterceptionResult.SKIPPED)
class ConservativeStrategy(_TlsStrategy):
class ConservativeStrategy(TlsStrategy):
"""
Conservative Interception Strategy - only intercept if there haven't been any failed attempts
in the history.
"""
def should_intercept(self, server_address):
if InterceptionResult.failure in self.history[server_address]:
return False
return True
def should_intercept(self, server_address: connection.Address) -> bool:
return InterceptionResult.FAILURE not in self.history[server_address]
class ProbabilisticStrategy(_TlsStrategy):
class ProbabilisticStrategy(TlsStrategy):
"""
Fixed probability that we intercept a given connection.
"""
def __init__(self, p):
def __init__(self, p: float):
self.p = p
super().__init__()
def should_intercept(self, server_address):
def should_intercept(self, server_address: connection.Address) -> bool:
return random.uniform(0, 1) < self.p
class TlsFeedback(TlsLayer):
"""
Monkey-patch _establish_tls_with_client to get feedback if TLS could be established
successfully on the client connection (which may fail due to cert pinning).
"""
class MaybeTls:
strategy: TlsStrategy
def _establish_tls_with_client(self):
server_address = self.server_conn.address
try:
super()._establish_tls_with_client()
except TlsProtocolException as e:
tls_strategy.record_failure(server_address)
raise e
else:
tls_strategy.record_success(server_address)
# inline script hooks below.
tls_strategy = None
def load(l):
def load(self, l):
l.add_option(
"tlsstrat", int, 0, "TLS passthrough strategy (0-100)",
"tls_strategy", int, 0,
"TLS passthrough strategy. If set to 0, connections will be passed through after the first unsuccessful "
"handshake. If set to 0 < p <= 100, connections with be passed through with probability p.",
)
def configure(updated):
global tls_strategy
if ctx.options.tlsstrat > 0:
tls_strategy = ProbabilisticStrategy(float(ctx.options.tlsstrat) / 100.0)
def configure(self, updated):
if "tls_strategy" not in updated:
return
if ctx.options.tls_strategy > 0:
self.strategy = ProbabilisticStrategy(ctx.options.tls_strategy / 100)
else:
tls_strategy = ConservativeStrategy()
self.strategy = ConservativeStrategy()
def tls_clienthello(self, data: tls.ClientHelloData):
server_address = data.context.server.peername
if not self.strategy.should_intercept(server_address):
ctx.log(f"TLS passthrough: {human.format_address(server_address)}.")
data.ignore_connection = True
self.strategy.record_skipped(server_address)
def next_layer(next_layer):
"""
This hook does the actual magic - if the next layer is planned to be a TLS layer,
we check if we want to enter pass-through mode instead.
"""
if isinstance(next_layer, TlsLayer) and next_layer._client_tls:
server_address = next_layer.server_conn.address
if tls_strategy.should_intercept(server_address):
# We try to intercept.
# Monkey-Patch the layer to get feedback from the TLSLayer if interception worked.
next_layer.__class__ = TlsFeedback
def tls_handshake(self, data: tls.TlsHookData):
if isinstance(data.conn, connection.Server):
return
server_address = data.context.server.peername
if data.conn.error is None:
ctx.log(f"TLS handshake successful: {human.format_address(server_address)}")
self.strategy.record_success(server_address)
else:
# We don't intercept - reply with a pass-through layer and add a "skipped" entry.
mitmproxy.ctx.log("TLS passthrough for %s" % repr(next_layer.server_conn.address), "info")
next_layer_replacement = RawTCPLayer(next_layer.ctx, ignore=True)
next_layer.reply.send(next_layer_replacement)
tls_strategy.record_skipped(server_address)
ctx.log(f"TLS handshake failed: {human.format_address(server_address)}")
self.strategy.record_failure(server_address)
addons = [MaybeTls()]

View File

@ -112,7 +112,7 @@ class TlsConfig:
ctx.options.upstream_cert
)
def tls_start_client(self, tls_start: tls.TlsStartData) -> None:
def tls_start_client(self, tls_start: tls.TlsHookData) -> None:
"""Establish TLS between client and proxy."""
client: connection.Client = tls_start.context.client
server: connection.Server = tls_start.context.server
@ -159,7 +159,7 @@ class TlsConfig:
))
tls_start.ssl_conn.set_accept_state()
def tls_start_server(self, tls_start: tls.TlsStartData) -> None:
def tls_start_server(self, tls_start: tls.TlsHookData) -> None:
"""Establish TLS between proxy and server."""
client: connection.Client = tls_start.context.client
server: connection.Server = tls_start.context.server

View File

@ -9,6 +9,7 @@ from mitmproxy.net import tls as net_tls
from mitmproxy.proxy import commands, events, layer, tunnel
from mitmproxy.proxy import context
from mitmproxy.proxy.commands import StartHook
from mitmproxy.proxy.layers import tcp
from mitmproxy.utils import human
@ -103,6 +104,10 @@ class ClientHelloData:
"""The context object for this connection."""
client_hello: net_tls.ClientHello
"""The entire parsed TLS ClientHello."""
ignore_connection: bool = False
"""
If set to `True`, do not intercept this connection and forward encrypted contents unmodified.
"""
establish_server_tls_first: bool = False
"""
If set to `True`, pause this handshake and establish TLS with an upstream server first.
@ -122,7 +127,7 @@ class TlsClienthelloHook(StartHook):
@dataclass
class TlsStartData:
class TlsHookData:
conn: connection.Connection
context: context.Context
ssl_conn: Optional[SSL.Connection] = None
@ -131,23 +136,33 @@ class TlsStartData:
@dataclass
class TlsStartClientHook(StartHook):
"""
TLS Negotation between mitmproxy and a client is about to start.
TLS negotation between mitmproxy and a client is about to start.
An addon is expected to initialize data.ssl_conn.
(by default, this is done by mitmproxy.addons.TlsConfig)
(by default, this is done by `mitmproxy.addons.tlsconfig`)
"""
data: TlsStartData
data: TlsHookData
@dataclass
class TlsStartServerHook(StartHook):
"""
TLS Negotation between mitmproxy and a server is about to start.
TLS negotation between mitmproxy and a server is about to start.
An addon is expected to initialize data.ssl_conn.
(by default, this is done by mitmproxy.addons.TlsConfig)
(by default, this is done by `mitmproxy.addons.tlsconfig`)
"""
data: TlsStartData
data: TlsHookData
@dataclass
class TlsHandshakeHook(StartHook):
"""
A TLS handshake has been completed.
If `data.conn.error` is `None`, negotiation was successful.
"""
data: TlsHookData
class _TLSLayer(tunnel.TunnelLayer):
@ -169,7 +184,7 @@ class _TLSLayer(tunnel.TunnelLayer):
def start_tls(self) -> layer.CommandGenerator[None]:
assert not self.tls
tls_start = TlsStartData(self.conn, self.context)
tls_start = TlsHookData(self.conn, self.context)
if tls_start.conn == tls_start.context.client:
yield TlsStartClientHook(tls_start)
else:
@ -220,7 +235,6 @@ class _TLSLayer(tunnel.TunnelLayer):
)
else:
err = f"OpenSSL {e!r}"
self.conn.error = err
return False, err
else:
# Here we set all attributes that are only known *after* the handshake.
@ -242,9 +256,15 @@ class _TLSLayer(tunnel.TunnelLayer):
self.conn.tls_version = self.tls.get_protocol_version_name()
if self.debug:
yield commands.Log(f"{self.debug}[tls] tls established: {self.conn}", "debug")
yield TlsHandshakeHook(TlsHookData(self.conn, self.context, self.tls))
yield from self.receive_data(b"")
return True, None
def on_handshake_error(self, err: str) -> layer.CommandGenerator[None]:
self.conn.error = err
yield TlsHandshakeHook(TlsHookData(self.conn, self.context, self.tls))
yield from super().on_handshake_error(err)
def receive_data(self, data: bytes) -> layer.CommandGenerator[None]:
if data:
self.tls.bio_write(data)
@ -400,6 +420,17 @@ class ClientTLSLayer(_TLSLayer):
tls_clienthello = ClientHelloData(self.context, client_hello)
yield TlsClienthelloHook(tls_clienthello)
if tls_clienthello.ignore_connection:
# we've figured out that we don't want to intercept this connection, so we assign fake connection objects
# to all TLS layers. This makes the real connection contents just go through.
self.conn = self.tunnel_connection = connection.Client(("ignore-conn", 0), ("ignore-conn", 0), time.time())
parent_layer = self.context.layers[-2]
if isinstance(parent_layer, ServerTLSLayer):
parent_layer.conn = parent_layer.tunnel_connection = connection.Server(None)
self.child_layer = tcp.TCPLayer(self.context, ignore=True)
yield from self.event_to_child(events.DataReceived(self.context.client, bytes(self.recv_buffer)))
self.recv_buffer.clear()
return True, None
if tls_clienthello.establish_server_tls_first and not self.context.server.tls_established:
err = yield from self.start_server_tls()
if err:

View File

@ -414,7 +414,7 @@ if __name__ == "__main__": # pragma: no cover
if "redirect" in flow.request.path:
flow.request.host = "httpbin.org"
def tls_start_client(tls_start: tls.TlsStartData):
def tls_start_client(tls_start: tls.TlsHookData):
# INSECURE
ssl_context = SSL.Context(SSL.SSLv23_METHOD)
ssl_context.use_privatekey_file(
@ -426,7 +426,7 @@ if __name__ == "__main__": # pragma: no cover
tls_start.ssl_conn = SSL.Connection(ssl_context)
tls_start.ssl_conn.set_accept_state()
def tls_start_server(tls_start: tls.TlsStartData):
def tls_start_server(tls_start: tls.TlsHookData):
# INSECURE
ssl_context = SSL.Context(SSL.SSLv23_METHOD)
tls_start.ssl_conn = SSL.Connection(ssl_context)

View File

@ -130,7 +130,7 @@ class TestTlsConfig:
)
ctx = context.Context(connection.Client(("client", 1234), ("127.0.0.1", 8080), 1605699329), tctx.options)
tls_start = tls.TlsStartData(ctx.client, context=ctx)
tls_start = tls.TlsHookData(ctx.client, context=ctx)
ta.tls_start_client(tls_start)
tssl_server = tls_start.ssl_conn
tssl_client = test_tls.SSLTest()
@ -145,7 +145,7 @@ class TestTlsConfig:
ctx.client.cipher_list = ["TLS_AES_256_GCM_SHA384", "ECDHE-RSA-AES128-SHA"]
ctx.server.address = ("example.mitmproxy.org", 443)
tls_start = tls.TlsStartData(ctx.server, context=ctx)
tls_start = tls.TlsHookData(ctx.server, context=ctx)
ta.tls_start_server(tls_start)
tssl_client = tls_start.ssl_conn
tssl_server = test_tls.SSLTest(server_side=True)
@ -160,7 +160,7 @@ class TestTlsConfig:
tctx.configure(ta, ssl_verify_upstream_trusted_ca=tdata.path(
"mitmproxy/net/data/verificationcerts/trusted-root.crt"))
tls_start = tls.TlsStartData(ctx.server, context=ctx)
tls_start = tls.TlsHookData(ctx.server, context=ctx)
ta.tls_start_server(tls_start)
tssl_client = tls_start.ssl_conn
tssl_server = test_tls.SSLTest(server_side=True)
@ -179,7 +179,7 @@ class TestTlsConfig:
http2=False,
ciphers_server="ALL"
)
tls_start = tls.TlsStartData(ctx.server, context=ctx)
tls_start = tls.TlsHookData(ctx.server, context=ctx)
ta.tls_start_server(tls_start)
tssl_client = tls_start.ssl_conn
tssl_server = test_tls.SSLTest(server_side=True)
@ -190,7 +190,7 @@ class TestTlsConfig:
with taddons.context(ta) as tctx:
ctx = context.Context(connection.Client(("client", 1234), ("127.0.0.1", 8080), 1605699329), tctx.options)
ctx.server.address = ("example.mitmproxy.org", 443)
tls_start = tls.TlsStartData(ctx.server, context=ctx)
tls_start = tls.TlsHookData(ctx.server, context=ctx)
def assert_alpn(http2, client_offers, expected):
tctx.configure(ta, http2=http2)
@ -222,7 +222,7 @@ class TestTlsConfig:
modes.HttpProxy(ctx),
123
]
tls_start = tls.TlsStartData(ctx.client, context=ctx)
tls_start = tls.TlsHookData(ctx.client, context=ctx)
ta.tls_start_client(tls_start)
assert tls_start.ssl_conn.get_app_data()["client_alpn"] == b"http/1.1"
@ -244,7 +244,7 @@ class TestTlsConfig:
ssl_verify_upstream_trusted_ca=tdata.path("mitmproxy/net/data/verificationcerts/trusted-root.crt"),
)
tls_start = tls.TlsStartData(ctx.server, context=ctx)
tls_start = tls.TlsHookData(ctx.server, context=ctx)
ta.tls_start_server(tls_start)
tssl_client = tls_start.ssl_conn
tssl_server = test_tls.SSLTest(server_side=True)

View File

@ -153,13 +153,17 @@ class TlsEchoLayer(tutils.EchoLayer):
yield from super()._handle_event(event)
def interact(playbook: tutils.Playbook, conn: connection.Connection, tssl: SSLTest):
def finish_handshake(playbook: tutils.Playbook, conn: connection.Connection, tssl: SSLTest):
data = tutils.Placeholder(bytes)
tls_hook_data = tutils.Placeholder(tls.TlsHookData)
assert (
playbook
>> events.DataReceived(conn, tssl.bio_read())
<< tls.TlsHandshakeHook(tls_hook_data)
>> tutils.reply()
<< commands.SendData(conn, data)
)
assert tls_hook_data().conn.error is None
tssl.bio_write(data())
@ -168,7 +172,7 @@ def reply_tls_start_client(alpn: typing.Optional[bytes] = None, *args, **kwargs)
Helper function to simplify the syntax for tls_start_client hooks.
"""
def make_client_conn(tls_start: tls.TlsStartData) -> None:
def make_client_conn(tls_start: tls.TlsHookData) -> None:
# ssl_context = SSL.Context(Method.TLS_METHOD)
# ssl_context.set_min_proto_version(SSL.TLS1_3_VERSION)
ssl_context = SSL.Context(SSL.SSLv23_METHOD)
@ -193,7 +197,7 @@ def reply_tls_start_server(alpn: typing.Optional[bytes] = None, *args, **kwargs)
Helper function to simplify the syntax for tls_start_server hooks.
"""
def make_server_conn(tls_start: tls.TlsStartData) -> None:
def make_server_conn(tls_start: tls.TlsHookData) -> None:
# ssl_context = SSL.Context(Method.TLS_METHOD)
# ssl_context.set_min_proto_version(SSL.TLS1_3_VERSION)
ssl_context = SSL.Context(SSL.SSLv23_METHOD)
@ -251,7 +255,7 @@ class TestServerTLS:
tssl = SSLTest(server_side=True)
# send ClientHello
# send ClientHello, receive ClientHello
data = tutils.Placeholder(bytes)
assert (
playbook
@ -259,14 +263,14 @@ class TestServerTLS:
>> reply_tls_start_server()
<< commands.SendData(tctx.server, data)
)
# receive ServerHello, finish client handshake
tssl.bio_write(data())
with pytest.raises(ssl.SSLWantReadError):
tssl.do_handshake()
interact(playbook, tctx.server, tssl)
# finish server handshake
# finish handshake (mitmproxy)
finish_handshake(playbook, tctx.server, tssl)
# finish handshake (locally)
tssl.do_handshake()
assert (
playbook
@ -323,14 +327,18 @@ class TestServerTLS:
with pytest.raises(ssl.SSLWantReadError):
tssl.do_handshake()
tls_hook_data = tutils.Placeholder(tls.TlsHookData)
assert (
playbook
>> events.DataReceived(tctx.server, tssl.bio_read())
<< commands.Log("Server TLS handshake failed. Certificate verify failed: Hostname mismatch", "warn")
<< tls.TlsHandshakeHook(tls_hook_data)
>> tutils.reply()
<< commands.CloseConnection(tctx.server)
<< commands.SendData(tctx.client,
b"open-connection failed: Certificate verify failed: Hostname mismatch")
)
assert tls_hook_data().conn.error == "Certificate verify failed: Hostname mismatch"
assert not tctx.server.tls_established
def test_remote_speaks_no_tls(self, tctx):
@ -340,6 +348,7 @@ class TestServerTLS:
# send ClientHello, receive random garbage back
data = tutils.Placeholder(bytes)
tls_hook_data = tutils.Placeholder(tls.TlsHookData)
assert (
playbook
<< tls.TlsStartServerHook(tutils.Placeholder())
@ -347,8 +356,11 @@ class TestServerTLS:
<< commands.SendData(tctx.server, data)
>> events.DataReceived(tctx.server, b"HTTP/1.1 404 Not Found\r\n")
<< commands.Log("Server TLS handshake failed. The remote server does not speak TLS.", "warn")
<< tls.TlsHandshakeHook(tls_hook_data)
>> tutils.reply()
<< commands.CloseConnection(tctx.server)
)
assert tls_hook_data().conn.error == "The remote server does not speak TLS."
def test_unsupported_protocol(self, tctx: context.Context):
"""Test the scenario where the server only supports an outdated TLS version by default."""
@ -375,13 +387,17 @@ class TestServerTLS:
tssl.do_handshake()
# send back error
tls_hook_data = tutils.Placeholder(tls.TlsHookData)
assert (
playbook
>> events.DataReceived(tctx.server, tssl.bio_read())
<< commands.Log("Server TLS handshake failed. The remote server and mitmproxy cannot agree on a TLS version"
" to use. You may need to adjust mitmproxy's tls_version_server_min option.", "warn")
<< tls.TlsHandshakeHook(tls_hook_data)
>> tutils.reply()
<< commands.CloseConnection(tctx.server)
)
assert tls_hook_data().conn.error
def make_client_tls_layer(
@ -429,7 +445,7 @@ class TestClientTLS:
tssl_client.bio_write(data())
tssl_client.do_handshake()
# Finish Handshake
interact(playbook, tctx.client, tssl_client)
finish_handshake(playbook, tctx.client, tssl_client)
assert tssl_client.obj.getpeercert(True)
assert tctx.client.tls_established
@ -488,6 +504,8 @@ class TestClientTLS:
assert (
playbook
>> events.DataReceived(tctx.server, tssl_server.bio_read())
<< tls.TlsHandshakeHook(tutils.Placeholder())
>> tutils.reply()
<< commands.SendData(tctx.server, data)
<< tls.TlsStartClientHook(tutils.Placeholder())
)
@ -503,7 +521,7 @@ class TestClientTLS:
)
tssl_client.bio_write(data())
tssl_client.do_handshake()
interact(playbook, tctx.client, tssl_client)
finish_handshake(playbook, tctx.client, tssl_client)
# Both handshakes completed!
assert tctx.client.tls_established
@ -517,6 +535,7 @@ class TestClientTLS:
def test_cannot_parse_clienthello(self, tctx: context.Context):
"""Test the scenario where we cannot parse the ClientHello"""
playbook, client_layer, tssl_client = make_client_tls_layer(tctx)
tls_hook_data = tutils.Placeholder(tls.TlsHookData)
invalid = b"\x16\x03\x01\x00\x00"
@ -524,8 +543,11 @@ class TestClientTLS:
playbook
>> events.DataReceived(tctx.client, invalid)
<< commands.Log(f"Client TLS handshake failed. Cannot parse ClientHello: {invalid.hex()}", level="warn")
<< tls.TlsHandshakeHook(tls_hook_data)
>> tutils.reply()
<< commands.CloseConnection(tctx.client)
)
assert tls_hook_data().conn.error
assert not tctx.client.tls_established
# Make sure that an active server connection does not cause child layers to spawn.
@ -556,15 +578,19 @@ class TestClientTLS:
with pytest.raises(ssl.SSLCertVerificationError):
tssl_client.do_handshake()
# Finish Handshake
tls_hook_data = tutils.Placeholder(tls.TlsHookData)
assert (
playbook
>> events.DataReceived(tctx.client, tssl_client.bio_read())
<< commands.Log("Client TLS handshake failed. The client does not trust the proxy's certificate "
"for wrong.host.mitmproxy.org (sslv3 alert bad certificate)", "warn")
<< tls.TlsHandshakeHook(tls_hook_data)
>> tutils.reply()
<< commands.CloseConnection(tctx.client)
>> events.ConnectionClosed(tctx.client)
)
assert not tctx.client.tls_established
assert tls_hook_data().conn.error
@pytest.mark.parametrize("close_at", ["tls_clienthello", "tls_start_client", "handshake"])
def test_immediate_disconnect(self, tctx: context.Context, close_at):
@ -573,6 +599,7 @@ class TestClientTLS:
the proxy certificate."""
playbook, client_layer, tssl_client = make_client_tls_layer(tctx, sni=b"wrong.host.mitmproxy.org")
playbook.logs = True
tls_hook_data = tutils.Placeholder(tls.TlsHookData)
playbook >> events.DataReceived(tctx.client, tssl_client.bio_read())
playbook << tls.TlsClienthelloHook(tutils.Placeholder())
@ -584,8 +611,11 @@ class TestClientTLS:
>> tutils.reply(to=-2)
<< tls.TlsStartClientHook(tutils.Placeholder())
>> reply_tls_start_client()
<< tls.TlsHandshakeHook(tls_hook_data)
>> tutils.reply()
<< commands.CloseConnection(tctx.client)
)
assert tls_hook_data().conn.error
return
playbook >> tutils.reply()
@ -596,8 +626,11 @@ class TestClientTLS:
playbook
>> events.ConnectionClosed(tctx.client)
>> reply_tls_start_client(to=-2)
<< tls.TlsHandshakeHook(tls_hook_data)
>> tutils.reply()
<< commands.CloseConnection(tctx.client)
)
assert tls_hook_data().conn.error
return
assert (
@ -608,14 +641,18 @@ class TestClientTLS:
<< commands.Log("Client TLS handshake failed. The client disconnected during the handshake. "
"If this happens consistently for wrong.host.mitmproxy.org, this may indicate that the "
"client does not trust the proxy's certificate.", "info")
<< tls.TlsHandshakeHook(tls_hook_data)
>> tutils.reply()
<< commands.CloseConnection(tctx.client)
)
assert tls_hook_data().conn.error
def test_unsupported_protocol(self, tctx: context.Context):
"""Test the scenario where the client only supports an outdated TLS version by default."""
playbook, client_layer, tssl_client = make_client_tls_layer(tctx, max_ver=ssl.TLSVersion.TLSv1_2)
playbook.logs = True
tls_hook_data = tutils.Placeholder(tls.TlsHookData)
assert (
playbook
>> events.DataReceived(tctx.client, tssl_client.bio_read())
@ -625,5 +662,8 @@ class TestClientTLS:
>> reply_tls_start_client()
<< commands.Log("Client TLS handshake failed. Client and mitmproxy cannot agree on a TLS version to "
"use. You may need to adjust mitmproxy's tls_version_client_min option.", "warn")
<< tls.TlsHandshakeHook(tls_hook_data)
>> tutils.reply()
<< commands.CloseConnection(tctx.client)
)
assert tls_hook_data().conn.error