[sans-io] remove http command queue

This commit is contained in:
Maximilian Hils 2020-04-05 02:06:23 +02:00
parent 21000fca2c
commit a071beb662
2 changed files with 46 additions and 57 deletions

View File

@ -411,11 +411,10 @@ class HttpLayer(layer.Layer):
ConnectionEvent -> HttpEvent -> HttpCommand -> ConnectionCommand ConnectionEvent -> HttpEvent -> HttpCommand -> ConnectionCommand
""" """
mode: HTTPMode mode: HTTPMode
stream_by_command: typing.Dict[commands.Command, HttpStream] command_sources: typing.Dict[commands.Command, layer.Layer]
streams: typing.Dict[int, HttpStream] streams: typing.Dict[int, HttpStream]
connections: typing.Dict[Connection, typing.Union[layer.Layer, HttpStream]] connections: typing.Dict[Connection, layer.Layer]
waiting_for_establishment: typing.DefaultDict[Connection, typing.List[GetHttpConnection]] waiting_for_establishment: typing.DefaultDict[Connection, typing.List[GetHttpConnection]]
command_queue: typing.Deque[commands.Command]
def __init__(self, context: Context, mode: HTTPMode): def __init__(self, context: Context, mode: HTTPMode):
super().__init__(context) super().__init__(context)
@ -423,61 +422,71 @@ class HttpLayer(layer.Layer):
self.waiting_for_establishment = collections.defaultdict(list) self.waiting_for_establishment = collections.defaultdict(list)
self.streams = {} self.streams = {}
self.stream_by_command = {} self.command_sources = {}
self.command_queue = collections.deque()
self.connections = { self.connections = {
context.client: Http1Server(context.fork()) context.client: Http1Server(context.fork())
} }
def __repr__(self): def __repr__(self):
return f"HttpLayer(conns: {len(self.connections)}, queue: {[type(e).__name__ for e in self.command_queue]})" return f"HttpLayer(conns: {len(self.connections)})"
def _handle_event(self, event: events.Event): def _handle_event(self, event: events.Event):
if isinstance(event, events.Start): if isinstance(event, events.Start):
if self.mode is HTTPMode.upstream: if self.mode is HTTPMode.upstream:
self.context.server.via = server_spec.parse_with_mode(self.context.options.mode)[1] self.context.server.via = server_spec.parse_with_mode(self.context.options.mode)[1]
elif isinstance(event, events.CommandReply): elif isinstance(event, events.CommandReply):
stream = self.stream_by_command.pop(event.command) stream = self.command_sources.pop(event.command)
self.event_to_child(stream, event) yield from self.event_to_child(stream, event)
elif isinstance(event, events.ConnectionEvent): elif isinstance(event, events.ConnectionEvent):
if event.connection == self.context.server and self.context.server not in self.connections: if event.connection == self.context.server and self.context.server not in self.connections:
pass pass
else: else:
try: handler = self.connections[event.connection]
handler = self.connections[event.connection] yield from self.event_to_child(handler, event)
except KeyError:
raise
self.event_to_child(handler, event)
else: else:
raise ValueError(f"Unexpected event: {event}") raise ValueError(f"Unexpected event: {event}")
while self.command_queue: def event_to_child(
command = self.command_queue.popleft() self,
child: typing.Union[layer.Layer, HttpStream],
event: events.Event,
) -> layer.CommandGenerator[None]:
for command in child.handle_event(event):
assert isinstance(command, commands.Command)
# Streams may yield blocking commands, which ultimately generate CommandReply events.
# Those need to be routed back to the correct stream, so we need to keep track of that.
if command.blocking:
self.command_sources[command] = child
if isinstance(command, ReceiveHttp): if isinstance(command, ReceiveHttp):
if isinstance(command.event, RequestHeaders): if isinstance(command.event, RequestHeaders):
self.streams[command.event.stream_id] = self.make_stream() self.streams[command.event.stream_id] = yield from self.make_stream()
stream = self.streams[command.event.stream_id] stream = self.streams[command.event.stream_id]
self.event_to_child(stream, command.event) yield from self.event_to_child(stream, command.event)
elif isinstance(command, SendHttp): elif isinstance(command, SendHttp):
conn = self.connections[command.connection] conn = self.connections[command.connection]
self.event_to_child(conn, command.event) yield from self.event_to_child(conn, command.event)
elif isinstance(command, GetHttpConnection): elif isinstance(command, GetHttpConnection):
self.get_connection(command) yield from self.get_connection(command)
elif isinstance(command, RegisterHttpConnection): elif isinstance(command, RegisterHttpConnection):
yield from self.register_connection(command) yield from self.register_connection(command)
elif isinstance(command, commands.OpenConnection):
self.connections[command.connection] = child
yield command
elif isinstance(command, commands.Command): elif isinstance(command, commands.Command):
yield command yield command
else: # pragma: no cover else: # pragma: no cover
raise ValueError(f"Not a command command: {command}") raise ValueError(f"Not a command command: {command}")
def make_stream(self) -> HttpStream: def make_stream(self) -> layer.CommandGenerator[HttpStream]:
ctx = self.context.fork() ctx = self.context.fork()
stream = HttpStream(ctx) stream = HttpStream(ctx)
self.event_to_child(stream, events.Start()) yield from self.event_to_child(stream, events.Start())
return stream return stream
def get_connection(self, event: GetHttpConnection, *, reuse: bool = True): def get_connection(self, event: GetHttpConnection, *, reuse: bool = True) -> layer.CommandGenerator[None]:
# Do we already have a connection we can re-use? # Do we already have a connection we can re-use?
if reuse: if reuse:
for connection in self.connections: for connection in self.connections:
@ -491,8 +500,8 @@ class HttpLayer(layer.Layer):
if connection in self.waiting_for_establishment: if connection in self.waiting_for_establishment:
self.waiting_for_establishment[connection].append(event) self.waiting_for_establishment[connection].append(event)
else: else:
stream = self.stream_by_command.pop(event) stream = self.command_sources.pop(event)
self.event_to_child(stream, GetHttpConnectionReply(event, (connection, None))) yield from self.event_to_child(stream, GetHttpConnectionReply(event, (connection, None)))
return return
can_use_context_connection = ( can_use_context_connection = (
@ -523,9 +532,9 @@ class HttpLayer(layer.Layer):
self.connections[context.server] = stack[0] self.connections[context.server] = stack[0]
self.waiting_for_establishment[context.server].append(event) self.waiting_for_establishment[context.server].append(event)
self.event_to_child(stack[0], events.Start()) yield from self.event_to_child(stack[0], events.Start())
def register_connection(self, command: RegisterHttpConnection): def register_connection(self, command: RegisterHttpConnection) -> layer.CommandGenerator[None]:
waiting = self.waiting_for_establishment.pop(command.connection) waiting = self.waiting_for_establishment.pop(command.connection)
if command.err: if command.err:
@ -534,8 +543,8 @@ class HttpLayer(layer.Layer):
reply = (command.connection, None) reply = (command.connection, None)
for cmd in waiting: for cmd in waiting:
stream = self.stream_by_command.pop(cmd) stream = self.command_sources.pop(cmd)
self.event_to_child(stream, GetHttpConnectionReply(cmd, reply)) yield from self.event_to_child(stream, GetHttpConnectionReply(cmd, reply))
# Tricky multiplexing edge case: Assume a h2 client that sends two requests (or receives two responses) # Tricky multiplexing edge case: Assume a h2 client that sends two requests (or receives two responses)
# that neither have a content-length specified nor a chunked transfer encoding. # that neither have a content-length specified nor a chunked transfer encoding.
@ -547,24 +556,6 @@ class HttpLayer(layer.Layer):
yield from self.get_connection(cmd, reuse=False) yield from self.get_connection(cmd, reuse=False)
break break
def event_to_child(
self,
child: typing.Union[layer.Layer, HttpStream],
event: events.Event,
) -> None:
child_commands = list(child.handle_event(event))
for cmd in child_commands:
assert isinstance(cmd, commands.Command)
# Streams may yield blocking commands, which ultimately generate CommandReply events.
# Those need to be routed back to the correct stream, so we need to keep track of that.
if isinstance(cmd, commands.OpenConnection):
self.connections[cmd.connection] = child
if cmd.blocking:
self.stream_by_command[cmd] = child
self.command_queue.extend(child_commands)
class HttpClient(layer.Layer): class HttpClient(layer.Layer):
@expect(events.Start) @expect(events.Start)
@ -573,13 +564,11 @@ class HttpClient(layer.Layer):
err = None err = None
else: else:
err = yield commands.OpenConnection(self.context.server) err = yield commands.OpenConnection(self.context.server)
if not err:
if self.context.server.alpn == b"h2":
raise NotImplementedError
else:
child_layer = Http1Client(self.context)
self._handle_event = child_layer.handle_event
yield from self._handle_event(event)
yield RegisterHttpConnection(self.context.server, err) yield RegisterHttpConnection(self.context.server, err)
if err:
return
if self.context.server.alpn == b"h2":
raise NotImplementedError
else:
child_layer = Http1Client(self.context)
self._handle_event = child_layer.handle_event
yield from self._handle_event(event)

View File

@ -285,8 +285,8 @@ class Http1Client(Http1Connection):
resp = http1_sansio.read_response_head(response_head) resp = http1_sansio.read_response_head(response_head)
expected_size = http1.expected_http_body_size(self.request, resp) expected_size = http1.expected_http_body_size(self.request, resp)
except (ValueError, exceptions.HttpSyntaxException) as e: except (ValueError, exceptions.HttpSyntaxException) as e:
yield ReceiveHttp(ResponseProtocolError(self.stream_id, f"Cannot parse HTTP response: {e}"))
yield commands.CloseConnection(self.conn) yield commands.CloseConnection(self.conn)
yield ReceiveHttp(ResponseProtocolError(self.stream_id, f"Cannot parse HTTP response: {e}"))
return return
self.response = http.HTTPResponse.wrap(resp) self.response = http.HTTPResponse.wrap(resp)
yield ReceiveHttp(ResponseHeaders(self.stream_id, self.response)) yield ReceiveHttp(ResponseHeaders(self.stream_id, self.response))
@ -297,6 +297,7 @@ class Http1Client(Http1Connection):
else: else:
pass # FIXME: protect against header size DoS pass # FIXME: protect against header size DoS
elif isinstance(event, events.ConnectionClosed): elif isinstance(event, events.ConnectionClosed):
yield commands.CloseConnection(self.conn)
if self.stream_id: if self.stream_id:
if self.buf: if self.buf:
yield ReceiveHttp(ResponseProtocolError(self.stream_id, yield ReceiveHttp(ResponseProtocolError(self.stream_id,
@ -308,7 +309,6 @@ class Http1Client(Http1Connection):
yield ReceiveHttp(ResponseProtocolError(self.stream_id, "server closed connection")) yield ReceiveHttp(ResponseProtocolError(self.stream_id, "server closed connection"))
else: else:
return return
yield commands.CloseConnection(self.conn)
else: else:
raise ValueError(f"Unexpected event: {event}") raise ValueError(f"Unexpected event: {event}")