diff --git a/mitmproxy/proxy2/layers/old/__init__.py b/mitmproxy/proxy2/layers/old/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/mitmproxy/proxy2/layers/old/glue.py b/mitmproxy/proxy2/layers/old/glue.py deleted file mode 100644 index 8e6adace8..000000000 --- a/mitmproxy/proxy2/layers/old/glue.py +++ /dev/null @@ -1,251 +0,0 @@ -import asyncio -import socket -import sys -import threading -import traceback - -from mitmproxy import proxy, connections, ctx, exceptions, http -from mitmproxy.log import LogEntry -from mitmproxy.net.http import http1 -from mitmproxy.proxy import modes -from mitmproxy.proxy.protocol import ServerConnectionMixin -from mitmproxy.proxy2 import commands, events, server -from mitmproxy.proxy2.context import Context -from mitmproxy.proxy2.layer import Layer -from mitmproxy.proxy2.utils import expect - -""" - ___ - ### __ __ __ __ __ __ ® - /---\ | | | | | | | | | | | | - | | | | | | | |__| | | | | | - | | | | | | | __ | | | | | - | | | `--' | | | | | | `--' | - | | \______/ |__| |__| \______/ - +++++++ - -Temporary glue code to connect the old thread-based proxy core and the new sans-io implementation. -""" - -GLUE_DEBUG = False - - -class GlueEvent(events.Event): - def __init__(self, command: commands.Command): - self.command = command - - -class GlueGetConnectionHandler(commands.Command): - blocking = True - - -class GlueGetConnectionHandlerReply(events.CommandReply): - pass - - -class GlueClientConnection(connections.ClientConnection): - def __init__(self, s: socket.socket, address): - super().__init__(s, address, None) - - def __getattribute__(self, item): - if GLUE_DEBUG: - print(f"[client_conn] {item}") - return super().__getattribute__(item) - - -class GlueTopLayer(ServerConnectionMixin): - root_context: proxy.RootContext - - def __init__(self, ctx, server_address): - self.root_context = ctx - super().__init__(server_address) - mode = self.root_context.config.options.mode - if mode.startswith("upstream:"): - m = modes.HttpUpstreamProxy - elif mode == "transparent": - m = modes.TransparentProxy - elif mode.startswith("reverse:"): - m = modes.ReverseProxy - elif mode == "socks5": - m = modes.Socks5Proxy - elif mode == "regular": - m = modes.HttpProxy - else: - raise NotImplementedError() - self.cls = m - - @property - def __class__(self): - return self.cls - - def __getattribute__(self, item): - if GLUE_DEBUG and item not in ("root_context",): - print(f"[top_layer] {item}") - return object.__getattribute__(self, item) - - def __getattr__(self, item): - return getattr(self.root_context, item) - - -class GlueLayer(Layer): - """ - Translate between old and new proxy core. - """ - context: Context - connection_handler: server.ConnectionHandler - - def log(self, msg, level): - self.master.channel.tell("log", LogEntry(msg, level)) - - def _inject(self, command: commands.Command): - e = GlueEvent(command) - self.loop.call_soon_threadsafe( - lambda: self.connection_handler.server_event(e) - ) - - @expect(events.Start) - def start(self, _) -> commands.TCommandGenerator: - if GLUE_DEBUG: - print("start!") - self.loop = asyncio.get_event_loop() - - self.connection_handler = yield GlueGetConnectionHandler() - self.master = ctx.master - - self.c1, self.c2 = socketpair() - - self.client_conn = GlueClientConnection(self.c1, self.context.client.address) - self.root_context = proxy.RootContext( - self.client_conn, - proxy.ProxyConfig(self.context.options), - self.master.channel - ) - - def spin(): - while True: - d = self.c2.recv(16384) - if not d: - break - self._inject(commands.SendData(self.context.client, d)) - - self.spin = threading.Thread(target=spin) - self.spin.daemon = True - self.spin.start() - - def run(): - try: - self.layer = self.root_context.next_layer( - GlueTopLayer(self.root_context, self.context.server.address) - ) - self.layer() - except exceptions.Kill: - self.log("Connection killed", "info") - except exceptions.ProtocolException as e: - if isinstance(e, exceptions.ClientHandshakeException): - self.log( - "Client Handshake failed. " - "The client may not trust the proxy's certificate for {}.".format(e.server), - "warn" - ) - self.log(repr(e), "debug") - elif isinstance(e, exceptions.InvalidServerCertificate): - self.log(str(e), "warn") - self.log( - "Invalid certificate, closing connection. Pass --insecure to disable validation.", - "warn") - else: - self.log(str(e), "warn") - - self.log(repr(e), "debug") - # If an error propagates to the topmost level, - # we send an HTTP error response, which is both - # understandable by HTTP clients and humans. - try: - error_response = http.make_error_response(502, repr(e)) - self.client_conn.send(http1.assemble_response(error_response)) - except exceptions.TcpException: - pass - except Exception: - self.log(traceback.format_exc(), "error") - print(traceback.format_exc(), file=sys.stderr) - print("mitmproxy has crashed!", file=sys.stderr) - print("Please lodge a bug report at: https://github.com/mitmproxy/mitmproxy", - file=sys.stderr) - - self.thread = threading.Thread(target=run) - - self._handle_event = self.translate_event - - self.thread.start() - if GLUE_DEBUG: - print("done start") - - _handle_event = start - - @expect(events.DataReceived, events.ConnectionClosed, GlueEvent) - def translate_event(self, event: events.Event) -> commands.TCommandGenerator: - if GLUE_DEBUG: - print("event!", event) - if isinstance(event, events.DataReceived): - if event.connection == self.context.client: - self.c2.sendall(event.data) - else: - raise NotImplementedError() - elif isinstance(event, GlueEvent): - yield event.command - elif isinstance(event, events.ConnectionClosed): - if event.connection == self.context.client: - self.c1.shutdown(socket.SHUT_RDWR) - self.c1.close() - try: - self.c2.shutdown(socket.SHUT_RDWR) - except: - pass - self.c2.close() - self._handle_event = self.done - else: - raise NotImplementedError() - yield from () - - @expect(events.DataReceived, events.ConnectionClosed) - def done(self, _): - yield from () - - -# https://github.com/python/cpython/blob/5c23e21ef655db35af45ed98a62eb54bff64dbd0/Lib/socket.py#L493 -def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0): - if family == socket.AF_INET: - host = socket._LOCALHOST - elif family == socket.AF_INET6: - host = socket._LOCALHOST_V6 - else: - raise ValueError("Only AF_INET and AF_INET6 socket address families " - "are supported") - if type != socket.SOCK_STREAM: - raise ValueError("Only SOCK_STREAM socket type is supported") - if proto != 0: - raise ValueError("Only protocol zero is supported") - - # We create a connected TCP socket. Note the trick with - # setblocking(False) that prevents us from having to create a thread. - lsock = socket.socket(family, type, proto) - try: - lsock.bind((host, 0)) - lsock.listen() - # On IPv6, ignore flow_info and scope_id - addr, port = lsock.getsockname()[:2] - csock = socket.socket(family, type, proto) - try: - csock.setblocking(False) - try: - csock.connect((addr, port)) - except (BlockingIOError, InterruptedError): - pass - csock.setblocking(True) - ssock, _ = lsock.accept() - except: - csock.close() - raise - finally: - lsock.close() - return (ssock, csock) diff --git a/mitmproxy/proxy2/layers/old/http1.py b/mitmproxy/proxy2/layers/old/http1.py deleted file mode 100644 index ca3eba6d7..000000000 --- a/mitmproxy/proxy2/layers/old/http1.py +++ /dev/null @@ -1,109 +0,0 @@ -import time - -import h11 -from mitmproxy.proxy2.layer import Layer -from mitmproxy.proxy2.layers.old.old_http import _make_event_from_request -from mitmproxy.proxy2.layers.old import semantics -from mitmproxy.proxy2.layers.old.http_commands import * -from mitmproxy.proxy2.layers.old.http_events import * -from mitmproxy.proxy2.utils import expect - - -class ServerHTTP1Layer(Layer): - flow: http.HTTPFlow = None - - @expect(events.Start) - def start(self, _) -> commands.TCommandGenerator: - if not self.context.server.connected: - # TODO: Can be done later - err = yield commands.OpenConnection(self.context.server) - if err: - yield commands.Log(f"Cannot open connection: {err}", level="error") - # FIXME: Handle properly. - - self.h11 = h11.Connection(h11.CLIENT) - - # debug - # \/ \/ \/ \/ \/ \/ \/ \/ \/ \/ \/ \/ - def log_event(orig): - def next_event(): - e = orig() - if True: - yield commands.Log(f"[h11] {e}") - return e - - return next_event - - self.h11.next_event = log_event(self.h11.next_event) - # /\ /\ /\ /\ /\ /\ /\ /\ /\ /\ /\ /\ - self.child_layer = semantics.HTTPLayer(self.context) - self.event_to_child(events.Start()) - - yield commands.Log("HTTP/1 connection started") - - self._handle_event = self._handle - - _handle_event = start - - def event_to_child(self, event: events.Event): - for command in self.child_layer.handle_event(event): - if isinstance(command, HttpCommand): - yield from self.handle_http_command(command) - else: - yield command - - def handle_http_command(self, command: HttpCommand): - bytes_to_send = None - if isinstance(command, SendRequestHeaders): - self.flow = command.flow - self.flow.request.http_version = b"HTTP/1.1" - h11_event = _make_event_from_request(self.flow.request) - bytes_to_send = self.h11.send(h11_event) - - elif isinstance(command, SendRequestComplete): - bytes_to_send = self.h11.send(h11.EndOfMessage()) - elif isinstance(command, SendRequestData): - yield commands.Log(f"Server HTTP1Layer unimplemented HttpCommand: {command}", - level="error") - else: - yield command - if bytes_to_send: - yield commands.SendData(self.context.server, bytes_to_send) - - def _handle(self, event: events.Event): - if isinstance(event, HttpEvent): - yield from self.event_to_child(event) - elif isinstance(event, events.DataReceived): - self.h11.receive_data(event.data) - - while True: - h11_event = yield from self.h11.next_event() - if h11_event is h11.NEED_DATA: - break - elif isinstance(h11_event, h11.Response): - yield commands.Log(f"h11 responseheaders: {h11_event}") - self.flow.response = http.HTTPResponse( - b"HTTP/1.1", - h11_event.status_code, - h11_event.reason, - h11_event.headers, - None, - time.time() - ) - yield from self.event_to_child(ResponseHeaders(self.flow)) - elif isinstance(h11_event, h11.Data): - yield from self.event_to_child(ResponseData(self.flow, h11_event.data)) - elif isinstance(h11_event, h11.EndOfMessage): - yield from self.event_to_child(ResponseComplete(self.flow)) - else: - raise NotImplementedError(h11_event) - else: - yield from self.event_to_child(event) - - -class ClientHTTP1Layer(Layer): - flow: http.HTTPFlow = None - - @expect(events.Start) - def start(self, _) -> commands.TCommandGenerator: - raise NotImplemented diff --git a/mitmproxy/proxy2/layers/old/http1_client.py b/mitmproxy/proxy2/layers/old/http1_client.py deleted file mode 100644 index 16566d919..000000000 --- a/mitmproxy/proxy2/layers/old/http1_client.py +++ /dev/null @@ -1,188 +0,0 @@ -import typing -from warnings import warn - -import h11 -from h11._readers import ChunkedReader, ContentLengthReader, Http10Reader -from h11._receivebuffer import ReceiveBuffer - -from mitmproxy import http -from mitmproxy.net.http import http1 -from mitmproxy.net.http.http1 import read_sansio as http1_sansio -from mitmproxy.proxy.protocol.http import HTTPMode -from mitmproxy.proxy2 import commands, events -from mitmproxy.proxy2.context import Context -from mitmproxy.proxy2.layer import Layer -from mitmproxy.proxy2.utils import expect - - -class ClientHTTP1Layer(Layer): - mode: HTTPMode - flow: typing.Optional[http.HTTPFlow] - client_buf: ReceiveBuffer - body_reader: typing.Union[ChunkedReader, Http10Reader, ContentLengthReader] - body_buf: bytes - - # this is like a mini state machine. - state: typing.Callable[[events.Event], commands.TCommandGenerator] - - def __init__(self, context: Context, mode: HTTPMode): - super().__init__(context) - self.mode = mode - - self.client_buf = ReceiveBuffer() - self.body_buf = b"" - - self.state = self.read_request_headers - - @expect(events.Start, events.DataReceived, events.ConnectionClosed) - def _handle_event(self, event: events.Event) -> commands.TCommandGenerator: - if isinstance(event, events.Start): - return - elif isinstance(event, events.DataReceived) and event.connection == self.context.client: - self.client_buf += event.data - else: - return warn(f"ClientHTTP1Layer unimplemented: {event}") - - yield from self.state(event) - - def read_request_headers(self, event: events.Event) -> commands.TCommandGenerator: - if isinstance(event, events.DataReceived) and event.connection == self.context.client: - request_head = self.client_buf.maybe_extract_lines() - if request_head: - self.flow = http.HTTPFlow( - self.context.client, - self.context.server, - ) - self.flow.request = http.HTTPRequest.wrap(http1_sansio.read_request_head(request_head)) - - if self.flow.request.first_line_format != "authority": - yield commands.Hook("requestheaders", self.flow) - - if self.flow.request.headers.get("expect", "").lower() == "100-continue": - raise NotImplementedError() - # self.send_response(http.expect_continue_response) - # request.headers.pop("expect") - - expected_size = http1.expected_http_body_size(self.flow.request) - if expected_size is None: - self.body_reader = ChunkedReader() - elif expected_size == -1: - self.body_reader = Http10Reader() - else: - self.body_reader = ContentLengthReader(expected_size) - - if not self.flow.request.stream: - yield from self.start_read_request_body() - else: - yield from self.request_received() - else: - return warn(f"ClientHTTP1Layer.read_request_headers: unimplemented {event}") - - def start_read_request_body(self) -> commands.TCommandGenerator: - self.state = self.read_request_body - yield from self.read_request_body(events.DataReceived(self.context.client, b"")) - - def read_request_body(self, event: events.Event) -> commands.TCommandGenerator: - if isinstance(event, events.DataReceived) and event.connection == self.context.client: - try: - event = self.body_reader(self.client_buf) - except h11.ProtocolError as e: - raise # FIXME - elif isinstance(event, events.ConnectionClosed): - try: - event = self.body_reader.read_eof() - except h11.ProtocolError as e: - raise # FIXME - else: - return warn(f"ClientHTTP1Layer.read_request_body: unimplemented {event}") - - if event is None: - return - elif isinstance(event, h11.Data): - self.body_buf += event.data - elif isinstance(event, h11.EndOfMessage): - self.flow.request.data.content = self.body_buf - self.body_buf = b"" - yield from self.request_received() - - def request_received(self) -> commands.TCommandGenerator: - # set first line format to relative in regular mode, - # see https://github.com/mitmproxy/mitmproxy/issues/1759 - if self.mode is HTTPMode.regular and self.flow.request.first_line_format == "absolute": - self.flow.request.first_line_format = "relative" - - # update host header in reverse proxy mode - if self.context.options.mode.startswith("reverse:") and not self.context.options.keep_host_header: - self.flow.request.host_header = self.context.server.address[0] - - # Determine .scheme, .host and .port attributes for inline scripts. For - # absolute-form requests, they are directly given in the request. For - # authority-form requests, we only need to determine the request - # scheme. For relative-form requests, we need to determine host and - # port as well. - if self.mode is HTTPMode.transparent: - # Setting request.host also updates the host header, which we want - # to preserve - host_header = self.flow.request.host_header - self.flow.request.host = self.context.server.address[0] - self.flow.request.port = self.context.server.address[1] - self.flow.request.host_header = host_header # set again as .host overwrites this. - self.flow.request.scheme = "https" if self.context.server.tls else "http" - yield commands.Hook("request", self.flow) - - if self.flow.response: - # response was set by an inline script. - # we now need to emulate the responseheaders hook. - yield commands.Hook("responseheaders", self.flow) - yield from self.response_received() - else: - raise NotImplementedError("Get your responses elsewhere.") - - def response_received(self): - yield commands.Hook("response", self.flow) - - raw = http1.assemble_response_head(self.flow.response) - yield commands.SendData(self.context.server, raw) - - if not f.response.stream: - # no streaming: - # we already received the full response from the server and can - # send it to the client straight away. - self.send_response(f.response) - else: - # streaming: - # First send the headers and then transfer the response incrementally - self.send_response_headers(f.response) - chunks = self.read_response_body( - f.request, - f.response - ) - if callable(f.response.stream): - chunks = f.response.stream(chunks) - self.send_response_body(f.response, chunks) - f.response.timestamp_end = time.time() - - if self.check_close_connection(f): - return False - - # Handle 101 Switching Protocols - if f.response.status_code == 101: - # Handle a successful HTTP 101 Switching Protocols Response, - # received after e.g. a WebSocket upgrade request. - # Check for WebSocket handshake - is_websocket = ( - websockets.check_handshake(f.request.headers) and - websockets.check_handshake(f.response.headers) - ) - if is_websocket and not self.config.options.websocket: - self.log( - "Client requested WebSocket connection, but the protocol is disabled.", - "info" - ) - - if is_websocket and self.config.options.websocket: - layer = WebSocketLayer(self, f) - else: - layer = self.ctx.next_layer(self) - layer() - return False # should never be reached \ No newline at end of file diff --git a/mitmproxy/proxy2/layers/old/http2.py b/mitmproxy/proxy2/layers/old/http2.py deleted file mode 100644 index 9f89eeaa4..000000000 --- a/mitmproxy/proxy2/layers/old/http2.py +++ /dev/null @@ -1,543 +0,0 @@ -import time -from typing import Dict, List # noqa - -import h2.config -import h2.events -import h2.exceptions -from h2 import connection - -from mitmproxy.net.http import Headers, http2 -from mitmproxy.proxy2.layer import Layer -from mitmproxy.proxy2.layers.old import semantics -from mitmproxy.proxy2.layers.old.http_commands import * -from mitmproxy.proxy2.layers.old.http_events import * -from mitmproxy.proxy2.utils import expect - - -TFlowId = str - - -class ServerHTTP2Layer(Layer): - stream_by_flow: Dict[TFlowId, semantics.HTTPLayer] - stream_by_command: Dict[commands.Command, semantics.HTTPLayer] - - @expect(events.Start) - def start(self, _) -> commands.TCommandGenerator: - if not self.context.server.connected: - # TODO: Can be done later - err = yield commands.OpenConnection(self.context.server) - if err: - yield commands.Log(f"Cannot open connection: {err}", level="error") - # FIXME: Handle properly. - - self.stream_by_flow = {} - self.stream_by_command = {} - - h2_config = h2.config.H2Configuration( - client_side=True, - header_encoding=False, - validate_outbound_headers=False, - validate_inbound_headers=False) - self.h2 = connection.H2Connection(config=h2_config) - self.h2.initiate_connection() - yield commands.SendData(self.context.server, self.h2.data_to_send()) - - yield commands.Log("HTTP/2 connection started") - - self._handle_event = self._handle - - _handle_event = start - - def event_to_child(self, layer: semantics.HTTPLayer, event: events.Event): - for command in layer.handle_event(event): - if command.blocking: - self.stream_by_command[command] = layer - yield command - - def _handle(self, event: events.Event): - if isinstance(event, events.CommandReply): - child_layer = self.stream_by_command.pop(event.command) - yield from self.event_to_child(child_layer, event) - - elif isinstance(event, HttpEvent): - if event.flow.id not in self.stream_by_flow: - child_layer = semantics.HTTPLayer(self.context) - yield from child_layer.handle_event(events.Start()) - self.stream_by_flow[event.flow.id] = child_layer - else: - child_layer = self.stream_by_flow[event.flow.id] - yield from self.event_to_child(child_layer, event) - - elif isinstance(event, events.DataReceived): - h2_events = self.h2.receive_data(event.data) - yield commands.SendData(self.context.server, self.h2.data_to_send()) - yield from self.handle_h2_events(h2_events) - - else: - raise NotImplementedError - - def handle_h2_events(self, h2_events: List[h2.events.Event]): - for h2_event in h2_events: - if isinstance(h2_event, h2.events.RequestReceived): - yield commands.Log("foobar") - - -class ClientHTTP2Layer(Layer): - stream_by_flow: Dict[TFlowId, int] - - @expect(events.Start) - def start(self, _) -> commands.TCommandGenerator: - self.stream_by_flow = {} - - h2_config = h2.config.H2Configuration( - client_side=False, - header_encoding=False, - validate_outbound_headers=False, - validate_inbound_headers=False) - self.h2 = connection.H2Connection(config=h2_config) - self.h2.initiate_connection() - yield commands.SendData(self.context.client, self.h2.data_to_send()) - - self.child_layer = ServerHTTP2Layer(self.context) - yield from self.event_to_child(events.Start()) - - self._handle_event = self._handle - - _handle_event = start - - def event_to_child(self, event: events.Event): - for command in self.child_layer.handle_event(event): - if isinstance(command, HttpCommand): - yield from self.handle_http_command(command) - else: - yield command - - def handle_http_command(self, command: HttpCommand): - stream_id = self.stream_by_flow[command.flow.id] - if isinstance(command, SendResponseHeaders): - headers = ( - (b":status", str(command.flow.response.status_code).encode()), - *command.flow.request.headers.fields - ) - self.h2.send_headers(stream_id, headers) - elif isinstance(command, SendResponseData): - # TODO: do chunking with max_outbound_frame_size - self.h2.send_data(stream_id, command.data) - elif isinstance(command, SendResponseComplete): - yield commands.Log(f"Ending stream {self.stream_by_flow[command.flow.id]}...") - self.h2.end_stream(stream_id) - else: - yield commands.Log(f"ClientHTTP2Layer unimplemented HttpCommand: {command}", level="error") - - yield commands.SendData(self.context.client, self.h2.data_to_send()) - - def _handle(self, event: events.Event) -> commands.TCommandGenerator: - if isinstance(event, events.DataReceived): - if event.connection == self.context.client: - h2_events = self.h2.receive_data(event.data) - yield commands.SendData(self.context.client, self.h2.data_to_send()) - yield from self.handle_h2_events(h2_events) - else: - yield from self.event_to_child(event) - else: - yield from self.event_to_child(event) - - def handle_h2_events(self, h2_events: List[h2.events.Event]): - for h2_event in h2_events: - if isinstance(h2_event, h2.events.RequestReceived): - flow = http.HTTPFlow(self.context.client, self.context.server) - headers = Headers([(k, v) for k, v in h2_event.headers]) - first_line_format, method, scheme, host, port, path = http2.parse_headers(headers) - # FIXME: This should be part of http2.parse_headers? - if ":authority" in headers: - headers["Host"] = headers.pop(":authority") - flow.request = http.HTTPRequest( - first_line_format, - method, - scheme, - host, - port, - path, - b"HTTP/2.0", - headers, - None, - timestamp_start=time.time() - ) - self.stream_by_flow[flow.id] = h2_event.stream_id - yield from self.event_to_child(RequestHeaders(flow)) - if h2_event.stream_ended: - yield from self.event_to_child(RequestComplete(flow)) - else: - yield commands.Log(f"Unimplemented h2 event: {h2_event}", level="error") - - """ - @expect(events.DataReceived, events.ConnectionClosed) - def process_data(self, event: events.Event) -> commands.TCommandGenerator: - if isinstance(event, events.DataReceived): - dead = [stream for stream in self.streams.values() if stream.death_time] - for stream in dead: - if stream.death_time <= time.time() - 10: - self.streams.pop(stream.stream_id, None) - - from_client = event.connection == self.context.client - if from_client: - source = self.h2_client - other = self.h2_server - source_conn = self.context.client - other_conn = self.context.server - else: - source = self.h2_server - other = self.h2_client - source_conn = self.context.server - other_conn = self.context.client - - received_h2_events = source.receive_data(event.data) - yield commands.SendData(source_conn, source.data_to_send()) - - for h2_event in received_h2_events: - yield commands.Log( - "HTTP/2 event from {}: {}".format("client" if from_client else "server", - h2_event) - ) - - eid = None - if hasattr(h2_event, 'stream_id'): - if not from_client and h2_event.stream_id % 2 == 1: - eid = self.server_to_client_stream_ids[h2_event.stream_id] - else: - eid = h2_event.stream_id - - if isinstance(h2_event, h2events.RequestReceived): - self.streams[eid] = Http2Stream(h2_event, self.context.client, - self.context.server) - self.streams[eid].timestamp_start = time.time() - self.streams[eid].request_arrived.set() - - yield commands.Hook("requestheaders", self.streams[eid].flow) - - while other.open_outbound_streams + 1 >= other.remote_settings.max_concurrent_streams: - # wait until we get a free slot for a new outgoing stream - # TODO make async/re-entry so we can handle other streams! - # time.sleep(0.1) - break - - server_stream_id = other.get_next_available_stream_id() - self.streams[eid].server_stream_id = server_stream_id - self.server_to_client_stream_ids[server_stream_id] = h2_event.stream_id - - if h2_event.stream_ended: - self.streams[eid].request_data_finished.set() - - headers = self.streams[eid].flow.request.headers.copy() - headers.insert(0, ":path", self.streams[eid].flow.request.path) - headers.insert(0, ":method", self.streams[eid].flow.request.method) - headers.insert(0, ":scheme", self.streams[eid].flow.request.scheme) - - # omit priority information because it is too complex to synchronize - other.send_headers( - server_stream_id, - headers=headers.items(), - end_stream=h2_event.stream_ended, - ) - yield commands.SendData(other_conn, other.data_to_send()) - - elif isinstance(h2_event, h2events.ResponseReceived): - self.streams[eid].queued_data_length = 0 - self.streams[eid].timestamp_start = time.time() - self.streams[eid].response_arrived.set() - - headers = mitmproxy.net.http.Headers([[k, v] for k, v in h2_event.headers]) - status_code = int(headers.get(':status', 502)) - headers.pop(":status", None) - - self.streams[eid].flow.response = http.HTTPResponse( - http_version=b"HTTP/2.0", - status_code=status_code, - reason=b'', - headers=headers, - content=None, - timestamp_start=self.streams[eid].timestamp_start, - timestamp_end=self.streams[eid].timestamp_end, - ) - - yield commands.Hook("responseheaders", self.streams[eid].flow) - - if self.streams[eid].flow.response.stream: - self.streams[eid].flow.response.data.content = None - - headers = self.streams[eid].flow.response.headers - headers.insert(0, ":status", str(self.streams[eid].flow.response.status_code)) - - other.send_headers( - self.streams[eid].stream_id, - headers=headers.items(), - ) - yield commands.SendData(other_conn, other.data_to_send()) - - elif isinstance(h2_event, h2events.DataReceived): - bsl = human.parse_size(self.context.options.body_size_limit) - if bsl and self.streams[eid].queued_data_length > bsl: - self.streams[eid].kill() - source.reset_stream(eid, h2.errors.ErrorCodes.REFUSED_STREAM) - yield commands.SendData(source_conn, source.data_to_send()) - other.reset_stream(eid, h2.errors.ErrorCodes.REFUSED_STREAM) - yield commands.SendData(other_conn, other.data_to_send()) - yield commands.Log("HTTP body too large. Limit is {}.".format(bsl), "info") - else: - streaming = ( - (from_client and self.streams[eid].flow.request.stream) or - (not from_client and self.streams[eid].flow.response and self.streams[ - eid].flow.response.stream) - ) - if streaming: - stream_id = self.streams[eid].server_stream_id if from_client else \ - self.streams[eid].stream_id - self.unfinished_bodies[other] = (stream_id, h2_event.data, False) - yield from self._send_body(other_conn, other) - else: - self.streams[eid].data_queue.put(h2_event.data) - self.streams[eid].queued_data_length += len(h2_event.data) - - source.acknowledge_received_data(h2_event.flow_controlled_length, - h2_event.stream_id) - yield commands.SendData(source_conn, source.data_to_send()) - - elif isinstance(h2_event, h2events.StreamEnded): - self.streams[eid].timestamp_end = time.time() - self.streams[eid].data_finished.set() - - if from_client and self.streams[eid].request_data_finished: - # end_stream already communicated via request send_headers - pass - else: - streaming = ( - (from_client and self.streams[eid].flow.request.stream) or - (not from_client and self.streams[eid].flow.response and self.streams[ - eid].flow.response.stream) - ) - if streaming: - stream_id = self.streams[eid].server_stream_id if from_client else \ - self.streams[eid].stream_id - self.unfinished_bodies[other] = (stream_id, b'', True, eid) - yield from self._send_body(other_conn, other) - else: - content = b"" - while True: - try: - content += self.streams[eid].data_queue.get_nowait() - except queue.Empty: - break - - if from_client: - self.streams[eid].flow.request.data.content = content - self.streams[eid].flow.request.timestamp_end = time.time() - yield commands.Hook("request", self.streams[eid].flow) - content = self.streams[eid].flow.request.data.content - stream_id = self.streams[eid].server_stream_id - kill_id = None - else: - self.streams[eid].flow.response.data.content = content - self.streams[eid].flow.response.timestamp_end = time.time() - yield commands.Hook("response", self.streams[eid].flow) - content = self.streams[eid].flow.response.data.content - stream_id = self.streams[eid].stream_id - kill_id = eid - - self.unfinished_bodies[other] = (stream_id, content, True, kill_id) - yield from self._send_body(other_conn, other) - - elif isinstance(h2_event, h2events.StreamReset): - if eid in self.streams: - self.streams[eid].kill() - if h2_event.error_code == h2.errors.ErrorCodes.CANCEL: - try: - stream_id = self.streams[eid].server_stream_id if from_client else \ - self.streams[eid].stream_id - if stream_id: - other.reset_stream(stream_id, h2_event.error_code) - except h2.exceptions.StreamClosedError: # pragma: no cover - # stream is already closed - good - pass - yield commands.SendData(other_conn, other.data_to_send()) - - elif isinstance(h2_event, h2events.RemoteSettingsChanged): - new_settings = dict( - [(key, cs.new_value) for (key, cs) in h2_event.changed_settings.items()]) - other.update_settings(new_settings) - yield commands.SendData(other_conn, other.data_to_send()) - - elif isinstance(h2_event, h2events.ConnectionTerminated): - yield commands.Log( - f"HTTP/2 Connection terminated: {h2_event}, {h2_event.additional_data}") - elif isinstance(h2_event, h2events.PushedStreamReceived): - parent_eid = self.server_to_client_stream_ids[h2_event.parent_stream_id] - other.push_stream(parent_eid, h2_event.pushed_stream_id, h2_event.headers) - yield commands.SendData(other_conn, other.data_to_send()) - - self.streams[h2_event.pushed_stream_id] = Http2Stream(h2_event, - self.context.client, - self.context.server) - self.streams[h2_event.pushed_stream_id].timestamp_start = time.time() - self.streams[h2_event.pushed_stream_id].pushed = True - self.streams[h2_event.pushed_stream_id].parent_stream_id = parent_eid - self.streams[h2_event.pushed_stream_id].timestamp_end = time.time() - self.streams[h2_event.pushed_stream_id].request_arrived.set() - self.streams[h2_event.pushed_stream_id].request_data_finished.set() - - yield commands.Hook("requestheaders", - self.streams[h2_event.pushed_stream_id].flow) - - elif isinstance(h2_event, h2events.WindowUpdated): - if source in self.unfinished_bodies: - yield from self._send_body(source_conn, source) - elif isinstance(h2_event, h2events.TrailersReceived): - raise NotImplementedError('TrailersReceived not implemented') - - elif isinstance(event, events.ConnectionClosed): - yield commands.Log("Connection closed abnormally") - if event.connection == self.context.server: - yield commands.CloseConnection(self.context.client) - self._handle_event = self.done - - - @expect(events.DataReceived, events.ConnectionClosed) - def done(self, _): - yield from () - - def _send_body(self, send_to_endpoint, endpoint): - stream_id, content, end_stream, kill_id = self.unfinished_bodies[endpoint] - - max_outbound_frame_size = endpoint.max_outbound_frame_size - position = 0 - while position < len(content): - frame_chunk = content[position:position + max_outbound_frame_size] - if endpoint.local_flow_control_window(stream_id) < len(frame_chunk): - self.unfinished_bodies[endpoint] = ( - stream_id, content[position:], end_stream, kill_id) - return - endpoint.send_data(stream_id, frame_chunk) - yield commands.SendData(send_to_endpoint, endpoint.data_to_send()) - position += max_outbound_frame_size - - del self.unfinished_bodies[endpoint] - - if end_stream: - endpoint.end_stream(stream_id) - yield commands.SendData(send_to_endpoint, endpoint.data_to_send()) - if kill_id: - self.streams[kill_id].kill() - """ - -# def _handle_connection_terminated(self, event, is_server): -# self.log("HTTP/2 connection terminated by {}: error code: {}, last stream id: {}, additional data: {}".format( -# "server" if is_server else "client", -# event.error_code, -# event.last_stream_id, -# event.additional_data), "info") -# -# if event.error_code != h2.errors.ErrorCodes.NO_ERROR: -# # Something terrible has happened - kill everything! -# self.connections[self.client_conn].close_connection( -# error_code=event.error_code, -# last_stream_id=event.last_stream_id, -# additional_data=event.additional_data -# ) -# self.client_conn.send(self.connections[self.client_conn].data_to_send()) -# self._kill_all_streams() -# else: -# """ -# Do not immediately terminate the other connection. -# Some streams might be still sending data to the client. -# """ -# return False -# - -""" -class Http2Stream: - - def __init__(self, h2_event, client_conn, server_conn) -> None: - if isinstance(h2_event, h2.events.RequestReceived): - self.stream_id = h2_event.stream_id - else: - self.stream_id = h2_event.pushed_stream_id - - self.server_stream_id: int = None - self.pushed = False - - if isinstance(h2_event, - h2.events.RequestReceived) and h2_event.priority_updated is not None: - self.priority_exclusive = h2_event.priority_updated.exclusive - self.priority_depends_on = h2_event.priority_updated.depends_on - self.priority_weight = h2_event.priority_updated.weight - self.handled_priority_event = h2_event.priority_updated - else: - self.priority_exclusive: bool = None - self.priority_depends_on: int = None - self.priority_weight: int = None - self.handled_priority_event: Any = None - - self.timestamp_start: float = None - self.timestamp_end: float = None - self.death_time: float = None - - self.request_arrived = threading.Event() - self.request_data_queue: queue.Queue[bytes] = queue.Queue() - self.request_queued_data_length = 0 - self.request_data_finished = threading.Event() - - self.response_arrived = threading.Event() - self.response_data_queue: queue.Queue[bytes] = queue.Queue() - self.response_queued_data_length = 0 - self.response_data_finished = threading.Event() - - self.flow = http.HTTPFlow( - client_conn, - server_conn, - live=self, - mode='regular', - ) - - headers = mitmproxy.net.http.Headers([[k, v] for k, v in h2_event.headers]) - first_line_format, method, scheme, host, port, path = http2.parse_headers(headers) - self.flow.request = http.HTTPRequest( - first_line_format, - method, - scheme, - host, - port, - path, - b"HTTP/2.0", - headers, - None, - timestamp_start=self.timestamp_start, - timestamp_end=self.timestamp_end, - ) - - def kill(self): - self.death_time = time.time() - - @property - def data_queue(self): - if self.response_arrived.is_set(): - return self.response_data_queue - else: - return self.request_data_queue - - @property - def queued_data_length(self): - if self.response_arrived.is_set(): - return self.response_queued_data_length - else: - return self.request_queued_data_length - - @queued_data_length.setter - def queued_data_length(self, v): - self.request_queued_data_length = v - - @property - def data_finished(self): - if self.response_arrived.is_set(): - return self.response_data_finished - else: - return self.request_data_finished -""" diff --git a/mitmproxy/proxy2/layers/old/http_commands.py b/mitmproxy/proxy2/layers/old/http_commands.py deleted file mode 100644 index b21ea9111..000000000 --- a/mitmproxy/proxy2/layers/old/http_commands.py +++ /dev/null @@ -1,37 +0,0 @@ -from mitmproxy import http - -from mitmproxy.proxy2 import commands - - -class HttpCommand(commands.Command): - flow: http.HTTPFlow - - def __init__(self, flow: http.HTTPFlow): - self.flow = flow - -class SendRequestHeaders(HttpCommand): - pass - - -class SendRequestData(HttpCommand): - pass - - -class SendRequestComplete(HttpCommand): - pass - - -class SendResponseHeaders(HttpCommand): - pass - - -class SendResponseData(HttpCommand): - data: bytes - - def __init__(self, flow, data): - super().__init__(flow) - self.data = data - - -class SendResponseComplete(HttpCommand): - pass diff --git a/mitmproxy/proxy2/layers/old/http_events.py b/mitmproxy/proxy2/layers/old/http_events.py deleted file mode 100644 index 447618ab4..000000000 --- a/mitmproxy/proxy2/layers/old/http_events.py +++ /dev/null @@ -1,38 +0,0 @@ -from mitmproxy import http - -from mitmproxy.proxy2 import events - - -class HttpEvent(events.Event): - flow: http.HTTPFlow - - def __init__(self, flow: http.HTTPFlow): - self.flow = flow - - -class RequestHeaders(HttpEvent): - pass - - -class RequestData(HttpEvent): - pass - - -class RequestComplete(HttpEvent): - pass - - -class ResponseHeaders(HttpEvent): - pass - - -class ResponseData(HttpEvent): - data: bytes - - def __init__(self, flow, data): - super().__init__(flow) - self.data = data - - -class ResponseComplete(HttpEvent): - pass diff --git a/mitmproxy/proxy2/layers/old/old_http.py b/mitmproxy/proxy2/layers/old/old_http.py deleted file mode 100644 index e61eeecaa..000000000 --- a/mitmproxy/proxy2/layers/old/old_http.py +++ /dev/null @@ -1,270 +0,0 @@ -import enum -import typing -from warnings import warn - -import h11 -from mitmproxy import http -from mitmproxy.net import http as net_http -from mitmproxy.net import websockets -from mitmproxy.net.http import url -from mitmproxy.net.http.http1.read import _parse_authority_form -from mitmproxy.proxy.protocol.http import HTTPMode -from mitmproxy.proxy2 import events, commands, context -from mitmproxy.proxy2.context import Context -from mitmproxy.proxy2.layer import Layer, NextLayer -from mitmproxy.proxy2.layers.old import websocket -from mitmproxy.proxy2.utils import expect - - -class FirstLineFormat(enum.Enum): - authority = "authority" - relative = "relative" - absolute = "absolute" - - -MODE_REQUEST_FORMS = { - HTTPMode.regular: (FirstLineFormat.authority, FirstLineFormat.absolute), - HTTPMode.transparent: (FirstLineFormat.relative,), - HTTPMode.upstream: (FirstLineFormat.authority, FirstLineFormat.absolute), -} - - -def _make_request_from_event(event: h11.Request) -> http.HTTPRequest: - if event.target == b"*" or event.target.startswith(b"/"): - form = "relative" - path = event.target - scheme, host, port = None, None, None - elif event.method == b"CONNECT": - form = "authority" - host, port = _parse_authority_form(event.target) - scheme, path = None, None - else: - form = "absolute" - scheme, host, port, path = url.parse(event.target) - - return http.HTTPRequest( - form, - event.method, - scheme, - host, - port, - path, - b"HTTP/" + event.http_version, - event.headers, - None, - -1 # FIXME: first_byte_timestamp - ) - -def _make_event_from_request(request: http.HTTPRequest) -> h11.Request: - if request.first_line_format == FirstLineFormat.relative.value: - target = request.path - else: - target = request.url - return h11.Request( - method=request.method, - headers=request.headers.fields, - http_version=request.http_version.replace("HTTP/", ""), - target=target - ) - - -def validate_request_form( - mode: HTTPMode, - first_line_format: FirstLineFormat, - scheme: str -) -> None: - if first_line_format == FirstLineFormat.absolute and scheme != "http": - raise ValueError(f"Invalid request scheme: {scheme}") - - allowed_request_forms = MODE_REQUEST_FORMS[mode] - if first_line_format not in allowed_request_forms: - if mode == HTTPMode.transparent: - desc = "HTTP CONNECT" if first_line_format == "authority" else "absolute-form" - raise ValueError( - f""" - Mitmproxy received an {desc} request even though it is not running - in regular mode. This usually indicates a misconfiguration, - please see the mitmproxy mode documentation for details. - """ - ) - else: - expected = ' or '.join(x.value for x in allowed_request_forms) - raise ValueError( - f"Invalid HTTP request form (expected: {expected}, got: {first_line_format})") - - -class OldHTTPLayer(Layer): - """ - Simple TCP layer that just relays messages right now. - """ - context: Context = None - mode: HTTPMode - - # this is like a mini state machine. - state: typing.Callable[[events.Event], commands.TCommandGenerator] - - def __init__(self, context: Context, mode: HTTPMode): - super().__init__(context) - self.mode = mode - - self.state = self.read_request_headers - self.flow = http.HTTPFlow(self.context.client, self.context.server) - self.client_conn = h11.Connection(h11.SERVER) - self.server_conn = h11.Connection(h11.CLIENT) - - # debug - # \/ \/ \/ \/ \/ \/ \/ \/ \/ \/ \/ \/ - def log_event(orig): - def next_event(): - e = orig() - if False: - yield commands.Log(f"[h11] {e}") - return e - - return next_event - - self.client_conn.next_event = log_event(self.client_conn.next_event) - self.server_conn.next_event = log_event(self.server_conn.next_event) - # /\ /\ /\ /\ /\ /\ /\ /\ /\ /\ /\ /\ - # this is very preliminary: [request_events, response_events] - self.flow_events = [[], []] - - @expect(events.Start, events.DataReceived, events.ConnectionClosed) - def _handle_event(self, event: events.Event) -> commands.TCommandGenerator: - if isinstance(event, events.Start): - return - if isinstance(event, events.DataReceived): - if event.connection == self.context.client: - self.client_conn.receive_data(event.data) - else: - self.server_conn.receive_data(event.data) - elif isinstance(event, events.ConnectionClosed): - return warn("unimplemented: http.handle:close") - - yield from self.state() - - def read_request_headers(self): - event = yield from self.client_conn.next_event() - if event is h11.NEED_DATA: - return - elif isinstance(event, h11.Request): - yield commands.Log(f"requestheaders: {event}") - - if self.client_conn.client_is_waiting_for_100_continue: - raise NotImplementedError() - - self.flow.request = _make_request_from_event(event) - validate_request_form(self.mode, FirstLineFormat(self.flow.request.first_line_format), self.flow.request.scheme) - - yield commands.Hook("requestheaders", self.flow) - - self.state = self.read_request_body - yield from self.read_request_body() # there may already be further events. - else: - raise TypeError(f"Unexpected event: {event}") - - def read_request_body(self): - while True: - event = yield from self.client_conn.next_event() - if event is h11.NEED_DATA: - return - elif isinstance(event, h11.Data): - self.flow_events[0].append(event) - elif isinstance(event, h11.EndOfMessage): - self.flow_events[0].append(event) - yield commands.Log(f"request {self.flow_events}") - - if self.flow.request.first_line_format == FirstLineFormat.authority.value: - if self.mode == HTTPMode.regular: - yield commands.Hook("http_connect", self.flow) - self.context.server = context.Server( - (self.flow.request.host, self.flow.request.port) - ) - yield commands.SendData( - self.context.client, - b'%s 200 Connection established\r\n\r\n' % self.flow.request.data.http_version - ) - child_layer = NextLayer(self.context) - self._handle_event = child_layer.handle_event - yield from child_layer.handle_event(events.Start()) - return - - if self.mode == HTTPMode.upstream: - raise NotImplementedError() - elif self.flow.request.first_line_format == FirstLineFormat.absolute.value: - if self.mode == HTTPMode.regular: - self.context.server.address = (self.flow.request.host, self.flow.request.port) - else: - raise NotImplementedError() - - yield from self._send_request() - return - else: - raise TypeError(f"Unexpected event: {event}") - - def _send_request(self): - if not self.context.server.connected: - err = yield commands.OpenConnection(self.context.server) - if err: - yield commands.Log(f"error {err}") - yield commands.CloseConnection(self.context.client) - self._handle_event = self.done - return - - self.flow_events[0].insert(0, _make_event_from_request(self.flow.request)) - for e in self.flow_events[0]: - bytes_to_send = self.server_conn.send(e) - if bytes_to_send: - yield commands.SendData(self.context.server, bytes_to_send) - self.state = self.read_response_headers - - def read_response_headers(self): - event = yield from self.server_conn.next_event() - if event is h11.NEED_DATA: - return - elif isinstance(event, h11.Response): - yield commands.Log(f"responseheaders {event}") - - self.flow_events[1].append(event) - self.state = self.read_response_body - yield from self.read_response_body() # there may already be further events. - elif isinstance(event, h11.InformationalResponse): - self.flow.response.headers = net_http.Headers(event.headers) - if event.status_code == 101 and websockets.check_handshake(self.flow.response.headers): - child_layer = websocket.WebsocketLayer(self.context, self.flow) - yield from child_layer.handle_event(events.Start()) - self._handle_event = child_layer.handle_event - return - else: - raise TypeError(f"Unexpected event: {event}") - - def read_response_body(self): - while True: - event = yield from self.server_conn.next_event() - if event is h11.NEED_DATA: - return - elif isinstance(event, h11.Data): - self.flow_events[1].append(event) - elif isinstance(event, h11.EndOfMessage): - self.flow_events[1].append(event) - yield commands.Log(f"response {self.flow_events}") - yield from self._send_response() - return - else: - raise TypeError(f"Unexpected event: {event}") - - def _send_response(self): - for e in self.flow_events[1]: - bytes_to_send = self.client_conn.send(e) - if bytes_to_send: - yield commands.SendData(self.context.client, bytes_to_send) - - # reset for next request. - self.state = self.read_request_headers - self.flow_events = [[], []] - self.client_conn.start_next_cycle() - self.server_conn.start_next_cycle() - - @expect(events.DataReceived, events.ConnectionClosed) - def done(self, _): - yield from () diff --git a/mitmproxy/proxy2/layers/old/old_tls.py b/mitmproxy/proxy2/layers/old/old_tls.py deleted file mode 100644 index a33e8608a..000000000 --- a/mitmproxy/proxy2/layers/old/old_tls.py +++ /dev/null @@ -1,301 +0,0 @@ - -class _TLSLayer(layer.Layer): - send_buffer: MutableMapping[SSL.Connection, bytearray] - tls: MutableMapping[context.Connection, SSL.Connection] - child_layer: Optional[layer.Layer] = None - - def __init__(self, context): - super().__init__(context) - self.send_buffer = {} - self.tls = {} - - def tls_interact(self, conn: context.Connection): - while True: - try: - data = self.tls[conn].bio_read(65535) - except SSL.WantReadError: - # Okay, nothing more waiting to be sent. - return - else: - yield commands.SendData(conn, data) - - def send( - self, - send_command: commands.SendData, - ) -> commands.TCommandGenerator: - tls_conn = self.tls[send_command.connection] - if send_command.connection.tls_established: - tls_conn.sendall(send_command.data) - yield from self.tls_interact(send_command.connection) - else: - buf = self.send_buffer.setdefault(tls_conn, bytearray()) - buf.extend(send_command.data) - - def negotiate(self, event: events.DataReceived) -> Generator[commands.Command, Any, bool]: - """ - Make sure to trigger processing if done! - """ - # bio_write errors for b"", so we need to check first if we actually received something. - tls_conn = self.tls[event.connection] - if event.data: - tls_conn.bio_write(event.data) - try: - tls_conn.do_handshake() - except SSL.WantReadError: - yield from self.tls_interact(event.connection) - return False - else: - event.connection.tls_established = True - event.connection.alpn = tls_conn.get_alpn_proto_negotiated() - print(f"TLS established: {event.connection}") - # TODO: Set all other connection attributes here - # there might already be data in the OpenSSL BIO, so we need to trigger its processing. - yield from self.relay(events.DataReceived(event.connection, b"")) - if tls_conn in self.send_buffer: - data_to_send = bytes(self.send_buffer.pop(tls_conn)) - yield from self.send(commands.SendData(event.connection, data_to_send)) - return True - - def relay(self, event: events.DataReceived): - tls_conn = self.tls[event.connection] - if event.data: - tls_conn.bio_write(event.data) - yield from self.tls_interact(event.connection) - - plaintext = bytearray() - while True: - try: - plaintext.extend(tls_conn.recv(65535)) - except (SSL.WantReadError, SSL.ZeroReturnError): - break - - if plaintext: - evt = events.DataReceived(event.connection, bytes(plaintext)) - # yield commands.Log(f"Plain{evt}") - yield from self.event_to_child(evt) - - def event_to_child(self, event: events.Event) -> commands.TCommandGenerator: - for command in self.child_layer.handle_event(event): - if isinstance(command, commands.SendData) and command.connection in self.tls: - yield from self.send(command) - else: - yield command - - -class ServerTLSLayer(_TLSLayer): - """ - This layer manages TLS for a single server connection. - """ - lazy_init: bool = False - - def __init__(self, context: context.Context): - super().__init__(context) - self.child_layer = layer.NextLayer(context) - - @expect(events.Start) - def start(self, event: events.Start) -> commands.TCommandGenerator: - yield from self.child_layer.handle_event(event) - - server = self.context.server - if server.tls: - if server.connected: - yield from self._start_tls(server) - else: - self.lazy_init = True - self._handle_event = self.process - - _handle_event = start - - def process(self, event: events.Event) -> None: - if isinstance(event, events.DataReceived) and event.connection in self.tls: - if not event.connection.tls_established: - yield from self.negotiate(event) - else: - yield from self.relay(event) - elif isinstance(event, events.OpenConnectionReply): - err = event.reply - conn = event.command.connection - if self.lazy_init and not err and conn == self.context.server: - yield from self._start_tls(conn) - yield from self.event_to_child(event) - elif isinstance(event, events.ConnectionClosed): - yield from self.event_to_child(event) - self.send_buffer.pop( - self.tls.pop(event.connection, None), - None - ) - else: - yield from self.event_to_child(event) - - def _start_tls(self, server: context.Server): - ssl_context = SSL.Context(SSL.SSLv23_METHOD) - - if server.alpn_offers: - ssl_context.set_alpn_protos(server.alpn_offers) - - self.tls[server] = SSL.Connection(ssl_context) - - if server.sni: - if server.sni is True: - if self.context.client.sni: - server.sni = self.context.client.sni - else: - server.sni = server.address[0] - self.tls[server].set_tlsext_host_name(server.sni) - self.tls[server].set_connect_state() - - yield from self.process(events.DataReceived(server, b"")) - - -class ClientTLSLayer(_TLSLayer): - """ - This layer establishes TLS on a single client connection. - - ┌─────┐ - │Start│ - └┬────┘ - ↓ - ┌────────────────────┐ - │Wait for ClientHello│ - └┬───────────────────┘ - │ Do we need server TLS info - │ to establish TLS with client? - │ ┌───────────────────┐ - ├─────→│Wait for Server TLS│ - │ yes └┬──────────────────┘ - │no │ - ↓ ↓ - ┌────────────────┐ - │Process messages│ - └────────────────┘ - - """ - recv_buffer: bytearray - - def __init__(self, context: context.Context): - super().__init__(context) - self.recv_buffer = bytearray() - self.child_layer = ServerTLSLayer(self.context) - - @expect(events.Start) - def state_start(self, _) -> commands.TCommandGenerator: - self.context.client.tls = True - self._handle_event = self.state_wait_for_clienthello - yield from () - - _handle_event = state_start - - @expect(events.DataReceived, events.ConnectionClosed) - def state_wait_for_clienthello(self, event: events.Event): - client = self.context.client - server = self.context.server - if isinstance(event, events.DataReceived) and event.connection == client: - self.recv_buffer.extend(event.data) - try: - client_hello = parse_client_hello(self.recv_buffer) - except ValueError as e: - raise NotImplementedError() from e # TODO - - if client_hello: - yield commands.Log(f"Client Hello: {client_hello}") - - # TODO: Don't do double conversion - client.sni = client_hello.sni.encode("idna") - client.alpn_offers = client_hello.alpn_protocols - - client_tls_requires_server_connection = ( - self.context.server.tls and - self.context.options.upstream_cert and - ( - self.context.options.add_upstream_certs_to_client_chain or - # client.alpn_offers or - not client.sni - ) - ) - - # What do we do with the client connection now? - if client_tls_requires_server_connection and not server.tls_established: - yield from self.start_server_tls() - self._handle_event = self.state_wait_for_server_tls - else: - yield from self.start_negotiate() - self._handle_event = self.state_process - - # In any case, we now have enough information to start server TLS if needed. - yield from self.child_layer.handle_event(events.Start()) - else: - raise NotImplementedError(event) # TODO - - def state_wait_for_server_tls(self, event: events.Event): - yield from self.event_to_child(event) - # TODO: Handle case where TLS establishment fails. - # We still need a good way to signal this - one possibility would be by closing - # the connection? - if self.context.server.tls_established: - yield from self.start_negotiate() - self._handle_event = self.state_process - - def state_process(self, event: events.Event): - if isinstance(event, events.DataReceived) and event.connection == self.context.client: - if not self.context.client.tls_established: - yield from self.negotiate(event) - else: - yield from self.relay(event) - else: - yield from self.event_to_child(event) - - def start_server_tls(self): - """ - We often need information from the upstream connection to establish TLS with the client. - For example, we need to check if the client does ALPN or not. - """ - if not self.context.server.connected: - self.context.server.alpn_offers = [ - x for x in self.context.client.alpn_offers - if not (x.startswith(b"h2-") or x.startswith(b"spdy")) - ] - - err = yield commands.OpenConnection(self.context.server) - if err: - yield commands.Log( - "Cannot establish server connection, which is required to establish TLS with the client." - ) - - def start_negotiate(self): - # FIXME: Do this properly - client = self.context.client - server = self.context.server - context = SSL.Context(SSL.SSLv23_METHOD) - cert, privkey, cert_chain = CertStore.from_store( - os.path.expanduser("~/.mitmproxy"), "mitmproxy", - self.context.options.key_size - ).get_cert(client.sni.encode(), (client.sni.encode(),)) - context.use_privatekey(privkey) - context.use_certificate(cert.x509) - context.set_cipher_list(tls.DEFAULT_CLIENT_CIPHERS) - - def alpn_select_callback(conn_, options): - if server.alpn in options: - return server.alpn - elif b"h2" in options: - return b"h2" - elif b"http/1.1" in options: - return b"http/1.1" - elif b"http/1.0" in options: - return b"http/1.0" - elif b"http/0.9" in options: - return b"http/0.9" - else: - # FIXME: We MUST return something here. At this point we are at loss. - # We probably need better checks when negotiating with the client. - return options[0] - - context.set_alpn_select_callback(alpn_select_callback) - - self.tls[self.context.client] = SSL.Connection(context) - self.tls[self.context.client].set_accept_state() - - yield from self.state_process(events.DataReceived( - client, bytes(self.recv_buffer) - )) - self.recv_buffer = bytearray() diff --git a/mitmproxy/proxy2/layers/old/semantics.py b/mitmproxy/proxy2/layers/old/semantics.py deleted file mode 100644 index cd2ab5893..000000000 --- a/mitmproxy/proxy2/layers/old/semantics.py +++ /dev/null @@ -1,50 +0,0 @@ -from mitmproxy.proxy2.layer import Layer -from mitmproxy.proxy2.layers.old.http_commands import * -from mitmproxy.proxy2.layers.old.http_events import * - -class HTTPLayer(Layer): - """ - HTTP Semantics layer used by the on-the-wire layers for HTTP/1 and HTTP/2. - """ - - def _handle_event(self, event: HttpEvent): - if isinstance(event, RequestHeaders): - yield commands.Log(f"RequestHeadersReceived: {event}") - - # This is blocking only this layer, none of the parent layers. - yield commands.Hook("requestheaders", event.flow) - yield commands.Log(f"Hook processed: {event}") - - elif isinstance(event, RequestData): - raise NotImplementedError - elif isinstance(event, RequestComplete): - yield commands.Log(f"RequestComplete: {event}") - yield commands.Hook("request", event.flow) - yield commands.Log(f"Hook processed: {event}") - yield SendRequestHeaders(event.flow) - # TODO yield SendRequestData() - yield SendRequestComplete(event.flow) - elif isinstance(event, ResponseHeaders): - yield commands.Log(f"ResponseHeadersReceived: {event}") - - # This is blocking only this layer, none of the parent layers. - yield commands.Hook("responseheaders", event.flow) - yield commands.Log(f"Hook processed: {event}") - - elif isinstance(event, ResponseData): - event.flow.response.raw_content = ( - (event.flow.response.raw_content or b"") - + event.data - ) - elif isinstance(event, ResponseComplete): - yield commands.Log(f"ResponseComplete: {event}") - yield commands.Hook("response", event.flow) - yield commands.Log(f"Hook processed: {event}") - yield SendResponseHeaders(event.flow) - yield SendResponseData(event.flow, event.flow.response.raw_content) - yield SendResponseComplete(event.flow) - - elif isinstance(event, events.ConnectionClosed): - yield commands.Log(f"HTTPLayer unimplemented event: {event}", level="error") - else: - raise NotImplementedError(event) diff --git a/mitmproxy/proxy2/layers/old/websocket.py b/mitmproxy/proxy2/layers/outdated/websocket.py similarity index 100% rename from mitmproxy/proxy2/layers/old/websocket.py rename to mitmproxy/proxy2/layers/outdated/websocket.py