Add a light-weight custom event system, use it for keepserving

This patch implements the lightweight event system I propose in #2144, adds a
custom event "processing_complete" that is triggered after file read, client
replay and server replay, and introduces a KeepServing addon to handle this for
mitmdump.
This commit is contained in:
Aldo Cortesi 2017-03-16 10:29:02 +13:00
parent 169068c7ec
commit 228a22b3c0
22 changed files with 102 additions and 75 deletions

View File

@ -69,8 +69,8 @@ class AddonManager:
raise exceptions.AddonError(
"invoke_addon called without a handler context."
)
if name not in eventsequence.Events: # prama: no cover
raise NotImplementedError("Unknown event")
if name not in eventsequence.Events:
name = "event_" + name
func = getattr(addon, name, None)
if func:
if not callable(func):
@ -89,4 +89,3 @@ class AddonManager:
self.invoke_addon(i, name, *args, **kwargs)
except exceptions.AddonHalt:
return

View File

@ -10,7 +10,6 @@ class ClientPlayback:
def __init__(self):
self.flows = None
self.current_thread = None
self.keepserving = False
self.has_replayed = False
def count(self) -> int:
@ -32,7 +31,6 @@ class ClientPlayback:
self.load(flows)
else:
self.flows = None
self.keepserving = options.keepserving
def tick(self):
if self.current_thread and not self.current_thread.is_alive():
@ -41,5 +39,5 @@ class ClientPlayback:
self.current_thread = ctx.master.replay_request(self.flows.pop(0))
self.has_replayed = True
if self.has_replayed:
if not self.flows and not self.current_thread and not self.keepserving:
ctx.master.shutdown()
if not self.flows and not self.current_thread:
ctx.master.addons.trigger("processing_complete")

View File

@ -0,0 +1,7 @@
from mitmproxy import ctx
class KeepServing:
def event_processing_complete(self):
if not ctx.master.options.keepserving:
ctx.master.shutdown()

View File

@ -11,7 +11,6 @@ class ReadFile:
"""
def __init__(self):
self.path = None
self.keepserving = False
def load_flows_file(self, path: str) -> int:
path = os.path.expanduser(path)
@ -33,8 +32,6 @@ class ReadFile:
raise exceptions.FlowReadException(v)
def configure(self, options, updated):
if "keepserving" in updated:
self.keepserving = options.keepserving
if "rfile" in updated and options.rfile:
self.path = options.rfile
@ -46,5 +43,4 @@ class ReadFile:
raise exceptions.OptionsError(v)
finally:
self.path = None
if not self.keepserving:
ctx.master.shutdown()
ctx.master.addons.trigger("processing_complete")

View File

@ -9,13 +9,6 @@ class ReadStdin:
An addon that reads from stdin if we're not attached to (someting like)
a tty.
"""
def __init__(self):
self.keepserving = False
def configure(self, options, updated):
if "keepserving" in updated:
self.keepserving = options.keepserving
def running(self, stdin = sys.stdin):
if not stdin.isatty():
ctx.log.info("Reading from stdin")
@ -30,5 +23,4 @@ class ReadStdin:
ctx.master.load_flow(i)
except exceptions.FlowReadException as e:
ctx.log.error("Error reading from stdin: %s" % e)
if not self.keepserving:
ctx.master.shutdown()
ctx.master.addons.trigger("processing_complete")

View File

@ -104,7 +104,7 @@ class ServerPlayback:
def tick(self):
if self.stop and not self.final_flow.live:
ctx.master.shutdown()
ctx.master.addons.trigger("processing_complete")
def request(self, f):
if self.flowmap:
@ -115,7 +115,7 @@ class ServerPlayback:
if self.options.refresh_server_playback:
response.refresh()
f.response = response
if not self.flowmap and not self.options.keepserving:
if not self.flowmap:
self.final_flow = f
self.stop = True
elif self.options.replay_kill_extra:

View File

@ -79,7 +79,10 @@ class Options(optmanager.OptManager):
)
self.add_option(
"keepserving", bool, False,
"Continue serving after client playback or file read."
"""
Instructs mitmdump to continue serving after client playback,
server playback or file read. This option is ignored by interactive tools, which always keep serving.
"""
)
self.add_option(
"server", bool, True,

View File

@ -6,16 +6,36 @@ from mitmproxy import proxy
from mitmproxy import eventsequence
class _AddonWrapper:
def __init__(self, master, addons):
self.master = master
self.addons = addons
def trigger(self, event, *args, **kwargs):
self.master.events.append((event, args, kwargs))
return self.addons.trigger(event, *args, **kwargs)
def __getattr__(self, attr):
return getattr(self.addons, attr)
class RecordingMaster(mitmproxy.master.Master):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.event_log = []
self.addons = _AddonWrapper(self, self.addons)
self.events = []
self.logs = []
def has_event(self, name):
for i in self.events:
if i[0] == name:
return True
def add_log(self, e, level):
self.event_log.append((level, e))
self.logs.append((level, e))
def clear(self):
self.event_log = []
self.logs = []
class context:

View File

@ -2,7 +2,7 @@ from mitmproxy import controller
from mitmproxy import addons
from mitmproxy import options
from mitmproxy import master
from mitmproxy.addons import dumper, termlog, termstatus, readstdin
from mitmproxy.addons import dumper, termlog, termstatus, readstdin, keepserving
class DumpMaster(master.Master):
@ -21,7 +21,7 @@ class DumpMaster(master.Master):
self.addons.add(*addons.default_addons())
if with_dumper:
self.addons.add(dumper.Dumper())
self.addons.add(readstdin.ReadStdin())
self.addons.add(readstdin.ReadStdin(), keepserving.KeepServing())
@controller.handler
def log(self, e):

View File

@ -12,7 +12,7 @@ class TestCheckALPN:
with taddons.context() as tctx:
a = check_alpn.CheckALPN()
tctx.configure(a)
assert not any(msg in m for l, m in tctx.master.event_log)
assert not any(msg in m for l, m in tctx.master.logs)
def test_check_no_alpn(self, disable_alpn):
msg = 'ALPN support missing'
@ -20,4 +20,4 @@ class TestCheckALPN:
with taddons.context() as tctx:
a = check_alpn.CheckALPN()
tctx.configure(a)
assert any(msg in m for l, m in tctx.master.event_log)
assert any(msg in m for l, m in tctx.master.logs)

View File

@ -16,4 +16,4 @@ class TestCheckCA:
tctx.master.server.config.certstore.default_ca.has_expired = mock.MagicMock(return_value=expired)
a = check_ca.CheckCA()
tctx.configure(a)
assert any(msg in m for l, m in tctx.master.event_log) is expired
assert any(msg in m for l, m in tctx.master.logs) is expired

View File

@ -23,7 +23,7 @@ class MockThread():
class TestClientPlayback:
def test_playback(self):
cp = clientplayback.ClientPlayback()
with taddons.context():
with taddons.context() as tctx:
assert cp.count() == 0
f = tflow.tflow(resp=True)
cp.load([f])
@ -35,15 +35,12 @@ class TestClientPlayback:
assert rp.called
assert cp.current_thread
cp.keepserving = False
cp.flows = None
cp.current_thread = None
with mock.patch("mitmproxy.master.Master.shutdown") as sd:
cp.tick()
assert sd.called
assert tctx.master.has_event("processing_complete")
cp.current_thread = MockThread()
with mock.patch("mitmproxy.master.Master.shutdown") as sd:
cp.tick()
assert cp.current_thread is None

View File

@ -151,7 +151,7 @@ class TestContentView:
with taddons.context(options=options.Options()) as ctx:
ctx.configure(d, flow_detail=4, verbosity=3)
d.response(tflow.tflow())
assert "Content viewer failed" in ctx.master.event_log[0][1]
assert "Content viewer failed" in ctx.master.logs[0][1]
def test_tcp():

View File

@ -0,0 +1,10 @@
from mitmproxy.addons import keepserving
from mitmproxy.test import taddons
def test_keepserving():
ks = keepserving.KeepServing()
with taddons.context() as tctx:
ks.event_processing_complete()
assert tctx.master.should_exit.is_set()

View File

@ -32,13 +32,13 @@ def test_configure(mck, tmpdir):
with taddons.context() as tctx:
tf = str(tmpdir.join("tfile"))
write_data(tf)
tctx.configure(rf, rfile=str(tf), keepserving=False)
tctx.configure(rf, rfile=str(tf))
assert not mck.called
rf.running()
assert mck.called
write_data(tf, corrupt=True)
tctx.configure(rf, rfile=str(tf), keepserving=False)
tctx.configure(rf, rfile=str(tf))
with pytest.raises(exceptions.OptionsError):
rf.running()
@ -51,7 +51,7 @@ def test_corruption(mck, tmpdir):
with pytest.raises(exceptions.FlowReadException):
rf.load_flows_file("nonexistent")
assert not mck.called
assert len(tctx.master.event_log) == 1
assert len(tctx.master.logs) == 1
tfc = str(tmpdir.join("tfile"))
write_data(tfc, corrupt=True)
@ -59,4 +59,4 @@ def test_corruption(mck, tmpdir):
with pytest.raises(exceptions.FlowReadException):
rf.load_flows_file(tfc)
assert mck.called
assert len(tctx.master.event_log) == 2
assert len(tctx.master.logs) == 2

View File

@ -26,12 +26,6 @@ def gen_data(corrupt=False):
return tf
def test_configure(tmpdir):
rf = readstdin.ReadStdin()
with taddons.context() as tctx:
tctx.configure(rf, keepserving=False)
class mStdin:
def __init__(self, d):
self.buffer = d
@ -49,11 +43,11 @@ def test_read(m, tmpdir):
assert m.called
rf.running(stdin=mStdin(None))
assert tctx.master.event_log
assert tctx.master.logs
tctx.master.clear()
m.reset_mock()
assert not m.called
rf.running(stdin=mStdin(gen_data(corrupt=True)))
assert m.called
assert tctx.master.event_log
assert tctx.master.logs

View File

@ -97,6 +97,6 @@ class TestReplaceFile:
tmpfile.remove()
f = tflow.tflow()
f.request.content = b"foo"
assert not tctx.master.event_log
assert not tctx.master.logs
r.request(f)
assert tctx.master.event_log
assert tctx.master.logs

View File

@ -22,14 +22,14 @@ def test_scriptenv():
with taddons.context() as tctx:
with script.scriptenv("path", []):
raise SystemExit
assert tctx.master.event_log[0][0] == "error"
assert "exited" in tctx.master.event_log[0][1]
assert tctx.master.logs[0][0] == "error"
assert "exited" in tctx.master.logs[0][1]
tctx.master.clear()
with script.scriptenv("path", []):
raise ValueError("fooo")
assert tctx.master.event_log[0][0] == "error"
assert "foo" in tctx.master.event_log[0][1]
assert tctx.master.logs[0][0] == "error"
assert "foo" in tctx.master.logs[0][1]
class Called:
@ -135,7 +135,7 @@ class TestScript:
f.write(".")
sc.tick()
time.sleep(0.1)
if tctx.master.event_log:
if tctx.master.logs:
return
raise AssertionError("Change event not detected.")
@ -147,11 +147,11 @@ class TestScript:
sc.start(tctx.options)
f = tflow.tflow(resp=True)
sc.request(f)
assert tctx.master.event_log[0][0] == "error"
assert len(tctx.master.event_log[0][1].splitlines()) == 6
assert re.search(r'addonscripts[\\/]error.py", line \d+, in request', tctx.master.event_log[0][1])
assert re.search(r'addonscripts[\\/]error.py", line \d+, in mkerr', tctx.master.event_log[0][1])
assert tctx.master.event_log[0][1].endswith("ValueError: Error!\n")
assert tctx.master.logs[0][0] == "error"
assert len(tctx.master.logs[0][1].splitlines()) == 6
assert re.search(r'addonscripts[\\/]error.py", line \d+, in request', tctx.master.logs[0][1])
assert re.search(r'addonscripts[\\/]error.py", line \d+, in mkerr', tctx.master.logs[0][1])
assert tctx.master.logs[0][1].endswith("ValueError: Error!\n")
def test_addon(self):
with taddons.context() as tctx:
@ -256,7 +256,7 @@ class TestScriptLoader:
"%s %s" % (rec, "c"),
]
)
debug = [(i[0], i[1]) for i in tctx.master.event_log if i[0] == "debug"]
debug = [(i[0], i[1]) for i in tctx.master.logs if i[0] == "debug"]
assert debug == [
('debug', 'a start'),
('debug', 'a configure'),
@ -270,7 +270,7 @@ class TestScriptLoader:
('debug', 'c configure'),
('debug', 'c running'),
]
tctx.master.event_log = []
tctx.master.logs = []
tctx.configure(
sc,
scripts = [
@ -279,11 +279,11 @@ class TestScriptLoader:
"%s %s" % (rec, "b"),
]
)
debug = [(i[0], i[1]) for i in tctx.master.event_log if i[0] == "debug"]
debug = [(i[0], i[1]) for i in tctx.master.logs if i[0] == "debug"]
# No events, only order has changed
assert debug == []
tctx.master.event_log = []
tctx.master.logs = []
tctx.configure(
sc,
scripts = [
@ -291,7 +291,7 @@ class TestScriptLoader:
"%s %s" % (rec, "a"),
]
)
debug = [(i[0], i[1]) for i in tctx.master.event_log if i[0] == "debug"]
debug = [(i[0], i[1]) for i in tctx.master.logs if i[0] == "debug"]
assert debug == [
('debug', 'c done'),
('debug', 'b done'),

View File

@ -34,7 +34,7 @@ def test_tick():
s.final_flow = tflow.tflow()
s.final_flow.live = False
s.tick()
assert tctx.master.should_exit.is_set()
assert tctx.master.has_event("processing_complete")
def test_server_playback():
@ -315,7 +315,6 @@ def test_server_playback_full():
tctx.configure(
s,
refresh_server_playback = True,
keepserving=False
)
f = tflow.tflow()

View File

@ -6,7 +6,7 @@ def test_configure():
ts = termstatus.TermStatus()
with taddons.context() as ctx:
ts.running()
assert not ctx.master.event_log
assert not ctx.master.logs
ctx.configure(ts, server=True)
ts.running()
assert ctx.master.event_log
assert ctx.master.logs

View File

@ -43,7 +43,7 @@ class TestConcurrent(tservers.MasterTest):
)
)
sc.start(tctx.options)
assert "decorator not supported" in tctx.master.event_log[0][1]
assert "decorator not supported" in tctx.master.logs[0][1]
def test_concurrent_class(self):
with taddons.context() as tctx:

View File

@ -11,6 +11,7 @@ class TAddon:
def __init__(self, name):
self.name = name
self.tick = True
self.custom_called = False
def __repr__(self):
return "Addon(%s)" % self.name
@ -18,11 +19,17 @@ class TAddon:
def done(self):
pass
def event_custom(self):
self.custom_called = True
def test_simple():
o = options.Options()
m = master.Master(o, proxy.DummyServer(o))
a = addonmanager.AddonManager(m)
with pytest.raises(exceptions.AddonError):
a.invoke_addon(TAddon("one"), "done")
a.add(TAddon("one"))
assert a.get("one")
assert not a.get("two")
@ -33,3 +40,8 @@ def test_simple():
a.trigger("done")
with pytest.raises(exceptions.AddonError):
a.trigger("tick")
ta = TAddon("one")
a.add(ta)
a.trigger("custom")
assert ta.custom_called