mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-23 00:01:36 +00:00
[sans-io] http implementation++
This commit is contained in:
parent
6e1e999c3d
commit
87a4d3efdb
@ -68,3 +68,9 @@ class Context:
|
||||
self.options = options
|
||||
self.server = Server(None)
|
||||
self.layers = []
|
||||
|
||||
def fork(self) -> "Context":
|
||||
ret = Context(self.client, self.options)
|
||||
ret.server = self.server
|
||||
ret.layers = self.layers.copy()
|
||||
return ret
|
||||
|
@ -6,89 +6,108 @@ import h11
|
||||
from h11._readers import ChunkedReader, ContentLengthReader, Http10Reader
|
||||
from h11._receivebuffer import ReceiveBuffer
|
||||
|
||||
from mitmproxy import http
|
||||
from mitmproxy import flow, http
|
||||
from mitmproxy.net.http import http1
|
||||
from mitmproxy.net.http.http1 import read_sansio as http1_sansio
|
||||
from mitmproxy.proxy.protocol.http import HTTPMode
|
||||
from mitmproxy.proxy2 import commands, events
|
||||
from mitmproxy.proxy2.context import Connection, Context, Server, Client
|
||||
from mitmproxy.proxy2.context import Client, Connection, Context, Server
|
||||
from mitmproxy.proxy2.layer import Layer, NextLayer
|
||||
from mitmproxy.proxy2.layers.tls import EstablishServerTLS, EstablishServerTLSReply
|
||||
from mitmproxy.proxy2.utils import expect
|
||||
from mitmproxy.utils import human
|
||||
|
||||
# FIXME: Combine HttpEvent and HttpCommand?
|
||||
|
||||
StreamIdentifier = int
|
||||
StreamId = int
|
||||
|
||||
|
||||
class HttpEvent(events.Event):
|
||||
flow: http.HTTPFlow
|
||||
stream_id: StreamId
|
||||
|
||||
# we need flow identifiers on every event to avoid race conditions
|
||||
# we need stream ids on every event to avoid race conditions
|
||||
|
||||
def __init__(self, flow: http.HTTPFlow):
|
||||
self.flow = flow
|
||||
def __init__(self, stream_id: StreamId):
|
||||
self.stream_id = stream_id
|
||||
|
||||
def __repr__(self):
|
||||
def __repr__(self) -> str:
|
||||
x = self.__dict__.copy()
|
||||
x.pop("flow", None)
|
||||
x.pop("stream_id")
|
||||
return f"{type(self).__name__}({repr(x) if x else ''})"
|
||||
|
||||
|
||||
class HttpCommand(commands.ConnectionCommand):
|
||||
class HttpCommand(commands.Command):
|
||||
pass
|
||||
|
||||
|
||||
class OpenHttpConnection(HttpCommand):
|
||||
class GetHttpConnection(HttpCommand):
|
||||
"""
|
||||
Open a HTTP Connection. This may not actually open a connection, but return an existing HTTP connection instead.
|
||||
"""
|
||||
blocking = True
|
||||
address: typing.Tuple[str, int]
|
||||
tls: bool
|
||||
|
||||
def __init__(self, address: typing.Tuple[str, int], tls: bool):
|
||||
self.address = address
|
||||
self.tls = tls
|
||||
|
||||
def connection_spec_matches(self, connection: Connection) -> bool:
|
||||
return (
|
||||
self.address == connection.address
|
||||
and
|
||||
self.tls == connection.tls
|
||||
)
|
||||
|
||||
|
||||
class OpenHttpConnectionReply(events.CommandReply):
|
||||
command: OpenHttpConnection
|
||||
class GetHttpConnectionReply(events.CommandReply):
|
||||
command: GetHttpConnection
|
||||
reply: typing.Optional[str]
|
||||
"""error message"""
|
||||
|
||||
|
||||
class SendHttp(HttpCommand):
|
||||
connection: Connection
|
||||
event: HttpEvent
|
||||
|
||||
def __init__(self, event: HttpEvent, connection: Connection):
|
||||
super().__init__(connection)
|
||||
self.connection = connection
|
||||
self.event = event
|
||||
|
||||
def __repr__(self):
|
||||
def __repr__(self) -> str:
|
||||
return f"Send({self.event})"
|
||||
|
||||
|
||||
HttpEventGenerator = typing.Iterator[HttpEvent]
|
||||
|
||||
|
||||
# HttpCommandGenerator = typing.Generator[commands.Command, typing.Any, None]
|
||||
|
||||
|
||||
class RequestHeaders(HttpEvent):
|
||||
pass
|
||||
request: http.HTTPRequest
|
||||
|
||||
def __init__(self, request: http.HTTPRequest, stream_id: StreamId):
|
||||
super().__init__(stream_id)
|
||||
self.request = request
|
||||
|
||||
|
||||
class ResponseHeaders(HttpEvent):
|
||||
pass
|
||||
response: http.HTTPResponse
|
||||
|
||||
def __init__(self, response: http.HTTPResponse, stream_id: StreamId):
|
||||
super().__init__(stream_id)
|
||||
self.response = response
|
||||
|
||||
|
||||
class RequestData(HttpEvent):
|
||||
data: bytes
|
||||
|
||||
def __init__(self, data: bytes, flow: http.HTTPFlow):
|
||||
super().__init__(flow)
|
||||
def __init__(self, data: bytes, stream_id: StreamId):
|
||||
super().__init__(stream_id)
|
||||
self.data = data
|
||||
|
||||
|
||||
class ResponseData(HttpEvent):
|
||||
data: bytes
|
||||
|
||||
def __init__(self, data: bytes, flow: http.HTTPFlow):
|
||||
super().__init__(flow)
|
||||
def __init__(self, data: bytes, stream_id: StreamId):
|
||||
super().__init__(stream_id)
|
||||
self.data = data
|
||||
|
||||
|
||||
@ -105,7 +124,9 @@ TBodyReader = typing.Union[ChunkedReader, Http10Reader, ContentLengthReader]
|
||||
|
||||
class Http1Connection(ABC):
|
||||
conn: Connection
|
||||
flow: http.HTTPFlow = None
|
||||
stream_id: StreamId = None
|
||||
request: http.HTTPRequest
|
||||
response: http.HTTPResponse
|
||||
state: typing.Callable[[events.Event], HttpEventGenerator]
|
||||
body_reader: TBodyReader
|
||||
buf: ReceiveBuffer
|
||||
@ -147,11 +168,16 @@ class Http1Connection(ABC):
|
||||
if h11_event is None:
|
||||
return
|
||||
elif isinstance(h11_event, h11.Data):
|
||||
Data = RequestData if is_request else ResponseData
|
||||
yield Data(bytes(h11_event.data), self.flow)
|
||||
h11_event.data: bytearray # type checking
|
||||
if is_request:
|
||||
yield RequestData(bytes(h11_event.data), self.stream_id)
|
||||
else:
|
||||
yield ResponseData(bytes(h11_event.data), self.stream_id)
|
||||
elif isinstance(h11_event, h11.EndOfMessage):
|
||||
EndOfMessage = RequestEndOfMessage if is_request else ResponseEndOfMessage
|
||||
yield EndOfMessage(self.flow)
|
||||
if is_request:
|
||||
yield RequestEndOfMessage(self.stream_id)
|
||||
else:
|
||||
yield ResponseEndOfMessage(self.stream_id)
|
||||
return
|
||||
|
||||
def wait(self, event: events.Event) -> HttpEventGenerator:
|
||||
@ -175,31 +201,35 @@ class Http1Server(Http1Connection):
|
||||
def __init__(self, conn: Client):
|
||||
super().__init__(conn)
|
||||
self.state = self.read_request_headers
|
||||
self.stream_id = -1
|
||||
|
||||
def send(self, event: HttpEvent) -> commands.TCommandGenerator:
|
||||
if isinstance(event, ResponseHeaders):
|
||||
raw = http1.assemble_response_head(event.flow.response)
|
||||
if self.flow.request.first_line_format == "authority":
|
||||
self.response = event.response
|
||||
raw = http1.assemble_response_head(event.response)
|
||||
if self.request.first_line_format == "authority":
|
||||
assert self.state == self.wait
|
||||
self.body_reader = self.make_body_reader(-1)
|
||||
self.state = self.read_request_body
|
||||
yield from self.state(events.DataReceived(self.conn, b""))
|
||||
elif isinstance(event, ResponseData):
|
||||
if "chunked" in event.flow.response.headers.get("transfer-encoding", "").lower():
|
||||
if "chunked" in self.response.headers.get("transfer-encoding", "").lower():
|
||||
raw = b"%x\r\n%s\r\n" % (len(event.data), event.data)
|
||||
else:
|
||||
raw = event.data
|
||||
elif isinstance(event, ResponseEndOfMessage):
|
||||
if "chunked" in event.flow.response.headers.get("transfer-encoding", "").lower():
|
||||
if "chunked" in self.response.headers.get("transfer-encoding", "").lower():
|
||||
raw = b"0\r\n\r\n"
|
||||
else:
|
||||
raw = False
|
||||
if http1.expected_http_body_size(self.flow.request, self.flow.response) == -1:
|
||||
elif http1.expected_http_body_size(self.request, self.response) == -1:
|
||||
yield commands.CloseConnection(self.conn)
|
||||
return
|
||||
else:
|
||||
self.state = self.read_request_headers
|
||||
yield from self.state(events.DataReceived(self.conn, b""))
|
||||
raw = False
|
||||
self.request = None
|
||||
self.response = None
|
||||
self.stream_id += 2
|
||||
self.state = self.read_request_headers
|
||||
yield from self.state(events.DataReceived(self.conn, b""))
|
||||
else:
|
||||
raise NotImplementedError(f"{event}")
|
||||
|
||||
@ -211,22 +241,17 @@ class Http1Server(Http1Connection):
|
||||
request_head = self.buf.maybe_extract_lines()
|
||||
if request_head:
|
||||
request_head = [bytes(x) for x in request_head] # TODO: Make url.parse compatible with bytearrays
|
||||
request = http.HTTPRequest.wrap(http1_sansio.read_request_head(request_head))
|
||||
self.flow = http.HTTPFlow(
|
||||
self.conn,
|
||||
None,
|
||||
)
|
||||
self.flow.request = request
|
||||
yield RequestHeaders(self.flow)
|
||||
self.request = http.HTTPRequest.wrap(http1_sansio.read_request_head(request_head))
|
||||
yield RequestHeaders(self.request, self.stream_id)
|
||||
|
||||
if self.flow.request.first_line_format == "authority":
|
||||
if self.request.first_line_format == "authority":
|
||||
# The previous proxy server implementation tried to read the request body here:
|
||||
# https://github.com/mitmproxy/mitmproxy/blob/45e3ae0f9cb50b0edbf4180fd969ea99d40bdf7b/mitmproxy/proxy/protocol/http.py#L251-L255
|
||||
# We don't do this to be compliant with the h2 spec:
|
||||
# https://http2.github.io/http2-spec/#CONNECT
|
||||
self.state = self.wait
|
||||
else:
|
||||
expected_size = http1.expected_http_body_size(self.flow.request)
|
||||
expected_size = http1.expected_http_body_size(self.request)
|
||||
self.body_reader = self.make_body_reader(expected_size)
|
||||
self.state = self.read_request_body
|
||||
yield from self.state(event)
|
||||
@ -254,10 +279,11 @@ class Http1Client(Http1Connection):
|
||||
self.send_queue = []
|
||||
|
||||
def send(self, event: HttpEvent) -> commands.TCommandGenerator:
|
||||
if not self.flow:
|
||||
if not self.stream_id:
|
||||
assert isinstance(event, RequestHeaders)
|
||||
self.flow = event.flow
|
||||
if self.flow != event.flow:
|
||||
self.stream_id = event.stream_id
|
||||
self.request = event.request
|
||||
if self.stream_id != event.stream_id:
|
||||
# Assuming an h2 server, we may have multiple Streams that try to send requests
|
||||
# over a single h1 connection. To keep things relatively simple, we don't do any HTTP/1 pipelining
|
||||
# but keep a queue of still-to-send requests.
|
||||
@ -265,15 +291,19 @@ class Http1Client(Http1Connection):
|
||||
return
|
||||
|
||||
if isinstance(event, RequestHeaders):
|
||||
raw = http1.assemble_request_head(event.flow.request)
|
||||
raw = http1.assemble_request_head(event.request)
|
||||
elif isinstance(event, RequestData):
|
||||
if "chunked" in event.flow.request.headers.get("transfer-encoding", "").lower():
|
||||
if "chunked" in self.request.headers.get("transfer-encoding", "").lower():
|
||||
raw = b"%x\r\n%s\r\n" % (len(event.data), event.data)
|
||||
else:
|
||||
raw = event.data
|
||||
elif isinstance(event, RequestEndOfMessage):
|
||||
if "chunked" in event.flow.request.headers.get("transfer-encoding", "").lower():
|
||||
if "chunked" in self.request.headers.get("transfer-encoding", "").lower():
|
||||
raw = b"0\r\n\r\n"
|
||||
elif http1.expected_http_body_size(self.request) == -1:
|
||||
assert not self.send_queue
|
||||
yield commands.CloseConnection(self.conn)
|
||||
return
|
||||
else:
|
||||
raw = False
|
||||
else:
|
||||
@ -288,16 +318,16 @@ class Http1Client(Http1Connection):
|
||||
|
||||
if response_head:
|
||||
response_head = [bytes(x) for x in response_head]
|
||||
self.flow.response = http.HTTPResponse.wrap(http1_sansio.read_response_head(response_head))
|
||||
yield ResponseHeaders(self.flow)
|
||||
self.response = http.HTTPResponse.wrap(http1_sansio.read_response_head(response_head))
|
||||
yield ResponseHeaders(self.response, self.stream_id)
|
||||
|
||||
expected_size = http1.expected_http_body_size(self.flow.request, self.flow.response)
|
||||
expected_size = http1.expected_http_body_size(self.request, self.response)
|
||||
self.body_reader = self.make_body_reader(expected_size)
|
||||
|
||||
self.state = self.read_response_body
|
||||
yield from self.state(event)
|
||||
elif isinstance(event, events.ConnectionClosed):
|
||||
if self.flow:
|
||||
if self.stream_id:
|
||||
raise NotImplementedError(f"{event}")
|
||||
else:
|
||||
return
|
||||
@ -309,7 +339,9 @@ class Http1Client(Http1Connection):
|
||||
yield e
|
||||
if isinstance(e, ResponseEndOfMessage):
|
||||
self.state = self.read_response_headers
|
||||
self.flow = None
|
||||
self.stream_id = None
|
||||
self.request = None
|
||||
self.response = None
|
||||
if self.send_queue:
|
||||
events = self.send_queue
|
||||
self.send_queue = []
|
||||
@ -321,6 +353,7 @@ class HttpStream(Layer):
|
||||
request_body_buf: bytes
|
||||
response_body_buf: bytes
|
||||
flow: http.HTTPFlow
|
||||
stream_id: StreamId
|
||||
child_layer: typing.Optional[Layer] = None
|
||||
|
||||
@property
|
||||
@ -341,13 +374,17 @@ class HttpStream(Layer):
|
||||
|
||||
@expect(RequestHeaders)
|
||||
def read_request_headers(self, event: RequestHeaders) -> commands.TCommandGenerator:
|
||||
self.flow = event.flow
|
||||
self.stream_id = event.stream_id
|
||||
self.flow = http.HTTPFlow(
|
||||
self.context.client,
|
||||
self.context.server
|
||||
)
|
||||
self.flow.request = event.request
|
||||
|
||||
if self.flow.request.first_line_format == "authority":
|
||||
yield from self.handle_connect()
|
||||
return
|
||||
else:
|
||||
if self.context.server:
|
||||
self.flow.server_conn = self.context.server
|
||||
yield commands.Hook("requestheaders", self.flow)
|
||||
|
||||
if self.flow.request.headers.get("expect", "").lower() == "100-continue":
|
||||
@ -372,28 +409,26 @@ class HttpStream(Layer):
|
||||
def handle_connect(self) -> commands.TCommandGenerator:
|
||||
yield commands.Hook("http_connect", self.flow)
|
||||
|
||||
self.context.server.address = (self.flow.request.host, self.flow.request.port)
|
||||
self.context.server = Server((self.flow.request.host, self.flow.request.port))
|
||||
if self.context.options.connection_strategy == "eager":
|
||||
err = yield commands.OpenConnection(self.context.server)
|
||||
if err:
|
||||
self.flow.response = http.HTTPResponse.make(
|
||||
502, f"Cannot connect to {human.format_address(self.context.server.address)}: {err}"
|
||||
)
|
||||
else:
|
||||
raise NotImplementedError("<insert lazy joke here>")
|
||||
|
||||
if not self.flow.response:
|
||||
self.flow.response = http.make_connect_response(self.flow.request.data.http_version)
|
||||
|
||||
yield SendHttp(ResponseHeaders(self.flow), self.context.client)
|
||||
yield SendHttp(ResponseHeaders(self.flow.response, self.stream_id), self.context.client)
|
||||
|
||||
if 200 <= self.flow.response.status_code < 300:
|
||||
self.child_layer = NextLayer(self.context)
|
||||
yield from self.child_layer.handle_event(events.Start())
|
||||
self._handle_event = self.passthrough
|
||||
else:
|
||||
yield SendHttp(ResponseData(self.flow.response.data.content, self.flow), self.context.client)
|
||||
yield SendHttp(ResponseEndOfMessage(self.flow), self.context.client)
|
||||
yield SendHttp(ResponseData(self.flow.response.data.content, self.stream_id), self.context.client)
|
||||
yield SendHttp(ResponseEndOfMessage(self.stream_id), self.context.client)
|
||||
|
||||
@expect(RequestData, RequestEndOfMessage, events.Event)
|
||||
def passthrough(self, event: events.Event) -> commands.TCommandGenerator:
|
||||
@ -406,9 +441,9 @@ class HttpStream(Layer):
|
||||
for command in self.child_layer.handle_event(event):
|
||||
# normal connection events -> HTTP events
|
||||
if isinstance(command, commands.SendData) and command.connection == self.context.client:
|
||||
yield SendHttp(ResponseData(command.data, self.flow), self.context.client)
|
||||
yield SendHttp(ResponseData(command.data, self.stream_id), self.context.client)
|
||||
elif isinstance(command, commands.CloseConnection) and command.connection == self.context.client:
|
||||
yield SendHttp(ResponseEndOfMessage(self.flow), self.context.client)
|
||||
yield SendHttp(ResponseEndOfMessage(self.stream_id), self.context.client)
|
||||
elif isinstance(command, commands.OpenConnection) and command.connection == self.context.server:
|
||||
yield from self.passthrough(events.OpenConnectionReply(command, None))
|
||||
else:
|
||||
@ -430,7 +465,6 @@ class HttpStream(Layer):
|
||||
# scheme. For relative-form requests, we need to determine host and
|
||||
# port as well.
|
||||
if self.mode is HTTPMode.transparent:
|
||||
# FIXME
|
||||
# Setting request.host also updates the host header, which we want
|
||||
# to preserve
|
||||
host_header = self.flow.request.host_header
|
||||
@ -446,22 +480,28 @@ class HttpStream(Layer):
|
||||
yield commands.Hook("responseheaders", self.flow)
|
||||
yield from self.handle_response()
|
||||
else:
|
||||
# FIXME wrong location
|
||||
self.context.server.address = (self.flow.request.host, self.flow.request.port)
|
||||
err = yield OpenHttpConnection(self.context.server)
|
||||
connection, err = yield GetHttpConnection(
|
||||
(self.flow.request.host, self.flow.request.port),
|
||||
self.flow.request.scheme == "https"
|
||||
)
|
||||
if err:
|
||||
raise NotImplementedError
|
||||
yield SendHttp(RequestHeaders(self.flow), self.context.server)
|
||||
yield from self.send_error_response(502, err)
|
||||
self.flow.error = flow.Error(err)
|
||||
yield commands.Hook("error", self.flow)
|
||||
return
|
||||
|
||||
yield SendHttp(RequestHeaders(self.flow.request, self.stream_id), connection)
|
||||
|
||||
if self.flow.request.stream:
|
||||
raise NotImplementedError
|
||||
else:
|
||||
yield SendHttp(RequestData(self.flow.request.data.content, self.flow), self.context.server)
|
||||
yield SendHttp(RequestEndOfMessage(self.flow), self.context.server)
|
||||
yield SendHttp(RequestData(self.flow.request.data.content, self.stream_id), connection)
|
||||
yield SendHttp(RequestEndOfMessage(self.stream_id), connection)
|
||||
self._handle_event = self.read_response_headers
|
||||
|
||||
@expect(ResponseHeaders)
|
||||
def read_response_headers(self, event: events.Event) -> commands.TCommandGenerator:
|
||||
def read_response_headers(self, event: ResponseHeaders) -> commands.TCommandGenerator:
|
||||
self.flow.response = event.response
|
||||
yield commands.Hook("responseheaders", self.flow)
|
||||
if not self.flow.response.stream:
|
||||
self._handle_event = self.read_response_body
|
||||
@ -479,13 +519,19 @@ class HttpStream(Layer):
|
||||
|
||||
def handle_response(self):
|
||||
yield commands.Hook("response", self.flow)
|
||||
yield SendHttp(ResponseHeaders(self.flow), self.context.client)
|
||||
yield SendHttp(ResponseHeaders(self.flow.response, self.stream_id), self.context.client)
|
||||
|
||||
if self.flow.response.stream:
|
||||
raise NotImplementedError
|
||||
else:
|
||||
yield SendHttp(ResponseData(self.flow.response.data.content, self.flow), self.context.client)
|
||||
yield SendHttp(ResponseEndOfMessage(self.flow), self.context.client)
|
||||
yield SendHttp(ResponseData(self.flow.response.data.content, self.stream_id), self.context.client)
|
||||
yield SendHttp(ResponseEndOfMessage(self.stream_id), self.context.client)
|
||||
|
||||
def send_error_response(self, status_code: int, message: str, headers=None):
|
||||
response = http.make_error_response(status_code, message, headers)
|
||||
yield SendHttp(ResponseHeaders(response, self.stream_id), self.context.client)
|
||||
yield SendHttp(ResponseData(response.data.content, self.stream_id), self.context.client)
|
||||
yield SendHttp(ResponseEndOfMessage(self.stream_id), self.context.client)
|
||||
|
||||
|
||||
class HTTPLayer(Layer):
|
||||
@ -501,7 +547,7 @@ class HTTPLayer(Layer):
|
||||
stream_by_command: typing.Dict[commands.Command, HttpStream]
|
||||
streams: typing.Dict[int, HttpStream]
|
||||
connections: typing.Dict[Connection, typing.Union[Http1Connection, HttpStream]]
|
||||
waiting_for_connection: typing.DefaultDict[Connection, typing.List[OpenHttpConnection]]
|
||||
waiting_for_connection: typing.DefaultDict[Connection, typing.List[GetHttpConnection]]
|
||||
event_queue: typing.Deque[
|
||||
typing.Union[HttpEvent, HttpCommand, commands.Command]
|
||||
]
|
||||
@ -510,29 +556,34 @@ class HTTPLayer(Layer):
|
||||
super().__init__(context)
|
||||
self.mode = mode
|
||||
|
||||
self.connections = {
|
||||
context.client: Http1Server(context.client)
|
||||
}
|
||||
self.waiting_for_connection = collections.defaultdict(list)
|
||||
self.streams = {}
|
||||
self.stream_by_command = {}
|
||||
self.event_queue = collections.deque()
|
||||
|
||||
self.connections = {
|
||||
context.client: Http1Server(context.client)
|
||||
}
|
||||
if self.context.server.connected:
|
||||
self.make_http_connection(self.context.server)
|
||||
|
||||
def __repr__(self):
|
||||
return f"HTTPLayer(conns: {len(self.connections)}, events: {[type(e).__name__ for e in self.event_queue]})"
|
||||
|
||||
def _handle_event(self, event: events.Event):
|
||||
if isinstance(event, events.Start):
|
||||
return
|
||||
elif isinstance(event, events.OpenConnectionReply) and event.command.connection in self.waiting_for_connection:
|
||||
if event.command.connection.tls:
|
||||
new_command = EstablishServerTLS(event.command.connection)
|
||||
new_command.blocking = object()
|
||||
yield new_command
|
||||
elif isinstance(event, (EstablishServerTLSReply, events.OpenConnectionReply)) and \
|
||||
event.command.connection in self.waiting_for_connection:
|
||||
if event.reply:
|
||||
waiting = self.waiting_for_connection.pop(event.command.connection)
|
||||
for cmd in waiting:
|
||||
stream = self.stream_by_command.pop(cmd)
|
||||
self.event_to_child(stream, GetHttpConnectionReply(cmd, (None, event.reply)))
|
||||
else:
|
||||
self.make_http_connection(event.command.connection)
|
||||
yield from self.make_http_connection(event.command.connection)
|
||||
elif isinstance(event, EstablishServerTLSReply) and event.command.connection in self.waiting_for_connection:
|
||||
self.make_http_connection(event.command.connection)
|
||||
yield from self.make_http_connection(event.command.connection)
|
||||
elif isinstance(event, events.CommandReply):
|
||||
try:
|
||||
stream = self.stream_by_command.pop(event.command)
|
||||
@ -550,36 +601,52 @@ class HTTPLayer(Layer):
|
||||
while self.event_queue:
|
||||
event = self.event_queue.popleft()
|
||||
if isinstance(event, RequestHeaders):
|
||||
self.streams[event.flow.id] = self.make_stream()
|
||||
self.streams[event.stream_id] = self.make_stream()
|
||||
if isinstance(event, HttpEvent):
|
||||
stream = self.streams[event.flow.id]
|
||||
stream = self.streams[event.stream_id]
|
||||
self.event_to_child(stream, event)
|
||||
elif isinstance(event, SendHttp):
|
||||
conn = self.connections[event.connection]
|
||||
evts = conn.send(event.event)
|
||||
self.event_queue.extend(evts)
|
||||
elif isinstance(event, OpenHttpConnection):
|
||||
if event.connection in self.connections:
|
||||
stream = self.stream_by_command.pop(event)
|
||||
self.event_to_child(stream, OpenHttpConnectionReply(event, None))
|
||||
else:
|
||||
if event.connection not in self.waiting_for_connection:
|
||||
open_command = commands.OpenConnection(event.connection)
|
||||
open_command.blocking = object()
|
||||
self.event_queue.append(open_command)
|
||||
self.waiting_for_connection[event.connection].append(event)
|
||||
elif isinstance(event, GetHttpConnection):
|
||||
yield from self.get_connection(event)
|
||||
elif isinstance(event, commands.Command):
|
||||
yield event
|
||||
else:
|
||||
raise ValueError(f"Unexpected event: {event}")
|
||||
|
||||
def make_stream(self) -> HttpStream:
|
||||
ctx = Context(
|
||||
self.context.client,
|
||||
self.context.options
|
||||
def get_connection(self, event: GetHttpConnection):
|
||||
# Do we already have a connection?
|
||||
for connection, handler in self.connections.items():
|
||||
if event.connection_spec_matches(connection) and isinstance(handler, Http1Client):
|
||||
stream = self.stream_by_command.pop(event)
|
||||
self.event_to_child(stream, GetHttpConnectionReply(event, (connection, None)))
|
||||
return
|
||||
# Are we waiting for one?
|
||||
for connection in self.waiting_for_connection:
|
||||
if event.connection_spec_matches(connection):
|
||||
self.waiting_for_connection[connection].append(event)
|
||||
return
|
||||
# Can we reuse context.server?
|
||||
can_reuse_context_connection = (
|
||||
self.context.server.connected and
|
||||
self.context.server.tls == event.tls
|
||||
)
|
||||
ctx.server = self.context.server
|
||||
ctx.layers = self.context.layers.copy()
|
||||
if can_reuse_context_connection:
|
||||
self.waiting_for_connection[self.context.server].append(event)
|
||||
yield from self.make_http_connection(self.context.server)
|
||||
# We need a new one.
|
||||
else:
|
||||
connection = Server(event.address)
|
||||
connection.tls = event.tls
|
||||
self.waiting_for_connection[connection].append(event)
|
||||
open_command = commands.OpenConnection(connection)
|
||||
open_command.blocking = object()
|
||||
yield open_command
|
||||
|
||||
def make_stream(self) -> HttpStream:
|
||||
ctx = self.context.fork()
|
||||
|
||||
stream = HttpStream(ctx)
|
||||
if self.debug:
|
||||
@ -587,7 +654,13 @@ class HTTPLayer(Layer):
|
||||
self.event_to_child(stream, events.Start())
|
||||
return stream
|
||||
|
||||
def make_http_connection(self, connection: Server):
|
||||
def make_http_connection(self, connection: Server) -> None:
|
||||
if connection.tls and not connection.tls_established:
|
||||
new_command = EstablishServerTLS(connection)
|
||||
new_command.blocking = object()
|
||||
yield new_command
|
||||
return
|
||||
|
||||
if connection.tls_established and connection.alpn == b"h2":
|
||||
raise NotImplementedError
|
||||
else:
|
||||
@ -596,7 +669,7 @@ class HTTPLayer(Layer):
|
||||
waiting = self.waiting_for_connection.pop(connection)
|
||||
for cmd in waiting:
|
||||
stream = self.stream_by_command.pop(cmd)
|
||||
self.event_to_child(stream, OpenHttpConnectionReply(cmd, None)) # TODO: Error handling.
|
||||
self.event_to_child(stream, GetHttpConnectionReply(cmd, (connection, None)))
|
||||
|
||||
def event_to_child(
|
||||
self,
|
||||
|
@ -25,10 +25,11 @@ class TCPLayer(Layer):
|
||||
yield commands.Hook("tcp_start", self.flow)
|
||||
|
||||
if not self.context.server.connected:
|
||||
err = yield commands.OpenConnection(self.context.server)
|
||||
if err:
|
||||
try:
|
||||
yield commands.OpenConnection(self.context.server)
|
||||
except IOError as e:
|
||||
if not self.ignore:
|
||||
self.flow.error = flow.Error(err)
|
||||
self.flow.error = flow.Error(str(e))
|
||||
yield commands.Hook("tcp_error", self.flow)
|
||||
yield commands.CloseConnection(self.context.client)
|
||||
self._handle_event = self.done
|
||||
|
@ -279,7 +279,6 @@ class ClientTLSLayer(_TLSLayer):
|
||||
client.alpn_offers = client_hello.alpn_protocols
|
||||
|
||||
client_tls_requires_server_connection = (
|
||||
self.context.server and
|
||||
self.context.server.tls and
|
||||
self.context.options.upstream_cert and
|
||||
(
|
||||
@ -372,7 +371,8 @@ class ClientTLSLayer(_TLSLayer):
|
||||
except SSL.ZeroReturnError:
|
||||
yield commands.Log(
|
||||
f"Client TLS Handshake failed. "
|
||||
f"The client may not trust the proxy's certificate (SNI: {self.context.client.sni})."
|
||||
f"The client may not trust the proxy's certificate (SNI: {self.context.client.sni}).",
|
||||
level="warn"
|
||||
# TODO: Also use other sources than SNI
|
||||
)
|
||||
yield commands.CloseConnection(self.context.client)
|
||||
|
@ -10,6 +10,7 @@ import abc
|
||||
import asyncio
|
||||
import logging
|
||||
import socket
|
||||
import sys
|
||||
import traceback
|
||||
import typing
|
||||
|
||||
@ -54,7 +55,7 @@ class ConnectionHandler(metaclass=abc.ABCMeta):
|
||||
self.server_event(events.Start())
|
||||
await self.handle_connection(self.client)
|
||||
|
||||
self.log("[sans-io] clientdisconnect")
|
||||
self.log("[sans-io] clientdisconnected")
|
||||
|
||||
if self.transports:
|
||||
self.log("[sans-io] closing transports...")
|
||||
@ -91,6 +92,8 @@ class ConnectionHandler(metaclass=abc.ABCMeta):
|
||||
else:
|
||||
if connection.state is ConnectionState.CAN_READ:
|
||||
await self.close_connection(connection)
|
||||
else:
|
||||
connection.state &= ~ConnectionState.CAN_READ
|
||||
self.server_event(events.ConnectionClosed(connection))
|
||||
break
|
||||
|
||||
@ -110,7 +113,7 @@ class ConnectionHandler(metaclass=abc.ABCMeta):
|
||||
command.connection.state = ConnectionState.OPEN
|
||||
self.server_event(events.OpenConnectionReply(command, None))
|
||||
await self.handle_connection(command.connection)
|
||||
self.log("serverdisconnect")
|
||||
self.log("serverdisconnected")
|
||||
|
||||
@abc.abstractmethod
|
||||
async def handle_hook(self, hook: commands.Hook) -> None:
|
||||
@ -121,17 +124,24 @@ class ConnectionHandler(metaclass=abc.ABCMeta):
|
||||
|
||||
def server_event(self, event: events.Event) -> None:
|
||||
try:
|
||||
layer_commands = list(self.layer.handle_event(event))
|
||||
self._server_event(event)
|
||||
except Exception:
|
||||
self.log(f"mitmproxy has crashed!\n{traceback.format_exc()}", level="error")
|
||||
return
|
||||
|
||||
def _server_event(self, event: events.Event) -> None:
|
||||
layer_commands = self.layer.handle_event(event)
|
||||
for command in layer_commands:
|
||||
if isinstance(command, commands.OpenConnection):
|
||||
asyncio.ensure_future(
|
||||
self.open_connection(command)
|
||||
)
|
||||
elif isinstance(command, commands.SendData):
|
||||
self.transports[command.connection].w.write(command.data)
|
||||
try:
|
||||
io = self.transports[command.connection]
|
||||
except KeyError:
|
||||
raise RuntimeError(f"Cannot write to closed connection: {command.connection}")
|
||||
else:
|
||||
io.w.write(command.data)
|
||||
elif isinstance(command, commands.CloseConnection):
|
||||
if command.connection == self.client:
|
||||
asyncio.ensure_future(
|
||||
@ -173,7 +183,7 @@ class SimpleConnectionHandler(ConnectionHandler):
|
||||
|
||||
def log(self, message: str, level: str = "info"):
|
||||
if "Hook" not in message:
|
||||
print(message)
|
||||
print(message, file=sys.stderr if level in ("error", "warn") else sys.stdout)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
@ -181,7 +191,7 @@ if __name__ == "__main__":
|
||||
|
||||
opts = moptions.Options()
|
||||
opts.add_option(
|
||||
"connection_strategy", str, "eager",
|
||||
"connection_strategy", str, "lazy",
|
||||
"Determine when server connections should be established.",
|
||||
choices=("eager", "lazy")
|
||||
)
|
||||
@ -201,8 +211,15 @@ if __name__ == "__main__":
|
||||
nl.layer.debug = " " * len(nl.context.layers)
|
||||
|
||||
def request(flow: http.HTTPFlow):
|
||||
if flow.request.path == "/cached":
|
||||
flow.response = http.HTTPResponse.make(418, flow.request.content)
|
||||
if "cached" in flow.request.path:
|
||||
flow.response = http.HTTPResponse.make(418, f"(cached) {flow.request.text}")
|
||||
if "toggle-tls" in flow.request.path:
|
||||
if flow.request.url.startswith("https://"):
|
||||
flow.request.url = flow.request.url.replace("https://", "http://")
|
||||
else:
|
||||
flow.request.url = flow.request.url.replace("http://", "https://")
|
||||
if "redirect" in flow.request.path:
|
||||
flow.request.url = "https://httpbin.org/robots.txt"
|
||||
|
||||
await SimpleConnectionHandler(reader, writer, opts, {
|
||||
"next_layer": next_layer,
|
||||
|
@ -0,0 +1,29 @@
|
||||
def test_http_proxy():
|
||||
"""Test a simple HTTP GET / request"""
|
||||
|
||||
|
||||
def test_https_proxy_eager():
|
||||
"""Test a CONNECT request, followed by TLS, followed by a HTTP GET /"""
|
||||
|
||||
|
||||
def test_https_proxy_lazy():
|
||||
"""Test a CONNECT request, followed by TLS, followed by a HTTP GET /"""
|
||||
|
||||
|
||||
def test_http_to_https():
|
||||
"""Test a simple HTTP GET request that is being rewritten to HTTPS by an addon."""
|
||||
|
||||
|
||||
def test_http_redirect():
|
||||
"""Test a simple HTTP GET request that redirected to another host"""
|
||||
|
||||
|
||||
def test_multiple_server_connections():
|
||||
"""Test multiple requests being rewritten to different targets."""
|
||||
|
||||
|
||||
def test_http_reply_from_proxy():
|
||||
"""Test a response served by mitmproxy itself."""
|
||||
|
||||
def test_disconnect_while_intercept():
|
||||
"""Test a server disconnect while a request is intercepted."""
|
Loading…
Reference in New Issue
Block a user