mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-29 19:08:44 +00:00
97 lines
2.1 KiB
Python
97 lines
2.1 KiB
Python
import typing
|
|
|
|
from mitmproxy import controller
|
|
from mitmproxy import flow
|
|
from mitmproxy import http
|
|
from mitmproxy import tcp
|
|
from mitmproxy import websocket
|
|
|
|
Events = frozenset([
|
|
"clientconnect",
|
|
"clientdisconnect",
|
|
"serverconnect",
|
|
"serverdisconnect",
|
|
# TCP
|
|
"tcp_start",
|
|
"tcp_message",
|
|
"tcp_error",
|
|
"tcp_end",
|
|
# HTTP
|
|
"http_connect",
|
|
"request",
|
|
"requestheaders",
|
|
"response",
|
|
"responseheaders",
|
|
"error",
|
|
# WebSocket
|
|
"websocket_handshake",
|
|
"websocket_start",
|
|
"websocket_message",
|
|
"websocket_error",
|
|
"websocket_end",
|
|
# misc
|
|
"next_layer",
|
|
"configure",
|
|
"done",
|
|
"log",
|
|
"load",
|
|
"running",
|
|
"tick",
|
|
"update",
|
|
])
|
|
|
|
TEventGenerator = typing.Iterator[typing.Tuple[str, typing.Any]]
|
|
|
|
|
|
def _iterate_http(f: http.HTTPFlow) -> TEventGenerator:
|
|
if f.request:
|
|
yield "requestheaders", f
|
|
yield "request", f
|
|
if f.response:
|
|
yield "responseheaders", f
|
|
yield "response", f
|
|
if f.error:
|
|
yield "error", f
|
|
|
|
|
|
def _iterate_websocket(f: websocket.WebSocketFlow) -> TEventGenerator:
|
|
messages = f.messages
|
|
f.messages = []
|
|
f.reply = controller.DummyReply()
|
|
yield "websocket_start", f
|
|
while messages:
|
|
f.messages.append(messages.pop(0))
|
|
yield "websocket_message", f
|
|
if f.error:
|
|
yield "websocket_error", f
|
|
yield "websocket_end", f
|
|
|
|
|
|
def _iterate_tcp(f: tcp.TCPFlow) -> TEventGenerator:
|
|
messages = f.messages
|
|
f.messages = []
|
|
f.reply = controller.DummyReply()
|
|
yield "tcp_start", f
|
|
while messages:
|
|
f.messages.append(messages.pop(0))
|
|
yield "tcp_message", f
|
|
if f.error:
|
|
yield "tcp_error", f
|
|
yield "tcp_end", f
|
|
|
|
|
|
_iterate_map = {
|
|
http.HTTPFlow: _iterate_http,
|
|
websocket.WebSocketFlow: _iterate_websocket,
|
|
tcp.TCPFlow: _iterate_tcp,
|
|
} # type: typing.Dict[typing.Type[flow.Flow], typing.Callable[[typing.Any], TEventGenerator]]
|
|
|
|
|
|
def iterate(f: flow.Flow) -> TEventGenerator:
|
|
try:
|
|
e = _iterate_map[type(f)]
|
|
except KeyError as err:
|
|
raise TypeError("Unknown flow type: {}".format(f)) from err
|
|
else:
|
|
yield from e(f)
|