[sans-io] make http event queue more flexible

This commit is contained in:
Maximilian Hils 2019-10-16 19:21:22 +02:00
parent a7a0143ef2
commit 7fb949f8bf

View File

@ -1,3 +1,4 @@
import collections
import typing import typing
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from warnings import warn from warnings import warn
@ -15,19 +16,6 @@ from mitmproxy.proxy2.context import Connection, Context
from mitmproxy.proxy2.layer import Layer from mitmproxy.proxy2.layer import Layer
from mitmproxy.proxy2.utils import expect from mitmproxy.proxy2.utils import expect
class MakeHttpClient(commands.Command):
"""
TODO: This needs to be more formalized. There should be a way to
create HTTP clients, and a way to create TCP clients.
The main HTTP layer needs to act as a connection pool manager.
"""
connection: Connection
def __init__(self, connection: Connection):
self.connection = connection
# FIXME: Combine HttpEvent and HttpCommand? # FIXME: Combine HttpEvent and HttpCommand?
StreamIdentifier = int StreamIdentifier = int
@ -41,12 +29,36 @@ class HttpEvent(events.Event):
def __init__(self, flow: http.HTTPFlow): def __init__(self, flow: http.HTTPFlow):
self.flow = flow self.flow = flow
def __repr__(self):
x = self.__dict__.copy()
x.pop("flow", None)
return f"{type(self).__name__}({repr(x) if x else ''})"
class SendHttp(commands.Command):
class HttpCommand(commands.Command):
pass
class MakeHttpClient(HttpCommand):
"""
TODO: This needs to be more formalized. There should be a way to
create HTTP clients, and a way to create TCP clients.
The main HTTP layer needs to act as a connection pool manager.
"""
connection: Connection
def __init__(self, connection: Connection):
self.connection = connection
class SendHttp(HttpCommand):
def __init__(self, event: HttpEvent, connection: Connection): def __init__(self, event: HttpEvent, connection: Connection):
self.event = event self.event = event
self.connection = connection self.connection = connection
def __repr__(self):
return f"Send({self.event})"
HttpEventGenerator = typing.Iterator[HttpEvent] HttpEventGenerator = typing.Iterator[HttpEvent]
@ -100,13 +112,13 @@ class Http1Connection(ABC):
self.context = context self.context = context
self.buf = ReceiveBuffer() self.buf = ReceiveBuffer()
def handle_connection_event(self, event: events.Event) -> HttpEventGenerator: def receive_connection_event(self, event: events.Event) -> HttpEventGenerator:
if isinstance(event, events.DataReceived): if isinstance(event, events.DataReceived):
self.buf += event.data self.buf += event.data
yield from self.state(event) yield from self.state(event)
@abstractmethod @abstractmethod
def receive_http_event(self, event: HttpEvent) -> commands.TCommandGenerator: def send(self, event: HttpEvent) -> commands.TCommandGenerator:
yield from () yield from ()
def make_body_reader(self, expected_size: typing.Optional[int]) -> TBodyReader: def make_body_reader(self, expected_size: typing.Optional[int]) -> TBodyReader:
@ -145,7 +157,7 @@ class Http1Server(Http1Connection):
super().__init__(context) super().__init__(context)
self.state = self.read_request_headers self.state = self.read_request_headers
def receive_http_event(self, event: HttpEvent) -> commands.TCommandGenerator: def send(self, event: HttpEvent) -> commands.TCommandGenerator:
if isinstance(event, ResponseHeaders): if isinstance(event, ResponseHeaders):
raw = http1.assemble_response_head(event.flow.response) raw = http1.assemble_response_head(event.flow.response)
elif isinstance(event, ResponseData): elif isinstance(event, ResponseData):
@ -167,11 +179,9 @@ class Http1Server(Http1Connection):
def read_request_headers(self, event: events.Event) -> HttpEventGenerator: def read_request_headers(self, event: events.Event) -> HttpEventGenerator:
if isinstance(event, events.DataReceived): if isinstance(event, events.DataReceived):
request_head = self.buf.maybe_extract_lines() request_head = self.buf.maybe_extract_lines()
# TODO: Make url.parse compatible with bytearrays
request_head = [bytes(x) for x in request_head]
if request_head: if request_head:
request_head = [bytes(x) for x in request_head] # TODO: Make url.parse compatible with bytearrays
request = http.HTTPRequest.wrap(http1_sansio.read_request_head(request_head)) request = http.HTTPRequest.wrap(http1_sansio.read_request_head(request_head))
self.flow = http.HTTPFlow( self.flow = http.HTTPFlow(
self.context.client, self.context.client,
self.context.server, self.context.server,
@ -191,16 +201,23 @@ class Http1Server(Http1Connection):
def read_request_body(self, event: events.Event) -> HttpEventGenerator: def read_request_body(self, event: events.Event) -> HttpEventGenerator:
for e in self.read_body(event, True): for e in self.read_body(event, True):
if isinstance(e, RequestEndOfMessage): if isinstance(e, RequestEndOfMessage):
self.state = self.read_request_headers self.state = self.maybe_upgrade
yield from self.state(event)
yield e yield e
def maybe_upgrade(self, event: events.Event) -> HttpEventGenerator:
"""
We wait for the current flow to be finished, as we may want to upgrade to WebSocket or plain TCP afterwards.
"""
yield from ()
class Http1Client(Http1Connection): class Http1Client(Http1Connection):
def __init__(self, context: Context): def __init__(self, context: Context):
super().__init__(context) super().__init__(context)
self.state = self.read_response_headers self.state = self.read_response_headers
def receive_http_event(self, event: HttpEvent) -> commands.TCommandGenerator: def send(self, event: HttpEvent) -> commands.TCommandGenerator:
if isinstance(event, RequestHeaders): if isinstance(event, RequestHeaders):
self.flow = event.flow self.flow = event.flow
raw = http1.assemble_request_head(event.flow.request) raw = http1.assemble_request_head(event.flow.request)
@ -234,6 +251,8 @@ class Http1Client(Http1Connection):
self.state = self.read_response_body self.state = self.read_response_body
yield from self.state(event) yield from self.state(event)
elif isinstance(event, events.ConnectionClosed):
return # TODO: Teardown?
else: else:
return warn(f"Http1Client.read_request_headers: unimplemented {event}") return warn(f"Http1Client.read_request_headers: unimplemented {event}")
@ -265,6 +284,7 @@ class HttpStream(Layer):
self._handle_event = self.read_request_headers self._handle_event = self.read_request_headers
yield from () yield from ()
@expect(HttpEvent)
def read_request_headers(self, event: events.Event) -> commands.TCommandGenerator: def read_request_headers(self, event: events.Event) -> commands.TCommandGenerator:
if isinstance(event, RequestHeaders): if isinstance(event, RequestHeaders):
self.flow = event.flow self.flow = event.flow
@ -285,6 +305,7 @@ class HttpStream(Layer):
else: else:
raise NotImplementedError raise NotImplementedError
@expect(HttpEvent)
def read_request_body(self, event: events.Event) -> commands.TCommandGenerator: def read_request_body(self, event: events.Event) -> commands.TCommandGenerator:
if isinstance(event, RequestData): if isinstance(event, RequestData):
self.request_body_buf += event.data self.request_body_buf += event.data
@ -342,6 +363,7 @@ class HttpStream(Layer):
yield SendHttp(RequestEndOfMessage(self.flow), self.context.server) yield SendHttp(RequestEndOfMessage(self.flow), self.context.server)
self._handle_event = self.read_response_headers self._handle_event = self.read_response_headers
@expect(HttpEvent)
def read_response_headers(self, event: events.Event) -> commands.TCommandGenerator: def read_response_headers(self, event: events.Event) -> commands.TCommandGenerator:
if isinstance(event, ResponseHeaders): if isinstance(event, ResponseHeaders):
yield commands.Hook("responseheaders", self.flow) yield commands.Hook("responseheaders", self.flow)
@ -353,6 +375,7 @@ class HttpStream(Layer):
else: else:
raise NotImplementedError raise NotImplementedError
@expect(HttpEvent)
def read_response_body(self, event: events.Event) -> commands.TCommandGenerator: def read_response_body(self, event: events.Event) -> commands.TCommandGenerator:
if isinstance(event, ResponseData): if isinstance(event, ResponseData):
self.response_body_buf += event.data self.response_body_buf += event.data
@ -379,16 +402,17 @@ class HTTPLayer(Layer):
ConnectionEvent: We have received b"GET /\r\n\r\n" from the client. ConnectionEvent: We have received b"GET /\r\n\r\n" from the client.
HttpEvent: We have received request headers HttpEvent: We have received request headers
HttpCommand: Send request headers to X HttpCommand: Send request headers to X
Connection Command: Send b"GET /\r\n\r\n" to server. ConnectionCommand: Send b"GET /\r\n\r\n" to server.
ConnectionEvent -> HttpEvent -> HttpCommand -> ConnectionCommand ConnectionEvent -> HttpEvent -> HttpCommand -> ConnectionCommand
""" """
mode: HTTPMode mode: HTTPMode
flow: typing.Optional[http.HTTPFlow]
client_layer: Layer
stream_by_command: typing.Dict[commands.Command, HttpStream] stream_by_command: typing.Dict[commands.Command, HttpStream]
streams: typing.Dict[int, HttpStream] streams: typing.Dict[int, HttpStream]
connections: typing.Dict[Connection, Http1Connection] connections: typing.Dict[Connection, Http1Connection]
event_queue: typing.Deque[
typing.Union[HttpEvent, HttpCommand, commands.Command]
]
def __init__(self, context: Context, mode: HTTPMode): def __init__(self, context: Context, mode: HTTPMode):
super().__init__(context) super().__init__(context)
@ -399,31 +423,53 @@ class HTTPLayer(Layer):
} }
self.streams = {} self.streams = {}
self.stream_by_command = {} self.stream_by_command = {}
self.event_queue = collections.deque()
def __repr__(self):
return f"HTTPLayer(conns: {len(self.connections)}, events: {[type(e).__name__ for e in self.event_queue]})"
def _handle_event(self, event: events.Event): def _handle_event(self, event: events.Event):
if isinstance(event, events.Start): if isinstance(event, events.Start):
return return
elif isinstance(event, events.CommandReply): elif isinstance(event, events.CommandReply):
stream = self.stream_by_command.pop(event.command) stream = self.stream_by_command.pop(event.command)
yield from self.event_to_stream(stream, event) self.event_to_stream(stream, event)
elif isinstance(event, events.ConnectionEvent): elif isinstance(event, events.ConnectionEvent):
http_events = self.connections[event.connection].handle_connection_event(event) conn = self.connections[event.connection]
for event in http_events: evts = conn.receive_connection_event(event)
self.event_queue.extend(evts)
else:
raise ValueError(f"Unexpected event: {event}")
while self.event_queue:
event = self.event_queue.popleft()
if isinstance(event, RequestHeaders): if isinstance(event, RequestHeaders):
self.streams[event.flow.id] = HttpStream(self.context) self.streams[event.flow.id] = HttpStream(self.context)
self.streams[event.flow.id].debug = self.debug and self.debug + " " self.streams[event.flow.id].debug = self.debug and self.debug + " "
yield from self.event_to_stream(self.streams[event.flow.id], events.Start()) self.event_to_stream(self.streams[event.flow.id], events.Start())
yield from self.event_to_stream(self.streams[event.flow.id], event) if isinstance(event, HttpEvent):
stream = self.streams[event.flow.id]
self.event_to_stream(stream, event)
elif isinstance(event, MakeHttpClient):
self.connections[event.connection] = Http1Client(self.context)
elif isinstance(event, SendHttp):
conn = self.connections[event.connection]
evts = conn.send(event.event)
self.event_queue.extend(evts)
elif isinstance(event, commands.Command):
yield event
else: else:
return warn(f"HTTPLayer._handle_event: unimplemented {event}") raise ValueError(f"Unexpected event: {event}")
def event_to_stream(self, stream: Layer, event: events.Event): def event_to_stream(
for command in stream.handle_event(event): self,
if isinstance(command, MakeHttpClient): stream: HttpStream,
self.connections[command.connection] = Http1Client(self.context) event: events.Event,
elif isinstance(command, SendHttp): ):
yield from self.connections[command.connection].receive_http_event(command.event) stream_events = list(stream.handle_event(event))
else: for se in stream_events:
if command.blocking: # Streams may yield blocking commands, which ultimately generate CommandReply events.
self.stream_by_command[command] = stream # Those need to be routed back to the correct stream, so we need to keep track of that.
yield command if isinstance(se, commands.Command) and se.blocking:
self.stream_by_command[se] = stream
self.event_queue.extend(stream_events)