mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-23 00:01:36 +00:00
[sans-io] http bugfixes!
This commit is contained in:
parent
74f94fe5a3
commit
25999ba9d4
@ -44,6 +44,7 @@ class ProxyConnectionHandler(server.StreamConnectionHandler):
|
||||
data.reply = AsyncReply(data)
|
||||
await self.master.addons.handle_lifecycle(hook.name, data)
|
||||
await data.reply.done.wait()
|
||||
data.reply = None
|
||||
|
||||
def log(self, message: str, level: str = "info") -> None:
|
||||
x = log.LogEntry(self.log_prefix + message, level)
|
||||
|
@ -33,7 +33,7 @@ def validate_request(mode, request) -> typing.Optional[str]:
|
||||
return None
|
||||
|
||||
|
||||
@dataclass(unsafe_hash=True)
|
||||
@dataclass
|
||||
class GetHttpConnection(HttpCommand):
|
||||
"""
|
||||
Open an HTTP Connection. This may not actually open a connection, but return an existing HTTP connection instead.
|
||||
@ -43,6 +43,9 @@ class GetHttpConnection(HttpCommand):
|
||||
tls: bool
|
||||
via: typing.Optional[server_spec.ServerSpec]
|
||||
|
||||
def __hash__(self):
|
||||
return id(self)
|
||||
|
||||
def connection_spec_matches(self, connection: Connection) -> bool:
|
||||
return (
|
||||
isinstance(connection, Server)
|
||||
@ -106,6 +109,15 @@ class HttpStream(layer.Layer):
|
||||
self.client_state = self.state_uninitialized
|
||||
self.server_state = self.state_uninitialized
|
||||
|
||||
def __repr__(self):
|
||||
return (
|
||||
f"HttpStream("
|
||||
f"id={self.stream_id}, "
|
||||
f"client_state={self.client_state.__name__}, "
|
||||
f"server_state={self.server_state.__name__}"
|
||||
f")"
|
||||
)
|
||||
|
||||
@expect(events.Start, HttpEvent)
|
||||
def _handle_event(self, event: events.Event) -> layer.CommandGenerator[None]:
|
||||
if isinstance(event, events.Start):
|
||||
@ -506,10 +518,14 @@ class HttpLayer(layer.Layer):
|
||||
if reuse:
|
||||
for connection in self.connections:
|
||||
# see "tricky multiplexing edge case" in make_http_connection for an explanation
|
||||
not_h2_to_h1 = connection.alpn == b"h2" or self.context.client.alpn != b"h2"
|
||||
conn_is_pending_or_h2 = (
|
||||
connection.alpn == b"h2"
|
||||
or connection in self.waiting_for_establishment
|
||||
)
|
||||
h2_to_h1 = self.context.client.alpn == b"h2" and not conn_is_pending_or_h2
|
||||
connection_suitable = (
|
||||
event.connection_spec_matches(connection) and
|
||||
not_h2_to_h1
|
||||
event.connection_spec_matches(connection)
|
||||
and not h2_to_h1
|
||||
)
|
||||
if connection_suitable:
|
||||
if connection in self.waiting_for_establishment:
|
||||
|
@ -66,9 +66,9 @@ class Http1Connection(HttpConnection, metaclass=abc.ABCMeta):
|
||||
except h11.ProtocolError as e:
|
||||
yield commands.CloseConnection(self.conn)
|
||||
if is_request:
|
||||
yield ReceiveHttp(RequestProtocolError(self.stream_id, str(e)))
|
||||
yield ReceiveHttp(RequestProtocolError(self.stream_id, f"HTTP/1 protocol error: {e}"))
|
||||
else:
|
||||
yield ReceiveHttp(ResponseProtocolError(self.stream_id, str(e)))
|
||||
yield ReceiveHttp(ResponseProtocolError(self.stream_id, f"HTTP/1 protocol error: {e}"))
|
||||
return
|
||||
|
||||
if h11_event is None:
|
||||
|
@ -1,13 +1,13 @@
|
||||
import time
|
||||
from enum import Enum
|
||||
from typing import ClassVar, Dict, Iterable, List, Optional, Set, Tuple, Type, Union
|
||||
from typing import ClassVar, Dict, Iterable, List, Tuple, Type, Union
|
||||
|
||||
import h2.connection
|
||||
import h2.config
|
||||
import h2.connection
|
||||
import h2.errors
|
||||
import h2.events
|
||||
import h2.exceptions
|
||||
import h2.settings
|
||||
import h2.errors
|
||||
import h2.utilities
|
||||
|
||||
from mitmproxy import http
|
||||
@ -17,11 +17,12 @@ from mitmproxy.utils import human
|
||||
from . import RequestData, RequestEndOfMessage, RequestHeaders, RequestProtocolError, ResponseData, \
|
||||
ResponseEndOfMessage, ResponseHeaders, ResponseProtocolError
|
||||
from ._base import HttpConnection, HttpEvent, ReceiveHttp
|
||||
from ._http_h2 import BufferedH2Connection, H2ConnectionLogger
|
||||
from ._http_h2 import BufferedH2Connection
|
||||
from ...commands import CloseConnection, Log, SendData
|
||||
from ...context import Connection, Context
|
||||
from ...events import ConnectionClosed, DataReceived, Event, Start
|
||||
from ...layer import CommandGenerator
|
||||
from ...utils import expect
|
||||
|
||||
|
||||
class StreamState(Enum):
|
||||
@ -84,7 +85,11 @@ class Http2Connection(HttpConnection):
|
||||
events = [e]
|
||||
|
||||
for h2_event in events:
|
||||
if self.debug:
|
||||
yield Log(f"{self.debug}[h2] {h2_event}", "debug")
|
||||
if (yield from self.handle_h2_event(h2_event)):
|
||||
if self.debug:
|
||||
yield Log(f"{self.debug}[h2] done", "debug")
|
||||
return
|
||||
|
||||
data_to_send = self.h2_conn.data_to_send()
|
||||
@ -123,12 +128,22 @@ class Http2Connection(HttpConnection):
|
||||
elif isinstance(event, h2.events.ConnectionTerminated):
|
||||
yield from self.close_connection(f"HTTP/2 connection closed: {event!r}")
|
||||
return True
|
||||
# The implementation above isn't really ideal, we should probably only terminate streams > last_stream_id?
|
||||
# We currently lack a mechanism to signal that connections are still active but cannot be reused.
|
||||
# for stream_id in self.streams:
|
||||
# if stream_id > event.last_stream_id:
|
||||
# yield ReceiveHttp(self.ReceiveProtocolError(stream_id, f"HTTP/2 connection closed: {event!r}"))
|
||||
# self.streams.pop(stream_id)
|
||||
elif isinstance(event, h2.events.RemoteSettingsChanged):
|
||||
pass
|
||||
elif isinstance(event, h2.events.SettingsAcknowledged):
|
||||
pass
|
||||
elif isinstance(event, h2.events.PriorityUpdated):
|
||||
pass
|
||||
elif isinstance(event, h2.events.PingReceived):
|
||||
pass
|
||||
elif isinstance(event, h2.events.PingAckReceived):
|
||||
pass
|
||||
elif isinstance(event, h2.events.TrailersReceived):
|
||||
yield Log("Received HTTP/2 trailers, which are currently unimplemented and silently discarded", "error")
|
||||
elif isinstance(event, h2.events.PushedStreamReceived):
|
||||
@ -155,6 +170,12 @@ class Http2Connection(HttpConnection):
|
||||
for stream_id in self.streams:
|
||||
# noinspection PyArgumentList
|
||||
yield ReceiveHttp(self.ReceiveProtocolError(stream_id, msg))
|
||||
self.streams.clear()
|
||||
self._handle_event = self.done
|
||||
|
||||
@expect(DataReceived, HttpEvent, ConnectionClosed)
|
||||
def done(self, _) -> CommandGenerator[None]:
|
||||
yield from ()
|
||||
|
||||
|
||||
def normalize_h1_headers(headers: List[Tuple[bytes, bytes]], is_client: bool) -> List[Tuple[bytes, bytes]]:
|
||||
@ -247,7 +268,9 @@ class Http2Client(Http2Connection):
|
||||
super().__init__(context, context.server)
|
||||
# Disable HTTP/2 push for now to keep things simple.
|
||||
# don't send here, that is done as part of initiate_connection().
|
||||
self.h2_conn.local_settings.enable_push = False
|
||||
self.h2_conn.local_settings.enable_push = 0
|
||||
# hyper-h2 pitfall: we need to acknowledge here, otherwise its sends out the old settings.
|
||||
self.h2_conn.local_settings.acknowledge()
|
||||
|
||||
def _handle_event(self, event: Event) -> CommandGenerator[None]:
|
||||
if isinstance(event, RequestHeaders):
|
||||
|
@ -60,14 +60,24 @@ class BufferedH2Connection(h2.connection.H2Connection):
|
||||
data = data[self.max_outbound_frame_size:]
|
||||
frame_size -= len(chunk_data)
|
||||
|
||||
available_window = self.local_flow_control_window(stream_id)
|
||||
if frame_size <= available_window:
|
||||
super().send_data(stream_id, data, end_stream)
|
||||
else:
|
||||
# We can't send right now, so we buffer.
|
||||
if self.stream_buffers.get(stream_id, None):
|
||||
# We already have some data buffered, let's append.
|
||||
self.stream_buffers[stream_id].append(
|
||||
SendH2Data(data, end_stream)
|
||||
)
|
||||
else:
|
||||
available_window = self.local_flow_control_window(stream_id)
|
||||
if frame_size <= available_window:
|
||||
super().send_data(stream_id, data, end_stream)
|
||||
else:
|
||||
if available_window:
|
||||
can_send_now = data[:available_window]
|
||||
super().send_data(stream_id, can_send_now, end_stream=False)
|
||||
data = data[available_window:]
|
||||
# We can't send right now, so we buffer.
|
||||
self.stream_buffers[stream_id].append(
|
||||
SendH2Data(data, end_stream)
|
||||
)
|
||||
|
||||
def receive_data(self, data: bytes):
|
||||
events = super().receive_data(data)
|
||||
|
@ -220,7 +220,8 @@ class _TLSLayer(tunnel.TunnelLayer):
|
||||
)
|
||||
if close:
|
||||
self.conn.state &= ~context.ConnectionState.CAN_READ
|
||||
yield commands.Log(f"TLS close_notify {self.conn}", level="debug")
|
||||
if self.debug:
|
||||
yield commands.Log(f"{self.debug}[tls] close_notify {self.conn}", level="debug")
|
||||
yield from self.event_to_child(
|
||||
events.ConnectionClosed(self.conn)
|
||||
)
|
||||
|
@ -133,9 +133,13 @@ class ConnectionHandler(metaclass=abc.ABCMeta):
|
||||
command.connection.timestamp_start = time.time()
|
||||
reader, writer = await asyncio.open_connection(*command.connection.address)
|
||||
except (IOError, asyncio.CancelledError) as e:
|
||||
self.log(f"error establishing server connection: {e}")
|
||||
command.connection.error = str(e)
|
||||
self.server_event(events.OpenConnectionReply(command, str(e)))
|
||||
if isinstance(e, IOError):
|
||||
err = str(e)
|
||||
else:
|
||||
err = "connection cancelled" # curiously, str(CancelledError()) returns empty string.
|
||||
self.log(f"error establishing server connection: {err}")
|
||||
command.connection.error = err
|
||||
self.server_event(events.OpenConnectionReply(command, err))
|
||||
else:
|
||||
command.connection.timestamp_tcp_setup = time.time()
|
||||
command.connection.state = ConnectionState.OPEN
|
||||
|
@ -1,8 +1,9 @@
|
||||
from typing import Callable, List, Tuple
|
||||
from typing import List, Tuple
|
||||
|
||||
import hpack
|
||||
import hyperframe.frame
|
||||
import pytest
|
||||
from h2.errors import ErrorCodes
|
||||
|
||||
from mitmproxy.http import HTTPFlow
|
||||
from mitmproxy.net.http import Headers
|
||||
@ -203,3 +204,33 @@ def test_split_pseudo_headers(input, pseudo, headers):
|
||||
def test_split_pseudo_headers_err():
|
||||
with pytest.raises(ValueError, match="Duplicate HTTP/2 pseudo header"):
|
||||
split_pseudo_headers([(b":status", b"418"), (b":status", b"418")])
|
||||
|
||||
|
||||
def test_rst_then_close(tctx):
|
||||
"""
|
||||
Test that we properly handle the case of a client that first causes protocol errors and then disconnects.
|
||||
|
||||
Adapted from h2spec http2/5.1/5.
|
||||
"""
|
||||
playbook, cff = start_h2_client(tctx)
|
||||
flow = Placeholder(HTTPFlow)
|
||||
server = Placeholder(Server)
|
||||
open_conn = OpenConnection(server)
|
||||
|
||||
assert (
|
||||
playbook
|
||||
>> DataReceived(tctx.client,
|
||||
cff.build_headers_frame(example_request_headers, flags=["END_STREAM"]).serialize())
|
||||
<< http.HttpRequestHeadersHook(flow)
|
||||
>> reply()
|
||||
<< http.HttpRequestHook(flow)
|
||||
>> reply()
|
||||
<< open_conn
|
||||
>> DataReceived(tctx.client, cff.build_data_frame(b"unexpected data frame").serialize())
|
||||
<< SendData(tctx.client, cff.build_rst_stream_frame(1, ErrorCodes.STREAM_CLOSED).serialize())
|
||||
>> ConnectionClosed(tctx.client)
|
||||
<< CloseConnection(tctx.client)
|
||||
>> reply("connection cancelled", to=open_conn)
|
||||
<< http.HttpErrorHook(flow)
|
||||
>> reply()
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user