mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2025-01-30 14:58:38 +00:00
update tests, increase coverage, add type info
This commit is contained in:
parent
f53f079f91
commit
a7ba2f7b46
@ -14,31 +14,37 @@ passed as the ``headers`` keyword argument. For HTTP requests, the query
|
||||
parameters are passed as the ``query`` keyword argument.
|
||||
"""
|
||||
import traceback
|
||||
from typing import Dict, Optional # noqa
|
||||
from typing import List # noqa
|
||||
from typing import Tuple # noqa
|
||||
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy.net import http
|
||||
from mitmproxy.utils import strutils
|
||||
from . import (
|
||||
auto, raw, hex, json, xml, wbxml, html, javascript, css,
|
||||
urlencoded, multipart, image, query, protobuf
|
||||
)
|
||||
from .base import View, VIEW_CUTOFF, KEY_MAX, format_text, format_dict
|
||||
|
||||
from .base import VIEW_CUTOFF, KEY_MAX
|
||||
|
||||
views = []
|
||||
content_types_map = {}
|
||||
view_prompts = []
|
||||
views = [] # type: List[View]
|
||||
content_types_map = {} # type: Dict[str, List[View]]
|
||||
view_prompts = [] # type: List[Tuple[str, str]]
|
||||
|
||||
|
||||
def get(name):
|
||||
def get(name: str) -> Optional[View]:
|
||||
for i in views:
|
||||
if i.name.lower() == name.lower():
|
||||
return i
|
||||
|
||||
|
||||
def get_by_shortcut(c):
|
||||
def get_by_shortcut(c: str) -> Optional[View]:
|
||||
for i in views:
|
||||
if i.prompt[1] == c:
|
||||
return i
|
||||
|
||||
|
||||
def add(view):
|
||||
def add(view: View) -> None:
|
||||
# TODO: auto-select a different name (append an integer?)
|
||||
for i in views:
|
||||
if i.name == view.name:
|
||||
@ -58,7 +64,7 @@ def add(view):
|
||||
view_prompts.append(view.prompt)
|
||||
|
||||
|
||||
def remove(view):
|
||||
def remove(view: View) -> None:
|
||||
for ct in view.content_types:
|
||||
l = content_types_map.setdefault(ct, [])
|
||||
l.remove(view)
|
||||
@ -123,7 +129,7 @@ def get_message_content_view(viewname, message):
|
||||
return description, lines, error
|
||||
|
||||
|
||||
def get_content_view(viewmode, data, **metadata):
|
||||
def get_content_view(viewmode: View, data: bytes, **metadata):
|
||||
"""
|
||||
Args:
|
||||
viewmode: the view to use.
|
||||
@ -153,11 +159,6 @@ def get_content_view(viewmode, data, **metadata):
|
||||
return desc, safe_to_print(content), error
|
||||
|
||||
|
||||
from . import (
|
||||
auto, raw, hex, json, xml, wbxml, html, javascript, css,
|
||||
urlencoded, multipart, image, query, protobuf
|
||||
)
|
||||
|
||||
add(auto.ViewAuto())
|
||||
add(raw.ViewRaw())
|
||||
add(hex.ViewHex())
|
||||
@ -174,4 +175,10 @@ add(image.ViewImage())
|
||||
add(query.ViewQuery())
|
||||
|
||||
if protobuf.ViewProtobuf.is_available():
|
||||
add(protobuf.ViewProtobuf())
|
||||
add(protobuf.ViewProtobuf())
|
||||
|
||||
__all__ = [
|
||||
"View", "VIEW_CUTOFF", "KEY_MAX", "format_text", "format_dict",
|
||||
"get", "get_by_shortcut", "add", "remove",
|
||||
"get_content_view", "get_message_content_view",
|
||||
]
|
||||
|
@ -1,12 +1,12 @@
|
||||
from mitmproxy import contentviews
|
||||
from mitmproxy.net import http
|
||||
from mitmproxy.utils import strutils
|
||||
from . import base
|
||||
from mitmproxy.contentviews import get, content_types_map
|
||||
|
||||
|
||||
class ViewAuto(base.View):
|
||||
name = "Auto"
|
||||
prompt = ("auto", "a")
|
||||
content_types = []
|
||||
|
||||
def __call__(self, data, **metadata):
|
||||
headers = metadata.get("headers", {})
|
||||
@ -14,14 +14,14 @@ class ViewAuto(base.View):
|
||||
if data and ctype:
|
||||
ct = http.parse_content_type(ctype) if ctype else None
|
||||
ct = "%s/%s" % (ct[0], ct[1])
|
||||
if ct in content_types_map:
|
||||
return content_types_map[ct][0](data, **metadata)
|
||||
if ct in contentviews.content_types_map:
|
||||
return contentviews.content_types_map[ct][0](data, **metadata)
|
||||
elif strutils.is_xml(data):
|
||||
return get("XML")(data, **metadata)
|
||||
return contentviews.get("XML")(data, **metadata)
|
||||
if metadata.get("query"):
|
||||
return get("Query")(data, **metadata)
|
||||
return contentviews.get("Query")(data, **metadata)
|
||||
if data and strutils.is_mostly_bin(data):
|
||||
return get("Hex")(data)
|
||||
return contentviews.get("Hex")(data)
|
||||
if not data:
|
||||
return "No content", []
|
||||
return get("Raw")(data)
|
||||
return contentviews.get("Raw")(data)
|
||||
|
@ -1,9 +1,8 @@
|
||||
# Default view cutoff *in lines*
|
||||
|
||||
from typing import Iterable
|
||||
from typing import Iterable, AnyStr, List
|
||||
from typing import Mapping
|
||||
from typing import Tuple
|
||||
from typing import Union
|
||||
|
||||
VIEW_CUTOFF = 512
|
||||
|
||||
@ -11,9 +10,9 @@ KEY_MAX = 30
|
||||
|
||||
|
||||
class View:
|
||||
name = None
|
||||
prompt = ()
|
||||
content_types = []
|
||||
name = None # type: str
|
||||
prompt = None # type: Tuple[str,str]
|
||||
content_types = [] # type: List[str]
|
||||
|
||||
def __call__(self, data: bytes, **metadata):
|
||||
"""
|
||||
@ -35,12 +34,12 @@ class View:
|
||||
The content generator must not yield tuples of tuples,
|
||||
because urwid cannot process that. You have to yield a *list* of tuples per line.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
raise NotImplementedError() # pragma: no cover
|
||||
|
||||
|
||||
def format_dict(
|
||||
d: Mapping[Union[str, bytes], Union[str, bytes]]
|
||||
) -> Iterable[Tuple[Union[str, bytes], Union[str, bytes]]]:
|
||||
d: Mapping[AnyStr, AnyStr]
|
||||
) -> Iterable[List[Tuple[str, AnyStr]]]:
|
||||
"""
|
||||
Helper function that transforms the given dictionary into a list of
|
||||
("key", key )
|
||||
@ -58,7 +57,7 @@ def format_dict(
|
||||
]
|
||||
|
||||
|
||||
def format_text(text):
|
||||
def format_text(text: AnyStr) -> Iterable[List[Tuple[str, AnyStr]]]:
|
||||
"""
|
||||
Helper function that transforms bytes into the view output format.
|
||||
"""
|
||||
|
@ -5,7 +5,6 @@ from . import base
|
||||
class ViewHex(base.View):
|
||||
name = "Hex"
|
||||
prompt = ("hex", "e")
|
||||
content_types = []
|
||||
|
||||
@staticmethod
|
||||
def _format(data):
|
||||
|
@ -1,7 +1,7 @@
|
||||
import json
|
||||
from typing import Optional
|
||||
|
||||
from mitmproxy.contentviews.base import format_text, View
|
||||
from mitmproxy.contentviews import base
|
||||
|
||||
|
||||
def pretty_json(s: bytes) -> Optional[bytes]:
|
||||
@ -10,15 +10,10 @@ def pretty_json(s: bytes) -> Optional[bytes]:
|
||||
except ValueError:
|
||||
return None
|
||||
pretty = json.dumps(p, sort_keys=True, indent=4, ensure_ascii=False)
|
||||
if isinstance(pretty, str):
|
||||
# json.dumps _may_ decide to return unicode, if the JSON object is not ascii.
|
||||
# From limited testing this is always valid utf8 (otherwise json.loads will fail earlier),
|
||||
# so we can just re-encode it here.
|
||||
return pretty.encode("utf8", "strict")
|
||||
return pretty
|
||||
return pretty.encode("utf8", "strict")
|
||||
|
||||
|
||||
class ViewJSON(View):
|
||||
class ViewJSON(base.View):
|
||||
name = "JSON"
|
||||
prompt = ("json", "s")
|
||||
content_types = [
|
||||
@ -29,4 +24,4 @@ class ViewJSON(View):
|
||||
def __call__(self, data, **metadata):
|
||||
pj = pretty_json(data)
|
||||
if pj:
|
||||
return "JSON", format_text(pj)
|
||||
return "JSON", base.format_text(pj)
|
||||
|
@ -2,6 +2,7 @@ from mitmproxy.net import http
|
||||
from mitmproxy.types import multidict
|
||||
from . import base
|
||||
|
||||
|
||||
class ViewMultipart(base.View):
|
||||
name = "Multipart Form"
|
||||
prompt = ("multipart", "m")
|
||||
|
@ -1,10 +1,11 @@
|
||||
from typing import List # noqa
|
||||
|
||||
from . import base
|
||||
|
||||
|
||||
class ViewQuery(base.View):
|
||||
name = "Query"
|
||||
prompt = ("query", "q")
|
||||
content_types = []
|
||||
|
||||
def __call__(self, data, **metadata):
|
||||
query = metadata.get("query")
|
||||
|
@ -1,3 +1,5 @@
|
||||
from typing import List # noqa
|
||||
|
||||
from mitmproxy.utils import strutils
|
||||
from . import base
|
||||
|
||||
@ -5,7 +7,6 @@ from . import base
|
||||
class ViewRaw(base.View):
|
||||
name = "Raw"
|
||||
prompt = ("raw", "r")
|
||||
content_types = []
|
||||
|
||||
def __call__(self, data, **metadata):
|
||||
return "Raw", base.format_text(strutils.bytes_to_escaped_str(data, True))
|
||||
|
30
mitmproxy/utils/sliding_window.py
Normal file
30
mitmproxy/utils/sliding_window.py
Normal file
@ -0,0 +1,30 @@
|
||||
import itertools
|
||||
from typing import TypeVar, Iterator, Tuple, Optional
|
||||
|
||||
T = TypeVar('T')
|
||||
|
||||
|
||||
def window(iterator: Iterator[T], behind: int = 0, ahead: int = 0) -> Iterator[Tuple[Optional[T]]]:
|
||||
"""
|
||||
Sliding window for an iterator.
|
||||
|
||||
Example:
|
||||
>>> for prev, i, nxt in window(range(10), 1, 1):
|
||||
>>> print(prev, i, nxt)
|
||||
|
||||
None 0 1
|
||||
0 1 2
|
||||
1 2 3
|
||||
2 3 None
|
||||
"""
|
||||
# TODO: move into utils
|
||||
iters = list(itertools.tee(iterator, behind + 1 + ahead))
|
||||
for i in range(behind):
|
||||
iters[i] = itertools.chain((behind - i) * [None], iters[i])
|
||||
for i in range(ahead):
|
||||
iters[-1 - i] = itertools.islice(
|
||||
itertools.chain(iters[-1 - i], (ahead - i) * [None]),
|
||||
(ahead - i),
|
||||
None
|
||||
)
|
||||
return zip(*iters)
|
@ -131,7 +131,7 @@ def test_echo_request_line():
|
||||
|
||||
|
||||
class TestContentView:
|
||||
@mock.patch("mitmproxy.contentviews.ViewAuto.__call__")
|
||||
@mock.patch("mitmproxy.contentviews.auto.ViewAuto.__call__")
|
||||
def test_contentview(self, view_auto):
|
||||
view_auto.side_effect = exceptions.ContentViewException("")
|
||||
sio = io.StringIO()
|
||||
|
9
test/mitmproxy/contentviews/__init__.py
Normal file
9
test/mitmproxy/contentviews/__init__.py
Normal file
@ -0,0 +1,9 @@
|
||||
def full_eval(instance):
|
||||
def call(data, **metadata):
|
||||
x = instance(data, **metadata)
|
||||
if x is None:
|
||||
return None
|
||||
name, generator = x
|
||||
return name, list(generator)
|
||||
|
||||
return call
|
85
test/mitmproxy/contentviews/test_api.py
Normal file
85
test/mitmproxy/contentviews/test_api.py
Normal file
@ -0,0 +1,85 @@
|
||||
import mock
|
||||
|
||||
from mitmproxy import contentviews
|
||||
from mitmproxy.exceptions import ContentViewException
|
||||
from mitmproxy.net.http import Headers
|
||||
from mitmproxy.test import tutils
|
||||
|
||||
|
||||
class TestContentView(contentviews.View):
|
||||
name = "test"
|
||||
prompt = ("t", "test")
|
||||
content_types = ["test/123"]
|
||||
|
||||
|
||||
def test_add_remove():
|
||||
tcv = TestContentView()
|
||||
contentviews.add(tcv)
|
||||
|
||||
# repeated addition causes exception
|
||||
with tutils.raises(ContentViewException):
|
||||
contentviews.add(tcv)
|
||||
|
||||
# Same shortcut doesn't work either.
|
||||
with tutils.raises(ContentViewException):
|
||||
contentviews.add(TestContentView())
|
||||
|
||||
contentviews.remove(tcv)
|
||||
|
||||
|
||||
def test_get_content_view():
|
||||
desc, lines, err = contentviews.get_content_view(
|
||||
contentviews.get("Raw"),
|
||||
b"[1, 2, 3]",
|
||||
)
|
||||
assert "Raw" in desc
|
||||
assert list(lines)
|
||||
assert not err
|
||||
|
||||
desc, lines, err = contentviews.get_content_view(
|
||||
contentviews.get("Auto"),
|
||||
b"[1, 2, 3]",
|
||||
headers=Headers(content_type="application/json")
|
||||
)
|
||||
assert desc == "JSON"
|
||||
|
||||
desc, lines, err = contentviews.get_content_view(
|
||||
contentviews.get("JSON"),
|
||||
b"[1, 2",
|
||||
)
|
||||
assert "Couldn't parse" in desc
|
||||
|
||||
with mock.patch("mitmproxy.contentviews.auto.ViewAuto.__call__") as view_auto:
|
||||
view_auto.side_effect = ValueError
|
||||
|
||||
desc, lines, err = contentviews.get_content_view(
|
||||
contentviews.get("Auto"),
|
||||
b"[1, 2",
|
||||
)
|
||||
assert err
|
||||
assert "Couldn't parse" in desc
|
||||
|
||||
|
||||
def test_get_message_content_view():
|
||||
r = tutils.treq()
|
||||
desc, lines, err = contentviews.get_message_content_view("raw", r)
|
||||
assert desc == "Raw"
|
||||
|
||||
desc, lines, err = contentviews.get_message_content_view("unknown", r)
|
||||
assert desc == "Raw"
|
||||
|
||||
r.encode("gzip")
|
||||
desc, lines, err = contentviews.get_message_content_view("raw", r)
|
||||
assert desc == "[decoded gzip] Raw"
|
||||
|
||||
r.headers["content-encoding"] = "deflate"
|
||||
desc, lines, err = contentviews.get_message_content_view("raw", r)
|
||||
assert desc == "[cannot decode] Raw"
|
||||
|
||||
r.content = None
|
||||
desc, lines, err = contentviews.get_message_content_view("raw", r)
|
||||
assert list(lines) == [[("error", "content missing")]]
|
||||
|
||||
|
||||
def test_get_by_shortcut():
|
||||
assert contentviews.get_by_shortcut("h")
|
47
test/mitmproxy/contentviews/test_auto.py
Normal file
47
test/mitmproxy/contentviews/test_auto.py
Normal file
@ -0,0 +1,47 @@
|
||||
from mitmproxy.contentviews import auto
|
||||
from mitmproxy.net import http
|
||||
from mitmproxy.types import multidict
|
||||
from . import full_eval
|
||||
|
||||
|
||||
def test_view_auto():
|
||||
v = full_eval(auto.ViewAuto())
|
||||
f = v(
|
||||
b"foo",
|
||||
headers=http.Headers()
|
||||
)
|
||||
assert f[0] == "Raw"
|
||||
|
||||
f = v(
|
||||
b"<html></html>",
|
||||
headers=http.Headers(content_type="text/html")
|
||||
)
|
||||
assert f[0] == "HTML"
|
||||
|
||||
f = v(
|
||||
b"foo",
|
||||
headers=http.Headers(content_type="text/flibble")
|
||||
)
|
||||
assert f[0] == "Raw"
|
||||
|
||||
f = v(
|
||||
b"<xml></xml>",
|
||||
headers=http.Headers(content_type="text/flibble")
|
||||
)
|
||||
assert f[0].startswith("XML")
|
||||
|
||||
f = v(b"\xFF" * 30)
|
||||
assert f[0] == "Hex"
|
||||
|
||||
f = v(
|
||||
b"",
|
||||
headers=http.Headers()
|
||||
)
|
||||
assert f[0] == "No content"
|
||||
|
||||
f = v(
|
||||
b"",
|
||||
headers=http.Headers(),
|
||||
query=multidict.MultiDict([("foo", "bar")]),
|
||||
)
|
||||
assert f[0] == "Query"
|
29
test/mitmproxy/contentviews/test_css.py
Normal file
29
test/mitmproxy/contentviews/test_css.py
Normal file
@ -0,0 +1,29 @@
|
||||
from mitmproxy.contentviews import css
|
||||
from mitmproxy.test import tutils
|
||||
from . import full_eval
|
||||
|
||||
try:
|
||||
import cssutils
|
||||
except:
|
||||
cssutils = None
|
||||
|
||||
|
||||
def test_view_css():
|
||||
v = full_eval(css.ViewCSS())
|
||||
|
||||
with open(tutils.test_data.path('mitmproxy/data/1.css'), 'r') as fp:
|
||||
fixture_1 = fp.read()
|
||||
|
||||
result = v('a')
|
||||
|
||||
if cssutils:
|
||||
assert len(list(result[1])) == 0
|
||||
else:
|
||||
assert len(list(result[1])) == 1
|
||||
|
||||
result = v(fixture_1)
|
||||
|
||||
if cssutils:
|
||||
assert len(list(result[1])) > 1
|
||||
else:
|
||||
assert len(list(result[1])) == 1
|
7
test/mitmproxy/contentviews/test_hex.py
Normal file
7
test/mitmproxy/contentviews/test_hex.py
Normal file
@ -0,0 +1,7 @@
|
||||
from mitmproxy.contentviews import hex
|
||||
from . import full_eval
|
||||
|
||||
|
||||
def test_view_hex():
|
||||
v = full_eval(hex.ViewHex())
|
||||
assert v(b"foo")
|
18
test/mitmproxy/contentviews/test_html.py
Normal file
18
test/mitmproxy/contentviews/test_html.py
Normal file
@ -0,0 +1,18 @@
|
||||
from mitmproxy.contentviews import html
|
||||
from . import full_eval
|
||||
|
||||
|
||||
def test_view_html():
|
||||
v = full_eval(html.ViewHTML())
|
||||
s = b"<html><br><br></br><p>one</p></html>"
|
||||
assert v(s)
|
||||
|
||||
s = b"gobbledygook"
|
||||
assert not v(s)
|
||||
|
||||
|
||||
def test_view_html_outline():
|
||||
v = full_eval(html.ViewHTMLOutline())
|
||||
s = b"<html><br><br></br><p>one</p></html>"
|
||||
assert v(s)
|
||||
assert v(b'\xfe')
|
17
test/mitmproxy/contentviews/test_image.py
Normal file
17
test/mitmproxy/contentviews/test_image.py
Normal file
@ -0,0 +1,17 @@
|
||||
from mitmproxy.contentviews import image
|
||||
from mitmproxy.test import tutils
|
||||
from . import full_eval
|
||||
|
||||
|
||||
def test_view_image():
|
||||
v = full_eval(image.ViewImage())
|
||||
for img in [
|
||||
"mitmproxy/data/image.png",
|
||||
"mitmproxy/data/image.gif",
|
||||
"mitmproxy/data/image-err1.jpg",
|
||||
"mitmproxy/data/image.ico"
|
||||
]:
|
||||
with open(tutils.test_data.path(img), "rb") as f:
|
||||
assert v(f.read())
|
||||
|
||||
assert not v(b"flibble")
|
10
test/mitmproxy/contentviews/test_javascript.py
Normal file
10
test/mitmproxy/contentviews/test_javascript.py
Normal file
@ -0,0 +1,10 @@
|
||||
from mitmproxy.contentviews import javascript
|
||||
from . import full_eval
|
||||
|
||||
|
||||
def test_view_javascript():
|
||||
v = full_eval(javascript.ViewJavaScript())
|
||||
assert v(b"[1, 2, 3]")
|
||||
assert v(b"[1, 2, 3")
|
||||
assert v(b"function(a){[1, 2, 3]}")
|
||||
assert v(b"\xfe") # invalid utf-8
|
16
test/mitmproxy/contentviews/test_json.py
Normal file
16
test/mitmproxy/contentviews/test_json.py
Normal file
@ -0,0 +1,16 @@
|
||||
from mitmproxy.contentviews import json
|
||||
from . import full_eval
|
||||
|
||||
|
||||
def test_pretty_json():
|
||||
assert json.pretty_json(b'{"foo": 1}')
|
||||
assert not json.pretty_json(b"moo")
|
||||
assert json.pretty_json(b'{"foo" : "\xe4\xb8\x96\xe7\x95\x8c"}') # utf8 with chinese characters
|
||||
assert not json.pretty_json(b'{"foo" : "\xFF"}')
|
||||
|
||||
|
||||
def test_view_json():
|
||||
v = full_eval(json.ViewJSON())
|
||||
assert v(b"{}")
|
||||
assert not v(b"{")
|
||||
assert v(b"[1, 2, 3, 4, 5]")
|
25
test/mitmproxy/contentviews/test_multipart.py
Normal file
25
test/mitmproxy/contentviews/test_multipart.py
Normal file
@ -0,0 +1,25 @@
|
||||
from mitmproxy.contentviews import multipart
|
||||
from mitmproxy.net import http
|
||||
from . import full_eval
|
||||
|
||||
|
||||
def test_view_multipart():
|
||||
view = full_eval(multipart.ViewMultipart())
|
||||
v = b"""
|
||||
--AaB03x
|
||||
Content-Disposition: form-data; name="submit-name"
|
||||
|
||||
Larry
|
||||
--AaB03x
|
||||
""".strip()
|
||||
h = http.Headers(content_type="multipart/form-data; boundary=AaB03x")
|
||||
assert view(v, headers=h)
|
||||
|
||||
h = http.Headers()
|
||||
assert not view(v, headers=h)
|
||||
|
||||
h = http.Headers(content_type="multipart/form-data")
|
||||
assert not view(v, headers=h)
|
||||
|
||||
h = http.Headers(content_type="unparseable")
|
||||
assert not view(v, headers=h)
|
12
test/mitmproxy/contentviews/test_protobuf.py
Normal file
12
test/mitmproxy/contentviews/test_protobuf.py
Normal file
@ -0,0 +1,12 @@
|
||||
from mitmproxy.contentviews import protobuf
|
||||
from mitmproxy.test import tutils
|
||||
from . import full_eval
|
||||
|
||||
if protobuf.ViewProtobuf.is_available():
|
||||
def test_view_protobuf_request():
|
||||
v = full_eval(protobuf.ViewProtobuf())
|
||||
|
||||
p = tutils.test_data.path("mitmproxy/data/protobuf01")
|
||||
content_type, output = v(open(p, "rb").read())
|
||||
assert content_type == "Protobuf"
|
||||
assert output.next()[0][1] == '1: "3bbc333c-e61c-433b-819a-0b9a8cc103b8"'
|
13
test/mitmproxy/contentviews/test_query.py
Normal file
13
test/mitmproxy/contentviews/test_query.py
Normal file
@ -0,0 +1,13 @@
|
||||
from mitmproxy.contentviews import query
|
||||
from mitmproxy.types import multidict
|
||||
from . import full_eval
|
||||
|
||||
|
||||
def test_view_query():
|
||||
d = ""
|
||||
v = full_eval(query.ViewQuery())
|
||||
f = v(d, query=multidict.MultiDict([("foo", "bar")]))
|
||||
assert f[0] == "Query"
|
||||
assert f[1] == [[("header", "foo: "), ("text", "bar")]]
|
||||
|
||||
assert v(d) == ("Query", [])
|
7
test/mitmproxy/contentviews/test_raw.py
Normal file
7
test/mitmproxy/contentviews/test_raw.py
Normal file
@ -0,0 +1,7 @@
|
||||
from mitmproxy.contentviews import raw
|
||||
from . import full_eval
|
||||
|
||||
|
||||
def test_view_raw():
|
||||
v = full_eval(raw.ViewRaw())
|
||||
assert v(b"foo")
|
15
test/mitmproxy/contentviews/test_urlencoded.py
Normal file
15
test/mitmproxy/contentviews/test_urlencoded.py
Normal file
@ -0,0 +1,15 @@
|
||||
from mitmproxy.contentviews import urlencoded
|
||||
from mitmproxy.net.http import url
|
||||
from . import full_eval
|
||||
|
||||
|
||||
def test_view_urlencoded():
|
||||
v = full_eval(urlencoded.ViewURLEncoded())
|
||||
|
||||
d = url.encode([("one", "two"), ("three", "four")]).encode()
|
||||
assert v(d)
|
||||
|
||||
d = url.encode([("adsfa", "")]).encode()
|
||||
assert v(d)
|
||||
|
||||
assert not v(b"\xFF\x00")
|
17
test/mitmproxy/contentviews/test_xml.py
Normal file
17
test/mitmproxy/contentviews/test_xml.py
Normal file
@ -0,0 +1,17 @@
|
||||
from mitmproxy.contentviews import xml
|
||||
from . import full_eval
|
||||
|
||||
|
||||
def test_view_xml():
|
||||
v = full_eval(xml.ViewXML())
|
||||
assert v(b"<foo></foo>")
|
||||
assert not v(b"<foo>")
|
||||
s = b"""<?xml version="1.0" encoding="UTF-8"?>
|
||||
<?xml-stylesheet title="XSL_formatting"?>
|
||||
<rss
|
||||
xmlns:media="http://search.yahoo.com/mrss/"
|
||||
xmlns:atom="http://www.w3.org/2005/Atom"
|
||||
version="2.0">
|
||||
</rss>
|
||||
"""
|
||||
assert v(s)
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -1,284 +0,0 @@
|
||||
import mock
|
||||
from mitmproxy.exceptions import ContentViewException
|
||||
from mitmproxy.net.http import Headers
|
||||
from mitmproxy.net.http import url
|
||||
from mitmproxy.types import multidict
|
||||
|
||||
import mitmproxy.contentviews as cv
|
||||
from mitmproxy.test import tutils
|
||||
|
||||
try:
|
||||
import pyamf
|
||||
except ImportError:
|
||||
pyamf = None
|
||||
|
||||
try:
|
||||
import cssutils
|
||||
except:
|
||||
cssutils = None
|
||||
|
||||
|
||||
class TestContentView:
|
||||
|
||||
def test_view_auto(self):
|
||||
v = cv.ViewAuto()
|
||||
f = v(
|
||||
b"foo",
|
||||
headers=Headers()
|
||||
)
|
||||
assert f[0] == "Raw"
|
||||
|
||||
f = v(
|
||||
b"<html></html>",
|
||||
headers=Headers(content_type="text/html")
|
||||
)
|
||||
assert f[0] == "HTML"
|
||||
|
||||
f = v(
|
||||
b"foo",
|
||||
headers=Headers(content_type="text/flibble")
|
||||
)
|
||||
assert f[0] == "Raw"
|
||||
|
||||
f = v(
|
||||
b"<xml></xml>",
|
||||
headers=Headers(content_type="text/flibble")
|
||||
)
|
||||
assert f[0].startswith("XML")
|
||||
|
||||
f = v(
|
||||
b"",
|
||||
headers=Headers()
|
||||
)
|
||||
assert f[0] == "No content"
|
||||
|
||||
f = v(
|
||||
b"",
|
||||
headers=Headers(),
|
||||
query=multidict.MultiDict([("foo", "bar")]),
|
||||
)
|
||||
assert f[0] == "Query"
|
||||
|
||||
def test_view_urlencoded(self):
|
||||
d = url.encode([("one", "two"), ("three", "four")]).encode()
|
||||
v = cv.ViewURLEncoded()
|
||||
assert v(d)
|
||||
d = url.encode([("adsfa", "")]).encode()
|
||||
v = cv.ViewURLEncoded()
|
||||
assert v(d)
|
||||
|
||||
def test_view_html(self):
|
||||
v = cv.ViewHTML()
|
||||
s = b"<html><br><br></br><p>one</p></html>"
|
||||
assert v(s)
|
||||
|
||||
s = b"gobbledygook"
|
||||
assert not v(s)
|
||||
|
||||
def test_view_html_outline(self):
|
||||
v = cv.ViewHTMLOutline()
|
||||
s = b"<html><br><br></br><p>one</p></html>"
|
||||
assert v(s)
|
||||
assert v(b'\xfe')
|
||||
|
||||
def test_view_json(self):
|
||||
cv.VIEW_CUTOFF = 100
|
||||
v = cv.ViewJSON()
|
||||
assert v(b"{}")
|
||||
assert not v(b"{")
|
||||
assert v(b"[1, 2, 3, 4, 5]")
|
||||
|
||||
def test_view_xml(self):
|
||||
v = cv.ViewXML()
|
||||
assert v(b"<foo></foo>")
|
||||
assert not v(b"<foo>")
|
||||
s = b"""<?xml version="1.0" encoding="UTF-8"?>
|
||||
<?xml-stylesheet title="XSL_formatting"?>
|
||||
<rss
|
||||
xmlns:media="http://search.yahoo.com/mrss/"
|
||||
xmlns:atom="http://www.w3.org/2005/Atom"
|
||||
version="2.0">
|
||||
</rss>
|
||||
"""
|
||||
assert v(s)
|
||||
|
||||
def test_view_raw(self):
|
||||
v = cv.ViewRaw()
|
||||
assert v(b"foo")
|
||||
|
||||
def test_view_javascript(self):
|
||||
v = cv.ViewJavaScript()
|
||||
assert v(b"[1, 2, 3]")
|
||||
assert v(b"[1, 2, 3")
|
||||
assert v(b"function(a){[1, 2, 3]}")
|
||||
assert v(b"\xfe") # invalid utf-8
|
||||
|
||||
def test_view_css(self):
|
||||
v = cv.ViewCSS()
|
||||
|
||||
with open(tutils.test_data.path('mitmproxy/data/1.css'), 'r') as fp:
|
||||
fixture_1 = fp.read()
|
||||
|
||||
result = v('a')
|
||||
|
||||
if cssutils:
|
||||
assert len(list(result[1])) == 0
|
||||
else:
|
||||
assert len(list(result[1])) == 1
|
||||
|
||||
result = v(fixture_1)
|
||||
|
||||
if cssutils:
|
||||
assert len(list(result[1])) > 1
|
||||
else:
|
||||
assert len(list(result[1])) == 1
|
||||
|
||||
def test_view_hex(self):
|
||||
v = cv.ViewHex()
|
||||
assert v(b"foo")
|
||||
|
||||
def test_view_image(self):
|
||||
v = cv.ViewImage()
|
||||
p = tutils.test_data.path("mitmproxy/data/image.png")
|
||||
assert v(open(p, "rb").read())
|
||||
|
||||
p = tutils.test_data.path("mitmproxy/data/image.gif")
|
||||
assert v(open(p, "rb").read())
|
||||
|
||||
p = tutils.test_data.path("mitmproxy/data/image-err1.jpg")
|
||||
assert v(open(p, "rb").read())
|
||||
|
||||
p = tutils.test_data.path("mitmproxy/data/image.ico")
|
||||
assert v(open(p, "rb").read())
|
||||
|
||||
assert not v(b"flibble")
|
||||
|
||||
def test_view_multipart(self):
|
||||
view = cv.ViewMultipart()
|
||||
v = b"""
|
||||
--AaB03x
|
||||
Content-Disposition: form-data; name="submit-name"
|
||||
|
||||
Larry
|
||||
--AaB03x
|
||||
""".strip()
|
||||
h = Headers(content_type="multipart/form-data; boundary=AaB03x")
|
||||
assert view(v, headers=h)
|
||||
|
||||
h = Headers()
|
||||
assert not view(v, headers=h)
|
||||
|
||||
h = Headers(content_type="multipart/form-data")
|
||||
assert not view(v, headers=h)
|
||||
|
||||
h = Headers(content_type="unparseable")
|
||||
assert not view(v, headers=h)
|
||||
|
||||
def test_view_query(self):
|
||||
d = ""
|
||||
v = cv.ViewQuery()
|
||||
f = v(d, query=multidict.MultiDict([("foo", "bar")]))
|
||||
assert f[0] == "Query"
|
||||
assert [x for x in f[1]] == [[("header", "foo: "), ("text", "bar")]]
|
||||
|
||||
def test_add_cv(self):
|
||||
class TestContentView(cv.View):
|
||||
name = "test"
|
||||
prompt = ("t", "test")
|
||||
|
||||
tcv = TestContentView()
|
||||
cv.add(tcv)
|
||||
|
||||
# repeated addition causes exception
|
||||
tutils.raises(
|
||||
ContentViewException,
|
||||
cv.add,
|
||||
tcv
|
||||
)
|
||||
|
||||
|
||||
def test_get_content_view():
|
||||
desc, lines, err = cv.get_content_view(
|
||||
cv.get("Raw"),
|
||||
b"[1, 2, 3]",
|
||||
)
|
||||
assert "Raw" in desc
|
||||
assert list(lines)
|
||||
assert not err
|
||||
|
||||
desc, lines, err = cv.get_content_view(
|
||||
cv.get("Auto"),
|
||||
b"[1, 2, 3]",
|
||||
headers=Headers(content_type="application/json")
|
||||
)
|
||||
assert desc == "JSON"
|
||||
|
||||
desc, lines, err = cv.get_content_view(
|
||||
cv.get("JSON"),
|
||||
b"[1, 2",
|
||||
)
|
||||
assert "Couldn't parse" in desc
|
||||
|
||||
with mock.patch("mitmproxy.contentviews.ViewAuto.__call__") as view_auto:
|
||||
view_auto.side_effect = ValueError
|
||||
|
||||
desc, lines, err = cv.get_content_view(
|
||||
cv.get("Auto"),
|
||||
b"[1, 2",
|
||||
)
|
||||
assert err
|
||||
assert "Couldn't parse" in desc
|
||||
|
||||
|
||||
def test_get_message_content_view():
|
||||
r = tutils.treq()
|
||||
desc, lines, err = cv.get_message_content_view("raw", r)
|
||||
assert desc == "Raw"
|
||||
|
||||
r.encode("gzip")
|
||||
desc, lines, err = cv.get_message_content_view("raw", r)
|
||||
assert desc == "[decoded gzip] Raw"
|
||||
|
||||
r.headers["content-encoding"] = "deflate"
|
||||
desc, lines, err = cv.get_message_content_view("raw", r)
|
||||
assert desc == "[cannot decode] Raw"
|
||||
|
||||
r.content = None
|
||||
desc, lines, err = cv.get_message_content_view("raw", r)
|
||||
assert list(lines) == [[("error", "content missing")]]
|
||||
|
||||
|
||||
if pyamf:
|
||||
def test_view_amf_request():
|
||||
v = cv.ViewAMF()
|
||||
|
||||
p = tutils.test_data.path("mitmproxy/data/amf01")
|
||||
assert v(open(p, "rb").read())
|
||||
|
||||
p = tutils.test_data.path("mitmproxy/data/amf02")
|
||||
assert v(open(p, "rb").read())
|
||||
|
||||
def test_view_amf_response():
|
||||
v = cv.ViewAMF()
|
||||
p = tutils.test_data.path("mitmproxy/data/amf03")
|
||||
assert v(open(p, "rb").read())
|
||||
|
||||
if cv.ViewProtobuf.is_available():
|
||||
def test_view_protobuf_request():
|
||||
v = cv.ViewProtobuf()
|
||||
|
||||
p = tutils.test_data.path("mitmproxy/data/protobuf01")
|
||||
content_type, output = v(open(p, "rb").read())
|
||||
assert content_type == "Protobuf"
|
||||
assert output.next()[0][1] == '1: "3bbc333c-e61c-433b-819a-0b9a8cc103b8"'
|
||||
|
||||
|
||||
def test_get_by_shortcut():
|
||||
assert cv.get_by_shortcut("h")
|
||||
|
||||
|
||||
def test_pretty_json():
|
||||
assert cv.pretty_json(b'{"foo": 1}')
|
||||
assert not cv.pretty_json(b"moo")
|
||||
assert cv.pretty_json(b'{"foo" : "\xe4\xb8\x96\xe7\x95\x8c"}') # utf8 with chinese characters
|
||||
assert not cv.pretty_json(b'{"foo" : "\xFF"}')
|
@ -1,48 +0,0 @@
|
||||
import mitmproxy.contentviews as cv
|
||||
from mitmproxy.net.http import Headers
|
||||
|
||||
|
||||
def test_custom_views():
|
||||
class ViewNoop(cv.View):
|
||||
name = "noop"
|
||||
prompt = ("noop", "n")
|
||||
content_types = ["text/none"]
|
||||
|
||||
def __call__(self, data, **metadata):
|
||||
return "noop", cv.format_text(data)
|
||||
|
||||
view_obj = ViewNoop()
|
||||
|
||||
cv.add(view_obj)
|
||||
|
||||
assert cv.get("noop")
|
||||
|
||||
r = cv.get_content_view(
|
||||
cv.get("noop"),
|
||||
"[1, 2, 3]",
|
||||
headers=Headers(
|
||||
content_type="text/plain"
|
||||
)
|
||||
)
|
||||
assert "noop" in r[0]
|
||||
|
||||
# now try content-type matching
|
||||
r = cv.get_content_view(
|
||||
cv.get("Auto"),
|
||||
"[1, 2, 3]",
|
||||
headers=Headers(
|
||||
content_type="text/none"
|
||||
)
|
||||
)
|
||||
assert "noop" in r[0]
|
||||
|
||||
# now try removing the custom view
|
||||
cv.remove(view_obj)
|
||||
r = cv.get_content_view(
|
||||
cv.get("Auto"),
|
||||
b"[1, 2, 3]",
|
||||
headers=Headers(
|
||||
content_type="text/none"
|
||||
)
|
||||
)
|
||||
assert "noop" not in r[0]
|
27
test/mitmproxy/utils/test_sliding_window.py
Normal file
27
test/mitmproxy/utils/test_sliding_window.py
Normal file
@ -0,0 +1,27 @@
|
||||
from mitmproxy.utils import sliding_window
|
||||
|
||||
|
||||
def test_simple():
|
||||
y = list(sliding_window.window(range(1000, 1005), 1, 2))
|
||||
assert y == [
|
||||
# prev this next next2
|
||||
(None, 1000, 1001, 1002),
|
||||
(1000, 1001, 1002, 1003),
|
||||
(1001, 1002, 1003, 1004),
|
||||
(1002, 1003, 1004, None),
|
||||
(1003, 1004, None, None)
|
||||
]
|
||||
|
||||
|
||||
def test_is_lazy():
|
||||
done = False
|
||||
|
||||
def gen():
|
||||
nonlocal done
|
||||
done = True
|
||||
yield 42
|
||||
|
||||
x = sliding_window.window(gen(), 1, 1)
|
||||
assert not done
|
||||
assert list(x)
|
||||
assert done
|
Loading…
Reference in New Issue
Block a user