Merge branch 'master' into fix-command-bar-issue-3259

This commit is contained in:
Henrique M. D 2019-11-15 13:59:57 -05:00 committed by GitHub
commit 021a141521
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
51 changed files with 458 additions and 270 deletions

View File

@ -0,0 +1,86 @@
---
title: "Install System CA on Android"
menu:
howto:
weight: 4
---
# Install System CA Certificate on Android Emulator
[Since Android 7, apps ignore user certificates](https://android-developers.googleblog.com/2016/07/changes-to-trusted-certificate.html), unless they are configured to use them.
As most applications do not explicitly opt in to use user certificates, we need to place our mitmproxy CA certificate in the system certificate store,
in order to avid having to patch each application, which we want to monitor.
Please note, that apps can decide to ignore the system certificate store and maintain their own CA certificates. In this case you have to patch the application.
## 1. Prerequisites
- Emulator from Android SDK with proxy settings pointing to mitmproxy
- Mitmproxy CA certificate
- Usually located in `~/.mitmproxy/mitmproxy-ca-cert.cer`
- If the folder is empty or does not exist, run `mitmproxy` in order to generate the certificates
## 2. Rename certificate
Enter your certificate folder
{{< highlight bash >}}
cd ~/.mitmproxy/
{{< / highlight >}}
- CA Certificates in Android are stored by the name of their hash, with a '0' as extension
- Now generate the hash of your certificate
{{< highlight bash >}}
openssl x509 -inform PEM -subject_hash_old -in mitmproxy-ca-cert.cer | head -1
{{< / highlight >}}
Lets assume, the output is `c8450d0d`
We can now copy `mitmproxy-ca-cert.cer` to `c8450d0d.0` and our system certificate is ready to use
{{< highlight bash >}}
cp mitmproxy-ca-cert.cer c8450d0d.0
{{< / highlight >}}
## 3. Insert certificate into system certificate store
Note, that Android 9 (API LEVEL 28) was used to test the following steps and that the `emulator` executable is located in the Android SDK
- Start your android emulator.
- Get a list of your AVDs with `emulator -list-avds`
- Make sure to use the `-writable-system` option. Otherwise it will not be possible to write to `/system`
- Keep in mind, that the **emulator will load a clean system image when starting without `-writable-system` option**.
- This means you always have to start the emulator with `-writable-system` option in order to use your certificate
{{< highlight bash >}}
emulator -avd <avd_name_here> -writable-system
{{< / highlight >}}
- Restart adb as root
{{< highlight bash >}}
adb root
{{< / highlight >}}
- Get write access to `/system` on the device
- In earlier versions (API LEVEL < 28) of Android you have to use `adb shell "mount -o rw,remount /system"`
{{< highlight bash >}}
adb shell "mount -o rw,remount /"
{{< / highlight >}}
- Push your certificate to the system certificate store and set file permissions
{{< highlight bash >}}
adb push c8450d0d.0 /system/etc/security/cacerts
adb shell "chmod 664 /system/etc/security/cacerts/c8450d0d.0"
{{< / highlight >}}
## 4. Reboot device and enjoy decrypted TLS traffic
- Reboot your device.
- You CA certificate should now be system trusted
{{< highlight bash >}}
adb reboot
{{< / highlight >}}
**Remember**: You **always** have to start the emulator using the `-writable-system` option in order to use your certificate

View File

@ -86,7 +86,7 @@ def get_cookies(flow: http.HTTPFlow) -> Cookies:
return {name: value for name, value in flow.request.cookies.fields}
def find_unclaimed_URLs(body: str, requestUrl: bytes) -> None:
def find_unclaimed_URLs(body, requestUrl):
""" Look for unclaimed URLs in script tags and log them if found"""
def getValue(attrs: List[Tuple[str, str]], attrName: str) -> Optional[str]:
for name, value in attrs:
@ -111,7 +111,7 @@ def find_unclaimed_URLs(body: str, requestUrl: bytes) -> None:
try:
socket.gethostbyname(domain)
except socket.gaierror:
ctx.log.error("XSS found in %s due to unclaimed URL \"%s\"." % (requestUrl, url))
ctx.log.error(f"XSS found in {requestUrl} due to unclaimed URL \"{url}\".")
def test_end_of_URL_injection(original_body: str, request_URL: str, cookies: Cookies) -> VulnData:

View File

@ -126,20 +126,18 @@ class Cut:
format is UTF-8 encoded CSV. If there is exactly one row and one
column, the data is written to file as-is, with raw bytes preserved.
"""
v: typing.Union[str, bytes]
fp = io.StringIO(newline="")
if len(cuts) == 1 and len(flows) == 1:
v = extract(cuts[0], flows[0])
if isinstance(v, bytes):
fp.write(strutils.always_str(v))
else:
fp.write(v)
fp.write(strutils.always_str(v)) # type: ignore
ctx.log.alert("Clipped single cut.")
else:
writer = csv.writer(fp)
for f in flows:
vals = [extract(c, f) for c in cuts]
writer.writerow(
[strutils.always_str(v) or "" for v in vals] # type: ignore
[strutils.always_str(v) for v in vals]
)
ctx.log.alert("Clipped %s cuts as CSV." % len(cuts))
try:

View File

@ -14,7 +14,7 @@ class EventStore:
self.sig_refresh = blinker.Signal()
@property
def size(self) -> int:
def size(self) -> typing.Optional[int]:
return self.data.maxlen
def log(self, entry: LogEntry) -> None:

View File

@ -16,7 +16,7 @@ from mitmproxy import ctx
import mitmproxy.types as mtypes
def load_script(path: str) -> types.ModuleType:
def load_script(path: str) -> typing.Optional[types.ModuleType]:
fullname = "__mitmproxy_script__.{}".format(
os.path.splitext(os.path.basename(path))[0]
)

View File

@ -68,6 +68,13 @@ class ServerPlayback:
to replay.
"""
)
loader.add_option(
"server_replay_ignore_port", bool, False,
"""
Ignore request's destination port while searching for a saved flow
to replay.
"""
)
@command.command("replay.server")
def load_flows(self, flows: typing.Sequence[flow.Flow]) -> None:
@ -110,7 +117,7 @@ class ServerPlayback:
_, _, path, _, query, _ = urllib.parse.urlparse(r.url)
queriesArray = urllib.parse.parse_qsl(query, keep_blank_values=True)
key: typing.List[typing.Any] = [str(r.port), str(r.scheme), str(r.method), str(path)]
key: typing.List[typing.Any] = [str(r.scheme), str(r.method), str(path)]
if not ctx.options.server_replay_ignore_content:
if ctx.options.server_replay_ignore_payload_params and r.multipart_form:
key.extend(
@ -128,7 +135,9 @@ class ServerPlayback:
key.append(str(r.raw_content))
if not ctx.options.server_replay_ignore_host:
key.append(r.host)
key.append(r.pretty_host)
if not ctx.options.server_replay_ignore_port:
key.append(r.port)
filtered = []
ignore_params = ctx.options.server_replay_ignore_params or []

View File

@ -215,8 +215,8 @@ class Session:
def __init__(self):
self.db_store: SessionDB = None
self._hot_store: collections.OrderedDict = collections.OrderedDict()
self._order_store: typing.Dict[str, typing.Dict[str, typing.Union[int, float, str]]] = {}
self._view: typing.List[typing.Tuple[typing.Union[int, float, str], str]] = []
self._order_store: typing.Dict[str, typing.Dict[str, typing.Union[int, float, str, None]]] = {}
self._view: typing.List[typing.Tuple[typing.Union[int, float, str, None], str]] = []
self.order: str = orders[0]
self.filter = matchall
self._flush_period: float = self._FP_DEFAULT

View File

@ -53,6 +53,7 @@ class StickyCookie:
self.flt = None
def response(self, flow: http.HTTPFlow):
assert flow.response
if self.flt:
for name, (value, attrs) in flow.response.cookies.items(multi=True):
# FIXME: We now know that Cookie.py screws up some cookies with

View File

@ -238,18 +238,24 @@ class View(collections.abc.Sequence):
"""
Set focus to the next flow.
"""
idx = self.focus.index + 1
if self.inbounds(idx):
self.focus.flow = self[idx]
if self.focus.index is not None:
idx = self.focus.index + 1
if self.inbounds(idx):
self.focus.flow = self[idx]
else:
pass
@command.command("view.focus.prev")
def focus_prev(self) -> None:
"""
Set focus to the previous flow.
"""
idx = self.focus.index - 1
if self.inbounds(idx):
self.focus.flow = self[idx]
if self.focus.index is not None:
idx = self.focus.index - 1
if self.inbounds(idx):
self.focus.flow = self[idx]
else:
pass
# Order
@command.command("view.order.options")
@ -584,7 +590,7 @@ class Focus:
"""
def __init__(self, v: View) -> None:
self.view = v
self._flow: mitmproxy.flow.Flow = None
self._flow: typing.Optional[mitmproxy.flow.Flow] = None
self.sig_change = blinker.Signal()
if len(self.view):
self.flow = self.view[0]

View File

@ -315,7 +315,12 @@ class CertStore:
ret.append(b"*." + b".".join(parts[i:]))
return ret
def get_cert(self, commonname: typing.Optional[bytes], sans: typing.List[bytes], organization: typing.Optional[bytes] = None):
def get_cert(
self,
commonname: typing.Optional[bytes],
sans: typing.List[bytes],
organization: typing.Optional[bytes] = None
) -> typing.Tuple["Cert", OpenSSL.SSL.PKey, str]:
"""
Returns an (cert, privkey, cert_chain) tuple.

View File

@ -34,6 +34,8 @@ def typename(t: type) -> str:
class Command:
returntype: typing.Optional[typing.Type]
def __init__(self, manager, path, func) -> None:
self.path = path
self.manager = manager
@ -194,7 +196,7 @@ class CommandManager(mitmproxy.types._CommandBase):
parse: typing.List[ParseResult] = []
params: typing.List[type] = []
typ: typing.Type = None
typ: typing.Type
for i in range(len(parts)):
if i == 0:
typ = mitmproxy.types.Cmd

View File

@ -135,7 +135,9 @@ def get_content_view(viewmode: View, data: bytes, **metadata):
# Third-party viewers can fail in unexpected ways...
except Exception:
desc = "Couldn't parse: falling back to Raw"
_, content = get("Raw")(data, **metadata)
raw = get("Raw")
assert raw
content = raw(data, **metadata)[1]
error = "{} Content viewer failed: \n{}".format(
getattr(viewmode, "name"),
traceback.format_exc()

View File

@ -9,8 +9,8 @@ TViewResult = typing.Tuple[str, typing.Iterator[TViewLine]]
class View:
name: str = None
content_types: typing.List[str] = []
name: typing.ClassVar[str]
content_types: typing.ClassVar[typing.List[str]] = []
def __call__(self, data: bytes, **metadata) -> TViewResult:
"""

View File

@ -1,7 +1,7 @@
import io
import re
import textwrap
from typing import Iterable
from typing import Iterable, Optional
from mitmproxy.contentviews import base
from mitmproxy.utils import sliding_window
@ -124,14 +124,14 @@ def indent_text(data: str, prefix: str) -> str:
return textwrap.indent(dedented, prefix[:32])
def is_inline_text(a: Token, b: Token, c: Token) -> bool:
def is_inline_text(a: Optional[Token], b: Optional[Token], c: Optional[Token]) -> bool:
if isinstance(a, Tag) and isinstance(b, Text) and isinstance(c, Tag):
if a.is_opening and "\n" not in b.data and c.is_closing and a.tag == c.tag:
return True
return False
def is_inline(prev2: Token, prev1: Token, t: Token, next1: Token, next2: Token) -> bool:
def is_inline(prev2: Optional[Token], prev1: Optional[Token], t: Optional[Token], next1: Optional[Token], next2: Optional[Token]) -> bool:
if isinstance(t, Text):
return is_inline_text(prev1, t, next1)
elif isinstance(t, Tag):

View File

@ -1,7 +1,7 @@
import mitmproxy.master # noqa
import mitmproxy.log # noqa
import mitmproxy.options # noqa
import mitmproxy.log
import mitmproxy.master
import mitmproxy.options
master = None # type: mitmproxy.master.Master
log: mitmproxy.log.Log = None
options: mitmproxy.options.Options = None
log: "mitmproxy.log.Log"
master: "mitmproxy.master.Master"
options: "mitmproxy.options.Options"

View File

@ -32,19 +32,17 @@
rex Equivalent to ~u rex
"""
import functools
import re
import sys
import functools
from mitmproxy import http
from mitmproxy import websocket
from mitmproxy import tcp
from mitmproxy import flow
from mitmproxy.utils import strutils
from typing import Callable, ClassVar, Optional, Sequence, Type
import pyparsing as pp
from typing import Callable, Sequence, Type # noqa
from mitmproxy import flow
from mitmproxy import http
from mitmproxy import tcp
from mitmproxy import websocket
def only(*types):
@ -54,7 +52,9 @@ def only(*types):
if isinstance(flow, types):
return fn(self, flow)
return False
return filter_types
return decorator
@ -69,8 +69,8 @@ class _Token:
class _Action(_Token):
code: str = None
help: str = None
code: ClassVar[str]
help: ClassVar[str]
@classmethod
def make(klass, s, loc, toks):
@ -146,10 +146,10 @@ class _Rex(_Action):
def __init__(self, expr):
self.expr = expr
if self.is_binary:
expr = strutils.escaped_str_to_bytes(expr)
expr = expr.encode()
try:
self.re = re.compile(expr, self.flags)
except:
except Exception:
raise ValueError("Cannot compile expression.")
@ -336,6 +336,7 @@ class FUrl(_Rex):
code = "u"
help = "URL"
is_binary = False
# FUrl is special, because it can be "naked".
@classmethod
@ -469,69 +470,51 @@ def _make():
# Order is important - multi-char expressions need to come before narrow
# ones.
parts = []
for klass in filter_unary:
f = pp.Literal("~%s" % klass.code) + pp.WordEnd()
f.setParseAction(klass.make)
for cls in filter_unary:
f = pp.Literal(f"~{cls.code}") + pp.WordEnd()
f.setParseAction(cls.make)
parts.append(f)
simplerex = "".join(c for c in pp.printables if c not in "()~'\"")
alphdevanagari = pp.pyparsing_unicode.Devanagari.alphas
alphcyrillic = pp.pyparsing_unicode.Cyrillic.alphas
alphgreek = pp.pyparsing_unicode.Greek.alphas
alphchinese = pp.pyparsing_unicode.Chinese.alphas
alpharabic = pp.pyparsing_unicode.Arabic.alphas
alphhebrew = pp.pyparsing_unicode.Hebrew.alphas
alphjapanese = pp.pyparsing_unicode.Japanese.alphas
alphkorean = pp.pyparsing_unicode.Korean.alphas
alphlatin1 = pp.pyparsing_unicode.Latin1.alphas
alphlatinA = pp.pyparsing_unicode.LatinA.alphas
alphlatinB = pp.pyparsing_unicode.LatinB.alphas
rex = pp.Word(simplerex) |\
pp.Word(alphcyrillic) |\
pp.Word(alphgreek) |\
pp.Word(alphchinese) |\
pp.Word(alpharabic) |\
pp.Word(alphdevanagari) |\
pp.Word(alphhebrew) |\
pp.Word(alphjapanese) |\
pp.Word(alphkorean) |\
pp.Word(alphlatin1) |\
pp.Word(alphlatinA) |\
pp.Word(alphlatinB) |\
pp.QuotedString("\"", escChar='\\') |\
pp.QuotedString("'", escChar='\\')
for klass in filter_rex:
f = pp.Literal("~%s" % klass.code) + pp.WordEnd() + rex.copy()
f.setParseAction(klass.make)
# This is a bit of a hack to simulate Word(pyparsing_unicode.printables),
# which has a horrible performance with len(pyparsing.pyparsing_unicode.printables) == 1114060
unicode_words = pp.CharsNotIn("()~'\"" + pp.ParserElement.DEFAULT_WHITE_CHARS)
unicode_words.skipWhitespace = True
regex = (
unicode_words
| pp.QuotedString('"', escChar='\\')
| pp.QuotedString("'", escChar='\\')
)
for cls in filter_rex:
f = pp.Literal(f"~{cls.code}") + pp.WordEnd() + regex.copy()
f.setParseAction(cls.make)
parts.append(f)
for klass in filter_int:
f = pp.Literal("~%s" % klass.code) + pp.WordEnd() + pp.Word(pp.nums)
f.setParseAction(klass.make)
for cls in filter_int:
f = pp.Literal(f"~{cls.code}") + pp.WordEnd() + pp.Word(pp.nums)
f.setParseAction(cls.make)
parts.append(f)
# A naked rex is a URL rex:
f = rex.copy()
f = regex.copy()
f.setParseAction(FUrl.make)
parts.append(f)
atom = pp.MatchFirst(parts)
expr = pp.operatorPrecedence(atom,
[(pp.Literal("!").suppress(),
1,
pp.opAssoc.RIGHT,
lambda x: FNot(*x)),
(pp.Literal("&").suppress(),
2,
pp.opAssoc.LEFT,
lambda x: FAnd(*x)),
(pp.Literal("|").suppress(),
2,
pp.opAssoc.LEFT,
lambda x: FOr(*x)),
])
expr = pp.infixNotation(
atom,
[(pp.Literal("!").suppress(),
1,
pp.opAssoc.RIGHT,
lambda x: FNot(*x)),
(pp.Literal("&").suppress(),
2,
pp.opAssoc.LEFT,
lambda x: FAnd(*x)),
(pp.Literal("|").suppress(),
2,
pp.opAssoc.LEFT,
lambda x: FOr(*x)),
])
expr = pp.OneOrMore(expr)
return expr.setParseAction(lambda x: FAnd(x) if len(x) != 1 else x)
@ -540,7 +523,7 @@ bnf = _make()
TFilter = Callable[[flow.Flow], bool]
def parse(s: str) -> TFilter:
def parse(s: str) -> Optional[TFilter]:
try:
flt = bnf.parseString(s, parseAll=True)[0]
flt.pattern = s
@ -571,15 +554,15 @@ def match(flt, flow):
help = []
for a in filter_unary:
help.append(
("~%s" % a.code, a.help)
(f"~{a.code}", a.help)
)
for b in filter_rex:
help.append(
("~%s regex" % b.code, b.help)
(f"~{b.code} regex", b.help)
)
for c in filter_int:
help.append(
("~%s int" % c.code, c.help)
(f"~{c.code} int", c.help)
)
help.sort()
help.extend(

View File

@ -1,15 +1,13 @@
import html
from typing import Optional
from mitmproxy import connections
from mitmproxy import flow
from mitmproxy.net import http
from mitmproxy import version
from mitmproxy import connections # noqa
from mitmproxy.net import http
class HTTPRequest(http.Request):
"""
A mitmproxy HTTP request.
"""
@ -85,10 +83,10 @@ class HTTPRequest(http.Request):
class HTTPResponse(http.Response):
"""
A mitmproxy HTTP response.
"""
# This is a very thin wrapper on top of :py:class:`mitmproxy.net.http.Response` and
# may be removed in the future.
@ -136,34 +134,28 @@ class HTTPResponse(http.Response):
class HTTPFlow(flow.Flow):
"""
An HTTPFlow is a collection of objects representing a single HTTP
transaction.
"""
request: HTTPRequest
response: Optional[HTTPResponse] = None
error: Optional[flow.Error] = None
"""
Note that it's possible for a Flow to have both a response and an error
object. This might happen, for instance, when a response was received
from the server, but there was an error sending it back to the client.
"""
server_conn: connections.ServerConnection
client_conn: connections.ClientConnection
intercepted: bool = False
""" Is this flow currently being intercepted? """
mode: str
""" What mode was the proxy layer in when receiving this request? """
def __init__(self, client_conn, server_conn, live=None, mode="regular"):
super().__init__("http", client_conn, server_conn, live)
self.request: HTTPRequest = None
""" :py:class:`HTTPRequest` object """
self.response: HTTPResponse = None
""" :py:class:`HTTPResponse` object """
self.error: flow.Error = None
""" :py:class:`Error` object
Note that it's possible for a Flow to have both a response and an error
object. This might happen, for instance, when a response was received
from the server, but there was an error sending it back to the client.
"""
self.server_conn: connections.ServerConnection = server_conn
""" :py:class:`ServerConnection` object """
self.client_conn: connections.ClientConnection = client_conn
""":py:class:`ClientConnection` object """
self.intercepted: bool = False
""" Is this flow currently being intercepted? """
self.mode = mode
""" What mode was the proxy layer in when receiving this request? """
_stateobject_attributes = flow.Flow._stateobject_attributes.copy()
# mypy doesn't support update with kwargs
@ -205,8 +197,8 @@ class HTTPFlow(flow.Flow):
def make_error_response(
status_code: int,
message: str="",
headers: Optional[http.Headers]=None,
message: str = "",
headers: Optional[http.Headers] = None,
) -> HTTPResponse:
reason = http.status_codes.RESPONSES.get(status_code, "Unknown")
body = """

View File

@ -192,22 +192,22 @@ def parse(data_type: int, data: bytes) -> TSerializable:
try:
return int(data)
except ValueError:
raise ValueError("not a tnetstring: invalid integer literal: {}".format(data))
raise ValueError(f"not a tnetstring: invalid integer literal: {data!r}")
if data_type == ord(b'^'):
try:
return float(data)
except ValueError:
raise ValueError("not a tnetstring: invalid float literal: {}".format(data))
raise ValueError(f"not a tnetstring: invalid float literal: {data!r}")
if data_type == ord(b'!'):
if data == b'true':
return True
elif data == b'false':
return False
else:
raise ValueError("not a tnetstring: invalid boolean literal: {}".format(data))
raise ValueError(f"not a tnetstring: invalid boolean literal: {data!r}")
if data_type == ord(b'~'):
if data:
raise ValueError("not a tnetstring: invalid null literal")
raise ValueError(f"not a tnetstring: invalid null literal: {data!r}")
return None
if data_type == ord(b']'):
l = []
@ -236,7 +236,7 @@ def pop(data: bytes) -> typing.Tuple[TSerializable, bytes]:
blength, data = data.split(b':', 1)
length = int(blength)
except ValueError:
raise ValueError("not a tnetstring: missing or invalid length prefix: {}".format(data))
raise ValueError(f"not a tnetstring: missing or invalid length prefix: {data!r}")
try:
data, data_type, remain = data[:length], data[length], data[length + 1:]
except IndexError:

View File

@ -82,7 +82,7 @@ class Message(serializable.Serializable):
def raw_content(self, content):
self.data.content = content
def get_content(self, strict: bool=True) -> bytes:
def get_content(self, strict: bool=True) -> Optional[bytes]:
"""
The uncompressed HTTP message body as bytes.
@ -195,10 +195,9 @@ class Message(serializable.Serializable):
See also: :py:attr:`content`, :py:class:`raw_content`
"""
if self.raw_content is None:
return None
content = self.get_content(strict)
if content is None:
return None
enc = self._guess_encoding(content)
try:
return encoding.decode(content, enc)

View File

@ -1,8 +1,43 @@
import re
import mimetypes
from urllib.parse import quote
from mitmproxy.net.http import headers
def encode(head, l):
k = head.get("content-type")
if k:
k = headers.parse_content_type(k)
if k is not None:
try:
boundary = k[2]["boundary"].encode("ascii")
boundary = quote(boundary)
except (KeyError, UnicodeError):
return b""
hdrs = []
for key, value in l:
file_type = mimetypes.guess_type(str(key))[0] or "text/plain; charset=utf-8"
if key:
hdrs.append(b"--%b" % boundary.encode('utf-8'))
disposition = b'form-data; name="%b"' % key
hdrs.append(b"Content-Disposition: %b" % disposition)
hdrs.append(b"Content-Type: %b" % file_type.encode('utf-8'))
hdrs.append(b'')
hdrs.append(value)
hdrs.append(b'')
if value is not None:
# If boundary is found in value then raise ValueError
if re.search(rb"^--%b$" % re.escape(boundary.encode('utf-8')), value):
raise ValueError(b"boundary found in encoded string")
hdrs.append(b"--%b--\r\n" % boundary.encode('utf-8'))
temp = b"\r\n".join(hdrs)
return temp
def decode(hdrs, content):
"""
Takes a multipart boundary encoded string and returns list of (key, value) tuples.
@ -19,14 +54,14 @@ def decode(hdrs, content):
rx = re.compile(br'\bname="([^"]+)"')
r = []
for i in content.split(b"--" + boundary):
parts = i.splitlines()
if len(parts) > 1 and parts[0][0:2] != b"--":
match = rx.search(parts[1])
if match:
key = match.group(1)
value = b"".join(parts[3 + parts[2:].index(b""):])
r.append((key, value))
if content is not None:
for i in content.split(b"--" + boundary):
parts = i.splitlines()
if len(parts) > 1 and parts[0][0:2] != b"--":
match = rx.search(parts[1])
if match:
key = match.group(1)
value = b"".join(parts[3 + parts[2:].index(b""):])
r.append((key, value))
return r
return []

View File

@ -472,7 +472,8 @@ class Request(message.Message):
return ()
def _set_multipart_form(self, value):
raise NotImplementedError()
self.content = mitmproxy.net.http.multipart.encode(self.headers, value)
self.headers["content-type"] = "multipart/form-data"
@property
def multipart_form(self):

View File

@ -295,6 +295,17 @@ def create_client_context(
return context
def accept_all(
conn_: SSL.Connection,
x509: SSL.X509,
errno: int,
err_depth: int,
is_cert_verified: bool,
) -> bool:
# Return true to prevent cert verification error
return True
def create_server_context(
cert: typing.Union[certs.Cert, str],
key: SSL.PKey,
@ -324,16 +335,6 @@ def create_server_context(
until then we're conservative.
"""
def accept_all(
conn_: SSL.Connection,
x509: SSL.X509,
errno: int,
err_depth: int,
is_cert_verified: bool,
) -> bool:
# Return true to prevent cert verification error
return True
if request_client_cert:
verify = SSL.VERIFY_PEER
else:
@ -425,7 +426,7 @@ class ClientHello:
return self._client_hello.cipher_suites.cipher_suites
@property
def sni(self):
def sni(self) -> typing.Optional[bytes]:
if self._client_hello.extensions:
for extension in self._client_hello.extensions.extensions:
is_valid_sni_extension = (
@ -435,7 +436,7 @@ class ClientHello:
check.is_valid_host(extension.body.server_names[0].host_name)
)
if is_valid_sni_extension:
return extension.body.server_names[0].host_name.decode("idna")
return extension.body.server_names[0].host_name
return None
@property
@ -473,10 +474,8 @@ class ClientHello:
return cls(raw_client_hello)
except EOFError as e:
raise exceptions.TlsProtocolException(
'Cannot parse Client Hello: %s, Raw Client Hello: %s' %
(repr(e), binascii.hexlify(raw_client_hello))
f"Cannot parse Client Hello: {e!r}, Raw Client Hello: {binascii.hexlify(raw_client_hello)!r}"
)
def __repr__(self):
return "ClientHello(sni: %s, alpn_protocols: %s, cipher_suites: %s)" % \
(self.sni, self.alpn_protocols, self.cipher_suites)
return f"ClientHello(sni: {self.sni}, alpn_protocols: {self.alpn_protocols})"

View File

@ -551,7 +551,9 @@ def serialize(opts: OptManager, text: str, defaults: bool = False) -> str:
for k in list(data.keys()):
if k not in opts._options:
del data[k]
return ruamel.yaml.round_trip_dump(data)
ret = ruamel.yaml.round_trip_dump(data)
assert ret
return ret
def save(opts: OptManager, path: str, defaults: bool =False) -> None:

View File

@ -1,7 +1,7 @@
import re
import socket
import sys
from typing import Tuple
from typing import Callable, Optional, Tuple
def init_transparent_mode() -> None:
@ -10,30 +10,34 @@ def init_transparent_mode() -> None:
"""
def original_addr(csock: socket.socket) -> Tuple[str, int]:
"""
Get the original destination for the given socket.
This function will be None if transparent mode is not supported.
"""
original_addr: Optional[Callable[[socket.socket], Tuple[str, int]]]
"""
Get the original destination for the given socket.
This function will be None if transparent mode is not supported.
"""
if re.match(r"linux(?:2)?", sys.platform):
from . import linux
original_addr = linux.original_addr # noqa
original_addr = linux.original_addr
elif sys.platform == "darwin" or sys.platform.startswith("freebsd"):
from . import osx
original_addr = osx.original_addr # noqa
original_addr = osx.original_addr
elif sys.platform.startswith("openbsd"):
from . import openbsd
original_addr = openbsd.original_addr # noqa
original_addr = openbsd.original_addr
elif sys.platform == "win32":
from . import windows
resolver = windows.Resolver()
init_transparent_mode = resolver.setup # noqa
original_addr = resolver.original_addr # noqa
original_addr = resolver.original_addr
else:
original_addr = None # noqa
original_addr = None
__all__ = [
"original_addr",
"init_transparent_mode"
]

View File

@ -34,9 +34,9 @@ class ProxyConfig:
def __init__(self, options: moptions.Options) -> None:
self.options = options
self.check_filter: HostMatcher = None
self.check_tcp: HostMatcher = None
self.certstore: certs.CertStore = None
self.certstore: certs.CertStore
self.check_filter: typing.Optional[HostMatcher] = None
self.check_tcp: typing.Optional[HostMatcher] = None
self.upstream_server: typing.Optional[server_spec.ServerSpec] = None
self.configure(options, set(options.keys()))
options.changed.connect(self.configure)

View File

@ -263,7 +263,7 @@ class HttpLayer(base.Layer):
else:
msg = "Unexpected CONNECT request."
self.send_error_response(400, msg)
raise exceptions.ProtocolException(msg)
return False
validate_request_form(self.mode, request)
self.channel.ask("requestheaders", f)
@ -289,9 +289,12 @@ class HttpLayer(base.Layer):
f.request = None
f.error = flow.Error(str(e))
self.channel.ask("error", f)
raise exceptions.ProtocolException(
"HTTP protocol error in client request: {}".format(e)
) from e
self.log(
"request",
"warn",
["HTTP protocol error in client request: {}".format(e)]
)
return False
self.log("request", "debug", [repr(request)])
@ -448,8 +451,8 @@ class HttpLayer(base.Layer):
return False # should never be reached
except (exceptions.ProtocolException, exceptions.NetlibException) as e:
self.send_error_response(502, repr(e))
if not f.response:
self.send_error_response(502, repr(e))
f.error = flow.Error(str(e))
self.channel.ask("error", f)
return False

View File

@ -1,7 +1,7 @@
import threading
import time
import functools
from typing import Dict, Callable, Any, List # noqa
from typing import Dict, Callable, Any, List, Optional # noqa
import h2.exceptions
from h2 import connection
@ -382,15 +382,15 @@ class Http2SingleStreamLayer(httpbase._HttpTransmissionLayer, basethread.BaseThr
ctx, name="Http2SingleStreamLayer-{}".format(stream_id)
)
self.h2_connection = h2_connection
self.zombie: float = None
self.zombie: Optional[float] = None
self.client_stream_id: int = stream_id
self.server_stream_id: int = None
self.server_stream_id: Optional[int] = None
self.request_headers = request_headers
self.response_headers: mitmproxy.net.http.Headers = None
self.response_headers: Optional[mitmproxy.net.http.Headers] = None
self.pushed = False
self.timestamp_start: float = None
self.timestamp_end: float = None
self.timestamp_start: Optional[float] = None
self.timestamp_end: Optional[float] = None
self.request_arrived = threading.Event()
self.request_data_queue: queue.Queue[bytes] = queue.Queue()
@ -404,9 +404,9 @@ class Http2SingleStreamLayer(httpbase._HttpTransmissionLayer, basethread.BaseThr
self.no_body = False
self.priority_exclusive: bool = None
self.priority_depends_on: int = None
self.priority_weight: int = None
self.priority_exclusive: bool
self.priority_depends_on: Optional[int] = None
self.priority_weight: Optional[int] = None
self.handled_priority_event: Any = None
def kill(self):

View File

@ -196,17 +196,14 @@ CIPHER_ID_NAME_MAP = {
}
# We manually need to specify this, otherwise OpenSSL may select a non-HTTP2 cipher by default.
# https://mozilla.github.io/server-side-tls/ssl-config-generator/?server=apache-2.2.15&openssl=1.0.2&hsts=yes&profile=old
# https://ssl-config.mozilla.org/#config=old
DEFAULT_CLIENT_CIPHERS = (
"ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384:"
"ECDHE-ECDSA-AES256-GCM-SHA384:DHE-RSA-AES128-GCM-SHA256:DHE-DSS-AES128-GCM-SHA256:kEDH+AESGCM:"
"ECDHE-RSA-AES128-SHA256:ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA:ECDHE-ECDSA-AES128-SHA:"
"ECDHE-RSA-AES256-SHA384:ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA:ECDHE-ECDSA-AES256-SHA:"
"DHE-RSA-AES128-SHA256:DHE-RSA-AES128-SHA:DHE-DSS-AES128-SHA256:DHE-RSA-AES256-SHA256:DHE-DSS-AES256-SHA:"
"DHE-RSA-AES256-SHA:ECDHE-RSA-DES-CBC3-SHA:ECDHE-ECDSA-DES-CBC3-SHA:AES128-GCM-SHA256:AES256-GCM-SHA384:"
"AES128-SHA256:AES256-SHA256:AES128-SHA:AES256-SHA:AES:DES-CBC3-SHA:"
"HIGH:!aNULL:!eNULL:!EXPORT:!DES:!RC4:!MD5:!PSK:!aECDH:"
"!EDH-DSS-DES-CBC3-SHA:!EDH-RSA-DES-CBC3-SHA:!KRB5-DES-CBC3-SHA"
"ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:"
"ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:DHE-RSA-AES128-GCM-SHA256:"
"DHE-RSA-AES256-GCM-SHA384:DHE-RSA-CHACHA20-POLY1305:ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA256:"
"ECDHE-ECDSA-AES128-SHA:ECDHE-RSA-AES128-SHA:ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA384:"
"ECDHE-ECDSA-AES256-SHA:ECDHE-RSA-AES256-SHA:DHE-RSA-AES128-SHA256:DHE-RSA-AES256-SHA256:AES128-GCM-SHA256:"
"AES256-GCM-SHA384:AES128-SHA256:AES256-SHA256:AES128-SHA:AES256-SHA:DES-CBC3-SHA"
)
@ -323,14 +320,18 @@ class TlsLayer(base.Layer):
return self._server_tls
@property
def server_sni(self):
def server_sni(self) -> Optional[str]:
"""
The Server Name Indication we want to send with the next server TLS handshake.
"""
if self._custom_server_sni is False:
return None
elif self._custom_server_sni:
return self._custom_server_sni
elif self._client_hello and self._client_hello.sni:
return self._client_hello.sni.decode("idna")
else:
return self._custom_server_sni or self._client_hello and self._client_hello.sni
return None
@property
def alpn_for_client_connection(self):
@ -391,11 +392,12 @@ class TlsLayer(base.Layer):
# raises ann error.
self.client_conn.rfile.peek(1)
except exceptions.TlsException as e:
sni_str = self._client_hello.sni and self._client_hello.sni.decode("idna")
raise exceptions.ClientHandshakeException(
"Cannot establish TLS with client (sni: {sni}): {e}".format(
sni=self._client_hello.sni, e=repr(e)
sni=sni_str, e=repr(e)
),
self._client_hello.sni or repr(self.server_conn.address)
sni_str or repr(self.server_conn.address)
)
def _establish_tls_with_server(self):
@ -493,7 +495,7 @@ class TlsLayer(base.Layer):
organization = upstream_cert.organization
# Also add SNI values.
if self._client_hello.sni:
sans.add(self._client_hello.sni.encode("idna"))
sans.add(self._client_hello.sni)
if self._custom_server_sni:
sans.add(self._custom_server_sni.encode("idna"))

View File

@ -57,7 +57,8 @@ class RootContext:
except exceptions.TlsProtocolException as e:
self.log("Cannot parse Client Hello: %s" % repr(e), "error")
else:
is_filtered = self.config.check_filter((client_hello.sni, 443))
sni_str = client_hello.sni and client_hello.sni.decode("idna")
is_filtered = self.config.check_filter((sni_str, 443))
if is_filtered:
return protocol.RawTCPLayer(top_layer, ignore=True)

View File

@ -35,6 +35,7 @@ class DummyServer:
class ProxyServer(tcp.TCPServer):
allow_reuse_address = True
bound = True
channel: controller.Channel
def __init__(self, config: config.ProxyConfig) -> None:
"""
@ -53,7 +54,6 @@ class ProxyServer(tcp.TCPServer):
raise exceptions.ServerException(
'Error starting proxy server: ' + repr(e)
) from e
self.channel: controller.Channel = None
def set_channel(self, channel):
self.channel = channel

View File

@ -1,7 +1,5 @@
import typing
from typing import Any # noqa
from typing import MutableMapping # noqa
import json
import typing
from mitmproxy.coretypes import serializable
from mitmproxy.utils import typecheck
@ -15,7 +13,7 @@ class StateObject(serializable.Serializable):
or StateObject instances themselves.
"""
_stateobject_attributes: MutableMapping[str, Any] = None
_stateobject_attributes: typing.ClassVar[typing.MutableMapping[str, typing.Any]]
"""
An attribute-name -> class-or-type dict containing all attributes that
should be serialized. If the attribute is a class, it must implement the
@ -42,7 +40,7 @@ class StateObject(serializable.Serializable):
if val is None:
setattr(self, attr, val)
else:
curr = getattr(self, attr)
curr = getattr(self, attr, None)
if hasattr(curr, "set_state"):
curr.set_state(val)
else:

View File

@ -6,19 +6,16 @@ Feel free to import and use whatever new package you deem necessary.
import os
import sys
import asyncio
import argparse # noqa
import signal # noqa
import typing # noqa
import argparse
import signal
import typing
from mitmproxy.tools import cmdline # noqa
from mitmproxy import exceptions, master # noqa
from mitmproxy import options # noqa
from mitmproxy import optmanager # noqa
from mitmproxy import proxy # noqa
from mitmproxy import log # noqa
from mitmproxy.utils import debug, arg_check # noqa
OPTIONS_FILE_NAME = "config.yaml"
from mitmproxy.tools import cmdline
from mitmproxy import exceptions, master
from mitmproxy import options
from mitmproxy import optmanager
from mitmproxy import proxy
from mitmproxy.utils import debug, arg_check
def assert_utf8_env():
@ -90,7 +87,8 @@ def run(
opts.set(*args.setoptions, defer=True)
optmanager.load_paths(
opts,
os.path.join(opts.confdir, OPTIONS_FILE_NAME),
os.path.join(opts.confdir, "config.yaml"),
os.path.join(opts.confdir, "config.yml"),
)
pconf = process_options(parser, opts, args)
server: typing.Any = None

View File

@ -55,7 +55,7 @@ class CommandBuffer:
self.text = start
# Cursor is always within the range [0:len(buffer)].
self._cursor = len(self.text)
self.completion: CompletionState = None
self.completion: typing.Optional[CompletionState] = None
@property
def cursor(self) -> int:

View File

@ -38,7 +38,7 @@ KEY_MAX = 30
def format_keyvals(
entries: typing.List[typing.Tuple[str, typing.Union[None, str, urwid.Widget]]],
entries: typing.Iterable[typing.Tuple[str, typing.Union[None, str, urwid.Widget]]],
key_format: str = "key",
value_format: str = "text",
indent: int = 0

View File

@ -381,7 +381,8 @@ class ConsoleAddon:
"""
return [
"cookies",
"form",
"urlencoded form",
"multipart form",
"path",
"method",
"query",
@ -416,8 +417,10 @@ class ConsoleAddon:
flow.response = http.HTTPResponse.make()
if part == "cookies":
self.master.switch_view("edit_focus_cookies")
elif part == "form":
self.master.switch_view("edit_focus_form")
elif part == "urlencoded form":
self.master.switch_view("edit_focus_urlencoded_form")
elif part == "multipart form":
self.master.switch_view("edit_focus_multipart_form")
elif part == "path":
self.master.switch_view("edit_focus_path")
elif part == "query":

View File

@ -254,7 +254,7 @@ FIRST_WIDTH_MAX = 40
class BaseGridEditor(urwid.WidgetWrap):
title = ""
title: str = ""
keyctx = "grideditor"
def __init__(
@ -402,8 +402,8 @@ class BaseGridEditor(urwid.WidgetWrap):
class GridEditor(BaseGridEditor):
title: str = None
columns: typing.Sequence[Column] = None
title = ""
columns: typing.Sequence[Column] = ()
keyctx = "grideditor"
def __init__(

View File

@ -53,14 +53,30 @@ class ResponseHeaderEditor(HeaderEditor):
flow.response.headers = Headers(vals)
class RequestFormEditor(base.FocusEditor):
title = "Edit URL-encoded Form"
class RequestMultipartEditor(base.FocusEditor):
title = "Edit Multipart Form"
columns = [
col_text.Column("Key"),
col_text.Column("Value")
]
def get_data(self, flow):
return flow.request.multipart_form.items(multi=True)
def set_data(self, vals, flow):
flow.request.multipart_form = vals
class RequestUrlEncodedEditor(base.FocusEditor):
title = "Edit UrlEncoded Form"
columns = [
col_text.Column("Key"),
col_text.Column("Value")
]
def get_data(self, flow):
return flow.request.urlencoded_form.items(multi=True)
def set_data(self, vals, flow):
@ -107,7 +123,7 @@ class CookieAttributeEditor(base.FocusEditor):
col_text.Column("Name"),
col_text.Column("Value"),
]
grideditor: base.BaseGridEditor = None
grideditor: base.BaseGridEditor
def data_in(self, data):
return [(k, v or "") for k, v in data]
@ -169,7 +185,7 @@ class SetCookieEditor(base.FocusEditor):
class OptionsEditor(base.GridEditor, layoutwidget.LayoutWidget):
title: str = None
title = ""
columns = [
col_text.Column("")
]
@ -189,7 +205,7 @@ class OptionsEditor(base.GridEditor, layoutwidget.LayoutWidget):
class DataViewer(base.GridEditor, layoutwidget.LayoutWidget):
title: str = None
title = ""
def __init__(
self,

View File

@ -120,7 +120,7 @@ class ConsoleMaster(master.Master):
with open(fd, "w" if text else "wb") as f:
f.write(data)
# if no EDITOR is set, assume 'vi'
c = os.environ.get("EDITOR") or "vi"
c = os.environ.get("MITMPROXY_EDITOR") or os.environ.get("EDITOR") or "vi"
cmd = shlex.split(c)
cmd.append(name)
with self.uistopped():
@ -159,7 +159,7 @@ class ConsoleMaster(master.Master):
shell = True
if not cmd:
# hm which one should get priority?
c = os.environ.get("PAGER") or os.environ.get("EDITOR")
c = os.environ.get("MITMPROXY_EDITOR") or os.environ.get("PAGER") or os.environ.get("EDITOR")
if not c:
c = "less"
cmd = shlex.split(c)

View File

@ -42,7 +42,7 @@ class Palette:
'commander_command', 'commander_invalid', 'commander_hint'
]
_fields.extend(['gradient_%02d' % i for i in range(100)])
high: typing.Mapping[str, typing.Sequence[str]] = None
high: typing.Optional[typing.Mapping[str, typing.Sequence[str]]] = None
def palette(self, transparent):
l = []

View File

@ -64,7 +64,8 @@ class WindowStack:
edit_focus_cookies = grideditor.CookieEditor(master),
edit_focus_setcookies = grideditor.SetCookieEditor(master),
edit_focus_setcookie_attrs = grideditor.CookieAttributeEditor(master),
edit_focus_form = grideditor.RequestFormEditor(master),
edit_focus_multipart_form=grideditor.RequestMultipartEditor(master),
edit_focus_urlencoded_form=grideditor.RequestUrlEncodedEditor(master),
edit_focus_path = grideditor.PathEditor(master),
edit_focus_request_headers = grideditor.RequestHeaderEditor(master),
edit_focus_response_headers = grideditor.ResponseHeaderEditor(master),

View File

@ -5,6 +5,7 @@ import logging
import os.path
import re
from io import BytesIO
from typing import ClassVar, Optional
import tornado.escape
import tornado.web
@ -50,6 +51,8 @@ def flow_to_json(flow: mitmproxy.flow.Flow) -> dict:
f["error"] = flow.error.get_state()
if isinstance(flow, http.HTTPFlow):
content_length: Optional[int]
content_hash: Optional[str]
if flow.request:
if flow.request.raw_content:
content_length = len(flow.request.raw_content)
@ -193,7 +196,7 @@ class FilterHelp(RequestHandler):
class WebSocketEventBroadcaster(tornado.websocket.WebSocketHandler):
# raise an error if inherited class doesn't specify its own instance.
connections: set = None
connections: ClassVar[set]
def open(self):
self.connections.add(self)
@ -213,7 +216,7 @@ class WebSocketEventBroadcaster(tornado.websocket.WebSocketHandler):
class ClientConnection(WebSocketEventBroadcaster):
connections: set = set()
connections: ClassVar[set] = set()
class Flows(RequestHandler):

View File

@ -423,7 +423,7 @@ class TypeManager:
for t in types:
self.typemap[t.typ] = t()
def get(self, t: type, default=None) -> _BaseType:
def get(self, t: typing.Optional[typing.Type], default=None) -> _BaseType:
if type(t) in self.typemap:
return self.typemap[type(t)]
return self.typemap.get(t, default)

View File

@ -1,5 +1,5 @@
import itertools
from typing import TypeVar, Iterable, Iterator, Tuple, Optional
from typing import TypeVar, Iterable, Iterator, Tuple, Optional, List
T = TypeVar('T')
@ -18,7 +18,7 @@ def window(iterator: Iterable[T], behind: int = 0, ahead: int = 0) -> Iterator[T
2 3 None
"""
# TODO: move into utils
iters = list(itertools.tee(iterator, behind + 1 + ahead))
iters: List[Iterator[Optional[T]]] = list(itertools.tee(iterator, behind + 1 + ahead))
for i in range(behind):
iters[i] = itertools.chain((behind - i) * [None], iters[i])
for i in range(ahead):

View File

@ -1,10 +1,10 @@
import codecs
import io
import re
import codecs
from typing import AnyStr, Optional, cast, Iterable
from typing import Iterable, Optional, Union, cast
def always_bytes(str_or_bytes: Optional[AnyStr], *encode_args) -> Optional[bytes]:
def always_bytes(str_or_bytes: Union[str, bytes, None], *encode_args) -> Optional[bytes]:
if isinstance(str_or_bytes, bytes) or str_or_bytes is None:
return cast(Optional[bytes], str_or_bytes)
elif isinstance(str_or_bytes, str):
@ -13,13 +13,15 @@ def always_bytes(str_or_bytes: Optional[AnyStr], *encode_args) -> Optional[bytes
raise TypeError("Expected str or bytes, but got {}.".format(type(str_or_bytes).__name__))
def always_str(str_or_bytes: Optional[AnyStr], *decode_args) -> Optional[str]:
def always_str(str_or_bytes: Union[str, bytes, None], *decode_args) -> Optional[str]:
"""
Returns,
str_or_bytes unmodified, if
"""
if isinstance(str_or_bytes, str) or str_or_bytes is None:
return cast(Optional[str], str_or_bytes)
if str_or_bytes is None:
return None
if isinstance(str_or_bytes, str):
return cast(str, str_or_bytes)
elif isinstance(str_or_bytes, bytes):
return str_or_bytes.decode(*decode_args)
else:
@ -39,7 +41,6 @@ _control_char_trans_newline = _control_char_trans.copy()
for x in ("\r", "\n", "\t"):
del _control_char_trans_newline[ord(x)]
_control_char_trans = str.maketrans(_control_char_trans)
_control_char_trans_newline = str.maketrans(_control_char_trans_newline)

View File

@ -25,9 +25,9 @@ def get_dev_version() -> str:
stderr=subprocess.STDOUT,
cwd=here,
)
last_tag, tag_dist, commit = git_describe.decode().strip().rsplit("-", 2)
last_tag, tag_dist_str, commit = git_describe.decode().strip().rsplit("-", 2)
commit = commit.lstrip("g")[:7]
tag_dist = int(tag_dist)
tag_dist = int(tag_dist_str)
except Exception:
pass
else:

View File

@ -25,7 +25,7 @@ class Daemon:
def __enter__(self):
return self
def __exit__(self, type, value, traceback) -> bool:
def __exit__(self, type, value, traceback):
self.logfp.truncate(0)
self.shutdown()
return False

View File

@ -1,7 +1,7 @@
[flake8]
max-line-length = 140
max-complexity = 25
ignore = E251,E252,C901,W292,W503,W504,W605,E722,E741
ignore = E251,E252,C901,W292,W503,W504,W605,E722,E741,E126
exclude = mitmproxy/contrib/*,test/mitmproxy/data/*,release/build/*,mitmproxy/io/proto/*
addons = file,open,basestring,xrange,unicode,long,cmp

View File

@ -72,7 +72,7 @@ setup(
"kaitaistruct>=0.7,<0.9",
"ldap3>=2.6.1,<2.7",
"passlib>=1.6.5, <1.8",
"protobuf>=3.6.0, <3.10",
"protobuf>=3.6.0, <3.11",
"pyasn1>=0.3.1,<0.5",
"pyOpenSSL>=19.0.0,<20",
"pyparsing>=2.4.2,<2.5",
@ -93,7 +93,7 @@ setup(
"asynctest>=0.12.0",
"flake8>=3.7.8,<3.8",
"Flask>=1.0,<1.2",
"mypy>=0.590,<0.591",
"mypy>=0.740,<0.741",
"parver>=0.1,<2.0",
"pytest-asyncio>=0.10.0,<0.11",
"pytest-cov>=2.7.1,<3",

View File

@ -1,5 +1,6 @@
from mitmproxy.net.http import Headers
from mitmproxy.net.http import multipart
import pytest
def test_decode():
@ -22,3 +23,39 @@ def test_decode():
assert len(form) == 2
assert form[0] == (b"field1", b"value1")
assert form[1] == (b"field2", b"value2")
boundary = 'boundary茅莽'
headers = Headers(
content_type='multipart/form-data; boundary=' + boundary
)
result = multipart.decode(headers, content)
assert result == []
headers = Headers(
content_type=''
)
assert multipart.decode(headers, content) == []
def test_encode():
data = [("file".encode('utf-8'), "shell.jpg".encode('utf-8')),
("file_size".encode('utf-8'), "1000".encode('utf-8'))]
headers = Headers(
content_type='multipart/form-data; boundary=127824672498'
)
content = multipart.encode(headers, data)
assert b'Content-Disposition: form-data; name="file"' in content
assert b'Content-Type: text/plain; charset=utf-8\r\n\r\nshell.jpg\r\n\r\n--127824672498\r\n' in content
assert b'1000\r\n\r\n--127824672498--\r\n'
assert len(content) == 252
with pytest.raises(ValueError, match=r"boundary found in encoded string"):
multipart.encode(headers, [("key".encode('utf-8'), "--127824672498".encode('utf-8'))])
boundary = 'boundary茅莽'
headers = Headers(
content_type='multipart/form-data; boundary=' + boundary
)
result = multipart.encode(headers, data)
assert result == b''

View File

@ -371,6 +371,7 @@ class TestRequestUtils:
assert list(request.multipart_form.items()) == []
def test_set_multipart_form(self):
request = treq(content=b"foobar")
with pytest.raises(NotImplementedError):
request.multipart_form = "foobar"
request = treq()
request.multipart_form = [("file", "shell.jpg"), ("file_size", "1000")]
assert request.headers["Content-Type"] == 'multipart/form-data'
assert request.content is None

View File

@ -116,7 +116,7 @@ class TestClientHello:
)
c = tls.ClientHello(data)
assert repr(c)
assert c.sni == 'example.com'
assert c.sni == b'example.com'
assert c.cipher_suites == [
49195, 49199, 49196, 49200, 52393, 52392, 52244, 52243, 49161,
49171, 49162, 49172, 156, 157, 47, 53, 10