mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-23 00:01:36 +00:00
[sans-io] fix HTTP/2 client aborts
This commit is contained in:
parent
3bffcf5e2f
commit
9251f7820a
@ -8,7 +8,7 @@ from mitmproxy.net import server_spec
|
|||||||
from mitmproxy.net.http import url
|
from mitmproxy.net.http import url
|
||||||
from mitmproxy.proxy.protocol.http import HTTPMode
|
from mitmproxy.proxy.protocol.http import HTTPMode
|
||||||
from mitmproxy.proxy2 import commands, events, layer, tunnel
|
from mitmproxy.proxy2 import commands, events, layer, tunnel
|
||||||
from mitmproxy.proxy2.context import Connection, Context, Server
|
from mitmproxy.proxy2.context import Connection, ConnectionState, Context, Server
|
||||||
from mitmproxy.proxy2.layers import tls
|
from mitmproxy.proxy2.layers import tls
|
||||||
from mitmproxy.proxy2.layers.http import _upstream_proxy
|
from mitmproxy.proxy2.layers.http import _upstream_proxy
|
||||||
from mitmproxy.proxy2.utils import expect
|
from mitmproxy.proxy2.utils import expect
|
||||||
@ -283,8 +283,15 @@ class HttpStream(layer.Layer):
|
|||||||
self.server_state = self.state_done
|
self.server_state = self.state_done
|
||||||
|
|
||||||
def check_killed(self) -> layer.CommandGenerator[bool]:
|
def check_killed(self) -> layer.CommandGenerator[bool]:
|
||||||
if self.flow.error and self.flow.error.msg == flow.Error.KILLED_MESSAGE:
|
killed_by_us = (
|
||||||
yield commands.CloseConnection(self.context.client)
|
self.flow.error and self.flow.error.msg == flow.Error.KILLED_MESSAGE
|
||||||
|
)
|
||||||
|
killed_by_remote = (
|
||||||
|
self.context.client.state is not ConnectionState.OPEN
|
||||||
|
)
|
||||||
|
if killed_by_us or killed_by_remote:
|
||||||
|
if self.context.client.state & ConnectionState.CAN_WRITE:
|
||||||
|
yield commands.CloseConnection(self.context.client)
|
||||||
self._handle_event = self.state_errored
|
self._handle_event = self.state_errored
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
@ -302,12 +309,29 @@ class HttpStream(layer.Layer):
|
|||||||
self,
|
self,
|
||||||
event: typing.Union[RequestProtocolError, ResponseProtocolError]
|
event: typing.Union[RequestProtocolError, ResponseProtocolError]
|
||||||
) -> layer.CommandGenerator[None]:
|
) -> layer.CommandGenerator[None]:
|
||||||
self.flow.error = flow.Error(event.message)
|
is_client_error_but_we_already_talk_upstream = (
|
||||||
yield HttpErrorHook(self.flow)
|
isinstance(event, RequestProtocolError) and
|
||||||
|
self.client_state in (self.state_stream_request_body, self.state_done)
|
||||||
|
)
|
||||||
|
if is_client_error_but_we_already_talk_upstream:
|
||||||
|
yield SendHttp(event, self.context.server)
|
||||||
|
self.client_state = self.state_errored
|
||||||
|
|
||||||
|
response_hook_already_triggered = (
|
||||||
|
self.server_state in (self.state_done, self.state_errored)
|
||||||
|
)
|
||||||
|
if not response_hook_already_triggered:
|
||||||
|
# We don't want to trigger both a response hook and an error hook,
|
||||||
|
# so we need to check if the response is done yet or not.
|
||||||
|
self.flow.error = flow.Error(event.message)
|
||||||
|
yield HttpErrorHook(self.flow)
|
||||||
|
|
||||||
if (yield from self.check_killed()):
|
if (yield from self.check_killed()):
|
||||||
return
|
return
|
||||||
elif isinstance(event, ResponseProtocolError):
|
|
||||||
|
if isinstance(event, ResponseProtocolError):
|
||||||
yield SendHttp(event, self.context.client)
|
yield SendHttp(event, self.context.client)
|
||||||
|
self.server_state = self.state_errored
|
||||||
|
|
||||||
def make_server_connection(self) -> layer.CommandGenerator[bool]:
|
def make_server_connection(self) -> layer.CommandGenerator[bool]:
|
||||||
connection, err = yield GetHttpConnection(
|
connection, err = yield GetHttpConnection(
|
||||||
|
@ -127,7 +127,11 @@ class Http2Connection(HttpConnection):
|
|||||||
self.streams.pop(event.stream_id, None)
|
self.streams.pop(event.stream_id, None)
|
||||||
elif isinstance(event, h2.events.StreamReset):
|
elif isinstance(event, h2.events.StreamReset):
|
||||||
if event.stream_id in self.streams:
|
if event.stream_id in self.streams:
|
||||||
yield ReceiveHttp(self.ReceiveProtocolError(event.stream_id, f"Stream reset, error code {event.error_code}"))
|
try:
|
||||||
|
err_str = h2.errors.ErrorCodes(event.error_code).name
|
||||||
|
except ValueError:
|
||||||
|
err_str = str(event.error_code)
|
||||||
|
yield ReceiveHttp(self.ReceiveProtocolError(event.stream_id, f"stream reset by client ({err_str})"))
|
||||||
self.streams.pop(event.stream_id)
|
self.streams.pop(event.stream_id)
|
||||||
else:
|
else:
|
||||||
pass # We don't track priority frames which could be followed by a stream reset here.
|
pass # We don't track priority frames which could be followed by a stream reset here.
|
||||||
|
@ -359,8 +359,6 @@ def test_request_streaming(tctx, response):
|
|||||||
<< SendData(tctx.client, b"HTTP/1.1 413 Request Entity Too Large\r\nContent-Length: 0\r\n\r\n")
|
<< SendData(tctx.client, b"HTTP/1.1 413 Request Entity Too Large\r\nContent-Length: 0\r\n\r\n")
|
||||||
>> ConnectionClosed(server)
|
>> ConnectionClosed(server)
|
||||||
<< CloseConnection(server)
|
<< CloseConnection(server)
|
||||||
<< http.HttpErrorHook(flow)
|
|
||||||
>> reply()
|
|
||||||
<< CloseConnection(tctx.client)
|
<< CloseConnection(tctx.client)
|
||||||
)
|
)
|
||||||
elif response == "early kill":
|
elif response == "early kill":
|
||||||
@ -717,13 +715,18 @@ def test_http_client_aborts(tctx, stream):
|
|||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
assert playbook >> reply()
|
assert playbook >> reply()
|
||||||
assert (
|
(
|
||||||
playbook
|
playbook
|
||||||
>> ConnectionClosed(tctx.client)
|
>> ConnectionClosed(tctx.client)
|
||||||
<< CloseConnection(tctx.client)
|
<< CloseConnection(tctx.client)
|
||||||
|
)
|
||||||
|
if stream:
|
||||||
|
playbook << CloseConnection(server)
|
||||||
|
assert (
|
||||||
|
playbook
|
||||||
<< http.HttpErrorHook(flow)
|
<< http.HttpErrorHook(flow)
|
||||||
>> reply()
|
>> reply()
|
||||||
|
<< None
|
||||||
)
|
)
|
||||||
|
|
||||||
assert "peer closed connection" in flow().error.msg
|
assert "peer closed connection" in flow().error.msg
|
||||||
@ -861,3 +864,16 @@ def test_kill_flow(tctx, when):
|
|||||||
return assert_kill()
|
return assert_kill()
|
||||||
else:
|
else:
|
||||||
raise AssertionError
|
raise AssertionError
|
||||||
|
|
||||||
|
|
||||||
|
def test_close_during_connect_hook(tctx):
|
||||||
|
flow = Placeholder(HTTPFlow)
|
||||||
|
assert (
|
||||||
|
Playbook(http.HttpLayer(tctx, HTTPMode.regular))
|
||||||
|
>> DataReceived(tctx.client,
|
||||||
|
b'CONNECT hi.ls:443 HTTP/1.1\r\nProxy-Connection: keep-alive\r\nConnection: keep-alive\r\nHost: hi.ls:443\r\n\r\n')
|
||||||
|
<< http.HttpConnectHook(flow)
|
||||||
|
>> ConnectionClosed(tctx.client)
|
||||||
|
<< CloseConnection(tctx.client)
|
||||||
|
>> reply(to=-3)
|
||||||
|
)
|
||||||
|
@ -101,7 +101,7 @@ def test_simple(tctx):
|
|||||||
assert flow().response.text == "Hello, World!"
|
assert flow().response.text == "Hello, World!"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("stream", ["stream", "block"])
|
@pytest.mark.parametrize("stream", ["stream", ""])
|
||||||
@pytest.mark.parametrize("when", ["request", "response"])
|
@pytest.mark.parametrize("when", ["request", "response"])
|
||||||
@pytest.mark.parametrize("how", ["RST", "disconnect", "RST+disconnect"])
|
@pytest.mark.parametrize("how", ["RST", "disconnect", "RST+disconnect"])
|
||||||
def test_http2_client_aborts(tctx, stream, when, how):
|
def test_http2_client_aborts(tctx, stream, when, how):
|
||||||
@ -127,7 +127,7 @@ def test_http2_client_aborts(tctx, stream, when, how):
|
|||||||
>> DataReceived(tctx.client, cff.build_headers_frame(example_request_headers).serialize())
|
>> DataReceived(tctx.client, cff.build_headers_frame(example_request_headers).serialize())
|
||||||
<< http.HttpRequestHeadersHook(flow)
|
<< http.HttpRequestHeadersHook(flow)
|
||||||
)
|
)
|
||||||
if stream == "stream" and when == "request":
|
if stream and when == "request":
|
||||||
assert (
|
assert (
|
||||||
playbook
|
playbook
|
||||||
>> reply(side_effect=enable_request_streaming)
|
>> reply(side_effect=enable_request_streaming)
|
||||||
@ -144,16 +144,21 @@ def test_http2_client_aborts(tctx, stream, when, how):
|
|||||||
if when == "request":
|
if when == "request":
|
||||||
if "RST" in how:
|
if "RST" in how:
|
||||||
playbook >> DataReceived(tctx.client, cff.build_rst_stream_frame(1, ErrorCodes.CANCEL).serialize())
|
playbook >> DataReceived(tctx.client, cff.build_rst_stream_frame(1, ErrorCodes.CANCEL).serialize())
|
||||||
playbook << http.HttpErrorHook(flow)
|
else:
|
||||||
playbook >> reply()
|
|
||||||
if "disconnect" in how:
|
|
||||||
playbook >> ConnectionClosed(tctx.client)
|
playbook >> ConnectionClosed(tctx.client)
|
||||||
playbook << CloseConnection(tctx.client)
|
playbook << CloseConnection(tctx.client)
|
||||||
if "RST" not in how:
|
|
||||||
playbook << http.HttpErrorHook(flow)
|
if stream:
|
||||||
playbook >> reply()
|
playbook << CloseConnection(server)
|
||||||
|
playbook << http.HttpErrorHook(flow)
|
||||||
|
playbook >> reply()
|
||||||
|
|
||||||
|
if how == "RST+disconnect":
|
||||||
|
playbook >> ConnectionClosed(tctx.client)
|
||||||
|
playbook << CloseConnection(tctx.client)
|
||||||
|
|
||||||
assert playbook
|
assert playbook
|
||||||
assert any(x in flow().error.msg for x in ["Stream reset", "peer closed connection"])
|
assert "stream reset" in flow().error.msg or "peer closed connection" in flow().error.msg
|
||||||
return
|
return
|
||||||
|
|
||||||
assert (
|
assert (
|
||||||
@ -168,7 +173,7 @@ def test_http2_client_aborts(tctx, stream, when, how):
|
|||||||
>> DataReceived(server, b"HTTP/1.1 200 OK\r\nContent-Length: 6\r\n\r\n123")
|
>> DataReceived(server, b"HTTP/1.1 200 OK\r\nContent-Length: 6\r\n\r\n123")
|
||||||
<< http.HttpResponseHeadersHook(flow)
|
<< http.HttpResponseHeadersHook(flow)
|
||||||
)
|
)
|
||||||
if stream == "stream":
|
if stream:
|
||||||
assert (
|
assert (
|
||||||
playbook
|
playbook
|
||||||
>> reply(side_effect=enable_response_streaming)
|
>> reply(side_effect=enable_response_streaming)
|
||||||
@ -179,17 +184,28 @@ def test_http2_client_aborts(tctx, stream, when, how):
|
|||||||
|
|
||||||
if "RST" in how:
|
if "RST" in how:
|
||||||
playbook >> DataReceived(tctx.client, cff.build_rst_stream_frame(1, ErrorCodes.CANCEL).serialize())
|
playbook >> DataReceived(tctx.client, cff.build_rst_stream_frame(1, ErrorCodes.CANCEL).serialize())
|
||||||
if "disconnect" in how:
|
else:
|
||||||
playbook >> ConnectionClosed(tctx.client)
|
playbook >> ConnectionClosed(tctx.client)
|
||||||
playbook << CloseConnection(tctx.client)
|
playbook << CloseConnection(tctx.client)
|
||||||
|
|
||||||
assert (
|
assert (
|
||||||
playbook
|
playbook
|
||||||
>> DataReceived(server, b"456")
|
<< CloseConnection(server)
|
||||||
<< http.HttpResponseHook(flow)
|
<< http.HttpErrorHook(flow)
|
||||||
>> reply()
|
>> reply()
|
||||||
)
|
)
|
||||||
if stream != "stream":
|
|
||||||
assert flow().response.content == b"123456"
|
if how == "RST+disconnect":
|
||||||
|
assert (
|
||||||
|
playbook
|
||||||
|
>> ConnectionClosed(tctx.client)
|
||||||
|
<< CloseConnection(tctx.client)
|
||||||
|
)
|
||||||
|
|
||||||
|
if "RST" in how:
|
||||||
|
assert "stream reset" in flow().error.msg
|
||||||
|
else:
|
||||||
|
assert "peer closed connection" in flow().error.msg
|
||||||
|
|
||||||
|
|
||||||
def test_no_normalization(tctx):
|
def test_no_normalization(tctx):
|
||||||
@ -268,7 +284,6 @@ def test_rst_then_close(tctx):
|
|||||||
playbook, cff = start_h2_client(tctx)
|
playbook, cff = start_h2_client(tctx)
|
||||||
flow = Placeholder(HTTPFlow)
|
flow = Placeholder(HTTPFlow)
|
||||||
server = Placeholder(Server)
|
server = Placeholder(Server)
|
||||||
open_conn = OpenConnection(server)
|
|
||||||
|
|
||||||
assert (
|
assert (
|
||||||
playbook
|
playbook
|
||||||
@ -278,12 +293,13 @@ def test_rst_then_close(tctx):
|
|||||||
>> reply()
|
>> reply()
|
||||||
<< http.HttpRequestHook(flow)
|
<< http.HttpRequestHook(flow)
|
||||||
>> reply()
|
>> reply()
|
||||||
<< open_conn
|
<< OpenConnection(server)
|
||||||
>> DataReceived(tctx.client, cff.build_data_frame(b"unexpected data frame").serialize())
|
>> DataReceived(tctx.client, cff.build_data_frame(b"unexpected data frame").serialize())
|
||||||
<< SendData(tctx.client, cff.build_rst_stream_frame(1, ErrorCodes.STREAM_CLOSED).serialize())
|
<< SendData(tctx.client, cff.build_rst_stream_frame(1, ErrorCodes.STREAM_CLOSED).serialize())
|
||||||
>> ConnectionClosed(tctx.client)
|
>> ConnectionClosed(tctx.client)
|
||||||
<< CloseConnection(tctx.client)
|
<< CloseConnection(tctx.client)
|
||||||
>> reply("connection cancelled", to=open_conn)
|
>> reply("connection cancelled", to=-5)
|
||||||
<< http.HttpErrorHook(flow)
|
<< http.HttpErrorHook(flow)
|
||||||
>> reply()
|
>> reply()
|
||||||
)
|
)
|
||||||
|
assert flow().error.msg == "connection cancelled"
|
||||||
|
Loading…
Reference in New Issue
Block a user