Revamp how addons work

- Addons now nest, which means that addons can manage addons. This has a number
of salutary effects - the scripts addon no longer has to poke into the global
addons list, we no longer have to replace/remove/boot-outof parent addons when
we load scripts, and this paves the way for making our top-level tools into
addons themselves.
- All addon calls are now wrapped in a safe execution environment where
exceptions are caught, and output to stdout/stderr are intercepted and turned
into logs.
- We no longer support script arguments in sys.argv - creating an option
properly is the only way to pass arguments. This means that all scripts are
always directly controllable from interctive tooling, and that arguments are
type-checked.

For now, I've disabled testing of the har dump example - it needs to be moved
to the new argument handling, and become a class addon. I'll address that in a
separate patch.
This commit is contained in:
Aldo Cortesi 2017-04-25 19:06:24 +12:00
parent 90c425bd14
commit e6eeab6094
34 changed files with 462 additions and 547 deletions

View File

@ -123,12 +123,12 @@ You can check you Python version by running ``python3 --version``.
sudo zypper install python3-pip python3-devel libffi-devel openssl-devel gcc-c++
sudo pip3 install mitmproxy
.. _install-source-windows:
🐱💻 Installation from Source on Windows
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Installation from Source on Windows
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. note::
Mitmproxy's console interface is not supported on Windows, but you can use

View File

@ -54,24 +54,8 @@ and is replaced by the class instance.
Handling arguments
------------------
Scripts can handle their own command-line arguments, just like any other Python
program. Let's build on the example above to do something slightly more
sophisticated - replace one value with another in all responses. Mitmproxy's
`HTTPRequest <api.html#mitmproxy.models.http.HTTPRequest>`_ and `HTTPResponse
<api.html#mitmproxy.models.http.HTTPResponse>`_ objects have a handy `replace
<api.html#mitmproxy.models.http.HTTPResponse.replace>`_ method that takes care
of all the details for us.
.. literalinclude:: ../../examples/simple/script_arguments.py
:caption: :src:`examples/simple/script_arguments.py`
:language: python
We can now call this script on the command-line like this:
>>> mitmdump -dd -s "./script_arguments.py html faketml"
Whenever a handler is called, mitpmroxy rewrites the script environment so that
it sees its own arguments as if it was invoked from the command-line.
FIXME
Logging and the context

View File

@ -54,5 +54,4 @@ class Rerouter:
flow.request.port = port
def load(l):
l.boot_into(Rerouter())
addons = [Rerouter()]

View File

@ -3,5 +3,4 @@ class AddHeader:
flow.response.headers["newheader"] = "foo"
def load(l):
return l.boot_into(AddHeader())
addons = [AddHeader()]

View File

@ -17,7 +17,4 @@ class Filter:
print(flow)
def load(l):
if len(sys.argv) != 2:
raise ValueError("Usage: -s 'filt.py FILTER'")
l.boot_into(Filter(sys.argv[1]))
addons = [Filter(sys.argv[1])]

View File

@ -23,7 +23,4 @@ class Writer:
self.w.add(flow)
def load(l):
if len(sys.argv) != 2:
raise ValueError('Usage: -s "flowriter.py filename"')
l.boot_into(Writer(sys.argv[1]))
addons = [Writer(sys.argv[1])]

View File

@ -1,29 +1,31 @@
# Usage: mitmdump -s "iframe_injector.py url"
# (this script works best with --anticache)
import sys
from bs4 import BeautifulSoup
class Injector:
def __init__(self, iframe_url):
self.iframe_url = iframe_url
def __init__(self):
self.iframe_url = None
def load(self, loader):
loader.add_option(
"iframe", str, "", "IFrame to inject"
)
def configure(self, options, updated):
self.iframe_url = options.iframe
def response(self, flow):
if flow.request.host in self.iframe_url:
return
html = BeautifulSoup(flow.response.content, "html.parser")
if html.body:
iframe = html.new_tag(
"iframe",
src=self.iframe_url,
frameborder=0,
height=0,
width=0)
html.body.insert(0, iframe)
flow.response.content = str(html).encode("utf8")
if self.iframe_url:
html = BeautifulSoup(flow.response.content, "html.parser")
if html.body:
iframe = html.new_tag(
"iframe",
src=self.iframe_url,
frameborder=0,
height=0,
width=0)
html.body.insert(0, iframe)
flow.response.content = str(html).encode("utf8")
def load(l):
if len(sys.argv) != 2:
raise ValueError('Usage: -s "iframe_injector.py url"')
return l.boot_into(Injector(sys.argv[1]))
addons = [Injector()]

View File

@ -1,17 +0,0 @@
import argparse
class Replacer:
def __init__(self, src, dst):
self.src, self.dst = src, dst
def response(self, flow):
flow.response.replace(self.src, self.dst)
def load(l):
parser = argparse.ArgumentParser()
parser.add_argument("src", type=str)
parser.add_argument("dst", type=str)
args = parser.parse_args()
l.boot_into(Replacer(args.src, args.dst))

View File

@ -1,4 +1,7 @@
import typing
import traceback
import contextlib
import sys
from mitmproxy import exceptions
from mitmproxy import eventsequence
@ -11,13 +14,66 @@ def _get_name(itm):
return getattr(itm, "name", itm.__class__.__name__.lower())
def cut_traceback(tb, func_name):
"""
Cut off a traceback at the function with the given name.
The func_name's frame is excluded.
Args:
tb: traceback object, as returned by sys.exc_info()[2]
func_name: function name
Returns:
Reduced traceback.
"""
tb_orig = tb
for _, _, fname, _ in traceback.extract_tb(tb):
tb = tb.tb_next
if fname == func_name:
break
return tb or tb_orig
class StreamLog:
"""
A class for redirecting output using contextlib.
"""
def __init__(self, log):
self.log = log
def write(self, buf):
if buf.strip():
self.log(buf)
def flush(self): # pragma: no cover
# Click uses flush sometimes, so we dummy it up
pass
@contextlib.contextmanager
def safecall():
stdout_replacement = StreamLog(ctx.log.warn)
try:
with contextlib.redirect_stdout(stdout_replacement):
yield
except exceptions.AddonHalt:
raise
except Exception as e:
etype, value, tb = sys.exc_info()
tb = cut_traceback(tb, "invoke_addon").tb_next
ctx.log.error(
"Addon error: %s" % "".join(
traceback.format_exception(etype, value, tb)
)
)
class Loader:
"""
A loader object is passed to the load() event when addons start up.
"""
def __init__(self, master):
self.master = master
self.boot_into_addon = None
def add_option(
self,
@ -35,25 +91,33 @@ class Loader:
choices
)
def boot_into(self, addon):
self.boot_into_addon = addon
func = getattr(addon, "load", None)
if func:
func(self)
def traverse(chain):
"""
Recursively traverse an addon chain.
"""
for a in chain:
yield a
if hasattr(a, "addons"):
yield from traverse(a.addons)
class AddonManager:
def __init__(self, master):
self.lookup = {}
self.chain = []
self.master = master
master.options.changed.connect(self.configure_all)
master.options.changed.connect(self._configure_all)
def _configure_all(self, options, updated):
self.trigger("configure", options, updated)
def clear(self):
"""
Remove all addons.
"""
self.done()
self.chain = []
for i in self.chain:
self.remove(i)
def get(self, name):
"""
@ -61,36 +125,52 @@ class AddonManager:
attribute on the instance, or the lower case class name if that
does not exist.
"""
for i in self.chain:
if name == _get_name(i):
return i
return self.lookup.get(name, None)
def configure_all(self, options, updated):
self.trigger("configure", options, updated)
def register(self, addon):
"""
Register an addon and all its sub-addons with the manager without
adding it to the chain. This should be used by addons that
dynamically manage addons. Must be called within a current context.
"""
for a in traverse([addon]):
name = _get_name(a)
if name in self.lookup:
raise exceptions.AddonError(
"An addon called '%s' already exists." % name
)
l = Loader(self.master)
self.invoke_addon(addon, "load", l)
for a in traverse([addon]):
name = _get_name(a)
self.lookup[name] = a
return addon
def add(self, *addons):
"""
Add addons to the end of the chain, and run their startup events.
Add addons to the end of the chain, and run their load event.
If any addon has sub-addons, they are registered.
"""
with self.master.handlecontext():
for i in addons:
l = Loader(self.master)
self.invoke_addon(i, "load", l)
if l.boot_into_addon:
self.chain.append(l.boot_into_addon)
else:
self.chain.append(i)
self.chain.append(self.register(i))
def remove(self, addon):
"""
Remove an addon from the chain, and run its done events.
"""
self.chain = [i for i in self.chain if i is not addon]
with self.master.handlecontext():
self.invoke_addon(addon, "done")
Remove an addon and all its sub-addons.
def done(self):
self.trigger("done")
If the addon is not in the chain - that is, if it's managed by a
parent addon - it's the parent's responsibility to remove it from
its own addons attribute.
"""
for a in traverse([addon]):
n = _get_name(a)
if n not in self.lookup:
raise exceptions.AddonError("No such addon: %s" % n)
self.chain = [i for i in self.chain if i is not a]
del self.lookup[_get_name(a)]
with self.master.handlecontext():
self.invoke_addon(a, "done")
def __len__(self):
return len(self.chain)
@ -126,22 +206,19 @@ class AddonManager:
def invoke_addon(self, addon, name, *args, **kwargs):
"""
Invoke an event on an addon. This method must run within an
established handler context.
Invoke an event on an addon and all its children. This method must
run within an established handler context.
"""
if not ctx.master:
raise exceptions.AddonError(
"invoke_addon called without a handler context."
)
if name not in eventsequence.Events:
name = "event_" + name
func = getattr(addon, name, None)
if func:
if not callable(func):
raise exceptions.AddonError(
"Addon handler %s not callable" % name
)
func(*args, **kwargs)
for a in traverse([addon]):
func = getattr(a, name, None)
if func:
if not callable(func):
raise exceptions.AddonError(
"Addon handler %s not callable" % name
)
func(*args, **kwargs)
def trigger(self, name, *args, **kwargs):
"""
@ -150,6 +227,7 @@ class AddonManager:
with self.master.handlecontext():
for i in self.chain:
try:
self.invoke_addon(i, name, *args, **kwargs)
with safecall():
self.invoke_addon(i, name, *args, **kwargs)
except exceptions.AddonHalt:
return

View File

@ -3,6 +3,8 @@ from mitmproxy.addons.onboardingapp import app
class Onboarding(wsgiapp.WSGIApp):
name = "onboarding"
def __init__(self):
super().__init__(app.Adapter(app.application), None, None)
self.enabled = False

View File

@ -1,123 +1,31 @@
import contextlib
import os
import shlex
import sys
import importlib
import threading
import traceback
import types
import sys
from mitmproxy import addonmanager
from mitmproxy import exceptions
from mitmproxy import ctx
from mitmproxy import eventsequence
import watchdog.events
from watchdog.observers import polling
def parse_command(command):
"""
Returns a (path, args) tuple.
"""
if not command or not command.strip():
raise ValueError("Empty script command.")
# Windows: escape all backslashes in the path.
if os.name == "nt": # pragma: no cover
backslashes = shlex.split(command, posix=False)[0].count("\\")
command = command.replace("\\", "\\\\", backslashes)
args = shlex.split(command) # pragma: no cover
args[0] = os.path.expanduser(args[0])
if not os.path.exists(args[0]):
raise ValueError(
("Script file not found: %s.\r\n"
"If your script path contains spaces, "
"make sure to wrap it in additional quotes, e.g. -s \"'./foo bar/baz.py' --args\".") %
args[0])
elif os.path.isdir(args[0]):
raise ValueError("Not a file: %s" % args[0])
return args[0], args[1:]
def cut_traceback(tb, func_name):
"""
Cut off a traceback at the function with the given name.
The func_name's frame is excluded.
Args:
tb: traceback object, as returned by sys.exc_info()[2]
func_name: function name
Returns:
Reduced traceback.
"""
tb_orig = tb
for _, _, fname, _ in traceback.extract_tb(tb):
tb = tb.tb_next
if fname == func_name:
break
if tb is None:
# We could not find the method, take the full stack trace.
# This may happen on some Python interpreters/flavors (e.g. PyInstaller).
return tb_orig
else:
return tb
class StreamLog:
"""
A class for redirecting output using contextlib.
"""
def __init__(self, log):
self.log = log
def write(self, buf):
if buf.strip():
self.log(buf)
@contextlib.contextmanager
def scriptenv(path, args):
oldargs = sys.argv
sys.argv = [path] + args
script_dir = os.path.dirname(os.path.abspath(path))
sys.path.append(script_dir)
stdout_replacement = StreamLog(ctx.log.warn)
def load_script(actx, path):
if not os.path.exists(path):
ctx.log.info("No such file: %s" % path)
return
loader = importlib.machinery.SourceFileLoader(os.path.basename(path), path)
try:
with contextlib.redirect_stdout(stdout_replacement):
yield
except SystemExit as v:
ctx.log.error("Script exited with code %s" % v.code)
except Exception:
etype, value, tb = sys.exc_info()
tb = cut_traceback(tb, "scriptenv").tb_next
ctx.log.error(
"Script error: %s" % "".join(
traceback.format_exception(etype, value, tb)
)
)
oldpath = sys.path
sys.path.insert(0, os.path.dirname(path))
with addonmanager.safecall():
m = loader.load_module()
if not getattr(m, "name", None):
m.name = path
return m
finally:
sys.argv = oldargs
sys.path.pop()
def load_script(path, args):
with open(path, "rb") as f:
try:
code = compile(f.read(), path, 'exec')
except SyntaxError as e:
ctx.log.error(
"Script error: %s line %s: %s" % (
e.filename, e.lineno, e.msg
)
)
return
ns = {'__file__': os.path.abspath(path)}
with scriptenv(path, args):
exec(code, ns)
return types.SimpleNamespace(**ns)
sys.path[:] = oldpath
class ReloadHandler(watchdog.events.FileSystemEventHandler):
@ -149,59 +57,39 @@ class Script:
"""
An addon that manages a single script.
"""
def __init__(self, command):
self.name = command
self.command = command
self.path, self.args = parse_command(command)
def __init__(self, path):
self.name = "scriptmanager:" + path
self.path = path
self.ns = None
self.observer = None
self.dead = False
self.last_options = None
self.should_reload = threading.Event()
for i in eventsequence.Events:
if not hasattr(self, i):
def mkprox():
evt = i
def load(self, l):
self.ns = load_script(ctx, self.path)
def prox(*args, **kwargs):
self.run(evt, *args, **kwargs)
return prox
setattr(self, i, mkprox())
def run(self, name, *args, **kwargs):
# It's possible for ns to be un-initialised if we failed during
# configure
if self.ns is not None and not self.dead:
func = getattr(self.ns, name, None)
if func:
with scriptenv(self.path, self.args):
return func(*args, **kwargs)
@property
def addons(self):
if self.ns is not None:
return [self.ns]
return []
def reload(self):
self.should_reload.set()
def load_script(self):
self.ns = load_script(self.path, self.args)
l = addonmanager.Loader(ctx.master)
self.run("load", l)
if l.boot_into_addon:
self.ns = l.boot_into_addon
def tick(self):
if self.should_reload.is_set():
self.should_reload.clear()
ctx.log.info("Reloading script: %s" % self.name)
self.ns = load_script(self.path, self.args)
self.configure(self.last_options, self.last_options.keys())
else:
self.run("tick")
def load(self, l):
self.last_options = ctx.master.options
self.load_script()
if self.ns:
ctx.master.addons.remove(self.ns)
self.ns = load_script(ctx, self.path)
if self.ns:
# We're already running, so we have to explicitly register and
# configure the addon
ctx.master.addons.register(self.ns)
self.configure(self.last_options, self.last_options.keys())
def configure(self, options, updated):
self.last_options = options
@ -213,11 +101,6 @@ class Script:
os.path.dirname(self.path) or "."
)
self.observer.start()
self.run("configure", options, updated)
def done(self):
self.run("done")
self.dead = True
class ScriptLoader:
@ -226,21 +109,14 @@ class ScriptLoader:
"""
def __init__(self):
self.is_running = False
self.addons = []
def running(self):
self.is_running = True
def run_once(self, command, flows):
try:
sc = Script(command)
except ValueError as e:
raise ValueError(str(e))
sc.load_script()
for f in flows:
for evt, o in eventsequence.iterate(f):
sc.run(evt, o)
sc.done()
return sc
# Returning once we have proper commands
raise NotImplementedError
def configure(self, options, updated):
if "scripts" in updated:
@ -248,25 +124,21 @@ class ScriptLoader:
if options.scripts.count(s) > 1:
raise exceptions.OptionsError("Duplicate script: %s" % s)
for a in ctx.master.addons.chain[:]:
if isinstance(a, Script) and a.name not in options.scripts:
for a in self.addons[:]:
if a.path not in options.scripts:
ctx.log.info("Un-loading script: %s" % a.name)
ctx.master.addons.remove(a)
self.addons.remove(a)
# The machinations below are to ensure that:
# - Scripts remain in the same order
# - Scripts are listed directly after the script addon. This is
# needed to ensure that interactions with, for instance, flow
# serialization remains correct.
# - Scripts are not initialized un-necessarily. If only a
# script's order in the script list has changed, it should simply
# be moved.
# script's order in the script list has changed, it is just
# moved.
current = {}
for a in ctx.master.addons.chain[:]:
if isinstance(a, Script):
current[a.name] = a
ctx.master.addons.chain.remove(a)
for a in self.addons:
current[a.path] = a
ordered = []
newscripts = []
@ -275,24 +147,15 @@ class ScriptLoader:
ordered.append(current[s])
else:
ctx.log.info("Loading script: %s" % s)
try:
sc = Script(s)
except ValueError as e:
raise exceptions.OptionsError(str(e))
sc = Script(s)
ordered.append(sc)
newscripts.append(sc)
ochain = ctx.master.addons.chain
pos = ochain.index(self)
ctx.master.addons.chain = ochain[:pos + 1] + ordered + ochain[pos + 1:]
self.addons = ordered
for s in newscripts:
l = addonmanager.Loader(ctx.master)
ctx.master.addons.invoke_addon(s, "load", l)
ctx.master.addons.register(s)
if self.is_running:
# If we're already running, we configure and tell the addon
# we're up and running.
ctx.master.addons.invoke_addon(
s, "configure", options, options.keys()
)
ctx.master.addons.invoke_addon(s, "running")

View File

@ -3,6 +3,11 @@ import click
from mitmproxy import log
# These get over-ridden by the save execution context. Keep them around so we
# can log directly.
realstdout = sys.stdout
realstderr = sys.stderr
class TermLog:
def __init__(self, outfile=None):
@ -14,9 +19,9 @@ class TermLog:
def log(self, e):
if log.log_tier(e.level) == log.log_tier("error"):
outfile = self.outfile or sys.stderr
outfile = self.outfile or realstderr
else:
outfile = self.outfile or sys.stdout
outfile = self.outfile or realstdout
if self.options.verbosity >= log.log_tier(e.level):
click.secho(

View File

@ -13,6 +13,10 @@ class WSGIApp:
def __init__(self, app, host, port):
self.app, self.host, self.port = app, host, port
@property
def name(self):
return "wsgiapp:%s:%s" % (self.host, self.port)
def serve(self, app, flow):
"""
Serves app on flow, and prevents further handling of the flow.

View File

@ -103,7 +103,7 @@ class Master:
def shutdown(self):
self.server.shutdown()
self.should_exit.set()
self.addons.done()
self.addons.trigger("done")
def create_request(self, method, url):
"""

View File

@ -1,3 +1,4 @@
import sys
import contextlib
import mitmproxy.master
@ -5,6 +6,7 @@ import mitmproxy.options
from mitmproxy import proxy
from mitmproxy import addonmanager
from mitmproxy import eventsequence
from mitmproxy.addons import script
class TestAddons(addonmanager.AddonManager):
@ -26,6 +28,10 @@ class RecordingMaster(mitmproxy.master.Master):
self.events = []
self.logs = []
def dump_log(self, outf=sys.stdout):
for i in self.logs:
print("%s: %s" % (i.level, i.msg), file=outf)
def has_log(self, txt, level=None):
for i in self.logs:
if level and i.level != level:
@ -51,14 +57,21 @@ class context:
provides a number of helper methods for common testing scenarios.
"""
def __init__(self, master = None, options = None):
self.options = options or mitmproxy.options.Options()
options = options or mitmproxy.options.Options()
self.master = master or RecordingMaster(
options, proxy.DummyServer(options)
)
self.options = self.master.options
self.wrapped = None
def ctx(self):
"""
Returns a new handler context.
"""
return self.master.handlecontext()
def __enter__(self):
self.wrapped = self.master.handlecontext()
self.wrapped = self.ctx()
self.wrapped.__enter__()
return self
@ -75,11 +88,13 @@ class context:
"""
f.reply._state = "start"
for evt, arg in eventsequence.iterate(f):
h = getattr(addon, evt, None)
if h:
h(arg)
if f.reply.state == "taken":
return
self.master.addons.invoke_addon(
addon,
evt,
arg
)
if f.reply.state == "taken":
return
def configure(self, addon, **kwargs):
"""
@ -89,4 +104,17 @@ class context:
"""
with self.options.rollback(kwargs.keys(), reraise=True):
self.options.update(**kwargs)
addon.configure(self.options, kwargs.keys())
self.master.addons.invoke_addon(
addon,
"configure",
self.options,
kwargs.keys()
)
def script(self, path):
sc = script.Script(path)
loader = addonmanager.Loader(self.master)
sc.load(loader)
for a in addonmanager.traverse(sc.addons):
getattr(a, "load", lambda x: None)(loader)
return sc

View File

@ -76,7 +76,7 @@ def run(MasterKlass, args, extra=None): # pragma: no cover
unknown = optmanager.load_paths(opts, args.conf)
server = process_options(parser, opts, args)
master = MasterKlass(opts, server)
master.addons.configure_all(opts, opts.keys())
master.addons.trigger("configure", opts, opts.keys())
remaining = opts.update_known(**unknown)
if remaining and opts.verbosity > 1:
print("Ignored options: %s" % remaining)

View File

@ -1,5 +1,3 @@
import pytest
from mitmproxy import options
from mitmproxy import contentviews
from mitmproxy import proxy
@ -8,6 +6,7 @@ from mitmproxy.addons import script
from mitmproxy.test import tflow
from mitmproxy.test import tutils
from mitmproxy.test import taddons
from mitmproxy.net.http import Headers
from ..mitmproxy import tservers
@ -27,7 +26,7 @@ class RaiseMaster(master.Master):
def tscript(cmd, args=""):
o = options.Options()
cmd = example_dir.path(cmd) + " " + args
cmd = example_dir.path(cmd)
m = RaiseMaster(o, proxy.DummyServer())
sc = script.Script(cmd)
m.addons.add(sc)
@ -48,14 +47,18 @@ class TestScripts(tservers.MasterTest):
assert any(b'tEST!' in val[0][1] for val in fmt)
def test_iframe_injector(self):
with pytest.raises(ScriptError):
tscript("simple/modify_body_inject_iframe.py")
m, sc = tscript("simple/modify_body_inject_iframe.py", "http://example.org/evil_iframe")
f = tflow.tflow(resp=tutils.tresp(content=b"<html><body>mitmproxy</body></html>"))
m.addons.handle_lifecycle("response", f)
content = f.response.content
assert b'iframe' in content and b'evil_iframe' in content
with taddons.context() as tctx:
sc = tctx.script(example_dir.path("simple/modify_body_inject_iframe.py"))
tctx.configure(
sc,
iframe = "http://example.org/evil_iframe"
)
f = tflow.tflow(
resp=tutils.tresp(content=b"<html><body>mitmproxy</body></html>")
)
tctx.master.addons.invoke_addon(sc, "response", f)
content = f.response.content
assert b'iframe' in content and b'evil_iframe' in content
def test_modify_form(self):
m, sc = tscript("simple/modify_form.py")
@ -81,12 +84,6 @@ class TestScripts(tservers.MasterTest):
m.addons.handle_lifecycle("request", f)
assert f.request.query["mitmproxy"] == "rocks"
def test_arguments(self):
m, sc = tscript("simple/script_arguments.py", "mitmproxy rocks")
f = tflow.tflow(resp=tutils.tresp(content=b"I <3 mitmproxy"))
m.addons.handle_lifecycle("response", f)
assert f.response.content == b"I <3 rocks"
def test_redirect_requests(self):
m, sc = tscript("simple/redirect_requests.py")
f = tflow.tflow(req=tutils.treq(host="example.org"))

View File

@ -11,7 +11,7 @@ def test_simple():
with pytest.raises(exceptions.OptionsError):
tctx.configure(sa, body_size_limit = "invalid")
tctx.configure(sa, body_size_limit = "1m")
assert tctx.options._processed["body_size_limit"]
assert tctx.master.options._processed["body_size_limit"]
with pytest.raises(exceptions.OptionsError, match="mutually exclusive"):
tctx.configure(

View File

@ -1,7 +1,6 @@
import traceback
import sys
import time
import re
import watchdog.events
import pytest
@ -14,23 +13,8 @@ from mitmproxy import exceptions
from mitmproxy import options
from mitmproxy import proxy
from mitmproxy import master
from mitmproxy import utils
from mitmproxy.addons import script
from ...conftest import skip_not_windows
def test_scriptenv():
with taddons.context() as tctx:
with script.scriptenv("path", []):
raise SystemExit
assert tctx.master.has_log("exited", "error")
tctx.master.clear()
with script.scriptenv("path", []):
raise ValueError("fooo")
assert tctx.master.has_log("fooo", "error")
class Called:
def __init__(self):
@ -60,113 +44,86 @@ def test_reloadhandler():
assert rh.callback.called
class TestParseCommand:
def test_empty_command(self):
with pytest.raises(ValueError):
script.parse_command("")
with pytest.raises(ValueError):
script.parse_command(" ")
def test_no_script_file(self, tmpdir):
with pytest.raises(Exception, match="not found"):
script.parse_command("notfound")
with pytest.raises(Exception, match="Not a file"):
script.parse_command(str(tmpdir))
def test_parse_args(self):
with utils.chdir(tutils.test_data.dirname):
assert script.parse_command(
"mitmproxy/data/addonscripts/recorder.py"
) == ("mitmproxy/data/addonscripts/recorder.py", [])
assert script.parse_command(
"mitmproxy/data/addonscripts/recorder.py foo bar"
) == ("mitmproxy/data/addonscripts/recorder.py", ["foo", "bar"])
assert script.parse_command(
"mitmproxy/data/addonscripts/recorder.py 'foo bar'"
) == ("mitmproxy/data/addonscripts/recorder.py", ["foo bar"])
@skip_not_windows
def test_parse_windows(self):
with utils.chdir(tutils.test_data.dirname):
assert script.parse_command(
"mitmproxy/data\\addonscripts\\recorder.py"
) == ("mitmproxy/data\\addonscripts\\recorder.py", [])
assert script.parse_command(
"mitmproxy/data\\addonscripts\\recorder.py 'foo \\ bar'"
) == ("mitmproxy/data\\addonscripts\\recorder.py", ['foo \\ bar'])
def test_load_script():
with taddons.context():
with taddons.context() as tctx:
ns = script.load_script(
tctx.ctx(),
tutils.test_data.path(
"mitmproxy/data/addonscripts/recorder.py"
), []
"mitmproxy/data/addonscripts/recorder/recorder.py"
)
)
assert ns.load
assert ns.addons
ns = script.load_script(
tctx.ctx(),
"nonexistent"
)
assert not ns
def test_script_print_stdout():
with taddons.context() as tctx:
with mock.patch('mitmproxy.ctx.log.warn') as mock_warn:
with script.scriptenv("path", []):
with addonmanager.safecall():
ns = script.load_script(
tctx.ctx(),
tutils.test_data.path(
"mitmproxy/data/addonscripts/print.py"
), []
)
)
ns.load(addonmanager.Loader(tctx.master))
mock_warn.assert_called_once_with("stdoutprint")
class TestScript:
def test_notfound(self):
with taddons.context() as tctx:
sc = script.Script("nonexistent")
tctx.master.addons.add(sc)
def test_simple(self):
with taddons.context():
with taddons.context() as tctx:
sc = script.Script(
tutils.test_data.path(
"mitmproxy/data/addonscripts/recorder.py"
"mitmproxy/data/addonscripts/recorder/recorder.py"
)
)
sc.load_script()
assert sc.ns.call_log[0][0:2] == ("solo", "load")
tctx.master.addons.add(sc)
sc.ns.call_log = []
rec = tctx.master.addons.get("recorder")
assert rec.call_log[0][0:2] == ("recorder", "load")
rec.call_log = []
f = tflow.tflow(resp=True)
sc.request(f)
tctx.master.addons.trigger("request", f)
recf = sc.ns.call_log[0]
assert recf[1] == "request"
assert rec.call_log[0][1] == "request"
def test_reload(self, tmpdir):
with taddons.context() as tctx:
f = tmpdir.join("foo.py")
f.ensure(file=True)
f.write("\n")
sc = script.Script(str(f))
tctx.configure(sc)
for _ in range(100):
f.write(".")
for _ in range(5):
sc.reload()
sc.tick()
time.sleep(0.1)
if tctx.master.logs:
return
raise AssertionError("Change event not detected.")
def test_exception(self):
with taddons.context() as tctx:
sc = script.Script(
tutils.test_data.path("mitmproxy/data/addonscripts/error.py")
)
l = addonmanager.Loader(tctx.master)
sc.load(l)
tctx.master.addons.add(sc)
f = tflow.tflow(resp=True)
sc.request(f)
tctx.master.addons.trigger("request", f)
assert tctx.master.logs[0].level == "error"
assert len(tctx.master.logs[0].msg.splitlines()) == 6
assert re.search(r'addonscripts[\\/]error.py", line \d+, in request', tctx.master.logs[0].msg)
assert re.search(r'addonscripts[\\/]error.py", line \d+, in mkerr', tctx.master.logs[0].msg)
assert tctx.master.logs[0].msg.endswith("ValueError: Error!\n")
tctx.master.has_log("ValueError: Error!")
tctx.master.has_log("error.py")
def test_addon(self):
with taddons.context() as tctx:
@ -175,11 +132,9 @@ class TestScript:
"mitmproxy/data/addonscripts/addon.py"
)
)
l = addonmanager.Loader(tctx.master)
sc.load(l)
tctx.configure(sc)
tctx.master.addons.add(sc)
assert sc.ns.event_log == [
'scriptload', 'addonload', 'addonconfigure'
'scriptload', 'addonload'
]
@ -194,49 +149,33 @@ class TestCutTraceback:
self.raise_(4)
except RuntimeError:
tb = sys.exc_info()[2]
tb_cut = script.cut_traceback(tb, "test_simple")
tb_cut = addonmanager.cut_traceback(tb, "test_simple")
assert len(traceback.extract_tb(tb_cut)) == 5
tb_cut2 = script.cut_traceback(tb, "nonexistent")
tb_cut2 = addonmanager.cut_traceback(tb, "nonexistent")
assert len(traceback.extract_tb(tb_cut2)) == len(traceback.extract_tb(tb))
class TestScriptLoader:
def test_run_once(self):
o = options.Options(scripts=[])
m = master.Master(o, proxy.DummyServer())
sl = script.ScriptLoader()
m.addons.add(sl)
f = tflow.tflow(resp=True)
with m.handlecontext():
sc = sl.run_once(
tutils.test_data.path(
"mitmproxy/data/addonscripts/recorder.py"
), [f]
)
evts = [i[1] for i in sc.ns.call_log]
assert evts == ['load', 'requestheaders', 'request', 'responseheaders', 'response', 'done']
f = tflow.tflow(resp=True)
with m.handlecontext():
with pytest.raises(Exception, match="file not found"):
sl.run_once("nonexistent", [f])
def test_simple(self):
o = options.Options(scripts=[])
m = master.Master(o, proxy.DummyServer())
sc = script.ScriptLoader()
sc.running()
m.addons.add(sc)
assert len(m.addons) == 1
o.update(
scripts = [
tutils.test_data.path("mitmproxy/data/addonscripts/recorder.py")
tutils.test_data.path(
"mitmproxy/data/addonscripts/recorder/recorder.py"
)
]
)
assert len(m.addons) == 2
assert len(m.addons) == 1
assert len(sc.addons) == 1
o.update(scripts = [])
assert len(m.addons) == 1
assert len(sc.addons) == 0
def test_dupes(self):
sc = script.ScriptLoader()
@ -252,65 +191,70 @@ class TestScriptLoader:
sc = script.ScriptLoader()
with taddons.context() as tctx:
tctx.master.addons.add(sc)
with pytest.raises(exceptions.OptionsError):
tctx.configure(
sc,
scripts = ["nonexistent"]
)
tctx.configure(sc, scripts = ["nonexistent"])
tctx.master.has_log("nonexistent: file not found")
def test_order(self):
rec = tutils.test_data.path("mitmproxy/data/addonscripts/recorder.py")
rec = tutils.test_data.path("mitmproxy/data/addonscripts/recorder")
sc = script.ScriptLoader()
sc.is_running = True
with taddons.context() as tctx:
tctx.master.addons.add(sc)
sc.running()
tctx.configure(
sc,
scripts = [
"%s %s" % (rec, "a"),
"%s %s" % (rec, "b"),
"%s %s" % (rec, "c"),
"%s/a.py" % rec,
"%s/b.py" % rec,
"%s/c.py" % rec,
]
)
debug = [i.msg for i in tctx.master.logs if i.level == "debug"]
assert debug == [
'a load',
'a running',
'b load',
'b running',
'c load',
'c running',
'a configure',
'b configure',
'c configure',
]
tctx.master.logs = []
tctx.configure(
sc,
scripts = [
"%s/c.py" % rec,
"%s/a.py" % rec,
"%s/b.py" % rec,
]
)
debug = [i.msg for i in tctx.master.logs if i.level == "debug"]
assert debug == [
'a load',
'a configure',
'a running',
'b load',
'b configure',
'b running',
'c load',
'c configure',
'c running',
'a configure',
'b configure',
]
tctx.master.logs = []
tctx.configure(
sc,
scripts = [
"%s %s" % (rec, "c"),
"%s %s" % (rec, "a"),
"%s %s" % (rec, "b"),
]
)
debug = [i.msg for i in tctx.master.logs if i.level == "debug"]
assert debug == []
tctx.master.logs = []
tctx.configure(
sc,
scripts = [
"%s %s" % (rec, "x"),
"%s %s" % (rec, "a"),
"%s/e.py" % rec,
"%s/a.py" % rec,
]
)
debug = [i.msg for i in tctx.master.logs if i.level == "debug"]
assert debug == [
'c done',
'b done',
'x load',
'x configure',
'x running',
'e load',
'e running',
'e configure',
'a configure',
]

View File

@ -19,4 +19,6 @@ def configure(options, updated):
def load(l):
event_log.append("scriptload")
l.boot_into(Addon())
addons = [Addon()]

View File

@ -1,4 +1,5 @@
import time
import sys
from mitmproxy.script import concurrent

View File

@ -9,5 +9,4 @@ class ConcurrentClass:
time.sleep(0.1)
def load(l):
l.boot_into(ConcurrentClass())
addons = [ConcurrentClass()]

View File

@ -0,0 +1,3 @@
import recorder
addons = [recorder.Recorder("a")]

View File

@ -0,0 +1,3 @@
import recorder
addons = [recorder.Recorder("b")]

View File

@ -0,0 +1,3 @@
import recorder
addons = [recorder.Recorder("c")]

View File

@ -0,0 +1,3 @@
import recorder
addons = [recorder.Recorder("e")]

View File

@ -1,13 +1,12 @@
from mitmproxy import controller
from mitmproxy import eventsequence
from mitmproxy import ctx
import sys
class CallLogger:
class Recorder:
call_log = []
def __init__(self, name = "solo"):
def __init__(self, name = "recorder"):
self.name = name
def __getattr__(self, attr):
@ -22,5 +21,4 @@ class CallLogger:
raise AttributeError
def load(l):
l.boot_into(CallLogger(*sys.argv[1:]))
addons = [Recorder()]

View File

@ -296,8 +296,8 @@ class TestHTTP(tservers.HTTPProxyTest, CommonMixin):
class TestHTTPAuth(tservers.HTTPProxyTest):
def test_auth(self):
self.master.addons.add(proxyauth.ProxyAuth())
self.master.addons.configure_all(
self.master.options, self.master.options.keys()
self.master.addons.trigger(
"configure", self.master.options, self.master.options.keys()
)
self.master.options.proxyauth = "test:test"
assert self.pathod("202").status_code == 407

View File

@ -20,14 +20,11 @@ class Thing:
class TestConcurrent(tservers.MasterTest):
def test_concurrent(self):
with taddons.context() as tctx:
sc = script.Script(
sc = tctx.script(
tutils.test_data.path(
"mitmproxy/data/addonscripts/concurrent_decorator.py"
)
)
l = addonmanager.Loader(tctx.master)
sc.load(l)
f1, f2 = tflow.tflow(), tflow.tflow()
tctx.cycle(sc, f1)
tctx.cycle(sc, f2)

View File

@ -5,14 +5,17 @@ from mitmproxy import exceptions
from mitmproxy import options
from mitmproxy import master
from mitmproxy import proxy
from mitmproxy.test import taddons
from mitmproxy.test import tflow
class TAddon:
def __init__(self, name):
def __init__(self, name, addons=None):
self.name = name
self.tick = True
self.custom_called = False
if addons:
self.addons = addons
def __repr__(self):
return "Addon(%s)" % self.name
@ -34,19 +37,6 @@ class AOption:
l.add_option("custom_option", bool, False, "help")
class AChain:
def __init__(self, name, next):
self.name = name
self.next = next
def load(self, l):
if self.next:
l.boot_into(self.next)
def __repr__(self):
return "<%s>" % self.name
def test_halt():
o = options.Options()
m = master.Master(o, proxy.DummyServer(o))
@ -70,40 +60,42 @@ def test_lifecycle():
a = addonmanager.AddonManager(m)
a.add(TAddon("one"))
with pytest.raises(exceptions.AddonError):
a.add(TAddon("one"))
with pytest.raises(exceptions.AddonError):
a.remove(TAddon("nonexistent"))
f = tflow.tflow()
a.handle_lifecycle("request", f)
a.configure_all(o, o.keys())
a._configure_all(o, o.keys())
def test_simple():
o = options.Options()
m = master.Master(o, proxy.DummyServer(o))
a = addonmanager.AddonManager(m)
with pytest.raises(exceptions.AddonError):
a.invoke_addon(TAddon("one"), "done")
with taddons.context() as tctx:
a = tctx.master.addons
assert len(a) == 0
a.add(TAddon("one"))
assert a.get("one")
assert not a.get("two")
assert len(a) == 1
a.clear()
assert len(a) == 0
assert not a.chain
assert len(a) == 0
a.add(TAddon("one"))
assert a.get("one")
assert not a.get("two")
assert len(a) == 1
a.clear()
assert len(a) == 0
assert not a.chain
a.add(TAddon("one"))
a.trigger("done")
with pytest.raises(exceptions.AddonError):
a.add(TAddon("one"))
a.trigger("done")
a.trigger("tick")
tctx.master.has_log("not callable")
a.remove(a.get("one"))
assert not a.get("one")
a.remove(a.get("one"))
assert not a.get("one")
ta = TAddon("one")
a.add(ta)
a.trigger("custom")
assert ta.custom_called
ta = TAddon("one")
a.add(ta)
a.trigger("custom")
assert ta.custom_called
def test_load_option():
@ -114,29 +106,47 @@ def test_load_option():
assert "custom_option" in m.options._options
def test_loadchain():
def test_nesting():
o = options.Options()
m = master.Master(o, proxy.DummyServer(o))
a = addonmanager.AddonManager(m)
a.add(AChain("one", None))
a.add(
TAddon(
"one",
addons=[
TAddon("two"),
TAddon("three", addons=[TAddon("four")])
]
)
)
assert len(a.chain) == 1
assert a.get("one")
a.clear()
a.add(AChain("one", AChain("two", None)))
assert not a.get("one")
assert a.get("two")
a.clear()
a.add(AChain("one", AChain("two", AChain("three", None))))
assert not a.get("one")
assert not a.get("two")
assert a.get("three")
a.clear()
a.add(AChain("one", AChain("two", AChain("three", AChain("four", None)))))
assert not a.get("one")
assert not a.get("two")
assert not a.get("three")
assert a.get("four")
a.clear()
a.trigger("custom")
assert a.get("one").custom_called
assert a.get("two").custom_called
assert a.get("three").custom_called
assert a.get("four").custom_called
a.remove(a.get("three"))
assert not a.get("three")
assert not a.get("four")
class D:
def __init__(self):
self.w = None
def log(self, x):
self.w = x
def test_streamlog():
dummy = D()
s = addonmanager.StreamLog(dummy.log)
s.write("foo")
assert dummy.w == "foo"

View File

@ -1,4 +1,6 @@
import io
from mitmproxy.test import taddons
from mitmproxy.test import tutils
from mitmproxy import ctx
@ -9,3 +11,21 @@ def test_recordingmaster():
ctx.log.error("foo")
assert not tctx.master.has_log("foo", level="debug")
assert tctx.master.has_log("foo", level="error")
def test_dumplog():
with taddons.context() as tctx:
ctx.log.info("testing")
s = io.StringIO()
tctx.master.dump_log(s)
assert s.getvalue()
def test_load_script():
with taddons.context() as tctx:
s = tctx.script(
tutils.test_data.path(
"mitmproxy/data/addonscripts/recorder/recorder.py"
)
)
assert s

View File

@ -30,7 +30,7 @@ class TestMaster(tservers.MasterTest):
opts["verbosity"] = 1
o = options.Options(**opts)
m = console.master.ConsoleMaster(o, proxy.DummyServer())
m.addons.configure_all(o, o.keys())
m.addons.trigger("configure", o, o.keys())
return m
def test_basic(self):
@ -42,12 +42,6 @@ class TestMaster(tservers.MasterTest):
pass
assert len(m.view) == i
def test_run_script_once(self):
m = self.mkmaster()
f = tflow.tflow(resp=True)
m.run_script_once("nonexistent", [f])
assert any("Input error" in str(l) for l in m.logbuffer)
def test_intercept(self):
"""regression test for https://github.com/mitmproxy/mitmproxy/issues/1605"""
m = self.mkmaster(intercept="~b bar")

View File

@ -74,7 +74,7 @@ class TestMaster(taddons.RecordingMaster):
self.state = TestState()
self.addons.add(self.state)
self.addons.add(*addons)
self.addons.configure_all(self.options, self.options.keys())
self.addons.trigger("configure", self.options, self.options.keys())
self.addons.trigger("running")
def reset(self, addons):