mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-22 15:37:45 +00:00
Merge pull request #2358 from mhils/make-mypy-great-again
Fix mypy annotations, update mypy
This commit is contained in:
commit
e7f7a608c6
1
.gitignore
vendored
1
.gitignore
vendored
@ -23,3 +23,4 @@ sslkeylogfile.log
|
||||
.python-version
|
||||
coverage.xml
|
||||
web/coverage/
|
||||
.mypy_cache/
|
||||
|
@ -3,10 +3,6 @@ This example shows how one can add a custom contentview to mitmproxy.
|
||||
The content view API is explained in the mitmproxy.contentviews module.
|
||||
"""
|
||||
from mitmproxy import contentviews
|
||||
import typing
|
||||
|
||||
|
||||
CVIEWSWAPCASE = typing.Tuple[str, typing.Iterable[typing.List[typing.Tuple[str, typing.AnyStr]]]]
|
||||
|
||||
|
||||
class ViewSwapCase(contentviews.View):
|
||||
@ -17,7 +13,7 @@ class ViewSwapCase(contentviews.View):
|
||||
prompt = ("swap case text", "z")
|
||||
content_types = ["text/plain"]
|
||||
|
||||
def __call__(self, data: typing.AnyStr, **metadata) -> CVIEWSWAPCASE:
|
||||
def __call__(self, data, **metadata) -> contentviews.TViewResult:
|
||||
return "case-swapped text", contentviews.format_text(data.swapcase())
|
||||
|
||||
|
||||
|
@ -1,6 +1,4 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# type: ignore
|
||||
#
|
||||
# Simple script showing how to read a mitmproxy dump file
|
||||
#
|
||||
|
@ -13,15 +13,15 @@ import typing # noqa
|
||||
|
||||
class Writer:
|
||||
def __init__(self, path: str) -> None:
|
||||
if path == "-":
|
||||
f = sys.stdout # type: typing.IO[typing.Any]
|
||||
else:
|
||||
f = open(path, "wb")
|
||||
self.w = io.FlowWriter(f)
|
||||
self.f = open(path, "wb") # type: typing.IO[bytes]
|
||||
self.w = io.FlowWriter(self.f)
|
||||
|
||||
def response(self, flow: http.HTTPFlow) -> None:
|
||||
if random.choice([True, False]):
|
||||
self.w.add(flow)
|
||||
|
||||
def done(self):
|
||||
self.f.close()
|
||||
|
||||
|
||||
addons = [Writer(sys.argv[1])]
|
||||
|
@ -25,7 +25,7 @@ from . import (
|
||||
auto, raw, hex, json, xml_html, html_outline, wbxml, javascript, css,
|
||||
urlencoded, multipart, image, query, protobuf
|
||||
)
|
||||
from .base import View, VIEW_CUTOFF, KEY_MAX, format_text, format_dict
|
||||
from .base import View, VIEW_CUTOFF, KEY_MAX, format_text, format_dict, TViewResult
|
||||
|
||||
views = [] # type: List[View]
|
||||
content_types_map = {} # type: Dict[str, List[View]]
|
||||
@ -178,7 +178,7 @@ add(query.ViewQuery())
|
||||
add(protobuf.ViewProtobuf())
|
||||
|
||||
__all__ = [
|
||||
"View", "VIEW_CUTOFF", "KEY_MAX", "format_text", "format_dict",
|
||||
"View", "VIEW_CUTOFF", "KEY_MAX", "format_text", "format_dict", "TViewResult",
|
||||
"get", "get_by_shortcut", "add", "remove",
|
||||
"get_content_view", "get_message_content_view",
|
||||
]
|
||||
|
@ -1,20 +1,21 @@
|
||||
# Default view cutoff *in lines*
|
||||
|
||||
from typing import Iterable, AnyStr, List
|
||||
from typing import Mapping
|
||||
from typing import Tuple
|
||||
import typing
|
||||
|
||||
VIEW_CUTOFF = 512
|
||||
|
||||
KEY_MAX = 30
|
||||
|
||||
TTextType = typing.Union[str, bytes] # FIXME: This should be either bytes or str ultimately.
|
||||
TViewLine = typing.List[typing.Tuple[str, TTextType]]
|
||||
TViewResult = typing.Tuple[str, typing.Iterator[TViewLine]]
|
||||
|
||||
|
||||
class View:
|
||||
name = None # type: str
|
||||
prompt = None # type: Tuple[str,str]
|
||||
content_types = [] # type: List[str]
|
||||
prompt = None # type: typing.Tuple[str,str]
|
||||
content_types = [] # type: typing.List[str]
|
||||
|
||||
def __call__(self, data: bytes, **metadata):
|
||||
def __call__(self, data: bytes, **metadata) -> TViewResult:
|
||||
"""
|
||||
Transform raw data into human-readable output.
|
||||
|
||||
@ -38,8 +39,8 @@ class View:
|
||||
|
||||
|
||||
def format_dict(
|
||||
d: Mapping[AnyStr, AnyStr]
|
||||
) -> Iterable[List[Tuple[str, AnyStr]]]:
|
||||
d: typing.Mapping[TTextType, TTextType]
|
||||
) -> typing.Iterator[TViewLine]:
|
||||
"""
|
||||
Helper function that transforms the given dictionary into a list of
|
||||
("key", key )
|
||||
@ -49,7 +50,10 @@ def format_dict(
|
||||
max_key_len = max(len(k) for k in d.keys())
|
||||
max_key_len = min(max_key_len, KEY_MAX)
|
||||
for key, value in d.items():
|
||||
key += b":" if isinstance(key, bytes) else u":"
|
||||
if isinstance(key, bytes):
|
||||
key += b":"
|
||||
else:
|
||||
key += ":"
|
||||
key = key.ljust(max_key_len + 2)
|
||||
yield [
|
||||
("header", key),
|
||||
@ -57,7 +61,7 @@ def format_dict(
|
||||
]
|
||||
|
||||
|
||||
def format_text(text: AnyStr) -> Iterable[List[Tuple[str, AnyStr]]]:
|
||||
def format_text(text: TTextType) -> typing.Iterator[TViewLine]:
|
||||
"""
|
||||
Helper function that transforms bytes into the view output format.
|
||||
"""
|
||||
|
@ -50,7 +50,7 @@ class _Action(base.Token):
|
||||
|
||||
|
||||
class PauseAt(_Action):
|
||||
unique_name = None # type: ignore
|
||||
unique_name = None
|
||||
|
||||
def __init__(self, offset, seconds):
|
||||
_Action.__init__(self, offset)
|
||||
|
@ -6,7 +6,8 @@ import pyparsing as pp
|
||||
from mitmproxy.utils import strutils
|
||||
from mitmproxy.utils import human
|
||||
import typing # noqa
|
||||
from . import generators, exceptions
|
||||
from . import generators
|
||||
from . import exceptions
|
||||
|
||||
|
||||
class Settings:
|
||||
@ -375,7 +376,7 @@ class OptionsOrValue(_Component):
|
||||
|
||||
|
||||
class Integer(_Component):
|
||||
bounds = (None, None) # type: typing.Tuple[typing.Union[int, None], typing.Union[int , None]]
|
||||
bounds = (None, None) # type: typing.Tuple[typing.Optional[int], typing.Optional[int]]
|
||||
preamble = ""
|
||||
|
||||
def __init__(self, value):
|
||||
@ -537,43 +538,3 @@ class IntField(_Component):
|
||||
|
||||
def spec(self):
|
||||
return "%s%s" % (self.preamble, self.origvalue)
|
||||
|
||||
|
||||
class NestedMessage(Token):
|
||||
|
||||
"""
|
||||
A nested message, as an escaped string with a preamble.
|
||||
"""
|
||||
preamble = ""
|
||||
nest_type = None # type: ignore
|
||||
|
||||
def __init__(self, value):
|
||||
Token.__init__(self)
|
||||
self.value = value
|
||||
try:
|
||||
self.parsed = self.nest_type(
|
||||
self.nest_type.expr().parseString(
|
||||
value.val.decode(),
|
||||
parseAll=True
|
||||
)
|
||||
)
|
||||
except pp.ParseException as v:
|
||||
raise exceptions.ParseException(v.msg, v.line, v.col)
|
||||
|
||||
@classmethod
|
||||
def expr(cls):
|
||||
e = pp.Literal(cls.preamble).suppress()
|
||||
e = e + TokValueLiteral.expr()
|
||||
return e.setParseAction(lambda x: cls(*x))
|
||||
|
||||
def values(self, settings):
|
||||
return [
|
||||
self.value.get_generator(settings),
|
||||
]
|
||||
|
||||
def spec(self):
|
||||
return "%s%s" % (self.preamble, self.value.spec())
|
||||
|
||||
def freeze(self, settings):
|
||||
f = self.parsed.freeze(settings).spec()
|
||||
return self.__class__(TokValueLiteral(strutils.bytes_to_escaped_str(f.encode(), escape_single_quotes=True)))
|
||||
|
@ -54,7 +54,9 @@ class Method(base.OptionsOrValue):
|
||||
|
||||
|
||||
class _HeaderMixin:
|
||||
unique_name = None # type: ignore
|
||||
@property
|
||||
def unique_name(self):
|
||||
return None
|
||||
|
||||
def format_header(self, key, value):
|
||||
return [key, b": ", value, b"\r\n"]
|
||||
@ -251,7 +253,7 @@ class Response(_HTTPMessage):
|
||||
return ":".join([i.spec() for i in self.tokens])
|
||||
|
||||
|
||||
class NestedResponse(base.NestedMessage):
|
||||
class NestedResponse(message.NestedMessage):
|
||||
preamble = "s"
|
||||
nest_type = Response
|
||||
|
||||
|
@ -1,9 +1,9 @@
|
||||
import pyparsing as pp
|
||||
|
||||
from mitmproxy.net import http
|
||||
from mitmproxy.net.http import user_agents, Headers
|
||||
from . import base, message
|
||||
|
||||
|
||||
"""
|
||||
Normal HTTP requests:
|
||||
<method>:<path>:<header>:<body>
|
||||
@ -41,7 +41,9 @@ def get_header(val, headers):
|
||||
|
||||
|
||||
class _HeaderMixin:
|
||||
unique_name = None # type: ignore
|
||||
@property
|
||||
def unique_name(self):
|
||||
return None
|
||||
|
||||
def values(self, settings):
|
||||
return (
|
||||
@ -146,7 +148,7 @@ class Times(base.Integer):
|
||||
|
||||
|
||||
class Response(_HTTP2Message):
|
||||
unique_name = None # type: ignore
|
||||
unique_name = None
|
||||
comps = (
|
||||
Header,
|
||||
Body,
|
||||
@ -203,7 +205,7 @@ class Response(_HTTP2Message):
|
||||
return ":".join([i.spec() for i in self.tokens])
|
||||
|
||||
|
||||
class NestedResponse(base.NestedMessage):
|
||||
class NestedResponse(message.NestedMessage):
|
||||
preamble = "s"
|
||||
nest_type = Response
|
||||
|
||||
|
@ -1,8 +1,11 @@
|
||||
import abc
|
||||
from . import actions, exceptions
|
||||
from mitmproxy.utils import strutils
|
||||
import typing # noqa
|
||||
|
||||
import pyparsing as pp
|
||||
|
||||
from mitmproxy.utils import strutils
|
||||
from . import actions, exceptions, base
|
||||
|
||||
LOG_TRUNCATE = 1024
|
||||
|
||||
|
||||
@ -96,3 +99,46 @@ class Message:
|
||||
|
||||
def __repr__(self):
|
||||
return self.spec()
|
||||
|
||||
|
||||
class NestedMessage(base.Token):
|
||||
"""
|
||||
A nested message, as an escaped string with a preamble.
|
||||
"""
|
||||
preamble = ""
|
||||
nest_type = None # type: typing.Optional[typing.Type[Message]]
|
||||
|
||||
def __init__(self, value):
|
||||
super().__init__()
|
||||
self.value = value
|
||||
try:
|
||||
self.parsed = self.nest_type(
|
||||
self.nest_type.expr().parseString(
|
||||
value.val.decode(),
|
||||
parseAll=True
|
||||
)
|
||||
)
|
||||
except pp.ParseException as v:
|
||||
raise exceptions.ParseException(v.msg, v.line, v.col)
|
||||
|
||||
@classmethod
|
||||
def expr(cls):
|
||||
e = pp.Literal(cls.preamble).suppress()
|
||||
e = e + base.TokValueLiteral.expr()
|
||||
return e.setParseAction(lambda x: cls(*x))
|
||||
|
||||
def values(self, settings):
|
||||
return [
|
||||
self.value.get_generator(settings),
|
||||
]
|
||||
|
||||
def spec(self):
|
||||
return "%s%s" % (self.preamble, self.value.spec())
|
||||
|
||||
def freeze(self, settings):
|
||||
f = self.parsed.freeze(settings).spec()
|
||||
return self.__class__(
|
||||
base.TokValueLiteral(
|
||||
strutils.bytes_to_escaped_str(f.encode(), escape_single_quotes=True)
|
||||
)
|
||||
)
|
||||
|
@ -1,10 +1,12 @@
|
||||
import random
|
||||
import string
|
||||
import typing # noqa
|
||||
|
||||
import pyparsing as pp
|
||||
|
||||
import mitmproxy.net.websockets
|
||||
from mitmproxy.utils import strutils
|
||||
import pyparsing as pp
|
||||
from . import base, generators, actions, message
|
||||
import typing # noqa
|
||||
|
||||
NESTED_LEADER = b"pathod!"
|
||||
|
||||
@ -74,7 +76,7 @@ class Times(base.Integer):
|
||||
preamble = "x"
|
||||
|
||||
|
||||
COMPONENTS = (
|
||||
COMPONENTS = [
|
||||
OpCode,
|
||||
Length,
|
||||
# Bit flags
|
||||
@ -89,14 +91,13 @@ COMPONENTS = (
|
||||
KeyNone,
|
||||
Key,
|
||||
Times,
|
||||
|
||||
Body,
|
||||
RawBody,
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
class WebsocketFrame(message.Message):
|
||||
components = COMPONENTS
|
||||
components = COMPONENTS # type: typing.List[typing.Type[base._Component]]
|
||||
logattrs = ["body"]
|
||||
# Used for nested frames
|
||||
unique_name = "body"
|
||||
@ -235,19 +236,10 @@ class WebsocketFrame(message.Message):
|
||||
return ":".join([i.spec() for i in self.tokens])
|
||||
|
||||
|
||||
class NestedFrame(base.NestedMessage):
|
||||
class NestedFrame(message.NestedMessage):
|
||||
preamble = "f"
|
||||
nest_type = WebsocketFrame
|
||||
|
||||
|
||||
COMP = typing.Tuple[
|
||||
typing.Type[OpCode], typing.Type[Length], typing.Type[Fin], typing.Type[RSV1], typing.Type[RSV2], typing.Type[RSV3], typing.Type[Mask],
|
||||
typing.Type[actions.PauseAt], typing.Type[actions.DisconnectAt], typing.Type[actions.InjectAt], typing.Type[KeyNone], typing.Type[Key],
|
||||
typing.Type[Times], typing.Type[Body], typing.Type[RawBody]
|
||||
]
|
||||
|
||||
|
||||
class WebsocketClientFrame(WebsocketFrame):
|
||||
components = typing.cast(COMP, COMPONENTS + (
|
||||
NestedFrame,
|
||||
))
|
||||
components = COMPONENTS + [NestedFrame]
|
||||
|
Loading…
Reference in New Issue
Block a user