mitmproxy/test/test_script.py

126 lines
3.5 KiB
Python
Raw Normal View History

from libmproxy import script, flow
2012-02-24 23:19:54 +00:00
import tutils
import shlex
import os
import time
import mock
class TestScript:
def test_simple(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
sp = tutils.test_data.path("scripts/a.py")
2015-05-30 00:03:28 +00:00
p = script.Script("%s --var 40" % sp, fm)
assert "here" in p.ns
assert p.run("here") == (True, 41)
assert p.run("here") == (True, 42)
2012-02-24 23:19:54 +00:00
ret = p.run("errargs")
2012-02-24 23:19:54 +00:00
assert not ret[0]
assert len(ret[1]) == 2
# Check reload
p.load()
assert p.run("here") == (True, 41)
2012-02-24 23:19:54 +00:00
def test_duplicate_flow(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
fm.load_script(tutils.test_data.path("scripts/duplicate_flow.py"))
f = tutils.tflow()
fm.handle_request(f)
2012-02-24 23:19:54 +00:00
assert fm.state.flow_count() == 2
assert not fm.state.view[0].request.is_replay
assert fm.state.view[1].request.is_replay
2012-02-24 23:19:54 +00:00
def test_err(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
tutils.raises(
"not found",
script.Script, "nonexistent", fm
)
tutils.raises(
"not a file",
script.Script, tutils.test_data.path("scripts"), fm
)
tutils.raises(
script.ScriptError,
script.Script, tutils.test_data.path("scripts/syntaxerr.py"), fm
)
tutils.raises(
script.ScriptError,
script.Script, tutils.test_data.path("scripts/loaderr.py"), fm
)
def test_concurrent(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
fm.load_script(tutils.test_data.path("scripts/concurrent_decorator.py"))
with mock.patch("libmproxy.controller.DummyReply.__call__") as m:
f1, f2 = tutils.tflow(), tutils.tflow()
t_start = time.time()
fm.handle_request(f1)
f1.reply()
fm.handle_request(f2)
f2.reply()
# Two instantiations
2014-08-17 23:47:39 +00:00
assert m.call_count == 0 # No calls yet.
assert (time.time() - t_start) < 0.09
def test_concurrent2(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
2015-05-30 00:03:28 +00:00
s = script.Script(
tutils.test_data.path("scripts/concurrent_decorator.py"),
fm)
s.load()
2014-08-17 23:47:39 +00:00
m = mock.Mock()
class Dummy:
def __init__(self):
self.response = self
self.error = self
self.reply = m
t_start = time.time()
for hook in ("clientconnect",
"serverconnect",
"response",
"error",
"clientconnect"):
d = Dummy()
assert s.run(hook, d)[0]
d.reply()
while (time.time() - t_start) < 20 and m.call_count <= 5:
2014-08-17 23:47:39 +00:00
if m.call_count == 5:
return
time.sleep(0.001)
assert False
2013-12-15 01:51:35 +00:00
def test_concurrent_err(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
2013-12-15 01:51:35 +00:00
tutils.raises(
"decorator not supported for this method",
2015-05-30 00:03:28 +00:00
script.Script,
tutils.test_data.path("scripts/concurrent_decorator_err.py"),
fm)
def test_command_parsing():
s = flow.State()
fm = flow.FlowMaster(None, s)
absfilepath = os.path.normcase(tutils.test_data.path("scripts/a.py"))
s = script.Script(absfilepath, fm)
assert os.path.isfile(s.argv[0])