mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-23 00:01:36 +00:00
First-order integration of scripts addon
This commit is contained in:
parent
1266255842
commit
a3a22fba33
@ -4,6 +4,7 @@ from mitmproxy.builtins import anticache
|
||||
from mitmproxy.builtins import anticomp
|
||||
from mitmproxy.builtins import stickyauth
|
||||
from mitmproxy.builtins import stickycookie
|
||||
from mitmproxy.builtins import script
|
||||
from mitmproxy.builtins import stream
|
||||
|
||||
|
||||
@ -13,5 +14,6 @@ def default_addons():
|
||||
anticomp.AntiComp(),
|
||||
stickyauth.StickyAuth(),
|
||||
stickycookie.StickyCookie(),
|
||||
script.ScriptLoader(),
|
||||
stream.Stream(),
|
||||
]
|
||||
|
156
mitmproxy/builtins/script.py
Normal file
156
mitmproxy/builtins/script.py
Normal file
@ -0,0 +1,156 @@
|
||||
from __future__ import absolute_import, print_function, division
|
||||
|
||||
import contextlib
|
||||
import os
|
||||
import shlex
|
||||
import sys
|
||||
import traceback
|
||||
import copy
|
||||
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import controller
|
||||
from mitmproxy import ctx
|
||||
|
||||
|
||||
import watchdog.events
|
||||
# The OSX reloader in watchdog 0.8.3 breaks when unobserving paths.
|
||||
# We use the PollingObserver instead.
|
||||
if sys.platform == 'darwin': # pragma: no cover
|
||||
from watchdog.observers.polling import PollingObserver as Observer
|
||||
else:
|
||||
from watchdog.observers import Observer
|
||||
|
||||
|
||||
def parse_command(command):
|
||||
"""
|
||||
Returns a (path, args) tuple.
|
||||
"""
|
||||
if not command or not command.strip():
|
||||
raise exceptions.AddonError("Empty script command.")
|
||||
# Windows: escape all backslashes in the path.
|
||||
if os.name == "nt": # pragma: no cover
|
||||
backslashes = shlex.split(command, posix=False)[0].count("\\")
|
||||
command = command.replace("\\", "\\\\", backslashes)
|
||||
args = shlex.split(command) # pragma: no cover
|
||||
args[0] = os.path.expanduser(args[0])
|
||||
if not os.path.exists(args[0]):
|
||||
raise exceptions.AddonError(
|
||||
("Script file not found: %s.\r\n"
|
||||
"If your script path contains spaces, "
|
||||
"make sure to wrap it in additional quotes, e.g. -s \"'./foo bar/baz.py' --args\".") %
|
||||
args[0])
|
||||
elif os.path.isdir(args[0]):
|
||||
raise exceptions.AddonError("Not a file: %s" % args[0])
|
||||
return args[0], args[1:]
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def scriptenv(path, args):
|
||||
oldargs = sys.argv
|
||||
sys.argv = [path] + args
|
||||
script_dir = os.path.dirname(os.path.abspath(path))
|
||||
sys.path.append(script_dir)
|
||||
try:
|
||||
yield
|
||||
except Exception:
|
||||
_, _, tb = sys.exc_info()
|
||||
scriptdir = os.path.dirname(os.path.abspath(path))
|
||||
for i, s in enumerate(reversed(traceback.extract_tb(tb))):
|
||||
if not os.path.abspath(s[0]).startswith(scriptdir):
|
||||
break
|
||||
else:
|
||||
tb = tb.tb_next
|
||||
ctx.log.warn("".join(traceback.format_tb(tb)))
|
||||
finally:
|
||||
sys.argv = oldargs
|
||||
sys.path.pop()
|
||||
|
||||
|
||||
def load_script(path, args):
|
||||
ns = {'__file__': os.path.abspath(path)}
|
||||
with scriptenv(path, args):
|
||||
with open(path, "rb") as f:
|
||||
code = compile(f.read(), path, 'exec')
|
||||
exec(code, ns, ns)
|
||||
return ns
|
||||
|
||||
|
||||
class ReloadHandler(watchdog.events.FileSystemEventHandler):
|
||||
def __init__(self, callback, master, options):
|
||||
self.callback = callback
|
||||
self.master, self.options = master, options
|
||||
|
||||
def on_modified(self, event):
|
||||
self.callback(self.master, self.options)
|
||||
|
||||
def on_created(self, event):
|
||||
self.callback(self.master, self.options)
|
||||
|
||||
|
||||
class Script:
|
||||
"""
|
||||
An addon that manages a single script.
|
||||
"""
|
||||
def __init__(self, command):
|
||||
self.name = command
|
||||
|
||||
self.command = command
|
||||
self.path, self.args = parse_command(command)
|
||||
self.ns = None
|
||||
self.observer = None
|
||||
|
||||
for i in controller.Events:
|
||||
def mkprox():
|
||||
evt = i
|
||||
|
||||
def prox(*args, **kwargs):
|
||||
self.run(evt, *args, **kwargs)
|
||||
return prox
|
||||
setattr(self, i, mkprox())
|
||||
|
||||
def run(self, name, *args, **kwargs):
|
||||
# It's possible for ns to be un-initialised if we failed during
|
||||
# configure
|
||||
if self.ns is not None:
|
||||
func = self.ns.get(name)
|
||||
if func:
|
||||
with scriptenv(self.path, self.args):
|
||||
func(*args, **kwargs)
|
||||
|
||||
def reload(self, master, options):
|
||||
with master.handlecontext():
|
||||
self.ns = None
|
||||
self.configure(options)
|
||||
|
||||
def configure(self, options):
|
||||
if not self.observer:
|
||||
self.observer = Observer()
|
||||
# Bind the handler to the real underlying master object
|
||||
self.observer.schedule(
|
||||
ReloadHandler(
|
||||
self.reload,
|
||||
ctx.master,
|
||||
copy.copy(options),
|
||||
),
|
||||
os.path.dirname(self.path) or "."
|
||||
)
|
||||
self.observer.start()
|
||||
if not self.ns:
|
||||
self.ns = load_script(self.path, self.args)
|
||||
self.run("configure", options)
|
||||
|
||||
|
||||
class ScriptLoader():
|
||||
"""
|
||||
An addon that manages loading scripts from options.
|
||||
"""
|
||||
def configure(self, options):
|
||||
for s in options.scripts or []:
|
||||
if not ctx.master.addons.has_addon(s):
|
||||
ctx.log.info("Loading script: %s" % s)
|
||||
sc = Script(s)
|
||||
ctx.master.addons.add(sc)
|
||||
for a in ctx.master.addons.chain:
|
||||
if isinstance(a, Script):
|
||||
if a.name not in options.scripts or []:
|
||||
ctx.master.addons.remove(a)
|
@ -44,7 +44,17 @@ class Log(object):
|
||||
def __call__(self, text, level="info"):
|
||||
self.master.add_event(text, level)
|
||||
|
||||
# We may want to add .log(), .warn() etc. here at a later point in time
|
||||
def debug(self, txt):
|
||||
self(txt, "debug")
|
||||
|
||||
def info(self, txt):
|
||||
self(txt, "info")
|
||||
|
||||
def warn(self, txt):
|
||||
self(txt, "warn")
|
||||
|
||||
def error(self, txt):
|
||||
self(txt, "error")
|
||||
|
||||
|
||||
class Master(object):
|
||||
|
@ -93,13 +93,6 @@ class DumpMaster(flow.FlowMaster):
|
||||
not options.keepserving
|
||||
)
|
||||
|
||||
scripts = options.scripts or []
|
||||
for command in scripts:
|
||||
try:
|
||||
self.load_script(command, use_reloader=True)
|
||||
except exceptions.ScriptException as e:
|
||||
raise DumpError(str(e))
|
||||
|
||||
if options.rfile:
|
||||
try:
|
||||
self.load_flows_file(options.rfile)
|
||||
@ -335,6 +328,5 @@ class DumpMaster(flow.FlowMaster):
|
||||
|
||||
def run(self): # pragma: no cover
|
||||
if self.options.rfile and not self.options.keepserving:
|
||||
self.unload_scripts() # make sure to trigger script unload events.
|
||||
return
|
||||
super(DumpMaster, self).run()
|
||||
|
@ -99,3 +99,7 @@ class ControlException(ProxyException):
|
||||
|
||||
class OptionsError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class AddonError(Exception):
|
||||
pass
|
||||
|
@ -9,7 +9,6 @@ import netlib.exceptions
|
||||
from mitmproxy import controller
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import models
|
||||
from mitmproxy import script
|
||||
from mitmproxy.flow import io
|
||||
from mitmproxy.flow import modules
|
||||
from mitmproxy.onboarding import app
|
||||
@ -35,8 +34,6 @@ class FlowMaster(controller.Master):
|
||||
self.server_playback = None # type: Optional[modules.ServerPlaybackState]
|
||||
self.client_playback = None # type: Optional[modules.ClientPlaybackState]
|
||||
self.kill_nonreplay = False
|
||||
self.scripts = [] # type: List[script.Script]
|
||||
self.pause_scripts = False
|
||||
|
||||
self.stream_large_bodies = None # type: Optional[modules.StreamLargeBodies]
|
||||
self.refresh_server_playback = False
|
||||
@ -60,44 +57,6 @@ class FlowMaster(controller.Master):
|
||||
level: debug, info, error
|
||||
"""
|
||||
|
||||
def unload_scripts(self):
|
||||
for s in self.scripts[:]:
|
||||
self.unload_script(s)
|
||||
|
||||
def unload_script(self, script_obj):
|
||||
try:
|
||||
script_obj.unload()
|
||||
except script.ScriptException as e:
|
||||
self.add_event("Script error:\n" + str(e), "error")
|
||||
script.reloader.unwatch(script_obj)
|
||||
self.scripts.remove(script_obj)
|
||||
|
||||
def load_script(self, command, use_reloader=False):
|
||||
"""
|
||||
Loads a script.
|
||||
|
||||
Raises:
|
||||
ScriptException
|
||||
"""
|
||||
s = script.Script(command)
|
||||
s.load()
|
||||
if use_reloader:
|
||||
s.reply = controller.DummyReply()
|
||||
script.reloader.watch(s, lambda: self.event_queue.put(("script_change", s)))
|
||||
self.scripts.append(s)
|
||||
|
||||
def _run_single_script_hook(self, script_obj, name, *args, **kwargs):
|
||||
if script_obj and not self.pause_scripts:
|
||||
try:
|
||||
script_obj.run(name, *args, **kwargs)
|
||||
except script.ScriptException as e:
|
||||
self.add_event("Script error:\n{}".format(e), "error")
|
||||
|
||||
def run_scripts(self, name, msg):
|
||||
for script_obj in self.scripts:
|
||||
if not msg.reply.acked:
|
||||
self._run_single_script_hook(script_obj, name, msg)
|
||||
|
||||
def get_ignore_filter(self):
|
||||
return self.server.config.check_ignore.patterns
|
||||
|
||||
@ -298,11 +257,11 @@ class FlowMaster(controller.Master):
|
||||
if not pb and self.kill_nonreplay:
|
||||
f.kill(self)
|
||||
|
||||
def replay_request(self, f, block=False, run_scripthooks=True):
|
||||
def replay_request(self, f, block=False):
|
||||
"""
|
||||
Returns None if successful, or error message if not.
|
||||
"""
|
||||
if f.live and run_scripthooks:
|
||||
if f.live:
|
||||
return "Can't replay live request."
|
||||
if f.intercepted:
|
||||
return "Can't replay while intercepting..."
|
||||
@ -319,7 +278,7 @@ class FlowMaster(controller.Master):
|
||||
rt = http_replay.RequestReplayThread(
|
||||
self.server.config,
|
||||
f,
|
||||
self.event_queue if run_scripthooks else False,
|
||||
self.event_queue,
|
||||
self.should_exit
|
||||
)
|
||||
rt.start() # pragma: no cover
|
||||
@ -332,28 +291,27 @@ class FlowMaster(controller.Master):
|
||||
|
||||
@controller.handler
|
||||
def clientconnect(self, root_layer):
|
||||
self.run_scripts("clientconnect", root_layer)
|
||||
pass
|
||||
|
||||
@controller.handler
|
||||
def clientdisconnect(self, root_layer):
|
||||
self.run_scripts("clientdisconnect", root_layer)
|
||||
pass
|
||||
|
||||
@controller.handler
|
||||
def serverconnect(self, server_conn):
|
||||
self.run_scripts("serverconnect", server_conn)
|
||||
pass
|
||||
|
||||
@controller.handler
|
||||
def serverdisconnect(self, server_conn):
|
||||
self.run_scripts("serverdisconnect", server_conn)
|
||||
pass
|
||||
|
||||
@controller.handler
|
||||
def next_layer(self, top_layer):
|
||||
self.run_scripts("next_layer", top_layer)
|
||||
pass
|
||||
|
||||
@controller.handler
|
||||
def error(self, f):
|
||||
self.state.update_flow(f)
|
||||
self.run_scripts("error", f)
|
||||
if self.client_playback:
|
||||
self.client_playback.clear(f)
|
||||
return f
|
||||
@ -381,8 +339,6 @@ class FlowMaster(controller.Master):
|
||||
self.setheaders.run(f)
|
||||
if not f.reply.acked:
|
||||
self.process_new_request(f)
|
||||
if not f.reply.acked:
|
||||
self.run_scripts("request", f)
|
||||
return f
|
||||
|
||||
@controller.handler
|
||||
@ -393,7 +349,6 @@ class FlowMaster(controller.Master):
|
||||
except netlib.exceptions.HttpException:
|
||||
f.reply.kill()
|
||||
return
|
||||
self.run_scripts("responseheaders", f)
|
||||
return f
|
||||
|
||||
@controller.handler
|
||||
@ -404,7 +359,6 @@ class FlowMaster(controller.Master):
|
||||
self.replacehooks.run(f)
|
||||
if not f.reply.acked:
|
||||
self.setheaders.run(f)
|
||||
self.run_scripts("response", f)
|
||||
if not f.reply.acked:
|
||||
if self.client_playback:
|
||||
self.client_playback.clear(f)
|
||||
@ -416,46 +370,15 @@ class FlowMaster(controller.Master):
|
||||
def handle_accept_intercept(self, f):
|
||||
self.state.update_flow(f)
|
||||
|
||||
@controller.handler
|
||||
def script_change(self, s):
|
||||
"""
|
||||
Handle a script whose contents have been changed on the file system.
|
||||
|
||||
Args:
|
||||
s (script.Script): the changed script
|
||||
|
||||
Returns:
|
||||
True, if reloading was successful.
|
||||
False, otherwise.
|
||||
"""
|
||||
ok = True
|
||||
# We deliberately do not want to fail here.
|
||||
# In the worst case, we have an "empty" script object.
|
||||
try:
|
||||
s.unload()
|
||||
except script.ScriptException as e:
|
||||
ok = False
|
||||
self.add_event('Error reloading "{}":\n{}'.format(s.path, e), 'error')
|
||||
try:
|
||||
s.load()
|
||||
except script.ScriptException as e:
|
||||
ok = False
|
||||
self.add_event('Error reloading "{}":\n{}'.format(s.path, e), 'error')
|
||||
else:
|
||||
self.add_event('"{}" reloaded.'.format(s.path), 'info')
|
||||
return ok
|
||||
|
||||
@controller.handler
|
||||
def tcp_open(self, flow):
|
||||
# TODO: This would break mitmproxy currently.
|
||||
# self.state.add_flow(flow)
|
||||
self.active_flows.add(flow)
|
||||
self.run_scripts("tcp_open", flow)
|
||||
|
||||
@controller.handler
|
||||
def tcp_message(self, flow):
|
||||
# type: (TCPFlow) -> None
|
||||
self.run_scripts("tcp_message", flow)
|
||||
pass
|
||||
|
||||
@controller.handler
|
||||
def tcp_error(self, flow):
|
||||
@ -463,13 +386,10 @@ class FlowMaster(controller.Master):
|
||||
repr(flow.server_conn.address),
|
||||
flow.error
|
||||
), "info")
|
||||
self.run_scripts("tcp_error", flow)
|
||||
|
||||
@controller.handler
|
||||
def tcp_close(self, flow):
|
||||
self.active_flows.discard(flow)
|
||||
self.run_scripts("tcp_close", flow)
|
||||
|
||||
def shutdown(self):
|
||||
super(FlowMaster, self).shutdown()
|
||||
self.unload_scripts()
|
||||
|
136
test/mitmproxy/builtins/test_script.py
Normal file
136
test/mitmproxy/builtins/test_script.py
Normal file
@ -0,0 +1,136 @@
|
||||
import time
|
||||
|
||||
from mitmproxy.builtins import script
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy.flow import master
|
||||
from mitmproxy.flow import state
|
||||
from mitmproxy import options
|
||||
|
||||
from .. import tutils, mastertest
|
||||
|
||||
|
||||
class TestParseCommand:
|
||||
def test_empty_command(self):
|
||||
with tutils.raises(exceptions.AddonError):
|
||||
script.parse_command("")
|
||||
|
||||
with tutils.raises(exceptions.AddonError):
|
||||
script.parse_command(" ")
|
||||
|
||||
def test_no_script_file(self):
|
||||
with tutils.raises("not found"):
|
||||
script.parse_command("notfound")
|
||||
|
||||
with tutils.tmpdir() as dir:
|
||||
with tutils.raises("not a file"):
|
||||
script.parse_command(dir)
|
||||
|
||||
def test_parse_args(self):
|
||||
with tutils.chdir(tutils.test_data.dirname):
|
||||
assert script.parse_command("data/scripts/a.py") == ("data/scripts/a.py", [])
|
||||
assert script.parse_command("data/scripts/a.py foo bar") == ("data/scripts/a.py", ["foo", "bar"])
|
||||
assert script.parse_command("data/scripts/a.py 'foo bar'") == ("data/scripts/a.py", ["foo bar"])
|
||||
|
||||
@tutils.skip_not_windows
|
||||
def test_parse_windows(self):
|
||||
with tutils.chdir(tutils.test_data.dirname):
|
||||
assert script.parse_command("data\\scripts\\a.py") == ("data\\scripts\\a.py", [])
|
||||
assert script.parse_command("data\\scripts\\a.py 'foo \\ bar'") == ("data\\scripts\\a.py", 'foo \\ bar', [])
|
||||
|
||||
|
||||
def test_load_script():
|
||||
ns = script.load_script(
|
||||
tutils.test_data.path(
|
||||
"data/addonscripts/recorder.py"
|
||||
), []
|
||||
)
|
||||
assert ns["configure"]
|
||||
|
||||
|
||||
class RecordingMaster(master.FlowMaster):
|
||||
def __init__(self, *args, **kwargs):
|
||||
master.FlowMaster.__init__(self, *args, **kwargs)
|
||||
self.event_log = []
|
||||
|
||||
def add_event(self, e, level):
|
||||
self.event_log.append((level, e))
|
||||
|
||||
|
||||
class TestScript(mastertest.MasterTest):
|
||||
def test_simple(self):
|
||||
s = state.State()
|
||||
m = master.FlowMaster(options.Options(), None, s)
|
||||
sc = script.Script(
|
||||
tutils.test_data.path(
|
||||
"data/addonscripts/recorder.py"
|
||||
)
|
||||
)
|
||||
m.addons.add(sc)
|
||||
assert sc.ns["call_log"] == [("configure", (options.Options(),), {})]
|
||||
|
||||
sc.ns["call_log"] = []
|
||||
f = tutils.tflow(resp=True)
|
||||
self.invoke(m, "request", f)
|
||||
|
||||
recf = sc.ns["call_log"][0]
|
||||
assert recf[0] == "request"
|
||||
|
||||
def test_reload(self):
|
||||
s = state.State()
|
||||
m = RecordingMaster(options.Options(), None, s)
|
||||
with tutils.tmpdir():
|
||||
with open("foo.py", "w"):
|
||||
pass
|
||||
sc = script.Script("foo.py")
|
||||
m.addons.add(sc)
|
||||
|
||||
for _ in range(100):
|
||||
with open("foo.py", "a") as f:
|
||||
f.write(".")
|
||||
time.sleep(0.1)
|
||||
if m.event_log:
|
||||
return
|
||||
raise AssertionError("Change event not detected.")
|
||||
|
||||
def test_exception(self):
|
||||
s = state.State()
|
||||
m = RecordingMaster(options.Options(), None, s)
|
||||
sc = script.Script(
|
||||
tutils.test_data.path("data/addonscripts/error.py")
|
||||
)
|
||||
m.addons.add(sc)
|
||||
f = tutils.tflow(resp=True)
|
||||
self.invoke(m, "request", f)
|
||||
assert m.event_log[0][0] == "warn"
|
||||
|
||||
def test_duplicate_flow(self):
|
||||
s = state.State()
|
||||
fm = master.FlowMaster(None, None, s)
|
||||
fm.addons.add(
|
||||
script.Script(
|
||||
tutils.test_data.path("data/addonscripts/duplicate_flow.py")
|
||||
)
|
||||
)
|
||||
f = tutils.tflow()
|
||||
fm.request(f)
|
||||
assert fm.state.flow_count() == 2
|
||||
assert not fm.state.view[0].request.is_replay
|
||||
assert fm.state.view[1].request.is_replay
|
||||
|
||||
|
||||
class TestScriptLoader(mastertest.MasterTest):
|
||||
def test_simple(self):
|
||||
s = state.State()
|
||||
o = options.Options(scripts=[])
|
||||
m = master.FlowMaster(o, None, s)
|
||||
sc = script.ScriptLoader()
|
||||
m.addons.add(sc)
|
||||
assert len(m.addons) == 1
|
||||
o.update(
|
||||
scripts = [
|
||||
tutils.test_data.path("data/addonscripts/recorder.py")
|
||||
]
|
||||
)
|
||||
assert len(m.addons) == 2
|
||||
o.update(scripts = [])
|
||||
assert len(m.addons) == 1
|
6
test/mitmproxy/data/addonscripts/duplicate_flow.py
Normal file
6
test/mitmproxy/data/addonscripts/duplicate_flow.py
Normal file
@ -0,0 +1,6 @@
|
||||
from mitmproxy import ctx
|
||||
|
||||
|
||||
def request(flow):
|
||||
f = ctx.master.duplicate_flow(flow)
|
||||
ctx.master.replay_request(f, block=True)
|
7
test/mitmproxy/data/addonscripts/error.py
Normal file
7
test/mitmproxy/data/addonscripts/error.py
Normal file
@ -0,0 +1,7 @@
|
||||
|
||||
def mkerr():
|
||||
raise ValueError("Error!")
|
||||
|
||||
|
||||
def request(flow):
|
||||
mkerr()
|
18
test/mitmproxy/data/addonscripts/recorder.py
Normal file
18
test/mitmproxy/data/addonscripts/recorder.py
Normal file
@ -0,0 +1,18 @@
|
||||
from mitmproxy import controller
|
||||
from mitmproxy import ctx
|
||||
|
||||
call_log = []
|
||||
|
||||
# Keep a log of all possible event calls
|
||||
evts = list(controller.Events) + ["configure"]
|
||||
for i in evts:
|
||||
def mkprox():
|
||||
evt = i
|
||||
|
||||
def prox(*args, **kwargs):
|
||||
lg = (evt, args, kwargs)
|
||||
if evt != "log":
|
||||
ctx.log.info(str(lg))
|
||||
call_log.append(lg)
|
||||
return prox
|
||||
globals()[i] = mkprox()
|
8
test/mitmproxy/data/addonscripts/stream_modify.py
Normal file
8
test/mitmproxy/data/addonscripts/stream_modify.py
Normal file
@ -0,0 +1,8 @@
|
||||
|
||||
def modify(chunks):
|
||||
for chunk in chunks:
|
||||
yield chunk.replace(b"foo", b"bar")
|
||||
|
||||
|
||||
def responseheaders(flow):
|
||||
flow.response.stream = modify
|
5
test/mitmproxy/data/addonscripts/tcp_stream_modify.py
Normal file
5
test/mitmproxy/data/addonscripts/tcp_stream_modify.py
Normal file
@ -0,0 +1,5 @@
|
||||
|
||||
def tcp_message(flow):
|
||||
message = flow.messages[-1]
|
||||
if not message.from_client:
|
||||
message.content = message.content.replace(b"foo", b"bar")
|
@ -245,12 +245,12 @@ class TestDumpMaster(mastertest.MasterTest):
|
||||
assert "XRESPONSE" in ret
|
||||
assert "XCLIENTDISCONNECT" in ret
|
||||
tutils.raises(
|
||||
dump.DumpError,
|
||||
exceptions.AddonError,
|
||||
self.mkmaster,
|
||||
None, scripts=["nonexistent"]
|
||||
)
|
||||
tutils.raises(
|
||||
dump.DumpError,
|
||||
exceptions.AddonError,
|
||||
self.mkmaster,
|
||||
None, scripts=["starterr.py"]
|
||||
)
|
||||
|
@ -5,7 +5,7 @@ import netlib.utils
|
||||
from netlib.http import Headers
|
||||
from mitmproxy import filt, controller, flow
|
||||
from mitmproxy.contrib import tnetstring
|
||||
from mitmproxy.exceptions import FlowReadException, ScriptException
|
||||
from mitmproxy.exceptions import FlowReadException
|
||||
from mitmproxy.models import Error
|
||||
from mitmproxy.models import Flow
|
||||
from mitmproxy.models import HTTPFlow
|
||||
@ -674,21 +674,6 @@ class TestSerialize:
|
||||
|
||||
class TestFlowMaster:
|
||||
|
||||
def test_load_script(self):
|
||||
s = flow.State()
|
||||
fm = flow.FlowMaster(None, None, s)
|
||||
|
||||
fm.load_script(tutils.test_data.path("data/scripts/a.py"))
|
||||
fm.load_script(tutils.test_data.path("data/scripts/a.py"))
|
||||
fm.unload_scripts()
|
||||
with tutils.raises(ScriptException):
|
||||
fm.load_script("nonexistent")
|
||||
try:
|
||||
fm.load_script(tutils.test_data.path("data/scripts/starterr.py"))
|
||||
except ScriptException as e:
|
||||
assert "ValueError" in str(e)
|
||||
assert len(fm.scripts) == 0
|
||||
|
||||
def test_getset_ignore(self):
|
||||
p = mock.Mock()
|
||||
p.config.check_ignore = HostMatcher()
|
||||
@ -708,51 +693,7 @@ class TestFlowMaster:
|
||||
assert "intercepting" in fm.replay_request(f)
|
||||
|
||||
f.live = True
|
||||
assert "live" in fm.replay_request(f, run_scripthooks=True)
|
||||
|
||||
def test_script_reqerr(self):
|
||||
s = flow.State()
|
||||
fm = flow.FlowMaster(None, None, s)
|
||||
fm.load_script(tutils.test_data.path("data/scripts/reqerr.py"))
|
||||
f = tutils.tflow()
|
||||
fm.clientconnect(f.client_conn)
|
||||
assert fm.request(f)
|
||||
|
||||
def test_script(self):
|
||||
s = flow.State()
|
||||
fm = flow.FlowMaster(None, None, s)
|
||||
fm.load_script(tutils.test_data.path("data/scripts/all.py"))
|
||||
f = tutils.tflow(resp=True)
|
||||
|
||||
f.client_conn.acked = False
|
||||
fm.clientconnect(f.client_conn)
|
||||
assert fm.scripts[0].ns["log"][-1] == "clientconnect"
|
||||
f.server_conn.acked = False
|
||||
fm.serverconnect(f.server_conn)
|
||||
assert fm.scripts[0].ns["log"][-1] == "serverconnect"
|
||||
f.reply.acked = False
|
||||
fm.request(f)
|
||||
assert fm.scripts[0].ns["log"][-1] == "request"
|
||||
f.reply.acked = False
|
||||
fm.response(f)
|
||||
assert fm.scripts[0].ns["log"][-1] == "response"
|
||||
# load second script
|
||||
fm.load_script(tutils.test_data.path("data/scripts/all.py"))
|
||||
assert len(fm.scripts) == 2
|
||||
f.server_conn.reply.acked = False
|
||||
fm.clientdisconnect(f.server_conn)
|
||||
assert fm.scripts[0].ns["log"][-1] == "clientdisconnect"
|
||||
assert fm.scripts[1].ns["log"][-1] == "clientdisconnect"
|
||||
|
||||
# unload first script
|
||||
fm.unload_scripts()
|
||||
assert len(fm.scripts) == 0
|
||||
fm.load_script(tutils.test_data.path("data/scripts/all.py"))
|
||||
|
||||
f.error = tutils.terr()
|
||||
f.reply.acked = False
|
||||
fm.error(f)
|
||||
assert fm.scripts[0].ns["log"][-1] == "error"
|
||||
assert "live" in fm.replay_request(f)
|
||||
|
||||
def test_duplicate_flow(self):
|
||||
s = flow.State()
|
||||
@ -789,7 +730,6 @@ class TestFlowMaster:
|
||||
f.error.reply = controller.DummyReply()
|
||||
fm.error(f)
|
||||
|
||||
fm.load_script(tutils.test_data.path("data/scripts/a.py"))
|
||||
fm.shutdown()
|
||||
|
||||
def test_client_playback(self):
|
||||
|
@ -1,13 +0,0 @@
|
||||
from mitmproxy import flow
|
||||
from . import tutils
|
||||
|
||||
|
||||
def test_duplicate_flow():
|
||||
s = flow.State()
|
||||
fm = flow.FlowMaster(None, None, s)
|
||||
fm.load_script(tutils.test_data.path("data/scripts/duplicate_flow.py"))
|
||||
f = tutils.tflow()
|
||||
fm.request(f)
|
||||
assert fm.state.flow_count() == 2
|
||||
assert not fm.state.view[0].request.is_replay
|
||||
assert fm.state.view[1].request.is_replay
|
@ -13,6 +13,7 @@ from netlib.http import authentication, http1
|
||||
from netlib.tutils import raises
|
||||
from pathod import pathoc, pathod
|
||||
|
||||
from mitmproxy.builtins import script
|
||||
from mitmproxy import controller
|
||||
from mitmproxy.proxy.config import HostMatcher
|
||||
from mitmproxy.models import Error, HTTPResponse, HTTPFlow
|
||||
@ -287,10 +288,13 @@ class TestHTTP(tservers.HTTPProxyTest, CommonMixin, AppMixin):
|
||||
self.master.set_stream_large_bodies(None)
|
||||
|
||||
def test_stream_modify(self):
|
||||
self.master.load_script(tutils.test_data.path("data/scripts/stream_modify.py"))
|
||||
s = script.Script(
|
||||
tutils.test_data.path("data/addonscripts/stream_modify.py")
|
||||
)
|
||||
self.master.addons.add(s)
|
||||
d = self.pathod('200:b"foo"')
|
||||
assert d.content == b"bar"
|
||||
self.master.unload_scripts()
|
||||
assert d.content == "bar"
|
||||
self.master.addons.remove(s)
|
||||
|
||||
|
||||
class TestHTTPAuth(tservers.HTTPProxyTest):
|
||||
@ -512,15 +516,15 @@ class TestTransparent(tservers.TransparentProxyTest, CommonMixin, TcpMixin):
|
||||
ssl = False
|
||||
|
||||
def test_tcp_stream_modify(self):
|
||||
self.master.load_script(tutils.test_data.path("data/scripts/tcp_stream_modify.py"))
|
||||
|
||||
s = script.Script(
|
||||
tutils.test_data.path("data/addonscripts/tcp_stream_modify.py")
|
||||
)
|
||||
self.master.addons.add(s)
|
||||
self._tcpproxy_on()
|
||||
d = self.pathod('200:b"foo"')
|
||||
self._tcpproxy_off()
|
||||
|
||||
assert d.content == b"bar"
|
||||
|
||||
self.master.unload_scripts()
|
||||
assert d.content == "bar"
|
||||
self.master.addons.remove(s)
|
||||
|
||||
|
||||
class TestTransparentSSL(tservers.TransparentProxyTest, CommonMixin, TcpMixin):
|
||||
|
Loading…
Reference in New Issue
Block a user