mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-22 15:37:45 +00:00
remove script contexts
This commit is contained in:
parent
c048ae1d5b
commit
7c67faa8da
@ -15,9 +15,7 @@ client:
|
||||
:caption: examples/add_header.py
|
||||
:language: python
|
||||
|
||||
The first argument to each event method is an instance of
|
||||
:py:class:`~mitmproxy.script.ScriptContext` that lets the script interact with the global mitmproxy
|
||||
state. The **response** event also gets an instance of :py:class:`~mitmproxy.models.HTTPFlow`,
|
||||
All events that deal with an HTTP request get an instance of :py:class:`~mitmproxy.models.HTTPFlow`,
|
||||
which we can use to manipulate the response itself.
|
||||
|
||||
We can now run this script using mitmdump or mitmproxy as follows:
|
||||
@ -36,11 +34,6 @@ We encourage you to either browse them locally or on `GitHub`_.
|
||||
Events
|
||||
------
|
||||
|
||||
The ``context`` argument passed to each event method is always a
|
||||
:py:class:`~mitmproxy.script.ScriptContext` instance. It is guaranteed to be the same object
|
||||
for the scripts lifetime and is not shared between multiple inline scripts. You can safely use it
|
||||
to store any form of state you require.
|
||||
|
||||
Script Lifecycle Events
|
||||
^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
@ -155,8 +148,9 @@ The canonical API documentation is the code, which you can browse here, locally
|
||||
|
||||
The main classes you will deal with in writing mitmproxy scripts are:
|
||||
|
||||
:py:class:`~mitmproxy.script.ScriptContext`
|
||||
- A handle for interacting with mitmproxy's Flow Master from within scripts.
|
||||
:py:class:`mitmproxy.flow.FlowMaster`
|
||||
- The "heart" of mitmproxy, usually subclassed as :py:class:`mitmproxy.dump.DumpMaster` or
|
||||
:py:class:`mitmproxy.console.ConsoleMaster`.
|
||||
:py:class:`~mitmproxy.models.ClientConnection`
|
||||
- Describes a client connection.
|
||||
:py:class:`~mitmproxy.models.ServerConnection`
|
||||
@ -173,16 +167,7 @@ The main classes you will deal with in writing mitmproxy scripts are:
|
||||
- A dictionary-like object for managing HTTP headers.
|
||||
:py:class:`netlib.certutils.SSLCert`
|
||||
- Exposes information SSL certificates.
|
||||
:py:class:`mitmproxy.flow.FlowMaster`
|
||||
- The "heart" of mitmproxy, usually subclassed as :py:class:`mitmproxy.dump.DumpMaster` or
|
||||
:py:class:`mitmproxy.console.ConsoleMaster`.
|
||||
|
||||
Script Context
|
||||
--------------
|
||||
|
||||
.. autoclass:: mitmproxy.script.ScriptContext
|
||||
:members:
|
||||
:undoc-members:
|
||||
|
||||
Running scripts in parallel
|
||||
---------------------------
|
||||
|
@ -2,6 +2,7 @@
|
||||
This inline script utilizes harparser.HAR from
|
||||
https://github.com/JustusW/harparser to generate a HAR log object.
|
||||
"""
|
||||
import mitmproxy
|
||||
import six
|
||||
import sys
|
||||
import pytz
|
||||
@ -218,17 +219,17 @@ def done():
|
||||
compressed_json_dump = context.HARLog.compress()
|
||||
|
||||
if context.dump_file == '-':
|
||||
context.log(pprint.pformat(json.loads(json_dump)))
|
||||
mitmproxy.log(pprint.pformat(json.loads(json_dump)))
|
||||
elif context.dump_file.endswith('.zhar'):
|
||||
file(context.dump_file, "w").write(compressed_json_dump)
|
||||
else:
|
||||
file(context.dump_file, "w").write(json_dump)
|
||||
context.log(
|
||||
mitmproxy.log(
|
||||
"HAR log finished with %s bytes (%s bytes compressed)" % (
|
||||
len(json_dump), len(compressed_json_dump)
|
||||
)
|
||||
)
|
||||
context.log(
|
||||
mitmproxy.log(
|
||||
"Compression rate is %s%%" % str(
|
||||
100. * len(compressed_json_dump) / len(json_dump)
|
||||
)
|
||||
|
@ -0,0 +1,5 @@
|
||||
from typing import Callable # noqa
|
||||
from mitmproxy import flow # noqa
|
||||
|
||||
master = None # type: flow.FlowMaster
|
||||
log = None # type: Callable[[str], None]
|
@ -366,7 +366,7 @@ class ConsoleMaster(flow.FlowMaster):
|
||||
signals.add_event("Running script on flow: %s" % command, "debug")
|
||||
|
||||
try:
|
||||
s = script.Script(command, script.ScriptContext(self))
|
||||
s = script.Script(command)
|
||||
s.load()
|
||||
except script.ScriptException as e:
|
||||
signals.status_message.send(
|
||||
@ -812,6 +812,6 @@ class ConsoleMaster(flow.FlowMaster):
|
||||
@controller.handler
|
||||
def script_change(self, script):
|
||||
if super(ConsoleMaster, self).script_change(script):
|
||||
signals.status_message.send(message='"{}" reloaded.'.format(script.filename))
|
||||
signals.status_message.send(message='"{}" reloaded.'.format(script.path))
|
||||
else:
|
||||
signals.status_message.send(message='Error reloading "{}".'.format(script.filename))
|
||||
signals.status_message.send(message='Error reloading "{}".'.format(script.path))
|
||||
|
@ -2,11 +2,12 @@ from __future__ import absolute_import, print_function, division
|
||||
|
||||
import functools
|
||||
import threading
|
||||
import contextlib
|
||||
|
||||
from six.moves import queue
|
||||
|
||||
import mitmproxy
|
||||
from netlib import basethread
|
||||
|
||||
from . import exceptions
|
||||
|
||||
|
||||
@ -34,6 +35,16 @@ Events = frozenset([
|
||||
])
|
||||
|
||||
|
||||
class Log(object):
|
||||
def __init__(self, master):
|
||||
self.master = master
|
||||
|
||||
def __call__(self, text, level="info"):
|
||||
self.master.add_event(text, level)
|
||||
|
||||
# We may want to add .log(), .warn() etc. here at a later point in time
|
||||
|
||||
|
||||
class Master(object):
|
||||
"""
|
||||
The master handles mitmproxy's main event loop.
|
||||
@ -45,6 +56,20 @@ class Master(object):
|
||||
for i in servers:
|
||||
self.add_server(i)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def handlecontext(self):
|
||||
# Handlecontexts also have to nest - leave cleanup to the outermost
|
||||
if mitmproxy.master:
|
||||
yield
|
||||
return
|
||||
mitmproxy.master = self
|
||||
mitmproxy.log = Log(self)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
mitmproxy.master = None
|
||||
mitmproxy.log = None
|
||||
|
||||
def add_server(self, server):
|
||||
# We give a Channel to the server which can be used to communicate with the master
|
||||
channel = Channel(self.event_queue, self.should_exit)
|
||||
@ -77,8 +102,8 @@ class Master(object):
|
||||
if mtype not in Events:
|
||||
raise exceptions.ControlException("Unknown event %s" % repr(mtype))
|
||||
handle_func = getattr(self, mtype)
|
||||
if not hasattr(handle_func, "__dict__"):
|
||||
raise exceptions.ControlException("Handler %s not a function" % mtype)
|
||||
if not callable(handle_func):
|
||||
raise exceptions.ControlException("Handler %s not callable" % mtype)
|
||||
if not handle_func.__dict__.get("__handler"):
|
||||
raise exceptions.ControlException(
|
||||
"Handler function %s is not decorated with controller.handler" % (
|
||||
@ -151,15 +176,7 @@ class Channel(object):
|
||||
|
||||
def handler(f):
|
||||
@functools.wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
# We can either be called as a method, or as a wrapped solo function
|
||||
if len(args) == 1:
|
||||
message = args[0]
|
||||
elif len(args) == 2:
|
||||
message = args[1]
|
||||
else:
|
||||
raise exceptions.ControlException("Handler takes one argument: a message")
|
||||
|
||||
def wrapper(master, message):
|
||||
if not hasattr(message, "reply"):
|
||||
raise exceptions.ControlException("Message %s has no reply attribute" % message)
|
||||
|
||||
@ -172,7 +189,8 @@ def handler(f):
|
||||
handling = True
|
||||
message.reply.handled = True
|
||||
|
||||
ret = f(*args, **kwargs)
|
||||
with master.handlecontext():
|
||||
ret = f(master, message)
|
||||
|
||||
if handling and not message.reply.acked and not message.reply.taken:
|
||||
message.reply.ack()
|
||||
@ -216,7 +234,7 @@ class Reply(object):
|
||||
def __del__(self):
|
||||
if not self.acked:
|
||||
# This will be ignored by the interpreter, but emit a warning
|
||||
raise exceptions.ControlException("Un-acked message")
|
||||
raise exceptions.ControlException("Un-acked message: %s" % self.obj)
|
||||
|
||||
|
||||
class DummyReply(object):
|
||||
|
@ -89,9 +89,10 @@ class FlowMaster(controller.Master):
|
||||
Raises:
|
||||
ScriptException
|
||||
"""
|
||||
s = script.Script(command, script.ScriptContext(self))
|
||||
s = script.Script(command)
|
||||
s.load()
|
||||
if use_reloader:
|
||||
s.reply = controller.DummyReply()
|
||||
script.reloader.watch(s, lambda: self.event_queue.put(("script_change", s)))
|
||||
self.scripts.append(s)
|
||||
|
||||
@ -234,8 +235,12 @@ class FlowMaster(controller.Master):
|
||||
return super(FlowMaster, self).tick(timeout)
|
||||
|
||||
def duplicate_flow(self, f):
|
||||
"""
|
||||
Duplicate flow, and insert it into state without triggering any of
|
||||
the normal flow events.
|
||||
"""
|
||||
f2 = f.copy()
|
||||
self.load_flow(f2)
|
||||
self.state.add_flow(f2)
|
||||
return f2
|
||||
|
||||
def create_request(self, method, scheme, host, port, path):
|
||||
@ -479,14 +484,14 @@ class FlowMaster(controller.Master):
|
||||
s.unload()
|
||||
except script.ScriptException as e:
|
||||
ok = False
|
||||
self.add_event('Error reloading "{}":\n{}'.format(s.filename, e), 'error')
|
||||
self.add_event('Error reloading "{}":\n{}'.format(s.path, e), 'error')
|
||||
try:
|
||||
s.load()
|
||||
except script.ScriptException as e:
|
||||
ok = False
|
||||
self.add_event('Error reloading "{}":\n{}'.format(s.filename, e), 'error')
|
||||
self.add_event('Error reloading "{}":\n{}'.format(s.path, e), 'error')
|
||||
else:
|
||||
self.add_event('"{}" reloaded.'.format(s.filename), 'info')
|
||||
self.add_event('"{}" reloaded.'.format(s.path), 'info')
|
||||
return ok
|
||||
|
||||
@controller.handler
|
||||
|
@ -1,12 +1,10 @@
|
||||
from . import reloader
|
||||
from .concurrent import concurrent
|
||||
from .script import Script
|
||||
from .script_context import ScriptContext
|
||||
from ..exceptions import ScriptException
|
||||
|
||||
__all__ = [
|
||||
"Script",
|
||||
"ScriptContext",
|
||||
"concurrent",
|
||||
"ScriptException",
|
||||
"reloader"
|
||||
|
@ -18,9 +18,9 @@ def concurrent(fn):
|
||||
"Concurrent decorator not supported for '%s' method." % fn.__name__
|
||||
)
|
||||
|
||||
def _concurrent(ctx, obj):
|
||||
def _concurrent(obj):
|
||||
def run():
|
||||
fn(ctx, obj)
|
||||
fn(obj)
|
||||
if not obj.reply.acked:
|
||||
obj.reply.ack()
|
||||
obj.reply.take()
|
||||
|
@ -15,8 +15,8 @@ _observers = {}
|
||||
def watch(script, callback):
|
||||
if script in _observers:
|
||||
raise RuntimeError("Script already observed")
|
||||
script_dir = os.path.dirname(os.path.abspath(script.filename))
|
||||
script_name = os.path.basename(script.filename)
|
||||
script_dir = os.path.dirname(os.path.abspath(script.path))
|
||||
script_name = os.path.basename(script.path)
|
||||
event_handler = _ScriptModificationHandler(callback, filename=script_name)
|
||||
observer = Observer()
|
||||
observer.schedule(event_handler, script_dir)
|
||||
|
@ -6,38 +6,40 @@ by the mitmproxy-specific ScriptContext.
|
||||
# Do not import __future__ here, this would apply transitively to the inline scripts.
|
||||
from __future__ import absolute_import, print_function, division
|
||||
|
||||
import inspect
|
||||
import os
|
||||
import shlex
|
||||
import sys
|
||||
import contextlib
|
||||
import warnings
|
||||
|
||||
import six
|
||||
from typing import List # noqa
|
||||
|
||||
from mitmproxy import exceptions
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def setargs(args):
|
||||
def scriptenv(path, args):
|
||||
# type: (str, List[str]) -> None
|
||||
oldargs = sys.argv
|
||||
sys.argv = args
|
||||
script_dir = os.path.dirname(os.path.abspath(path))
|
||||
|
||||
sys.argv = [path] + args
|
||||
sys.path.append(script_dir)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
sys.argv = oldargs
|
||||
sys.path.pop()
|
||||
|
||||
|
||||
class Script(object):
|
||||
|
||||
"""
|
||||
Script object representing an inline script.
|
||||
"""
|
||||
|
||||
def __init__(self, command, context):
|
||||
def __init__(self, command):
|
||||
self.command = command
|
||||
self.args = self.parse_command(command)
|
||||
self.ctx = context
|
||||
self.path, self.args = self.parse_command(command)
|
||||
self.ns = None
|
||||
|
||||
def __enter__(self):
|
||||
@ -46,15 +48,15 @@ class Script(object):
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
if exc_val:
|
||||
return False # reraise the exception
|
||||
return False # re-raise the exception
|
||||
self.unload()
|
||||
|
||||
@property
|
||||
def filename(self):
|
||||
return self.args[0]
|
||||
|
||||
@staticmethod
|
||||
def parse_command(command):
|
||||
# type: (str) -> Tuple[str,List[str]]
|
||||
"""
|
||||
Returns a (path, args) tuple.
|
||||
"""
|
||||
if not command or not command.strip():
|
||||
raise exceptions.ScriptException("Empty script command.")
|
||||
# Windows: escape all backslashes in the path.
|
||||
@ -71,7 +73,7 @@ class Script(object):
|
||||
args[0])
|
||||
elif os.path.isdir(args[0]):
|
||||
raise exceptions.ScriptException("Not a file: %s" % args[0])
|
||||
return args
|
||||
return args[0], args[1:]
|
||||
|
||||
def load(self):
|
||||
"""
|
||||
@ -85,13 +87,12 @@ class Script(object):
|
||||
"""
|
||||
if self.ns is not None:
|
||||
raise exceptions.ScriptException("Script is already loaded")
|
||||
script_dir = os.path.dirname(os.path.abspath(self.args[0]))
|
||||
self.ns = {'__file__': os.path.abspath(self.args[0])}
|
||||
sys.path.append(script_dir)
|
||||
sys.path.append(os.path.join(script_dir, ".."))
|
||||
self.ns = {'__file__': os.path.abspath(self.path)}
|
||||
|
||||
with scriptenv(self.path, self.args):
|
||||
try:
|
||||
with open(self.filename) as f:
|
||||
code = compile(f.read(), self.filename, 'exec')
|
||||
with open(self.path) as f:
|
||||
code = compile(f.read(), self.path, 'exec')
|
||||
exec(code, self.ns, self.ns)
|
||||
except Exception:
|
||||
six.reraise(
|
||||
@ -99,17 +100,6 @@ class Script(object):
|
||||
exceptions.ScriptException.from_exception_context(),
|
||||
sys.exc_info()[2]
|
||||
)
|
||||
finally:
|
||||
sys.path.pop()
|
||||
sys.path.pop()
|
||||
|
||||
start_fn = self.ns.get("start")
|
||||
if start_fn and len(inspect.getargspec(start_fn).args) == 2:
|
||||
warnings.warn(
|
||||
"The 'args' argument of the start() script hook is deprecated. "
|
||||
"Please use sys.argv instead."
|
||||
)
|
||||
return self.run("start", self.args)
|
||||
return self.run("start")
|
||||
|
||||
def unload(self):
|
||||
@ -134,8 +124,8 @@ class Script(object):
|
||||
f = self.ns.get(name)
|
||||
if f:
|
||||
try:
|
||||
with setargs(self.args):
|
||||
return f(self.ctx, *args, **kwargs)
|
||||
with scriptenv(self.path, self.args):
|
||||
return f(*args, **kwargs)
|
||||
except Exception:
|
||||
six.reraise(
|
||||
exceptions.ScriptException,
|
||||
|
@ -1,61 +0,0 @@
|
||||
"""
|
||||
The mitmproxy script context provides an API to inline scripts.
|
||||
"""
|
||||
from __future__ import absolute_import, print_function, division
|
||||
|
||||
from mitmproxy import contentviews
|
||||
|
||||
|
||||
class ScriptContext(object):
|
||||
|
||||
"""
|
||||
The script context should be used to interact with the global mitmproxy state from within a
|
||||
script.
|
||||
"""
|
||||
|
||||
def __init__(self, master):
|
||||
self._master = master
|
||||
|
||||
def log(self, message, level="info"):
|
||||
"""
|
||||
Logs an event.
|
||||
|
||||
By default, only events with level "error" get displayed. This can be controlled with the "-v" switch.
|
||||
How log messages are handled depends on the front-end. mitmdump will print them to stdout,
|
||||
mitmproxy sends output to the eventlog for display ("e" keyboard shortcut).
|
||||
"""
|
||||
self._master.add_event(message, level)
|
||||
|
||||
def kill_flow(self, f):
|
||||
"""
|
||||
Kills a flow immediately. No further data will be sent to the client or the server.
|
||||
"""
|
||||
f.kill(self._master)
|
||||
|
||||
def duplicate_flow(self, f):
|
||||
"""
|
||||
Returns a duplicate of the specified flow. The flow is also
|
||||
injected into the current state, and is ready for editing, replay,
|
||||
etc.
|
||||
"""
|
||||
self._master.pause_scripts = True
|
||||
f = self._master.duplicate_flow(f)
|
||||
self._master.pause_scripts = False
|
||||
return f
|
||||
|
||||
def replay_request(self, f):
|
||||
"""
|
||||
Replay the request on the current flow. The response will be added
|
||||
to the flow object.
|
||||
"""
|
||||
return self._master.replay_request(f, block=True, run_scripthooks=False)
|
||||
|
||||
@property
|
||||
def app_registry(self):
|
||||
return self._master.apps
|
||||
|
||||
def add_contentview(self, view_obj):
|
||||
contentviews.add(view_obj)
|
||||
|
||||
def remove_contentview(self, view_obj):
|
||||
contentviews.remove(view_obj)
|
@ -11,7 +11,7 @@ class Thing:
|
||||
|
||||
@tutils.skip_appveyor
|
||||
def test_concurrent():
|
||||
with Script(tutils.test_data.path("data/scripts/concurrent_decorator.py"), None) as s:
|
||||
with Script(tutils.test_data.path("data/scripts/concurrent_decorator.py")) as s:
|
||||
f1, f2 = Thing(), Thing()
|
||||
s.run("request", f1)
|
||||
s.run("request", f2)
|
||||
@ -23,6 +23,6 @@ def test_concurrent():
|
||||
|
||||
|
||||
def test_concurrent_err():
|
||||
s = Script(tutils.test_data.path("data/scripts/concurrent_decorator_err.py"), None)
|
||||
s = Script(tutils.test_data.path("data/scripts/concurrent_decorator_err.py"))
|
||||
with tutils.raises("Concurrent decorator not supported for 'start' method"):
|
||||
s.load()
|
||||
|
@ -10,7 +10,7 @@ def test_simple():
|
||||
pass
|
||||
|
||||
script = mock.Mock()
|
||||
script.filename = "foo.py"
|
||||
script.path = "foo.py"
|
||||
|
||||
e = Event()
|
||||
|
||||
|
@ -21,21 +21,21 @@ class TestParseCommand:
|
||||
|
||||
def test_parse_args(self):
|
||||
with tutils.chdir(tutils.test_data.dirname):
|
||||
assert Script.parse_command("data/scripts/a.py") == ["data/scripts/a.py"]
|
||||
assert Script.parse_command("data/scripts/a.py foo bar") == ["data/scripts/a.py", "foo", "bar"]
|
||||
assert Script.parse_command("data/scripts/a.py 'foo bar'") == ["data/scripts/a.py", "foo bar"]
|
||||
assert Script.parse_command("data/scripts/a.py") == ("data/scripts/a.py", [])
|
||||
assert Script.parse_command("data/scripts/a.py foo bar") == ("data/scripts/a.py", ["foo", "bar"])
|
||||
assert Script.parse_command("data/scripts/a.py 'foo bar'") == ("data/scripts/a.py", ["foo bar"])
|
||||
|
||||
@tutils.skip_not_windows
|
||||
def test_parse_windows(self):
|
||||
with tutils.chdir(tutils.test_data.dirname):
|
||||
assert Script.parse_command("data\\scripts\\a.py") == ["data\\scripts\\a.py"]
|
||||
assert Script.parse_command("data\\scripts\\a.py 'foo \\ bar'") == ["data\\scripts\\a.py", 'foo \\ bar']
|
||||
assert Script.parse_command("data\\scripts\\a.py") == ("data\\scripts\\a.py", [])
|
||||
assert Script.parse_command("data\\scripts\\a.py 'foo \\ bar'") == ("data\\scripts\\a.py", ['foo \\ bar'])
|
||||
|
||||
|
||||
def test_simple():
|
||||
with tutils.chdir(tutils.test_data.path("data/scripts")):
|
||||
s = Script("a.py --var 42", None)
|
||||
assert s.filename == "a.py"
|
||||
s = Script("a.py --var 42")
|
||||
assert s.path == "a.py"
|
||||
assert s.ns is None
|
||||
|
||||
s.load()
|
||||
@ -50,34 +50,34 @@ def test_simple():
|
||||
with tutils.raises(ScriptException):
|
||||
s.run("here")
|
||||
|
||||
with Script("a.py --var 42", None) as s:
|
||||
with Script("a.py --var 42") as s:
|
||||
s.run("here")
|
||||
|
||||
|
||||
def test_script_exception():
|
||||
with tutils.chdir(tutils.test_data.path("data/scripts")):
|
||||
s = Script("syntaxerr.py", None)
|
||||
s = Script("syntaxerr.py")
|
||||
with tutils.raises(ScriptException):
|
||||
s.load()
|
||||
|
||||
s = Script("starterr.py", None)
|
||||
s = Script("starterr.py")
|
||||
with tutils.raises(ScriptException):
|
||||
s.load()
|
||||
|
||||
s = Script("a.py", None)
|
||||
s = Script("a.py")
|
||||
s.load()
|
||||
with tutils.raises(ScriptException):
|
||||
s.load()
|
||||
|
||||
s = Script("a.py", None)
|
||||
s = Script("a.py")
|
||||
with tutils.raises(ScriptException):
|
||||
s.run("here")
|
||||
|
||||
with tutils.raises(ScriptException):
|
||||
with Script("reqerr.py", None) as s:
|
||||
with Script("reqerr.py") as s:
|
||||
s.run("request", None)
|
||||
|
||||
s = Script("unloaderr.py", None)
|
||||
s = Script("unloaderr.py")
|
||||
s.load()
|
||||
with tutils.raises(ScriptException):
|
||||
s.unload()
|
||||
|
@ -1,47 +1,31 @@
|
||||
import glob
|
||||
import json
|
||||
import mock
|
||||
import os
|
||||
import sys
|
||||
from contextlib import contextmanager
|
||||
|
||||
from mitmproxy import script
|
||||
from mitmproxy.proxy import config
|
||||
import netlib.utils
|
||||
from netlib import tutils as netutils
|
||||
from netlib.http import Headers
|
||||
from . import tservers, tutils
|
||||
from . import tutils
|
||||
|
||||
example_dir = netlib.utils.Data(__name__).path("../../examples")
|
||||
|
||||
|
||||
class DummyContext(object):
|
||||
"""Emulate script.ScriptContext() functionality."""
|
||||
|
||||
contentview = None
|
||||
|
||||
def log(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def add_contentview(self, view_obj):
|
||||
self.contentview = view_obj
|
||||
|
||||
def remove_contentview(self, view_obj):
|
||||
self.contentview = None
|
||||
|
||||
|
||||
@contextmanager
|
||||
def example(command):
|
||||
command = os.path.join(example_dir, command)
|
||||
ctx = DummyContext()
|
||||
with script.Script(command, ctx) as s:
|
||||
with script.Script(command) as s:
|
||||
yield s
|
||||
|
||||
|
||||
def test_load_scripts():
|
||||
@mock.patch("mitmproxy.master")
|
||||
@mock.patch("mitmproxy.log")
|
||||
def test_load_scripts(log, master):
|
||||
scripts = glob.glob("%s/*.py" % example_dir)
|
||||
|
||||
tmaster = tservers.TestMaster(config.ProxyConfig())
|
||||
|
||||
for f in scripts:
|
||||
if "har_extractor" in f:
|
||||
continue
|
||||
@ -54,7 +38,7 @@ def test_load_scripts():
|
||||
if "modify_response_body" in f:
|
||||
f += " foo bar" # two arguments required
|
||||
|
||||
s = script.Script(f, script.ScriptContext(tmaster))
|
||||
s = script.Script(f)
|
||||
try:
|
||||
s.load()
|
||||
except Exception as v:
|
||||
@ -71,17 +55,21 @@ def test_add_header():
|
||||
assert flow.response.headers["newheader"] == "foo"
|
||||
|
||||
|
||||
def test_custom_contentviews():
|
||||
with example("custom_contentviews.py") as ex:
|
||||
pig = ex.ctx.contentview
|
||||
@mock.patch("mitmproxy.contentviews.remove")
|
||||
@mock.patch("mitmproxy.contentviews.add")
|
||||
def test_custom_contentviews(add, remove):
|
||||
with example("custom_contentviews.py"):
|
||||
assert add.called
|
||||
pig = add.call_args[0][0]
|
||||
_, fmt = pig(b"<html>test!</html>")
|
||||
assert any(b'esttay!' in val[0][1] for val in fmt)
|
||||
assert not pig(b"gobbledygook")
|
||||
assert remove.called
|
||||
|
||||
|
||||
def test_iframe_injector():
|
||||
with tutils.raises(script.ScriptException):
|
||||
with example("iframe_injector.py") as ex:
|
||||
with example("iframe_injector.py"):
|
||||
pass
|
||||
|
||||
flow = tutils.tflow(resp=netutils.tresp(content=b"<html>mitmproxy</html>"))
|
||||
@ -121,7 +109,7 @@ def test_modify_response_body():
|
||||
|
||||
flow = tutils.tflow(resp=netutils.tresp(content=b"I <3 mitmproxy"))
|
||||
with example("modify_response_body.py mitmproxy rocks") as ex:
|
||||
assert ex.ctx.old == b"mitmproxy" and ex.ctx.new == b"rocks"
|
||||
assert ex.ns["state"]["old"] == b"mitmproxy" and ex.ns["state"]["new"] == b"rocks"
|
||||
ex.run("response", flow)
|
||||
assert flow.response.content == b"I <3 rocks"
|
||||
|
||||
@ -133,7 +121,8 @@ def test_redirect_requests():
|
||||
assert flow.request.host == "mitmproxy.org"
|
||||
|
||||
|
||||
def test_har_extractor():
|
||||
@mock.patch("mitmproxy.log")
|
||||
def test_har_extractor(log):
|
||||
if sys.version_info >= (3, 0):
|
||||
with tutils.raises("does not work on Python 3"):
|
||||
with example("har_extractor.py -"):
|
||||
@ -159,4 +148,4 @@ def test_har_extractor():
|
||||
|
||||
with open(tutils.test_data.path("data/har_extractor.har")) as fp:
|
||||
test_data = json.load(fp)
|
||||
assert json.loads(ex.ctx.HARLog.json()) == test_data["test_response"]
|
||||
assert json.loads(ex.ns["context"].HARLog.json()) == test_data["test_response"]
|
||||
|
@ -1,6 +1,7 @@
|
||||
import os
|
||||
import socket
|
||||
import time
|
||||
import types
|
||||
from OpenSSL import SSL
|
||||
from netlib.exceptions import HttpReadDisconnect, HttpException
|
||||
from netlib.tcp import Address
|
||||
@ -945,7 +946,7 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxyTest):
|
||||
f.reply.kill()
|
||||
return _func(f)
|
||||
|
||||
setattr(master, attr, handler)
|
||||
setattr(master, attr, types.MethodType(handler, master))
|
||||
|
||||
kill_requests(
|
||||
self.chain[1].tmaster,
|
||||
|
Loading…
Reference in New Issue
Block a user