Merge pull request #2261 from cortesi/addonrevamp

Revamp how addons work
This commit is contained in:
Aldo Cortesi 2017-04-26 09:03:03 +12:00 committed by GitHub
commit 02c82b1b60
34 changed files with 462 additions and 547 deletions

View File

@ -123,12 +123,12 @@ You can check you Python version by running ``python3 --version``.
sudo zypper install python3-pip python3-devel libffi-devel openssl-devel gcc-c++ sudo zypper install python3-pip python3-devel libffi-devel openssl-devel gcc-c++
sudo pip3 install mitmproxy sudo pip3 install mitmproxy
.. _install-source-windows: .. _install-source-windows:
🐱💻 Installation from Source on Windows Installation from Source on Windows
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. note:: .. note::
Mitmproxy's console interface is not supported on Windows, but you can use Mitmproxy's console interface is not supported on Windows, but you can use

View File

@ -54,24 +54,8 @@ and is replaced by the class instance.
Handling arguments Handling arguments
------------------ ------------------
Scripts can handle their own command-line arguments, just like any other Python
program. Let's build on the example above to do something slightly more
sophisticated - replace one value with another in all responses. Mitmproxy's
`HTTPRequest <api.html#mitmproxy.models.http.HTTPRequest>`_ and `HTTPResponse
<api.html#mitmproxy.models.http.HTTPResponse>`_ objects have a handy `replace
<api.html#mitmproxy.models.http.HTTPResponse.replace>`_ method that takes care
of all the details for us.
.. literalinclude:: ../../examples/simple/script_arguments.py FIXME
:caption: :src:`examples/simple/script_arguments.py`
:language: python
We can now call this script on the command-line like this:
>>> mitmdump -dd -s "./script_arguments.py html faketml"
Whenever a handler is called, mitpmroxy rewrites the script environment so that
it sees its own arguments as if it was invoked from the command-line.
Logging and the context Logging and the context

View File

@ -54,5 +54,4 @@ class Rerouter:
flow.request.port = port flow.request.port = port
def load(l): addons = [Rerouter()]
l.boot_into(Rerouter())

View File

@ -3,5 +3,4 @@ class AddHeader:
flow.response.headers["newheader"] = "foo" flow.response.headers["newheader"] = "foo"
def load(l): addons = [AddHeader()]
return l.boot_into(AddHeader())

View File

@ -17,7 +17,4 @@ class Filter:
print(flow) print(flow)
def load(l): addons = [Filter(sys.argv[1])]
if len(sys.argv) != 2:
raise ValueError("Usage: -s 'filt.py FILTER'")
l.boot_into(Filter(sys.argv[1]))

View File

@ -23,7 +23,4 @@ class Writer:
self.w.add(flow) self.w.add(flow)
def load(l): addons = [Writer(sys.argv[1])]
if len(sys.argv) != 2:
raise ValueError('Usage: -s "flowriter.py filename"')
l.boot_into(Writer(sys.argv[1]))

View File

@ -1,29 +1,31 @@
# Usage: mitmdump -s "iframe_injector.py url"
# (this script works best with --anticache) # (this script works best with --anticache)
import sys
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
class Injector: class Injector:
def __init__(self, iframe_url): def __init__(self):
self.iframe_url = iframe_url self.iframe_url = None
def load(self, loader):
loader.add_option(
"iframe", str, "", "IFrame to inject"
)
def configure(self, options, updated):
self.iframe_url = options.iframe
def response(self, flow): def response(self, flow):
if flow.request.host in self.iframe_url: if self.iframe_url:
return html = BeautifulSoup(flow.response.content, "html.parser")
html = BeautifulSoup(flow.response.content, "html.parser") if html.body:
if html.body: iframe = html.new_tag(
iframe = html.new_tag( "iframe",
"iframe", src=self.iframe_url,
src=self.iframe_url, frameborder=0,
frameborder=0, height=0,
height=0, width=0)
width=0) html.body.insert(0, iframe)
html.body.insert(0, iframe) flow.response.content = str(html).encode("utf8")
flow.response.content = str(html).encode("utf8")
def load(l): addons = [Injector()]
if len(sys.argv) != 2:
raise ValueError('Usage: -s "iframe_injector.py url"')
return l.boot_into(Injector(sys.argv[1]))

View File

@ -1,17 +0,0 @@
import argparse
class Replacer:
def __init__(self, src, dst):
self.src, self.dst = src, dst
def response(self, flow):
flow.response.replace(self.src, self.dst)
def load(l):
parser = argparse.ArgumentParser()
parser.add_argument("src", type=str)
parser.add_argument("dst", type=str)
args = parser.parse_args()
l.boot_into(Replacer(args.src, args.dst))

View File

@ -1,4 +1,7 @@
import typing import typing
import traceback
import contextlib
import sys
from mitmproxy import exceptions from mitmproxy import exceptions
from mitmproxy import eventsequence from mitmproxy import eventsequence
@ -11,13 +14,66 @@ def _get_name(itm):
return getattr(itm, "name", itm.__class__.__name__.lower()) return getattr(itm, "name", itm.__class__.__name__.lower())
def cut_traceback(tb, func_name):
"""
Cut off a traceback at the function with the given name.
The func_name's frame is excluded.
Args:
tb: traceback object, as returned by sys.exc_info()[2]
func_name: function name
Returns:
Reduced traceback.
"""
tb_orig = tb
for _, _, fname, _ in traceback.extract_tb(tb):
tb = tb.tb_next
if fname == func_name:
break
return tb or tb_orig
class StreamLog:
"""
A class for redirecting output using contextlib.
"""
def __init__(self, log):
self.log = log
def write(self, buf):
if buf.strip():
self.log(buf)
def flush(self): # pragma: no cover
# Click uses flush sometimes, so we dummy it up
pass
@contextlib.contextmanager
def safecall():
stdout_replacement = StreamLog(ctx.log.warn)
try:
with contextlib.redirect_stdout(stdout_replacement):
yield
except exceptions.AddonHalt:
raise
except Exception as e:
etype, value, tb = sys.exc_info()
tb = cut_traceback(tb, "invoke_addon").tb_next
ctx.log.error(
"Addon error: %s" % "".join(
traceback.format_exception(etype, value, tb)
)
)
class Loader: class Loader:
""" """
A loader object is passed to the load() event when addons start up. A loader object is passed to the load() event when addons start up.
""" """
def __init__(self, master): def __init__(self, master):
self.master = master self.master = master
self.boot_into_addon = None
def add_option( def add_option(
self, self,
@ -35,25 +91,33 @@ class Loader:
choices choices
) )
def boot_into(self, addon):
self.boot_into_addon = addon def traverse(chain):
func = getattr(addon, "load", None) """
if func: Recursively traverse an addon chain.
func(self) """
for a in chain:
yield a
if hasattr(a, "addons"):
yield from traverse(a.addons)
class AddonManager: class AddonManager:
def __init__(self, master): def __init__(self, master):
self.lookup = {}
self.chain = [] self.chain = []
self.master = master self.master = master
master.options.changed.connect(self.configure_all) master.options.changed.connect(self._configure_all)
def _configure_all(self, options, updated):
self.trigger("configure", options, updated)
def clear(self): def clear(self):
""" """
Remove all addons. Remove all addons.
""" """
self.done() for i in self.chain:
self.chain = [] self.remove(i)
def get(self, name): def get(self, name):
""" """
@ -61,36 +125,52 @@ class AddonManager:
attribute on the instance, or the lower case class name if that attribute on the instance, or the lower case class name if that
does not exist. does not exist.
""" """
for i in self.chain: return self.lookup.get(name, None)
if name == _get_name(i):
return i
def configure_all(self, options, updated): def register(self, addon):
self.trigger("configure", options, updated) """
Register an addon and all its sub-addons with the manager without
adding it to the chain. This should be used by addons that
dynamically manage addons. Must be called within a current context.
"""
for a in traverse([addon]):
name = _get_name(a)
if name in self.lookup:
raise exceptions.AddonError(
"An addon called '%s' already exists." % name
)
l = Loader(self.master)
self.invoke_addon(addon, "load", l)
for a in traverse([addon]):
name = _get_name(a)
self.lookup[name] = a
return addon
def add(self, *addons): def add(self, *addons):
""" """
Add addons to the end of the chain, and run their startup events. Add addons to the end of the chain, and run their load event.
If any addon has sub-addons, they are registered.
""" """
with self.master.handlecontext(): with self.master.handlecontext():
for i in addons: for i in addons:
l = Loader(self.master) self.chain.append(self.register(i))
self.invoke_addon(i, "load", l)
if l.boot_into_addon:
self.chain.append(l.boot_into_addon)
else:
self.chain.append(i)
def remove(self, addon): def remove(self, addon):
""" """
Remove an addon from the chain, and run its done events. Remove an addon and all its sub-addons.
"""
self.chain = [i for i in self.chain if i is not addon]
with self.master.handlecontext():
self.invoke_addon(addon, "done")
def done(self): If the addon is not in the chain - that is, if it's managed by a
self.trigger("done") parent addon - it's the parent's responsibility to remove it from
its own addons attribute.
"""
for a in traverse([addon]):
n = _get_name(a)
if n not in self.lookup:
raise exceptions.AddonError("No such addon: %s" % n)
self.chain = [i for i in self.chain if i is not a]
del self.lookup[_get_name(a)]
with self.master.handlecontext():
self.invoke_addon(a, "done")
def __len__(self): def __len__(self):
return len(self.chain) return len(self.chain)
@ -126,22 +206,19 @@ class AddonManager:
def invoke_addon(self, addon, name, *args, **kwargs): def invoke_addon(self, addon, name, *args, **kwargs):
""" """
Invoke an event on an addon. This method must run within an Invoke an event on an addon and all its children. This method must
established handler context. run within an established handler context.
""" """
if not ctx.master:
raise exceptions.AddonError(
"invoke_addon called without a handler context."
)
if name not in eventsequence.Events: if name not in eventsequence.Events:
name = "event_" + name name = "event_" + name
func = getattr(addon, name, None) for a in traverse([addon]):
if func: func = getattr(a, name, None)
if not callable(func): if func:
raise exceptions.AddonError( if not callable(func):
"Addon handler %s not callable" % name raise exceptions.AddonError(
) "Addon handler %s not callable" % name
func(*args, **kwargs) )
func(*args, **kwargs)
def trigger(self, name, *args, **kwargs): def trigger(self, name, *args, **kwargs):
""" """
@ -150,6 +227,7 @@ class AddonManager:
with self.master.handlecontext(): with self.master.handlecontext():
for i in self.chain: for i in self.chain:
try: try:
self.invoke_addon(i, name, *args, **kwargs) with safecall():
self.invoke_addon(i, name, *args, **kwargs)
except exceptions.AddonHalt: except exceptions.AddonHalt:
return return

View File

@ -3,6 +3,8 @@ from mitmproxy.addons.onboardingapp import app
class Onboarding(wsgiapp.WSGIApp): class Onboarding(wsgiapp.WSGIApp):
name = "onboarding"
def __init__(self): def __init__(self):
super().__init__(app.Adapter(app.application), None, None) super().__init__(app.Adapter(app.application), None, None)
self.enabled = False self.enabled = False

View File

@ -1,123 +1,31 @@
import contextlib
import os import os
import shlex import importlib
import sys
import threading import threading
import traceback import sys
import types
from mitmproxy import addonmanager from mitmproxy import addonmanager
from mitmproxy import exceptions from mitmproxy import exceptions
from mitmproxy import ctx from mitmproxy import ctx
from mitmproxy import eventsequence
import watchdog.events import watchdog.events
from watchdog.observers import polling from watchdog.observers import polling
def parse_command(command): def load_script(actx, path):
""" if not os.path.exists(path):
Returns a (path, args) tuple. ctx.log.info("No such file: %s" % path)
""" return
if not command or not command.strip(): loader = importlib.machinery.SourceFileLoader(os.path.basename(path), path)
raise ValueError("Empty script command.")
# Windows: escape all backslashes in the path.
if os.name == "nt": # pragma: no cover
backslashes = shlex.split(command, posix=False)[0].count("\\")
command = command.replace("\\", "\\\\", backslashes)
args = shlex.split(command) # pragma: no cover
args[0] = os.path.expanduser(args[0])
if not os.path.exists(args[0]):
raise ValueError(
("Script file not found: %s.\r\n"
"If your script path contains spaces, "
"make sure to wrap it in additional quotes, e.g. -s \"'./foo bar/baz.py' --args\".") %
args[0])
elif os.path.isdir(args[0]):
raise ValueError("Not a file: %s" % args[0])
return args[0], args[1:]
def cut_traceback(tb, func_name):
"""
Cut off a traceback at the function with the given name.
The func_name's frame is excluded.
Args:
tb: traceback object, as returned by sys.exc_info()[2]
func_name: function name
Returns:
Reduced traceback.
"""
tb_orig = tb
for _, _, fname, _ in traceback.extract_tb(tb):
tb = tb.tb_next
if fname == func_name:
break
if tb is None:
# We could not find the method, take the full stack trace.
# This may happen on some Python interpreters/flavors (e.g. PyInstaller).
return tb_orig
else:
return tb
class StreamLog:
"""
A class for redirecting output using contextlib.
"""
def __init__(self, log):
self.log = log
def write(self, buf):
if buf.strip():
self.log(buf)
@contextlib.contextmanager
def scriptenv(path, args):
oldargs = sys.argv
sys.argv = [path] + args
script_dir = os.path.dirname(os.path.abspath(path))
sys.path.append(script_dir)
stdout_replacement = StreamLog(ctx.log.warn)
try: try:
with contextlib.redirect_stdout(stdout_replacement): oldpath = sys.path
yield sys.path.insert(0, os.path.dirname(path))
except SystemExit as v: with addonmanager.safecall():
ctx.log.error("Script exited with code %s" % v.code) m = loader.load_module()
except Exception: if not getattr(m, "name", None):
etype, value, tb = sys.exc_info() m.name = path
tb = cut_traceback(tb, "scriptenv").tb_next return m
ctx.log.error(
"Script error: %s" % "".join(
traceback.format_exception(etype, value, tb)
)
)
finally: finally:
sys.argv = oldargs sys.path[:] = oldpath
sys.path.pop()
def load_script(path, args):
with open(path, "rb") as f:
try:
code = compile(f.read(), path, 'exec')
except SyntaxError as e:
ctx.log.error(
"Script error: %s line %s: %s" % (
e.filename, e.lineno, e.msg
)
)
return
ns = {'__file__': os.path.abspath(path)}
with scriptenv(path, args):
exec(code, ns)
return types.SimpleNamespace(**ns)
class ReloadHandler(watchdog.events.FileSystemEventHandler): class ReloadHandler(watchdog.events.FileSystemEventHandler):
@ -149,59 +57,39 @@ class Script:
""" """
An addon that manages a single script. An addon that manages a single script.
""" """
def __init__(self, command): def __init__(self, path):
self.name = command self.name = "scriptmanager:" + path
self.path = path
self.command = command
self.path, self.args = parse_command(command)
self.ns = None self.ns = None
self.observer = None self.observer = None
self.dead = False
self.last_options = None self.last_options = None
self.should_reload = threading.Event() self.should_reload = threading.Event()
for i in eventsequence.Events: def load(self, l):
if not hasattr(self, i): self.ns = load_script(ctx, self.path)
def mkprox():
evt = i
def prox(*args, **kwargs): @property
self.run(evt, *args, **kwargs) def addons(self):
return prox if self.ns is not None:
setattr(self, i, mkprox()) return [self.ns]
return []
def run(self, name, *args, **kwargs):
# It's possible for ns to be un-initialised if we failed during
# configure
if self.ns is not None and not self.dead:
func = getattr(self.ns, name, None)
if func:
with scriptenv(self.path, self.args):
return func(*args, **kwargs)
def reload(self): def reload(self):
self.should_reload.set() self.should_reload.set()
def load_script(self):
self.ns = load_script(self.path, self.args)
l = addonmanager.Loader(ctx.master)
self.run("load", l)
if l.boot_into_addon:
self.ns = l.boot_into_addon
def tick(self): def tick(self):
if self.should_reload.is_set(): if self.should_reload.is_set():
self.should_reload.clear() self.should_reload.clear()
ctx.log.info("Reloading script: %s" % self.name) ctx.log.info("Reloading script: %s" % self.name)
self.ns = load_script(self.path, self.args) if self.ns:
self.configure(self.last_options, self.last_options.keys()) ctx.master.addons.remove(self.ns)
else: self.ns = load_script(ctx, self.path)
self.run("tick") if self.ns:
# We're already running, so we have to explicitly register and
def load(self, l): # configure the addon
self.last_options = ctx.master.options ctx.master.addons.register(self.ns)
self.load_script() self.configure(self.last_options, self.last_options.keys())
def configure(self, options, updated): def configure(self, options, updated):
self.last_options = options self.last_options = options
@ -213,11 +101,6 @@ class Script:
os.path.dirname(self.path) or "." os.path.dirname(self.path) or "."
) )
self.observer.start() self.observer.start()
self.run("configure", options, updated)
def done(self):
self.run("done")
self.dead = True
class ScriptLoader: class ScriptLoader:
@ -226,21 +109,14 @@ class ScriptLoader:
""" """
def __init__(self): def __init__(self):
self.is_running = False self.is_running = False
self.addons = []
def running(self): def running(self):
self.is_running = True self.is_running = True
def run_once(self, command, flows): def run_once(self, command, flows):
try: # Returning once we have proper commands
sc = Script(command) raise NotImplementedError
except ValueError as e:
raise ValueError(str(e))
sc.load_script()
for f in flows:
for evt, o in eventsequence.iterate(f):
sc.run(evt, o)
sc.done()
return sc
def configure(self, options, updated): def configure(self, options, updated):
if "scripts" in updated: if "scripts" in updated:
@ -248,25 +124,21 @@ class ScriptLoader:
if options.scripts.count(s) > 1: if options.scripts.count(s) > 1:
raise exceptions.OptionsError("Duplicate script: %s" % s) raise exceptions.OptionsError("Duplicate script: %s" % s)
for a in ctx.master.addons.chain[:]: for a in self.addons[:]:
if isinstance(a, Script) and a.name not in options.scripts: if a.path not in options.scripts:
ctx.log.info("Un-loading script: %s" % a.name) ctx.log.info("Un-loading script: %s" % a.name)
ctx.master.addons.remove(a) ctx.master.addons.remove(a)
self.addons.remove(a)
# The machinations below are to ensure that: # The machinations below are to ensure that:
# - Scripts remain in the same order # - Scripts remain in the same order
# - Scripts are listed directly after the script addon. This is
# needed to ensure that interactions with, for instance, flow
# serialization remains correct.
# - Scripts are not initialized un-necessarily. If only a # - Scripts are not initialized un-necessarily. If only a
# script's order in the script list has changed, it should simply # script's order in the script list has changed, it is just
# be moved. # moved.
current = {} current = {}
for a in ctx.master.addons.chain[:]: for a in self.addons:
if isinstance(a, Script): current[a.path] = a
current[a.name] = a
ctx.master.addons.chain.remove(a)
ordered = [] ordered = []
newscripts = [] newscripts = []
@ -275,24 +147,15 @@ class ScriptLoader:
ordered.append(current[s]) ordered.append(current[s])
else: else:
ctx.log.info("Loading script: %s" % s) ctx.log.info("Loading script: %s" % s)
try: sc = Script(s)
sc = Script(s)
except ValueError as e:
raise exceptions.OptionsError(str(e))
ordered.append(sc) ordered.append(sc)
newscripts.append(sc) newscripts.append(sc)
ochain = ctx.master.addons.chain self.addons = ordered
pos = ochain.index(self)
ctx.master.addons.chain = ochain[:pos + 1] + ordered + ochain[pos + 1:]
for s in newscripts: for s in newscripts:
l = addonmanager.Loader(ctx.master) ctx.master.addons.register(s)
ctx.master.addons.invoke_addon(s, "load", l)
if self.is_running: if self.is_running:
# If we're already running, we configure and tell the addon # If we're already running, we configure and tell the addon
# we're up and running. # we're up and running.
ctx.master.addons.invoke_addon(
s, "configure", options, options.keys()
)
ctx.master.addons.invoke_addon(s, "running") ctx.master.addons.invoke_addon(s, "running")

View File

@ -3,6 +3,11 @@ import click
from mitmproxy import log from mitmproxy import log
# These get over-ridden by the save execution context. Keep them around so we
# can log directly.
realstdout = sys.stdout
realstderr = sys.stderr
class TermLog: class TermLog:
def __init__(self, outfile=None): def __init__(self, outfile=None):
@ -14,9 +19,9 @@ class TermLog:
def log(self, e): def log(self, e):
if log.log_tier(e.level) == log.log_tier("error"): if log.log_tier(e.level) == log.log_tier("error"):
outfile = self.outfile or sys.stderr outfile = self.outfile or realstderr
else: else:
outfile = self.outfile or sys.stdout outfile = self.outfile or realstdout
if self.options.verbosity >= log.log_tier(e.level): if self.options.verbosity >= log.log_tier(e.level):
click.secho( click.secho(

View File

@ -13,6 +13,10 @@ class WSGIApp:
def __init__(self, app, host, port): def __init__(self, app, host, port):
self.app, self.host, self.port = app, host, port self.app, self.host, self.port = app, host, port
@property
def name(self):
return "wsgiapp:%s:%s" % (self.host, self.port)
def serve(self, app, flow): def serve(self, app, flow):
""" """
Serves app on flow, and prevents further handling of the flow. Serves app on flow, and prevents further handling of the flow.

View File

@ -103,7 +103,7 @@ class Master:
def shutdown(self): def shutdown(self):
self.server.shutdown() self.server.shutdown()
self.should_exit.set() self.should_exit.set()
self.addons.done() self.addons.trigger("done")
def create_request(self, method, url): def create_request(self, method, url):
""" """

View File

@ -1,3 +1,4 @@
import sys
import contextlib import contextlib
import mitmproxy.master import mitmproxy.master
@ -5,6 +6,7 @@ import mitmproxy.options
from mitmproxy import proxy from mitmproxy import proxy
from mitmproxy import addonmanager from mitmproxy import addonmanager
from mitmproxy import eventsequence from mitmproxy import eventsequence
from mitmproxy.addons import script
class TestAddons(addonmanager.AddonManager): class TestAddons(addonmanager.AddonManager):
@ -26,6 +28,10 @@ class RecordingMaster(mitmproxy.master.Master):
self.events = [] self.events = []
self.logs = [] self.logs = []
def dump_log(self, outf=sys.stdout):
for i in self.logs:
print("%s: %s" % (i.level, i.msg), file=outf)
def has_log(self, txt, level=None): def has_log(self, txt, level=None):
for i in self.logs: for i in self.logs:
if level and i.level != level: if level and i.level != level:
@ -51,14 +57,21 @@ class context:
provides a number of helper methods for common testing scenarios. provides a number of helper methods for common testing scenarios.
""" """
def __init__(self, master = None, options = None): def __init__(self, master = None, options = None):
self.options = options or mitmproxy.options.Options() options = options or mitmproxy.options.Options()
self.master = master or RecordingMaster( self.master = master or RecordingMaster(
options, proxy.DummyServer(options) options, proxy.DummyServer(options)
) )
self.options = self.master.options
self.wrapped = None self.wrapped = None
def ctx(self):
"""
Returns a new handler context.
"""
return self.master.handlecontext()
def __enter__(self): def __enter__(self):
self.wrapped = self.master.handlecontext() self.wrapped = self.ctx()
self.wrapped.__enter__() self.wrapped.__enter__()
return self return self
@ -75,11 +88,13 @@ class context:
""" """
f.reply._state = "start" f.reply._state = "start"
for evt, arg in eventsequence.iterate(f): for evt, arg in eventsequence.iterate(f):
h = getattr(addon, evt, None) self.master.addons.invoke_addon(
if h: addon,
h(arg) evt,
if f.reply.state == "taken": arg
return )
if f.reply.state == "taken":
return
def configure(self, addon, **kwargs): def configure(self, addon, **kwargs):
""" """
@ -89,4 +104,17 @@ class context:
""" """
with self.options.rollback(kwargs.keys(), reraise=True): with self.options.rollback(kwargs.keys(), reraise=True):
self.options.update(**kwargs) self.options.update(**kwargs)
addon.configure(self.options, kwargs.keys()) self.master.addons.invoke_addon(
addon,
"configure",
self.options,
kwargs.keys()
)
def script(self, path):
sc = script.Script(path)
loader = addonmanager.Loader(self.master)
sc.load(loader)
for a in addonmanager.traverse(sc.addons):
getattr(a, "load", lambda x: None)(loader)
return sc

View File

@ -76,7 +76,7 @@ def run(MasterKlass, args, extra=None): # pragma: no cover
unknown = optmanager.load_paths(opts, args.conf) unknown = optmanager.load_paths(opts, args.conf)
server = process_options(parser, opts, args) server = process_options(parser, opts, args)
master = MasterKlass(opts, server) master = MasterKlass(opts, server)
master.addons.configure_all(opts, opts.keys()) master.addons.trigger("configure", opts, opts.keys())
remaining = opts.update_known(**unknown) remaining = opts.update_known(**unknown)
if remaining and opts.verbosity > 1: if remaining and opts.verbosity > 1:
print("Ignored options: %s" % remaining) print("Ignored options: %s" % remaining)

View File

@ -1,5 +1,3 @@
import pytest
from mitmproxy import options from mitmproxy import options
from mitmproxy import contentviews from mitmproxy import contentviews
from mitmproxy import proxy from mitmproxy import proxy
@ -8,6 +6,7 @@ from mitmproxy.addons import script
from mitmproxy.test import tflow from mitmproxy.test import tflow
from mitmproxy.test import tutils from mitmproxy.test import tutils
from mitmproxy.test import taddons
from mitmproxy.net.http import Headers from mitmproxy.net.http import Headers
from ..mitmproxy import tservers from ..mitmproxy import tservers
@ -27,7 +26,7 @@ class RaiseMaster(master.Master):
def tscript(cmd, args=""): def tscript(cmd, args=""):
o = options.Options() o = options.Options()
cmd = example_dir.path(cmd) + " " + args cmd = example_dir.path(cmd)
m = RaiseMaster(o, proxy.DummyServer()) m = RaiseMaster(o, proxy.DummyServer())
sc = script.Script(cmd) sc = script.Script(cmd)
m.addons.add(sc) m.addons.add(sc)
@ -48,14 +47,18 @@ class TestScripts(tservers.MasterTest):
assert any(b'tEST!' in val[0][1] for val in fmt) assert any(b'tEST!' in val[0][1] for val in fmt)
def test_iframe_injector(self): def test_iframe_injector(self):
with pytest.raises(ScriptError): with taddons.context() as tctx:
tscript("simple/modify_body_inject_iframe.py") sc = tctx.script(example_dir.path("simple/modify_body_inject_iframe.py"))
tctx.configure(
m, sc = tscript("simple/modify_body_inject_iframe.py", "http://example.org/evil_iframe") sc,
f = tflow.tflow(resp=tutils.tresp(content=b"<html><body>mitmproxy</body></html>")) iframe = "http://example.org/evil_iframe"
m.addons.handle_lifecycle("response", f) )
content = f.response.content f = tflow.tflow(
assert b'iframe' in content and b'evil_iframe' in content resp=tutils.tresp(content=b"<html><body>mitmproxy</body></html>")
)
tctx.master.addons.invoke_addon(sc, "response", f)
content = f.response.content
assert b'iframe' in content and b'evil_iframe' in content
def test_modify_form(self): def test_modify_form(self):
m, sc = tscript("simple/modify_form.py") m, sc = tscript("simple/modify_form.py")
@ -81,12 +84,6 @@ class TestScripts(tservers.MasterTest):
m.addons.handle_lifecycle("request", f) m.addons.handle_lifecycle("request", f)
assert f.request.query["mitmproxy"] == "rocks" assert f.request.query["mitmproxy"] == "rocks"
def test_arguments(self):
m, sc = tscript("simple/script_arguments.py", "mitmproxy rocks")
f = tflow.tflow(resp=tutils.tresp(content=b"I <3 mitmproxy"))
m.addons.handle_lifecycle("response", f)
assert f.response.content == b"I <3 rocks"
def test_redirect_requests(self): def test_redirect_requests(self):
m, sc = tscript("simple/redirect_requests.py") m, sc = tscript("simple/redirect_requests.py")
f = tflow.tflow(req=tutils.treq(host="example.org")) f = tflow.tflow(req=tutils.treq(host="example.org"))

View File

@ -11,7 +11,7 @@ def test_simple():
with pytest.raises(exceptions.OptionsError): with pytest.raises(exceptions.OptionsError):
tctx.configure(sa, body_size_limit = "invalid") tctx.configure(sa, body_size_limit = "invalid")
tctx.configure(sa, body_size_limit = "1m") tctx.configure(sa, body_size_limit = "1m")
assert tctx.options._processed["body_size_limit"] assert tctx.master.options._processed["body_size_limit"]
with pytest.raises(exceptions.OptionsError, match="mutually exclusive"): with pytest.raises(exceptions.OptionsError, match="mutually exclusive"):
tctx.configure( tctx.configure(

View File

@ -1,7 +1,6 @@
import traceback import traceback
import sys import sys
import time import time
import re
import watchdog.events import watchdog.events
import pytest import pytest
@ -14,23 +13,8 @@ from mitmproxy import exceptions
from mitmproxy import options from mitmproxy import options
from mitmproxy import proxy from mitmproxy import proxy
from mitmproxy import master from mitmproxy import master
from mitmproxy import utils
from mitmproxy.addons import script from mitmproxy.addons import script
from ...conftest import skip_not_windows
def test_scriptenv():
with taddons.context() as tctx:
with script.scriptenv("path", []):
raise SystemExit
assert tctx.master.has_log("exited", "error")
tctx.master.clear()
with script.scriptenv("path", []):
raise ValueError("fooo")
assert tctx.master.has_log("fooo", "error")
class Called: class Called:
def __init__(self): def __init__(self):
@ -60,113 +44,86 @@ def test_reloadhandler():
assert rh.callback.called assert rh.callback.called
class TestParseCommand:
def test_empty_command(self):
with pytest.raises(ValueError):
script.parse_command("")
with pytest.raises(ValueError):
script.parse_command(" ")
def test_no_script_file(self, tmpdir):
with pytest.raises(Exception, match="not found"):
script.parse_command("notfound")
with pytest.raises(Exception, match="Not a file"):
script.parse_command(str(tmpdir))
def test_parse_args(self):
with utils.chdir(tutils.test_data.dirname):
assert script.parse_command(
"mitmproxy/data/addonscripts/recorder.py"
) == ("mitmproxy/data/addonscripts/recorder.py", [])
assert script.parse_command(
"mitmproxy/data/addonscripts/recorder.py foo bar"
) == ("mitmproxy/data/addonscripts/recorder.py", ["foo", "bar"])
assert script.parse_command(
"mitmproxy/data/addonscripts/recorder.py 'foo bar'"
) == ("mitmproxy/data/addonscripts/recorder.py", ["foo bar"])
@skip_not_windows
def test_parse_windows(self):
with utils.chdir(tutils.test_data.dirname):
assert script.parse_command(
"mitmproxy/data\\addonscripts\\recorder.py"
) == ("mitmproxy/data\\addonscripts\\recorder.py", [])
assert script.parse_command(
"mitmproxy/data\\addonscripts\\recorder.py 'foo \\ bar'"
) == ("mitmproxy/data\\addonscripts\\recorder.py", ['foo \\ bar'])
def test_load_script(): def test_load_script():
with taddons.context(): with taddons.context() as tctx:
ns = script.load_script( ns = script.load_script(
tctx.ctx(),
tutils.test_data.path( tutils.test_data.path(
"mitmproxy/data/addonscripts/recorder.py" "mitmproxy/data/addonscripts/recorder/recorder.py"
), [] )
) )
assert ns.load assert ns.addons
ns = script.load_script(
tctx.ctx(),
"nonexistent"
)
assert not ns
def test_script_print_stdout(): def test_script_print_stdout():
with taddons.context() as tctx: with taddons.context() as tctx:
with mock.patch('mitmproxy.ctx.log.warn') as mock_warn: with mock.patch('mitmproxy.ctx.log.warn') as mock_warn:
with script.scriptenv("path", []): with addonmanager.safecall():
ns = script.load_script( ns = script.load_script(
tctx.ctx(),
tutils.test_data.path( tutils.test_data.path(
"mitmproxy/data/addonscripts/print.py" "mitmproxy/data/addonscripts/print.py"
), [] )
) )
ns.load(addonmanager.Loader(tctx.master)) ns.load(addonmanager.Loader(tctx.master))
mock_warn.assert_called_once_with("stdoutprint") mock_warn.assert_called_once_with("stdoutprint")
class TestScript: class TestScript:
def test_notfound(self):
with taddons.context() as tctx:
sc = script.Script("nonexistent")
tctx.master.addons.add(sc)
def test_simple(self): def test_simple(self):
with taddons.context(): with taddons.context() as tctx:
sc = script.Script( sc = script.Script(
tutils.test_data.path( tutils.test_data.path(
"mitmproxy/data/addonscripts/recorder.py" "mitmproxy/data/addonscripts/recorder/recorder.py"
) )
) )
sc.load_script() tctx.master.addons.add(sc)
assert sc.ns.call_log[0][0:2] == ("solo", "load")
sc.ns.call_log = [] rec = tctx.master.addons.get("recorder")
assert rec.call_log[0][0:2] == ("recorder", "load")
rec.call_log = []
f = tflow.tflow(resp=True) f = tflow.tflow(resp=True)
sc.request(f) tctx.master.addons.trigger("request", f)
recf = sc.ns.call_log[0] assert rec.call_log[0][1] == "request"
assert recf[1] == "request"
def test_reload(self, tmpdir): def test_reload(self, tmpdir):
with taddons.context() as tctx: with taddons.context() as tctx:
f = tmpdir.join("foo.py") f = tmpdir.join("foo.py")
f.ensure(file=True) f.ensure(file=True)
f.write("\n")
sc = script.Script(str(f)) sc = script.Script(str(f))
tctx.configure(sc) tctx.configure(sc)
for _ in range(100): for _ in range(5):
f.write(".") sc.reload()
sc.tick() sc.tick()
time.sleep(0.1) time.sleep(0.1)
if tctx.master.logs:
return
raise AssertionError("Change event not detected.")
def test_exception(self): def test_exception(self):
with taddons.context() as tctx: with taddons.context() as tctx:
sc = script.Script( sc = script.Script(
tutils.test_data.path("mitmproxy/data/addonscripts/error.py") tutils.test_data.path("mitmproxy/data/addonscripts/error.py")
) )
l = addonmanager.Loader(tctx.master) tctx.master.addons.add(sc)
sc.load(l)
f = tflow.tflow(resp=True) f = tflow.tflow(resp=True)
sc.request(f) tctx.master.addons.trigger("request", f)
assert tctx.master.logs[0].level == "error" assert tctx.master.logs[0].level == "error"
assert len(tctx.master.logs[0].msg.splitlines()) == 6 tctx.master.has_log("ValueError: Error!")
assert re.search(r'addonscripts[\\/]error.py", line \d+, in request', tctx.master.logs[0].msg) tctx.master.has_log("error.py")
assert re.search(r'addonscripts[\\/]error.py", line \d+, in mkerr', tctx.master.logs[0].msg)
assert tctx.master.logs[0].msg.endswith("ValueError: Error!\n")
def test_addon(self): def test_addon(self):
with taddons.context() as tctx: with taddons.context() as tctx:
@ -175,11 +132,9 @@ class TestScript:
"mitmproxy/data/addonscripts/addon.py" "mitmproxy/data/addonscripts/addon.py"
) )
) )
l = addonmanager.Loader(tctx.master) tctx.master.addons.add(sc)
sc.load(l)
tctx.configure(sc)
assert sc.ns.event_log == [ assert sc.ns.event_log == [
'scriptload', 'addonload', 'addonconfigure' 'scriptload', 'addonload'
] ]
@ -194,49 +149,33 @@ class TestCutTraceback:
self.raise_(4) self.raise_(4)
except RuntimeError: except RuntimeError:
tb = sys.exc_info()[2] tb = sys.exc_info()[2]
tb_cut = script.cut_traceback(tb, "test_simple") tb_cut = addonmanager.cut_traceback(tb, "test_simple")
assert len(traceback.extract_tb(tb_cut)) == 5 assert len(traceback.extract_tb(tb_cut)) == 5
tb_cut2 = script.cut_traceback(tb, "nonexistent") tb_cut2 = addonmanager.cut_traceback(tb, "nonexistent")
assert len(traceback.extract_tb(tb_cut2)) == len(traceback.extract_tb(tb)) assert len(traceback.extract_tb(tb_cut2)) == len(traceback.extract_tb(tb))
class TestScriptLoader: class TestScriptLoader:
def test_run_once(self):
o = options.Options(scripts=[])
m = master.Master(o, proxy.DummyServer())
sl = script.ScriptLoader()
m.addons.add(sl)
f = tflow.tflow(resp=True)
with m.handlecontext():
sc = sl.run_once(
tutils.test_data.path(
"mitmproxy/data/addonscripts/recorder.py"
), [f]
)
evts = [i[1] for i in sc.ns.call_log]
assert evts == ['load', 'requestheaders', 'request', 'responseheaders', 'response', 'done']
f = tflow.tflow(resp=True)
with m.handlecontext():
with pytest.raises(Exception, match="file not found"):
sl.run_once("nonexistent", [f])
def test_simple(self): def test_simple(self):
o = options.Options(scripts=[]) o = options.Options(scripts=[])
m = master.Master(o, proxy.DummyServer()) m = master.Master(o, proxy.DummyServer())
sc = script.ScriptLoader() sc = script.ScriptLoader()
sc.running()
m.addons.add(sc) m.addons.add(sc)
assert len(m.addons) == 1 assert len(m.addons) == 1
o.update( o.update(
scripts = [ scripts = [
tutils.test_data.path("mitmproxy/data/addonscripts/recorder.py") tutils.test_data.path(
"mitmproxy/data/addonscripts/recorder/recorder.py"
)
] ]
) )
assert len(m.addons) == 2 assert len(m.addons) == 1
assert len(sc.addons) == 1
o.update(scripts = []) o.update(scripts = [])
assert len(m.addons) == 1 assert len(m.addons) == 1
assert len(sc.addons) == 0
def test_dupes(self): def test_dupes(self):
sc = script.ScriptLoader() sc = script.ScriptLoader()
@ -252,65 +191,70 @@ class TestScriptLoader:
sc = script.ScriptLoader() sc = script.ScriptLoader()
with taddons.context() as tctx: with taddons.context() as tctx:
tctx.master.addons.add(sc) tctx.master.addons.add(sc)
with pytest.raises(exceptions.OptionsError): tctx.configure(sc, scripts = ["nonexistent"])
tctx.configure( tctx.master.has_log("nonexistent: file not found")
sc,
scripts = ["nonexistent"]
)
def test_order(self): def test_order(self):
rec = tutils.test_data.path("mitmproxy/data/addonscripts/recorder.py") rec = tutils.test_data.path("mitmproxy/data/addonscripts/recorder")
sc = script.ScriptLoader() sc = script.ScriptLoader()
sc.is_running = True
with taddons.context() as tctx: with taddons.context() as tctx:
tctx.master.addons.add(sc)
sc.running()
tctx.configure( tctx.configure(
sc, sc,
scripts = [ scripts = [
"%s %s" % (rec, "a"), "%s/a.py" % rec,
"%s %s" % (rec, "b"), "%s/b.py" % rec,
"%s %s" % (rec, "c"), "%s/c.py" % rec,
]
)
debug = [i.msg for i in tctx.master.logs if i.level == "debug"]
assert debug == [
'a load',
'a running',
'b load',
'b running',
'c load',
'c running',
'a configure',
'b configure',
'c configure',
]
tctx.master.logs = []
tctx.configure(
sc,
scripts = [
"%s/c.py" % rec,
"%s/a.py" % rec,
"%s/b.py" % rec,
] ]
) )
debug = [i.msg for i in tctx.master.logs if i.level == "debug"] debug = [i.msg for i in tctx.master.logs if i.level == "debug"]
assert debug == [ assert debug == [
'a load',
'a configure',
'a running',
'b load',
'b configure',
'b running',
'c load',
'c configure', 'c configure',
'c running', 'a configure',
'b configure',
] ]
tctx.master.logs = []
tctx.configure(
sc,
scripts = [
"%s %s" % (rec, "c"),
"%s %s" % (rec, "a"),
"%s %s" % (rec, "b"),
]
)
debug = [i.msg for i in tctx.master.logs if i.level == "debug"]
assert debug == []
tctx.master.logs = [] tctx.master.logs = []
tctx.configure( tctx.configure(
sc, sc,
scripts = [ scripts = [
"%s %s" % (rec, "x"), "%s/e.py" % rec,
"%s %s" % (rec, "a"), "%s/a.py" % rec,
] ]
) )
debug = [i.msg for i in tctx.master.logs if i.level == "debug"] debug = [i.msg for i in tctx.master.logs if i.level == "debug"]
assert debug == [ assert debug == [
'c done', 'c done',
'b done', 'b done',
'x load', 'e load',
'x configure', 'e running',
'x running', 'e configure',
'a configure',
] ]

View File

@ -19,4 +19,6 @@ def configure(options, updated):
def load(l): def load(l):
event_log.append("scriptload") event_log.append("scriptload")
l.boot_into(Addon())
addons = [Addon()]

View File

@ -1,4 +1,5 @@
import time import time
import sys
from mitmproxy.script import concurrent from mitmproxy.script import concurrent

View File

@ -9,5 +9,4 @@ class ConcurrentClass:
time.sleep(0.1) time.sleep(0.1)
def load(l): addons = [ConcurrentClass()]
l.boot_into(ConcurrentClass())

View File

@ -0,0 +1,3 @@
import recorder
addons = [recorder.Recorder("a")]

View File

@ -0,0 +1,3 @@
import recorder
addons = [recorder.Recorder("b")]

View File

@ -0,0 +1,3 @@
import recorder
addons = [recorder.Recorder("c")]

View File

@ -0,0 +1,3 @@
import recorder
addons = [recorder.Recorder("e")]

View File

@ -1,13 +1,12 @@
from mitmproxy import controller from mitmproxy import controller
from mitmproxy import eventsequence from mitmproxy import eventsequence
from mitmproxy import ctx from mitmproxy import ctx
import sys
class CallLogger: class Recorder:
call_log = [] call_log = []
def __init__(self, name = "solo"): def __init__(self, name = "recorder"):
self.name = name self.name = name
def __getattr__(self, attr): def __getattr__(self, attr):
@ -22,5 +21,4 @@ class CallLogger:
raise AttributeError raise AttributeError
def load(l): addons = [Recorder()]
l.boot_into(CallLogger(*sys.argv[1:]))

View File

@ -296,8 +296,8 @@ class TestHTTP(tservers.HTTPProxyTest, CommonMixin):
class TestHTTPAuth(tservers.HTTPProxyTest): class TestHTTPAuth(tservers.HTTPProxyTest):
def test_auth(self): def test_auth(self):
self.master.addons.add(proxyauth.ProxyAuth()) self.master.addons.add(proxyauth.ProxyAuth())
self.master.addons.configure_all( self.master.addons.trigger(
self.master.options, self.master.options.keys() "configure", self.master.options, self.master.options.keys()
) )
self.master.options.proxyauth = "test:test" self.master.options.proxyauth = "test:test"
assert self.pathod("202").status_code == 407 assert self.pathod("202").status_code == 407

View File

@ -20,14 +20,11 @@ class Thing:
class TestConcurrent(tservers.MasterTest): class TestConcurrent(tservers.MasterTest):
def test_concurrent(self): def test_concurrent(self):
with taddons.context() as tctx: with taddons.context() as tctx:
sc = script.Script( sc = tctx.script(
tutils.test_data.path( tutils.test_data.path(
"mitmproxy/data/addonscripts/concurrent_decorator.py" "mitmproxy/data/addonscripts/concurrent_decorator.py"
) )
) )
l = addonmanager.Loader(tctx.master)
sc.load(l)
f1, f2 = tflow.tflow(), tflow.tflow() f1, f2 = tflow.tflow(), tflow.tflow()
tctx.cycle(sc, f1) tctx.cycle(sc, f1)
tctx.cycle(sc, f2) tctx.cycle(sc, f2)

View File

@ -5,14 +5,17 @@ from mitmproxy import exceptions
from mitmproxy import options from mitmproxy import options
from mitmproxy import master from mitmproxy import master
from mitmproxy import proxy from mitmproxy import proxy
from mitmproxy.test import taddons
from mitmproxy.test import tflow from mitmproxy.test import tflow
class TAddon: class TAddon:
def __init__(self, name): def __init__(self, name, addons=None):
self.name = name self.name = name
self.tick = True self.tick = True
self.custom_called = False self.custom_called = False
if addons:
self.addons = addons
def __repr__(self): def __repr__(self):
return "Addon(%s)" % self.name return "Addon(%s)" % self.name
@ -34,19 +37,6 @@ class AOption:
l.add_option("custom_option", bool, False, "help") l.add_option("custom_option", bool, False, "help")
class AChain:
def __init__(self, name, next):
self.name = name
self.next = next
def load(self, l):
if self.next:
l.boot_into(self.next)
def __repr__(self):
return "<%s>" % self.name
def test_halt(): def test_halt():
o = options.Options() o = options.Options()
m = master.Master(o, proxy.DummyServer(o)) m = master.Master(o, proxy.DummyServer(o))
@ -70,40 +60,42 @@ def test_lifecycle():
a = addonmanager.AddonManager(m) a = addonmanager.AddonManager(m)
a.add(TAddon("one")) a.add(TAddon("one"))
with pytest.raises(exceptions.AddonError):
a.add(TAddon("one"))
with pytest.raises(exceptions.AddonError):
a.remove(TAddon("nonexistent"))
f = tflow.tflow() f = tflow.tflow()
a.handle_lifecycle("request", f) a.handle_lifecycle("request", f)
a.configure_all(o, o.keys()) a._configure_all(o, o.keys())
def test_simple(): def test_simple():
o = options.Options() with taddons.context() as tctx:
m = master.Master(o, proxy.DummyServer(o)) a = tctx.master.addons
a = addonmanager.AddonManager(m)
with pytest.raises(exceptions.AddonError):
a.invoke_addon(TAddon("one"), "done")
assert len(a) == 0 assert len(a) == 0
a.add(TAddon("one")) a.add(TAddon("one"))
assert a.get("one") assert a.get("one")
assert not a.get("two") assert not a.get("two")
assert len(a) == 1 assert len(a) == 1
a.clear() a.clear()
assert len(a) == 0 assert len(a) == 0
assert not a.chain assert not a.chain
a.add(TAddon("one")) a.add(TAddon("one"))
a.trigger("done") a.trigger("done")
with pytest.raises(exceptions.AddonError):
a.trigger("tick") a.trigger("tick")
tctx.master.has_log("not callable")
a.remove(a.get("one")) a.remove(a.get("one"))
assert not a.get("one") assert not a.get("one")
ta = TAddon("one") ta = TAddon("one")
a.add(ta) a.add(ta)
a.trigger("custom") a.trigger("custom")
assert ta.custom_called assert ta.custom_called
def test_load_option(): def test_load_option():
@ -114,29 +106,47 @@ def test_load_option():
assert "custom_option" in m.options._options assert "custom_option" in m.options._options
def test_loadchain(): def test_nesting():
o = options.Options() o = options.Options()
m = master.Master(o, proxy.DummyServer(o)) m = master.Master(o, proxy.DummyServer(o))
a = addonmanager.AddonManager(m) a = addonmanager.AddonManager(m)
a.add(AChain("one", None)) a.add(
TAddon(
"one",
addons=[
TAddon("two"),
TAddon("three", addons=[TAddon("four")])
]
)
)
assert len(a.chain) == 1
assert a.get("one") assert a.get("one")
a.clear()
a.add(AChain("one", AChain("two", None)))
assert not a.get("one")
assert a.get("two") assert a.get("two")
a.clear()
a.add(AChain("one", AChain("two", AChain("three", None))))
assert not a.get("one")
assert not a.get("two")
assert a.get("three") assert a.get("three")
a.clear()
a.add(AChain("one", AChain("two", AChain("three", AChain("four", None)))))
assert not a.get("one")
assert not a.get("two")
assert not a.get("three")
assert a.get("four") assert a.get("four")
a.clear()
a.trigger("custom")
assert a.get("one").custom_called
assert a.get("two").custom_called
assert a.get("three").custom_called
assert a.get("four").custom_called
a.remove(a.get("three"))
assert not a.get("three")
assert not a.get("four")
class D:
def __init__(self):
self.w = None
def log(self, x):
self.w = x
def test_streamlog():
dummy = D()
s = addonmanager.StreamLog(dummy.log)
s.write("foo")
assert dummy.w == "foo"

View File

@ -1,4 +1,6 @@
import io
from mitmproxy.test import taddons from mitmproxy.test import taddons
from mitmproxy.test import tutils
from mitmproxy import ctx from mitmproxy import ctx
@ -9,3 +11,21 @@ def test_recordingmaster():
ctx.log.error("foo") ctx.log.error("foo")
assert not tctx.master.has_log("foo", level="debug") assert not tctx.master.has_log("foo", level="debug")
assert tctx.master.has_log("foo", level="error") assert tctx.master.has_log("foo", level="error")
def test_dumplog():
with taddons.context() as tctx:
ctx.log.info("testing")
s = io.StringIO()
tctx.master.dump_log(s)
assert s.getvalue()
def test_load_script():
with taddons.context() as tctx:
s = tctx.script(
tutils.test_data.path(
"mitmproxy/data/addonscripts/recorder/recorder.py"
)
)
assert s

View File

@ -30,7 +30,7 @@ class TestMaster(tservers.MasterTest):
opts["verbosity"] = 1 opts["verbosity"] = 1
o = options.Options(**opts) o = options.Options(**opts)
m = console.master.ConsoleMaster(o, proxy.DummyServer()) m = console.master.ConsoleMaster(o, proxy.DummyServer())
m.addons.configure_all(o, o.keys()) m.addons.trigger("configure", o, o.keys())
return m return m
def test_basic(self): def test_basic(self):
@ -42,12 +42,6 @@ class TestMaster(tservers.MasterTest):
pass pass
assert len(m.view) == i assert len(m.view) == i
def test_run_script_once(self):
m = self.mkmaster()
f = tflow.tflow(resp=True)
m.run_script_once("nonexistent", [f])
assert any("Input error" in str(l) for l in m.logbuffer)
def test_intercept(self): def test_intercept(self):
"""regression test for https://github.com/mitmproxy/mitmproxy/issues/1605""" """regression test for https://github.com/mitmproxy/mitmproxy/issues/1605"""
m = self.mkmaster(intercept="~b bar") m = self.mkmaster(intercept="~b bar")

View File

@ -74,7 +74,7 @@ class TestMaster(taddons.RecordingMaster):
self.state = TestState() self.state = TestState()
self.addons.add(self.state) self.addons.add(self.state)
self.addons.add(*addons) self.addons.add(*addons)
self.addons.configure_all(self.options, self.options.keys()) self.addons.trigger("configure", self.options, self.options.keys())
self.addons.trigger("running") self.addons.trigger("running")
def reset(self, addons): def reset(self, addons):