[sans-io] improve next layer heuristics

This commit is contained in:
Maximilian Hils 2019-11-11 18:27:56 +01:00
parent 105cac231d
commit 0c04638d8d

View File

@ -1,19 +1,19 @@
import re
import typing
from mitmproxy import ctx
from mitmproxy import ctx, exceptions
from mitmproxy.net.tls import is_tls_record_magic
from mitmproxy.proxy.config import HostMatcher
from mitmproxy.proxy.protocol.http import HTTPMode
from mitmproxy.proxy2 import context, layer, layers
from mitmproxy.proxy2.layers import modes
from mitmproxy.proxy2.layers.glue import GLUE_DEBUG
from mitmproxy.proxy2.layers.tls import HTTP_ALPNS, parse_client_hello
LayerCls = typing.Type[layer.Layer]
def stack_match(
context: context.Context,
layers: typing.List[typing.Union[LayerCls, typing.Tuple[LayerCls, ...]]]
context: context.Context,
layers: typing.List[typing.Union[LayerCls, typing.Tuple[LayerCls, ...]]]
) -> bool:
if len(context.layers) != len(layers):
return False
@ -23,93 +23,139 @@ def stack_match(
)
class NextLayer:
check_tcp: HostMatcher
class HostMatcher:
def __init__(self, patterns: typing.Iterable[str] = tuple()):
self.patterns = patterns
self.regexes = [re.compile(p, re.IGNORECASE) for p in self.patterns]
def __init__(self):
self.check_tcp = HostMatcher("tcp")
def __call__(self, address):
if not address:
return False
host = f"{address[0]}:{address[1]}"
return any(rex.search(host) for rex in self.regexes)
def __bool__(self):
return bool(self.patterns)
class NextLayer:
ignore_hosts: typing.Iterable[re.Pattern] = ()
allow_hosts: typing.Iterable[re.Pattern] = ()
tcp_hosts: typing.Iterable[re.Pattern] = ()
def configure(self, updated):
if "tcp_hosts" in updated:
self.check_tcp = HostMatcher(ctx.options.tcp_hosts)
self.tcp_hosts = [
re.compile(x, re.IGNORECASE) for x in ctx.options.tcp_hosts
]
if "allow_hosts" in updated or "ignore_hosts" in updated:
if ctx.options.allow_hosts and ctx.options.ignore_hosts:
raise exceptions.OptionsError("The allow_hosts and ignore_hosts options are mutually exclusive.")
self.ignore_hosts = [
re.compile(x, re.IGNORECASE) for x in ctx.options.ignore_hosts
]
self.allow_hosts = [
re.compile(x, re.IGNORECASE) for x in ctx.options.allow_hosts
]
def ignore_connection(self, context: context.Context, data_client: bytes) -> typing.Optional[bool]:
if not ctx.options.ignore_hosts and not ctx.options.allow_hosts:
return False
addresses: typing.List[str] = [context.server.address]
if is_tls_record_magic(data_client):
try:
sni = parse_client_hello(data_client).sni
except ValueError:
return None # defer decision, wait for more input data
else:
addresses.append(sni.decode("idna"))
if ctx.options.ignore_hosts:
return any(
re.search(rex, address, re.IGNORECASE)
for address in addresses
for rex in ctx.options.ignore_hosts
)
elif ctx.options.allow_hosts:
return not any(
re.search(rex, address, re.IGNORECASE)
for address in addresses
for rex in ctx.options.allow_hosts
)
def next_layer(self, nextlayer: layer.NextLayer):
if not isinstance(nextlayer, layer.NextLayer):
if GLUE_DEBUG:
print(f"[glue: skipping nextlayer for {nextlayer}]")
return
nextlayer.layer = self._next_layer(nextlayer, nextlayer.context)
nextlayer.layer = self._next_layer(nextlayer.context, nextlayer.data_client())
def _next_layer(self, nextlayer: layer.NextLayer, context: context.Context):
# 0. New connection
if not context.layers:
def _next_layer(self, context: context.Context, data_client: bytes) -> typing.Optional[layer.Layer]:
if len(context.layers) == 0:
return self.make_top_layer(context)
if len(context.layers) == 1:
return layers.ServerTLSLayer(context)
top_layer = context.layers[-1]
data_client = nextlayer.data_client()
if len(data_client) < 3:
return
client_tls = is_tls_record_magic(data_client)
s = lambda *layers: stack_match(context, layers)
top_layer = context.layers[-1]
# 1. check for --ignore
if ctx.options.ignore_hosts:
raise NotImplementedError()
# 1. check for --ignore/--allow
ignore = self.ignore_connection(context, data_client)
if ignore is True:
return layers.TCPLayer(context, ignore=True)
if ignore is None:
return
# 2. Always insert a TLS layer as second layer, even if there's neither client nor server
# tls. An addon may upgrade from http to https, in which case we need a server TLS layer.
if s(modes.HttpProxy) or s(modes.ReverseProxy):
return layers.ServerTLSLayer(context)
elif len(context.layers) == 1:
raise NotImplementedError()
# 2. Check for TLS
if client_tls:
# client tls requires a server tls layer as parent layer
if isinstance(top_layer, layers.ServerTLSLayer):
return layers.ClientTLSLayer(context)
else:
if not s(modes.HttpProxy):
# A "Secure Web Proxy" (https://www.chromium.org/developers/design-documents/secure-web-proxy)
# This does not imply TLS on the server side.
pass
else:
# In all other cases, client TLS implies TLS for both ends.
context.server.tls = True
return layers.ServerTLSLayer(context)
if s(modes.HttpProxy, layers.ServerTLSLayer) and client_tls:
# For HttpProxy, this is a "Secure Web Proxy" (https://www.chromium.org/developers/design-documents/secure-web-proxy)
return layers.ClientTLSLayer(context)
# 3. Setup the first HTTP layer for a regular HTTP proxy or an upstream proxy.
# 3. Setup the HTTP layer for a regular HTTP proxy or an upstream proxy.
if any([
s(modes.HttpProxy, layers.ServerTLSLayer),
s(modes.HttpProxy, layers.ServerTLSLayer, layers.ClientTLSLayer),
]):
return layers.HTTPLayer(context, HTTPMode.regular)
if ctx.options.mode.startswith("upstream:") and len(context.layers) <= 3 and isinstance(top_layer,
layers.ServerTLSLayer):
raise NotImplementedError()
# 4. Check for other TLS cases (e.g. after CONNECT).
if client_tls:
# client tls requires a server tls layer as parent layer
if not isinstance(top_layer, layers.ServerTLSLayer):
context.server.tls = True
return layers.ServerTLSLayer(context)
else:
return layers.ClientTLSLayer(context)
# 5. Check for --tcp
if self.check_tcp(context.server.address):
# 4. Check for --tcp
if any(
address and re.search(rex, address, re.IGNORECASE)
for address in (context.server.address, context.client.sni)
for rex in ctx.options.allow_hosts
):
return layers.TCPLayer(context)
# 6. Check for TLS ALPN (HTTP1/HTTP2)
if isinstance(top_layer, layers.ServerTLSLayer):
alpn = context.client.alpn
if alpn == b"http/1.1":
return layers.OldHTTPLayer(context, HTTPMode.transparent) # TODO: replace this with ClientHTTP1Layer
elif alpn == b"h2":
return layers.ClientHTTP2Layer(context)
# 7. Check for raw tcp mode. Very simple heuristic here - the first three bytes should be
# 5. Check for raw tcp mode.
sni_indicates_non_http = (
context.client.sni and context.client.sni not in HTTP_ALPNS
)
# Very simple heuristic here - the first three bytes should be
# the HTTP verb, so A-Za-z is expected.
maybe_http = data_client[:3].isalpha()
if ctx.options.rawtcp and not maybe_http:
probably_no_http = (
not data_client[:3].isalpha()
)
if ctx.options.rawtcp and (sni_indicates_non_http or probably_no_http):
return layers.TCPLayer(context)
# 8. Assume HTTP1 by default.
# 6. Assume HTTP by default.
return layers.HTTPLayer(context, HTTPMode.transparent)
def make_top_layer(self, context):
def make_top_layer(self, context: context.Context) -> layer.Layer:
if ctx.options.mode == "regular":
return layers.modes.HttpProxy(context)
elif ctx.options.mode == "transparent":