From a071beb662a4b6af86ed94a2a96ffa376f4b9bb1 Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Sun, 5 Apr 2020 02:06:23 +0200 Subject: [PATCH] [sans-io] remove http command queue --- mitmproxy/proxy2/layers/http/__init__.py | 99 +++++++++++------------- mitmproxy/proxy2/layers/http/_http1.py | 4 +- 2 files changed, 46 insertions(+), 57 deletions(-) diff --git a/mitmproxy/proxy2/layers/http/__init__.py b/mitmproxy/proxy2/layers/http/__init__.py index 1e99af578..287a5f31b 100644 --- a/mitmproxy/proxy2/layers/http/__init__.py +++ b/mitmproxy/proxy2/layers/http/__init__.py @@ -411,11 +411,10 @@ class HttpLayer(layer.Layer): ConnectionEvent -> HttpEvent -> HttpCommand -> ConnectionCommand """ mode: HTTPMode - stream_by_command: typing.Dict[commands.Command, HttpStream] + command_sources: typing.Dict[commands.Command, layer.Layer] streams: typing.Dict[int, HttpStream] - connections: typing.Dict[Connection, typing.Union[layer.Layer, HttpStream]] + connections: typing.Dict[Connection, layer.Layer] waiting_for_establishment: typing.DefaultDict[Connection, typing.List[GetHttpConnection]] - command_queue: typing.Deque[commands.Command] def __init__(self, context: Context, mode: HTTPMode): super().__init__(context) @@ -423,61 +422,71 @@ class HttpLayer(layer.Layer): self.waiting_for_establishment = collections.defaultdict(list) self.streams = {} - self.stream_by_command = {} - self.command_queue = collections.deque() + self.command_sources = {} self.connections = { context.client: Http1Server(context.fork()) } def __repr__(self): - return f"HttpLayer(conns: {len(self.connections)}, queue: {[type(e).__name__ for e in self.command_queue]})" + return f"HttpLayer(conns: {len(self.connections)})" def _handle_event(self, event: events.Event): if isinstance(event, events.Start): if self.mode is HTTPMode.upstream: self.context.server.via = server_spec.parse_with_mode(self.context.options.mode)[1] elif isinstance(event, events.CommandReply): - stream = self.stream_by_command.pop(event.command) - self.event_to_child(stream, event) + stream = self.command_sources.pop(event.command) + yield from self.event_to_child(stream, event) elif isinstance(event, events.ConnectionEvent): if event.connection == self.context.server and self.context.server not in self.connections: pass else: - try: - handler = self.connections[event.connection] - except KeyError: - raise - self.event_to_child(handler, event) + handler = self.connections[event.connection] + yield from self.event_to_child(handler, event) else: raise ValueError(f"Unexpected event: {event}") - while self.command_queue: - command = self.command_queue.popleft() + def event_to_child( + self, + child: typing.Union[layer.Layer, HttpStream], + event: events.Event, + ) -> layer.CommandGenerator[None]: + for command in child.handle_event(event): + assert isinstance(command, commands.Command) + # Streams may yield blocking commands, which ultimately generate CommandReply events. + # Those need to be routed back to the correct stream, so we need to keep track of that. + + if command.blocking: + self.command_sources[command] = child + if isinstance(command, ReceiveHttp): if isinstance(command.event, RequestHeaders): - self.streams[command.event.stream_id] = self.make_stream() + self.streams[command.event.stream_id] = yield from self.make_stream() stream = self.streams[command.event.stream_id] - self.event_to_child(stream, command.event) + yield from self.event_to_child(stream, command.event) elif isinstance(command, SendHttp): conn = self.connections[command.connection] - self.event_to_child(conn, command.event) + yield from self.event_to_child(conn, command.event) elif isinstance(command, GetHttpConnection): - self.get_connection(command) + yield from self.get_connection(command) elif isinstance(command, RegisterHttpConnection): yield from self.register_connection(command) + elif isinstance(command, commands.OpenConnection): + self.connections[command.connection] = child + yield command elif isinstance(command, commands.Command): yield command else: # pragma: no cover raise ValueError(f"Not a command command: {command}") - def make_stream(self) -> HttpStream: + def make_stream(self) -> layer.CommandGenerator[HttpStream]: ctx = self.context.fork() stream = HttpStream(ctx) - self.event_to_child(stream, events.Start()) + yield from self.event_to_child(stream, events.Start()) return stream - def get_connection(self, event: GetHttpConnection, *, reuse: bool = True): + def get_connection(self, event: GetHttpConnection, *, reuse: bool = True) -> layer.CommandGenerator[None]: # Do we already have a connection we can re-use? if reuse: for connection in self.connections: @@ -491,8 +500,8 @@ class HttpLayer(layer.Layer): if connection in self.waiting_for_establishment: self.waiting_for_establishment[connection].append(event) else: - stream = self.stream_by_command.pop(event) - self.event_to_child(stream, GetHttpConnectionReply(event, (connection, None))) + stream = self.command_sources.pop(event) + yield from self.event_to_child(stream, GetHttpConnectionReply(event, (connection, None))) return can_use_context_connection = ( @@ -523,9 +532,9 @@ class HttpLayer(layer.Layer): self.connections[context.server] = stack[0] self.waiting_for_establishment[context.server].append(event) - self.event_to_child(stack[0], events.Start()) + yield from self.event_to_child(stack[0], events.Start()) - def register_connection(self, command: RegisterHttpConnection): + def register_connection(self, command: RegisterHttpConnection) -> layer.CommandGenerator[None]: waiting = self.waiting_for_establishment.pop(command.connection) if command.err: @@ -534,8 +543,8 @@ class HttpLayer(layer.Layer): reply = (command.connection, None) for cmd in waiting: - stream = self.stream_by_command.pop(cmd) - self.event_to_child(stream, GetHttpConnectionReply(cmd, reply)) + stream = self.command_sources.pop(cmd) + yield from self.event_to_child(stream, GetHttpConnectionReply(cmd, reply)) # Tricky multiplexing edge case: Assume a h2 client that sends two requests (or receives two responses) # that neither have a content-length specified nor a chunked transfer encoding. @@ -547,24 +556,6 @@ class HttpLayer(layer.Layer): yield from self.get_connection(cmd, reuse=False) break - def event_to_child( - self, - child: typing.Union[layer.Layer, HttpStream], - event: events.Event, - ) -> None: - child_commands = list(child.handle_event(event)) - for cmd in child_commands: - assert isinstance(cmd, commands.Command) - # Streams may yield blocking commands, which ultimately generate CommandReply events. - # Those need to be routed back to the correct stream, so we need to keep track of that. - if isinstance(cmd, commands.OpenConnection): - self.connections[cmd.connection] = child - - if cmd.blocking: - self.stream_by_command[cmd] = child - - self.command_queue.extend(child_commands) - class HttpClient(layer.Layer): @expect(events.Start) @@ -573,13 +564,11 @@ class HttpClient(layer.Layer): err = None else: err = yield commands.OpenConnection(self.context.server) + if not err: + if self.context.server.alpn == b"h2": + raise NotImplementedError + else: + child_layer = Http1Client(self.context) + self._handle_event = child_layer.handle_event + yield from self._handle_event(event) yield RegisterHttpConnection(self.context.server, err) - if err: - return - - if self.context.server.alpn == b"h2": - raise NotImplementedError - else: - child_layer = Http1Client(self.context) - self._handle_event = child_layer.handle_event - yield from self._handle_event(event) diff --git a/mitmproxy/proxy2/layers/http/_http1.py b/mitmproxy/proxy2/layers/http/_http1.py index a2fdd5cae..feaea65fd 100644 --- a/mitmproxy/proxy2/layers/http/_http1.py +++ b/mitmproxy/proxy2/layers/http/_http1.py @@ -285,8 +285,8 @@ class Http1Client(Http1Connection): resp = http1_sansio.read_response_head(response_head) expected_size = http1.expected_http_body_size(self.request, resp) except (ValueError, exceptions.HttpSyntaxException) as e: - yield ReceiveHttp(ResponseProtocolError(self.stream_id, f"Cannot parse HTTP response: {e}")) yield commands.CloseConnection(self.conn) + yield ReceiveHttp(ResponseProtocolError(self.stream_id, f"Cannot parse HTTP response: {e}")) return self.response = http.HTTPResponse.wrap(resp) yield ReceiveHttp(ResponseHeaders(self.stream_id, self.response)) @@ -297,6 +297,7 @@ class Http1Client(Http1Connection): else: pass # FIXME: protect against header size DoS elif isinstance(event, events.ConnectionClosed): + yield commands.CloseConnection(self.conn) if self.stream_id: if self.buf: yield ReceiveHttp(ResponseProtocolError(self.stream_id, @@ -308,7 +309,6 @@ class Http1Client(Http1Connection): yield ReceiveHttp(ResponseProtocolError(self.stream_id, "server closed connection")) else: return - yield commands.CloseConnection(self.conn) else: raise ValueError(f"Unexpected event: {event}")