mitmproxy/test/mitmproxy/test_flow.py

584 lines
16 KiB
Python
Raw Normal View History

2015-08-01 08:40:19 +00:00
import mock
2016-07-08 00:29:22 +00:00
import io
2015-08-01 08:40:19 +00:00
import netlib.utils
2016-03-26 08:00:51 +00:00
from netlib.http import Headers
from mitmproxy import flowfilter, flow, options
from mitmproxy.contrib import tnetstring
2016-08-10 05:29:07 +00:00
from mitmproxy.exceptions import FlowReadException, Kill
from mitmproxy.models import Error
from mitmproxy.models import Flow
from mitmproxy.models import HTTPFlow
from mitmproxy.models import HTTPRequest
from mitmproxy.models import HTTPResponse
from mitmproxy.proxy import ProxyConfig
from mitmproxy.proxy.server import DummyServer
from mitmproxy.models.connections import ClientConnection
2016-02-02 12:25:31 +00:00
from . import tutils
2016-10-17 04:29:45 +00:00
class TestHTTPFlow:
    """Tests for copy, filter matching, backup/revert, state, kill/resume and replace on HTTP flows."""

    def test_copy(self):
        """A copy has equal state (ignoring "id") but is made of distinct objects."""
        original = tutils.tflow(resp=True)
        original.get_state()
        duplicate = original.copy()
        original_state = original.get_state()
        duplicate_state = duplicate.get_state()
        # The "id" entries are removed before the two state dicts are compared.
        del original_state["id"]
        del duplicate_state["id"]
        assert original_state == duplicate_state
        assert not original == duplicate
        assert original is not duplicate
        assert original.request.get_state() == duplicate.request.get_state()
        assert original.request is not duplicate.request
        assert original.request.headers == duplicate.request.headers
        assert original.request.headers is not duplicate.request.headers
        assert original.response.get_state() == duplicate.response.get_state()
        assert original.response is not duplicate.response

        # Same checks for a flow created with err=True, comparing the error part.
        original = tutils.tflow(err=True)
        duplicate = original.copy()
        assert original is not duplicate
        assert original.request is not duplicate.request
        assert original.request.headers == duplicate.request.headers
        assert original.request.headers is not duplicate.request.headers
        assert original.error.get_state() == duplicate.error.get_state()
        assert original.error is not duplicate.error

    def test_match(self):
        """Check flowfilter.match against response and error flows, and an invalid filter."""
        with_response = tutils.tflow(resp=True)
        assert not flowfilter.match("~b test", with_response)
        assert flowfilter.match(None, with_response)
        # The same filter is evaluated a second time, after the no-filter match.
        assert not flowfilter.match("~b test", with_response)

        with_error = tutils.tflow(err=True)
        assert flowfilter.match("~e", with_error)

        tutils.raises(ValueError, flowfilter.match, "~", with_error)

    def test_backup(self):
        """revert() restores request content that was changed after backup()."""
        flw = tutils.tflow()
        flw.response = HTTPResponse.wrap(netlib.tutils.tresp())
        flw.request.content = b"foo"
        assert not flw.modified()
        flw.backup()
        flw.request.content = b"bar"
        assert flw.modified()
        flw.revert()
        assert flw.request.content == b"foo"

    def test_backup_idempotence(self):
        """backup() followed by revert() can be run twice in a row without raising."""
        flw = tutils.tflow(resp=True)
        for _ in range(2):
            flw.backup()
            flw.revert()

    def test_getset_state(self):
        """State survives from_state()/set_state() round trips, with a response or an error."""
        flw = tutils.tflow(resp=True)
        snapshot = flw.get_state()
        current = flw.get_state()
        rebuilt = HTTPFlow.from_state(snapshot).get_state()
        assert current == rebuilt

        flw.response = None
        flw.error = Error("error")
        snapshot = flw.get_state()
        current = flw.get_state()
        rebuilt = HTTPFlow.from_state(snapshot).get_state()
        assert current == rebuilt

        clone = flw.copy()
        clone.id = flw.id  # copy creates a different uuid
        assert flw.get_state() == clone.get_state()
        assert not flw == clone
        clone.error = Error("e2")
        assert not flw == clone
        flw.set_state(clone.get_state())
        assert flw.get_state() == clone.get_state()

    def test_kill(self):
        """An intercepted flow is killable; kill() reports an error and sets reply.value to Kill."""
        master = mock.Mock()
        flw = tutils.tflow()
        flw.reply.handle()
        flw.intercept(master)
        assert flw.killable
        flw.kill(master)
        assert not flw.killable
        assert master.error.called
        assert flw.reply.value == Kill

    def test_killall(self):
        """After State.killall(), every flow in the view has "killed" in its error text."""
        server = DummyServer(None)
        state = flow.State()
        master = flow.FlowMaster(None, server)
        master.addons.add(state)

        flw = tutils.tflow()
        flw.reply.handle()
        flw.intercept(master)

        state.killall(master)
        assert all("killed" in str(entry.error) for entry in state.view)

    def test_resume(self):
        """reply.state is "taken" after intercept() and "committed" after resume()."""
        flw = tutils.tflow()
        flw.reply.handle()
        flw.intercept(mock.Mock())
        assert flw.reply.state == "taken"
        flw.resume(mock.Mock())
        assert flw.reply.state == "committed"

    def test_replace_unicode(self):
        """replace() with a bytes pattern and a unicode replacement does not raise."""
        flw = tutils.tflow(resp=True)
        flw.response.content = b"\xc2foo"
        flw.replace(b"foo", u"bar")

    def test_replace_no_content(self):
        """replace() returns 0 when the request content is None."""
        flw = tutils.tflow()
        flw.request.content = None
        assert flw.replace("foo", "bar") == 0

    def test_replace(self):
        """replace() rewrites headers and content of request and response, returning 6."""
        flw = tutils.tflow(resp=True)
        for message in (flw.request, flw.response):
            message.headers["foo"] = "foo"
            message.content = b"afoob"

        assert flw.replace("foo", "bar") == 6

        for message in (flw.request, flw.response):
            assert message.headers["bar"] == "bar"
            assert message.content == b"abarb"

    def test_replace_encoded(self):
        """After replace() on gzip-encoded messages, raw_content shows the result only once decoded."""
        flw = tutils.tflow(resp=True)
        for message in (flw.request, flw.response):
            message.content = b"afoob"
            message.encode("gzip")

        flw.replace("foo", "bar")

        for message in (flw.request, flw.response):
            assert message.raw_content != b"abarb"
            message.decode()
            assert message.raw_content == b"abarb"
2016-07-15 11:32:08 +00:00
class TestTCPFlow:
    """Filter-matching tests for TCP flows."""

    def test_match(self):
        """Check flowfilter.match against plain and error TCP flows, and an invalid filter."""
        plain = tutils.ttcpflow()
        assert not flowfilter.match("~b nonexistent", plain)
        assert flowfilter.match(None, plain)
        # The same filter is evaluated a second time, after the no-filter match.
        assert not flowfilter.match("~b nonexistent", plain)

        failed = tutils.ttcpflow(err=True)
        assert flowfilter.match("~e", failed)

        tutils.raises(ValueError, flowfilter.match, "~", failed)
2016-07-15 11:32:08 +00:00
class TestState:
    """Tests for flow.State: flow bookkeeping, view filters, intercept filters and bulk operations."""

    def test_backup(self):
        """State.revert() can be called on a flow that has been backed up."""
        state = flow.State()
        flw = tutils.tflow()
        state.add_flow(flw)
        flw.backup()
        state.revert(flw)

    def test_flow(self):
        """
            normal flow:

                connect -> request -> response
        """
        state = flow.State()
        first = tutils.tflow()
        state.add_flow(first)
        assert first
        assert state.flow_count() == 1
        assert state.active_flow_count() == 1

        second = tutils.tflow()
        assert state.add_flow(second)
        assert state.active_flow_count() == 2

        # Giving the first flow a response drops the active count by one.
        first.response = HTTPResponse.wrap(netlib.tutils.tresp())
        assert state.update_flow(first)
        assert state.flow_count() == 2
        assert state.active_flow_count() == 1

        # Updating with None is rejected and leaves the active count alone.
        assert not state.update_flow(None)
        assert state.active_flow_count() == 1

        second.response = HTTPResponse.wrap(netlib.tutils.tresp())
        assert state.update_flow(second)
        assert state.active_flow_count() == 0

    def test_err(self):
        """update_flow() accepts flows with an error; a "~e" view then contains them."""
        state = flow.State()
        flw = tutils.tflow()
        state.add_flow(flw)
        flw.error = Error("message")
        assert state.update_flow(flw)

        state = flow.State()
        flw = tutils.tflow()
        state.add_flow(flw)
        state.set_view_filter("~e")
        assert not state.view
        flw.error = tutils.terr()
        assert state.update_flow(flw)
        assert state.view

    def test_set_view_filter(self):
        """The view length follows the active filter as flows are added and updated."""
        state = flow.State()

        flw = tutils.tflow()
        assert len(state.view) == 0

        state.add_flow(flw)
        assert len(state.view) == 1

        state.set_view_filter("~s")
        assert state.filter_txt == "~s"
        assert len(state.view) == 0
        flw.response = HTTPResponse.wrap(netlib.tutils.tresp())
        state.update_flow(flw)
        assert len(state.view) == 1
        state.set_view_filter(None)
        assert len(state.view) == 1

        flw = tutils.tflow()
        state.add_flow(flw)
        assert len(state.view) == 2
        state.set_view_filter("~q")
        assert len(state.view) == 1
        state.set_view_filter("~s")
        assert len(state.view) == 1

        # An unparseable filter yields a message containing "Invalid".
        assert "Invalid" in state.set_view_filter("~")

    def test_set_intercept(self):
        """set_intercept() returns a falsy value on success and an "Invalid" message otherwise."""
        state = flow.State()
        assert not state.set_intercept("~q")
        assert state.intercept_txt == "~q"
        assert "Invalid" in state.set_intercept("~")
        assert not state.set_intercept(None)
        assert state.intercept_txt is None

    def _add_request(self, state):
        """Add a fresh flow to ``state`` and return that flow."""
        flw = tutils.tflow()
        state.add_flow(flw)
        return flw

    def _add_response(self, state):
        """Add a fresh flow to ``state``, attach a response and update it; returns None."""
        flw = tutils.tflow()
        state.add_flow(flw)
        flw.response = HTTPResponse.wrap(netlib.tutils.tresp())
        state.update_flow(flw)

    def _add_error(self, state):
        """Add a flow created with err=True to ``state``; returns None."""
        flw = tutils.tflow(err=True)
        state.add_flow(flw)

    def test_clear(self):
        """clear() leaves no flows, even when one was marked as intercepted."""
        state = flow.State()
        flw = self._add_request(state)
        flw.intercepted = True

        state.clear()
        assert state.flow_count() == 0

    def test_dump_flows(self):
        """Flows taken from the view can be loaded back into a cleared State."""
        state = flow.State()
        for _ in range(3):
            self._add_request(state)
            self._add_response(state)
        self._add_error(state)

        saved = state.view[:]
        state.clear()

        state.load_flows(saved)
        assert isinstance(state.flows[0], Flow)

    def test_accept_all(self):
        """accept_all() runs over a mix of request-only and responded flows without raising."""
        state = flow.State()
        self._add_request(state)
        self._add_response(state)
        self._add_request(state)
        state.accept_all(mock.Mock())
2011-02-02 23:16:03 +00:00
class TestSerialize:
    """Tests for writing flows with FlowWriter/FilteredFlowWriter and reading them with FlowReader."""

    def _treader(self):
        """Return a FlowReader over a buffer holding 3 response flows, 3 error flows and 2 TCP flows."""
        buf = io.BytesIO()
        writer = flow.FlowWriter(buf)
        for _ in range(3):
            writer.add(tutils.tflow(resp=True))
        for _ in range(3):
            writer.add(tutils.tflow(err=True))
        writer.add(tutils.ttcpflow())
        writer.add(tutils.ttcpflow(err=True))

        buf.seek(0)
        return flow.FlowReader(buf)

    def test_roundtrip(self):
        """A marked flow whose content covers every byte value survives write + read."""
        buf = io.BytesIO()
        original = tutils.tflow()
        original.marked = True
        original.request.content = bytes(bytearray(range(256)))
        writer = flow.FlowWriter(buf)
        writer.add(original)

        buf.seek(0)
        reader = flow.FlowReader(buf)
        flows = list(reader.stream())
        assert len(flows) == 1

        restored = flows[0]
        assert restored.get_state() == original.get_state()
        assert restored.request == original.request
        assert restored.marked

    def test_load_flows(self):
        """Loading the _treader() stream through a FlowMaster leaves 6 flows in the State addon."""
        reader = self._treader()
        state = flow.State()
        master = flow.FlowMaster(None, None)
        master.addons.add(state)
        master.load_flows(reader)
        assert len(state.flows) == 6

    def test_load_flows_reverse(self):
        """With mode="reverse", loaded flows report the upstream server's host."""
        reader = self._treader()
        state = flow.State()
        opts = options.Options(
            mode="reverse",
            upstream_server="https://use-this-domain"
        )
        conf = ProxyConfig(opts)
        master = flow.FlowMaster(opts, DummyServer(conf))
        master.addons.add(state)
        master.load_flows(reader)
        assert state.flows[0].request.host == "use-this-domain"

    def test_filter(self):
        """FilteredFlowWriter with "~c 200" keeps the 200 flow and drops the 201 flow."""
        buf = io.BytesIO()
        flt = flowfilter.parse("~c 200")
        writer = flow.FilteredFlowWriter(buf, flt)

        matching = tutils.tflow(resp=True)
        matching.response.status_code = 200
        writer.add(matching)

        rejected = tutils.tflow(resp=True)
        rejected.response.status_code = 201
        writer.add(rejected)

        buf.seek(0)
        reader = flow.FlowReader(buf)
        # Exactly one flow must come back: a bare truthiness check on the
        # length would also pass if the filter let both flows through.
        assert len(list(reader.stream())) == 1

    def test_error(self):
        """Reading garbage raises FlowReadException, whose str() is its message."""
        buf = io.BytesIO()
        buf.write(b"bogus")
        buf.seek(0)
        reader = flow.FlowReader(buf)
        tutils.raises(FlowReadException, list, reader.stream())

        exc = FlowReadException("foo")
        assert str(exc) == "foo"

    def test_versioncheck(self):
        """A dumped state with version (0, 0) makes the reader raise an error mentioning "version"."""
        flw = tutils.tflow()
        state = flw.get_state()
        state["version"] = (0, 0)
        buf = io.BytesIO()
        tnetstring.dump(state, buf)
        buf.seek(0)

        reader = flow.FlowReader(buf)
        tutils.raises("version", list, reader.stream())
2012-04-10 22:10:53 +00:00
class TestFlowMaster:
    """Tests for FlowMaster request replay, request creation and the event sequence."""

    def test_replay(self):
        """replay_request() refuses flows that lack content, are intercepted, or are live."""
        master = flow.FlowMaster(None, None)
        flw = tutils.tflow(resp=True)
        flw.request.content = None
        tutils.raises("missing", master.replay_request, flw)

        flw.intercepted = True
        tutils.raises("intercepted", master.replay_request, flw)

        flw.live = True
        tutils.raises("live", master.replay_request, flw)

    def test_create_flow(self):
        """create_request() returns a truthy value for a simple GET."""
        master = flow.FlowMaster(None, None)
        assert master.create_request("GET", "http", "example.com", 80, "/")

    def test_all(self):
        """Drive one flow through connect, request, response, disconnect and error events."""
        state = flow.State()
        master = flow.FlowMaster(None, None)
        master.addons.add(state)
        flw = tutils.tflow(req=None)
        master.clientconnect(flw.client_conn)
        flw.request = HTTPRequest.wrap(netlib.tutils.treq())
        master.request(flw)
        assert state.flow_count() == 1

        # The response event does not add a second flow.
        flw.response = HTTPResponse.wrap(netlib.tutils.tresp())
        master.response(flw)
        assert state.flow_count() == 1

        master.clientdisconnect(flw.client_conn)

        flw.error = Error("msg")
        master.error(flw)

        master.shutdown()
2015-05-30 00:03:28 +00:00
class TestRequest:
    """Tests for HTTPRequest URL handling, replace(), encoding constraints and copying."""

    def test_simple(self):
        """The url can be reassigned to itself, an empty url is rejected, and copies keep state."""
        flw = tutils.tflow()
        req = flw.request
        url = req.url
        req.url = url
        tutils.raises(ValueError, setattr, req, "url", "")
        assert req.url == url
        duplicate = req.copy()
        assert req.get_state() == duplicate.get_state()

    def test_get_url(self):
        """url follows scheme/host/port; pretty_url additionally follows the Host header."""
        req = HTTPRequest.wrap(netlib.tutils.treq())

        assert req.url == "http://address:22/path"

        req.scheme = "https"
        assert req.url == "https://address:22/path"

        req.host = "host"
        req.port = 42
        assert req.url == "https://host:42/path"

        req.host = "address"
        req.port = 22
        assert req.url == "https://address:22/path"

        assert req.pretty_url == "https://address:22/path"
        # Setting a Host header changes pretty_url but leaves url untouched.
        req.headers["Host"] = "foo.com:22"
        assert req.url == "https://address:22/path"
        assert req.pretty_url == "https://foo.com:22/path"

    def test_replace(self):
        """A case-insensitive replace() touches path, header and content, returning 4."""
        req = HTTPRequest.wrap(netlib.tutils.treq())
        req.path = "path/foo"
        req.headers["Foo"] = "fOo"
        req.content = b"afoob"
        # The global inline flag must lead the pattern: a trailing "(?i)" is
        # deprecated since Python 3.6 and rejected by `re` from 3.11 onwards.
        assert req.replace("(?i)foo", "boo") == 4
        assert req.path == "path/boo"
        assert b"foo" not in req.content
        assert req.headers["boo"] == "boo"

    def test_constrain_encoding(self):
        """constrain_encoding() removes "oink" from accept-encoding, however the header was set."""
        req = HTTPRequest.wrap(netlib.tutils.treq())
        req.headers["accept-encoding"] = "gzip, oink"
        req.constrain_encoding()
        assert "oink" not in req.headers["accept-encoding"]

        # Same expectation when the values are supplied as separate entries.
        req.headers.set_all("accept-encoding", ["gzip", "oink"])
        req.constrain_encoding()
        assert "oink" not in req.headers["accept-encoding"]

    def test_get_content_type(self):
        """Headers(content_type=...) is readable as "content-type".

        NOTE(review): this test builds an HTTPResponse although it lives in
        TestRequest - confirm whether a request was intended.
        """
        resp = HTTPResponse.wrap(netlib.tutils.tresp())
        resp.headers = Headers(content_type="text/plain")
        assert resp.headers["content-type"] == "text/plain"
2015-05-30 00:03:28 +00:00
class TestResponse:
    """Tests for HTTPResponse copying, replace() and header access."""

    def test_simple(self):
        """A copied response has the same state as the original."""
        flw = tutils.tflow(resp=True)
        resp = flw.response
        duplicate = resp.copy()
        assert duplicate.get_state() == resp.get_state()

    def test_replace(self):
        """A case-insensitive replace() touches header and content, returning 3."""
        resp = HTTPResponse.wrap(netlib.tutils.tresp())
        resp.headers["Foo"] = "fOo"
        resp.content = b"afoob"
        # The global inline flag must lead the pattern: a trailing "(?i)" is
        # deprecated since Python 3.6 and rejected by `re` from 3.11 onwards.
        assert resp.replace("(?i)foo", "boo") == 3
        assert b"foo" not in resp.content
        assert resp.headers["boo"] == "boo"

    def test_get_content_type(self):
        """Headers(content_type=...) is readable as "content-type"."""
        resp = HTTPResponse.wrap(netlib.tutils.tresp())
        resp.headers = Headers(content_type="text/plain")
        assert resp.headers["content-type"] == "text/plain"
class TestError:
    """Tests for Error state handling, copying and repr."""

    def test_getset_state(self):
        """Error state survives from_state(), set_state() and copy()."""
        err = Error("Error")
        snapshot = err.get_state()
        rebuilt_state = Error.from_state(snapshot).get_state()
        assert rebuilt_state == err.get_state()

        assert err.copy()

        other = Error("bar")
        assert not err == other
        # After adopting the other error's state, the two states agree.
        err.set_state(other.get_state())
        assert err.get_state() == other.get_state()

        duplicate = err.copy()
        assert duplicate.get_state() == err.get_state()

    def test_repr(self):
        """repr() of an Error is a non-empty string."""
        err = Error("yay")
        assert repr(err)
2014-02-04 04:02:17 +00:00
class TestClientConnection:
    """Tests for ClientConnection state handling, copying and str."""

    def test_state(self):
        """ClientConnection state survives from_state(), set_state() and copy()."""
        conn = tutils.tclient_conn()
        rebuilt_state = ClientConnection.from_state(conn.get_state()).get_state()
        assert rebuilt_state == conn.get_state()

        # A second connection with a different port does not compare equal.
        other = tutils.tclient_conn()
        other.address.address = (other.address.host, 4242)
        assert not conn == other

        other.timestamp_start = 42
        conn.set_state(other.get_state())
        assert conn.timestamp_start == 42

        duplicate = conn.copy()
        assert duplicate.get_state() == conn.get_state()

        assert str(conn)