mitmproxy/test/examples/test_examples.py
mame82 480052f58b
Grpc contentview (#4851)
* Partial gRPC contentview prototype, not linted, no tests, not as add-on

* Linted (flake8)

* Save dev state

* Rewrote of protobuf parser, use decoding strategy, reduced rendered data. Parser uses  generators

* minor cleanup

* fix: preferred encoding was provided as function instead of value

* flake8: line length

* Backlinked message tree objects, temporary debug out

* Partial implementation of gRPC definitions. Save state to fix a cras (data invalidate in edit mode)

* hack: deal with missing exception handling for generator based content views

* gRPC/Protoparser descriptions (with test code)

* replaced manual gzip decoding with mitmproxy.net.encoding.decode

* Refactored typing imports

* Reafctoring

* distinguish request vs response definitions, separate view config from parser config

* Code cleaning, moved customized protobuf definitions to example addon

* final cleanup

* changelog

* Stubs for tests

* Fixed render_riority of addon example

* Started adding tests

* Work on tests

* mypy

* Added pseudo encoder to tests, to cover special decodings

* Example addon test added

* finalized tests, no 100 percent coverage possible, see comments un uncovered code

* minor adjustments

* fixup tests

* Typos

Co-authored-by: Maximilian Hils <git@maximilianhils.com>
2021-10-12 13:32:56 +02:00

98 lines
4.2 KiB
Python

from mitmproxy import contentviews
from mitmproxy.test import tflow
from mitmproxy.test import tutils
from mitmproxy.test import taddons
from mitmproxy.http import Headers
from ..mitmproxy import tservers
class TestScripts(tservers.MasterTest):
def test_add_header(self, tdata):
with taddons.context() as tctx:
a = tctx.script(tdata.path("../examples/addons/anatomy2.py"))
f = tflow.tflow()
a.request(f)
assert f.request.headers["myheader"] == "value"
def test_custom_contentviews(self, tdata):
with taddons.context() as tctx:
tctx.script(tdata.path("../examples/addons/contentview.py"))
swapcase = contentviews.get("swapcase")
_, fmt = swapcase(b"<html>Test!</html>")
assert any(b'tEST!' in val[0][1] for val in fmt)
def test_custom_grpc_contentview(self, tdata):
with taddons.context() as tctx:
tctx.script(tdata.path("../examples/addons/contentview-custom-grpc.py"))
v = contentviews.get("customized gRPC/protobuf")
p = tdata.path("mitmproxy/contentviews/test_grpc_data/msg1.bin")
with open(p, "rb") as f:
raw = f.read()
sim_msg_req = tutils.treq(
port=443,
host="example.com",
path="/ReverseGeocode"
)
sim_msg_resp = tutils.tresp()
sim_flow = tflow.tflow(
req=sim_msg_req,
resp=sim_msg_resp
)
view_text, output = v(raw, flow=sim_flow, http_message=sim_flow.request) # simulate request message
assert view_text == "Protobuf (flattened) (addon with custom rules)"
output = list(output) # assure list conversion if generator
assert output == [
[('text', '[message] '), ('text', 'position '), ('text', '1 '), ('text', ' ')],
[('text', '[double] '), ('text', 'latitude '), ('text', '1.1 '), ('text', '38.89816675798073 ')],
[('text', '[double] '), ('text', 'longitude '), ('text', '1.2 '), ('text', '-77.03829828366696 ')],
[('text', '[string] '), ('text', 'country '), ('text', '3 '), ('text', 'de_DE ')],
[('text', '[uint32] '), ('text', ' '), ('text', '6 '), ('text', '1 ')],
[('text', '[string] '), ('text', 'app '), ('text', '7 '), ('text', 'de.mcdonalds.mcdonaldsinfoapp ')]
]
def test_modify_form(self, tdata):
with taddons.context() as tctx:
sc = tctx.script(tdata.path("../examples/addons/http-modify-form.py"))
form_header = Headers(content_type="application/x-www-form-urlencoded")
f = tflow.tflow(req=tutils.treq(headers=form_header))
sc.request(f)
assert f.request.urlencoded_form["mitmproxy"] == "rocks"
f.request.headers["content-type"] = ""
sc.request(f)
assert list(f.request.urlencoded_form.items()) == [("foo", "bar")]
def test_modify_querystring(self, tdata):
with taddons.context() as tctx:
sc = tctx.script(tdata.path("../examples/addons/http-modify-query-string.py"))
f = tflow.tflow(req=tutils.treq(path="/search?q=term"))
sc.request(f)
assert f.request.query["mitmproxy"] == "rocks"
f.request.path = "/"
sc.request(f)
assert f.request.query["mitmproxy"] == "rocks"
def test_redirect_requests(self, tdata):
with taddons.context() as tctx:
sc = tctx.script(tdata.path("../examples/addons/http-redirect-requests.py"))
f = tflow.tflow(req=tutils.treq(host="example.org"))
sc.request(f)
assert f.request.host == "mitmproxy.org"
def test_send_reply_from_proxy(self, tdata):
with taddons.context() as tctx:
sc = tctx.script(tdata.path("../examples/addons/http-reply-from-proxy.py"))
f = tflow.tflow(req=tutils.treq(host="example.com", port=80))
sc.request(f)
assert f.response.content == b"Hello World"