Merge pull request #4790 from mhils/ignore-after-clienthello

TLS: add `tls_handshake` hook, ignore-after-clienthello
This commit is contained in:
Maximilian Hils 2021-11-22 10:29:47 +01:00 committed by GitHub
commit a969739875
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 539 additions and 353 deletions

View File

@ -3,6 +3,8 @@
## Unreleased: mitmproxy next
* Support proxy authentication for SOCKS v5 mode (@starplanet)
* Make it possible to ignore connections in the tls_clienthello event hook (@mhils)
* Add `tls_established/failed_client/server` event hooks to record negotiation success/failure (@mhils)
* fix some responses not being decoded properly if the encoding was uppercase #4735 (@Mattwmaster58)
* Trigger event hooks for flows with semantically invalid requests, for example invalid content-length headers (@mhils)
* Improve error message on TLS version mismatch (@mhils)

View File

@ -124,6 +124,10 @@ with outfile.open("w") as f, contextlib.redirect_stdout(f):
tls.TlsClienthelloHook,
tls.TlsStartClientHook,
tls.TlsStartServerHook,
tls.TlsEstablishedClientHook,
tls.TlsEstablishedServerHook,
tls.TlsFailedClientHook,
tls.TlsFailedServerHook,
]
)

View File

@ -30,8 +30,10 @@ modules = [
"mitmproxy.flow",
"mitmproxy.http",
"mitmproxy.net.server_spec",
"mitmproxy.proxy.context",
"mitmproxy.proxy.server_hooks",
"mitmproxy.tcp",
"mitmproxy.tls",
"mitmproxy.websocket",
here / ".." / "src" / "generated" / "events.py",
]

View File

@ -55,6 +55,14 @@ To document all event hooks, we do a bit of hackery:
{% if doc.qualname.startswith("ServerConnectionHookData") and doc.name != "__init__" %}
{{ default_is_public(doc) }}
{% endif %}
{% elif doc.modulename == "mitmproxy.proxy.context" %}
{% if doc.qualname is not in(["Context.__init__", "Context.fork", "Context.options"]) %}
{{ default_is_public(doc) }}
{% endif %}
{% elif doc.modulename == "mitmproxy.tls" %}
{% if doc.qualname is not in(["TlsData.__init__", "ClientHelloData.__init__"]) %}
{{ default_is_public(doc) }}
{% endif %}
{% elif doc.modulename == "mitmproxy.websocket" %}
{% if doc.qualname != "WebSocketMessage.type" %}
{{ default_is_public(doc) }}

View File

@ -0,0 +1,11 @@
---
title: "mitmproxy.proxy.context"
url: "api/mitmproxy/proxy/context.html"
menu:
addons:
parent: 'Event Hooks & API'
---
{{< readfile file="/generated/api/mitmproxy/proxy/context.html" >}}

View File

@ -0,0 +1,11 @@
---
title: "mitmproxy.tls"
url: "api/mitmproxy/tls.html"
menu:
addons:
parent: 'Event Hooks & API'
---
{{< readfile file="/generated/api/mitmproxy/tls.html" >}}

View File

@ -1,8 +1,5 @@
# FIXME: This addon is currently not compatible with mitmproxy 7 and above.
"""
This inline script allows conditional TLS Interception based
on a user-defined strategy.
This addon allows conditional TLS Interception based on a user-defined strategy.
Example:
@ -11,138 +8,101 @@ Example:
1. curl --proxy http://localhost:8080 https://example.com --insecure
// works - we'll also see the contents in mitmproxy
2. curl --proxy http://localhost:8080 https://example.com --insecure
// still works - we'll also see the contents in mitmproxy
3. curl --proxy http://localhost:8080 https://example.com
2. curl --proxy http://localhost:8080 https://example.com
// fails with a certificate error, which we will also see in mitmproxy
4. curl --proxy http://localhost:8080 https://example.com
3. curl --proxy http://localhost:8080 https://example.com
// works again, but mitmproxy does not intercept and we do *not* see the contents
Authors: Maximilian Hils, Matthew Tuusberg
"""
import collections
import random
from abc import ABC, abstractmethod
from enum import Enum
import mitmproxy
from mitmproxy import ctx
from mitmproxy.exceptions import TlsProtocolException
from mitmproxy.proxy.protocol import TlsLayer, RawTCPLayer
from mitmproxy import connection, ctx, tls
from mitmproxy.utils import human
class InterceptionResult(Enum):
success = True
failure = False
skipped = None
SUCCESS = 1
FAILURE = 2
SKIPPED = 3
class _TlsStrategy:
"""
Abstract base class for interception strategies.
"""
class TlsStrategy(ABC):
def __init__(self):
# A server_address -> interception results mapping
self.history = collections.defaultdict(lambda: collections.deque(maxlen=200))
def should_intercept(self, server_address):
"""
Returns:
True, if we should attempt to intercept the connection.
False, if we want to employ pass-through instead.
"""
@abstractmethod
def should_intercept(self, server_address: connection.Address) -> bool:
raise NotImplementedError()
def record_success(self, server_address):
self.history[server_address].append(InterceptionResult.success)
self.history[server_address].append(InterceptionResult.SUCCESS)
def record_failure(self, server_address):
self.history[server_address].append(InterceptionResult.failure)
self.history[server_address].append(InterceptionResult.FAILURE)
def record_skipped(self, server_address):
self.history[server_address].append(InterceptionResult.skipped)
self.history[server_address].append(InterceptionResult.SKIPPED)
class ConservativeStrategy(_TlsStrategy):
class ConservativeStrategy(TlsStrategy):
"""
Conservative Interception Strategy - only intercept if there haven't been any failed attempts
in the history.
"""
def should_intercept(self, server_address):
if InterceptionResult.failure in self.history[server_address]:
return False
return True
def should_intercept(self, server_address: connection.Address) -> bool:
return InterceptionResult.FAILURE not in self.history[server_address]
class ProbabilisticStrategy(_TlsStrategy):
class ProbabilisticStrategy(TlsStrategy):
"""
Fixed probability that we intercept a given connection.
"""
def __init__(self, p):
def __init__(self, p: float):
self.p = p
super().__init__()
def should_intercept(self, server_address):
def should_intercept(self, server_address: connection.Address) -> bool:
return random.uniform(0, 1) < self.p
class TlsFeedback(TlsLayer):
"""
Monkey-patch _establish_tls_with_client to get feedback if TLS could be established
successfully on the client connection (which may fail due to cert pinning).
"""
class MaybeTls:
strategy: TlsStrategy
def _establish_tls_with_client(self):
server_address = self.server_conn.address
def load(self, l):
l.add_option(
"tls_strategy", int, 0,
"TLS passthrough strategy. If set to 0, connections will be passed through after the first unsuccessful "
"handshake. If set to 0 < p <= 100, connections with be passed through with probability p.",
)
try:
super()._establish_tls_with_client()
except TlsProtocolException as e:
tls_strategy.record_failure(server_address)
raise e
def configure(self, updated):
if "tls_strategy" not in updated:
return
if ctx.options.tls_strategy > 0:
self.strategy = ProbabilisticStrategy(ctx.options.tls_strategy / 100)
else:
tls_strategy.record_success(server_address)
self.strategy = ConservativeStrategy()
def tls_clienthello(self, data: tls.ClientHelloData):
server_address = data.context.server.peername
if not self.strategy.should_intercept(server_address):
ctx.log(f"TLS passthrough: {human.format_address(server_address)}.")
data.ignore_connection = True
self.strategy.record_skipped(server_address)
def tls_established_client(self, data: tls.TlsData):
server_address = data.context.server.peername
ctx.log(f"TLS handshake successful: {human.format_address(server_address)}")
self.strategy.record_success(server_address)
def tls_failed_client(self, data: tls.TlsData):
server_address = data.context.server.peername
ctx.log(f"TLS handshake failed: {human.format_address(server_address)}")
self.strategy.record_failure(server_address)
# inline script hooks below.
tls_strategy = None
def load(l):
l.add_option(
"tlsstrat", int, 0, "TLS passthrough strategy (0-100)",
)
def configure(updated):
global tls_strategy
if ctx.options.tlsstrat > 0:
tls_strategy = ProbabilisticStrategy(float(ctx.options.tlsstrat) / 100.0)
else:
tls_strategy = ConservativeStrategy()
def next_layer(next_layer):
"""
This hook does the actual magic - if the next layer is planned to be a TLS layer,
we check if we want to enter pass-through mode instead.
"""
if isinstance(next_layer, TlsLayer) and next_layer._client_tls:
server_address = next_layer.server_conn.address
if tls_strategy.should_intercept(server_address):
# We try to intercept.
# Monkey-Patch the layer to get feedback from the TLSLayer if interception worked.
next_layer.__class__ = TlsFeedback
else:
# We don't intercept - reply with a pass-through layer and add a "skipped" entry.
mitmproxy.ctx.log("TLS passthrough for %s" % repr(next_layer.server_conn.address), "info")
next_layer_replacement = RawTCPLayer(next_layer.ctx, ignore=True)
next_layer.reply.send(next_layer_replacement)
tls_strategy.record_skipped(server_address)
addons = [MaybeTls()]

View File

@ -4,11 +4,12 @@ from pathlib import Path
from typing import List, Optional, TypedDict, Any
from OpenSSL import SSL
from mitmproxy import certs, ctx, exceptions, connection
from mitmproxy import certs, ctx, exceptions, connection, tls
from mitmproxy.net import tls as net_tls
from mitmproxy.options import CONF_BASENAME
from mitmproxy.proxy import context
from mitmproxy.proxy.layers import tls, modes
from mitmproxy.proxy.layers import modes
from mitmproxy.proxy.layers import tls as proxy_tls
# We manually need to specify this, otherwise OpenSSL may select a non-HTTP2 cipher by default.
# https://ssl-config.mozilla.org/#config=old
@ -46,7 +47,7 @@ def alpn_select_callback(conn: SSL.Connection, options: List[bytes]) -> Any:
# We do have a server connection, but the remote server refused to negotiate a protocol:
# We need to mirror this on the client connection.
return SSL.NO_OVERLAPPING_PROTOCOLS
http_alpns = tls.HTTP_ALPNS if http2 else tls.HTTP1_ALPNS
http_alpns = proxy_tls.HTTP_ALPNS if http2 else proxy_tls.HTTP1_ALPNS
for alpn in options: # client sends in order of preference, so we are nice and respect that.
if alpn in http_alpns:
return alpn
@ -112,7 +113,7 @@ class TlsConfig:
ctx.options.upstream_cert
)
def tls_start_client(self, tls_start: tls.TlsStartData) -> None:
def tls_start_client(self, tls_start: tls.TlsData) -> None:
"""Establish TLS between client and proxy."""
client: connection.Client = tls_start.context.client
server: connection.Server = tls_start.context.server
@ -159,7 +160,7 @@ class TlsConfig:
))
tls_start.ssl_conn.set_accept_state()
def tls_start_server(self, tls_start: tls.TlsStartData) -> None:
def tls_start_server(self, tls_start: tls.TlsData) -> None:
"""Establish TLS between proxy and server."""
client: connection.Client = tls_start.context.client
server: connection.Server = tls_start.context.server

View File

@ -1,4 +1,3 @@
import io
import ipaddress
import os
import threading
@ -11,12 +10,9 @@ import certifi
from OpenSSL.crypto import X509
from cryptography.hazmat.primitives.asymmetric import rsa
from kaitaistruct import KaitaiStream
from OpenSSL import SSL, crypto
from mitmproxy import certs
from mitmproxy.contrib.kaitaistruct import tls_client_hello
from mitmproxy.net import check
# redeclared here for strict type checking
@ -273,49 +269,3 @@ def is_tls_record_magic(d):
d[1] == 0x03 and
0x0 <= d[2] <= 0x03
)
class ClientHello:
def __init__(self, raw_client_hello):
self._client_hello = tls_client_hello.TlsClientHello(
KaitaiStream(io.BytesIO(raw_client_hello))
)
@property
def cipher_suites(self) -> List[int]:
return self._client_hello.cipher_suites.cipher_suites
@property
def sni(self) -> Optional[str]:
if self._client_hello.extensions:
for extension in self._client_hello.extensions.extensions:
is_valid_sni_extension = (
extension.type == 0x00 and
len(extension.body.server_names) == 1 and
extension.body.server_names[0].name_type == 0 and
check.is_valid_host(extension.body.server_names[0].host_name)
)
if is_valid_sni_extension:
return extension.body.server_names[0].host_name.decode("ascii")
return None
@property
def alpn_protocols(self) -> List[bytes]:
if self._client_hello.extensions:
for extension in self._client_hello.extensions.extensions:
if extension.type == 0x10:
return list(x.name for x in extension.body.alpn_protocols)
return []
@property
def extensions(self) -> List[Tuple[int, bytes]]:
ret = []
if self._client_hello.extensions:
for extension in self._client_hello.extensions.extensions:
body = getattr(extension, "_raw_body", extension.body)
ret.append((extension.type, body))
return ret
def __repr__(self):
return f"ClientHello(sni: {self.sni}, alpn_protocols: {self.alpn_protocols})"

View File

@ -9,13 +9,26 @@ if TYPE_CHECKING:
class Context:
"""
The context object provided to each `mitmproxy.proxy.layer.Layer` by its parent layer.
The context object provided to each protocol layer in the proxy core.
"""
client: connection.Client
"""The client connection."""
server: connection.Server
"""
The server connection.
For practical reasons this attribute is always set, even if there is not server connection yet.
In this case the server address is `None`.
"""
options: Options
"""
Provides access to options for proxy layers. Not intended for use by addons, use `mitmproxy.ctx.options` instead.
"""
layers: List["mitmproxy.proxy.layer.Layer"]
"""
The protocol layer stack.
"""
def __init__(
self,

View File

@ -4,11 +4,12 @@ from dataclasses import dataclass
from typing import Iterator, Literal, Optional, Tuple
from OpenSSL import SSL
from mitmproxy.tls import ClientHello, ClientHelloData, TlsData
from mitmproxy import certs, connection
from mitmproxy.net import tls as net_tls
from mitmproxy.proxy import commands, events, layer, tunnel
from mitmproxy.proxy import context
from mitmproxy.proxy.commands import StartHook
from mitmproxy.proxy.layers import tcp
from mitmproxy.utils import human
@ -69,7 +70,7 @@ def get_client_hello(data: bytes) -> Optional[bytes]:
return None
def parse_client_hello(data: bytes) -> Optional[net_tls.ClientHello]:
def parse_client_hello(data: bytes) -> Optional[ClientHello]:
"""
Check if the supplied bytes contain a full ClientHello message,
and if so, parse it.
@ -85,7 +86,7 @@ def parse_client_hello(data: bytes) -> Optional[net_tls.ClientHello]:
client_hello = get_client_hello(data)
if client_hello:
try:
return net_tls.ClientHello(client_hello[4:])
return ClientHello(client_hello[4:])
except EOFError as e:
raise ValueError("Invalid ClientHello") from e
return None
@ -97,18 +98,6 @@ HTTP_ALPNS = (b"h2",) + HTTP1_ALPNS
# We need these classes as hooks can only have one argument at the moment.
@dataclass
class ClientHelloData:
context: context.Context
"""The context object for this connection."""
client_hello: net_tls.ClientHello
"""The entire parsed TLS ClientHello."""
establish_server_tls_first: bool = False
"""
If set to `True`, pause this handshake and establish TLS with an upstream server first.
This makes it possible to process the server certificate when generating an interception certificate.
"""
@dataclass
class TlsClienthelloHook(StartHook):
@ -121,33 +110,58 @@ class TlsClienthelloHook(StartHook):
data: ClientHelloData
@dataclass
class TlsStartData:
conn: connection.Connection
context: context.Context
ssl_conn: Optional[SSL.Connection] = None
@dataclass
class TlsStartClientHook(StartHook):
"""
TLS Negotation between mitmproxy and a client is about to start.
TLS negotation between mitmproxy and a client is about to start.
An addon is expected to initialize data.ssl_conn.
(by default, this is done by mitmproxy.addons.TlsConfig)
(by default, this is done by `mitmproxy.addons.tlsconfig`)
"""
data: TlsStartData
data: TlsData
@dataclass
class TlsStartServerHook(StartHook):
"""
TLS Negotation between mitmproxy and a server is about to start.
TLS negotation between mitmproxy and a server is about to start.
An addon is expected to initialize data.ssl_conn.
(by default, this is done by mitmproxy.addons.TlsConfig)
(by default, this is done by `mitmproxy.addons.tlsconfig`)
"""
data: TlsStartData
data: TlsData
@dataclass
class TlsEstablishedClientHook(StartHook):
"""
The TLS handshake with the client has been completed successfully.
"""
data: TlsData
@dataclass
class TlsEstablishedServerHook(StartHook):
"""
The TLS handshake with the server has been completed successfully.
"""
data: TlsData
@dataclass
class TlsFailedClientHook(StartHook):
"""
The TLS handshake with the client has failed.
"""
data: TlsData
@dataclass
class TlsFailedServerHook(StartHook):
"""
The TLS handshake with the server has failed.
"""
data: TlsData
class _TLSLayer(tunnel.TunnelLayer):
@ -169,8 +183,8 @@ class _TLSLayer(tunnel.TunnelLayer):
def start_tls(self) -> layer.CommandGenerator[None]:
assert not self.tls
tls_start = TlsStartData(self.conn, self.context)
if tls_start.conn == tls_start.context.client:
tls_start = TlsData(self.conn, self.context)
if self.conn == self.context.client:
yield TlsStartClientHook(tls_start)
else:
yield TlsStartServerHook(tls_start)
@ -221,7 +235,6 @@ class _TLSLayer(tunnel.TunnelLayer):
)
else:
err = f"OpenSSL {e!r}"
self.conn.error = err
return False, err
else:
# Here we set all attributes that are only known *after* the handshake.
@ -243,9 +256,21 @@ class _TLSLayer(tunnel.TunnelLayer):
self.conn.tls_version = self.tls.get_protocol_version_name()
if self.debug:
yield commands.Log(f"{self.debug}[tls] tls established: {self.conn}", "debug")
if self.conn == self.context.client:
yield TlsEstablishedClientHook(TlsData(self.conn, self.context, self.tls))
else:
yield TlsEstablishedServerHook(TlsData(self.conn, self.context, self.tls))
yield from self.receive_data(b"")
return True, None
def on_handshake_error(self, err: str) -> layer.CommandGenerator[None]:
self.conn.error = err
if self.conn == self.context.client:
yield TlsFailedClientHook(TlsData(self.conn, self.context, self.tls))
else:
yield TlsFailedServerHook(TlsData(self.conn, self.context, self.tls))
yield from super().on_handshake_error(err)
def receive_data(self, data: bytes) -> layer.CommandGenerator[None]:
if data:
self.tls.bio_write(data)
@ -401,6 +426,17 @@ class ClientTLSLayer(_TLSLayer):
tls_clienthello = ClientHelloData(self.context, client_hello)
yield TlsClienthelloHook(tls_clienthello)
if tls_clienthello.ignore_connection:
# we've figured out that we don't want to intercept this connection, so we assign fake connection objects
# to all TLS layers. This makes the real connection contents just go through.
self.conn = self.tunnel_connection = connection.Client(("ignore-conn", 0), ("ignore-conn", 0), time.time())
parent_layer = self.context.layers[self.context.layers.index(self) - 1]
if isinstance(parent_layer, ServerTLSLayer):
parent_layer.conn = parent_layer.tunnel_connection = connection.Server(None)
self.child_layer = tcp.TCPLayer(self.context, ignore=True)
yield from self.event_to_child(events.DataReceived(self.context.client, bytes(self.recv_buffer)))
self.recv_buffer.clear()
return True, None
if tls_clienthello.establish_server_tls_first and not self.context.server.tls_established:
err = yield from self.start_server_tls()
if err:

View File

@ -16,12 +16,11 @@ from contextlib import contextmanager
from dataclasses import dataclass
from OpenSSL import SSL
from mitmproxy import http, options as moptions
from mitmproxy import http, options as moptions, tls
from mitmproxy.proxy.context import Context
from mitmproxy.proxy.layers.http import HTTPMode
from mitmproxy.proxy import commands, events, layer, layers, server_hooks
from mitmproxy.connection import Address, Client, Connection, ConnectionState
from mitmproxy.proxy.layers import tls
from mitmproxy.utils import asyncio_utils
from mitmproxy.utils import human
from mitmproxy.utils.data import pkg_data
@ -414,7 +413,7 @@ if __name__ == "__main__": # pragma: no cover
if "redirect" in flow.request.path:
flow.request.host = "httpbin.org"
def tls_start_client(tls_start: tls.TlsStartData):
def tls_start_client(tls_start: tls.TlsData):
# INSECURE
ssl_context = SSL.Context(SSL.SSLv23_METHOD)
ssl_context.use_privatekey_file(
@ -426,7 +425,7 @@ if __name__ == "__main__": # pragma: no cover
tls_start.ssl_conn = SSL.Connection(ssl_context)
tls_start.ssl_conn.set_accept_state()
def tls_start_server(tls_start: tls.TlsStartData):
def tls_start_server(tls_start: tls.TlsData):
# INSECURE
ssl_context = SSL.Context(SSL.SSLv23_METHOD)
tls_start.ssl_conn = SSL.Connection(ssl_context)

107
mitmproxy/tls.py Normal file
View File

@ -0,0 +1,107 @@
import io
from dataclasses import dataclass
from typing import List, Optional, Tuple
from kaitaistruct import KaitaiStream
from OpenSSL import SSL
from mitmproxy import connection
from mitmproxy.contrib.kaitaistruct import tls_client_hello
from mitmproxy.net import check
from mitmproxy.proxy import context
class ClientHello:
"""
A TLS ClientHello is the first message sent by the client when initiating TLS.
"""
def __init__(self, raw_client_hello: bytes):
"""Create a TLS ClientHello object from raw bytes."""
self._client_hello = tls_client_hello.TlsClientHello(
KaitaiStream(io.BytesIO(raw_client_hello))
)
@property
def cipher_suites(self) -> List[int]:
"""The cipher suites offered by the client (as raw ints)."""
return self._client_hello.cipher_suites.cipher_suites
@property
def sni(self) -> Optional[str]:
"""
The [Server Name Indication](https://en.wikipedia.org/wiki/Server_Name_Indication),
which indicates which hostname the client wants to connect to.
"""
if self._client_hello.extensions:
for extension in self._client_hello.extensions.extensions:
is_valid_sni_extension = (
extension.type == 0x00 and
len(extension.body.server_names) == 1 and
extension.body.server_names[0].name_type == 0 and
check.is_valid_host(extension.body.server_names[0].host_name)
)
if is_valid_sni_extension:
return extension.body.server_names[0].host_name.decode("ascii")
return None
@property
def alpn_protocols(self) -> List[bytes]:
"""
The application layer protocols offered by the client as part of the
[ALPN](https://en.wikipedia.org/wiki/Application-Layer_Protocol_Negotiation) TLS extension.
"""
if self._client_hello.extensions:
for extension in self._client_hello.extensions.extensions:
if extension.type == 0x10:
return list(x.name for x in extension.body.alpn_protocols)
return []
@property
def extensions(self) -> List[Tuple[int, bytes]]:
"""The raw list of extensions in the form of `(extension_type, raw_bytes)` tuples."""
ret = []
if self._client_hello.extensions:
for extension in self._client_hello.extensions.extensions:
body = getattr(extension, "_raw_body", extension.body)
ret.append((extension.type, body))
return ret
def __repr__(self):
return f"ClientHello(sni: {self.sni}, alpn_protocols: {self.alpn_protocols})"
@dataclass
class ClientHelloData:
"""
Event data for `tls_clienthello` event hooks.
"""
context: context.Context
"""The context object for this connection."""
client_hello: ClientHello
"""The entire parsed TLS ClientHello."""
ignore_connection: bool = False
"""
If set to `True`, do not intercept this connection and forward encrypted contents unmodified.
"""
establish_server_tls_first: bool = False
"""
If set to `True`, pause this handshake and establish TLS with an upstream server first.
This makes it possible to process the server certificate when generating an interception certificate.
"""
@dataclass
class TlsData:
"""
Event data for `tls_start_client`, `tls_start_server`, and `tls_handshake` event hooks.
"""
conn: connection.Connection
"""The affected connection."""
context: context.Context
"""The context object for this connection."""
ssl_conn: Optional[SSL.Connection] = None
"""
The associated pyOpenSSL `SSL.Connection` object.
This will be set by an addon in the `tls_start_*` event hooks.
"""

View File

@ -6,10 +6,10 @@ from typing import Union
import pytest
from OpenSSL import SSL
from mitmproxy import certs, connection
from mitmproxy import certs, connection, tls
from mitmproxy.addons import tlsconfig
from mitmproxy.proxy import context
from mitmproxy.proxy.layers import modes, tls
from mitmproxy.proxy.layers import modes, tls as proxy_tls
from mitmproxy.test import taddons
from test.mitmproxy.proxy.layers import test_tls
@ -130,7 +130,7 @@ class TestTlsConfig:
)
ctx = context.Context(connection.Client(("client", 1234), ("127.0.0.1", 8080), 1605699329), tctx.options)
tls_start = tls.TlsStartData(ctx.client, context=ctx)
tls_start = tls.TlsData(ctx.client, context=ctx)
ta.tls_start_client(tls_start)
tssl_server = tls_start.ssl_conn
tssl_client = test_tls.SSLTest()
@ -145,7 +145,7 @@ class TestTlsConfig:
ctx.client.cipher_list = ["TLS_AES_256_GCM_SHA384", "ECDHE-RSA-AES128-SHA"]
ctx.server.address = ("example.mitmproxy.org", 443)
tls_start = tls.TlsStartData(ctx.server, context=ctx)
tls_start = tls.TlsData(ctx.server, context=ctx)
ta.tls_start_server(tls_start)
tssl_client = tls_start.ssl_conn
tssl_server = test_tls.SSLTest(server_side=True)
@ -160,7 +160,7 @@ class TestTlsConfig:
tctx.configure(ta, ssl_verify_upstream_trusted_ca=tdata.path(
"mitmproxy/net/data/verificationcerts/trusted-root.crt"))
tls_start = tls.TlsStartData(ctx.server, context=ctx)
tls_start = tls.TlsData(ctx.server, context=ctx)
ta.tls_start_server(tls_start)
tssl_client = tls_start.ssl_conn
tssl_server = test_tls.SSLTest(server_side=True)
@ -179,7 +179,7 @@ class TestTlsConfig:
http2=False,
ciphers_server="ALL"
)
tls_start = tls.TlsStartData(ctx.server, context=ctx)
tls_start = tls.TlsData(ctx.server, context=ctx)
ta.tls_start_server(tls_start)
tssl_client = tls_start.ssl_conn
tssl_server = test_tls.SSLTest(server_side=True)
@ -190,7 +190,7 @@ class TestTlsConfig:
with taddons.context(ta) as tctx:
ctx = context.Context(connection.Client(("client", 1234), ("127.0.0.1", 8080), 1605699329), tctx.options)
ctx.server.address = ("example.mitmproxy.org", 443)
tls_start = tls.TlsStartData(ctx.server, context=ctx)
tls_start = tls.TlsData(ctx.server, context=ctx)
def assert_alpn(http2, client_offers, expected):
tctx.configure(ta, http2=http2)
@ -199,8 +199,8 @@ class TestTlsConfig:
ta.tls_start_server(tls_start)
assert ctx.server.alpn_offers == expected
assert_alpn(True, tls.HTTP_ALPNS + (b"foo",), tls.HTTP_ALPNS + (b"foo",))
assert_alpn(False, tls.HTTP_ALPNS + (b"foo",), tls.HTTP1_ALPNS + (b"foo",))
assert_alpn(True, proxy_tls.HTTP_ALPNS + (b"foo",), proxy_tls.HTTP_ALPNS + (b"foo",))
assert_alpn(False, proxy_tls.HTTP_ALPNS + (b"foo",), proxy_tls.HTTP1_ALPNS + (b"foo",))
assert_alpn(True, [], [])
assert_alpn(False, [], [])
ctx.client.timestamp_tls_setup = time.time()
@ -222,7 +222,7 @@ class TestTlsConfig:
modes.HttpProxy(ctx),
123
]
tls_start = tls.TlsStartData(ctx.client, context=ctx)
tls_start = tls.TlsData(ctx.client, context=ctx)
ta.tls_start_client(tls_start)
assert tls_start.ssl_conn.get_app_data()["client_alpn"] == b"http/1.1"
@ -244,7 +244,7 @@ class TestTlsConfig:
ssl_verify_upstream_trusted_ca=tdata.path("mitmproxy/net/data/verificationcerts/trusted-root.crt"),
)
tls_start = tls.TlsStartData(ctx.server, context=ctx)
tls_start = tls.TlsData(ctx.server, context=ctx)
ta.tls_start_server(tls_start)
tssl_client = tls_start.ssl_conn
tssl_server = test_tls.SSLTest(server_side=True)

View File

@ -4,17 +4,6 @@ from OpenSSL import SSL
from mitmproxy import certs
from mitmproxy.net import tls
CLIENT_HELLO_NO_EXTENSIONS = bytes.fromhex(
"03015658a756ab2c2bff55f636814deac086b7ca56b65058c7893ffc6074f5245f70205658a75475103a152637"
"78e1bb6d22e8bbd5b6b0a3a59760ad354e91ba20d353001a0035002f000a000500040009000300060008006000"
"61006200640100"
)
FULL_CLIENT_HELLO_NO_EXTENSIONS = (
b"\x16\x03\x03\x00\x65" # record layer
b"\x01\x00\x00\x61" + # handshake header
CLIENT_HELLO_NO_EXTENSIONS
)
def test_make_master_secret_logger():
assert tls.make_master_secret_logger(None) is None
@ -84,43 +73,3 @@ def test_is_record_magic():
assert tls.is_tls_record_magic(b"\x16\x03\x01")
assert tls.is_tls_record_magic(b"\x16\x03\x02")
assert tls.is_tls_record_magic(b"\x16\x03\x03")
class TestClientHello:
def test_no_extensions(self):
c = tls.ClientHello(CLIENT_HELLO_NO_EXTENSIONS)
assert repr(c)
assert c.sni is None
assert c.cipher_suites == [53, 47, 10, 5, 4, 9, 3, 6, 8, 96, 97, 98, 100]
assert c.alpn_protocols == []
assert c.extensions == []
def test_extensions(self):
data = bytes.fromhex(
"03033b70638d2523e1cba15f8364868295305e9c52aceabda4b5147210abc783e6e1000022c02bc02fc02cc030"
"cca9cca8cc14cc13c009c013c00ac014009c009d002f0035000a0100006cff0100010000000010000e00000b65"
"78616d706c652e636f6d0017000000230000000d00120010060106030501050304010403020102030005000501"
"00000000001200000010000e000c02683208687474702f312e3175500000000b00020100000a00080006001d00"
"170018"
)
c = tls.ClientHello(data)
assert repr(c)
assert c.sni == 'example.com'
assert c.cipher_suites == [
49195, 49199, 49196, 49200, 52393, 52392, 52244, 52243, 49161,
49171, 49162, 49172, 156, 157, 47, 53, 10
]
assert c.alpn_protocols == [b'h2', b'http/1.1']
assert c.extensions == [
(65281, b'\x00'),
(0, b'\x00\x0e\x00\x00\x0bexample.com'),
(23, b''),
(35, b''),
(13, b'\x00\x10\x06\x01\x06\x03\x05\x01\x05\x03\x04\x01\x04\x03\x02\x01\x02\x03'),
(5, b'\x01\x00\x00\x00\x00'),
(18, b''),
(16, b'\x00\x0c\x02h2\x08http/1.1'),
(30032, b''),
(11, b'\x01\x00'),
(10, b'\x00\x06\x00\x1d\x00\x17\x00\x18')
]

View File

@ -1,4 +1,5 @@
import ssl
import time
import typing
import pytest
@ -8,6 +9,7 @@ from mitmproxy import connection
from mitmproxy.connection import ConnectionState, Server
from mitmproxy.proxy import commands, context, events, layer
from mitmproxy.proxy.layers import tls
from mitmproxy.tls import ClientHelloData, TlsData
from mitmproxy.utils import data
from test.mitmproxy.proxy import tutils
@ -67,8 +69,8 @@ def test_get_client_hello():
assert tls.get_client_hello(single_record) == client_hello_no_extensions
split_over_two_records = (
bytes.fromhex("1603010020") + client_hello_no_extensions[:32] +
bytes.fromhex("1603010045") + client_hello_no_extensions[32:]
bytes.fromhex("1603010020") + client_hello_no_extensions[:32] +
bytes.fromhex("1603010045") + client_hello_no_extensions[32:]
)
assert tls.get_client_hello(split_over_two_records) == client_hello_no_extensions
@ -133,9 +135,9 @@ def _test_echo(playbook: tutils.Playbook, tssl: SSLTest, conn: connection.Connec
tssl.obj.write(b"Hello World")
data = tutils.Placeholder(bytes)
assert (
playbook
>> events.DataReceived(conn, tssl.bio_read())
<< commands.SendData(conn, data)
playbook
>> events.DataReceived(conn, tssl.bio_read())
<< commands.SendData(conn, data)
)
tssl.bio_write(data())
assert tssl.obj.read() == b"hello world"
@ -153,13 +155,21 @@ class TlsEchoLayer(tutils.EchoLayer):
yield from super()._handle_event(event)
def interact(playbook: tutils.Playbook, conn: connection.Connection, tssl: SSLTest):
def finish_handshake(playbook: tutils.Playbook, conn: connection.Connection, tssl: SSLTest):
data = tutils.Placeholder(bytes)
tls_hook_data = tutils.Placeholder(TlsData)
if isinstance(conn, connection.Client):
established_hook = tls.TlsEstablishedClientHook(tls_hook_data)
else:
established_hook = tls.TlsEstablishedServerHook(tls_hook_data)
assert (
playbook
>> events.DataReceived(conn, tssl.bio_read())
<< commands.SendData(conn, data)
playbook
>> events.DataReceived(conn, tssl.bio_read())
<< established_hook
>> tutils.reply()
<< commands.SendData(conn, data)
)
assert tls_hook_data().conn.error is None
tssl.bio_write(data())
@ -168,7 +178,7 @@ def reply_tls_start_client(alpn: typing.Optional[bytes] = None, *args, **kwargs)
Helper function to simplify the syntax for tls_start_client hooks.
"""
def make_client_conn(tls_start: tls.TlsStartData) -> None:
def make_client_conn(tls_start: TlsData) -> None:
# ssl_context = SSL.Context(Method.TLS_METHOD)
# ssl_context.set_min_proto_version(SSL.TLS1_3_VERSION)
ssl_context = SSL.Context(SSL.SSLv23_METHOD)
@ -193,7 +203,7 @@ def reply_tls_start_server(alpn: typing.Optional[bytes] = None, *args, **kwargs)
Helper function to simplify the syntax for tls_start_server hooks.
"""
def make_server_conn(tls_start: tls.TlsStartData) -> None:
def make_server_conn(tls_start: TlsData) -> None:
# ssl_context = SSL.Context(Method.TLS_METHOD)
# ssl_context.set_min_proto_version(SSL.TLS1_3_VERSION)
ssl_context = SSL.Context(SSL.SSLv23_METHOD)
@ -238,9 +248,9 @@ class TestServerTLS:
layer.child_layer = TlsEchoLayer(tctx)
assert (
tutils.Playbook(layer)
>> events.DataReceived(tctx.client, b"Hello World")
<< commands.SendData(tctx.client, b"hello world")
tutils.Playbook(layer)
>> events.DataReceived(tctx.client, b"Hello World")
<< commands.SendData(tctx.client, b"hello world")
)
def test_simple(self, tctx):
@ -251,49 +261,49 @@ class TestServerTLS:
tssl = SSLTest(server_side=True)
# send ClientHello
# send ClientHello, receive ClientHello
data = tutils.Placeholder(bytes)
assert (
playbook
<< tls.TlsStartServerHook(tutils.Placeholder())
>> reply_tls_start_server()
<< commands.SendData(tctx.server, data)
playbook
<< tls.TlsStartServerHook(tutils.Placeholder())
>> reply_tls_start_server()
<< commands.SendData(tctx.server, data)
)
# receive ServerHello, finish client handshake
tssl.bio_write(data())
with pytest.raises(ssl.SSLWantReadError):
tssl.do_handshake()
interact(playbook, tctx.server, tssl)
# finish server handshake
# finish handshake (mitmproxy)
finish_handshake(playbook, tctx.server, tssl)
# finish handshake (locally)
tssl.do_handshake()
assert (
playbook
>> events.DataReceived(tctx.server, tssl.bio_read())
<< None
playbook
>> events.DataReceived(tctx.server, tssl.bio_read())
<< None
)
assert tctx.server.tls_established
# Echo
assert (
playbook
>> events.DataReceived(tctx.client, b"foo")
<< layer.NextLayerHook(tutils.Placeholder())
>> tutils.reply_next_layer(TlsEchoLayer)
<< commands.SendData(tctx.client, b"foo")
playbook
>> events.DataReceived(tctx.client, b"foo")
<< layer.NextLayerHook(tutils.Placeholder())
>> tutils.reply_next_layer(TlsEchoLayer)
<< commands.SendData(tctx.client, b"foo")
)
_test_echo(playbook, tssl, tctx.server)
with pytest.raises(ssl.SSLWantReadError):
tssl.obj.unwrap()
assert (
playbook
>> events.DataReceived(tctx.server, tssl.bio_read())
<< commands.CloseConnection(tctx.server)
>> events.ConnectionClosed(tctx.server)
<< None
playbook
>> events.DataReceived(tctx.server, tssl.bio_read())
<< commands.CloseConnection(tctx.server)
>> events.ConnectionClosed(tctx.server)
<< None
)
def test_untrusted_cert(self, tctx):
@ -307,15 +317,15 @@ class TestServerTLS:
# send ClientHello
data = tutils.Placeholder(bytes)
assert (
playbook
>> events.DataReceived(tctx.client, b"open-connection")
<< layer.NextLayerHook(tutils.Placeholder())
>> tutils.reply_next_layer(TlsEchoLayer)
<< commands.OpenConnection(tctx.server)
>> tutils.reply(None)
<< tls.TlsStartServerHook(tutils.Placeholder())
>> reply_tls_start_server()
<< commands.SendData(tctx.server, data)
playbook
>> events.DataReceived(tctx.client, b"open-connection")
<< layer.NextLayerHook(tutils.Placeholder())
>> tutils.reply_next_layer(TlsEchoLayer)
<< commands.OpenConnection(tctx.server)
>> tutils.reply(None)
<< tls.TlsStartServerHook(tutils.Placeholder())
>> reply_tls_start_server()
<< commands.SendData(tctx.server, data)
)
# receive ServerHello, finish client handshake
@ -323,14 +333,18 @@ class TestServerTLS:
with pytest.raises(ssl.SSLWantReadError):
tssl.do_handshake()
tls_hook_data = tutils.Placeholder(TlsData)
assert (
playbook
>> events.DataReceived(tctx.server, tssl.bio_read())
<< commands.Log("Server TLS handshake failed. Certificate verify failed: Hostname mismatch", "warn")
<< commands.CloseConnection(tctx.server)
<< commands.SendData(tctx.client,
b"open-connection failed: Certificate verify failed: Hostname mismatch")
playbook
>> events.DataReceived(tctx.server, tssl.bio_read())
<< commands.Log("Server TLS handshake failed. Certificate verify failed: Hostname mismatch", "warn")
<< tls.TlsFailedServerHook(tls_hook_data)
>> tutils.reply()
<< commands.CloseConnection(tctx.server)
<< commands.SendData(tctx.client,
b"open-connection failed: Certificate verify failed: Hostname mismatch")
)
assert tls_hook_data().conn.error == "Certificate verify failed: Hostname mismatch"
assert not tctx.server.tls_established
def test_remote_speaks_no_tls(self, tctx):
@ -340,15 +354,19 @@ class TestServerTLS:
# send ClientHello, receive random garbage back
data = tutils.Placeholder(bytes)
tls_hook_data = tutils.Placeholder(TlsData)
assert (
playbook
<< tls.TlsStartServerHook(tutils.Placeholder())
>> reply_tls_start_server()
<< commands.SendData(tctx.server, data)
>> events.DataReceived(tctx.server, b"HTTP/1.1 404 Not Found\r\n")
<< commands.Log("Server TLS handshake failed. The remote server does not speak TLS.", "warn")
<< commands.CloseConnection(tctx.server)
playbook
<< tls.TlsStartServerHook(tutils.Placeholder())
>> reply_tls_start_server()
<< commands.SendData(tctx.server, data)
>> events.DataReceived(tctx.server, b"HTTP/1.1 404 Not Found\r\n")
<< commands.Log("Server TLS handshake failed. The remote server does not speak TLS.", "warn")
<< tls.TlsFailedServerHook(tls_hook_data)
>> tutils.reply()
<< commands.CloseConnection(tctx.server)
)
assert tls_hook_data().conn.error == "The remote server does not speak TLS."
def test_unsupported_protocol(self, tctx: context.Context):
"""Test the scenario where the server only supports an outdated TLS version by default."""
@ -375,18 +393,22 @@ class TestServerTLS:
tssl.do_handshake()
# send back error
tls_hook_data = tutils.Placeholder(TlsData)
assert (
playbook
>> events.DataReceived(tctx.server, tssl.bio_read())
<< commands.Log("Server TLS handshake failed. The remote server and mitmproxy cannot agree on a TLS version"
" to use. You may need to adjust mitmproxy's tls_version_server_min option.", "warn")
<< tls.TlsFailedServerHook(tls_hook_data)
>> tutils.reply()
<< commands.CloseConnection(tctx.server)
)
assert tls_hook_data().conn.error
def make_client_tls_layer(
tctx: context.Context,
**kwargs
tctx: context.Context,
**kwargs
) -> typing.Tuple[tutils.Playbook, tls.ClientTLSLayer, SSLTest]:
# This is a bit contrived as the client layer expects a server layer as parent.
# We also set child layers manually to avoid NextLayer noise.
@ -418,18 +440,18 @@ class TestClientTLS:
# Send ClientHello, receive ServerHello
data = tutils.Placeholder(bytes)
assert (
playbook
>> events.DataReceived(tctx.client, tssl_client.bio_read())
<< tls.TlsClienthelloHook(tutils.Placeholder())
>> tutils.reply()
<< tls.TlsStartClientHook(tutils.Placeholder())
>> reply_tls_start_client()
<< commands.SendData(tctx.client, data)
playbook
>> events.DataReceived(tctx.client, tssl_client.bio_read())
<< tls.TlsClienthelloHook(tutils.Placeholder())
>> tutils.reply()
<< tls.TlsStartClientHook(tutils.Placeholder())
>> reply_tls_start_client()
<< commands.SendData(tctx.client, data)
)
tssl_client.bio_write(data())
tssl_client.do_handshake()
# Finish Handshake
interact(playbook, tctx.client, tssl_client)
finish_handshake(playbook, tctx.client, tssl_client)
assert tssl_client.obj.getpeercert(True)
assert tctx.client.tls_established
@ -438,18 +460,18 @@ class TestClientTLS:
_test_echo(playbook, tssl_client, tctx.client)
other_server = Server(None)
assert (
playbook
>> events.DataReceived(other_server, b"Plaintext")
<< commands.SendData(other_server, b"plaintext")
playbook
>> events.DataReceived(other_server, b"Plaintext")
<< commands.SendData(other_server, b"plaintext")
)
@pytest.mark.parametrize("eager", ["eager", ""])
def test_server_required(self, tctx, eager):
@pytest.mark.parametrize("server_state", ["open", "closed"])
def test_server_required(self, tctx, server_state):
"""
Test the scenario where a server connection is required (for example, because of an unknown ALPN)
to establish TLS with the client.
"""
if eager:
if server_state == "open":
tctx.server.state = ConnectionState.OPEN
tssl_server = SSLTest(server_side=True, alpn=["quux"])
playbook, client_layer, tssl_client = make_client_tls_layer(tctx, alpn=["quux"])
@ -457,16 +479,16 @@ class TestClientTLS:
# We should now get instructed to open a server connection.
data = tutils.Placeholder(bytes)
def require_server_conn(client_hello: tls.ClientHelloData) -> None:
def require_server_conn(client_hello: ClientHelloData) -> None:
client_hello.establish_server_tls_first = True
(
playbook
>> events.DataReceived(tctx.client, tssl_client.bio_read())
<< tls.TlsClienthelloHook(tutils.Placeholder())
>> tutils.reply(side_effect=require_server_conn)
playbook
>> events.DataReceived(tctx.client, tssl_client.bio_read())
<< tls.TlsClienthelloHook(tutils.Placeholder())
>> tutils.reply(side_effect=require_server_conn)
)
if not eager:
if server_state == "closed":
(
playbook
<< commands.OpenConnection(tctx.server)
@ -486,10 +508,12 @@ class TestClientTLS:
data = tutils.Placeholder(bytes)
assert (
playbook
>> events.DataReceived(tctx.server, tssl_server.bio_read())
<< commands.SendData(tctx.server, data)
<< tls.TlsStartClientHook(tutils.Placeholder())
playbook
>> events.DataReceived(tctx.server, tssl_server.bio_read())
<< tls.TlsEstablishedServerHook(tutils.Placeholder())
>> tutils.reply()
<< commands.SendData(tctx.server, data)
<< tls.TlsStartClientHook(tutils.Placeholder())
)
tssl_server.bio_write(data())
assert tctx.server.tls_established
@ -497,13 +521,13 @@ class TestClientTLS:
data = tutils.Placeholder(bytes)
assert (
playbook
>> reply_tls_start_client(alpn=b"quux")
<< commands.SendData(tctx.client, data)
playbook
>> reply_tls_start_client(alpn=b"quux")
<< commands.SendData(tctx.client, data)
)
tssl_client.bio_write(data())
tssl_client.do_handshake()
interact(playbook, tctx.client, tssl_client)
finish_handshake(playbook, tctx.client, tssl_client)
# Both handshakes completed!
assert tctx.client.tls_established
@ -514,18 +538,56 @@ class TestClientTLS:
_test_echo(playbook, tssl_server, tctx.server)
_test_echo(playbook, tssl_client, tctx.client)
@pytest.mark.parametrize("server_state", ["open", "closed"])
def test_passthrough_from_clienthello(self, tctx, server_state):
"""
Test the scenario where the connection is moved to passthrough mode in the tls_clienthello hook.
"""
if server_state == "open":
tctx.server.timestamp_start = time.time()
tctx.server.state = ConnectionState.OPEN
playbook, client_layer, tssl_client = make_client_tls_layer(tctx, alpn=["quux"])
def make_passthrough(client_hello: ClientHelloData) -> None:
client_hello.ignore_connection = True
client_hello = tssl_client.bio_read()
(
playbook
>> events.DataReceived(tctx.client, client_hello)
<< tls.TlsClienthelloHook(tutils.Placeholder())
>> tutils.reply(side_effect=make_passthrough)
)
if server_state == "closed":
(
playbook
<< commands.OpenConnection(tctx.server)
>> tutils.reply(None)
)
assert (
playbook
<< commands.SendData(tctx.server, client_hello) # passed through unmodified
>> events.DataReceived(tctx.server, b"ServerHello") # and the same for the serverhello.
<< commands.SendData(tctx.client, b"ServerHello")
)
def test_cannot_parse_clienthello(self, tctx: context.Context):
"""Test the scenario where we cannot parse the ClientHello"""
playbook, client_layer, tssl_client = make_client_tls_layer(tctx)
tls_hook_data = tutils.Placeholder(TlsData)
invalid = b"\x16\x03\x01\x00\x00"
assert (
playbook
>> events.DataReceived(tctx.client, invalid)
<< commands.Log(f"Client TLS handshake failed. Cannot parse ClientHello: {invalid.hex()}", level="warn")
<< commands.CloseConnection(tctx.client)
playbook
>> events.DataReceived(tctx.client, invalid)
<< commands.Log(f"Client TLS handshake failed. Cannot parse ClientHello: {invalid.hex()}", level="warn")
<< tls.TlsFailedClientHook(tls_hook_data)
>> tutils.reply()
<< commands.CloseConnection(tctx.client)
)
assert tls_hook_data().conn.error
assert not tctx.client.tls_established
# Make sure that an active server connection does not cause child layers to spawn.
@ -544,27 +606,31 @@ class TestClientTLS:
data = tutils.Placeholder(bytes)
assert (
playbook
>> events.DataReceived(tctx.client, tssl_client.bio_read())
<< tls.TlsClienthelloHook(tutils.Placeholder())
>> tutils.reply()
<< tls.TlsStartClientHook(tutils.Placeholder())
>> reply_tls_start_client()
<< commands.SendData(tctx.client, data)
playbook
>> events.DataReceived(tctx.client, tssl_client.bio_read())
<< tls.TlsClienthelloHook(tutils.Placeholder())
>> tutils.reply()
<< tls.TlsStartClientHook(tutils.Placeholder())
>> reply_tls_start_client()
<< commands.SendData(tctx.client, data)
)
tssl_client.bio_write(data())
with pytest.raises(ssl.SSLCertVerificationError):
tssl_client.do_handshake()
# Finish Handshake
tls_hook_data = tutils.Placeholder(TlsData)
assert (
playbook
>> events.DataReceived(tctx.client, tssl_client.bio_read())
<< commands.Log("Client TLS handshake failed. The client does not trust the proxy's certificate "
"for wrong.host.mitmproxy.org (sslv3 alert bad certificate)", "warn")
<< commands.CloseConnection(tctx.client)
>> events.ConnectionClosed(tctx.client)
playbook
>> events.DataReceived(tctx.client, tssl_client.bio_read())
<< commands.Log("Client TLS handshake failed. The client does not trust the proxy's certificate "
"for wrong.host.mitmproxy.org (sslv3 alert bad certificate)", "warn")
<< tls.TlsFailedClientHook(tls_hook_data)
>> tutils.reply()
<< commands.CloseConnection(tctx.client)
>> events.ConnectionClosed(tctx.client)
)
assert not tctx.client.tls_established
assert tls_hook_data().conn.error
@pytest.mark.parametrize("close_at", ["tls_clienthello", "tls_start_client", "handshake"])
def test_immediate_disconnect(self, tctx: context.Context, close_at):
@ -573,6 +639,7 @@ class TestClientTLS:
the proxy certificate."""
playbook, client_layer, tssl_client = make_client_tls_layer(tctx, sni=b"wrong.host.mitmproxy.org")
playbook.logs = True
tls_hook_data = tutils.Placeholder(TlsData)
playbook >> events.DataReceived(tctx.client, tssl_client.bio_read())
playbook << tls.TlsClienthelloHook(tutils.Placeholder())
@ -584,8 +651,11 @@ class TestClientTLS:
>> tutils.reply(to=-2)
<< tls.TlsStartClientHook(tutils.Placeholder())
>> reply_tls_start_client()
<< tls.TlsFailedClientHook(tls_hook_data)
>> tutils.reply()
<< commands.CloseConnection(tctx.client)
)
assert tls_hook_data().conn.error
return
playbook >> tutils.reply()
@ -596,8 +666,11 @@ class TestClientTLS:
playbook
>> events.ConnectionClosed(tctx.client)
>> reply_tls_start_client(to=-2)
<< tls.TlsFailedClientHook(tls_hook_data)
>> tutils.reply()
<< commands.CloseConnection(tctx.client)
)
assert tls_hook_data().conn.error
return
assert (
@ -608,14 +681,18 @@ class TestClientTLS:
<< commands.Log("Client TLS handshake failed. The client disconnected during the handshake. "
"If this happens consistently for wrong.host.mitmproxy.org, this may indicate that the "
"client does not trust the proxy's certificate.", "info")
<< tls.TlsFailedClientHook(tls_hook_data)
>> tutils.reply()
<< commands.CloseConnection(tctx.client)
)
assert tls_hook_data().conn.error
def test_unsupported_protocol(self, tctx: context.Context):
"""Test the scenario where the client only supports an outdated TLS version by default."""
playbook, client_layer, tssl_client = make_client_tls_layer(tctx, max_ver=ssl.TLSVersion.TLSv1_2)
playbook.logs = True
tls_hook_data = tutils.Placeholder(TlsData)
assert (
playbook
>> events.DataReceived(tctx.client, tssl_client.bio_read())
@ -625,5 +702,8 @@ class TestClientTLS:
>> reply_tls_start_client()
<< commands.Log("Client TLS handshake failed. Client and mitmproxy cannot agree on a TLS version to "
"use. You may need to adjust mitmproxy's tls_version_client_min option.", "warn")
<< tls.TlsFailedClientHook(tls_hook_data)
>> tutils.reply()
<< commands.CloseConnection(tctx.client)
)
assert tls_hook_data().conn.error

View File

@ -1,7 +1,7 @@
from hypothesis import given, example
from hypothesis.strategies import binary, integers
from mitmproxy.net.tls import ClientHello
from mitmproxy.tls import ClientHello
from mitmproxy.proxy.layers.tls import parse_client_hello
client_hello_with_extensions = bytes.fromhex(
@ -17,7 +17,7 @@ client_hello_with_extensions = bytes.fromhex(
@given(i=integers(0, len(client_hello_with_extensions)), data=binary())
@example(i=183, data=b'\x00\x00\x00\x00\x00\x00\x00\x00\x00')
def test_fuzz_h2_request_chunks(i, data):
def test_fuzz_parse_client_hello(i, data):
try:
ch = parse_client_hello(client_hello_with_extensions[:i] + data)
except ValueError:

View File

@ -0,0 +1,53 @@
from mitmproxy import tls
CLIENT_HELLO_NO_EXTENSIONS = bytes.fromhex(
"03015658a756ab2c2bff55f636814deac086b7ca56b65058c7893ffc6074f5245f70205658a75475103a152637"
"78e1bb6d22e8bbd5b6b0a3a59760ad354e91ba20d353001a0035002f000a000500040009000300060008006000"
"61006200640100"
)
FULL_CLIENT_HELLO_NO_EXTENSIONS = (
b"\x16\x03\x03\x00\x65" # record layer
b"\x01\x00\x00\x61" + # handshake header
CLIENT_HELLO_NO_EXTENSIONS
)
class TestClientHello:
def test_no_extensions(self):
c = tls.ClientHello(CLIENT_HELLO_NO_EXTENSIONS)
assert repr(c)
assert c.sni is None
assert c.cipher_suites == [53, 47, 10, 5, 4, 9, 3, 6, 8, 96, 97, 98, 100]
assert c.alpn_protocols == []
assert c.extensions == []
def test_extensions(self):
data = bytes.fromhex(
"03033b70638d2523e1cba15f8364868295305e9c52aceabda4b5147210abc783e6e1000022c02bc02fc02cc030"
"cca9cca8cc14cc13c009c013c00ac014009c009d002f0035000a0100006cff0100010000000010000e00000b65"
"78616d706c652e636f6d0017000000230000000d00120010060106030501050304010403020102030005000501"
"00000000001200000010000e000c02683208687474702f312e3175500000000b00020100000a00080006001d00"
"170018"
)
c = tls.ClientHello(data)
assert repr(c)
assert c.sni == 'example.com'
assert c.cipher_suites == [
49195, 49199, 49196, 49200, 52393, 52392, 52244, 52243, 49161,
49171, 49162, 49172, 156, 157, 47, 53, 10
]
assert c.alpn_protocols == [b'h2', b'http/1.1']
assert c.extensions == [
(65281, b'\x00'),
(0, b'\x00\x0e\x00\x00\x0bexample.com'),
(23, b''),
(35, b''),
(13, b'\x00\x10\x06\x01\x06\x03\x05\x01\x05\x03\x04\x01\x04\x03\x02\x01\x02\x03'),
(5, b'\x01\x00\x00\x00\x00'),
(18, b''),
(16, b'\x00\x0c\x02h2\x08http/1.1'),
(30032, b''),
(11, b'\x01\x00'),
(10, b'\x00\x06\x00\x1d\x00\x17\x00\x18')
]