mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-26 18:18:25 +00:00
parent
8dfa15c2d4
commit
e63ef3a520
@ -1,3 +1,7 @@
|
|||||||
|
Some inline scripts may require additional dependencies, which can be installed using
|
||||||
|
`pip install mitmproxy[examples]`.
|
||||||
|
|
||||||
|
|
||||||
# inline script examples
|
# inline script examples
|
||||||
add_header.py Simple script that just adds a header to every request.
|
add_header.py Simple script that just adds a header to every request.
|
||||||
change_upstream_proxy.py Dynamically change the upstream proxy
|
change_upstream_proxy.py Dynamically change the upstream proxy
|
||||||
|
136
examples/tls_passthrough.py
Normal file
136
examples/tls_passthrough.py
Normal file
@ -0,0 +1,136 @@
|
|||||||
|
"""
|
||||||
|
This inline script allows conditional TLS Interception based
|
||||||
|
on a user-defined strategy.
|
||||||
|
|
||||||
|
Example:
|
||||||
|
|
||||||
|
> mitmdump -s tls_passthrough.py
|
||||||
|
|
||||||
|
1. curl --proxy http://localhost:8080 https://example.com --insecure
|
||||||
|
// works - we'll also see the contents in mitmproxy
|
||||||
|
|
||||||
|
2. curl --proxy http://localhost:8080 https://example.com --insecure
|
||||||
|
// still works - we'll also see the contents in mitmproxy
|
||||||
|
|
||||||
|
3. curl --proxy http://localhost:8080 https://example.com
|
||||||
|
// fails with a certificate error, which we will also see in mitmproxy
|
||||||
|
|
||||||
|
4. curl --proxy http://localhost:8080 https://example.com
|
||||||
|
// works again, but mitmproxy does not intercept and we do *not* see the contents
|
||||||
|
|
||||||
|
Authors: Maximilian Hils, Matthew Tuusberg
|
||||||
|
"""
|
||||||
|
from __future__ import (absolute_import, print_function, division)
|
||||||
|
import collections
|
||||||
|
import random
|
||||||
|
|
||||||
|
from enum import Enum
|
||||||
|
|
||||||
|
from libmproxy.exceptions import TlsException
|
||||||
|
from libmproxy.protocol import TlsLayer, RawTCPLayer
|
||||||
|
|
||||||
|
|
||||||
|
class InterceptionResult(Enum):
|
||||||
|
success = True
|
||||||
|
failure = False
|
||||||
|
skipped = None
|
||||||
|
|
||||||
|
|
||||||
|
class _TlsStrategy(object):
|
||||||
|
"""
|
||||||
|
Abstract base class for interception strategies.
|
||||||
|
"""
|
||||||
|
def __init__(self):
|
||||||
|
# A server_address -> interception results mapping
|
||||||
|
self.history = collections.defaultdict(lambda: collections.deque(maxlen=200))
|
||||||
|
|
||||||
|
def should_intercept(self, server_address):
|
||||||
|
"""
|
||||||
|
Returns:
|
||||||
|
True, if we should attempt to intercept the connection.
|
||||||
|
False, if we want to employ pass-through instead.
|
||||||
|
"""
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def record_success(self, server_address):
|
||||||
|
self.history[server_address].append(InterceptionResult.success)
|
||||||
|
|
||||||
|
def record_failure(self, server_address):
|
||||||
|
self.history[server_address].append(InterceptionResult.failure)
|
||||||
|
|
||||||
|
def record_skipped(self, server_address):
|
||||||
|
self.history[server_address].append(InterceptionResult.skipped)
|
||||||
|
|
||||||
|
|
||||||
|
class ConservativeStrategy(_TlsStrategy):
|
||||||
|
"""
|
||||||
|
Conservative Interception Strategy - only intercept if there haven't been any failed attempts
|
||||||
|
in the history.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def should_intercept(self, server_address):
|
||||||
|
if InterceptionResult.failure in self.history[server_address]:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
class ProbabilisticStrategy(_TlsStrategy):
|
||||||
|
"""
|
||||||
|
Fixed probability that we intercept a given connection.
|
||||||
|
"""
|
||||||
|
def __init__(self, p):
|
||||||
|
self.p = p
|
||||||
|
super(ProbabilisticStrategy, self).__init__()
|
||||||
|
|
||||||
|
def should_intercept(self, server_address):
|
||||||
|
return random.uniform(0, 1) < self.p
|
||||||
|
|
||||||
|
|
||||||
|
class TlsFeedback(TlsLayer):
|
||||||
|
"""
|
||||||
|
Monkey-patch _establish_tls_with_client to get feedback if TLS could be established
|
||||||
|
successfully on the client connection (which may fail due to cert pinning).
|
||||||
|
"""
|
||||||
|
|
||||||
|
def _establish_tls_with_client(self):
|
||||||
|
server_address = self.server_conn.address
|
||||||
|
tls_strategy = self.script_context.tls_strategy
|
||||||
|
|
||||||
|
try:
|
||||||
|
super(TlsFeedback, self)._establish_tls_with_client()
|
||||||
|
except TlsException as e:
|
||||||
|
tls_strategy.record_failure(server_address)
|
||||||
|
raise e
|
||||||
|
else:
|
||||||
|
tls_strategy.record_success(server_address)
|
||||||
|
|
||||||
|
|
||||||
|
# inline script hooks below.
|
||||||
|
|
||||||
|
|
||||||
|
def start(context, argv):
|
||||||
|
if len(argv) == 2:
|
||||||
|
context.tls_strategy = ProbabilisticStrategy(float(argv[1]))
|
||||||
|
else:
|
||||||
|
context.tls_strategy = ConservativeStrategy()
|
||||||
|
|
||||||
|
|
||||||
|
def next_layer(context, next_layer):
|
||||||
|
"""
|
||||||
|
This hook does the actual magic - if the next layer is planned to be a TLS layer,
|
||||||
|
we check if we want to enter pass-through mode instead.
|
||||||
|
"""
|
||||||
|
if isinstance(next_layer, TlsLayer) and next_layer._client_tls:
|
||||||
|
server_address = next_layer.server_conn.address
|
||||||
|
|
||||||
|
if context.tls_strategy.should_intercept(server_address):
|
||||||
|
# We try to intercept.
|
||||||
|
# Monkey-Patch the layer to get feedback from the TLSLayer if interception worked.
|
||||||
|
next_layer.__class__ = TlsFeedback
|
||||||
|
next_layer.script_context = context
|
||||||
|
else:
|
||||||
|
# We don't intercept - reply with a pass-through layer and add a "skipped" entry.
|
||||||
|
context.log("TLS passthrough for %s" % repr(next_layer.server_conn.address), "info")
|
||||||
|
next_layer_replacement = RawTCPLayer(next_layer.ctx, logging=False)
|
||||||
|
next_layer.reply(next_layer_replacement)
|
||||||
|
context.tls_strategy.record_skipped(server_address)
|
Loading…
Reference in New Issue
Block a user