mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-26 18:18:25 +00:00
Merge pull request #3059 from obscure76/enh/issue-3053
fix Python 3.6 variable type annotations #3053
This commit is contained in:
commit
5f74adc2df
@ -20,11 +20,11 @@ from mitmproxy import ctx
|
||||
from mitmproxy.utils import strutils
|
||||
from mitmproxy.net.http import cookies
|
||||
|
||||
HAR = {} # type: typing.Dict
|
||||
HAR: typing.Dict = {}
|
||||
|
||||
# A list of server seen till now is maintained so we can avoid
|
||||
# using 'connect' time for entries that use an existing connection.
|
||||
SERVERS_SEEN = set() # type: typing.Set[connections.ServerConnection]
|
||||
SERVERS_SEEN: typing.Set[connections.ServerConnection] = set()
|
||||
|
||||
|
||||
def load(l):
|
||||
@ -155,12 +155,12 @@ def done():
|
||||
Called once on script shutdown, after any other events.
|
||||
"""
|
||||
if ctx.options.hardump:
|
||||
json_dump = json.dumps(HAR, indent=2) # type: str
|
||||
json_dump: str = json.dumps(HAR, indent=2)
|
||||
|
||||
if ctx.options.hardump == '-':
|
||||
mitmproxy.ctx.log(json_dump)
|
||||
else:
|
||||
raw = json_dump.encode() # type: bytes
|
||||
raw: bytes = json_dump.encode()
|
||||
if ctx.options.hardump.endswith('.zhar'):
|
||||
raw = zlib.compress(raw, 9)
|
||||
|
||||
|
@ -9,7 +9,7 @@ import typing # noqa
|
||||
from mitmproxy import http
|
||||
|
||||
# set of SSL/TLS capable hosts
|
||||
secure_hosts = set() # type: typing.Set[str]
|
||||
secure_hosts: typing.Set[str] = set()
|
||||
|
||||
|
||||
def request(flow: http.HTTPFlow) -> None:
|
||||
|
@ -95,7 +95,7 @@ def find_unclaimed_URLs(body: str, requestUrl: bytes) -> None:
|
||||
return None
|
||||
|
||||
class ScriptURLExtractor(HTMLParser):
|
||||
script_URLs = [] # type: List[str]
|
||||
script_URLs: List[str] = []
|
||||
|
||||
def handle_starttag(self, tag, attrs):
|
||||
if (tag == "script" or tag == "iframe") and "src" in [name for name, value in attrs]:
|
||||
@ -254,7 +254,7 @@ def paths_to_text(html: str, string: str) -> List[str]:
|
||||
|
||||
class PathHTMLParser(HTMLParser):
|
||||
currentPath = ""
|
||||
paths = [] # type: List[str]
|
||||
paths: List[str] = []
|
||||
|
||||
def handle_starttag(self, tag, attrs):
|
||||
self.currentPath += ("/" + tag)
|
||||
|
@ -7,7 +7,7 @@ from mitmproxy import ctx, http
|
||||
|
||||
class Filter:
|
||||
def __init__(self):
|
||||
self.filter = None # type: flowfilter.TFilter
|
||||
self.filter: flowfilter.TFilter = None
|
||||
|
||||
def configure(self, updated):
|
||||
self.filter = flowfilter.parse(ctx.options.flowfilter)
|
||||
|
@ -13,7 +13,7 @@ import typing # noqa
|
||||
|
||||
class Writer:
|
||||
def __init__(self, path: str) -> None:
|
||||
self.f = open(path, "wb") # type: typing.IO[bytes]
|
||||
self.f: typing.IO[bytes] = open(path, "wb")
|
||||
self.w = io.FlowWriter(self.f)
|
||||
|
||||
def response(self, flow: http.HTTPFlow) -> None:
|
||||
|
@ -10,7 +10,7 @@ import typing
|
||||
|
||||
class ClientPlayback:
|
||||
def __init__(self):
|
||||
self.flows = [] # type: typing.List[flow.Flow]
|
||||
self.flows: typing.List[flow.Flow] = []
|
||||
self.current_thread = None
|
||||
self.configured = False
|
||||
|
||||
|
@ -179,7 +179,7 @@ class Core:
|
||||
"""
|
||||
Quickly set a number of common values on flows.
|
||||
"""
|
||||
val = sval # type: typing.Union[int, str]
|
||||
val: typing.Union[int, str] = sval
|
||||
if spec == "status_code":
|
||||
try:
|
||||
val = int(val) # type: ignore
|
||||
|
@ -24,7 +24,7 @@ def is_addr(v):
|
||||
|
||||
def extract(cut: str, f: flow.Flow) -> typing.Union[str, bytes]:
|
||||
path = cut.split(".")
|
||||
current = f # type: typing.Any
|
||||
current: typing.Any = f
|
||||
for i, spec in enumerate(path):
|
||||
if spec.startswith("_"):
|
||||
raise exceptions.CommandError("Can't access internal attribute %s" % spec)
|
||||
@ -65,7 +65,7 @@ class Cut:
|
||||
or "false", "bytes" are preserved, and all other values are
|
||||
converted to strings.
|
||||
"""
|
||||
ret = [] # type:typing.List[typing.List[typing.Union[str, bytes]]]
|
||||
ret: typing.List[typing.List[typing.Union[str, bytes]]] = []
|
||||
for f in flows:
|
||||
ret.append([extract(c, f) for c in cuts])
|
||||
return ret # type: ignore
|
||||
|
@ -28,8 +28,8 @@ def colorful(line, styles):
|
||||
|
||||
class Dumper:
|
||||
def __init__(self, outfile=sys.stdout):
|
||||
self.filter = None # type: flowfilter.TFilter
|
||||
self.outfp = outfile # type: typing.io.TextIO
|
||||
self.filter: flowfilter.TFilter = None
|
||||
self.outfp: typing.io.TextIO = outfile
|
||||
|
||||
def load(self, loader):
|
||||
loader.add_option(
|
||||
|
@ -9,7 +9,7 @@ from mitmproxy.log import LogEntry
|
||||
|
||||
class EventStore:
|
||||
def __init__(self, size=10000):
|
||||
self.data = collections.deque(maxlen=size) # type: typing.Deque[LogEntry]
|
||||
self.data: typing.Deque[LogEntry] = collections.deque(maxlen=size)
|
||||
self.sig_add = blinker.Signal()
|
||||
self.sig_refresh = blinker.Signal()
|
||||
|
||||
|
@ -77,7 +77,7 @@ class Export():
|
||||
"""
|
||||
if fmt not in formats:
|
||||
raise exceptions.CommandError("No such export format: %s" % fmt)
|
||||
func = formats[fmt] # type: typing.Any
|
||||
func: typing.Any = formats[fmt]
|
||||
v = func(f)
|
||||
try:
|
||||
with open(path, "wb") as fp:
|
||||
@ -95,7 +95,7 @@ class Export():
|
||||
"""
|
||||
if fmt not in formats:
|
||||
raise exceptions.CommandError("No such export format: %s" % fmt)
|
||||
func = formats[fmt] # type: typing.Any
|
||||
func: typing.Any = formats[fmt]
|
||||
v = strutils.always_str(func(f))
|
||||
try:
|
||||
pyperclip.copy(v)
|
||||
|
@ -49,7 +49,7 @@ class ProxyAuth:
|
||||
self.singleuser = None
|
||||
self.ldapconn = None
|
||||
self.ldapserver = None
|
||||
self.authenticated = weakref.WeakKeyDictionary() # type: MutableMapping[connections.ClientConnection, Tuple[str, str]]
|
||||
self.authenticated: MutableMapping[connections.ClientConnection, Tuple[str, str]] = weakref.WeakKeyDictionary()
|
||||
"""Contains all connections that are permanently authenticated after an HTTP CONNECT"""
|
||||
|
||||
def load(self, loader):
|
||||
|
@ -14,7 +14,7 @@ class Save:
|
||||
def __init__(self):
|
||||
self.stream = None
|
||||
self.filt = None
|
||||
self.active_flows = set() # type: Set[flow.Flow]
|
||||
self.active_flows: typing.Set[flow.Flow] = set()
|
||||
|
||||
def load(self, loader):
|
||||
loader.add_option(
|
||||
|
@ -111,7 +111,7 @@ class ServerPlayback:
|
||||
_, _, path, _, query, _ = urllib.parse.urlparse(r.url)
|
||||
queriesArray = urllib.parse.parse_qsl(query, keep_blank_values=True)
|
||||
|
||||
key = [str(r.port), str(r.scheme), str(r.method), str(path)] # type: List[Any]
|
||||
key: typing.List[typing.Any] = [str(r.port), str(r.scheme), str(r.method), str(path)]
|
||||
if not ctx.options.server_replay_ignore_content:
|
||||
if ctx.options.server_replay_ignore_payload_params and r.multipart_form:
|
||||
key.extend(
|
||||
|
@ -31,8 +31,8 @@ def domain_match(a: str, b: str) -> bool:
|
||||
|
||||
class StickyCookie:
|
||||
def __init__(self):
|
||||
self.jar = collections.defaultdict(dict) # type: Dict[TOrigin, Dict[str, str]]
|
||||
self.flt = None # type: Optional[flowfilter.TFilter]
|
||||
self.jar: Dict[TOrigin, Dict[str, str]] = collections.defaultdict(dict)
|
||||
self.flt: Optional[flowfilter.TFilter] = None
|
||||
|
||||
def load(self, loader):
|
||||
loader.add_option(
|
||||
@ -73,7 +73,7 @@ class StickyCookie:
|
||||
|
||||
def request(self, flow: http.HTTPFlow):
|
||||
if self.flt:
|
||||
cookie_list = [] # type: List[Tuple[str,str]]
|
||||
cookie_list: List[Tuple[str, str]] = []
|
||||
if flowfilter.match(self.flt, flow):
|
||||
for (domain, port, path), c in self.jar.items():
|
||||
match = [
|
||||
|
@ -532,7 +532,7 @@ class Focus:
|
||||
"""
|
||||
def __init__(self, v: View) -> None:
|
||||
self.view = v
|
||||
self._flow = None # type: mitmproxy.flow.Flow
|
||||
self._flow: mitmproxy.flow.Flow = None
|
||||
self.sig_change = blinker.Signal()
|
||||
if len(self.view):
|
||||
self.flow = self.view[0]
|
||||
@ -589,7 +589,7 @@ class Focus:
|
||||
class Settings(collections.Mapping):
|
||||
def __init__(self, view: View) -> None:
|
||||
self.view = view
|
||||
self._values = {} # type: typing.MutableMapping[str, typing.Dict]
|
||||
self._values: typing.MutableMapping[str, typing.Dict] = {}
|
||||
view.sig_store_remove.connect(self._sig_store_remove)
|
||||
view.sig_store_refresh.connect(self._sig_store_refresh)
|
||||
|
||||
|
@ -145,7 +145,7 @@ class CertStore:
|
||||
self.default_ca = default_ca
|
||||
self.default_chain_file = default_chain_file
|
||||
self.dhparams = dhparams
|
||||
self.certs = {} # type: typing.Dict[TCertId, CertStoreEntry]
|
||||
self.certs: typing.Dict[TCertId, CertStoreEntry] = {}
|
||||
self.expire_queue = []
|
||||
|
||||
def expire(self, entry):
|
||||
@ -298,7 +298,7 @@ class CertStore:
|
||||
sans: A list of Subject Alternate Names.
|
||||
"""
|
||||
|
||||
potential_keys = [] # type: typing.List[TCertId]
|
||||
potential_keys: typing.List[TCertId] = []
|
||||
if commonname:
|
||||
potential_keys.extend(self.asterisk_forms(commonname))
|
||||
for s in sans:
|
||||
|
@ -79,7 +79,7 @@ class Command:
|
||||
def prepare_args(self, args: typing.Sequence[str]) -> typing.List[typing.Any]:
|
||||
verify_arg_signature(self.func, list(args), {})
|
||||
|
||||
remainder = [] # type: typing.Sequence[str]
|
||||
remainder: typing.Sequence[str] = []
|
||||
if self.has_positional:
|
||||
remainder = args[len(self.paramtypes) - 1:]
|
||||
args = args[:len(self.paramtypes) - 1]
|
||||
@ -121,7 +121,7 @@ ParseResult = typing.NamedTuple(
|
||||
class CommandManager(mitmproxy.types._CommandBase):
|
||||
def __init__(self, master):
|
||||
self.master = master
|
||||
self.commands = {} # type: typing.Dict[str, Command]
|
||||
self.commands: typing.Dict[str, Command] = {}
|
||||
|
||||
def collect_commands(self, addon):
|
||||
for i in dir(addon):
|
||||
@ -146,7 +146,7 @@ class CommandManager(mitmproxy.types._CommandBase):
|
||||
Parse a possibly partial command. Return a sequence of ParseResults and a sequence of remainder type help items.
|
||||
"""
|
||||
buf = io.StringIO(cmdstr)
|
||||
parts = [] # type: typing.List[str]
|
||||
parts: typing.List[str] = []
|
||||
lex = lexer(buf)
|
||||
while 1:
|
||||
remainder = cmdstr[buf.tell():]
|
||||
@ -163,9 +163,9 @@ class CommandManager(mitmproxy.types._CommandBase):
|
||||
elif cmdstr.endswith(" "):
|
||||
parts.append("")
|
||||
|
||||
parse = [] # type: typing.List[ParseResult]
|
||||
params = [] # type: typing.List[type]
|
||||
typ = None # type: typing.Type
|
||||
parse: typing.List[ParseResult] = []
|
||||
params: typing.List[type] = []
|
||||
typ: typing.Type = None
|
||||
for i in range(len(parts)):
|
||||
if i == 0:
|
||||
typ = mitmproxy.types.Cmd
|
||||
@ -197,7 +197,7 @@ class CommandManager(mitmproxy.types._CommandBase):
|
||||
)
|
||||
)
|
||||
|
||||
remhelp = [] # type: typing.List[str]
|
||||
remhelp: typing.List[str] = []
|
||||
for x in params:
|
||||
remt = mitmproxy.types.CommandTypes.get(x, None)
|
||||
remhelp.append(remt.display)
|
||||
|
@ -26,8 +26,8 @@ from . import (
|
||||
)
|
||||
from .base import View, VIEW_CUTOFF, KEY_MAX, format_text, format_dict, TViewResult
|
||||
|
||||
views = [] # type: List[View]
|
||||
content_types_map = {} # type: Dict[str, List[View]]
|
||||
views: List[View] = []
|
||||
content_types_map: Dict[str, List[View]] = {}
|
||||
|
||||
|
||||
def get(name: str) -> Optional[View]:
|
||||
|
@ -11,8 +11,8 @@ TViewResult = typing.Tuple[str, typing.Iterator[TViewLine]]
|
||||
|
||||
|
||||
class View:
|
||||
name = None # type: str
|
||||
content_types = [] # type: typing.List[str]
|
||||
name: str = None
|
||||
content_types: typing.List[str] = []
|
||||
|
||||
def __call__(self, data: bytes, **metadata) -> TViewResult:
|
||||
"""
|
||||
|
@ -86,7 +86,7 @@ class Tag(Token):
|
||||
|
||||
|
||||
def tokenize(data: str) -> Iterable[Token]:
|
||||
token = Text("") # type: Token
|
||||
token: Token = Text("")
|
||||
|
||||
i = 0
|
||||
|
||||
|
@ -3,5 +3,5 @@ import mitmproxy.log # noqa
|
||||
import mitmproxy.options # noqa
|
||||
|
||||
master = None # type: mitmproxy.master.Master
|
||||
log = None # type: mitmproxy.log.Log
|
||||
options = None # type: mitmproxy.options.Options
|
||||
log: mitmproxy.log.Log = None
|
||||
options: mitmproxy.options.Options = None
|
||||
|
@ -80,11 +80,11 @@ def _iterate_tcp(f: tcp.TCPFlow) -> TEventGenerator:
|
||||
yield "tcp_end", f
|
||||
|
||||
|
||||
_iterate_map = {
|
||||
_iterate_map: typing.Dict[typing.Type[flow.Flow], typing.Callable[[typing.Any], TEventGenerator]] = {
|
||||
http.HTTPFlow: _iterate_http,
|
||||
websocket.WebSocketFlow: _iterate_websocket,
|
||||
tcp.TCPFlow: _iterate_tcp,
|
||||
} # type: typing.Dict[typing.Type[flow.Flow], typing.Callable[[typing.Any], TEventGenerator]]
|
||||
}
|
||||
|
||||
|
||||
def iterate(f: flow.Flow) -> TEventGenerator:
|
||||
|
@ -72,12 +72,12 @@ class Flow(stateobject.StateObject):
|
||||
self.server_conn = server_conn
|
||||
self.live = live
|
||||
|
||||
self.error = None # type: typing.Optional[Error]
|
||||
self.intercepted = False # type: bool
|
||||
self._backup = None # type: typing.Optional[Flow]
|
||||
self.reply = None # type: typing.Optional[controller.Reply]
|
||||
self.marked = False # type: bool
|
||||
self.metadata = dict() # type: typing.Dict[str, typing.Any]
|
||||
self.error: typing.Optional[Error] = None
|
||||
self.intercepted: bool = False
|
||||
self._backup: typing.Optional[Flow] = None
|
||||
self.reply: typing.Optional[controller.Reply] = None
|
||||
self.marked: bool = False
|
||||
self.metadata: typing.Dict[str, typing.Any] = dict()
|
||||
|
||||
_stateobject_attributes = dict(
|
||||
id=str,
|
||||
|
@ -69,8 +69,8 @@ class _Token:
|
||||
|
||||
|
||||
class _Action(_Token):
|
||||
code = None # type: str
|
||||
help = None # type: str
|
||||
code: str = None
|
||||
help: str = None
|
||||
|
||||
@classmethod
|
||||
def make(klass, s, loc, toks):
|
||||
@ -434,7 +434,7 @@ class FNot(_Token):
|
||||
return not self.itm(f)
|
||||
|
||||
|
||||
filter_unary = [
|
||||
filter_unary: Sequence[Type[_Action]] = [
|
||||
FAsset,
|
||||
FErr,
|
||||
FHTTP,
|
||||
@ -443,8 +443,8 @@ filter_unary = [
|
||||
FResp,
|
||||
FTCP,
|
||||
FWebSocket,
|
||||
] # type: Sequence[Type[_Action]]
|
||||
filter_rex = [
|
||||
]
|
||||
filter_rex: Sequence[Type[_Rex]] = [
|
||||
FBod,
|
||||
FBodRequest,
|
||||
FBodResponse,
|
||||
@ -459,7 +459,7 @@ filter_rex = [
|
||||
FMethod,
|
||||
FSrc,
|
||||
FUrl,
|
||||
] # type: Sequence[Type[_Rex]]
|
||||
]
|
||||
filter_int = [
|
||||
FCode
|
||||
]
|
||||
|
@ -145,22 +145,22 @@ class HTTPFlow(flow.Flow):
|
||||
def __init__(self, client_conn, server_conn, live=None, mode="regular"):
|
||||
super().__init__("http", client_conn, server_conn, live)
|
||||
|
||||
self.request = None # type: HTTPRequest
|
||||
self.request: HTTPRequest = None
|
||||
""" :py:class:`HTTPRequest` object """
|
||||
self.response = None # type: HTTPResponse
|
||||
self.response: HTTPResponse = None
|
||||
""" :py:class:`HTTPResponse` object """
|
||||
self.error = None # type: flow.Error
|
||||
self.error: flow.Error = None
|
||||
""" :py:class:`Error` object
|
||||
|
||||
Note that it's possible for a Flow to have both a response and an error
|
||||
object. This might happen, for instance, when a response was received
|
||||
from the server, but there was an error sending it back to the client.
|
||||
"""
|
||||
self.server_conn = server_conn # type: connections.ServerConnection
|
||||
self.server_conn: connections.ServerConnection = server_conn
|
||||
""" :py:class:`ServerConnection` object """
|
||||
self.client_conn = client_conn # type: connections.ClientConnection
|
||||
self.client_conn: connections.ClientConnection = client_conn
|
||||
""":py:class:`ClientConnection` object """
|
||||
self.intercepted = False # type: bool
|
||||
self.intercepted: bool = False
|
||||
""" Is this flow currently being intercepted? """
|
||||
self.mode = mode
|
||||
""" What mode was the proxy layer in when receiving this request? """
|
||||
|
@ -127,8 +127,8 @@ def convert_300_4(data):
|
||||
return data
|
||||
|
||||
|
||||
client_connections = {} # type: Mapping[str, str]
|
||||
server_connections = {} # type: Mapping[str, str]
|
||||
client_connections: Mapping[str, str] = {}
|
||||
server_connections: Mapping[str, str] = {}
|
||||
|
||||
|
||||
def convert_4_5(data):
|
||||
|
@ -11,11 +11,11 @@ from mitmproxy import websocket
|
||||
from mitmproxy.io import compat
|
||||
from mitmproxy.io import tnetstring
|
||||
|
||||
FLOW_TYPES = dict(
|
||||
FLOW_TYPES: Dict[str, Type[flow.Flow]] = dict(
|
||||
http=http.HTTPFlow,
|
||||
websocket=websocket.WebSocketFlow,
|
||||
tcp=tcp.TCPFlow,
|
||||
) # type: Dict[str, Type[flow.Flow]]
|
||||
)
|
||||
|
||||
|
||||
class FlowWriter:
|
||||
|
@ -53,7 +53,7 @@ def dumps(value: TSerializable) -> bytes:
|
||||
# This uses a deque to collect output fragments in reverse order,
|
||||
# then joins them together at the end. It's measurably faster
|
||||
# than creating all the intermediate strings.
|
||||
q = collections.deque() # type: collections.deque
|
||||
q: collections.deque = collections.deque()
|
||||
_rdumpq(q, 0, value)
|
||||
return b''.join(q)
|
||||
|
||||
|
@ -49,7 +49,7 @@ class Master:
|
||||
self.should_exit,
|
||||
)
|
||||
|
||||
self.options = opts or options.Options() # type: options.Options
|
||||
self.options: options.Options = opts or options.Options()
|
||||
self.commands = command.CommandManager(self)
|
||||
self.addons = addonmanager.AddonManager(self)
|
||||
self._server = None
|
||||
|
@ -135,8 +135,8 @@ def _read_set_cookie_pairs(s: str, off=0) -> Tuple[List[TPairs], int]:
|
||||
off: start offset
|
||||
specials: attributes that are treated specially
|
||||
"""
|
||||
cookies = [] # type: List[TPairs]
|
||||
pairs = [] # type: TPairs
|
||||
cookies: List[TPairs] = []
|
||||
pairs: TPairs = []
|
||||
|
||||
while True:
|
||||
lhs, off = _read_key(s, off, ";=,")
|
||||
|
@ -8,7 +8,7 @@ from mitmproxy.net.http import headers
|
||||
|
||||
|
||||
class MessageData(serializable.Serializable):
|
||||
content = None # type: bytes
|
||||
content: bytes = None
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, MessageData):
|
||||
@ -33,7 +33,7 @@ class MessageData(serializable.Serializable):
|
||||
|
||||
|
||||
class Message(serializable.Serializable):
|
||||
data = None # type: MessageData
|
||||
data: MessageData = None
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, Message):
|
||||
|
@ -241,9 +241,9 @@ def create_client_context(
|
||||
# Verify hostname of leaf certificate.
|
||||
cert = certs.Cert(x509)
|
||||
try:
|
||||
crt = dict(
|
||||
crt: typing.Dict[str, typing.Any] = dict(
|
||||
subjectAltName=[("DNS", x.decode("ascii", "strict")) for x in cert.altnames]
|
||||
) # type: typing.Dict[str, typing.Any]
|
||||
)
|
||||
if cert.cn:
|
||||
crt["subject"] = [[["commonName", cert.cn.decode("ascii", "strict")]]]
|
||||
if sni:
|
||||
|
@ -203,7 +203,7 @@ class TransparentProxy:
|
||||
self.request_filter = custom_filter or " or ".join(
|
||||
("tcp.DstPort == %d" %
|
||||
p) for p in redirect_ports)
|
||||
self.request_forward_handle = None # type: pydivert.WinDivert
|
||||
self.request_forward_handle: pydivert.WinDivert = None
|
||||
self.request_forward_thread = threading.Thread(
|
||||
target=self.request_forward)
|
||||
self.request_forward_thread.daemon = True
|
||||
@ -212,18 +212,18 @@ class TransparentProxy:
|
||||
self.trusted_pids = set()
|
||||
self.tcptable2 = MIB_TCPTABLE2(0)
|
||||
self.tcptable2_size = ctypes.wintypes.DWORD(0)
|
||||
self.request_local_handle = None # type: pydivert.WinDivert
|
||||
self.request_local_handle: pydivert.WinDivert = None
|
||||
self.request_local_thread = threading.Thread(target=self.request_local)
|
||||
self.request_local_thread.daemon = True
|
||||
|
||||
# The proxy server responds to the client. To the client,
|
||||
# this response should look like it has been sent by the real target
|
||||
self.response_filter = "outbound and tcp.SrcPort == %d" % proxy_port
|
||||
self.response_handle = None # type: pydivert.WinDivert
|
||||
self.response_handle: pydivert.WinDivert = None
|
||||
self.response_thread = threading.Thread(target=self.response)
|
||||
self.response_thread.daemon = True
|
||||
|
||||
self.icmp_handle = None # type: pydivert.WinDivert
|
||||
self.icmp_handle: pydivert.WinDivert = None
|
||||
|
||||
@classmethod
|
||||
def setup(cls):
|
||||
|
@ -36,10 +36,10 @@ class ProxyConfig:
|
||||
def __init__(self, options: moptions.Options) -> None:
|
||||
self.options = options
|
||||
|
||||
self.check_ignore = None # type: HostMatcher
|
||||
self.check_tcp = None # type: HostMatcher
|
||||
self.certstore = None # type: certs.CertStore
|
||||
self.upstream_server = None # type: typing.Optional[server_spec.ServerSpec]
|
||||
self.check_ignore: HostMatcher = None
|
||||
self.check_tcp: HostMatcher = None
|
||||
self.certstore: certs.CertStore = None
|
||||
self.upstream_server: typing.Optional[server_spec.ServerSpec] = None
|
||||
self.configure(options, set(options.keys()))
|
||||
options.changed.connect(self.configure)
|
||||
|
||||
|
@ -14,10 +14,10 @@ class _LayerCodeCompletion:
|
||||
super().__init__(**mixin_args)
|
||||
if True:
|
||||
return
|
||||
self.config = None # type: config.ProxyConfig
|
||||
self.client_conn = None # type: connections.ClientConnection
|
||||
self.server_conn = None # type: connections.ServerConnection
|
||||
self.channel = None # type: controller.Channel
|
||||
self.config: config.ProxyConfig = None
|
||||
self.client_conn: connections.ClientConnection = None
|
||||
self.server_conn: connections.ServerConnection = None
|
||||
self.channel: controller.Channel = None
|
||||
self.ctx = None
|
||||
"""@type: mitmproxy.proxy.protocol.Layer"""
|
||||
|
||||
|
@ -160,12 +160,12 @@ class HttpLayer(base.Layer):
|
||||
|
||||
if False:
|
||||
# mypy type hints
|
||||
server_conn = None # type: connections.ServerConnection
|
||||
server_conn: connections.ServerConnection = None
|
||||
|
||||
def __init__(self, ctx, mode):
|
||||
super().__init__(ctx)
|
||||
self.mode = mode
|
||||
self.__initial_server_address = None # type: tuple
|
||||
self.__initial_server_address: tuple = None
|
||||
"Contains the original destination in transparent mode, which needs to be restored"
|
||||
"if an inline script modified the target server for a single http request"
|
||||
# We cannot rely on server_conn.tls_established,
|
||||
|
@ -85,14 +85,14 @@ class Http2Layer(base.Layer):
|
||||
|
||||
if False:
|
||||
# mypy type hints
|
||||
client_conn = None # type: connections.ClientConnection
|
||||
client_conn: connections.ClientConnection = None
|
||||
|
||||
def __init__(self, ctx, mode: str) -> None:
|
||||
super().__init__(ctx)
|
||||
self.mode = mode
|
||||
self.streams = dict() # type: Dict[int, Http2SingleStreamLayer]
|
||||
self.server_to_client_stream_ids = dict([(0, 0)]) # type: Dict[int, int]
|
||||
self.connections = {} # type: Dict[object, SafeH2Connection]
|
||||
self.streams: Dict[int, Http2SingleStreamLayer] = dict()
|
||||
self.server_to_client_stream_ids: Dict[int, int] = dict([(0, 0)])
|
||||
self.connections: Dict[object, SafeH2Connection] = {}
|
||||
|
||||
config = h2.config.H2Configuration(
|
||||
client_side=False,
|
||||
@ -382,32 +382,32 @@ class Http2SingleStreamLayer(httpbase._HttpTransmissionLayer, basethread.BaseThr
|
||||
ctx, name="Http2SingleStreamLayer-{}".format(stream_id)
|
||||
)
|
||||
self.h2_connection = h2_connection
|
||||
self.zombie = None # type: float
|
||||
self.client_stream_id = stream_id # type: int
|
||||
self.server_stream_id = None # type: int
|
||||
self.zombie: float = None
|
||||
self.client_stream_id: int = stream_id
|
||||
self.server_stream_id: int = None
|
||||
self.request_headers = request_headers
|
||||
self.response_headers = None # type: mitmproxy.net.http.Headers
|
||||
self.response_headers: mitmproxy.net.http.Headers = None
|
||||
self.pushed = False
|
||||
|
||||
self.timestamp_start = None # type: float
|
||||
self.timestamp_end = None # type: float
|
||||
self.timestamp_start: float = None
|
||||
self.timestamp_end: float = None
|
||||
|
||||
self.request_arrived = threading.Event()
|
||||
self.request_data_queue = queue.Queue() # type: queue.Queue[bytes]
|
||||
self.request_data_queue: queue.Queue[bytes] = queue.Queue()
|
||||
self.request_queued_data_length = 0
|
||||
self.request_data_finished = threading.Event()
|
||||
|
||||
self.response_arrived = threading.Event()
|
||||
self.response_data_queue = queue.Queue() # type: queue.Queue[bytes]
|
||||
self.response_data_queue: queue.Queue[bytes] = queue.Queue()
|
||||
self.response_queued_data_length = 0
|
||||
self.response_data_finished = threading.Event()
|
||||
|
||||
self.no_body = False
|
||||
|
||||
self.priority_exclusive = None # type: bool
|
||||
self.priority_depends_on = None # type: int
|
||||
self.priority_weight = None # type: int
|
||||
self.handled_priority_event = None # type: Any
|
||||
self.priority_exclusive: bool = None
|
||||
self.priority_depends_on: int = None
|
||||
self.priority_weight: int = None
|
||||
self.handled_priority_event: Any = None
|
||||
|
||||
def kill(self):
|
||||
if not self.zombie:
|
||||
|
@ -227,7 +227,7 @@ class TlsLayer(base.Layer):
|
||||
self._server_tls = server_tls
|
||||
|
||||
self._custom_server_sni = custom_server_sni
|
||||
self._client_hello = None # type: Optional[net_tls.ClientHello]
|
||||
self._client_hello: Optional[net_tls.ClientHello] = None
|
||||
|
||||
def __call__(self):
|
||||
"""
|
||||
|
@ -44,12 +44,12 @@ class WebSocketLayer(base.Layer):
|
||||
def __init__(self, ctx, handshake_flow):
|
||||
super().__init__(ctx)
|
||||
self.handshake_flow = handshake_flow
|
||||
self.flow = None # type: WebSocketFlow
|
||||
self.flow: WebSocketFlow = None
|
||||
|
||||
self.client_frame_buffer = []
|
||||
self.server_frame_buffer = []
|
||||
|
||||
self.connections = {} # type: Dict[object, WSConnection]
|
||||
self.connections: dict[object, WSConnection] = {}
|
||||
|
||||
extensions = []
|
||||
if 'Sec-WebSocket-Extensions' in handshake_flow.response.headers:
|
||||
|
@ -53,7 +53,7 @@ class ProxyServer(tcp.TCPServer):
|
||||
raise exceptions.ServerException(
|
||||
'Error starting proxy server: ' + repr(e)
|
||||
) from e
|
||||
self.channel = None # type: controller.Channel
|
||||
self.channel: controller.Channel = None
|
||||
|
||||
def set_channel(self, channel):
|
||||
self.channel = channel
|
||||
@ -71,7 +71,7 @@ class ProxyServer(tcp.TCPServer):
|
||||
class ConnectionHandler:
|
||||
|
||||
def __init__(self, client_conn, client_address, config, channel):
|
||||
self.config = config # type: config.ProxyConfig
|
||||
self.config: config.ProxyConfig = config
|
||||
self.client_conn = connections.ClientConnection(
|
||||
client_conn,
|
||||
client_address,
|
||||
|
@ -14,7 +14,7 @@ class StateObject(serializable.Serializable):
|
||||
or StateObject instances themselves.
|
||||
"""
|
||||
|
||||
_stateobject_attributes = None # type: MutableMapping[str, Any]
|
||||
_stateobject_attributes: MutableMapping[str, Any] = None
|
||||
"""
|
||||
An attribute-name -> class-or-type dict containing all attributes that
|
||||
should be serialized. If the attribute is a class, it must implement the
|
||||
|
@ -38,7 +38,7 @@ class TCPFlow(flow.Flow):
|
||||
|
||||
def __init__(self, client_conn, server_conn, live=None):
|
||||
super().__init__("tcp", client_conn, server_conn, live)
|
||||
self.messages = [] # type: List[TCPMessage]
|
||||
self.messages: List[TCPMessage] = []
|
||||
|
||||
_stateobject_attributes = flow.Flow._stateobject_attributes.copy()
|
||||
_stateobject_attributes["messages"] = List[TCPMessage]
|
||||
|
@ -23,7 +23,7 @@ class ListCompleter(Completer):
|
||||
options: typing.Sequence[str],
|
||||
) -> None:
|
||||
self.start = start
|
||||
self.options = [] # type: typing.Sequence[str]
|
||||
self.options: typing.Sequence[str] = []
|
||||
for o in options:
|
||||
if o.startswith(start):
|
||||
self.options.append(o)
|
||||
@ -53,7 +53,7 @@ class CommandBuffer:
|
||||
self.text = self.flatten(start)
|
||||
# Cursor is always within the range [0:len(buffer)].
|
||||
self._cursor = len(self.text)
|
||||
self.completion = None # type: CompletionState
|
||||
self.completion: CompletionState = None
|
||||
|
||||
@property
|
||||
def cursor(self) -> int:
|
||||
|
@ -74,7 +74,7 @@ class FlowListBox(urwid.ListBox, layoutwidget.LayoutWidget):
|
||||
def __init__(
|
||||
self, master: "mitmproxy.tools.console.master.ConsoleMaster"
|
||||
) -> None:
|
||||
self.master = master # type: "mitmproxy.tools.console.master.ConsoleMaster"
|
||||
self.master: "mitmproxy.tools.console.master.ConsoleMaster" = master
|
||||
super().__init__(FlowListWalker(master))
|
||||
|
||||
def keypress(self, size, key):
|
||||
|
@ -39,7 +39,7 @@ class Cell(urwid.WidgetWrap):
|
||||
|
||||
|
||||
class Column(metaclass=abc.ABCMeta):
|
||||
subeditor = None # type: urwid.Edit
|
||||
subeditor: urwid.Edit = None
|
||||
|
||||
def __init__(self, heading):
|
||||
self.heading = heading
|
||||
@ -71,10 +71,10 @@ class GridRow(urwid.WidgetWrap):
|
||||
) -> None:
|
||||
self.focused = focused
|
||||
self.editor = editor
|
||||
self.edit_col = None # type: typing.Optional[Cell]
|
||||
self.edit_col: typing.Optional[Cell] = None
|
||||
|
||||
errors = values[1]
|
||||
self.fields = [] # type: typing.Sequence[typing.Any]
|
||||
self.fields: typing.Sequence[typing.Any] = []
|
||||
for i, v in enumerate(values[0]):
|
||||
if focused == i and editing:
|
||||
self.edit_col = self.editor.columns[i].Edit(v)
|
||||
@ -123,11 +123,11 @@ class GridWalker(urwid.ListWalker):
|
||||
lst: typing.Iterable[list],
|
||||
editor: "GridEditor"
|
||||
) -> None:
|
||||
self.lst = [(i, set()) for i in lst] # type: typing.Sequence[typing.Tuple[typing.Any, typing.Set]]
|
||||
self.lst: typing.Sequence[typing.Tuple[typing.Any, typing.Set]] = [(i, set()) for i in lst]
|
||||
self.editor = editor
|
||||
self.focus = 0
|
||||
self.focus_col = 0
|
||||
self.edit_row = None # type: typing.Optional[GridRow]
|
||||
self.edit_row: typing.Optional[GridRow] = None
|
||||
|
||||
def _modified(self):
|
||||
self.editor.show_empty_msg()
|
||||
@ -402,8 +402,8 @@ class BaseGridEditor(urwid.WidgetWrap):
|
||||
|
||||
|
||||
class GridEditor(BaseGridEditor):
|
||||
title = None # type: str
|
||||
columns = None # type: typing.Sequence[Column]
|
||||
title: str = None
|
||||
columns: typing.Sequence[Column] = None
|
||||
keyctx = "grideditor"
|
||||
|
||||
def __init__(
|
||||
|
@ -107,7 +107,7 @@ class CookieAttributeEditor(base.FocusEditor):
|
||||
col_text.Column("Name"),
|
||||
col_text.Column("Value"),
|
||||
]
|
||||
grideditor = None # type: base.BaseGridEditor
|
||||
grideditor: base.BaseGridEditor = None
|
||||
|
||||
def data_in(self, data):
|
||||
return [(k, v or "") for k, v in data]
|
||||
@ -169,7 +169,7 @@ class SetCookieEditor(base.FocusEditor):
|
||||
|
||||
|
||||
class OptionsEditor(base.GridEditor, layoutwidget.LayoutWidget):
|
||||
title = None # type: str
|
||||
title: str = None
|
||||
columns = [
|
||||
col_text.Column("")
|
||||
]
|
||||
@ -189,7 +189,7 @@ class OptionsEditor(base.GridEditor, layoutwidget.LayoutWidget):
|
||||
|
||||
|
||||
class DataViewer(base.GridEditor, layoutwidget.LayoutWidget):
|
||||
title = None # type: str
|
||||
title: str = None
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
|
@ -35,9 +35,9 @@ class ConsoleMaster(master.Master):
|
||||
def __init__(self, opts):
|
||||
super().__init__(opts)
|
||||
|
||||
self.start_err = None # type: typing.Optional[log.LogEntry]
|
||||
self.start_err: typing.Optional[log.LogEntry] = None
|
||||
|
||||
self.view = view.View() # type: view.View
|
||||
self.view: view.View = view.View()
|
||||
self.events = eventstore.EventStore()
|
||||
self.events.sig_add.connect(self.sig_add_log)
|
||||
|
||||
|
@ -36,7 +36,7 @@ class Palette:
|
||||
# Commander
|
||||
'commander_command', 'commander_invalid', 'commander_hint'
|
||||
]
|
||||
high = None # type: typing.Mapping[str, typing.Sequence[str]]
|
||||
high: typing.Mapping[str, typing.Sequence[str]] = None
|
||||
|
||||
def palette(self, transparent):
|
||||
l = []
|
||||
|
@ -92,7 +92,7 @@ def run(
|
||||
try:
|
||||
unknown = optmanager.load_paths(opts, args.conf)
|
||||
pconf = process_options(parser, opts, args)
|
||||
server = None # type: typing.Any
|
||||
server: typing.Any = None
|
||||
if pconf.options.server:
|
||||
try:
|
||||
server = proxy.server.ProxyServer(pconf)
|
||||
|
@ -191,7 +191,7 @@ class FilterHelp(RequestHandler):
|
||||
|
||||
class WebSocketEventBroadcaster(tornado.websocket.WebSocketHandler):
|
||||
# raise an error if inherited class doesn't specify its own instance.
|
||||
connections = None # type: set
|
||||
connections: set = None
|
||||
|
||||
def open(self):
|
||||
self.connections.add(self)
|
||||
@ -211,7 +211,7 @@ class WebSocketEventBroadcaster(tornado.websocket.WebSocketHandler):
|
||||
|
||||
|
||||
class ClientConnection(WebSocketEventBroadcaster):
|
||||
connections = set() # type: set
|
||||
connections: set = set()
|
||||
|
||||
|
||||
class Flows(RequestHandler):
|
||||
|
@ -45,7 +45,7 @@ class Choice:
|
||||
# Rather than putting types and the CommandManger in the same file, we introduce
|
||||
# a stub type with the signature we use.
|
||||
class _CommandBase:
|
||||
commands = {} # type: typing.MutableMapping[str, typing.Any]
|
||||
commands: typing.MutableMapping[str, typing.Any] = {}
|
||||
|
||||
def call_args(self, path: str, args: typing.Sequence[str]) -> typing.Any:
|
||||
raise NotImplementedError
|
||||
@ -55,8 +55,8 @@ class _CommandBase:
|
||||
|
||||
|
||||
class _BaseType:
|
||||
typ = object # type: typing.Type
|
||||
display = "" # type: str
|
||||
typ: typing.Type = object
|
||||
display: str = ""
|
||||
|
||||
def completion(
|
||||
self, manager: _CommandBase, t: typing.Any, s: str
|
||||
@ -286,7 +286,7 @@ class _CutSpecType(_BaseType):
|
||||
return opts
|
||||
|
||||
def parse(self, manager: _CommandBase, t: type, s: str) -> CutSpec:
|
||||
parts = s.split(",") # type: typing.Any
|
||||
parts: typing.Any = s.split(",")
|
||||
return parts
|
||||
|
||||
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
|
||||
|
@ -24,7 +24,7 @@ class WebSocketMessage(serializable.Serializable):
|
||||
"""True if this messages was sent by the client."""
|
||||
self.content = content
|
||||
"""A byte-string representing the content of this message."""
|
||||
self.timestamp = timestamp or int(time.time()) # type: int
|
||||
self.timestamp: int = timestamp or int(time.time())
|
||||
"""Timestamp of when this message was received or created."""
|
||||
self.killed = killed
|
||||
"""True if this messages was killed and should not be sent to the other endpoint."""
|
||||
@ -63,7 +63,7 @@ class WebSocketFlow(flow.Flow):
|
||||
def __init__(self, client_conn, server_conn, handshake_flow, live=None):
|
||||
super().__init__("websocket", client_conn, server_conn, live)
|
||||
|
||||
self.messages = [] # type: List[WebSocketMessage]
|
||||
self.messages: List[WebSocketMessage] = []
|
||||
"""A list containing all WebSocketMessage's."""
|
||||
self.close_sender = 'client'
|
||||
"""'client' if the client initiated connection closing."""
|
||||
|
@ -334,7 +334,7 @@ class OptionsOrValue(_Component):
|
||||
Can be any of a specified set of options, or a value specifier.
|
||||
"""
|
||||
preamble = ""
|
||||
options = [] # type: typing.List[str]
|
||||
options: typing.List[str] = []
|
||||
|
||||
def __init__(self, value):
|
||||
# If it's a string, we were passed one of the options, so we lower-case
|
||||
@ -376,7 +376,7 @@ class OptionsOrValue(_Component):
|
||||
|
||||
|
||||
class Integer(_Component):
|
||||
bounds = (None, None) # type: typing.Tuple[typing.Optional[int], typing.Optional[int]]
|
||||
bounds: typing.Tuple[typing.Optional[int], typing.Optional[int]] = (None, None)
|
||||
preamble = ""
|
||||
|
||||
def __init__(self, value):
|
||||
@ -442,7 +442,7 @@ class FixedLengthValue(Value):
|
||||
A value component lead by an optional preamble.
|
||||
"""
|
||||
preamble = ""
|
||||
length = None # type: typing.Optional[int]
|
||||
length: typing.Optional[int] = None
|
||||
|
||||
def __init__(self, value):
|
||||
Value.__init__(self, value)
|
||||
@ -511,7 +511,7 @@ class IntField(_Component):
|
||||
"""
|
||||
An integer field, where values can optionally specified by name.
|
||||
"""
|
||||
names = {} # type: typing.Dict[str, int]
|
||||
names: typing.Dict[str, int] = {}
|
||||
max = 16
|
||||
preamble = ""
|
||||
|
||||
|
@ -11,7 +11,7 @@ LOG_TRUNCATE = 1024
|
||||
|
||||
class Message:
|
||||
__metaclass__ = abc.ABCMeta
|
||||
logattrs = [] # type: typing.List[str]
|
||||
logattrs: typing.List[str] = []
|
||||
|
||||
def __init__(self, tokens):
|
||||
track = set([])
|
||||
@ -106,7 +106,7 @@ class NestedMessage(base.Token):
|
||||
A nested message, as an escaped string with a preamble.
|
||||
"""
|
||||
preamble = ""
|
||||
nest_type = None # type: typing.Optional[typing.Type[Message]]
|
||||
nest_type: typing.Optional[typing.Type[Message]] = None
|
||||
|
||||
def __init__(self, value):
|
||||
super().__init__()
|
||||
|
@ -16,14 +16,14 @@ class WF(base.CaselessLiteral):
|
||||
|
||||
|
||||
class OpCode(base.IntField):
|
||||
names = {
|
||||
names: typing.Dict[str, int] = {
|
||||
"continue": mitmproxy.net.websockets.OPCODE.CONTINUE,
|
||||
"text": mitmproxy.net.websockets.OPCODE.TEXT,
|
||||
"binary": mitmproxy.net.websockets.OPCODE.BINARY,
|
||||
"close": mitmproxy.net.websockets.OPCODE.CLOSE,
|
||||
"ping": mitmproxy.net.websockets.OPCODE.PING,
|
||||
"pong": mitmproxy.net.websockets.OPCODE.PONG,
|
||||
} # type: typing.Dict[str, int]
|
||||
}
|
||||
max = 15
|
||||
preamble = "c"
|
||||
|
||||
@ -97,7 +97,7 @@ COMPONENTS = [
|
||||
|
||||
|
||||
class WebsocketFrame(message.Message):
|
||||
components = COMPONENTS # type: typing.List[typing.Type[base._Component]]
|
||||
components: typing.List[typing.Type[base._Component]] = COMPONENTS
|
||||
logattrs = ["body"]
|
||||
# Used for nested frames
|
||||
unique_name = "body"
|
||||
|
@ -69,7 +69,7 @@ class SSLOptions:
|
||||
|
||||
class PathodHandler(tcp.BaseHandler):
|
||||
wbufsize = 0
|
||||
sni = None # type: typing.Union[str, None, bool]
|
||||
sni: typing.Union[str, None, bool] = None
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
|
@ -10,7 +10,7 @@ class Daemon:
|
||||
IFACE = "127.0.0.1"
|
||||
|
||||
def __init__(self, ssl=None, **daemonargs) -> None:
|
||||
self.q = queue.Queue() # type: queue.Queue
|
||||
self.q: queue.Queue = queue.Queue()
|
||||
self.logfp = io.StringIO()
|
||||
daemonargs["logfp"] = self.logfp
|
||||
self.thread = _PaThread(self.IFACE, self.q, ssl, daemonargs)
|
||||
|
@ -11,7 +11,7 @@ class MemBool:
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.v = None # type: typing.Optional[bool]
|
||||
self.v: typing.Optional[bool] = None
|
||||
|
||||
def __call__(self, v: bool) -> bool:
|
||||
self.v = v
|
||||
|
@ -9,7 +9,7 @@ def print_typehints(opts):
|
||||
for name, option in sorted(opts.items()):
|
||||
print(
|
||||
# For Python 3.6, we can just use "{}: {}".
|
||||
"{} = None # type: {}".format(
|
||||
"{}: {} = None".format(
|
||||
name,
|
||||
{
|
||||
int: "int",
|
||||
|
Loading…
Reference in New Issue
Block a user