mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-23 08:11:00 +00:00
a388ddfd78
Reply is now explicit - it's no longer a callable itself. Instead, we have: reply.kill() - kill the flow; reply.ack() - ack, but don't send anything; reply.send(message) - send a response. This is part of an incremental move to detach reply from our flow objects, and unify the script and handler interfaces.
139 lines
4.4 KiB
Python
139 lines
4.4 KiB
Python
"""
|
|
This inline script allows conditional TLS Interception based
|
|
on a user-defined strategy.
|
|
|
|
Example:
|
|
|
|
> mitmdump -s tls_passthrough.py
|
|
|
|
1. curl --proxy http://localhost:8080 https://example.com --insecure
|
|
// works - we'll also see the contents in mitmproxy
|
|
|
|
2. curl --proxy http://localhost:8080 https://example.com --insecure
|
|
// still works - we'll also see the contents in mitmproxy
|
|
|
|
3. curl --proxy http://localhost:8080 https://example.com
|
|
// fails with a certificate error, which we will also see in mitmproxy
|
|
|
|
4. curl --proxy http://localhost:8080 https://example.com
|
|
// works again, but mitmproxy does not intercept and we do *not* see the contents
|
|
|
|
Authors: Maximilian Hils, Matthew Tuusberg
|
|
"""
|
|
from __future__ import (absolute_import, print_function, division)
|
|
import collections
|
|
import random
|
|
|
|
from enum import Enum
|
|
|
|
from mitmproxy.exceptions import TlsProtocolException
|
|
from mitmproxy.protocol import TlsLayer, RawTCPLayer
|
|
|
|
|
|
class InterceptionResult(Enum):
    """
    Outcome of a single connection, as recorded in a strategy's history.
    """

    # Interception was attempted and TLS with the client was established.
    success = True

    # Interception was attempted, but the TLS handshake with the client failed.
    failure = False

    # No interception was attempted; the connection was passed through.
    skipped = None
|
|
|
|
|
|
class _TlsStrategy(object):
    """
    Abstract base class for interception strategies.

    Subclasses implement should_intercept(); this base class only keeps a
    bounded, per-server history of interception results.
    """

    def __init__(self):
        def new_history():
            # Only the 200 most recent results are kept per server.
            return collections.deque(maxlen=200)

        # A server_address -> interception results mapping
        self.history = collections.defaultdict(new_history)

    def should_intercept(self, server_address):
        """
        Returns:
            True, if we should attempt to intercept the connection.
            False, if we want to employ pass-through instead.
        """
        raise NotImplementedError()

    def _record(self, server_address, result):
        # Append one InterceptionResult to the history of the given server.
        self.history[server_address].append(result)

    def record_success(self, server_address):
        """Note a successful interception for server_address."""
        self._record(server_address, InterceptionResult.success)

    def record_failure(self, server_address):
        """Note a failed interception attempt for server_address."""
        self._record(server_address, InterceptionResult.failure)

    def record_skipped(self, server_address):
        """Note that server_address was passed through without interception."""
        self._record(server_address, InterceptionResult.skipped)
|
|
|
|
|
|
class ConservativeStrategy(_TlsStrategy):
    """
    Conservative Interception Strategy - a server is intercepted only as long
    as its recorded history contains no failed attempt.
    """

    def should_intercept(self, server_address):
        """
        Returns:
            False, if a failure is recorded for server_address.
            True otherwise (including for servers without any history).
        """
        past_results = self.history[server_address]
        return InterceptionResult.failure not in past_results
|
|
|
|
|
|
class ProbabilisticStrategy(_TlsStrategy):
    """
    Intercept each connection with a fixed probability, independent of the
    recorded history.
    """

    def __init__(self, p):
        """
        Args:
            p: value a random draw from [0, 1) is compared against; the
               connection is intercepted when the draw is smaller than p.
        """
        super(ProbabilisticStrategy, self).__init__()
        self.p = p

    def should_intercept(self, server_address):
        """
        Returns:
            True, if the random draw falls below self.p.
            server_address is not taken into account.
        """
        draw = random.random()
        return draw < self.p
|
|
|
|
|
|
class TlsFeedback(TlsLayer):
    """
    Monkey-patch _establish_tls_with_client to get feedback if TLS could be established
    successfully on the client connection (which may fail due to cert pinning).

    Instances are expected to carry a ``script_context`` attribute whose
    ``tls_strategy`` receives the success/failure reports.
    """

    def _establish_tls_with_client(self):
        """
        Run the regular client-side TLS setup and report the outcome.

        Raises:
            TlsProtocolException: re-raised unchanged after the failure has
                been recorded for the server address.
        """
        # Read both up front so the same address is reported on either path.
        server_address = self.server_conn.address
        tls_strategy = self.script_context.tls_strategy

        try:
            super(TlsFeedback, self)._establish_tls_with_client()
        except TlsProtocolException:
            tls_strategy.record_failure(server_address)
            # Bare raise keeps the original traceback intact (``raise e``
            # would reset it on Python 2).
            raise
        else:
            tls_strategy.record_success(server_address)
|
|
|
|
|
|
# inline script hooks below.
|
|
|
|
|
|
def start(context, argv):
    """
    Script start hook: pick the interception strategy and store it on the context.

    With exactly one script argument (argv[1]), that argument is parsed as a
    float and used as the probability of a ProbabilisticStrategy. In every
    other case a ConservativeStrategy is used.
    """
    if len(argv) == 2:
        strategy = ProbabilisticStrategy(float(argv[1]))
    else:
        strategy = ConservativeStrategy()
    context.tls_strategy = strategy
|
|
|
|
|
|
def next_layer(context, next_layer):
    """
    This hook does the actual magic - if the next layer is planned to be a TLS layer,
    we check if we want to enter pass-through mode instead.

    Layers that are not a TlsLayer, or whose _client_tls flag is unset, are
    left untouched.
    """
    if not (isinstance(next_layer, TlsLayer) and next_layer._client_tls):
        return

    server_address = next_layer.server_conn.address
    strategy = context.tls_strategy

    if strategy.should_intercept(server_address):
        # We try to intercept.
        # Swap the layer's class so that the TLS setup reports back to the
        # strategy whether interception worked.
        next_layer.__class__ = TlsFeedback
        next_layer.script_context = context
        return

    # We don't intercept - reply with a pass-through layer and add a "skipped" entry.
    message = "TLS passthrough for %s" % repr(next_layer.server_conn.address)
    context.log(message, "info")
    passthrough_layer = RawTCPLayer(next_layer.ctx, logging=False)
    next_layer.reply.send(passthrough_layer)
    strategy.record_skipped(server_address)
|