[sans-io] fix HTTP/2 stream concurrency

This commit is contained in:
Maximilian Hils 2020-11-22 16:47:01 +01:00
parent 553f102d6e
commit 32208b14eb
2 changed files with 69 additions and 0 deletions

View File

@ -277,6 +277,9 @@ class Http2Client(Http2Connection):
ReceiveEndOfMessage = ResponseEndOfMessage ReceiveEndOfMessage = ResponseEndOfMessage
SendEndOfMessage = RequestEndOfMessage SendEndOfMessage = RequestEndOfMessage
our_stream_id = Dict[int, int]
their_stream_id = Dict[int, int]
def __init__(self, context: Context): def __init__(self, context: Context):
super().__init__(context, context.server) super().__init__(context, context.server)
# Disable HTTP/2 push for now to keep things simple. # Disable HTTP/2 push for now to keep things simple.
@ -284,8 +287,27 @@ class Http2Client(Http2Connection):
self.h2_conn.local_settings.enable_push = 0 self.h2_conn.local_settings.enable_push = 0
# hyper-h2 pitfall: we need to acknowledge here, otherwise its sends out the old settings. # hyper-h2 pitfall: we need to acknowledge here, otherwise its sends out the old settings.
self.h2_conn.local_settings.acknowledge() self.h2_conn.local_settings.acknowledge()
self.our_stream_id = {}
self.their_stream_id = {}
def _handle_event(self, event: Event) -> CommandGenerator[None]: def _handle_event(self, event: Event) -> CommandGenerator[None]:
# We can't reuse stream ids from the client because they may arrived reordered here
# and HTTP/2 forbids opening a stream on a lower id than what was previously sent (see test_stream_concurrency).
# To mitigate this, we transparently map the outside's stream id to our stream id.
if isinstance(event, HttpEvent):
ours = self.our_stream_id.get(event.stream_id, None)
if ours is None:
ours = self.h2_conn.get_next_available_stream_id()
self.our_stream_id[event.stream_id] = ours
self.their_stream_id[ours] = event.stream_id
event.stream_id = ours
for cmd in self._handle_event2(event):
if isinstance(cmd, ReceiveHttp):
cmd.event.stream_id = self.their_stream_id[cmd.event.stream_id]
yield cmd
def _handle_event2(self, event: Event) -> CommandGenerator[None]:
if isinstance(event, RequestHeaders): if isinstance(event, RequestHeaders):
pseudo_headers = [ pseudo_headers = [
(b':method', event.request.method), (b':method', event.request.method),

View File

@ -303,3 +303,50 @@ def test_rst_then_close(tctx):
>> reply() >> reply()
) )
assert flow().error.msg == "connection cancelled" assert flow().error.msg == "connection cancelled"
def test_stream_concurrency(tctx):
"""Test that we can send an intercepted request with a lower stream id than one that has already been sent."""
playbook, cff = start_h2_client(tctx)
flow1 = Placeholder(HTTPFlow)
flow2 = Placeholder(HTTPFlow)
reqheadershook1 = http.HttpRequestHeadersHook(flow1)
reqheadershook2 = http.HttpRequestHeadersHook(flow2)
reqhook1 = http.HttpRequestHook(flow1)
reqhook2 = http.HttpRequestHook(flow2)
server = Placeholder(Server)
data_req1 = Placeholder(bytes)
data_req2 = Placeholder(bytes)
assert (playbook
>> DataReceived(
tctx.client,
cff.build_headers_frame(example_request_headers, flags=["END_STREAM"], stream_id=1).serialize() +
cff.build_headers_frame(example_request_headers, flags=["END_STREAM"], stream_id=3).serialize())
<< reqheadershook1
<< reqheadershook2
>> reply(to=reqheadershook1)
<< reqhook1
>> reply(to=reqheadershook2)
<< reqhook2
# req 2 overtakes 1 and we already have a reply:
>> reply(to=reqhook2)
<< OpenConnection(server)
>> reply(None, side_effect=make_h2)
<< SendData(server, data_req2)
>> reply(to=reqhook1)
<< SendData(server, data_req1)
)
frames = decode_frames(data_req2())
assert [type(x) for x in frames] == [
hyperframe.frame.SettingsFrame,
hyperframe.frame.HeadersFrame,
hyperframe.frame.DataFrame
]
frames = decode_frames(data_req1())
assert [type(x) for x in frames] == [
hyperframe.frame.HeadersFrame,
hyperframe.frame.DataFrame
]