mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-22 15:37:45 +00:00
mitmweb: 100% app test coverage, numerous fixes
This commit is contained in:
parent
dc75605e46
commit
45332006a3
@ -2,6 +2,7 @@ from mitmproxy import ctx
|
||||
|
||||
|
||||
def request(flow):
|
||||
f = ctx.master.state.duplicate_flow(flow)
|
||||
f = flow.copy()
|
||||
ctx.master.view.add(f)
|
||||
f.request.path = "/changed"
|
||||
ctx.master.replay_request(f, block=True)
|
||||
|
@ -280,6 +280,13 @@ class View(collections.Sequence):
|
||||
# The value was not in the view
|
||||
pass
|
||||
|
||||
def get_by_id(self, flow_id: str) -> typing.Optional[mitmproxy.flow.Flow]:
|
||||
"""
|
||||
Get flow with the given id from the store.
|
||||
Returns None if the flow is not found.
|
||||
"""
|
||||
return self._store.get(flow_id)
|
||||
|
||||
# Event handlers
|
||||
def configure(self, opts, updated):
|
||||
if "filter" in updated:
|
||||
|
@ -609,7 +609,7 @@ def get_message_content_view(viewname, message):
|
||||
"""
|
||||
viewmode = get(viewname)
|
||||
if not viewmode:
|
||||
get("auto")
|
||||
viewmode = get("auto")
|
||||
try:
|
||||
content = message.content
|
||||
except ValueError:
|
||||
|
@ -1,6 +1,8 @@
|
||||
import os
|
||||
from typing import Iterable
|
||||
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import flow
|
||||
from mitmproxy import flowfilter
|
||||
from mitmproxy import http
|
||||
from mitmproxy import tcp
|
||||
@ -27,7 +29,7 @@ class FlowReader:
|
||||
def __init__(self, fo):
|
||||
self.fo = fo
|
||||
|
||||
def stream(self):
|
||||
def stream(self) -> Iterable[flow.Flow]:
|
||||
"""
|
||||
Yields Flow objects from the dump.
|
||||
"""
|
||||
@ -52,10 +54,10 @@ class FilteredFlowWriter:
|
||||
self.fo = fo
|
||||
self.flt = flt
|
||||
|
||||
def add(self, flow):
|
||||
if self.flt and not flowfilter.match(self.flt, flow):
|
||||
def add(self, f: flow.Flow):
|
||||
if self.flt and not flowfilter.match(self.flt, f):
|
||||
return
|
||||
d = flow.get_state()
|
||||
d = f.get_state()
|
||||
tnetstring.dump(d, self.fo)
|
||||
|
||||
|
||||
|
@ -156,7 +156,7 @@ class Master:
|
||||
for e, o in events.event_sequence(f):
|
||||
getattr(self, e)(o)
|
||||
|
||||
def load_flows(self, fr):
|
||||
def load_flows(self, fr: io.FlowReader) -> int:
|
||||
"""
|
||||
Load flows from a FlowReader object.
|
||||
"""
|
||||
@ -166,7 +166,7 @@ class Master:
|
||||
self.load_flow(i)
|
||||
return cnt
|
||||
|
||||
def load_flows_file(self, path):
|
||||
def load_flows_file(self, path: str) -> int:
|
||||
path = os.path.expanduser(path)
|
||||
try:
|
||||
if path == "-":
|
||||
@ -180,7 +180,11 @@ class Master:
|
||||
except IOError as v:
|
||||
raise exceptions.FlowReadException(v.strerror)
|
||||
|
||||
def replay_request(self, f, block=False):
|
||||
def replay_request(
|
||||
self,
|
||||
f: http.HTTPFlow,
|
||||
block: bool=False
|
||||
) -> http_replay.RequestReplayThread:
|
||||
"""
|
||||
Replay a HTTP request to receive a new response from the server.
|
||||
|
||||
|
@ -303,8 +303,8 @@ class FlowListWalker(urwid.ListWalker):
|
||||
|
||||
class FlowListBox(urwid.ListBox):
|
||||
|
||||
def __init__(self, master):
|
||||
self.master = master
|
||||
def __init__(self, master: "mitmproxy.tools.console.master.ConsoleMaster"):
|
||||
self.master = master # type: "mitmproxy.tools.console.master.ConsoleMaster"
|
||||
super().__init__(FlowListWalker(master))
|
||||
|
||||
def get_method_raw(self, k):
|
||||
@ -348,7 +348,7 @@ class FlowListBox(urwid.ListBox):
|
||||
if key == "A":
|
||||
for f in self.master.view:
|
||||
if f.intercepted:
|
||||
f.resume()
|
||||
f.resume(self.master)
|
||||
signals.flowlist_change.send(self)
|
||||
elif key == "z":
|
||||
self.master.view.clear()
|
||||
|
@ -510,8 +510,10 @@ class FlowView(tabs.Tabs):
|
||||
self.flow.resume(self.master)
|
||||
signals.flow_change.send(self, flow = self.flow)
|
||||
elif key == "A":
|
||||
self.master.accept_all()
|
||||
signals.flow_change.send(self, flow = self.flow)
|
||||
for f in self.view:
|
||||
if f.intercepted:
|
||||
f.resume(self.master)
|
||||
signals.flow_change.send(self, flow=f)
|
||||
elif key == "d":
|
||||
if self.flow.killable:
|
||||
self.flow.kill(self.master)
|
||||
|
@ -71,7 +71,7 @@ class ConsoleMaster(master.Master):
|
||||
|
||||
def __init__(self, options, server):
|
||||
super().__init__(options, server)
|
||||
self.view = view.View()
|
||||
self.view = view.View() # type: view.View
|
||||
self.stream_path = None
|
||||
# This line is just for type hinting
|
||||
self.options = self.options # type: Options
|
||||
|
@ -11,6 +11,7 @@ import tornado.escape
|
||||
import tornado.web
|
||||
import tornado.websocket
|
||||
from mitmproxy import contentviews
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import flowfilter
|
||||
from mitmproxy import http
|
||||
from mitmproxy import io
|
||||
@ -108,9 +109,23 @@ class RequestHandler(tornado.web.RequestHandler):
|
||||
|
||||
@property
|
||||
def json(self):
|
||||
if not self.request.headers.get("Content-Type").startswith("application/json"):
|
||||
return None
|
||||
if not self.request.headers.get("Content-Type", "").startswith("application/json"):
|
||||
raise APIError(400, "Invalid Content-Type, expected application/json.")
|
||||
try:
|
||||
return json.loads(self.request.body.decode())
|
||||
except Exception as e:
|
||||
raise APIError(400, "Malformed JSON: {}".format(str(e)))
|
||||
|
||||
@property
|
||||
def filecontents(self):
|
||||
"""
|
||||
Accept either a multipart/form file upload or just take the plain request body.
|
||||
|
||||
"""
|
||||
if self.request.files:
|
||||
return next(iter(self.request.files.values()))[0].body
|
||||
else:
|
||||
return self.request.body
|
||||
|
||||
@property
|
||||
def view(self) -> mitmproxy.addons.view.View:
|
||||
@ -124,11 +139,11 @@ class RequestHandler(tornado.web.RequestHandler):
|
||||
def flow(self) -> mitmproxy.flow.Flow:
|
||||
flow_id = str(self.path_kwargs["flow_id"])
|
||||
# FIXME: Add a facility to addon.view to safely access the store
|
||||
flow = self.view._store.get(flow_id)
|
||||
flow = self.view.get_by_id(flow_id)
|
||||
if flow:
|
||||
return flow
|
||||
else:
|
||||
raise APIError(400, "Flow not found.")
|
||||
raise APIError(404, "Flow not found.")
|
||||
|
||||
def write_error(self, status_code: int, **kwargs):
|
||||
if "exc_info" in kwargs and isinstance(kwargs["exc_info"][1], APIError):
|
||||
@ -168,7 +183,7 @@ class WebSocketEventBroadcaster(tornado.websocket.WebSocketHandler):
|
||||
for conn in cls.connections:
|
||||
try:
|
||||
conn.write_message(message)
|
||||
except Exception:
|
||||
except Exception: # pragma: no cover
|
||||
logging.error("Error sending message", exc_info=True)
|
||||
|
||||
|
||||
@ -196,10 +211,8 @@ class DumpFlows(RequestHandler):
|
||||
|
||||
def post(self):
|
||||
self.view.clear()
|
||||
|
||||
content = self.request.files.values()[0][0].body
|
||||
bio = BytesIO(content)
|
||||
self.master.load_flows(io.FlowReader(bio).stream())
|
||||
bio = BytesIO(self.filecontents)
|
||||
self.master.load_flows(io.FlowReader(bio))
|
||||
bio.close()
|
||||
|
||||
|
||||
@ -211,7 +224,8 @@ class ClearAll(RequestHandler):
|
||||
|
||||
class AcceptFlows(RequestHandler):
|
||||
def post(self):
|
||||
self.master.accept_all(self.master)
|
||||
for f in self.view:
|
||||
f.resume(self.master)
|
||||
|
||||
|
||||
class AcceptFlow(RequestHandler):
|
||||
@ -228,6 +242,7 @@ class FlowHandler(RequestHandler):
|
||||
def put(self, flow_id):
|
||||
flow = self.flow
|
||||
flow.backup()
|
||||
try:
|
||||
for a, b in self.json.items():
|
||||
if a == "request" and hasattr(flow, "request"):
|
||||
request = flow.request
|
||||
@ -243,17 +258,15 @@ class FlowHandler(RequestHandler):
|
||||
elif k == "content":
|
||||
request.text = v
|
||||
else:
|
||||
print("Warning: Unknown update {}.{}: {}".format(a, k, v))
|
||||
raise APIError(400, "Unknown update request.{}: {}".format(k, v))
|
||||
|
||||
elif a == "response" and hasattr(flow, "response"):
|
||||
response = flow.response
|
||||
for k, v in b.items():
|
||||
if k == "msg":
|
||||
response.msg = str(v)
|
||||
if k in ["msg", "http_version"]:
|
||||
setattr(response, k, str(v))
|
||||
elif k == "code":
|
||||
response.status_code = int(v)
|
||||
elif k == "http_version":
|
||||
response.http_version = str(v)
|
||||
elif k == "headers":
|
||||
response.headers.clear()
|
||||
for header in v:
|
||||
@ -261,20 +274,27 @@ class FlowHandler(RequestHandler):
|
||||
elif k == "content":
|
||||
response.text = v
|
||||
else:
|
||||
print("Warning: Unknown update {}.{}: {}".format(a, k, v))
|
||||
raise APIError(400, "Unknown update response.{}: {}".format(k, v))
|
||||
else:
|
||||
print("Warning: Unknown update {}: {}".format(a, b))
|
||||
raise APIError(400, "Unknown update {}: {}".format(a, b))
|
||||
except APIError:
|
||||
flow.revert()
|
||||
raise
|
||||
self.view.update(flow)
|
||||
|
||||
|
||||
class DuplicateFlow(RequestHandler):
|
||||
def post(self, flow_id):
|
||||
self.master.view.duplicate_flow(self.flow)
|
||||
f = self.flow.copy()
|
||||
self.view.add(f)
|
||||
self.write(f.id)
|
||||
|
||||
|
||||
class RevertFlow(RequestHandler):
|
||||
def post(self, flow_id):
|
||||
if self.flow.modified():
|
||||
self.flow.revert()
|
||||
self.view.update(self.flow)
|
||||
|
||||
|
||||
class ReplayFlow(RequestHandler):
|
||||
@ -283,16 +303,17 @@ class ReplayFlow(RequestHandler):
|
||||
self.flow.response = None
|
||||
self.view.update(self.flow)
|
||||
|
||||
r = self.master.replay_request(self.flow)
|
||||
if r:
|
||||
raise APIError(400, r)
|
||||
try:
|
||||
self.master.replay_request(self.flow)
|
||||
except exceptions.ReplayException as e:
|
||||
raise APIError(400, str(e))
|
||||
|
||||
|
||||
class FlowContent(RequestHandler):
|
||||
def post(self, flow_id, message):
|
||||
self.flow.backup()
|
||||
message = getattr(self.flow, message)
|
||||
message.content = self.request.files.values()[0][0].body
|
||||
message.content = self.filecontents
|
||||
self.view.update(self.flow)
|
||||
|
||||
def get(self, flow_id, message):
|
||||
@ -364,46 +385,16 @@ class Settings(RequestHandler):
|
||||
))
|
||||
|
||||
def put(self):
|
||||
update = {}
|
||||
for k, v in self.json.items():
|
||||
if k == "intercept":
|
||||
self.master.options.intercept = v
|
||||
update[k] = v
|
||||
elif k == "showhost":
|
||||
self.master.options.showhost = v
|
||||
update[k] = v
|
||||
elif k == "no_upstream_cert":
|
||||
self.master.options.no_upstream_cert = v
|
||||
update[k] = v
|
||||
elif k == "rawtcp":
|
||||
self.master.options.rawtcp = v
|
||||
update[k] = v
|
||||
elif k == "http2":
|
||||
self.master.options.http2 = v
|
||||
update[k] = v
|
||||
elif k == "anticache":
|
||||
self.master.options.anticache = v
|
||||
update[k] = v
|
||||
elif k == "anticomp":
|
||||
self.master.options.anticomp = v
|
||||
update[k] = v
|
||||
elif k == "stickycookie":
|
||||
self.master.options.stickycookie = v
|
||||
update[k] = v
|
||||
elif k == "stickyauth":
|
||||
self.master.options.stickyauth = v
|
||||
update[k] = v
|
||||
elif k == "stream":
|
||||
self.master.options.stream_large_bodies = v
|
||||
update[k] = v
|
||||
else:
|
||||
print("Warning: Unknown setting {}: {}".format(k, v))
|
||||
|
||||
ClientConnection.broadcast(
|
||||
resource="settings",
|
||||
cmd="update",
|
||||
data=update
|
||||
)
|
||||
update = self.json
|
||||
option_whitelist = {
|
||||
"intercept", "showhost", "no_upstream_cert",
|
||||
"rawtcp", "http2", "anticache", "anticomp",
|
||||
"stickycookie", "stickyauth", "stream_large_bodies"
|
||||
}
|
||||
for k in update:
|
||||
if k not in option_whitelist:
|
||||
raise APIError(400, "Unknown setting {}".format(k))
|
||||
self.master.options.update(**update)
|
||||
|
||||
|
||||
class Application(tornado.web.Application):
|
||||
|
@ -45,6 +45,8 @@ class WebMaster(master.Master):
|
||||
self.events.sig_add.connect(self._sig_events_add)
|
||||
self.events.sig_refresh.connect(self._sig_events_refresh)
|
||||
|
||||
self.options.changed.connect(self._sig_options_update)
|
||||
|
||||
self.addons.add(*addons.default_addons())
|
||||
self.addons.add(self.view, self.events, intercept.Intercept())
|
||||
self.app = app.Application(
|
||||
@ -101,6 +103,13 @@ class WebMaster(master.Master):
|
||||
cmd="reset"
|
||||
)
|
||||
|
||||
def _sig_options_update(self, options, updated):
|
||||
app.ClientConnection.broadcast(
|
||||
resource="settings",
|
||||
cmd="update",
|
||||
data={k: getattr(options, k) for k in updated}
|
||||
)
|
||||
|
||||
def run(self): # pragma: no cover
|
||||
|
||||
iol = tornado.ioloop.IOLoop.instance()
|
||||
|
@ -1,6 +0,0 @@
|
||||
from mitmproxy import ctx
|
||||
|
||||
|
||||
def request(flow):
|
||||
f = ctx.master.state.duplicate_flow(flow)
|
||||
ctx.master.replay_request(f, block=True)
|
@ -1,15 +1,47 @@
|
||||
import tornado.testing
|
||||
import json as _json
|
||||
|
||||
import mock
|
||||
import tornado.testing
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import proxy
|
||||
from mitmproxy.test import tflow
|
||||
from mitmproxy.tools.web import app
|
||||
from mitmproxy.tools.web import master as webmaster
|
||||
from tornado import httpclient
|
||||
from tornado import websocket
|
||||
|
||||
|
||||
def json(resp: httpclient.HTTPResponse):
|
||||
return _json.loads(resp.body.decode())
|
||||
|
||||
|
||||
class TestApp(tornado.testing.AsyncHTTPTestCase):
|
||||
def get_app(self):
|
||||
o = webmaster.Options()
|
||||
m = webmaster.WebMaster(o, proxy.DummyServer())
|
||||
return app.Application(m, None, None)
|
||||
f = tflow.tflow(resp=True)
|
||||
f.id = "42"
|
||||
m.view.add(f)
|
||||
m.view.add(tflow.tflow(err=True))
|
||||
m.add_log("test log", "info")
|
||||
self.master = m
|
||||
self.view = m.view
|
||||
self.events = m.events
|
||||
webapp = app.Application(m, None)
|
||||
webapp.settings["xsrf_cookies"] = False
|
||||
return webapp
|
||||
|
||||
def fetch(self, *args, **kwargs) -> httpclient.HTTPResponse:
|
||||
# tornado disallows POST without content by default.
|
||||
return super().fetch(*args, **kwargs, allow_nonstandard_methods=True)
|
||||
|
||||
def put_json(self, url, data: dict) -> httpclient.HTTPResponse:
|
||||
return self.fetch(
|
||||
url,
|
||||
method="PUT",
|
||||
body=_json.dumps(data),
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
|
||||
def test_index(self):
|
||||
assert self.fetch("/").code == 200
|
||||
@ -17,8 +49,217 @@ class TestApp(tornado.testing.AsyncHTTPTestCase):
|
||||
def test_filter_help(self):
|
||||
assert self.fetch("/filter-help").code == 200
|
||||
|
||||
def test_events(self):
|
||||
assert self.fetch("/events").code == 200
|
||||
|
||||
def test_flows(self):
|
||||
assert self.fetch("/flows").code == 200
|
||||
resp = self.fetch("/flows")
|
||||
assert resp.code == 200
|
||||
assert json(resp)[0]["request"]["contentHash"]
|
||||
assert json(resp)[1]["error"]
|
||||
|
||||
def test_flows_dump(self):
|
||||
resp = self.fetch("/flows/dump")
|
||||
assert b"address" in resp.body
|
||||
|
||||
self.view.clear()
|
||||
assert not len(self.view)
|
||||
|
||||
assert self.fetch("/flows/dump", method="POST", body=resp.body).code == 200
|
||||
assert len(self.view)
|
||||
|
||||
def test_clear(self):
|
||||
events = self.events.data.copy()
|
||||
flows = list(self.view)
|
||||
|
||||
assert self.fetch("/clear", method="POST").code == 200
|
||||
|
||||
assert not len(self.view)
|
||||
assert not len(self.events.data)
|
||||
|
||||
# restore
|
||||
for f in flows:
|
||||
self.view.add(f)
|
||||
self.events.data = events
|
||||
|
||||
def test_accept(self):
|
||||
for f in self.view:
|
||||
f.reply.handle()
|
||||
f.intercept(self.master)
|
||||
|
||||
assert self.fetch(
|
||||
"/flows/42/accept", method="POST").code == 200
|
||||
assert sum(f.intercepted for f in self.view) == 1
|
||||
assert self.fetch("/flows/accept", method="POST").code == 200
|
||||
assert all(not f.intercepted for f in self.view)
|
||||
|
||||
def test_flow_delete(self):
|
||||
f = self.view.get_by_id("42")
|
||||
assert f
|
||||
|
||||
f.reply.handle()
|
||||
assert self.fetch("/flows/42", method="DELETE").code == 200
|
||||
|
||||
assert not self.view.get_by_id("42")
|
||||
self.view.add(f)
|
||||
|
||||
assert self.fetch("/flows/1234", method="DELETE").code == 404
|
||||
|
||||
def test_flow_update(self):
|
||||
f = self.view.get_by_id("42")
|
||||
assert f.request.method == "GET"
|
||||
f.backup()
|
||||
|
||||
upd = {
|
||||
"request": {
|
||||
"method": "PATCH",
|
||||
"port": 123,
|
||||
"headers": [("foo", "bar")],
|
||||
"content": "req",
|
||||
},
|
||||
"response": {
|
||||
"msg": "Not Found",
|
||||
"code": 404,
|
||||
"headers": [("bar", "baz")],
|
||||
"content": "resp",
|
||||
}
|
||||
}
|
||||
assert self.put_json("/flows/42", upd).code == 200
|
||||
assert f.request.method == "PATCH"
|
||||
assert f.request.port == 123
|
||||
assert f.request.headers["foo"] == "bar"
|
||||
assert f.request.text == "req"
|
||||
assert f.response.msg == "Not Found"
|
||||
assert f.response.status_code == 404
|
||||
assert f.response.headers["bar"] == "baz"
|
||||
assert f.response.text == "resp"
|
||||
|
||||
f.revert()
|
||||
|
||||
assert self.put_json("/flows/42", {"foo": 42}).code == 400
|
||||
assert self.put_json("/flows/42", {"request": {"foo": 42}}).code == 400
|
||||
assert self.put_json("/flows/42", {"response": {"foo": 42}}).code == 400
|
||||
assert self.fetch("/flows/42", method="PUT", body="{}").code == 400
|
||||
assert self.fetch(
|
||||
"/flows/42",
|
||||
method="PUT",
|
||||
headers={"Content-Type": "application/json"},
|
||||
body="!!"
|
||||
).code == 400
|
||||
|
||||
def test_flow_duplicate(self):
|
||||
resp = self.fetch("/flows/42/duplicate", method="POST")
|
||||
assert resp.code == 200
|
||||
f = self.view.get_by_id(resp.body.decode())
|
||||
assert f
|
||||
assert f.id != "42"
|
||||
self.view.remove(f)
|
||||
|
||||
def test_flow_revert(self):
|
||||
f = self.view.get_by_id("42")
|
||||
f.backup()
|
||||
f.request.method = "PATCH"
|
||||
self.fetch("/flows/42/revert", method="POST")
|
||||
assert not f._backup
|
||||
|
||||
def test_flow_replay(self):
|
||||
with mock.patch("mitmproxy.master.Master.replay_request") as replay_request:
|
||||
assert self.fetch("/flows/42/replay", method="POST").code == 200
|
||||
assert replay_request.called
|
||||
replay_request.side_effect = exceptions.ReplayException(
|
||||
"out of replays"
|
||||
)
|
||||
assert self.fetch("/flows/42/replay", method="POST").code == 400
|
||||
|
||||
def test_flow_content(self):
|
||||
f = self.view.get_by_id("42")
|
||||
f.backup()
|
||||
f.response.headers["Content-Encoding"] = "ran\x00dom"
|
||||
f.response.headers["Content-Disposition"] = 'inline; filename="filename.jpg"'
|
||||
|
||||
r = self.fetch("/flows/42/response/content")
|
||||
assert r.body == b"message"
|
||||
assert r.headers["Content-Encoding"] == "random"
|
||||
assert r.headers["Content-Disposition"] == 'attachment; filename="filename.jpg"'
|
||||
|
||||
del f.response.headers["Content-Disposition"]
|
||||
f.request.path = "/foo/bar.jpg"
|
||||
assert self.fetch(
|
||||
"/flows/42/response/content"
|
||||
).headers["Content-Disposition"] == 'attachment; filename=bar.jpg'
|
||||
|
||||
f.response.content = b""
|
||||
assert self.fetch("/flows/42/response/content").code == 400
|
||||
|
||||
f.revert()
|
||||
|
||||
def test_update_flow_content(self):
|
||||
assert self.fetch(
|
||||
"/flows/42/request/content",
|
||||
method="POST",
|
||||
body="new"
|
||||
).code == 200
|
||||
f = self.view.get_by_id("42")
|
||||
assert f.request.content == b"new"
|
||||
assert f.modified()
|
||||
f.revert()
|
||||
|
||||
def test_update_flow_content_multipart(self):
|
||||
body = (
|
||||
b'--somefancyboundary\r\n'
|
||||
b'Content-Disposition: form-data; name="a"; filename="a.txt"\r\n'
|
||||
b'\r\n'
|
||||
b'such multipart. very wow.\r\n'
|
||||
b'--somefancyboundary--\r\n'
|
||||
)
|
||||
assert self.fetch(
|
||||
"/flows/42/request/content",
|
||||
method="POST",
|
||||
headers={"Content-Type": 'multipart/form-data; boundary="somefancyboundary"'},
|
||||
body=body
|
||||
).code == 200
|
||||
f = self.view.get_by_id("42")
|
||||
assert f.request.content == b"such multipart. very wow."
|
||||
assert f.modified()
|
||||
f.revert()
|
||||
|
||||
def test_flow_content_view(self):
|
||||
assert json(self.fetch("/flows/42/request/content/raw")) == {
|
||||
"lines": [
|
||||
[["text", "content"]]
|
||||
],
|
||||
"description": "Raw"
|
||||
}
|
||||
|
||||
def test_events(self):
|
||||
resp = self.fetch("/events")
|
||||
assert resp.code == 200
|
||||
assert json(resp)[0]["level"] == "info"
|
||||
|
||||
def test_settings(self):
|
||||
assert json(self.fetch("/settings"))["mode"] == "regular"
|
||||
|
||||
def test_settings_update(self):
|
||||
assert self.put_json("/settings", {"anticache": True}).code == 200
|
||||
assert self.put_json("/settings", {"wtf": True}).code == 400
|
||||
|
||||
def test_err(self):
|
||||
with mock.patch("mitmproxy.tools.web.app.IndexHandler.get") as f:
|
||||
f.side_effect = RuntimeError
|
||||
assert self.fetch("/").code == 500
|
||||
|
||||
@tornado.testing.gen_test
|
||||
def test_websocket(self):
|
||||
ws_url = "ws://localhost:{}/updates".format(self.get_http_port())
|
||||
|
||||
ws_client = yield websocket.websocket_connect(ws_url)
|
||||
self.master.options.anticomp = True
|
||||
|
||||
response = yield ws_client.read_message()
|
||||
assert _json.loads(response) == {
|
||||
"resource": "settings",
|
||||
"cmd": "update",
|
||||
"data": {"anticomp": True},
|
||||
}
|
||||
ws_client.close()
|
||||
|
||||
# trigger on_close by opening a second connection.
|
||||
ws_client2 = yield websocket.websocket_connect(ws_url)
|
||||
ws_client2.close()
|
||||
|
@ -7,7 +7,7 @@ Footer.propTypes = {
|
||||
}
|
||||
|
||||
function Footer({ settings }) {
|
||||
let {mode, intercept, showhost, no_upstream_cert, rawtcp, http2, anticache, anticomp, stickyauth, stickycookie, stream} = settings;
|
||||
let {mode, intercept, showhost, no_upstream_cert, rawtcp, http2, anticache, anticomp, stickyauth, stickycookie, stream_large_bodies} = settings;
|
||||
return (
|
||||
<footer>
|
||||
{mode && mode != "regular" && (
|
||||
@ -40,8 +40,8 @@ function Footer({ settings }) {
|
||||
{stickycookie && (
|
||||
<span className="label label-success">stickycookie: {stickycookie}</span>
|
||||
)}
|
||||
{stream && (
|
||||
<span className="label label-success">stream: {formatSize(stream)}</span>
|
||||
{stream_large_bodies && (
|
||||
<span className="label label-success">stream: {formatSize(stream_large_bodies)}</span>
|
||||
)}
|
||||
</footer>
|
||||
)
|
||||
|
@ -49,11 +49,11 @@ function OptionMenu({ settings, updateSettings }) {
|
||||
txt={settings.stickycookie}
|
||||
onToggleChanged={txt => updateSettings({ stickycookie: !settings.stickycookie ? txt : null })}
|
||||
/>
|
||||
<ToggleInputButton name="stream" placeholder="stream..."
|
||||
checked={!!settings.stream}
|
||||
txt={settings.stream}
|
||||
<ToggleInputButton name="stream_large_bodies" placeholder="stream..."
|
||||
checked={!!settings.stream_large_bodies}
|
||||
txt={settings.stream_large_bodies}
|
||||
inputType="number"
|
||||
onToggleChanged={txt => updateSettings({ stream: !settings.stream ? txt : null })}
|
||||
onToggleChanged={txt => updateSettings({ stream_large_bodies: !settings.stream_large_bodies ? txt : null })}
|
||||
/>
|
||||
</div>
|
||||
<div className="clearfix"/>
|
||||
|
Loading…
Reference in New Issue
Block a user