diff --git a/mitmproxy/proxy2/layers/http/_http2.py b/mitmproxy/proxy2/layers/http/_http2.py index 25dabcd33..0b94b5d15 100644 --- a/mitmproxy/proxy2/layers/http/_http2.py +++ b/mitmproxy/proxy2/layers/http/_http2.py @@ -277,6 +277,9 @@ class Http2Client(Http2Connection): ReceiveEndOfMessage = ResponseEndOfMessage SendEndOfMessage = RequestEndOfMessage + our_stream_id = Dict[int, int] + their_stream_id = Dict[int, int] + def __init__(self, context: Context): super().__init__(context, context.server) # Disable HTTP/2 push for now to keep things simple. @@ -284,8 +287,27 @@ class Http2Client(Http2Connection): self.h2_conn.local_settings.enable_push = 0 # hyper-h2 pitfall: we need to acknowledge here, otherwise its sends out the old settings. self.h2_conn.local_settings.acknowledge() + self.our_stream_id = {} + self.their_stream_id = {} def _handle_event(self, event: Event) -> CommandGenerator[None]: + # We can't reuse stream ids from the client because they may arrived reordered here + # and HTTP/2 forbids opening a stream on a lower id than what was previously sent (see test_stream_concurrency). + # To mitigate this, we transparently map the outside's stream id to our stream id. + if isinstance(event, HttpEvent): + ours = self.our_stream_id.get(event.stream_id, None) + if ours is None: + ours = self.h2_conn.get_next_available_stream_id() + self.our_stream_id[event.stream_id] = ours + self.their_stream_id[ours] = event.stream_id + event.stream_id = ours + + for cmd in self._handle_event2(event): + if isinstance(cmd, ReceiveHttp): + cmd.event.stream_id = self.their_stream_id[cmd.event.stream_id] + yield cmd + + def _handle_event2(self, event: Event) -> CommandGenerator[None]: if isinstance(event, RequestHeaders): pseudo_headers = [ (b':method', event.request.method), diff --git a/test/mitmproxy/proxy2/layers/http/test_http2.py b/test/mitmproxy/proxy2/layers/http/test_http2.py index c0ed03c03..4c6cb76be 100644 --- a/test/mitmproxy/proxy2/layers/http/test_http2.py +++ b/test/mitmproxy/proxy2/layers/http/test_http2.py @@ -303,3 +303,50 @@ def test_rst_then_close(tctx): >> reply() ) assert flow().error.msg == "connection cancelled" + + +def test_stream_concurrency(tctx): + """Test that we can send an intercepted request with a lower stream id than one that has already been sent.""" + playbook, cff = start_h2_client(tctx) + flow1 = Placeholder(HTTPFlow) + flow2 = Placeholder(HTTPFlow) + + reqheadershook1 = http.HttpRequestHeadersHook(flow1) + reqheadershook2 = http.HttpRequestHeadersHook(flow2) + reqhook1 = http.HttpRequestHook(flow1) + reqhook2 = http.HttpRequestHook(flow2) + + server = Placeholder(Server) + data_req1 = Placeholder(bytes) + data_req2 = Placeholder(bytes) + + assert (playbook + >> DataReceived( + tctx.client, + cff.build_headers_frame(example_request_headers, flags=["END_STREAM"], stream_id=1).serialize() + + cff.build_headers_frame(example_request_headers, flags=["END_STREAM"], stream_id=3).serialize()) + << reqheadershook1 + << reqheadershook2 + >> reply(to=reqheadershook1) + << reqhook1 + >> reply(to=reqheadershook2) + << reqhook2 + # req 2 overtakes 1 and we already have a reply: + >> reply(to=reqhook2) + << OpenConnection(server) + >> reply(None, side_effect=make_h2) + << SendData(server, data_req2) + >> reply(to=reqhook1) + << SendData(server, data_req1) + ) + frames = decode_frames(data_req2()) + assert [type(x) for x in frames] == [ + hyperframe.frame.SettingsFrame, + hyperframe.frame.HeadersFrame, + hyperframe.frame.DataFrame + ] + frames = decode_frames(data_req1()) + assert [type(x) for x in frames] == [ + hyperframe.frame.HeadersFrame, + hyperframe.frame.DataFrame + ]