From 7fb949f8bf9b901991d35e8a48777859fd60d8cb Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Wed, 16 Oct 2019 19:21:22 +0200 Subject: [PATCH] [sans-io] make http event queue more flexible --- mitmproxy/proxy2/layers/http/http.py | 132 ++++++++++++++++++--------- 1 file changed, 89 insertions(+), 43 deletions(-) diff --git a/mitmproxy/proxy2/layers/http/http.py b/mitmproxy/proxy2/layers/http/http.py index 638b8b315..654ed6501 100644 --- a/mitmproxy/proxy2/layers/http/http.py +++ b/mitmproxy/proxy2/layers/http/http.py @@ -1,3 +1,4 @@ +import collections import typing from abc import ABC, abstractmethod from warnings import warn @@ -15,19 +16,6 @@ from mitmproxy.proxy2.context import Connection, Context from mitmproxy.proxy2.layer import Layer from mitmproxy.proxy2.utils import expect - -class MakeHttpClient(commands.Command): - """ - TODO: This needs to be more formalized. There should be a way to - create HTTP clients, and a way to create TCP clients. - The main HTTP layer needs to act as a connection pool manager. - """ - connection: Connection - - def __init__(self, connection: Connection): - self.connection = connection - - # FIXME: Combine HttpEvent and HttpCommand? StreamIdentifier = int @@ -41,12 +29,36 @@ class HttpEvent(events.Event): def __init__(self, flow: http.HTTPFlow): self.flow = flow + def __repr__(self): + x = self.__dict__.copy() + x.pop("flow", None) + return f"{type(self).__name__}({repr(x) if x else ''})" -class SendHttp(commands.Command): + +class HttpCommand(commands.Command): + pass + + +class MakeHttpClient(HttpCommand): + """ + TODO: This needs to be more formalized. There should be a way to + create HTTP clients, and a way to create TCP clients. + The main HTTP layer needs to act as a connection pool manager. + """ + connection: Connection + + def __init__(self, connection: Connection): + self.connection = connection + + +class SendHttp(HttpCommand): def __init__(self, event: HttpEvent, connection: Connection): self.event = event self.connection = connection + def __repr__(self): + return f"Send({self.event})" + HttpEventGenerator = typing.Iterator[HttpEvent] @@ -100,13 +112,13 @@ class Http1Connection(ABC): self.context = context self.buf = ReceiveBuffer() - def handle_connection_event(self, event: events.Event) -> HttpEventGenerator: + def receive_connection_event(self, event: events.Event) -> HttpEventGenerator: if isinstance(event, events.DataReceived): self.buf += event.data yield from self.state(event) @abstractmethod - def receive_http_event(self, event: HttpEvent) -> commands.TCommandGenerator: + def send(self, event: HttpEvent) -> commands.TCommandGenerator: yield from () def make_body_reader(self, expected_size: typing.Optional[int]) -> TBodyReader: @@ -145,7 +157,7 @@ class Http1Server(Http1Connection): super().__init__(context) self.state = self.read_request_headers - def receive_http_event(self, event: HttpEvent) -> commands.TCommandGenerator: + def send(self, event: HttpEvent) -> commands.TCommandGenerator: if isinstance(event, ResponseHeaders): raw = http1.assemble_response_head(event.flow.response) elif isinstance(event, ResponseData): @@ -167,11 +179,9 @@ class Http1Server(Http1Connection): def read_request_headers(self, event: events.Event) -> HttpEventGenerator: if isinstance(event, events.DataReceived): request_head = self.buf.maybe_extract_lines() - # TODO: Make url.parse compatible with bytearrays - request_head = [bytes(x) for x in request_head] if request_head: + request_head = [bytes(x) for x in request_head] # TODO: Make url.parse compatible with bytearrays request = http.HTTPRequest.wrap(http1_sansio.read_request_head(request_head)) - self.flow = http.HTTPFlow( self.context.client, self.context.server, @@ -191,16 +201,23 @@ class Http1Server(Http1Connection): def read_request_body(self, event: events.Event) -> HttpEventGenerator: for e in self.read_body(event, True): if isinstance(e, RequestEndOfMessage): - self.state = self.read_request_headers + self.state = self.maybe_upgrade + yield from self.state(event) yield e + def maybe_upgrade(self, event: events.Event) -> HttpEventGenerator: + """ + We wait for the current flow to be finished, as we may want to upgrade to WebSocket or plain TCP afterwards. + """ + yield from () + class Http1Client(Http1Connection): def __init__(self, context: Context): super().__init__(context) self.state = self.read_response_headers - def receive_http_event(self, event: HttpEvent) -> commands.TCommandGenerator: + def send(self, event: HttpEvent) -> commands.TCommandGenerator: if isinstance(event, RequestHeaders): self.flow = event.flow raw = http1.assemble_request_head(event.flow.request) @@ -234,6 +251,8 @@ class Http1Client(Http1Connection): self.state = self.read_response_body yield from self.state(event) + elif isinstance(event, events.ConnectionClosed): + return # TODO: Teardown? else: return warn(f"Http1Client.read_request_headers: unimplemented {event}") @@ -265,6 +284,7 @@ class HttpStream(Layer): self._handle_event = self.read_request_headers yield from () + @expect(HttpEvent) def read_request_headers(self, event: events.Event) -> commands.TCommandGenerator: if isinstance(event, RequestHeaders): self.flow = event.flow @@ -285,6 +305,7 @@ class HttpStream(Layer): else: raise NotImplementedError + @expect(HttpEvent) def read_request_body(self, event: events.Event) -> commands.TCommandGenerator: if isinstance(event, RequestData): self.request_body_buf += event.data @@ -342,6 +363,7 @@ class HttpStream(Layer): yield SendHttp(RequestEndOfMessage(self.flow), self.context.server) self._handle_event = self.read_response_headers + @expect(HttpEvent) def read_response_headers(self, event: events.Event) -> commands.TCommandGenerator: if isinstance(event, ResponseHeaders): yield commands.Hook("responseheaders", self.flow) @@ -353,6 +375,7 @@ class HttpStream(Layer): else: raise NotImplementedError + @expect(HttpEvent) def read_response_body(self, event: events.Event) -> commands.TCommandGenerator: if isinstance(event, ResponseData): self.response_body_buf += event.data @@ -379,16 +402,17 @@ class HTTPLayer(Layer): ConnectionEvent: We have received b"GET /\r\n\r\n" from the client. HttpEvent: We have received request headers HttpCommand: Send request headers to X - Connection Command: Send b"GET /\r\n\r\n" to server. + ConnectionCommand: Send b"GET /\r\n\r\n" to server. ConnectionEvent -> HttpEvent -> HttpCommand -> ConnectionCommand """ mode: HTTPMode - flow: typing.Optional[http.HTTPFlow] - client_layer: Layer stream_by_command: typing.Dict[commands.Command, HttpStream] streams: typing.Dict[int, HttpStream] connections: typing.Dict[Connection, Http1Connection] + event_queue: typing.Deque[ + typing.Union[HttpEvent, HttpCommand, commands.Command] + ] def __init__(self, context: Context, mode: HTTPMode): super().__init__(context) @@ -399,31 +423,53 @@ class HTTPLayer(Layer): } self.streams = {} self.stream_by_command = {} + self.event_queue = collections.deque() + + def __repr__(self): + return f"HTTPLayer(conns: {len(self.connections)}, events: {[type(e).__name__ for e in self.event_queue]})" def _handle_event(self, event: events.Event): if isinstance(event, events.Start): return elif isinstance(event, events.CommandReply): stream = self.stream_by_command.pop(event.command) - yield from self.event_to_stream(stream, event) + self.event_to_stream(stream, event) elif isinstance(event, events.ConnectionEvent): - http_events = self.connections[event.connection].handle_connection_event(event) - for event in http_events: - if isinstance(event, RequestHeaders): - self.streams[event.flow.id] = HttpStream(self.context) - self.streams[event.flow.id].debug = self.debug and self.debug + " " - yield from self.event_to_stream(self.streams[event.flow.id], events.Start()) - yield from self.event_to_stream(self.streams[event.flow.id], event) + conn = self.connections[event.connection] + evts = conn.receive_connection_event(event) + self.event_queue.extend(evts) else: - return warn(f"HTTPLayer._handle_event: unimplemented {event}") + raise ValueError(f"Unexpected event: {event}") - def event_to_stream(self, stream: Layer, event: events.Event): - for command in stream.handle_event(event): - if isinstance(command, MakeHttpClient): - self.connections[command.connection] = Http1Client(self.context) - elif isinstance(command, SendHttp): - yield from self.connections[command.connection].receive_http_event(command.event) + while self.event_queue: + event = self.event_queue.popleft() + if isinstance(event, RequestHeaders): + self.streams[event.flow.id] = HttpStream(self.context) + self.streams[event.flow.id].debug = self.debug and self.debug + " " + self.event_to_stream(self.streams[event.flow.id], events.Start()) + if isinstance(event, HttpEvent): + stream = self.streams[event.flow.id] + self.event_to_stream(stream, event) + elif isinstance(event, MakeHttpClient): + self.connections[event.connection] = Http1Client(self.context) + elif isinstance(event, SendHttp): + conn = self.connections[event.connection] + evts = conn.send(event.event) + self.event_queue.extend(evts) + elif isinstance(event, commands.Command): + yield event else: - if command.blocking: - self.stream_by_command[command] = stream - yield command + raise ValueError(f"Unexpected event: {event}") + + def event_to_stream( + self, + stream: HttpStream, + event: events.Event, + ): + stream_events = list(stream.handle_event(event)) + for se in stream_events: + # Streams may yield blocking commands, which ultimately generate CommandReply events. + # Those need to be routed back to the correct stream, so we need to keep track of that. + if isinstance(se, commands.Command) and se.blocking: + self.stream_by_command[se] = stream + self.event_queue.extend(stream_events)