make static viewer more testable, print slow contentviews

This commit is contained in:
Maximilian Hils 2017-08-16 19:09:02 +02:00
parent 3f497640ab
commit 4fb255a7af
4 changed files with 96 additions and 106 deletions

View File

@ -14,6 +14,7 @@ from mitmproxy.addons import replace
from mitmproxy.addons import script from mitmproxy.addons import script
from mitmproxy.addons import serverplayback from mitmproxy.addons import serverplayback
from mitmproxy.addons import setheaders from mitmproxy.addons import setheaders
from mitmproxy.addons import static_viewer
from mitmproxy.addons import stickyauth from mitmproxy.addons import stickyauth
from mitmproxy.addons import stickycookie from mitmproxy.addons import stickycookie
from mitmproxy.addons import streambodies from mitmproxy.addons import streambodies
@ -39,6 +40,7 @@ def default_addons():
script.ScriptLoader(), script.ScriptLoader(),
serverplayback.ServerPlayback(), serverplayback.ServerPlayback(),
setheaders.SetHeaders(), setheaders.SetHeaders(),
static_viewer.StaticViewer(),
stickyauth.StickyAuth(), stickyauth.StickyAuth(),
stickycookie.StickyCookie(), stickycookie.StickyCookie(),
streambodies.StreamBodies(), streambodies.StreamBodies(),

View File

@ -0,0 +1,93 @@
import json
import os.path
import pathlib
import shutil
import time
import typing
from mitmproxy import contentviews
from mitmproxy import ctx
from mitmproxy import flowfilter
from mitmproxy import io, flow
from mitmproxy.tools.web.app import flow_to_json
web_dir = pathlib.Path(__file__).absolute().parent.parent / "tools" / "web"
def save_static(path: pathlib.Path) -> None:
"""
Save the files for the static web view.
"""
# We want to overwrite the static files to keep track of the update.
if (path / "static").exists():
shutil.rmtree(str(path / "static"))
shutil.copytree(str(web_dir / "static"), str(path / "static"))
shutil.copyfile(str(web_dir / 'templates' / 'index.html'), str(path / "index.html"))
with open(web_dir / "static" / "static.js", "w") as f:
f.write("MITMWEB_STATIC = true;")
def save_filter_help(path: pathlib.Path) -> None:
with open(path / 'filter-help.json', 'w') as f:
json.dump(dict(commands=flowfilter.help), f)
def save_flows(path: pathlib.Path, flows: typing.Iterable[flow.Flow]) -> None:
with open(path / 'flows.json', 'w') as f:
json.dump(
[flow_to_json(f) for f in flows],
f
)
def save_flows_content(path: pathlib.Path, flows: typing.Iterable[flow.Flow]) -> None:
for f in flows:
for m in ('request', 'response'):
message = getattr(f, m)
message_path = path / "flows" / f.id / m
os.makedirs(str(message_path / "content"), exist_ok=True)
with open(message_path / '_content', 'wb') as f:
# don't use raw_content here as this is served with a default content type
f.write(message.content)
# content_view
t = time.time()
description, lines, error = contentviews.get_message_content_view(
'Auto', message
)
if time.time() - t > 0.1:
ctx.log(
"Slow content view: {} took {}s".format(
description.strip(),
round(time.time() - t, 1)
),
"info"
)
with open(message_path / "content" / "Auto.json", "w") as f:
json.dump(
dict(lines=list(lines), description=description),
f
)
class StaticViewer:
# TODO: make this a command at some point.
def load(self, loader):
loader.add_option(
"web_static_viewer", typing.Optional[str], "",
"The path to output a static viewer."
)
def configure(self, updated):
if "web_static_viewer" in updated and ctx.options.web_static_viewer:
flows = io.read_flows_from_paths([ctx.options.rfile])
p = pathlib.Path(ctx.options.web_static_viewer).expanduser()
self.export(p, flows)
def export(self, path: pathlib.Path, flows: typing.Iterable[flow.Flow]) -> None:
save_static(path)
save_filter_help(path)
save_flows(path, flows)
save_flows_content(path, flows)

View File

@ -5,7 +5,6 @@ import os.path
import re import re
from io import BytesIO from io import BytesIO
import mitmproxy.addons.view
import mitmproxy.flow import mitmproxy.flow
import tornado.escape import tornado.escape
import tornado.web import tornado.web
@ -149,7 +148,7 @@ class RequestHandler(tornado.web.RequestHandler):
return self.request.body return self.request.body
@property @property
def view(self) -> mitmproxy.addons.view.View: def view(self) -> "mitmproxy.addons.view.View":
return self.application.master.view return self.application.master.view
@property @property

View File

@ -1,104 +0,0 @@
import os.path
import shutil
import json
from typing import Optional
from mitmproxy import io
from mitmproxy import ctx
from mitmproxy import flowfilter
from mitmproxy import contentviews
from mitmproxy.tools.web.app import flow_to_json
class StaticViewer:
def __init__(self):
self.flows = set() # type: Set[flow.Flow]
self.path = ''
self.flows_path = ''
def load(self, loader):
loader.add_option(
"web_static_viewer", Optional[str], "",
"The path to output a static viewer."
)
def configure(self, updated):
if "web_static_viewer" in updated and ctx.options.web_static_viewer:
self.path = os.path.expanduser(ctx.options.web_static_viewer)
if "rfile" in updated and ctx.options.rfile:
self.flows_path = os.path.expanduser(ctx.options.rfile)
if self.flows_path and self.path:
self.save_static()
self.load_flows()
self.save_flows()
self.save_filter_help()
self.save_flows_content()
def load_flows(self) -> None:
with open(self.flows_path, 'rb') as file:
for i in io.FlowReader(file).stream():
self.flows.add(i)
def save_flows(self) -> None:
with open(os.path.join(self.path, 'flows.json'), 'w') as file:
flows = []
for f in self.flows:
flows.append(flow_to_json(f))
json.dump(flows, file)
def save_flows_content(self) -> None:
for f in self.flows:
for m in ('request', 'response'):
message = getattr(f, m)
path = os.path.join(self.path, 'flows', f.id, m)
if not os.path.exists(path):
os.makedirs(path)
with open(os.path.join(path, '_content'), 'wb') as content_file:
content_file.write(message.raw_content)
# content_view
view_path = os.path.join(path, 'content')
if not os.path.exists(view_path):
os.makedirs(view_path)
description, lines, error = contentviews.get_message_content_view(
'Auto', message
)
with open(os.path.join(view_path, 'Auto.json'), 'w') as view_file:
json.dump(dict(
lines=list(lines),
description=description
), view_file)
def save_static(self) -> None:
"""
Save the files for the static web view.
"""
static_path = os.path.join(os.path.dirname(__file__), 'static')
index_path = os.path.join(os.path.dirname(__file__), 'templates', 'index.html')
# We want to overwrite the static files to keep track of the update.
try:
shutil.copytree(static_path, os.path.join(self.path, 'static'),
ignore=shutil.ignore_patterns('static.js'))
except FileExistsError:
shutil.rmtree(os.path.join(self.path, 'static'))
shutil.copytree(static_path, os.path.join(self.path, 'static'),
ignore=shutil.ignore_patterns('static.js'))
index_template = open(index_path, 'r')
index = open(os.path.join(self.path, 'index.html'), 'w')
# Change the resource files to relative path.
index.write(index_template.read())
index_template.close()
index.close()
static_template = open(os.path.join(static_path, 'static.js'), 'r')
static = open(os.path.join(self.path, 'static', 'static.js'), 'w')
# Turn on MITMWEB_STATIC variable
static.write(static_template.read().replace('false', 'true'))
static_template.close()
static.close()
def save_filter_help(self) -> None:
with open(os.path.join(self.path, 'filter-help.json'), 'w') as file:
json.dump(dict(commands=flowfilter.help), file)