diff --git a/mitmproxy/addons/next_layer.py b/mitmproxy/addons/next_layer.py index 8f18f2b86..38f0c97da 100644 --- a/mitmproxy/addons/next_layer.py +++ b/mitmproxy/addons/next_layer.py @@ -1,5 +1,5 @@ import re -import typing +from typing import Type, Sequence, Union, Tuple, Any, Iterable, Optional, List from mitmproxy import ctx, exceptions from mitmproxy.net.tls import is_tls_record_magic @@ -9,40 +9,25 @@ from mitmproxy.proxy2 import context, layer, layers from mitmproxy.proxy2.layers import modes from mitmproxy.proxy2.layers.tls import HTTP_ALPNS, parse_client_hello -LayerCls = typing.Type[layer.Layer] +LayerCls = Type[layer.Layer] def stack_match( context: context.Context, - layers: typing.Sequence[typing.Union[LayerCls, typing.Tuple[LayerCls, ...]]] + layers: Sequence[Union[LayerCls, Tuple[LayerCls, ...]]] ) -> bool: if len(context.layers) != len(layers): return False return all( - expected is typing.Any or isinstance(actual, expected) + expected is Any or isinstance(actual, expected) for actual, expected in zip(context.layers, layers) ) -class HostMatcher: - def __init__(self, patterns: typing.Iterable[str] = tuple()): - self.patterns = patterns - self.regexes = [re.compile(p, re.IGNORECASE) for p in self.patterns] - - def __call__(self, address): - if not address: - return False - host = f"{address[0]}:{address[1]}" - return any(rex.search(host) for rex in self.regexes) - - def __bool__(self): - return bool(self.patterns) - - class NextLayer: - ignore_hosts: typing.Iterable[re.Pattern] = () - allow_hosts: typing.Iterable[re.Pattern] = () - tcp_hosts: typing.Iterable[re.Pattern] = () + ignore_hosts: Iterable[re.Pattern] = () + allow_hosts: Iterable[re.Pattern] = () + tcp_hosts: Iterable[re.Pattern] = () def configure(self, updated): if "tcp_hosts" in updated: @@ -59,7 +44,7 @@ class NextLayer: re.compile(x, re.IGNORECASE) for x in ctx.options.allow_hosts ] - def ignore_connection(self, context: context.Context, data_client: bytes) -> typing.Optional[bool]: + def ignore_connection(self, server_address: Optional[context.Address], data_client: bytes) -> Optional[bool]: """ Returns: True, if the connection should be ignored. @@ -69,17 +54,17 @@ class NextLayer: if not ctx.options.ignore_hosts and not ctx.options.allow_hosts: return False - hostnames: typing.List[str] = [] - if context.server.address: - hostnames.append(context.server.address[0]) + hostnames: List[str] = [] + if server_address is not None: + hostnames.append(server_address[0]) if is_tls_record_magic(data_client): try: ch = parse_client_hello(data_client) - if ch is None: + if ch is None: # not complete yet return None sni = ch.sni except ValueError: - return None # defer decision, wait for more input data + pass else: if sni: hostnames.append(sni.decode("idna")) @@ -103,15 +88,19 @@ class NextLayer: raise AssertionError() def next_layer(self, nextlayer: layer.NextLayer): - if isinstance(nextlayer, base.Layer): + if isinstance(nextlayer, base.Layer): # pragma: no cover return # skip the old proxy core's next_layer event. - nextlayer.layer = self._next_layer(nextlayer.context, nextlayer.data_client()) + nextlayer.layer = self._next_layer( + nextlayer.context, + nextlayer.data_client(), + nextlayer.data_server(), + ) - def _next_layer(self, context: context.Context, data_client: bytes) -> typing.Optional[layer.Layer]: + def _next_layer(self, context: context.Context, data_client: bytes, data_server: bytes) -> Optional[layer.Layer]: if len(context.layers) == 0: return self.make_top_layer(context) - if len(data_client) < 3: + if len(data_client) < 3 and not data_server: return None client_tls = is_tls_record_magic(data_client) @@ -122,7 +111,7 @@ class NextLayer: top_layer = context.layers[-1] # 1. check for --ignore/--allow - ignore = self.ignore_connection(context, data_client) + ignore = self.ignore_connection(context.server.address, data_client) if ignore is True: return layers.TCPLayer(context, ignore=True) if ignore is None: @@ -158,15 +147,14 @@ class NextLayer: return layers.TCPLayer(context) # 5. Check for raw tcp mode. - alpn_indicates_non_http = ( - context.client.alpn and context.client.alpn not in HTTP_ALPNS + very_likely_http = ( + context.client.alpn and context.client.alpn in HTTP_ALPNS ) - # Very simple heuristic here - the first three bytes should be - # the HTTP verb, so A-Za-z is expected. - probably_no_http = ( - not data_client[:3].isalpha() + probably_no_http = not very_likely_http and ( + not data_client[:3].isalpha() # the first three bytes should be the HTTP verb, so A-Za-z is expected. + or data_server # a server greeting would be uncharacteristic. ) - if ctx.options.rawtcp and (alpn_indicates_non_http or probably_no_http): + if ctx.options.rawtcp and probably_no_http: return layers.TCPLayer(context) # 6. Assume HTTP by default. @@ -185,5 +173,5 @@ class NextLayer: elif ctx.options.mode == "socks5": raise NotImplementedError("Mode not implemented.") - else: - raise NotImplementedError("Unknown mode.") + else: # pragma: no cover + raise AssertionError("Unknown mode.")