mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-23 00:01:36 +00:00
[sans-io] remove old layers
This commit is contained in:
parent
03801aecb2
commit
7ea17b859c
@ -1,251 +0,0 @@
|
||||
import asyncio
|
||||
import socket
|
||||
import sys
|
||||
import threading
|
||||
import traceback
|
||||
|
||||
from mitmproxy import proxy, connections, ctx, exceptions, http
|
||||
from mitmproxy.log import LogEntry
|
||||
from mitmproxy.net.http import http1
|
||||
from mitmproxy.proxy import modes
|
||||
from mitmproxy.proxy.protocol import ServerConnectionMixin
|
||||
from mitmproxy.proxy2 import commands, events, server
|
||||
from mitmproxy.proxy2.context import Context
|
||||
from mitmproxy.proxy2.layer import Layer
|
||||
from mitmproxy.proxy2.utils import expect
|
||||
|
||||
"""
|
||||
___
|
||||
### __ __ __ __ __ __ ®
|
||||
/---\ | | | | | | | | | | | |
|
||||
| | | | | | | |__| | | | | |
|
||||
| | | | | | | __ | | | | |
|
||||
| | | `--' | | | | | | `--' |
|
||||
| | \______/ |__| |__| \______/
|
||||
+++++++
|
||||
|
||||
Temporary glue code to connect the old thread-based proxy core and the new sans-io implementation.
|
||||
"""
|
||||
|
||||
GLUE_DEBUG = False
|
||||
|
||||
|
||||
class GlueEvent(events.Event):
|
||||
def __init__(self, command: commands.Command):
|
||||
self.command = command
|
||||
|
||||
|
||||
class GlueGetConnectionHandler(commands.Command):
|
||||
blocking = True
|
||||
|
||||
|
||||
class GlueGetConnectionHandlerReply(events.CommandReply):
|
||||
pass
|
||||
|
||||
|
||||
class GlueClientConnection(connections.ClientConnection):
|
||||
def __init__(self, s: socket.socket, address):
|
||||
super().__init__(s, address, None)
|
||||
|
||||
def __getattribute__(self, item):
|
||||
if GLUE_DEBUG:
|
||||
print(f"[client_conn] {item}")
|
||||
return super().__getattribute__(item)
|
||||
|
||||
|
||||
class GlueTopLayer(ServerConnectionMixin):
|
||||
root_context: proxy.RootContext
|
||||
|
||||
def __init__(self, ctx, server_address):
|
||||
self.root_context = ctx
|
||||
super().__init__(server_address)
|
||||
mode = self.root_context.config.options.mode
|
||||
if mode.startswith("upstream:"):
|
||||
m = modes.HttpUpstreamProxy
|
||||
elif mode == "transparent":
|
||||
m = modes.TransparentProxy
|
||||
elif mode.startswith("reverse:"):
|
||||
m = modes.ReverseProxy
|
||||
elif mode == "socks5":
|
||||
m = modes.Socks5Proxy
|
||||
elif mode == "regular":
|
||||
m = modes.HttpProxy
|
||||
else:
|
||||
raise NotImplementedError()
|
||||
self.cls = m
|
||||
|
||||
@property
|
||||
def __class__(self):
|
||||
return self.cls
|
||||
|
||||
def __getattribute__(self, item):
|
||||
if GLUE_DEBUG and item not in ("root_context",):
|
||||
print(f"[top_layer] {item}")
|
||||
return object.__getattribute__(self, item)
|
||||
|
||||
def __getattr__(self, item):
|
||||
return getattr(self.root_context, item)
|
||||
|
||||
|
||||
class GlueLayer(Layer):
|
||||
"""
|
||||
Translate between old and new proxy core.
|
||||
"""
|
||||
context: Context
|
||||
connection_handler: server.ConnectionHandler
|
||||
|
||||
def log(self, msg, level):
|
||||
self.master.channel.tell("log", LogEntry(msg, level))
|
||||
|
||||
def _inject(self, command: commands.Command):
|
||||
e = GlueEvent(command)
|
||||
self.loop.call_soon_threadsafe(
|
||||
lambda: self.connection_handler.server_event(e)
|
||||
)
|
||||
|
||||
@expect(events.Start)
|
||||
def start(self, _) -> commands.TCommandGenerator:
|
||||
if GLUE_DEBUG:
|
||||
print("start!")
|
||||
self.loop = asyncio.get_event_loop()
|
||||
|
||||
self.connection_handler = yield GlueGetConnectionHandler()
|
||||
self.master = ctx.master
|
||||
|
||||
self.c1, self.c2 = socketpair()
|
||||
|
||||
self.client_conn = GlueClientConnection(self.c1, self.context.client.address)
|
||||
self.root_context = proxy.RootContext(
|
||||
self.client_conn,
|
||||
proxy.ProxyConfig(self.context.options),
|
||||
self.master.channel
|
||||
)
|
||||
|
||||
def spin():
|
||||
while True:
|
||||
d = self.c2.recv(16384)
|
||||
if not d:
|
||||
break
|
||||
self._inject(commands.SendData(self.context.client, d))
|
||||
|
||||
self.spin = threading.Thread(target=spin)
|
||||
self.spin.daemon = True
|
||||
self.spin.start()
|
||||
|
||||
def run():
|
||||
try:
|
||||
self.layer = self.root_context.next_layer(
|
||||
GlueTopLayer(self.root_context, self.context.server.address)
|
||||
)
|
||||
self.layer()
|
||||
except exceptions.Kill:
|
||||
self.log("Connection killed", "info")
|
||||
except exceptions.ProtocolException as e:
|
||||
if isinstance(e, exceptions.ClientHandshakeException):
|
||||
self.log(
|
||||
"Client Handshake failed. "
|
||||
"The client may not trust the proxy's certificate for {}.".format(e.server),
|
||||
"warn"
|
||||
)
|
||||
self.log(repr(e), "debug")
|
||||
elif isinstance(e, exceptions.InvalidServerCertificate):
|
||||
self.log(str(e), "warn")
|
||||
self.log(
|
||||
"Invalid certificate, closing connection. Pass --insecure to disable validation.",
|
||||
"warn")
|
||||
else:
|
||||
self.log(str(e), "warn")
|
||||
|
||||
self.log(repr(e), "debug")
|
||||
# If an error propagates to the topmost level,
|
||||
# we send an HTTP error response, which is both
|
||||
# understandable by HTTP clients and humans.
|
||||
try:
|
||||
error_response = http.make_error_response(502, repr(e))
|
||||
self.client_conn.send(http1.assemble_response(error_response))
|
||||
except exceptions.TcpException:
|
||||
pass
|
||||
except Exception:
|
||||
self.log(traceback.format_exc(), "error")
|
||||
print(traceback.format_exc(), file=sys.stderr)
|
||||
print("mitmproxy has crashed!", file=sys.stderr)
|
||||
print("Please lodge a bug report at: https://github.com/mitmproxy/mitmproxy",
|
||||
file=sys.stderr)
|
||||
|
||||
self.thread = threading.Thread(target=run)
|
||||
|
||||
self._handle_event = self.translate_event
|
||||
|
||||
self.thread.start()
|
||||
if GLUE_DEBUG:
|
||||
print("done start")
|
||||
|
||||
_handle_event = start
|
||||
|
||||
@expect(events.DataReceived, events.ConnectionClosed, GlueEvent)
|
||||
def translate_event(self, event: events.Event) -> commands.TCommandGenerator:
|
||||
if GLUE_DEBUG:
|
||||
print("event!", event)
|
||||
if isinstance(event, events.DataReceived):
|
||||
if event.connection == self.context.client:
|
||||
self.c2.sendall(event.data)
|
||||
else:
|
||||
raise NotImplementedError()
|
||||
elif isinstance(event, GlueEvent):
|
||||
yield event.command
|
||||
elif isinstance(event, events.ConnectionClosed):
|
||||
if event.connection == self.context.client:
|
||||
self.c1.shutdown(socket.SHUT_RDWR)
|
||||
self.c1.close()
|
||||
try:
|
||||
self.c2.shutdown(socket.SHUT_RDWR)
|
||||
except:
|
||||
pass
|
||||
self.c2.close()
|
||||
self._handle_event = self.done
|
||||
else:
|
||||
raise NotImplementedError()
|
||||
yield from ()
|
||||
|
||||
@expect(events.DataReceived, events.ConnectionClosed)
|
||||
def done(self, _):
|
||||
yield from ()
|
||||
|
||||
|
||||
# https://github.com/python/cpython/blob/5c23e21ef655db35af45ed98a62eb54bff64dbd0/Lib/socket.py#L493
|
||||
def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0):
|
||||
if family == socket.AF_INET:
|
||||
host = socket._LOCALHOST
|
||||
elif family == socket.AF_INET6:
|
||||
host = socket._LOCALHOST_V6
|
||||
else:
|
||||
raise ValueError("Only AF_INET and AF_INET6 socket address families "
|
||||
"are supported")
|
||||
if type != socket.SOCK_STREAM:
|
||||
raise ValueError("Only SOCK_STREAM socket type is supported")
|
||||
if proto != 0:
|
||||
raise ValueError("Only protocol zero is supported")
|
||||
|
||||
# We create a connected TCP socket. Note the trick with
|
||||
# setblocking(False) that prevents us from having to create a thread.
|
||||
lsock = socket.socket(family, type, proto)
|
||||
try:
|
||||
lsock.bind((host, 0))
|
||||
lsock.listen()
|
||||
# On IPv6, ignore flow_info and scope_id
|
||||
addr, port = lsock.getsockname()[:2]
|
||||
csock = socket.socket(family, type, proto)
|
||||
try:
|
||||
csock.setblocking(False)
|
||||
try:
|
||||
csock.connect((addr, port))
|
||||
except (BlockingIOError, InterruptedError):
|
||||
pass
|
||||
csock.setblocking(True)
|
||||
ssock, _ = lsock.accept()
|
||||
except:
|
||||
csock.close()
|
||||
raise
|
||||
finally:
|
||||
lsock.close()
|
||||
return (ssock, csock)
|
@ -1,109 +0,0 @@
|
||||
import time
|
||||
|
||||
import h11
|
||||
from mitmproxy.proxy2.layer import Layer
|
||||
from mitmproxy.proxy2.layers.old.old_http import _make_event_from_request
|
||||
from mitmproxy.proxy2.layers.old import semantics
|
||||
from mitmproxy.proxy2.layers.old.http_commands import *
|
||||
from mitmproxy.proxy2.layers.old.http_events import *
|
||||
from mitmproxy.proxy2.utils import expect
|
||||
|
||||
|
||||
class ServerHTTP1Layer(Layer):
|
||||
flow: http.HTTPFlow = None
|
||||
|
||||
@expect(events.Start)
|
||||
def start(self, _) -> commands.TCommandGenerator:
|
||||
if not self.context.server.connected:
|
||||
# TODO: Can be done later
|
||||
err = yield commands.OpenConnection(self.context.server)
|
||||
if err:
|
||||
yield commands.Log(f"Cannot open connection: {err}", level="error")
|
||||
# FIXME: Handle properly.
|
||||
|
||||
self.h11 = h11.Connection(h11.CLIENT)
|
||||
|
||||
# debug
|
||||
# \/ \/ \/ \/ \/ \/ \/ \/ \/ \/ \/ \/
|
||||
def log_event(orig):
|
||||
def next_event():
|
||||
e = orig()
|
||||
if True:
|
||||
yield commands.Log(f"[h11] {e}")
|
||||
return e
|
||||
|
||||
return next_event
|
||||
|
||||
self.h11.next_event = log_event(self.h11.next_event)
|
||||
# /\ /\ /\ /\ /\ /\ /\ /\ /\ /\ /\ /\
|
||||
self.child_layer = semantics.HTTPLayer(self.context)
|
||||
self.event_to_child(events.Start())
|
||||
|
||||
yield commands.Log("HTTP/1 connection started")
|
||||
|
||||
self._handle_event = self._handle
|
||||
|
||||
_handle_event = start
|
||||
|
||||
def event_to_child(self, event: events.Event):
|
||||
for command in self.child_layer.handle_event(event):
|
||||
if isinstance(command, HttpCommand):
|
||||
yield from self.handle_http_command(command)
|
||||
else:
|
||||
yield command
|
||||
|
||||
def handle_http_command(self, command: HttpCommand):
|
||||
bytes_to_send = None
|
||||
if isinstance(command, SendRequestHeaders):
|
||||
self.flow = command.flow
|
||||
self.flow.request.http_version = b"HTTP/1.1"
|
||||
h11_event = _make_event_from_request(self.flow.request)
|
||||
bytes_to_send = self.h11.send(h11_event)
|
||||
|
||||
elif isinstance(command, SendRequestComplete):
|
||||
bytes_to_send = self.h11.send(h11.EndOfMessage())
|
||||
elif isinstance(command, SendRequestData):
|
||||
yield commands.Log(f"Server HTTP1Layer unimplemented HttpCommand: {command}",
|
||||
level="error")
|
||||
else:
|
||||
yield command
|
||||
if bytes_to_send:
|
||||
yield commands.SendData(self.context.server, bytes_to_send)
|
||||
|
||||
def _handle(self, event: events.Event):
|
||||
if isinstance(event, HttpEvent):
|
||||
yield from self.event_to_child(event)
|
||||
elif isinstance(event, events.DataReceived):
|
||||
self.h11.receive_data(event.data)
|
||||
|
||||
while True:
|
||||
h11_event = yield from self.h11.next_event()
|
||||
if h11_event is h11.NEED_DATA:
|
||||
break
|
||||
elif isinstance(h11_event, h11.Response):
|
||||
yield commands.Log(f"h11 responseheaders: {h11_event}")
|
||||
self.flow.response = http.HTTPResponse(
|
||||
b"HTTP/1.1",
|
||||
h11_event.status_code,
|
||||
h11_event.reason,
|
||||
h11_event.headers,
|
||||
None,
|
||||
time.time()
|
||||
)
|
||||
yield from self.event_to_child(ResponseHeaders(self.flow))
|
||||
elif isinstance(h11_event, h11.Data):
|
||||
yield from self.event_to_child(ResponseData(self.flow, h11_event.data))
|
||||
elif isinstance(h11_event, h11.EndOfMessage):
|
||||
yield from self.event_to_child(ResponseComplete(self.flow))
|
||||
else:
|
||||
raise NotImplementedError(h11_event)
|
||||
else:
|
||||
yield from self.event_to_child(event)
|
||||
|
||||
|
||||
class ClientHTTP1Layer(Layer):
|
||||
flow: http.HTTPFlow = None
|
||||
|
||||
@expect(events.Start)
|
||||
def start(self, _) -> commands.TCommandGenerator:
|
||||
raise NotImplemented
|
@ -1,188 +0,0 @@
|
||||
import typing
|
||||
from warnings import warn
|
||||
|
||||
import h11
|
||||
from h11._readers import ChunkedReader, ContentLengthReader, Http10Reader
|
||||
from h11._receivebuffer import ReceiveBuffer
|
||||
|
||||
from mitmproxy import http
|
||||
from mitmproxy.net.http import http1
|
||||
from mitmproxy.net.http.http1 import read_sansio as http1_sansio
|
||||
from mitmproxy.proxy.protocol.http import HTTPMode
|
||||
from mitmproxy.proxy2 import commands, events
|
||||
from mitmproxy.proxy2.context import Context
|
||||
from mitmproxy.proxy2.layer import Layer
|
||||
from mitmproxy.proxy2.utils import expect
|
||||
|
||||
|
||||
class ClientHTTP1Layer(Layer):
|
||||
mode: HTTPMode
|
||||
flow: typing.Optional[http.HTTPFlow]
|
||||
client_buf: ReceiveBuffer
|
||||
body_reader: typing.Union[ChunkedReader, Http10Reader, ContentLengthReader]
|
||||
body_buf: bytes
|
||||
|
||||
# this is like a mini state machine.
|
||||
state: typing.Callable[[events.Event], commands.TCommandGenerator]
|
||||
|
||||
def __init__(self, context: Context, mode: HTTPMode):
|
||||
super().__init__(context)
|
||||
self.mode = mode
|
||||
|
||||
self.client_buf = ReceiveBuffer()
|
||||
self.body_buf = b""
|
||||
|
||||
self.state = self.read_request_headers
|
||||
|
||||
@expect(events.Start, events.DataReceived, events.ConnectionClosed)
|
||||
def _handle_event(self, event: events.Event) -> commands.TCommandGenerator:
|
||||
if isinstance(event, events.Start):
|
||||
return
|
||||
elif isinstance(event, events.DataReceived) and event.connection == self.context.client:
|
||||
self.client_buf += event.data
|
||||
else:
|
||||
return warn(f"ClientHTTP1Layer unimplemented: {event}")
|
||||
|
||||
yield from self.state(event)
|
||||
|
||||
def read_request_headers(self, event: events.Event) -> commands.TCommandGenerator:
|
||||
if isinstance(event, events.DataReceived) and event.connection == self.context.client:
|
||||
request_head = self.client_buf.maybe_extract_lines()
|
||||
if request_head:
|
||||
self.flow = http.HTTPFlow(
|
||||
self.context.client,
|
||||
self.context.server,
|
||||
)
|
||||
self.flow.request = http.HTTPRequest.wrap(http1_sansio.read_request_head(request_head))
|
||||
|
||||
if self.flow.request.first_line_format != "authority":
|
||||
yield commands.Hook("requestheaders", self.flow)
|
||||
|
||||
if self.flow.request.headers.get("expect", "").lower() == "100-continue":
|
||||
raise NotImplementedError()
|
||||
# self.send_response(http.expect_continue_response)
|
||||
# request.headers.pop("expect")
|
||||
|
||||
expected_size = http1.expected_http_body_size(self.flow.request)
|
||||
if expected_size is None:
|
||||
self.body_reader = ChunkedReader()
|
||||
elif expected_size == -1:
|
||||
self.body_reader = Http10Reader()
|
||||
else:
|
||||
self.body_reader = ContentLengthReader(expected_size)
|
||||
|
||||
if not self.flow.request.stream:
|
||||
yield from self.start_read_request_body()
|
||||
else:
|
||||
yield from self.request_received()
|
||||
else:
|
||||
return warn(f"ClientHTTP1Layer.read_request_headers: unimplemented {event}")
|
||||
|
||||
def start_read_request_body(self) -> commands.TCommandGenerator:
|
||||
self.state = self.read_request_body
|
||||
yield from self.read_request_body(events.DataReceived(self.context.client, b""))
|
||||
|
||||
def read_request_body(self, event: events.Event) -> commands.TCommandGenerator:
|
||||
if isinstance(event, events.DataReceived) and event.connection == self.context.client:
|
||||
try:
|
||||
event = self.body_reader(self.client_buf)
|
||||
except h11.ProtocolError as e:
|
||||
raise # FIXME
|
||||
elif isinstance(event, events.ConnectionClosed):
|
||||
try:
|
||||
event = self.body_reader.read_eof()
|
||||
except h11.ProtocolError as e:
|
||||
raise # FIXME
|
||||
else:
|
||||
return warn(f"ClientHTTP1Layer.read_request_body: unimplemented {event}")
|
||||
|
||||
if event is None:
|
||||
return
|
||||
elif isinstance(event, h11.Data):
|
||||
self.body_buf += event.data
|
||||
elif isinstance(event, h11.EndOfMessage):
|
||||
self.flow.request.data.content = self.body_buf
|
||||
self.body_buf = b""
|
||||
yield from self.request_received()
|
||||
|
||||
def request_received(self) -> commands.TCommandGenerator:
|
||||
# set first line format to relative in regular mode,
|
||||
# see https://github.com/mitmproxy/mitmproxy/issues/1759
|
||||
if self.mode is HTTPMode.regular and self.flow.request.first_line_format == "absolute":
|
||||
self.flow.request.first_line_format = "relative"
|
||||
|
||||
# update host header in reverse proxy mode
|
||||
if self.context.options.mode.startswith("reverse:") and not self.context.options.keep_host_header:
|
||||
self.flow.request.host_header = self.context.server.address[0]
|
||||
|
||||
# Determine .scheme, .host and .port attributes for inline scripts. For
|
||||
# absolute-form requests, they are directly given in the request. For
|
||||
# authority-form requests, we only need to determine the request
|
||||
# scheme. For relative-form requests, we need to determine host and
|
||||
# port as well.
|
||||
if self.mode is HTTPMode.transparent:
|
||||
# Setting request.host also updates the host header, which we want
|
||||
# to preserve
|
||||
host_header = self.flow.request.host_header
|
||||
self.flow.request.host = self.context.server.address[0]
|
||||
self.flow.request.port = self.context.server.address[1]
|
||||
self.flow.request.host_header = host_header # set again as .host overwrites this.
|
||||
self.flow.request.scheme = "https" if self.context.server.tls else "http"
|
||||
yield commands.Hook("request", self.flow)
|
||||
|
||||
if self.flow.response:
|
||||
# response was set by an inline script.
|
||||
# we now need to emulate the responseheaders hook.
|
||||
yield commands.Hook("responseheaders", self.flow)
|
||||
yield from self.response_received()
|
||||
else:
|
||||
raise NotImplementedError("Get your responses elsewhere.")
|
||||
|
||||
def response_received(self):
|
||||
yield commands.Hook("response", self.flow)
|
||||
|
||||
raw = http1.assemble_response_head(self.flow.response)
|
||||
yield commands.SendData(self.context.server, raw)
|
||||
|
||||
if not f.response.stream:
|
||||
# no streaming:
|
||||
# we already received the full response from the server and can
|
||||
# send it to the client straight away.
|
||||
self.send_response(f.response)
|
||||
else:
|
||||
# streaming:
|
||||
# First send the headers and then transfer the response incrementally
|
||||
self.send_response_headers(f.response)
|
||||
chunks = self.read_response_body(
|
||||
f.request,
|
||||
f.response
|
||||
)
|
||||
if callable(f.response.stream):
|
||||
chunks = f.response.stream(chunks)
|
||||
self.send_response_body(f.response, chunks)
|
||||
f.response.timestamp_end = time.time()
|
||||
|
||||
if self.check_close_connection(f):
|
||||
return False
|
||||
|
||||
# Handle 101 Switching Protocols
|
||||
if f.response.status_code == 101:
|
||||
# Handle a successful HTTP 101 Switching Protocols Response,
|
||||
# received after e.g. a WebSocket upgrade request.
|
||||
# Check for WebSocket handshake
|
||||
is_websocket = (
|
||||
websockets.check_handshake(f.request.headers) and
|
||||
websockets.check_handshake(f.response.headers)
|
||||
)
|
||||
if is_websocket and not self.config.options.websocket:
|
||||
self.log(
|
||||
"Client requested WebSocket connection, but the protocol is disabled.",
|
||||
"info"
|
||||
)
|
||||
|
||||
if is_websocket and self.config.options.websocket:
|
||||
layer = WebSocketLayer(self, f)
|
||||
else:
|
||||
layer = self.ctx.next_layer(self)
|
||||
layer()
|
||||
return False # should never be reached
|
@ -1,543 +0,0 @@
|
||||
import time
|
||||
from typing import Dict, List # noqa
|
||||
|
||||
import h2.config
|
||||
import h2.events
|
||||
import h2.exceptions
|
||||
from h2 import connection
|
||||
|
||||
from mitmproxy.net.http import Headers, http2
|
||||
from mitmproxy.proxy2.layer import Layer
|
||||
from mitmproxy.proxy2.layers.old import semantics
|
||||
from mitmproxy.proxy2.layers.old.http_commands import *
|
||||
from mitmproxy.proxy2.layers.old.http_events import *
|
||||
from mitmproxy.proxy2.utils import expect
|
||||
|
||||
|
||||
TFlowId = str
|
||||
|
||||
|
||||
class ServerHTTP2Layer(Layer):
|
||||
stream_by_flow: Dict[TFlowId, semantics.HTTPLayer]
|
||||
stream_by_command: Dict[commands.Command, semantics.HTTPLayer]
|
||||
|
||||
@expect(events.Start)
|
||||
def start(self, _) -> commands.TCommandGenerator:
|
||||
if not self.context.server.connected:
|
||||
# TODO: Can be done later
|
||||
err = yield commands.OpenConnection(self.context.server)
|
||||
if err:
|
||||
yield commands.Log(f"Cannot open connection: {err}", level="error")
|
||||
# FIXME: Handle properly.
|
||||
|
||||
self.stream_by_flow = {}
|
||||
self.stream_by_command = {}
|
||||
|
||||
h2_config = h2.config.H2Configuration(
|
||||
client_side=True,
|
||||
header_encoding=False,
|
||||
validate_outbound_headers=False,
|
||||
validate_inbound_headers=False)
|
||||
self.h2 = connection.H2Connection(config=h2_config)
|
||||
self.h2.initiate_connection()
|
||||
yield commands.SendData(self.context.server, self.h2.data_to_send())
|
||||
|
||||
yield commands.Log("HTTP/2 connection started")
|
||||
|
||||
self._handle_event = self._handle
|
||||
|
||||
_handle_event = start
|
||||
|
||||
def event_to_child(self, layer: semantics.HTTPLayer, event: events.Event):
|
||||
for command in layer.handle_event(event):
|
||||
if command.blocking:
|
||||
self.stream_by_command[command] = layer
|
||||
yield command
|
||||
|
||||
def _handle(self, event: events.Event):
|
||||
if isinstance(event, events.CommandReply):
|
||||
child_layer = self.stream_by_command.pop(event.command)
|
||||
yield from self.event_to_child(child_layer, event)
|
||||
|
||||
elif isinstance(event, HttpEvent):
|
||||
if event.flow.id not in self.stream_by_flow:
|
||||
child_layer = semantics.HTTPLayer(self.context)
|
||||
yield from child_layer.handle_event(events.Start())
|
||||
self.stream_by_flow[event.flow.id] = child_layer
|
||||
else:
|
||||
child_layer = self.stream_by_flow[event.flow.id]
|
||||
yield from self.event_to_child(child_layer, event)
|
||||
|
||||
elif isinstance(event, events.DataReceived):
|
||||
h2_events = self.h2.receive_data(event.data)
|
||||
yield commands.SendData(self.context.server, self.h2.data_to_send())
|
||||
yield from self.handle_h2_events(h2_events)
|
||||
|
||||
else:
|
||||
raise NotImplementedError
|
||||
|
||||
def handle_h2_events(self, h2_events: List[h2.events.Event]):
|
||||
for h2_event in h2_events:
|
||||
if isinstance(h2_event, h2.events.RequestReceived):
|
||||
yield commands.Log("foobar")
|
||||
|
||||
|
||||
class ClientHTTP2Layer(Layer):
|
||||
stream_by_flow: Dict[TFlowId, int]
|
||||
|
||||
@expect(events.Start)
|
||||
def start(self, _) -> commands.TCommandGenerator:
|
||||
self.stream_by_flow = {}
|
||||
|
||||
h2_config = h2.config.H2Configuration(
|
||||
client_side=False,
|
||||
header_encoding=False,
|
||||
validate_outbound_headers=False,
|
||||
validate_inbound_headers=False)
|
||||
self.h2 = connection.H2Connection(config=h2_config)
|
||||
self.h2.initiate_connection()
|
||||
yield commands.SendData(self.context.client, self.h2.data_to_send())
|
||||
|
||||
self.child_layer = ServerHTTP2Layer(self.context)
|
||||
yield from self.event_to_child(events.Start())
|
||||
|
||||
self._handle_event = self._handle
|
||||
|
||||
_handle_event = start
|
||||
|
||||
def event_to_child(self, event: events.Event):
|
||||
for command in self.child_layer.handle_event(event):
|
||||
if isinstance(command, HttpCommand):
|
||||
yield from self.handle_http_command(command)
|
||||
else:
|
||||
yield command
|
||||
|
||||
def handle_http_command(self, command: HttpCommand):
|
||||
stream_id = self.stream_by_flow[command.flow.id]
|
||||
if isinstance(command, SendResponseHeaders):
|
||||
headers = (
|
||||
(b":status", str(command.flow.response.status_code).encode()),
|
||||
*command.flow.request.headers.fields
|
||||
)
|
||||
self.h2.send_headers(stream_id, headers)
|
||||
elif isinstance(command, SendResponseData):
|
||||
# TODO: do chunking with max_outbound_frame_size
|
||||
self.h2.send_data(stream_id, command.data)
|
||||
elif isinstance(command, SendResponseComplete):
|
||||
yield commands.Log(f"Ending stream {self.stream_by_flow[command.flow.id]}...")
|
||||
self.h2.end_stream(stream_id)
|
||||
else:
|
||||
yield commands.Log(f"ClientHTTP2Layer unimplemented HttpCommand: {command}", level="error")
|
||||
|
||||
yield commands.SendData(self.context.client, self.h2.data_to_send())
|
||||
|
||||
def _handle(self, event: events.Event) -> commands.TCommandGenerator:
|
||||
if isinstance(event, events.DataReceived):
|
||||
if event.connection == self.context.client:
|
||||
h2_events = self.h2.receive_data(event.data)
|
||||
yield commands.SendData(self.context.client, self.h2.data_to_send())
|
||||
yield from self.handle_h2_events(h2_events)
|
||||
else:
|
||||
yield from self.event_to_child(event)
|
||||
else:
|
||||
yield from self.event_to_child(event)
|
||||
|
||||
def handle_h2_events(self, h2_events: List[h2.events.Event]):
|
||||
for h2_event in h2_events:
|
||||
if isinstance(h2_event, h2.events.RequestReceived):
|
||||
flow = http.HTTPFlow(self.context.client, self.context.server)
|
||||
headers = Headers([(k, v) for k, v in h2_event.headers])
|
||||
first_line_format, method, scheme, host, port, path = http2.parse_headers(headers)
|
||||
# FIXME: This should be part of http2.parse_headers?
|
||||
if ":authority" in headers:
|
||||
headers["Host"] = headers.pop(":authority")
|
||||
flow.request = http.HTTPRequest(
|
||||
first_line_format,
|
||||
method,
|
||||
scheme,
|
||||
host,
|
||||
port,
|
||||
path,
|
||||
b"HTTP/2.0",
|
||||
headers,
|
||||
None,
|
||||
timestamp_start=time.time()
|
||||
)
|
||||
self.stream_by_flow[flow.id] = h2_event.stream_id
|
||||
yield from self.event_to_child(RequestHeaders(flow))
|
||||
if h2_event.stream_ended:
|
||||
yield from self.event_to_child(RequestComplete(flow))
|
||||
else:
|
||||
yield commands.Log(f"Unimplemented h2 event: {h2_event}", level="error")
|
||||
|
||||
"""
|
||||
@expect(events.DataReceived, events.ConnectionClosed)
|
||||
def process_data(self, event: events.Event) -> commands.TCommandGenerator:
|
||||
if isinstance(event, events.DataReceived):
|
||||
dead = [stream for stream in self.streams.values() if stream.death_time]
|
||||
for stream in dead:
|
||||
if stream.death_time <= time.time() - 10:
|
||||
self.streams.pop(stream.stream_id, None)
|
||||
|
||||
from_client = event.connection == self.context.client
|
||||
if from_client:
|
||||
source = self.h2_client
|
||||
other = self.h2_server
|
||||
source_conn = self.context.client
|
||||
other_conn = self.context.server
|
||||
else:
|
||||
source = self.h2_server
|
||||
other = self.h2_client
|
||||
source_conn = self.context.server
|
||||
other_conn = self.context.client
|
||||
|
||||
received_h2_events = source.receive_data(event.data)
|
||||
yield commands.SendData(source_conn, source.data_to_send())
|
||||
|
||||
for h2_event in received_h2_events:
|
||||
yield commands.Log(
|
||||
"HTTP/2 event from {}: {}".format("client" if from_client else "server",
|
||||
h2_event)
|
||||
)
|
||||
|
||||
eid = None
|
||||
if hasattr(h2_event, 'stream_id'):
|
||||
if not from_client and h2_event.stream_id % 2 == 1:
|
||||
eid = self.server_to_client_stream_ids[h2_event.stream_id]
|
||||
else:
|
||||
eid = h2_event.stream_id
|
||||
|
||||
if isinstance(h2_event, h2events.RequestReceived):
|
||||
self.streams[eid] = Http2Stream(h2_event, self.context.client,
|
||||
self.context.server)
|
||||
self.streams[eid].timestamp_start = time.time()
|
||||
self.streams[eid].request_arrived.set()
|
||||
|
||||
yield commands.Hook("requestheaders", self.streams[eid].flow)
|
||||
|
||||
while other.open_outbound_streams + 1 >= other.remote_settings.max_concurrent_streams:
|
||||
# wait until we get a free slot for a new outgoing stream
|
||||
# TODO make async/re-entry so we can handle other streams!
|
||||
# time.sleep(0.1)
|
||||
break
|
||||
|
||||
server_stream_id = other.get_next_available_stream_id()
|
||||
self.streams[eid].server_stream_id = server_stream_id
|
||||
self.server_to_client_stream_ids[server_stream_id] = h2_event.stream_id
|
||||
|
||||
if h2_event.stream_ended:
|
||||
self.streams[eid].request_data_finished.set()
|
||||
|
||||
headers = self.streams[eid].flow.request.headers.copy()
|
||||
headers.insert(0, ":path", self.streams[eid].flow.request.path)
|
||||
headers.insert(0, ":method", self.streams[eid].flow.request.method)
|
||||
headers.insert(0, ":scheme", self.streams[eid].flow.request.scheme)
|
||||
|
||||
# omit priority information because it is too complex to synchronize
|
||||
other.send_headers(
|
||||
server_stream_id,
|
||||
headers=headers.items(),
|
||||
end_stream=h2_event.stream_ended,
|
||||
)
|
||||
yield commands.SendData(other_conn, other.data_to_send())
|
||||
|
||||
elif isinstance(h2_event, h2events.ResponseReceived):
|
||||
self.streams[eid].queued_data_length = 0
|
||||
self.streams[eid].timestamp_start = time.time()
|
||||
self.streams[eid].response_arrived.set()
|
||||
|
||||
headers = mitmproxy.net.http.Headers([[k, v] for k, v in h2_event.headers])
|
||||
status_code = int(headers.get(':status', 502))
|
||||
headers.pop(":status", None)
|
||||
|
||||
self.streams[eid].flow.response = http.HTTPResponse(
|
||||
http_version=b"HTTP/2.0",
|
||||
status_code=status_code,
|
||||
reason=b'',
|
||||
headers=headers,
|
||||
content=None,
|
||||
timestamp_start=self.streams[eid].timestamp_start,
|
||||
timestamp_end=self.streams[eid].timestamp_end,
|
||||
)
|
||||
|
||||
yield commands.Hook("responseheaders", self.streams[eid].flow)
|
||||
|
||||
if self.streams[eid].flow.response.stream:
|
||||
self.streams[eid].flow.response.data.content = None
|
||||
|
||||
headers = self.streams[eid].flow.response.headers
|
||||
headers.insert(0, ":status", str(self.streams[eid].flow.response.status_code))
|
||||
|
||||
other.send_headers(
|
||||
self.streams[eid].stream_id,
|
||||
headers=headers.items(),
|
||||
)
|
||||
yield commands.SendData(other_conn, other.data_to_send())
|
||||
|
||||
elif isinstance(h2_event, h2events.DataReceived):
|
||||
bsl = human.parse_size(self.context.options.body_size_limit)
|
||||
if bsl and self.streams[eid].queued_data_length > bsl:
|
||||
self.streams[eid].kill()
|
||||
source.reset_stream(eid, h2.errors.ErrorCodes.REFUSED_STREAM)
|
||||
yield commands.SendData(source_conn, source.data_to_send())
|
||||
other.reset_stream(eid, h2.errors.ErrorCodes.REFUSED_STREAM)
|
||||
yield commands.SendData(other_conn, other.data_to_send())
|
||||
yield commands.Log("HTTP body too large. Limit is {}.".format(bsl), "info")
|
||||
else:
|
||||
streaming = (
|
||||
(from_client and self.streams[eid].flow.request.stream) or
|
||||
(not from_client and self.streams[eid].flow.response and self.streams[
|
||||
eid].flow.response.stream)
|
||||
)
|
||||
if streaming:
|
||||
stream_id = self.streams[eid].server_stream_id if from_client else \
|
||||
self.streams[eid].stream_id
|
||||
self.unfinished_bodies[other] = (stream_id, h2_event.data, False)
|
||||
yield from self._send_body(other_conn, other)
|
||||
else:
|
||||
self.streams[eid].data_queue.put(h2_event.data)
|
||||
self.streams[eid].queued_data_length += len(h2_event.data)
|
||||
|
||||
source.acknowledge_received_data(h2_event.flow_controlled_length,
|
||||
h2_event.stream_id)
|
||||
yield commands.SendData(source_conn, source.data_to_send())
|
||||
|
||||
elif isinstance(h2_event, h2events.StreamEnded):
|
||||
self.streams[eid].timestamp_end = time.time()
|
||||
self.streams[eid].data_finished.set()
|
||||
|
||||
if from_client and self.streams[eid].request_data_finished:
|
||||
# end_stream already communicated via request send_headers
|
||||
pass
|
||||
else:
|
||||
streaming = (
|
||||
(from_client and self.streams[eid].flow.request.stream) or
|
||||
(not from_client and self.streams[eid].flow.response and self.streams[
|
||||
eid].flow.response.stream)
|
||||
)
|
||||
if streaming:
|
||||
stream_id = self.streams[eid].server_stream_id if from_client else \
|
||||
self.streams[eid].stream_id
|
||||
self.unfinished_bodies[other] = (stream_id, b'', True, eid)
|
||||
yield from self._send_body(other_conn, other)
|
||||
else:
|
||||
content = b""
|
||||
while True:
|
||||
try:
|
||||
content += self.streams[eid].data_queue.get_nowait()
|
||||
except queue.Empty:
|
||||
break
|
||||
|
||||
if from_client:
|
||||
self.streams[eid].flow.request.data.content = content
|
||||
self.streams[eid].flow.request.timestamp_end = time.time()
|
||||
yield commands.Hook("request", self.streams[eid].flow)
|
||||
content = self.streams[eid].flow.request.data.content
|
||||
stream_id = self.streams[eid].server_stream_id
|
||||
kill_id = None
|
||||
else:
|
||||
self.streams[eid].flow.response.data.content = content
|
||||
self.streams[eid].flow.response.timestamp_end = time.time()
|
||||
yield commands.Hook("response", self.streams[eid].flow)
|
||||
content = self.streams[eid].flow.response.data.content
|
||||
stream_id = self.streams[eid].stream_id
|
||||
kill_id = eid
|
||||
|
||||
self.unfinished_bodies[other] = (stream_id, content, True, kill_id)
|
||||
yield from self._send_body(other_conn, other)
|
||||
|
||||
elif isinstance(h2_event, h2events.StreamReset):
|
||||
if eid in self.streams:
|
||||
self.streams[eid].kill()
|
||||
if h2_event.error_code == h2.errors.ErrorCodes.CANCEL:
|
||||
try:
|
||||
stream_id = self.streams[eid].server_stream_id if from_client else \
|
||||
self.streams[eid].stream_id
|
||||
if stream_id:
|
||||
other.reset_stream(stream_id, h2_event.error_code)
|
||||
except h2.exceptions.StreamClosedError: # pragma: no cover
|
||||
# stream is already closed - good
|
||||
pass
|
||||
yield commands.SendData(other_conn, other.data_to_send())
|
||||
|
||||
elif isinstance(h2_event, h2events.RemoteSettingsChanged):
|
||||
new_settings = dict(
|
||||
[(key, cs.new_value) for (key, cs) in h2_event.changed_settings.items()])
|
||||
other.update_settings(new_settings)
|
||||
yield commands.SendData(other_conn, other.data_to_send())
|
||||
|
||||
elif isinstance(h2_event, h2events.ConnectionTerminated):
|
||||
yield commands.Log(
|
||||
f"HTTP/2 Connection terminated: {h2_event}, {h2_event.additional_data}")
|
||||
elif isinstance(h2_event, h2events.PushedStreamReceived):
|
||||
parent_eid = self.server_to_client_stream_ids[h2_event.parent_stream_id]
|
||||
other.push_stream(parent_eid, h2_event.pushed_stream_id, h2_event.headers)
|
||||
yield commands.SendData(other_conn, other.data_to_send())
|
||||
|
||||
self.streams[h2_event.pushed_stream_id] = Http2Stream(h2_event,
|
||||
self.context.client,
|
||||
self.context.server)
|
||||
self.streams[h2_event.pushed_stream_id].timestamp_start = time.time()
|
||||
self.streams[h2_event.pushed_stream_id].pushed = True
|
||||
self.streams[h2_event.pushed_stream_id].parent_stream_id = parent_eid
|
||||
self.streams[h2_event.pushed_stream_id].timestamp_end = time.time()
|
||||
self.streams[h2_event.pushed_stream_id].request_arrived.set()
|
||||
self.streams[h2_event.pushed_stream_id].request_data_finished.set()
|
||||
|
||||
yield commands.Hook("requestheaders",
|
||||
self.streams[h2_event.pushed_stream_id].flow)
|
||||
|
||||
elif isinstance(h2_event, h2events.WindowUpdated):
|
||||
if source in self.unfinished_bodies:
|
||||
yield from self._send_body(source_conn, source)
|
||||
elif isinstance(h2_event, h2events.TrailersReceived):
|
||||
raise NotImplementedError('TrailersReceived not implemented')
|
||||
|
||||
elif isinstance(event, events.ConnectionClosed):
|
||||
yield commands.Log("Connection closed abnormally")
|
||||
if event.connection == self.context.server:
|
||||
yield commands.CloseConnection(self.context.client)
|
||||
self._handle_event = self.done
|
||||
|
||||
|
||||
@expect(events.DataReceived, events.ConnectionClosed)
|
||||
def done(self, _):
|
||||
yield from ()
|
||||
|
||||
def _send_body(self, send_to_endpoint, endpoint):
|
||||
stream_id, content, end_stream, kill_id = self.unfinished_bodies[endpoint]
|
||||
|
||||
max_outbound_frame_size = endpoint.max_outbound_frame_size
|
||||
position = 0
|
||||
while position < len(content):
|
||||
frame_chunk = content[position:position + max_outbound_frame_size]
|
||||
if endpoint.local_flow_control_window(stream_id) < len(frame_chunk):
|
||||
self.unfinished_bodies[endpoint] = (
|
||||
stream_id, content[position:], end_stream, kill_id)
|
||||
return
|
||||
endpoint.send_data(stream_id, frame_chunk)
|
||||
yield commands.SendData(send_to_endpoint, endpoint.data_to_send())
|
||||
position += max_outbound_frame_size
|
||||
|
||||
del self.unfinished_bodies[endpoint]
|
||||
|
||||
if end_stream:
|
||||
endpoint.end_stream(stream_id)
|
||||
yield commands.SendData(send_to_endpoint, endpoint.data_to_send())
|
||||
if kill_id:
|
||||
self.streams[kill_id].kill()
|
||||
"""
|
||||
|
||||
# def _handle_connection_terminated(self, event, is_server):
|
||||
# self.log("HTTP/2 connection terminated by {}: error code: {}, last stream id: {}, additional data: {}".format(
|
||||
# "server" if is_server else "client",
|
||||
# event.error_code,
|
||||
# event.last_stream_id,
|
||||
# event.additional_data), "info")
|
||||
#
|
||||
# if event.error_code != h2.errors.ErrorCodes.NO_ERROR:
|
||||
# # Something terrible has happened - kill everything!
|
||||
# self.connections[self.client_conn].close_connection(
|
||||
# error_code=event.error_code,
|
||||
# last_stream_id=event.last_stream_id,
|
||||
# additional_data=event.additional_data
|
||||
# )
|
||||
# self.client_conn.send(self.connections[self.client_conn].data_to_send())
|
||||
# self._kill_all_streams()
|
||||
# else:
|
||||
# """
|
||||
# Do not immediately terminate the other connection.
|
||||
# Some streams might be still sending data to the client.
|
||||
# """
|
||||
# return False
|
||||
#
|
||||
|
||||
"""
|
||||
class Http2Stream:
|
||||
|
||||
def __init__(self, h2_event, client_conn, server_conn) -> None:
|
||||
if isinstance(h2_event, h2.events.RequestReceived):
|
||||
self.stream_id = h2_event.stream_id
|
||||
else:
|
||||
self.stream_id = h2_event.pushed_stream_id
|
||||
|
||||
self.server_stream_id: int = None
|
||||
self.pushed = False
|
||||
|
||||
if isinstance(h2_event,
|
||||
h2.events.RequestReceived) and h2_event.priority_updated is not None:
|
||||
self.priority_exclusive = h2_event.priority_updated.exclusive
|
||||
self.priority_depends_on = h2_event.priority_updated.depends_on
|
||||
self.priority_weight = h2_event.priority_updated.weight
|
||||
self.handled_priority_event = h2_event.priority_updated
|
||||
else:
|
||||
self.priority_exclusive: bool = None
|
||||
self.priority_depends_on: int = None
|
||||
self.priority_weight: int = None
|
||||
self.handled_priority_event: Any = None
|
||||
|
||||
self.timestamp_start: float = None
|
||||
self.timestamp_end: float = None
|
||||
self.death_time: float = None
|
||||
|
||||
self.request_arrived = threading.Event()
|
||||
self.request_data_queue: queue.Queue[bytes] = queue.Queue()
|
||||
self.request_queued_data_length = 0
|
||||
self.request_data_finished = threading.Event()
|
||||
|
||||
self.response_arrived = threading.Event()
|
||||
self.response_data_queue: queue.Queue[bytes] = queue.Queue()
|
||||
self.response_queued_data_length = 0
|
||||
self.response_data_finished = threading.Event()
|
||||
|
||||
self.flow = http.HTTPFlow(
|
||||
client_conn,
|
||||
server_conn,
|
||||
live=self,
|
||||
mode='regular',
|
||||
)
|
||||
|
||||
headers = mitmproxy.net.http.Headers([[k, v] for k, v in h2_event.headers])
|
||||
first_line_format, method, scheme, host, port, path = http2.parse_headers(headers)
|
||||
self.flow.request = http.HTTPRequest(
|
||||
first_line_format,
|
||||
method,
|
||||
scheme,
|
||||
host,
|
||||
port,
|
||||
path,
|
||||
b"HTTP/2.0",
|
||||
headers,
|
||||
None,
|
||||
timestamp_start=self.timestamp_start,
|
||||
timestamp_end=self.timestamp_end,
|
||||
)
|
||||
|
||||
def kill(self):
|
||||
self.death_time = time.time()
|
||||
|
||||
@property
|
||||
def data_queue(self):
|
||||
if self.response_arrived.is_set():
|
||||
return self.response_data_queue
|
||||
else:
|
||||
return self.request_data_queue
|
||||
|
||||
@property
|
||||
def queued_data_length(self):
|
||||
if self.response_arrived.is_set():
|
||||
return self.response_queued_data_length
|
||||
else:
|
||||
return self.request_queued_data_length
|
||||
|
||||
@queued_data_length.setter
|
||||
def queued_data_length(self, v):
|
||||
self.request_queued_data_length = v
|
||||
|
||||
@property
|
||||
def data_finished(self):
|
||||
if self.response_arrived.is_set():
|
||||
return self.response_data_finished
|
||||
else:
|
||||
return self.request_data_finished
|
||||
"""
|
@ -1,37 +0,0 @@
|
||||
from mitmproxy import http
|
||||
|
||||
from mitmproxy.proxy2 import commands
|
||||
|
||||
|
||||
class HttpCommand(commands.Command):
|
||||
flow: http.HTTPFlow
|
||||
|
||||
def __init__(self, flow: http.HTTPFlow):
|
||||
self.flow = flow
|
||||
|
||||
class SendRequestHeaders(HttpCommand):
|
||||
pass
|
||||
|
||||
|
||||
class SendRequestData(HttpCommand):
|
||||
pass
|
||||
|
||||
|
||||
class SendRequestComplete(HttpCommand):
|
||||
pass
|
||||
|
||||
|
||||
class SendResponseHeaders(HttpCommand):
|
||||
pass
|
||||
|
||||
|
||||
class SendResponseData(HttpCommand):
|
||||
data: bytes
|
||||
|
||||
def __init__(self, flow, data):
|
||||
super().__init__(flow)
|
||||
self.data = data
|
||||
|
||||
|
||||
class SendResponseComplete(HttpCommand):
|
||||
pass
|
@ -1,38 +0,0 @@
|
||||
from mitmproxy import http
|
||||
|
||||
from mitmproxy.proxy2 import events
|
||||
|
||||
|
||||
class HttpEvent(events.Event):
|
||||
flow: http.HTTPFlow
|
||||
|
||||
def __init__(self, flow: http.HTTPFlow):
|
||||
self.flow = flow
|
||||
|
||||
|
||||
class RequestHeaders(HttpEvent):
|
||||
pass
|
||||
|
||||
|
||||
class RequestData(HttpEvent):
|
||||
pass
|
||||
|
||||
|
||||
class RequestComplete(HttpEvent):
|
||||
pass
|
||||
|
||||
|
||||
class ResponseHeaders(HttpEvent):
|
||||
pass
|
||||
|
||||
|
||||
class ResponseData(HttpEvent):
|
||||
data: bytes
|
||||
|
||||
def __init__(self, flow, data):
|
||||
super().__init__(flow)
|
||||
self.data = data
|
||||
|
||||
|
||||
class ResponseComplete(HttpEvent):
|
||||
pass
|
@ -1,270 +0,0 @@
|
||||
import enum
|
||||
import typing
|
||||
from warnings import warn
|
||||
|
||||
import h11
|
||||
from mitmproxy import http
|
||||
from mitmproxy.net import http as net_http
|
||||
from mitmproxy.net import websockets
|
||||
from mitmproxy.net.http import url
|
||||
from mitmproxy.net.http.http1.read import _parse_authority_form
|
||||
from mitmproxy.proxy.protocol.http import HTTPMode
|
||||
from mitmproxy.proxy2 import events, commands, context
|
||||
from mitmproxy.proxy2.context import Context
|
||||
from mitmproxy.proxy2.layer import Layer, NextLayer
|
||||
from mitmproxy.proxy2.layers.old import websocket
|
||||
from mitmproxy.proxy2.utils import expect
|
||||
|
||||
|
||||
class FirstLineFormat(enum.Enum):
|
||||
authority = "authority"
|
||||
relative = "relative"
|
||||
absolute = "absolute"
|
||||
|
||||
|
||||
MODE_REQUEST_FORMS = {
|
||||
HTTPMode.regular: (FirstLineFormat.authority, FirstLineFormat.absolute),
|
||||
HTTPMode.transparent: (FirstLineFormat.relative,),
|
||||
HTTPMode.upstream: (FirstLineFormat.authority, FirstLineFormat.absolute),
|
||||
}
|
||||
|
||||
|
||||
def _make_request_from_event(event: h11.Request) -> http.HTTPRequest:
|
||||
if event.target == b"*" or event.target.startswith(b"/"):
|
||||
form = "relative"
|
||||
path = event.target
|
||||
scheme, host, port = None, None, None
|
||||
elif event.method == b"CONNECT":
|
||||
form = "authority"
|
||||
host, port = _parse_authority_form(event.target)
|
||||
scheme, path = None, None
|
||||
else:
|
||||
form = "absolute"
|
||||
scheme, host, port, path = url.parse(event.target)
|
||||
|
||||
return http.HTTPRequest(
|
||||
form,
|
||||
event.method,
|
||||
scheme,
|
||||
host,
|
||||
port,
|
||||
path,
|
||||
b"HTTP/" + event.http_version,
|
||||
event.headers,
|
||||
None,
|
||||
-1 # FIXME: first_byte_timestamp
|
||||
)
|
||||
|
||||
def _make_event_from_request(request: http.HTTPRequest) -> h11.Request:
|
||||
if request.first_line_format == FirstLineFormat.relative.value:
|
||||
target = request.path
|
||||
else:
|
||||
target = request.url
|
||||
return h11.Request(
|
||||
method=request.method,
|
||||
headers=request.headers.fields,
|
||||
http_version=request.http_version.replace("HTTP/", ""),
|
||||
target=target
|
||||
)
|
||||
|
||||
|
||||
def validate_request_form(
|
||||
mode: HTTPMode,
|
||||
first_line_format: FirstLineFormat,
|
||||
scheme: str
|
||||
) -> None:
|
||||
if first_line_format == FirstLineFormat.absolute and scheme != "http":
|
||||
raise ValueError(f"Invalid request scheme: {scheme}")
|
||||
|
||||
allowed_request_forms = MODE_REQUEST_FORMS[mode]
|
||||
if first_line_format not in allowed_request_forms:
|
||||
if mode == HTTPMode.transparent:
|
||||
desc = "HTTP CONNECT" if first_line_format == "authority" else "absolute-form"
|
||||
raise ValueError(
|
||||
f"""
|
||||
Mitmproxy received an {desc} request even though it is not running
|
||||
in regular mode. This usually indicates a misconfiguration,
|
||||
please see the mitmproxy mode documentation for details.
|
||||
"""
|
||||
)
|
||||
else:
|
||||
expected = ' or '.join(x.value for x in allowed_request_forms)
|
||||
raise ValueError(
|
||||
f"Invalid HTTP request form (expected: {expected}, got: {first_line_format})")
|
||||
|
||||
|
||||
class OldHTTPLayer(Layer):
|
||||
"""
|
||||
Simple TCP layer that just relays messages right now.
|
||||
"""
|
||||
context: Context = None
|
||||
mode: HTTPMode
|
||||
|
||||
# this is like a mini state machine.
|
||||
state: typing.Callable[[events.Event], commands.TCommandGenerator]
|
||||
|
||||
def __init__(self, context: Context, mode: HTTPMode):
|
||||
super().__init__(context)
|
||||
self.mode = mode
|
||||
|
||||
self.state = self.read_request_headers
|
||||
self.flow = http.HTTPFlow(self.context.client, self.context.server)
|
||||
self.client_conn = h11.Connection(h11.SERVER)
|
||||
self.server_conn = h11.Connection(h11.CLIENT)
|
||||
|
||||
# debug
|
||||
# \/ \/ \/ \/ \/ \/ \/ \/ \/ \/ \/ \/
|
||||
def log_event(orig):
|
||||
def next_event():
|
||||
e = orig()
|
||||
if False:
|
||||
yield commands.Log(f"[h11] {e}")
|
||||
return e
|
||||
|
||||
return next_event
|
||||
|
||||
self.client_conn.next_event = log_event(self.client_conn.next_event)
|
||||
self.server_conn.next_event = log_event(self.server_conn.next_event)
|
||||
# /\ /\ /\ /\ /\ /\ /\ /\ /\ /\ /\ /\
|
||||
# this is very preliminary: [request_events, response_events]
|
||||
self.flow_events = [[], []]
|
||||
|
||||
@expect(events.Start, events.DataReceived, events.ConnectionClosed)
|
||||
def _handle_event(self, event: events.Event) -> commands.TCommandGenerator:
|
||||
if isinstance(event, events.Start):
|
||||
return
|
||||
if isinstance(event, events.DataReceived):
|
||||
if event.connection == self.context.client:
|
||||
self.client_conn.receive_data(event.data)
|
||||
else:
|
||||
self.server_conn.receive_data(event.data)
|
||||
elif isinstance(event, events.ConnectionClosed):
|
||||
return warn("unimplemented: http.handle:close")
|
||||
|
||||
yield from self.state()
|
||||
|
||||
def read_request_headers(self):
|
||||
event = yield from self.client_conn.next_event()
|
||||
if event is h11.NEED_DATA:
|
||||
return
|
||||
elif isinstance(event, h11.Request):
|
||||
yield commands.Log(f"requestheaders: {event}")
|
||||
|
||||
if self.client_conn.client_is_waiting_for_100_continue:
|
||||
raise NotImplementedError()
|
||||
|
||||
self.flow.request = _make_request_from_event(event)
|
||||
validate_request_form(self.mode, FirstLineFormat(self.flow.request.first_line_format), self.flow.request.scheme)
|
||||
|
||||
yield commands.Hook("requestheaders", self.flow)
|
||||
|
||||
self.state = self.read_request_body
|
||||
yield from self.read_request_body() # there may already be further events.
|
||||
else:
|
||||
raise TypeError(f"Unexpected event: {event}")
|
||||
|
||||
def read_request_body(self):
|
||||
while True:
|
||||
event = yield from self.client_conn.next_event()
|
||||
if event is h11.NEED_DATA:
|
||||
return
|
||||
elif isinstance(event, h11.Data):
|
||||
self.flow_events[0].append(event)
|
||||
elif isinstance(event, h11.EndOfMessage):
|
||||
self.flow_events[0].append(event)
|
||||
yield commands.Log(f"request {self.flow_events}")
|
||||
|
||||
if self.flow.request.first_line_format == FirstLineFormat.authority.value:
|
||||
if self.mode == HTTPMode.regular:
|
||||
yield commands.Hook("http_connect", self.flow)
|
||||
self.context.server = context.Server(
|
||||
(self.flow.request.host, self.flow.request.port)
|
||||
)
|
||||
yield commands.SendData(
|
||||
self.context.client,
|
||||
b'%s 200 Connection established\r\n\r\n' % self.flow.request.data.http_version
|
||||
)
|
||||
child_layer = NextLayer(self.context)
|
||||
self._handle_event = child_layer.handle_event
|
||||
yield from child_layer.handle_event(events.Start())
|
||||
return
|
||||
|
||||
if self.mode == HTTPMode.upstream:
|
||||
raise NotImplementedError()
|
||||
elif self.flow.request.first_line_format == FirstLineFormat.absolute.value:
|
||||
if self.mode == HTTPMode.regular:
|
||||
self.context.server.address = (self.flow.request.host, self.flow.request.port)
|
||||
else:
|
||||
raise NotImplementedError()
|
||||
|
||||
yield from self._send_request()
|
||||
return
|
||||
else:
|
||||
raise TypeError(f"Unexpected event: {event}")
|
||||
|
||||
def _send_request(self):
|
||||
if not self.context.server.connected:
|
||||
err = yield commands.OpenConnection(self.context.server)
|
||||
if err:
|
||||
yield commands.Log(f"error {err}")
|
||||
yield commands.CloseConnection(self.context.client)
|
||||
self._handle_event = self.done
|
||||
return
|
||||
|
||||
self.flow_events[0].insert(0, _make_event_from_request(self.flow.request))
|
||||
for e in self.flow_events[0]:
|
||||
bytes_to_send = self.server_conn.send(e)
|
||||
if bytes_to_send:
|
||||
yield commands.SendData(self.context.server, bytes_to_send)
|
||||
self.state = self.read_response_headers
|
||||
|
||||
def read_response_headers(self):
|
||||
event = yield from self.server_conn.next_event()
|
||||
if event is h11.NEED_DATA:
|
||||
return
|
||||
elif isinstance(event, h11.Response):
|
||||
yield commands.Log(f"responseheaders {event}")
|
||||
|
||||
self.flow_events[1].append(event)
|
||||
self.state = self.read_response_body
|
||||
yield from self.read_response_body() # there may already be further events.
|
||||
elif isinstance(event, h11.InformationalResponse):
|
||||
self.flow.response.headers = net_http.Headers(event.headers)
|
||||
if event.status_code == 101 and websockets.check_handshake(self.flow.response.headers):
|
||||
child_layer = websocket.WebsocketLayer(self.context, self.flow)
|
||||
yield from child_layer.handle_event(events.Start())
|
||||
self._handle_event = child_layer.handle_event
|
||||
return
|
||||
else:
|
||||
raise TypeError(f"Unexpected event: {event}")
|
||||
|
||||
def read_response_body(self):
|
||||
while True:
|
||||
event = yield from self.server_conn.next_event()
|
||||
if event is h11.NEED_DATA:
|
||||
return
|
||||
elif isinstance(event, h11.Data):
|
||||
self.flow_events[1].append(event)
|
||||
elif isinstance(event, h11.EndOfMessage):
|
||||
self.flow_events[1].append(event)
|
||||
yield commands.Log(f"response {self.flow_events}")
|
||||
yield from self._send_response()
|
||||
return
|
||||
else:
|
||||
raise TypeError(f"Unexpected event: {event}")
|
||||
|
||||
def _send_response(self):
|
||||
for e in self.flow_events[1]:
|
||||
bytes_to_send = self.client_conn.send(e)
|
||||
if bytes_to_send:
|
||||
yield commands.SendData(self.context.client, bytes_to_send)
|
||||
|
||||
# reset for next request.
|
||||
self.state = self.read_request_headers
|
||||
self.flow_events = [[], []]
|
||||
self.client_conn.start_next_cycle()
|
||||
self.server_conn.start_next_cycle()
|
||||
|
||||
@expect(events.DataReceived, events.ConnectionClosed)
|
||||
def done(self, _):
|
||||
yield from ()
|
@ -1,301 +0,0 @@
|
||||
|
||||
class _TLSLayer(layer.Layer):
|
||||
send_buffer: MutableMapping[SSL.Connection, bytearray]
|
||||
tls: MutableMapping[context.Connection, SSL.Connection]
|
||||
child_layer: Optional[layer.Layer] = None
|
||||
|
||||
def __init__(self, context):
|
||||
super().__init__(context)
|
||||
self.send_buffer = {}
|
||||
self.tls = {}
|
||||
|
||||
def tls_interact(self, conn: context.Connection):
|
||||
while True:
|
||||
try:
|
||||
data = self.tls[conn].bio_read(65535)
|
||||
except SSL.WantReadError:
|
||||
# Okay, nothing more waiting to be sent.
|
||||
return
|
||||
else:
|
||||
yield commands.SendData(conn, data)
|
||||
|
||||
def send(
|
||||
self,
|
||||
send_command: commands.SendData,
|
||||
) -> commands.TCommandGenerator:
|
||||
tls_conn = self.tls[send_command.connection]
|
||||
if send_command.connection.tls_established:
|
||||
tls_conn.sendall(send_command.data)
|
||||
yield from self.tls_interact(send_command.connection)
|
||||
else:
|
||||
buf = self.send_buffer.setdefault(tls_conn, bytearray())
|
||||
buf.extend(send_command.data)
|
||||
|
||||
def negotiate(self, event: events.DataReceived) -> Generator[commands.Command, Any, bool]:
|
||||
"""
|
||||
Make sure to trigger processing if done!
|
||||
"""
|
||||
# bio_write errors for b"", so we need to check first if we actually received something.
|
||||
tls_conn = self.tls[event.connection]
|
||||
if event.data:
|
||||
tls_conn.bio_write(event.data)
|
||||
try:
|
||||
tls_conn.do_handshake()
|
||||
except SSL.WantReadError:
|
||||
yield from self.tls_interact(event.connection)
|
||||
return False
|
||||
else:
|
||||
event.connection.tls_established = True
|
||||
event.connection.alpn = tls_conn.get_alpn_proto_negotiated()
|
||||
print(f"TLS established: {event.connection}")
|
||||
# TODO: Set all other connection attributes here
|
||||
# there might already be data in the OpenSSL BIO, so we need to trigger its processing.
|
||||
yield from self.relay(events.DataReceived(event.connection, b""))
|
||||
if tls_conn in self.send_buffer:
|
||||
data_to_send = bytes(self.send_buffer.pop(tls_conn))
|
||||
yield from self.send(commands.SendData(event.connection, data_to_send))
|
||||
return True
|
||||
|
||||
def relay(self, event: events.DataReceived):
|
||||
tls_conn = self.tls[event.connection]
|
||||
if event.data:
|
||||
tls_conn.bio_write(event.data)
|
||||
yield from self.tls_interact(event.connection)
|
||||
|
||||
plaintext = bytearray()
|
||||
while True:
|
||||
try:
|
||||
plaintext.extend(tls_conn.recv(65535))
|
||||
except (SSL.WantReadError, SSL.ZeroReturnError):
|
||||
break
|
||||
|
||||
if plaintext:
|
||||
evt = events.DataReceived(event.connection, bytes(plaintext))
|
||||
# yield commands.Log(f"Plain{evt}")
|
||||
yield from self.event_to_child(evt)
|
||||
|
||||
def event_to_child(self, event: events.Event) -> commands.TCommandGenerator:
|
||||
for command in self.child_layer.handle_event(event):
|
||||
if isinstance(command, commands.SendData) and command.connection in self.tls:
|
||||
yield from self.send(command)
|
||||
else:
|
||||
yield command
|
||||
|
||||
|
||||
class ServerTLSLayer(_TLSLayer):
|
||||
"""
|
||||
This layer manages TLS for a single server connection.
|
||||
"""
|
||||
lazy_init: bool = False
|
||||
|
||||
def __init__(self, context: context.Context):
|
||||
super().__init__(context)
|
||||
self.child_layer = layer.NextLayer(context)
|
||||
|
||||
@expect(events.Start)
|
||||
def start(self, event: events.Start) -> commands.TCommandGenerator:
|
||||
yield from self.child_layer.handle_event(event)
|
||||
|
||||
server = self.context.server
|
||||
if server.tls:
|
||||
if server.connected:
|
||||
yield from self._start_tls(server)
|
||||
else:
|
||||
self.lazy_init = True
|
||||
self._handle_event = self.process
|
||||
|
||||
_handle_event = start
|
||||
|
||||
def process(self, event: events.Event) -> None:
|
||||
if isinstance(event, events.DataReceived) and event.connection in self.tls:
|
||||
if not event.connection.tls_established:
|
||||
yield from self.negotiate(event)
|
||||
else:
|
||||
yield from self.relay(event)
|
||||
elif isinstance(event, events.OpenConnectionReply):
|
||||
err = event.reply
|
||||
conn = event.command.connection
|
||||
if self.lazy_init and not err and conn == self.context.server:
|
||||
yield from self._start_tls(conn)
|
||||
yield from self.event_to_child(event)
|
||||
elif isinstance(event, events.ConnectionClosed):
|
||||
yield from self.event_to_child(event)
|
||||
self.send_buffer.pop(
|
||||
self.tls.pop(event.connection, None),
|
||||
None
|
||||
)
|
||||
else:
|
||||
yield from self.event_to_child(event)
|
||||
|
||||
def _start_tls(self, server: context.Server):
|
||||
ssl_context = SSL.Context(SSL.SSLv23_METHOD)
|
||||
|
||||
if server.alpn_offers:
|
||||
ssl_context.set_alpn_protos(server.alpn_offers)
|
||||
|
||||
self.tls[server] = SSL.Connection(ssl_context)
|
||||
|
||||
if server.sni:
|
||||
if server.sni is True:
|
||||
if self.context.client.sni:
|
||||
server.sni = self.context.client.sni
|
||||
else:
|
||||
server.sni = server.address[0]
|
||||
self.tls[server].set_tlsext_host_name(server.sni)
|
||||
self.tls[server].set_connect_state()
|
||||
|
||||
yield from self.process(events.DataReceived(server, b""))
|
||||
|
||||
|
||||
class ClientTLSLayer(_TLSLayer):
|
||||
"""
|
||||
This layer establishes TLS on a single client connection.
|
||||
|
||||
┌─────┐
|
||||
│Start│
|
||||
└┬────┘
|
||||
↓
|
||||
┌────────────────────┐
|
||||
│Wait for ClientHello│
|
||||
└┬───────────────────┘
|
||||
│ Do we need server TLS info
|
||||
│ to establish TLS with client?
|
||||
│ ┌───────────────────┐
|
||||
├─────→│Wait for Server TLS│
|
||||
│ yes └┬──────────────────┘
|
||||
│no │
|
||||
↓ ↓
|
||||
┌────────────────┐
|
||||
│Process messages│
|
||||
└────────────────┘
|
||||
|
||||
"""
|
||||
recv_buffer: bytearray
|
||||
|
||||
def __init__(self, context: context.Context):
|
||||
super().__init__(context)
|
||||
self.recv_buffer = bytearray()
|
||||
self.child_layer = ServerTLSLayer(self.context)
|
||||
|
||||
@expect(events.Start)
|
||||
def state_start(self, _) -> commands.TCommandGenerator:
|
||||
self.context.client.tls = True
|
||||
self._handle_event = self.state_wait_for_clienthello
|
||||
yield from ()
|
||||
|
||||
_handle_event = state_start
|
||||
|
||||
@expect(events.DataReceived, events.ConnectionClosed)
|
||||
def state_wait_for_clienthello(self, event: events.Event):
|
||||
client = self.context.client
|
||||
server = self.context.server
|
||||
if isinstance(event, events.DataReceived) and event.connection == client:
|
||||
self.recv_buffer.extend(event.data)
|
||||
try:
|
||||
client_hello = parse_client_hello(self.recv_buffer)
|
||||
except ValueError as e:
|
||||
raise NotImplementedError() from e # TODO
|
||||
|
||||
if client_hello:
|
||||
yield commands.Log(f"Client Hello: {client_hello}")
|
||||
|
||||
# TODO: Don't do double conversion
|
||||
client.sni = client_hello.sni.encode("idna")
|
||||
client.alpn_offers = client_hello.alpn_protocols
|
||||
|
||||
client_tls_requires_server_connection = (
|
||||
self.context.server.tls and
|
||||
self.context.options.upstream_cert and
|
||||
(
|
||||
self.context.options.add_upstream_certs_to_client_chain or
|
||||
# client.alpn_offers or
|
||||
not client.sni
|
||||
)
|
||||
)
|
||||
|
||||
# What do we do with the client connection now?
|
||||
if client_tls_requires_server_connection and not server.tls_established:
|
||||
yield from self.start_server_tls()
|
||||
self._handle_event = self.state_wait_for_server_tls
|
||||
else:
|
||||
yield from self.start_negotiate()
|
||||
self._handle_event = self.state_process
|
||||
|
||||
# In any case, we now have enough information to start server TLS if needed.
|
||||
yield from self.child_layer.handle_event(events.Start())
|
||||
else:
|
||||
raise NotImplementedError(event) # TODO
|
||||
|
||||
def state_wait_for_server_tls(self, event: events.Event):
|
||||
yield from self.event_to_child(event)
|
||||
# TODO: Handle case where TLS establishment fails.
|
||||
# We still need a good way to signal this - one possibility would be by closing
|
||||
# the connection?
|
||||
if self.context.server.tls_established:
|
||||
yield from self.start_negotiate()
|
||||
self._handle_event = self.state_process
|
||||
|
||||
def state_process(self, event: events.Event):
|
||||
if isinstance(event, events.DataReceived) and event.connection == self.context.client:
|
||||
if not self.context.client.tls_established:
|
||||
yield from self.negotiate(event)
|
||||
else:
|
||||
yield from self.relay(event)
|
||||
else:
|
||||
yield from self.event_to_child(event)
|
||||
|
||||
def start_server_tls(self):
|
||||
"""
|
||||
We often need information from the upstream connection to establish TLS with the client.
|
||||
For example, we need to check if the client does ALPN or not.
|
||||
"""
|
||||
if not self.context.server.connected:
|
||||
self.context.server.alpn_offers = [
|
||||
x for x in self.context.client.alpn_offers
|
||||
if not (x.startswith(b"h2-") or x.startswith(b"spdy"))
|
||||
]
|
||||
|
||||
err = yield commands.OpenConnection(self.context.server)
|
||||
if err:
|
||||
yield commands.Log(
|
||||
"Cannot establish server connection, which is required to establish TLS with the client."
|
||||
)
|
||||
|
||||
def start_negotiate(self):
|
||||
# FIXME: Do this properly
|
||||
client = self.context.client
|
||||
server = self.context.server
|
||||
context = SSL.Context(SSL.SSLv23_METHOD)
|
||||
cert, privkey, cert_chain = CertStore.from_store(
|
||||
os.path.expanduser("~/.mitmproxy"), "mitmproxy",
|
||||
self.context.options.key_size
|
||||
).get_cert(client.sni.encode(), (client.sni.encode(),))
|
||||
context.use_privatekey(privkey)
|
||||
context.use_certificate(cert.x509)
|
||||
context.set_cipher_list(tls.DEFAULT_CLIENT_CIPHERS)
|
||||
|
||||
def alpn_select_callback(conn_, options):
|
||||
if server.alpn in options:
|
||||
return server.alpn
|
||||
elif b"h2" in options:
|
||||
return b"h2"
|
||||
elif b"http/1.1" in options:
|
||||
return b"http/1.1"
|
||||
elif b"http/1.0" in options:
|
||||
return b"http/1.0"
|
||||
elif b"http/0.9" in options:
|
||||
return b"http/0.9"
|
||||
else:
|
||||
# FIXME: We MUST return something here. At this point we are at loss.
|
||||
# We probably need better checks when negotiating with the client.
|
||||
return options[0]
|
||||
|
||||
context.set_alpn_select_callback(alpn_select_callback)
|
||||
|
||||
self.tls[self.context.client] = SSL.Connection(context)
|
||||
self.tls[self.context.client].set_accept_state()
|
||||
|
||||
yield from self.state_process(events.DataReceived(
|
||||
client, bytes(self.recv_buffer)
|
||||
))
|
||||
self.recv_buffer = bytearray()
|
@ -1,50 +0,0 @@
|
||||
from mitmproxy.proxy2.layer import Layer
|
||||
from mitmproxy.proxy2.layers.old.http_commands import *
|
||||
from mitmproxy.proxy2.layers.old.http_events import *
|
||||
|
||||
class HTTPLayer(Layer):
|
||||
"""
|
||||
HTTP Semantics layer used by the on-the-wire layers for HTTP/1 and HTTP/2.
|
||||
"""
|
||||
|
||||
def _handle_event(self, event: HttpEvent):
|
||||
if isinstance(event, RequestHeaders):
|
||||
yield commands.Log(f"RequestHeadersReceived: {event}")
|
||||
|
||||
# This is blocking only this layer, none of the parent layers.
|
||||
yield commands.Hook("requestheaders", event.flow)
|
||||
yield commands.Log(f"Hook processed: {event}")
|
||||
|
||||
elif isinstance(event, RequestData):
|
||||
raise NotImplementedError
|
||||
elif isinstance(event, RequestComplete):
|
||||
yield commands.Log(f"RequestComplete: {event}")
|
||||
yield commands.Hook("request", event.flow)
|
||||
yield commands.Log(f"Hook processed: {event}")
|
||||
yield SendRequestHeaders(event.flow)
|
||||
# TODO yield SendRequestData()
|
||||
yield SendRequestComplete(event.flow)
|
||||
elif isinstance(event, ResponseHeaders):
|
||||
yield commands.Log(f"ResponseHeadersReceived: {event}")
|
||||
|
||||
# This is blocking only this layer, none of the parent layers.
|
||||
yield commands.Hook("responseheaders", event.flow)
|
||||
yield commands.Log(f"Hook processed: {event}")
|
||||
|
||||
elif isinstance(event, ResponseData):
|
||||
event.flow.response.raw_content = (
|
||||
(event.flow.response.raw_content or b"")
|
||||
+ event.data
|
||||
)
|
||||
elif isinstance(event, ResponseComplete):
|
||||
yield commands.Log(f"ResponseComplete: {event}")
|
||||
yield commands.Hook("response", event.flow)
|
||||
yield commands.Log(f"Hook processed: {event}")
|
||||
yield SendResponseHeaders(event.flow)
|
||||
yield SendResponseData(event.flow, event.flow.response.raw_content)
|
||||
yield SendResponseComplete(event.flow)
|
||||
|
||||
elif isinstance(event, events.ConnectionClosed):
|
||||
yield commands.Log(f"HTTPLayer unimplemented event: {event}", level="error")
|
||||
else:
|
||||
raise NotImplementedError(event)
|
Loading…
Reference in New Issue
Block a user