Merge pull request #4426 from mhils/mypy

update mypy
This commit is contained in:
Maximilian Hils 2021-02-04 03:36:25 +01:00 committed by GitHub
commit c5bd7b1ae1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 52 additions and 49 deletions

View File

@ -4,13 +4,15 @@ import typing
from mitmproxy import command
from mitmproxy import ctx
from mitmproxy import flow
from mitmproxy import http
class MyAddon:
@command.command("myaddon.addheader")
def addheader(self, flows: typing.Sequence[flow.Flow]) -> None:
for f in flows:
f.request.headers["myheader"] = "value"
if isinstance(f, http.HTTPFlow):
f.request.headers["myheader"] = "value"
ctx.log.alert("done")

View File

@ -4,6 +4,7 @@ import typing
from mitmproxy import command
from mitmproxy import ctx
from mitmproxy import flow
from mitmproxy import http
from mitmproxy import types
@ -14,9 +15,10 @@ class MyAddon:
flows: typing.Sequence[flow.Flow],
path: types.Path,
) -> None:
totals = {}
totals: typing.Dict[str, int] = {}
for f in flows:
totals[f.request.host] = totals.setdefault(f.request.host, 0) + 1
if isinstance(f, http.HTTPFlow):
totals[f.request.host] = totals.setdefault(f.request.host, 0) + 1
with open(path, "w+") as fp:
for cnt, dom in sorted([(v, k) for (k, v) in totals.items()]):

View File

@ -36,6 +36,7 @@ def request(flow: http.HTTPFlow):
def response(flow: http.HTTPFlow):
assert flow.response # make type checker happy
if flow.response.trailers:
print("HTTP Trailers detected! Response contains:", flow.response.trailers)

View File

@ -7,5 +7,6 @@ from mitmproxy import http
def response(flow: http.HTTPFlow) -> None:
assert flow.response # make type checker happy
reflector = b"<style>body {transform: scaleX(-1);}</style></head>"
flow.response.content = flow.response.content.replace(b"</head>", reflector)

View File

@ -2,7 +2,7 @@
"""
Read a mitmproxy dump file.
"""
from mitmproxy import io
from mitmproxy import io, http
from mitmproxy.exceptions import FlowReadException
import pprint
import sys
@ -13,7 +13,8 @@ with open(sys.argv[1], "rb") as logfile:
try:
for f in freader.stream():
print(f)
print(f.request.host)
if isinstance(f, http.HTTPFlow):
print(f.request.host)
pp.pprint(f.get_state())
print("")
except FlowReadException as e:

View File

@ -136,9 +136,9 @@ class Cert(serializable.Serializable):
return []
else:
return (
ext.get_values_for_type(x509.DNSName)
+
[str(x) for x in ext.get_values_for_type(x509.IPAddress)]
ext.get_values_for_type(x509.DNSName)
+
[str(x) for x in ext.get_values_for_type(x509.IPAddress)]
)
@ -151,9 +151,9 @@ def _name_to_keyval(name: x509.Name) -> List[Tuple[str, str]]:
def create_ca(
organization: str,
cn: str,
key_size: int,
organization: str,
cn: str,
key_size: int,
) -> Tuple[rsa.RSAPrivateKeyWithSerialization, x509.Certificate]:
now = datetime.datetime.now()
@ -192,11 +192,11 @@ def create_ca(
def dummy_cert(
privkey: rsa.RSAPrivateKey,
cacert: x509.Certificate,
commonname: Optional[str],
sans: List[str],
organization: Optional[str] = None,
privkey: rsa.RSAPrivateKey,
cacert: x509.Certificate,
commonname: Optional[str],
sans: List[str],
organization: Optional[str] = None,
) -> Cert:
"""
Generates a dummy certificate.
@ -220,7 +220,7 @@ def dummy_cert(
subject = []
is_valid_commonname = (
commonname is not None and len(commonname) < 64
commonname is not None and len(commonname) < 64
)
if is_valid_commonname:
assert commonname is not None
@ -268,11 +268,11 @@ class CertStore:
expire_queue: List[CertStoreEntry]
def __init__(
self,
default_privatekey: rsa.RSAPrivateKey,
default_ca: Cert,
default_chain_file: Optional[Path],
dhparams: DHParams
self,
default_privatekey: rsa.RSAPrivateKey,
default_ca: Cert,
default_chain_file: Optional[Path],
dhparams: DHParams
):
self.default_privatekey = default_privatekey
self.default_ca = default_ca
@ -311,11 +311,11 @@ class CertStore:
@classmethod
def from_store(
cls,
path: Union[Path, str],
basename: str,
key_size: int,
passphrase: Optional[bytes] = None
cls,
path: Union[Path, str],
basename: str,
key_size: int,
passphrase: Optional[bytes] = None
) -> "CertStore":
path = Path(path)
ca_file = path / f"{basename}-ca.pem"
@ -393,9 +393,9 @@ class CertStore:
# Dump the certificate in PKCS12 format for Windows devices
(path / f"{basename}-ca-cert.p12").write_bytes(
pkcs12.serialize_key_and_certificates( # type: ignore
pkcs12.serialize_key_and_certificates(
name=basename.encode(),
key=None,
key=None, # type: ignore
cert=ca,
cas=None,
encryption_algorithm=serialization.NoEncryption(),
@ -442,10 +442,10 @@ class CertStore:
return ret
def get_cert(
self,
commonname: Optional[str],
sans: List[str],
organization: Optional[str] = None
self,
commonname: Optional[str],
sans: List[str],
organization: Optional[str] = None
) -> CertStoreEntry:
"""
commonname: Common name for the generated certificate. Must be a

View File

View File

@ -1,15 +1,13 @@
import io
import uuid
from mitmproxy.test import tutils
from mitmproxy import controller
from mitmproxy import flow
from mitmproxy import http
from mitmproxy import tcp
from mitmproxy import websocket
from mitmproxy import controller
from mitmproxy import http
from mitmproxy import flow
from mitmproxy.net import http as net_http
from mitmproxy.proxy import context
from mitmproxy.test import tutils
from wsproto.frame_protocol import Opcode
@ -34,7 +32,6 @@ def ttcpflow(client_conn=True, server_conn=True, messages=True, err=None):
def twebsocketflow(client_conn=True, server_conn=True, messages=True, err=None, handshake_flow=True):
if client_conn is True:
client_conn = tclient_conn()
if server_conn is True:
@ -169,7 +166,7 @@ def tclient_conn() -> context.Client:
alpn_offers=[],
cipher_list=[],
))
c.reply = controller.DummyReply()
c.reply = controller.DummyReply() # type: ignore
return c
@ -197,9 +194,7 @@ def tserver_conn() -> context.Server:
cipher_list=[],
via2=None,
))
c.reply = controller.DummyReply()
c.rfile = io.BytesIO()
c.wfile = io.BytesIO()
c.reply = controller.DummyReply() # type: ignore
return c

View File

@ -21,7 +21,7 @@ def treq(**kwargs) -> http.Request:
timestamp_end=946681201,
)
default.update(kwargs)
return http.Request(**default)
return http.Request(**default) # type: ignore
def tresp(**kwargs) -> http.Response:
@ -40,4 +40,4 @@ def tresp(**kwargs) -> http.Response:
timestamp_end=946681203,
)
default.update(kwargs)
return http.Response(**default)
return http.Response(**default) # type: ignore

View File

@ -18,7 +18,7 @@ from mitmproxy.tools.console import statusbar
if os.name == "nt":
from mitmproxy.contrib.urwid import raw_display
else:
from urwid import raw_display
from urwid import raw_display # type: ignore
class StackWidget(urwid.Frame):

View File

@ -25,6 +25,7 @@ exclude_lines =
[mypy]
ignore_missing_imports = True
files = mitmproxy,examples/addons,release
[mypy-mitmproxy.contrib.*]
ignore_errors = True

View File

@ -28,9 +28,9 @@ commands =
python ./test/filename_matching.py
[testenv:mypy]
deps = mypy==0.790
deps = mypy==0.800
commands =
mypy . {posargs}
mypy {posargs}
[testenv:individual_coverage]
commands =