mitmproxy/test/test_dump.py

178 lines
5.5 KiB
Python
Raw Normal View History

2011-02-16 21:18:38 +00:00
import os
2011-02-16 09:10:24 +00:00
from cStringIO import StringIO
from libmproxy import dump, flow, proxy
import tutils
import mock
2011-02-16 09:10:24 +00:00
def test_strfuncs():
    """Smoke-test dump.str_response and check the markers in dump.str_request."""
    # The response only has to render without raising after _set_replay().
    replayed = tutils.tresp()
    replayed._set_replay()
    dump.str_response(replayed)

    # A request with no client connection and the stickycookie attribute set
    # must mention both markers, for either value of the second argument.
    request = tutils.treq()
    request.client_conn = None
    request.stickycookie = True
    for marker in ("stickycookie", "replay"):
        for flag in (False, True):
            assert marker in dump.str_request(request, flag)
2011-02-16 09:10:24 +00:00
class TestDumpMaster:
    """Tests that drive dump.DumpMaster with synthetic events from tutils."""

    def _cycle(self, m, content):
        """Feed one full connection cycle through the master ``m``.

        The cycle is: a log event, client connect, server connection,
        request, response and client disconnect. Both the request and the
        response carry ``content`` as their body.

        Returns whatever ``m.handle_response`` returns.
        """
        req = tutils.treq()
        req.content = content
        l = proxy.Log("connect")
        l.reply = mock.MagicMock()
        m.handle_log(l)
        cc = req.client_conn
        cc.connection_error = "error"
        resp = tutils.tresp(req)
        resp.content = content
        m.handle_clientconnect(cc)
        sc = proxy.ServerConnection(m.o, req.scheme, req.host, req.port, None)
        sc.reply = mock.MagicMock()
        m.handle_serverconnection(sc)
        m.handle_request(req)
        f = m.handle_response(resp)
        cd = flow.ClientDisconnect(cc)
        cd.reply = mock.MagicMock()
        m.handle_clientdisconnect(cd)
        return f

    def _dummy_cycle(self, n, filt, content, **options):
        """Run ``n`` cycles through a fresh DumpMaster and return its output.

        ``filt`` is passed to DumpMaster as its third argument and
        ``options`` are forwarded to dump.Options. Output is captured in an
        in-memory buffer whose contents are returned after shutdown.
        """
        cs = StringIO()
        o = dump.Options(**options)
        m = dump.DumpMaster(None, o, filt, outfile=cs)
        for i in range(n):
            self._cycle(m, content)
        m.shutdown()
        return cs.getvalue()

    def _flowfile(self, path):
        """Write a single flow (with a response attached) to ``path``."""
        # The context manager closes the file even if serialisation raises.
        with open(path, "wb") as f:
            fw = flow.FlowWriter(f)
            t = tutils.tflow_full()
            t.response = tutils.tresp(t.request)
            fw.add(t)

    def test_error(self):
        """An error flow is handled and "error" shows up in the output."""
        cs = StringIO()
        o = dump.Options(verbosity=1)
        m = dump.DumpMaster(None, o, None, outfile=cs)
        f = tutils.tflow_err()
        m.handle_request(f.request)
        assert m.handle_error(f.error)
        assert "error" in cs.getvalue()

    def test_replay(self):
        """Server/client replay options: bad path raises, good path runs."""
        cs = StringIO()

        o = dump.Options(server_replay="nonexistent", kill=True)
        tutils.raises(dump.DumpError, dump.DumpMaster, None, o, None, outfile=cs)

        with tutils.tmpdir() as t:
            p = os.path.join(t, "rep")
            self._flowfile(p)

            o = dump.Options(server_replay=p, kill=True)
            m = dump.DumpMaster(None, o, None, outfile=cs)

            self._cycle(m, "content")
            self._cycle(m, "content")

            o = dump.Options(server_replay=p, kill=False)
            m = dump.DumpMaster(None, o, None, outfile=cs)
            self._cycle(m, "nonexistent")

            # Only construction is exercised for client replay.
            o = dump.Options(client_replay=p, kill=False)
            dump.DumpMaster(None, o, None, outfile=cs)

    def test_read(self):
        """Reading flows via rfile: valid file, missing file, non-flow file."""
        with tutils.tmpdir() as t:
            p = os.path.join(t, "read")
            self._flowfile(p)
            assert "GET" in self._dummy_cycle(0, None, "", verbosity=1, rfile=p)

            tutils.raises(
                dump.DumpError, self._dummy_cycle,
                0, None, "", verbosity=1, rfile="/nonexistent"
            )

            # We now just ignore errors
            self._dummy_cycle(0, None, "", verbosity=1, rfile=tutils.test_data.path("test_dump.py"))

    def test_options(self):
        """Keyword arguments to dump.Options become attributes."""
        o = dump.Options(verbosity=2)
        assert o.verbosity == 2

    def test_filter(self):
        """With the "~u foo" filter, "GET" must not appear in the output."""
        assert "GET" not in self._dummy_cycle(1, "~u foo", "", verbosity=1)

    def test_app(self):
        """Constructing a master with app=True calls server.apps.add once."""
        o = dump.Options(app=True)
        s = mock.MagicMock()
        dump.DumpMaster(s, o, None)
        assert s.apps.add.call_count == 1

    def test_replacements(self):
        """The replacements option rewrites the request content."""
        o = dump.Options(replacements=[(".*", "content", "foo")])
        m = dump.DumpMaster(None, o, None)
        f = self._cycle(m, "content")
        assert f.request.content == "foo"

    def test_setheader(self):
        """The setheaders option sets the named header on the request."""
        o = dump.Options(setheaders=[(".*", "one", "two")])
        m = dump.DumpMaster(None, o, None)
        f = self._cycle(m, "content")
        assert f.request.headers["one"] == ["two"]

    def test_basic(self):
        """"GET" is printed at verbosity 1-3 for empty, binary and ASCII bodies."""
        for i in (1, 2, 3):
            assert "GET" in self._dummy_cycle(1, "~s", "", verbosity=i, eventlog=True)
            assert "GET" in self._dummy_cycle(1, "~s", "\x00\x00\x00", verbosity=i)
            assert "GET" in self._dummy_cycle(1, "~s", "ascii", verbosity=i)

    def test_write(self):
        """One cycle with wfile set leaves exactly one flow in the file."""
        with tutils.tmpdir() as d:
            p = os.path.join(d, "a")
            self._dummy_cycle(1, None, "", wfile=p, verbosity=0)
            # Close the reader's file before the temp directory is removed.
            with open(p, "rb") as f:
                assert len(list(flow.FlowReader(f).stream())) == 1

    def test_write_err(self):
        """A wfile inside a nonexistent directory raises dump.DumpError."""
        tutils.raises(
            dump.DumpError,
            self._dummy_cycle,
            1,
            None,
            "",
            wfile="nonexistentdir/foo"
        )

    def test_script(self):
        """Script hooks fire for each event; bad scripts raise DumpError."""
        ret = self._dummy_cycle(
            1, None, "",
            scripts=[[tutils.test_data.path("scripts/all.py")]], verbosity=0, eventlog=True
        )
        # Presumably scripts/all.py logs one X-prefixed marker per hook.
        assert "XCLIENTCONNECT" in ret
        assert "XSERVERCONNECT" in ret
        assert "XREQUEST" in ret
        assert "XRESPONSE" in ret
        assert "XCLIENTDISCONNECT" in ret
        tutils.raises(
            dump.DumpError,
            self._dummy_cycle, 1, None, "", scripts=[["nonexistent"]]
        )
        tutils.raises(
            dump.DumpError,
            self._dummy_cycle, 1, None, "", scripts=[["starterr.py"]]
        )

    def test_stickycookie(self):
        """A cycle with the stickycookie option set runs without raising."""
        self._dummy_cycle(1, None, "", stickycookie=".*")

    def test_stickyauth(self):
        """A cycle with the stickyauth option set runs without raising."""
        self._dummy_cycle(1, None, "", stickyauth=".*")
2011-02-16 21:18:38 +00:00