mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-26 02:10:59 +00:00
commit
ef69701b0a
38
examples/contrib/custom_next_layer.py
Normal file
38
examples/contrib/custom_next_layer.py
Normal file
@ -0,0 +1,38 @@
|
||||
"""
|
||||
This addon demonstrates how to override next_layer to modify the protocol in use.
|
||||
In this example, we are forcing connections to example.com:443 to instead go as plaintext
|
||||
to example.com:80.
|
||||
|
||||
Example usage:
|
||||
|
||||
- mitmdump -s custom_next_layer.py
|
||||
- curl -x localhost:8080 -k https://example.com
|
||||
"""
|
||||
from mitmproxy import ctx
|
||||
from mitmproxy.proxy import layer, layers
|
||||
|
||||
|
||||
def running():
|
||||
# We change the connection strategy to lazy so that next_layer happens before we actually connect upstream.
|
||||
# Alternatively we could also change the server address in `server_connect`.
|
||||
ctx.options.connection_strategy = "lazy"
|
||||
|
||||
|
||||
def next_layer(nextlayer: layer.NextLayer):
|
||||
ctx.log(
|
||||
f"{nextlayer.context=}\n"
|
||||
f"{nextlayer.data_client()[:70]=}\n"
|
||||
f"{nextlayer.data_server()[:70]=}\n"
|
||||
)
|
||||
|
||||
if nextlayer.context.server.address == ("example.com", 443):
|
||||
nextlayer.context.server.address = ("example.com", 80)
|
||||
|
||||
# We are disabling ALPN negotiation as our curl client would otherwise agree on HTTP/2,
|
||||
# which our example server here does not accept for plaintext connections.
|
||||
nextlayer.context.client.alpn = b""
|
||||
|
||||
# We know all layers that come next: First negotiate TLS with the client, then do simple TCP passthrough.
|
||||
# Setting only one layer here would also work, in that case next_layer would be called again after TLS establishment.
|
||||
nextlayer.layer = layers.ClientTLSLayer(nextlayer.context)
|
||||
nextlayer.layer.child_layer = layers.TCPLayer(nextlayer.context)
|
@ -41,11 +41,11 @@ def default_addons():
|
||||
cut.Cut(),
|
||||
disable_h2c.DisableH2C(),
|
||||
export.Export(),
|
||||
next_layer.NextLayer(),
|
||||
onboarding.Onboarding(),
|
||||
proxyauth.ProxyAuth(),
|
||||
proxyserver.Proxyserver(),
|
||||
script.ScriptLoader(),
|
||||
next_layer.NextLayer(),
|
||||
serverplayback.ServerPlayback(),
|
||||
mapremote.MapRemote(),
|
||||
maplocal.MapLocal(),
|
||||
|
@ -1,3 +1,19 @@
|
||||
"""
|
||||
This addon determines the next protocol layer in our proxy stack.
|
||||
Whenever a protocol layer in the proxy wants to pass a connection to a child layer and isn't sure which protocol comes
|
||||
next, it calls the `next_layer` hook, which ends up here.
|
||||
For example, if mitmproxy runs as a regular proxy, we first need to determine if
|
||||
new clients start with a TLS handshake right away (Secure Web Proxy) or send a plaintext HTTP CONNECT request.
|
||||
This addon here peeks at the incoming bytes and then makes a decision based on proxy mode, mitmproxy options, etc.
|
||||
|
||||
For a typical HTTPS request, this addon is called a couple of times: First to determine that we start with an HTTP layer
|
||||
which processes the `CONNECT` request, a second time to determine that the client then starts negotiating TLS, and a
|
||||
third time where we check if the protocol within that TLS stream is actually HTTP or something else.
|
||||
|
||||
Sometimes it's useful to hardcode specific logic in next_layer when one wants to do fancy things.
|
||||
In that case it's not necessary to modify mitmproxy's source, adding a custom addon with a next_layer event hook
|
||||
that sets nextlayer.layer works just as well.
|
||||
"""
|
||||
import re
|
||||
from typing import Type, Sequence, Union, Tuple, Any, Iterable, Optional, List
|
||||
|
||||
@ -87,21 +103,21 @@ class NextLayer:
|
||||
raise AssertionError()
|
||||
|
||||
def next_layer(self, nextlayer: layer.NextLayer):
|
||||
nextlayer.layer = self._next_layer(
|
||||
nextlayer.context,
|
||||
nextlayer.data_client(),
|
||||
nextlayer.data_server(),
|
||||
)
|
||||
if nextlayer.layer is None:
|
||||
nextlayer.layer = self._next_layer(
|
||||
nextlayer.context,
|
||||
nextlayer.data_client(),
|
||||
nextlayer.data_server(),
|
||||
)
|
||||
|
||||
def _next_layer(self, context: context.Context, data_client: bytes, data_server: bytes) -> Optional[layer.Layer]:
|
||||
if len(context.layers) == 0:
|
||||
return self.make_top_layer(context)
|
||||
|
||||
if len(data_client) < 3 and not data_server:
|
||||
return None
|
||||
|
||||
client_tls = is_tls_record_magic(data_client)
|
||||
return None # not enough data yet to make a decision
|
||||
|
||||
# helper function to quickly check if the existing layer stack matches a particular configuration.
|
||||
def s(*layers):
|
||||
return stack_match(context, layers)
|
||||
|
||||
@ -113,6 +129,7 @@ class NextLayer:
|
||||
return None
|
||||
|
||||
# 2. Check for TLS
|
||||
client_tls = is_tls_record_magic(data_client)
|
||||
if client_tls:
|
||||
# client tls usually requires a server tls layer as parent layer, except:
|
||||
# - a secure web proxy doesn't have a server part.
|
||||
|
@ -24,14 +24,21 @@ DEFAULT_CIPHERS = (
|
||||
|
||||
|
||||
class AppData(TypedDict):
|
||||
client_alpn: Optional[bytes]
|
||||
server_alpn: Optional[bytes]
|
||||
http2: bool
|
||||
|
||||
|
||||
def alpn_select_callback(conn: SSL.Connection, options: List[bytes]) -> Any:
|
||||
app_data: AppData = conn.get_app_data()
|
||||
client_alpn = app_data["client_alpn"]
|
||||
server_alpn = app_data["server_alpn"]
|
||||
http2 = app_data["http2"]
|
||||
if client_alpn is not None:
|
||||
if client_alpn in options:
|
||||
return client_alpn
|
||||
else:
|
||||
return SSL.NO_OVERLAPPING_PROTOCOLS
|
||||
if server_alpn and server_alpn in options:
|
||||
return server_alpn
|
||||
if server_alpn == b"":
|
||||
@ -148,6 +155,7 @@ class TlsConfig:
|
||||
)
|
||||
tls_start.ssl_conn = SSL.Connection(ssl_ctx)
|
||||
tls_start.ssl_conn.set_app_data(AppData(
|
||||
client_alpn=client.alpn,
|
||||
server_alpn=server.alpn,
|
||||
http2=ctx.options.http2,
|
||||
))
|
||||
|
@ -293,6 +293,11 @@ class Server(Connection):
|
||||
local_port = ""
|
||||
return f"Server({human.format_address(self.address)}, state={self.state.name.lower()}{tls_state}{local_port})"
|
||||
|
||||
def __setattr__(self, name, value):
|
||||
if name == "address" and self.__dict__.get("state", ConnectionState.CLOSED) is ConnectionState.OPEN:
|
||||
raise RuntimeError("Cannot change server address on open connection.")
|
||||
return super().__setattr__(name, value)
|
||||
|
||||
def get_state(self):
|
||||
return {
|
||||
'address': self.address,
|
||||
|
@ -6,7 +6,9 @@ from mitmproxy import hooks
|
||||
|
||||
class LogEntry:
|
||||
def __init__(self, msg, level):
|
||||
self.msg = msg
|
||||
# it's important that we serialize to string here already so that we don't pick up changes
|
||||
# happening after this log statement.
|
||||
self.msg = str(msg)
|
||||
self.level = level
|
||||
|
||||
def __eq__(self, other):
|
||||
|
@ -32,3 +32,17 @@ class Context:
|
||||
ret.server = self.server
|
||||
ret.layers = self.layers.copy()
|
||||
return ret
|
||||
|
||||
def __repr__(self):
|
||||
layers = "\n ".join(repr(l) for l in self.layers)
|
||||
if layers:
|
||||
layers = f"[\n {layers}\n ]"
|
||||
else:
|
||||
layers = "[]"
|
||||
return (
|
||||
f"Context(\n"
|
||||
f" {self.client!r},\n"
|
||||
f" {self.server!r},\n"
|
||||
f" layers={layers}\n"
|
||||
f")"
|
||||
)
|
||||
|
@ -122,5 +122,6 @@ class TestNextLayer:
|
||||
assert isinstance(nl._next_layer(ctx, b"", b"hello"), layers.TCPLayer)
|
||||
|
||||
l = MagicMock()
|
||||
l.layer = None
|
||||
nl.next_layer(l)
|
||||
assert isinstance(l.layer, layers.modes.HttpProxy)
|
||||
|
@ -17,13 +17,19 @@ from test.mitmproxy.proxy.layers import test_tls
|
||||
def test_alpn_select_callback():
|
||||
ctx = SSL.Context(SSL.SSLv23_METHOD)
|
||||
conn = SSL.Connection(ctx)
|
||||
conn.set_app_data(tlsconfig.AppData(server_alpn=b"h2", http2=True))
|
||||
|
||||
# Test that we respect addons setting `client.alpn`.
|
||||
conn.set_app_data(tlsconfig.AppData(server_alpn=b"h2", http2=True, client_alpn=b"qux"))
|
||||
assert tlsconfig.alpn_select_callback(conn, [b"http/1.1", b"qux", b"h2"]) == b"qux"
|
||||
conn.set_app_data(tlsconfig.AppData(server_alpn=b"h2", http2=True, client_alpn=b""))
|
||||
assert tlsconfig.alpn_select_callback(conn, [b"http/1.1", b"qux", b"h2"]) == SSL.NO_OVERLAPPING_PROTOCOLS
|
||||
|
||||
# Test that we try to mirror the server connection's ALPN
|
||||
conn.set_app_data(tlsconfig.AppData(server_alpn=b"h2", http2=True, client_alpn=None))
|
||||
assert tlsconfig.alpn_select_callback(conn, [b"http/1.1", b"qux", b"h2"]) == b"h2"
|
||||
|
||||
# Test that we respect the client's preferred HTTP ALPN.
|
||||
conn.set_app_data(tlsconfig.AppData(server_alpn=None, http2=True))
|
||||
conn.set_app_data(tlsconfig.AppData(server_alpn=None, http2=True, client_alpn=None))
|
||||
assert tlsconfig.alpn_select_callback(conn, [b"qux", b"http/1.1", b"h2"]) == b"http/1.1"
|
||||
assert tlsconfig.alpn_select_callback(conn, [b"qux", b"h2", b"http/1.1"]) == b"h2"
|
||||
|
||||
@ -31,7 +37,7 @@ def test_alpn_select_callback():
|
||||
assert tlsconfig.alpn_select_callback(conn, [b"qux", b"quux"]) == SSL.NO_OVERLAPPING_PROTOCOLS
|
||||
|
||||
# Test that we don't select an ALPN if the server refused to select one.
|
||||
conn.set_app_data(tlsconfig.AppData(server_alpn=b"", http2=True))
|
||||
conn.set_app_data(tlsconfig.AppData(server_alpn=b"", http2=True, client_alpn=None))
|
||||
assert tlsconfig.alpn_select_callback(conn, [b"http/1.1"]) == SSL.NO_OVERLAPPING_PROTOCOLS
|
||||
|
||||
|
||||
|
@ -226,8 +226,8 @@ class TestServerTLS:
|
||||
|
||||
def test_simple(self, tctx):
|
||||
playbook = tutils.Playbook(tls.ServerTLSLayer(tctx))
|
||||
tctx.server.state = ConnectionState.OPEN
|
||||
tctx.server.address = ("example.mitmproxy.org", 443)
|
||||
tctx.server.state = ConnectionState.OPEN
|
||||
tctx.server.sni = "example.mitmproxy.org"
|
||||
|
||||
tssl = SSLTest(server_side=True)
|
||||
@ -345,7 +345,7 @@ def make_client_tls_layer(
|
||||
playbook = tutils.Playbook(server_layer)
|
||||
|
||||
# Add some server config, this is needed anyways.
|
||||
tctx.server.address = ("example.mitmproxy.org", 443)
|
||||
tctx.server.__dict__["address"] = ("example.mitmproxy.org", 443) # .address fails because connection is open
|
||||
tctx.server.sni = "example.mitmproxy.org"
|
||||
|
||||
tssl_client = SSLTest(**kwargs)
|
||||
|
@ -10,6 +10,7 @@ def test_context():
|
||||
)
|
||||
assert repr(c)
|
||||
c.layers.append(1)
|
||||
assert repr(c)
|
||||
c2 = c.fork()
|
||||
c.layers.append(2)
|
||||
c2.layers.append(3)
|
||||
|
@ -1,3 +1,5 @@
|
||||
import pytest
|
||||
|
||||
from mitmproxy.connection import Server, Client, ConnectionState
|
||||
from mitmproxy.test.tflow import tclient_conn, tserver_conn
|
||||
|
||||
@ -76,3 +78,10 @@ class TestServer:
|
||||
assert c2.get_state() != c.get_state()
|
||||
c.id = c2.id = "foo"
|
||||
assert c2.get_state() == c.get_state()
|
||||
|
||||
def test_address(self):
|
||||
s = Server(("address", 22))
|
||||
s.address = ("example.com", 443)
|
||||
s.state = ConnectionState.OPEN
|
||||
with pytest.raises(RuntimeError):
|
||||
s.address = ("example.com", 80)
|
||||
|
@ -9,3 +9,10 @@ def test_logentry():
|
||||
assert e == e
|
||||
assert e != f
|
||||
assert e != 42
|
||||
|
||||
|
||||
def test_dont_pick_up_mutations():
|
||||
x = {"foo": "bar"}
|
||||
e = log.LogEntry(x, "info")
|
||||
x["foo"] = "baz" # this should not affect the log entry anymore.
|
||||
assert repr(e) == "LogEntry({'foo': 'bar'}, info)"
|
||||
|
Loading…
Reference in New Issue
Block a user