mitmproxy/test/mitmproxy/test_flow.py

1277 lines
35 KiB
Python
Raw Normal View History

2015-05-30 00:03:28 +00:00
import os.path
2016-03-20 18:40:03 +00:00
from six.moves import cStringIO as StringIO
2015-08-30 13:27:29 +00:00
2015-08-01 08:40:19 +00:00
import mock
2015-08-01 08:40:19 +00:00
import netlib.utils
2016-03-26 08:00:51 +00:00
from netlib.http import Headers
from mitmproxy import filt, controller, tnetstring, flow
2016-05-11 17:15:26 +00:00
from mitmproxy.exceptions import FlowReadException, ScriptException
from mitmproxy.models import Error
from mitmproxy.models import Flow
from mitmproxy.models import HTTPFlow
from mitmproxy.models import HTTPRequest
from mitmproxy.models import HTTPResponse
from mitmproxy.proxy.config import HostMatcher
from mitmproxy.proxy import ProxyConfig
from mitmproxy.proxy.server import DummyServer
from mitmproxy.models.connections import ClientConnection
2016-02-02 12:25:31 +00:00
from . import tutils
def test_app_registry():
    """Check AppRegistry lookups by host/port and by Host header.

    An app is registered for host "domain" on port 80; lookups are then
    made with matching and non-matching requests.
    """
    registry = flow.AppRegistry()
    registry.add("foo", "domain", 80)

    # Same host: found on the registered port, not found on another port.
    matching = HTTPRequest.wrap(netlib.tutils.treq())
    matching.host = "domain"
    matching.port = 80
    assert registry.get(matching)
    matching.port = 81
    assert not registry.get(matching)

    # Different host: not found until the Host header names the
    # registered domain.
    other = HTTPRequest.wrap(netlib.tutils.treq())
    other.host = "domain2"
    other.port = 80
    assert not registry.get(other)
    other.headers["host"] = "domain"
    assert registry.get(other)
class TestStickyCookieState:
2016-01-27 09:12:18 +00:00
def _response(self, cookie, host):
s = flow.StickyCookieState(filt.parse(".*"))
2015-08-01 08:40:19 +00:00
f = tutils.tflow(req=netlib.tutils.treq(host=host, port=80), resp=True)
2015-09-05 18:45:58 +00:00
f.response.headers["Set-Cookie"] = cookie
s.handle_response(f)
return s, f
2012-02-10 02:04:20 +00:00
def test_domain_match(self):
s = flow.StickyCookieState(filt.parse(".*"))
assert s.domain_match("www.google.com", ".google.com")
assert s.domain_match("google.com", ".google.com")
def test_response(self):
c = "SSID=mooo; domain=.google.com, FOO=bar; Domain=.google.com; Path=/; "\
"Expires=Wed, 13-Jan-2021 22:23:01 GMT; Secure; "
s, f = self._response(c, "host")
assert not s.jar.keys()
s, f = self._response(c, "www.google.com")
assert s.jar.keys()
s, f = self._response("SSID=mooo", "www.google.com")
assert s.jar.keys()[0] == ('www.google.com', 80, '/')
# Test setting of multiple cookies
c1 = "somecookie=test; Path=/"
c2 = "othercookie=helloworld; Path=/"
s, f = self._response(c1, "www.google.com")
f.response.headers["Set-Cookie"] = c2
s.handle_response(f)
googlekey = s.jar.keys()[0]
assert len(s.jar[googlekey].keys()) == 2
# Test setting of weird cookie keys
s = flow.StickyCookieState(filt.parse(".*"))
f = tutils.tflow(req=netlib.tutils.treq(host="www.google.com", port=80), resp=True)
cs = [
"foo/bar=hello",
"foo:bar=world",
"foo@bar=fizz",
"foo,bar=buzz",
]
for c in cs:
f.response.headers["Set-Cookie"] = c
s.handle_response(f)
googlekey = s.jar.keys()[0]
assert len(s.jar[googlekey].keys()) == len(cs)
# Test overwriting of a cookie value
c1 = "somecookie=helloworld; Path=/"
c2 = "somecookie=newvalue; Path=/"
s, f = self._response(c1, "www.google.com")
f.response.headers["Set-Cookie"] = c2
s.handle_response(f)
googlekey = s.jar.keys()[0]
assert len(s.jar[googlekey].keys()) == 1
assert s.jar[googlekey]["somecookie"].items()[0][1] == "newvalue"
def test_request(self):
s, f = self._response("SSID=mooo", "www.google.com")
assert "cookie" not in f.request.headers
s.handle_request(f)
assert "cookie" in f.request.headers
class TestStickyAuthState:
    """Tests for flow.StickyAuthState."""

    def test_response(self):
        """An authorization header seen once is replayed on a later request."""
        state = flow.StickyAuthState(filt.parse(".*"))

        # First request carries credentials; the state records its host.
        with_auth = tutils.tflow(resp=True)
        with_auth.request.headers["authorization"] = "foo"
        state.handle_request(with_auth)
        assert "address" in state.hosts

        # A fresh flow without credentials gets the stored header added.
        without_auth = tutils.tflow(resp=True)
        state.handle_request(without_auth)
        assert without_auth.request.headers["authorization"] == "foo"
class TestClientPlaybackState:
    """Tests for the client playback state driven through a FlowMaster."""

    def test_tick(self):
        """Step client playback of two flows and check the counters."""
        state = flow.State()
        master = flow.FlowMaster(None, state)
        queued = [tutils.tflow(), tutils.tflow()]
        master.start_client_playback(queued, True)

        playback = master.client_playback
        playback.testing = True

        # Nothing has been replayed yet.
        assert not playback.done()
        assert not state.flow_count()
        assert playback.count() == 2

        # First tick starts one flow.
        playback.tick(master)
        assert state.flow_count()
        assert playback.count() == 1

        # Ticking again while a flow is current does not take another.
        playback.tick(master)
        assert playback.count() == 1

        # Clearing the current flow lets the next tick take the last one.
        playback.clear(playback.current)
        playback.tick(master)
        assert playback.count() == 0
        playback.clear(playback.current)
        assert playback.done()

        master.state.clear()
        master.tick(timeout=0)

        master.stop_client_playback()
        assert not master.client_playback
class TestServerPlaybackState:
    """Tests for flow.ServerPlaybackState request hashing and flow lookup."""

    @staticmethod
    def _state(headers=None, flows=None, nopop=False, ignore_params=None,
               ignore_content=False, ignore_payload_params=None,
               ignore_host=False):
        """Build a ServerPlaybackState, defaulting every option to "off".

        The constructor is called positionally with eight arguments. The
        keyword names here are taken from the tests that exercise each
        position.
        NOTE(review): confirm the names against the real
        ServerPlaybackState signature.
        """
        return flow.ServerPlaybackState(
            headers,
            [] if flows is None else flows,
            # Third positional argument: False in every test in this
            # class; its meaning is not visible from this file.
            False,
            nopop,
            ignore_params,
            ignore_content,
            ignore_payload_params,
            ignore_host)

    def test_hash(self):
        s = self._state()
        r = tutils.tflow()
        r2 = tutils.tflow()

        assert s._hash(r)
        assert s._hash(r) == s._hash(r2)
        # Headers are not part of the hash when none are configured.
        r.request.headers["foo"] = "bar"
        assert s._hash(r) == s._hash(r2)
        r.request.path = "voing"
        assert s._hash(r) != s._hash(r2)

        r.request.path = "path?blank_value"
        r2.request.path = "path?"
        assert s._hash(r) != s._hash(r2)

    def test_headers(self):
        s = self._state(headers=["foo"])
        r = tutils.tflow(resp=True)
        r.request.headers["foo"] = "bar"
        r2 = tutils.tflow(resp=True)
        assert s._hash(r) != s._hash(r2)
        r2.request.headers["foo"] = "bar"
        assert s._hash(r) == s._hash(r2)
        # A header that is not in the configured list does not matter.
        r2.request.headers["oink"] = "bar"
        assert s._hash(r) == s._hash(r2)

        r = tutils.tflow(resp=True)
        r2 = tutils.tflow(resp=True)
        assert s._hash(r) == s._hash(r2)

    def test_load(self):
        r = tutils.tflow(resp=True)
        r.request.headers["key"] = "one"

        r2 = tutils.tflow(resp=True)
        r2.request.headers["key"] = "two"

        s = self._state(flows=[r, r2])
        assert s.count() == 2
        # Both flows hash identically, so they share one map entry.
        assert len(s.fmap.keys()) == 1

        n = s.next_flow(r)
        assert n.request.headers["key"] == "one"
        assert s.count() == 1

        n = s.next_flow(r)
        assert n.request.headers["key"] == "two"
        assert s.count() == 0

        assert not s.next_flow(r)

    def test_load_with_nopop(self):
        r = tutils.tflow(resp=True)
        r.request.headers["key"] = "one"

        r2 = tutils.tflow(resp=True)
        r2.request.headers["key"] = "two"

        s = self._state(flows=[r, r2], nopop=True)

        assert s.count() == 2
        s.next_flow(r)
        assert s.count() == 2

    def test_ignore_params(self):
        s = self._state(ignore_params=["param1", "param2"])
        r = tutils.tflow(resp=True)
        r.request.path = "/test?param1=1"
        r2 = tutils.tflow(resp=True)
        r2.request.path = "/test"
        assert s._hash(r) == s._hash(r2)
        r2.request.path = "/test?param1=2"
        assert s._hash(r) == s._hash(r2)
        r2.request.path = "/test?param2=1"
        assert s._hash(r) == s._hash(r2)
        r2.request.path = "/test?param3=2"
        assert s._hash(r) != s._hash(r2)

    def test_ignore_payload_params(self):
        s = self._state(ignore_payload_params=["param1", "param2"])
        r = tutils.tflow(resp=True)
        r.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
        r.request.content = "paramx=x&param1=1"
        r2 = tutils.tflow(resp=True)
        r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
        r2.request.content = "paramx=x&param1=1"
        # same parameters
        assert s._hash(r) == s._hash(r2)
        # ignored parameters !=
        r2.request.content = "paramx=x&param1=2"
        assert s._hash(r) == s._hash(r2)
        # missing parameter
        r2.request.content = "paramx=x"
        assert s._hash(r) == s._hash(r2)
        # ignorable parameter added
        r2.request.content = "paramx=x&param1=2"
        assert s._hash(r) == s._hash(r2)
        # not ignorable parameter changed
        r2.request.content = "paramx=y&param1=1"
        assert s._hash(r) != s._hash(r2)
        # not ignorable parameter missing
        r2.request.content = "param1=1"
        assert s._hash(r) != s._hash(r2)

    def test_ignore_payload_params_other_content_type(self):
        s = self._state(ignore_payload_params=["param1", "param2"])
        r = tutils.tflow(resp=True)
        r.request.headers["Content-Type"] = "application/json"
        r.request.content = '{"param1":"1"}'
        r2 = tutils.tflow(resp=True)
        r2.request.headers["Content-Type"] = "application/json"
        r2.request.content = '{"param1":"1"}'
        # same content
        assert s._hash(r) == s._hash(r2)
        # distinct content (note only x-www-form-urlencoded payload is analysed)
        r2.request.content = '{"param1":"2"}'
        assert s._hash(r) != s._hash(r2)

    def test_ignore_payload_wins_over_params(self):
        # NOTE: parameters are mutually exclusive in options
        s = self._state(
            ignore_content=True,
            ignore_payload_params=["param1", "param2"])
        r = tutils.tflow(resp=True)
        r.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
        r.request.content = "paramx=y"
        r2 = tutils.tflow(resp=True)
        r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
        r2.request.content = "paramx=x"
        # Bodies differ, but content is ignored entirely, so hashes match.
        assert s._hash(r) == s._hash(r2)

    def test_ignore_content(self):
        s = self._state()
        r = tutils.tflow(resp=True)
        r2 = tutils.tflow(resp=True)

        r.request.content = "foo"
        r2.request.content = "foo"
        assert s._hash(r) == s._hash(r2)
        r2.request.content = "bar"
        assert s._hash(r) != s._hash(r2)

        # now ignoring content
        s = self._state(ignore_content=True)
        r = tutils.tflow(resp=True)
        r2 = tutils.tflow(resp=True)
        r.request.content = "foo"
        r2.request.content = "foo"
        assert s._hash(r) == s._hash(r2)
        r2.request.content = "bar"
        assert s._hash(r) == s._hash(r2)
        r2.request.content = ""
        assert s._hash(r) == s._hash(r2)
        r2.request.content = None
        assert s._hash(r) == s._hash(r2)

    def test_ignore_host(self):
        s = self._state(ignore_host=True)
        r = tutils.tflow(resp=True)
        r2 = tutils.tflow(resp=True)

        r.request.host = "address"
        r2.request.host = "address"
        assert s._hash(r) == s._hash(r2)
        r2.request.host = "wrong_address"
        assert s._hash(r) == s._hash(r2)
class TestFlow(object):
    """Tests for flow copy, matching, backup/revert, state and replace."""

    def test_copy(self):
        f = tutils.tflow(resp=True)
        f.get_state()
        f2 = f.copy()
        a = f.get_state()
        b = f2.get_state()
        # The copy gets its own id; everything else must be equal.
        del a["id"]
        del b["id"]
        assert a == b
        assert not f == f2
        assert f is not f2
        assert f.request.get_state() == f2.request.get_state()
        assert f.request is not f2.request
        assert f.request.headers == f2.request.headers
        assert f.request.headers is not f2.request.headers
        assert f.response.get_state() == f2.response.get_state()
        assert f.response is not f2.response

        f = tutils.tflow(err=True)
        f2 = f.copy()
        assert f is not f2
        assert f.request is not f2.request
        assert f.request.headers == f2.request.headers
        assert f.request.headers is not f2.request.headers
        assert f.error.get_state() == f2.error.get_state()
        assert f.error is not f2.error

    def test_match(self):
        f = tutils.tflow(resp=True)
        assert not f.match("~b test")
        assert f.match(None)
        assert not f.match("~b test")

        f = tutils.tflow(err=True)
        assert f.match("~e")

        # A malformed filter expression is rejected.
        tutils.raises(ValueError, f.match, "~")

    def test_backup(self):
        f = tutils.tflow()
        f.response = HTTPResponse.wrap(netlib.tutils.tresp())
        f.request.content = "foo"
        assert not f.modified()
        f.backup()
        f.request.content = "bar"
        assert f.modified()
        f.revert()
        assert f.request.content == "foo"

    def test_backup_idempotence(self):
        f = tutils.tflow(resp=True)
        f.backup()
        f.revert()
        f.backup()
        f.revert()

    def test_getset_state(self):
        f = tutils.tflow(resp=True)
        state = f.get_state()
        assert f.get_state() == HTTPFlow.from_state(
            state).get_state()

        f.response = None
        f.error = Error("error")
        state = f.get_state()
        assert f.get_state() == HTTPFlow.from_state(
            state).get_state()

        f2 = f.copy()
        f2.id = f.id  # copy creates a different uuid
        assert f.get_state() == f2.get_state()
        assert not f == f2
        f2.error = Error("e2")
        assert not f == f2
        f.set_state(f2.get_state())
        assert f.get_state() == f2.get_state()

    def test_kill(self):
        s = flow.State()
        fm = flow.FlowMaster(None, s)
        f = tutils.tflow()
        f.intercept(mock.Mock())
        f.kill(fm)
        # NOTE(review): f is never explicitly added to s here, so this
        # loop may not execute at all - confirm whether kill() puts the
        # flow into the view.
        for i in s.view:
            assert "killed" in str(i.error)

    def test_killall(self):
        s = flow.State()
        fm = flow.FlowMaster(None, s)

        f = tutils.tflow()
        f.intercept(fm)

        s.killall(fm)
        for i in s.view:
            assert "killed" in str(i.error)

    def test_accept_intercept(self):
        f = tutils.tflow()

        f.intercept(mock.Mock())
        assert not f.reply.acked
        f.accept_intercept(mock.Mock())
        assert f.reply.acked

    def test_replace_unicode(self):
        # Only checks that a unicode replacement on non-ASCII content
        # does not raise.
        f = tutils.tflow(resp=True)
        f.response.content = "\xc2foo"
        f.replace("foo", u"bar")

    def test_replace_no_content(self):
        f = tutils.tflow()
        f.request.content = None
        assert f.replace("foo", "bar") == 0

    def test_replace(self):
        f = tutils.tflow(resp=True)
        f.request.headers["foo"] = "foo"
        f.request.content = "afoob"

        f.response.headers["foo"] = "foo"
        f.response.content = "afoob"

        assert f.replace("foo", "bar") == 6

        assert f.request.headers["bar"] == "bar"
        assert f.request.content == "abarb"
        assert f.response.headers["bar"] == "bar"
        assert f.response.content == "abarb"

    def test_replace_encoded(self):
        f = tutils.tflow(resp=True)
        f.request.content = "afoob"
        f.request.encode("gzip")
        f.response.content = "afoob"
        f.response.encode("gzip")

        f.replace("foo", "bar")

        # The stored body is still encoded after the replace; only the
        # decoded body shows the replacement text.
        assert f.request.content != "abarb"
        f.request.decode()
        assert f.request.content == "abarb"

        assert f.response.content != "abarb"
        f.response.decode()
        assert f.response.content == "abarb"
class TestState:
    """Tests for flow.State: flow bookkeeping, view limits and intercepts."""

    def test_backup(self):
        c = flow.State()
        f = tutils.tflow()
        c.add_flow(f)
        f.backup()
        c.revert(f)

    def test_flow(self):
        """
            normal flow:

                connect -> request -> response
        """
        c = flow.State()
        f = tutils.tflow()
        c.add_flow(f)
        assert f
        assert c.flow_count() == 1
        assert c.active_flow_count() == 1

        newf = tutils.tflow()
        assert c.add_flow(newf)
        assert c.active_flow_count() == 2

        # A flow stops counting as active once it has a response.
        f.response = HTTPResponse.wrap(netlib.tutils.tresp())
        assert c.update_flow(f)
        assert c.flow_count() == 2
        assert c.active_flow_count() == 1

        assert not c.update_flow(None)
        assert c.active_flow_count() == 1

        newf.response = HTTPResponse.wrap(netlib.tutils.tresp())
        assert c.update_flow(newf)
        assert c.active_flow_count() == 0

    def test_err(self):
        c = flow.State()
        f = tutils.tflow()
        c.add_flow(f)
        f.error = Error("message")
        assert c.update_flow(f)

        # With an "~e" limit the flow enters the view only once it has
        # an error.
        c = flow.State()
        f = tutils.tflow()
        c.add_flow(f)
        c.set_limit("~e")
        assert not c.view
        f.error = tutils.terr()
        assert c.update_flow(f)
        assert c.view

    def test_set_limit(self):
        c = flow.State()

        f = tutils.tflow()
        assert len(c.view) == 0

        c.add_flow(f)
        assert len(c.view) == 1

        c.set_limit("~s")
        assert c.limit_txt == "~s"
        assert len(c.view) == 0
        f.response = HTTPResponse.wrap(netlib.tutils.tresp())
        c.update_flow(f)
        assert len(c.view) == 1
        c.set_limit(None)
        assert len(c.view) == 1

        f = tutils.tflow()
        c.add_flow(f)
        assert len(c.view) == 2
        c.set_limit("~q")
        assert len(c.view) == 1
        c.set_limit("~s")
        assert len(c.view) == 1

        assert "Invalid" in c.set_limit("~")

    def test_set_intercept(self):
        c = flow.State()
        assert not c.set_intercept("~q")
        assert c.intercept_txt == "~q"
        assert "Invalid" in c.set_intercept("~")
        assert not c.set_intercept(None)
        assert c.intercept_txt is None

    def _add_request(self, state):
        """Add a request-only flow to *state* and return it."""
        f = tutils.tflow()
        state.add_flow(f)
        return f

    def _add_response(self, state):
        """Add a flow to *state*, attach a response, and return the flow."""
        f = tutils.tflow()
        state.add_flow(f)
        f.response = HTTPResponse.wrap(netlib.tutils.tresp())
        state.update_flow(f)
        # Returned for symmetry with _add_request; existing callers
        # ignore the value.
        return f

    def _add_error(self, state):
        """Add an errored flow to *state* and return it."""
        f = tutils.tflow(err=True)
        state.add_flow(f)
        return f

    def test_clear(self):
        c = flow.State()
        f = self._add_request(c)
        f.intercepted = True

        c.clear()
        assert c.flow_count() == 0

    def test_dump_flows(self):
        c = flow.State()
        self._add_request(c)
        self._add_response(c)
        self._add_request(c)
        self._add_response(c)
        self._add_request(c)
        self._add_response(c)
        self._add_error(c)

        # Copy the view before clearing so the flows can be loaded back.
        flows = c.view[:]
        c.clear()

        c.load_flows(flows)
        assert isinstance(c.flows[0], Flow)

    def test_accept_all(self):
        c = flow.State()
        self._add_request(c)
        self._add_response(c)
        self._add_request(c)
        c.accept_all(mock.Mock())
class TestSerialize:
    """Tests for FlowWriter / FlowReader / FilteredFlowWriter round-trips."""

    def _treader(self):
        """Return a FlowReader over an in-memory dump of sample flows.

        The dump holds three HTTP flows with responses, three HTTP flows
        with errors, and two TCP flows (one with an error).
        """
        sio = StringIO()
        w = flow.FlowWriter(sio)
        for _ in range(3):
            f = tutils.tflow(resp=True)
            w.add(f)
        for _ in range(3):
            f = tutils.tflow(err=True)
            w.add(f)
        f = tutils.ttcpflow()
        w.add(f)
        f = tutils.ttcpflow(err=True)
        w.add(f)

        sio.seek(0)
        return flow.FlowReader(sio)

    def test_roundtrip(self):
        sio = StringIO()
        f = tutils.tflow()
        # Byte values 0..254 as request content.
        f.request.content = "".join(chr(i) for i in range(255))
        w = flow.FlowWriter(sio)
        w.add(f)

        sio.seek(0)
        r = flow.FlowReader(sio)
        l = list(r.stream())
        assert len(l) == 1

        f2 = l[0]
        assert f2.get_state() == f.get_state()
        assert f2.request == f.request

    def test_load_flows(self):
        r = self._treader()
        s = flow.State()
        fm = flow.FlowMaster(None, s)
        fm.load_flows(r)
        assert len(s.flows) == 6

    def test_load_flows_reverse(self):
        r = self._treader()
        s = flow.State()
        conf = ProxyConfig(
            mode="reverse",
            upstream_server=("https", ("use-this-domain", 80))
        )
        fm = flow.FlowMaster(DummyServer(conf), s)
        fm.load_flows(r)
        assert s.flows[0].request.host == "use-this-domain"

    def test_filter(self):
        sio = StringIO()
        fl = filt.parse("~c 200")
        w = flow.FilteredFlowWriter(sio, fl)

        f = tutils.tflow(resp=True)
        f.response.status_code = 200
        w.add(f)

        f = tutils.tflow(resp=True)
        f.response.status_code = 201
        w.add(f)

        sio.seek(0)
        r = flow.FlowReader(sio)
        # Only the 200 flow matches "~c 200". Pinning the count makes the
        # test fail if the 201 flow is written as well.
        assert len(list(r.stream())) == 1

    def test_error(self):
        sio = StringIO()
        sio.write("bogus")
        sio.seek(0)
        r = flow.FlowReader(sio)
        tutils.raises(FlowReadException, list, r.stream())

        f = FlowReadException("foo")
        assert str(f) == "foo"

    def test_versioncheck(self):
        f = tutils.tflow()
        d = f.get_state()
        # A dump carrying version (0, 0) must be rejected by the reader.
        d["version"] = (0, 0)
        sio = StringIO()
        tnetstring.dump(d, sio)
        sio.seek(0)

        r = flow.FlowReader(sio)
        tutils.raises("version", list, r.stream())
class TestFlowMaster:
    """Tests for flow.FlowMaster: scripts, playback, sticky state, streaming."""

    def test_load_script(self):
        s = flow.State()
        fm = flow.FlowMaster(None, s)

        fm.load_script(tutils.test_data.path("scripts/a.py"))
        fm.load_script(tutils.test_data.path("scripts/a.py"))
        fm.unload_scripts()
        with tutils.raises(ScriptException):
            fm.load_script("nonexistent")
        try:
            fm.load_script(tutils.test_data.path("scripts/starterr.py"))
        except ScriptException as e:
            assert "ValueError" in str(e)
        else:
            # Without this branch the test would pass silently if the
            # failing script stopped raising.
            raise AssertionError("loading starterr.py did not raise ScriptException")
        assert len(fm.scripts) == 0

    def test_getset_ignore(self):
        p = mock.Mock()
        p.config.check_ignore = HostMatcher()
        fm = flow.FlowMaster(p, flow.State())
        assert not fm.get_ignore_filter()
        # Raw strings: "\." in a plain literal is an invalid escape
        # sequence.
        fm.set_ignore_filter([r"^apple\.com:", r":443$"])
        assert fm.get_ignore_filter()

    def test_replay(self):
        s = flow.State()
        fm = flow.FlowMaster(None, s)
        f = tutils.tflow(resp=True)
        f.request.content = None
        assert "missing" in fm.replay_request(f)

        f.intercepted = True
        assert "intercepting" in fm.replay_request(f)

        f.live = True
        assert "live" in fm.replay_request(f, run_scripthooks=True)

    def test_script_reqerr(self):
        s = flow.State()
        fm = flow.FlowMaster(None, s)
        fm.load_script(tutils.test_data.path("scripts/reqerr.py"))
        f = tutils.tflow()
        fm.clientconnect(f.client_conn)
        assert fm.request(f)

    def test_script(self):
        s = flow.State()
        fm = flow.FlowMaster(None, s)
        fm.load_script(tutils.test_data.path("scripts/all.py"))
        f = tutils.tflow(resp=True)

        # Each event appends its own name to the script's log.
        fm.clientconnect(f.client_conn)
        assert fm.scripts[0].ns["log"][-1] == "clientconnect"
        fm.serverconnect(f.server_conn)
        assert fm.scripts[0].ns["log"][-1] == "serverconnect"
        fm.request(f)
        assert fm.scripts[0].ns["log"][-1] == "request"
        fm.response(f)
        assert fm.scripts[0].ns["log"][-1] == "response"
        # load second script
        fm.load_script(tutils.test_data.path("scripts/all.py"))
        assert len(fm.scripts) == 2
        # Disconnect the client connection that was connected above;
        # the original passed f.server_conn here.
        fm.clientdisconnect(f.client_conn)
        assert fm.scripts[0].ns["log"][-1] == "clientdisconnect"
        assert fm.scripts[1].ns["log"][-1] == "clientdisconnect"

        # unload first script
        fm.unload_scripts()
        assert len(fm.scripts) == 0
        fm.load_script(tutils.test_data.path("scripts/all.py"))

        f.error = tutils.terr()
        fm.error(f)
        assert fm.scripts[0].ns["log"][-1] == "error"

    def test_duplicate_flow(self):
        s = flow.State()
        fm = flow.FlowMaster(None, s)
        f = tutils.tflow(resp=True)
        fm.load_flow(f)
        assert s.flow_count() == 1
        f2 = fm.duplicate_flow(f)
        assert f2.response
        assert s.flow_count() == 2
        assert s.index(f2) == 1

    def test_create_flow(self):
        s = flow.State()
        fm = flow.FlowMaster(None, s)
        assert fm.create_request("GET", "http", "example.com", 80, "/")

    def test_all(self):
        s = flow.State()
        fm = flow.FlowMaster(None, s)
        fm.anticache = True
        fm.anticomp = True
        f = tutils.tflow(req=None)
        fm.clientconnect(f.client_conn)
        f.request = HTTPRequest.wrap(netlib.tutils.treq())
        fm.request(f)
        assert s.flow_count() == 1

        # The response updates the existing flow; no new flow is added.
        f.response = HTTPResponse.wrap(netlib.tutils.tresp())
        fm.response(f)
        assert s.flow_count() == 1

        fm.clientdisconnect(f.client_conn)

        f.error = Error("msg")
        f.error.reply = controller.DummyReply()
        fm.error(f)

        fm.load_script(tutils.test_data.path("scripts/a.py"))
        fm.shutdown()

    def test_client_playback(self):
        s = flow.State()

        f = tutils.tflow(resp=True)
        pb = [tutils.tflow(resp=True), f]
        fm = flow.FlowMaster(DummyServer(ProxyConfig()), s)
        assert not fm.start_server_playback(
            pb,
            False,
            [],
            False,
            False,
            None,
            False,
            None,
            False)
        assert not fm.start_client_playback(pb, False)
        fm.client_playback.testing = True

        assert not fm.state.flow_count()
        fm.tick(0)
        assert fm.state.flow_count()

        f.error = Error("error")
        fm.error(f)

    def test_server_playback(self):
        s = flow.State()

        f = tutils.tflow()
        f.response = HTTPResponse.wrap(netlib.tutils.tresp(content=f.request))
        pb = [f]

        fm = flow.FlowMaster(None, s)
        fm.refresh_server_playback = True
        # Nothing to play back until playback has been started.
        assert not fm.do_server_playback(tutils.tflow())

        fm.start_server_playback(
            pb,
            False,
            [],
            False,
            False,
            None,
            False,
            None,
            False)
        assert fm.do_server_playback(tutils.tflow())

        fm.start_server_playback(
            pb,
            False,
            [],
            True,
            False,
            None,
            False,
            None,
            False)
        r = tutils.tflow()
        r.request.content = "gibble"
        assert not fm.do_server_playback(r)
        assert fm.do_server_playback(tutils.tflow())

        fm.tick(0)
        assert fm.should_exit.is_set()

        fm.stop_server_playback()
        assert not fm.server_playback

    def test_server_playback_kill(self):
        s = flow.State()
        f = tutils.tflow()
        f.response = HTTPResponse.wrap(netlib.tutils.tresp(content=f.request))
        pb = [f]
        fm = flow.FlowMaster(None, s)
        fm.refresh_server_playback = True
        fm.start_server_playback(
            pb,
            True,
            [],
            False,
            False,
            None,
            False,
            None,
            False)

        # A request with no recorded counterpart ends up killed.
        f = tutils.tflow()
        f.request.host = "nonexistent"
        fm.process_new_request(f)
        assert "killed" in f.error.msg

    def test_stickycookie(self):
        s = flow.State()
        fm = flow.FlowMaster(None, s)
        assert "Invalid" in fm.set_stickycookie("~h")
        fm.set_stickycookie(".*")
        assert fm.stickycookie_state
        fm.set_stickycookie(None)
        assert not fm.stickycookie_state

        fm.set_stickycookie(".*")
        f = tutils.tflow(resp=True)
        f.response.headers["set-cookie"] = "foo=bar"
        fm.request(f)
        fm.response(f)
        assert fm.stickycookie_state.jar
        assert "cookie" not in f.request.headers
        f = f.copy()
        fm.request(f)
        assert f.request.headers["cookie"] == "foo=bar"

    def test_stickyauth(self):
        s = flow.State()
        fm = flow.FlowMaster(None, s)
        assert "Invalid" in fm.set_stickyauth("~h")
        fm.set_stickyauth(".*")
        assert fm.stickyauth_state
        fm.set_stickyauth(None)
        assert not fm.stickyauth_state

        fm.set_stickyauth(".*")
        f = tutils.tflow(resp=True)
        f.request.headers["authorization"] = "foo"
        fm.request(f)

        f = tutils.tflow(resp=True)
        assert fm.stickyauth_state.hosts
        assert "authorization" not in f.request.headers
        fm.request(f)
        assert f.request.headers["authorization"] == "foo"

    def test_stream(self):
        with tutils.tmpdir() as tdir:
            p = os.path.join(tdir, "foo")

            def read_flows():
                # Close the handle on every read instead of leaking it.
                with open(p, "rb") as fh:
                    return list(flow.FlowReader(fh).stream())

            s = flow.State()
            fm = flow.FlowMaster(None, s)
            f = tutils.tflow(resp=True)

            # open() replaces the Python 2-only file() builtin.
            # NOTE(review): the append handle is handed to the master,
            # which presumably closes it in stop_stream()/shutdown() -
            # confirm.
            fm.start_stream(open(p, "ab"), None)
            fm.request(f)
            fm.response(f)
            fm.stop_stream()

            assert read_flows()[0].response

            f = tutils.tflow()
            fm.start_stream(open(p, "ab"), None)
            fm.request(f)
            fm.shutdown()

            assert not read_flows()[1].response
class TestRequest:
    """Tests for HTTPRequest: URL handling, header helpers and replace."""

    def test_simple(self):
        f = tutils.tflow()
        r = f.request
        u = r.url
        r.url = u
        # An empty URL is rejected and leaves the old URL in place.
        tutils.raises(ValueError, setattr, r, "url", "")
        assert r.url == u
        r2 = r.copy()
        assert r.get_state() == r2.get_state()

    def test_get_url(self):
        r = HTTPRequest.wrap(netlib.tutils.treq())

        assert r.url == "http://address:22/path"

        r.scheme = "https"
        assert r.url == "https://address:22/path"

        r.host = "host"
        r.port = 42
        assert r.url == "https://host:42/path"

        r.host = "address"
        r.port = 22
        assert r.url == "https://address:22/path"

        assert r.pretty_url == "https://address:22/path"
        # pretty_url follows the Host header; url does not.
        r.headers["Host"] = "foo.com:22"
        assert r.url == "https://address:22/path"
        assert r.pretty_url == "https://foo.com:22/path"

    def test_anticache(self):
        r = HTTPRequest.wrap(netlib.tutils.treq())
        r.headers = Headers()
        r.headers["if-modified-since"] = "test"
        r.headers["if-none-match"] = "test"
        r.anticache()
        assert "if-modified-since" not in r.headers
        assert "if-none-match" not in r.headers

    def test_replace(self):
        r = HTTPRequest.wrap(netlib.tutils.treq())
        r.path = "path/foo"
        r.headers["Foo"] = "fOo"
        r.content = "afoob"
        # The inline flag goes first: a trailing "(?i)" is deprecated
        # since Python 3.6 and rejected by 3.11; the match is unchanged.
        assert r.replace("(?i)foo", "boo") == 4
        assert r.path == "path/boo"
        assert "foo" not in r.content
        assert r.headers["boo"] == "boo"

    def test_constrain_encoding(self):
        r = HTTPRequest.wrap(netlib.tutils.treq())
        r.headers["accept-encoding"] = "gzip, oink"
        r.constrain_encoding()
        assert "oink" not in r.headers["accept-encoding"]

        # Same check with the encodings given as separate header values.
        r.headers.set_all("accept-encoding", ["gzip", "oink"])
        r.constrain_encoding()
        assert "oink" not in r.headers["accept-encoding"]

    def test_get_decoded_content(self):
        r = HTTPRequest.wrap(netlib.tutils.treq())
        r.content = None
        r.headers["content-encoding"] = "identity"
        assert r.get_decoded_content() is None

        r.content = "falafel"
        r.encode("gzip")
        assert r.get_decoded_content() == "falafel"

    def test_get_content_type(self):
        # NOTE(review): this builds an HTTPResponse inside TestRequest
        # and duplicates TestResponse.test_get_content_type - confirm
        # whether a request was intended.
        resp = HTTPResponse.wrap(netlib.tutils.tresp())
        resp.headers = Headers(content_type="text/plain")
        assert resp.headers["content-type"] == "text/plain"
class TestResponse:
    """Tests for HTTPResponse: copy, replace and header access."""

    def test_simple(self):
        f = tutils.tflow(resp=True)
        resp = f.response
        resp2 = resp.copy()
        assert resp2.get_state() == resp.get_state()

    def test_replace(self):
        r = HTTPResponse.wrap(netlib.tutils.tresp())
        r.headers["Foo"] = "fOo"
        r.content = "afoob"
        # The inline flag goes first: a trailing "(?i)" is deprecated
        # since Python 3.6 and rejected by 3.11; the match is unchanged.
        assert r.replace("(?i)foo", "boo") == 3
        assert "foo" not in r.content
        assert r.headers["boo"] == "boo"

    def test_get_content_type(self):
        resp = HTTPResponse.wrap(netlib.tutils.tresp())
        resp.headers = Headers(content_type="text/plain")
        assert resp.headers["content-type"] == "text/plain"
class TestError:
    """Tests for the Error model's state handling and repr."""

    def test_getset_state(self):
        """State survives from_state(), set_state() and copy()."""
        original = Error("Error")
        snapshot = original.get_state()
        rebuilt = Error.from_state(snapshot)
        assert rebuilt.get_state() == original.get_state()

        assert original.copy()

        other = Error("bar")
        assert not original == other
        # After taking over the other error's state the two agree.
        original.set_state(other.get_state())
        assert original.get_state() == other.get_state()

        duplicate = original.copy()
        assert duplicate.get_state() == original.get_state()

    def test_repr(self):
        """repr() of an Error is a non-empty string."""
        err = Error("yay")
        assert repr(err)
class TestClientConnection:
    """Tests for ClientConnection state handling."""

    def test_state(self):
        """State survives from_state(), set_state() and copy()."""
        conn = tutils.tclient_conn()
        rebuilt = ClientConnection.from_state(conn.get_state())
        assert rebuilt.get_state() == conn.get_state()

        # A second connection on another port compares unequal.
        changed = tutils.tclient_conn()
        changed.address.address = (changed.address.host, 4242)
        assert not conn == changed

        changed.timestamp_start = 42
        conn.set_state(changed.get_state())
        assert conn.timestamp_start == 42

        duplicate = conn.copy()
        assert duplicate.get_state() == conn.get_state()

        assert str(conn)
def test_replacehooks():
    """Exercise ReplaceHooks: add/set/clear bookkeeping and run()."""
    hooks = flow.ReplaceHooks()
    hooks.add("~q", "foo", "bar")
    assert hooks.lst

    replacement_specs = [
        (".*", "one", "two"),
        (".*", "three", "four"),
    ]
    hooks.set(replacement_specs)
    assert hooks.count() == 2

    hooks.clear()
    assert not hooks.lst

    for filter_expr in ("~q", "~s"):
        hooks.add(filter_expr, "foo", "bar")
    assert hooks.get_specs() == [('~q', 'foo', 'bar'), ('~s', 'foo', 'bar')]
    assert hooks.count() == 2
    hooks.clear()
    assert hooks.count() == 0

    # A "~s" hook leaves a request-only flow untouched.
    req_only = tutils.tflow()
    req_only.request.content = "foo"
    hooks.add("~s", "foo", "bar")
    hooks.run(req_only)
    assert req_only.request.content == "foo"

    # With a response present, only the response body is rewritten.
    with_resp = tutils.tflow(resp=True)
    with_resp.request.content = "foo"
    with_resp.response.content = "foo"
    hooks.run(with_resp)
    assert with_resp.response.content == "bar"
    assert with_resp.request.content == "foo"

    # A "~q" hook rewrites the request body.
    req_flow = tutils.tflow()
    hooks.clear()
    hooks.add("~q", "foo", "bar")
    req_flow.request.content = "foo"
    hooks.run(req_flow)
    assert req_flow.request.content == "bar"

    # Both of these add() calls return a falsy value.
    assert not hooks.add("~", "foo", "bar")
    assert not hooks.add("foo", "*", "bar")
def test_setheaders():
    """Exercise SetHeaders: add/set/clear bookkeeping and run()."""
    setter = flow.SetHeaders()
    setter.add("~q", "foo", "bar")
    assert setter.lst

    header_specs = [
        (".*", "one", "two"),
        (".*", "three", "four"),
    ]
    setter.set(header_specs)
    assert setter.count() == 2

    setter.clear()
    assert not setter.lst

    for filter_expr in ("~q", "~s"):
        setter.add(filter_expr, "foo", "bar")
    assert setter.get_specs() == [('~q', 'foo', 'bar'), ('~s', 'foo', 'bar')]
    assert setter.count() == 2
    setter.clear()
    assert setter.count() == 0

    # A "~s" rule leaves a request-only flow's content untouched.
    req_only = tutils.tflow()
    req_only.request.content = "foo"
    setter.add("~s", "foo", "bar")
    setter.run(req_only)
    assert req_only.request.content == "foo"

    # Two "~s" rules for one header: the response ends up with both
    # values in order, and the request header is unchanged.
    setter.clear()
    for header_value in ("two", "three"):
        setter.add("~s", "one", header_value)
    with_resp = tutils.tflow(resp=True)
    with_resp.request.headers["one"] = "xxx"
    with_resp.response.headers["one"] = "xxx"
    setter.run(with_resp)
    assert with_resp.request.headers["one"] == "xxx"
    assert with_resp.response.headers.get_all("one") == ["two", "three"]

    # The same with "~q" rules: the request header is replaced by both.
    setter.clear()
    for header_value in ("two", "three"):
        setter.add("~q", "one", header_value)
    req_flow = tutils.tflow()
    req_flow.request.headers["one"] = "xxx"
    setter.run(req_flow)
    assert req_flow.request.headers.get_all("one") == ["two", "three"]

    # This add() call returns a falsy value.
    assert not setter.add("~", "foo", "bar")