mitmproxy/test/test_flow.py

1388 lines
39 KiB
Python
Raw Normal View History

2015-05-30 00:03:28 +00:00
import Queue
import time
import os.path
from cStringIO import StringIO
import email.utils
2015-08-30 13:27:29 +00:00
2015-08-01 08:40:19 +00:00
import mock
2015-08-01 08:40:19 +00:00
import netlib.utils
from netlib import odict
2015-09-05 18:45:58 +00:00
from netlib.http.semantics import CONTENT_MISSING, HDR_FORM_URLENCODED, Headers
2015-08-30 13:27:29 +00:00
from libmproxy import filt, protocol, controller, tnetstring, flow
from libmproxy.models import Error, Flow, HTTPRequest, HTTPResponse, HTTPFlow, decoded
2014-10-18 16:29:35 +00:00
from libmproxy.proxy.config import HostMatcher
from libmproxy.proxy import ProxyConfig
from libmproxy.proxy.server import DummyServer
2015-08-30 13:27:29 +00:00
from libmproxy.models.connections import ClientConnection
import tutils
def test_app_registry():
ar = flow.AppRegistry()
ar.add("foo", "domain", 80)
2015-08-30 13:27:29 +00:00
r = HTTPRequest.wrap(netlib.tutils.treq())
r.host = "domain"
r.port = 80
assert ar.get(r)
r.port = 81
assert not ar.get(r)
2015-08-30 13:27:29 +00:00
r = HTTPRequest.wrap(netlib.tutils.treq())
r.host = "domain2"
r.port = 80
assert not ar.get(r)
2015-09-05 18:45:58 +00:00
r.headers["host"] = "domain"
assert ar.get(r)
class TestStickyCookieState:
def _response(self, cookie, host):
s = flow.StickyCookieState(filt.parse(".*"))
2015-08-01 08:40:19 +00:00
f = tutils.tflow(req=netlib.tutils.treq(host=host, port=80), resp=True)
2015-09-05 18:45:58 +00:00
f.response.headers["Set-Cookie"] = cookie
s.handle_response(f)
return s, f
2012-02-10 02:04:20 +00:00
def test_domain_match(self):
s = flow.StickyCookieState(filt.parse(".*"))
assert s.domain_match("www.google.com", ".google.com")
assert s.domain_match("google.com", ".google.com")
def test_handle_response(self):
c = "SSID=mooo; domain=.google.com, FOO=bar; Domain=.google.com; Path=/; "\
"Expires=Wed, 13-Jan-2021 22:23:01 GMT; Secure; "
s, f = self._response(c, "host")
assert not s.jar.keys()
s, f = self._response(c, "www.google.com")
assert s.jar.keys()
s, f = self._response("SSID=mooo", "www.google.com")
assert s.jar.keys()[0] == ('www.google.com', 80, '/')
def test_handle_request(self):
s, f = self._response("SSID=mooo", "www.google.com")
assert "cookie" not in f.request.headers
s.handle_request(f)
assert "cookie" in f.request.headers
class TestStickyAuthState:
def test_handle_response(self):
s = flow.StickyAuthState(filt.parse(".*"))
f = tutils.tflow(resp=True)
2015-09-05 18:45:58 +00:00
f.request.headers["authorization"] = "foo"
s.handle_request(f)
assert "address" in s.hosts
f = tutils.tflow(resp=True)
s.handle_request(f)
2015-09-05 18:45:58 +00:00
assert f.request.headers["authorization"] == "foo"
class TestClientPlaybackState:
2011-03-04 00:08:43 +00:00
def test_tick(self):
first = tutils.tflow()
2011-03-04 00:08:43 +00:00
s = flow.State()
fm = flow.FlowMaster(None, s)
fm.start_client_playback([first, tutils.tflow()], True)
c = fm.client_playback
c.testing = True
assert not c.done()
assert not s.flow_count()
2011-03-04 00:08:43 +00:00
assert c.count() == 2
c.tick(fm)
assert s.flow_count()
2011-03-04 00:08:43 +00:00
assert c.count() == 1
c.tick(fm)
2011-03-04 00:08:43 +00:00
assert c.count() == 1
2011-03-05 22:21:31 +00:00
c.clear(c.current)
c.tick(fm)
2011-03-04 00:08:43 +00:00
assert c.count() == 0
c.clear(c.current)
assert c.done()
2011-03-04 00:08:43 +00:00
q = Queue.Queue()
fm.state.clear()
fm.tick(q, timeout=0)
2011-03-04 00:08:43 +00:00
fm.stop_client_playback()
assert not fm.client_playback
class TestServerPlaybackState:
def test_hash(self):
2015-05-30 00:03:28 +00:00
s = flow.ServerPlaybackState(
None,
[],
False,
False,
None,
False,
None,
False)
r = tutils.tflow()
r2 = tutils.tflow()
assert s._hash(r)
assert s._hash(r) == s._hash(r2)
2015-09-05 18:45:58 +00:00
r.request.headers["foo"] = "bar"
assert s._hash(r) == s._hash(r2)
r.request.path = "voing"
assert s._hash(r) != s._hash(r2)
r.request.path = "path?blank_value"
r2.request.path = "path?"
assert s._hash(r) != s._hash(r2)
def test_headers(self):
2015-05-30 00:03:28 +00:00
s = flow.ServerPlaybackState(
["foo"],
[],
False,
False,
None,
False,
None,
False)
r = tutils.tflow(resp=True)
2015-09-05 18:45:58 +00:00
r.request.headers["foo"] = "bar"
r2 = tutils.tflow(resp=True)
assert not s._hash(r) == s._hash(r2)
2015-09-05 18:45:58 +00:00
r2.request.headers["foo"] = "bar"
assert s._hash(r) == s._hash(r2)
2015-09-05 18:45:58 +00:00
r2.request.headers["oink"] = "bar"
assert s._hash(r) == s._hash(r2)
r = tutils.tflow(resp=True)
r2 = tutils.tflow(resp=True)
assert s._hash(r) == s._hash(r2)
def test_load(self):
r = tutils.tflow(resp=True)
2015-09-05 18:45:58 +00:00
r.request.headers["key"] = "one"
r2 = tutils.tflow(resp=True)
2015-09-05 18:45:58 +00:00
r2.request.headers["key"] = "two"
2015-05-30 00:03:28 +00:00
s = flow.ServerPlaybackState(
None, [
r, r2], False, False, None, False, None, False)
assert s.count() == 2
assert len(s.fmap.keys()) == 1
n = s.next_flow(r)
2015-09-05 18:45:58 +00:00
assert n.request.headers["key"] == "one"
assert s.count() == 1
n = s.next_flow(r)
2015-09-05 18:45:58 +00:00
assert n.request.headers["key"] == "two"
assert s.count() == 0
assert not s.next_flow(r)
def test_load_with_nopop(self):
r = tutils.tflow(resp=True)
2015-09-05 18:45:58 +00:00
r.request.headers["key"] = "one"
r2 = tutils.tflow(resp=True)
2015-09-05 18:45:58 +00:00
r2.request.headers["key"] = "two"
2015-05-30 00:03:28 +00:00
s = flow.ServerPlaybackState(
None, [
r, r2], False, True, None, False, None, False)
assert s.count() == 2
2012-07-15 20:42:59 +00:00
s.next_flow(r)
assert s.count() == 2
def test_ignore_params(self):
2015-05-30 00:03:28 +00:00
s = flow.ServerPlaybackState(
None, [], False, False, [
"param1", "param2"], False, None, False)
r = tutils.tflow(resp=True)
2015-05-30 00:03:28 +00:00
r.request.path = "/test?param1=1"
r2 = tutils.tflow(resp=True)
2015-05-30 00:03:28 +00:00
r2.request.path = "/test"
assert s._hash(r) == s._hash(r2)
2015-05-30 00:03:28 +00:00
r2.request.path = "/test?param1=2"
assert s._hash(r) == s._hash(r2)
2015-05-30 00:03:28 +00:00
r2.request.path = "/test?param2=1"
assert s._hash(r) == s._hash(r2)
2015-05-30 00:03:28 +00:00
r2.request.path = "/test?param3=2"
assert not s._hash(r) == s._hash(r2)
def test_ignore_payload_params(self):
2015-05-30 00:03:28 +00:00
s = flow.ServerPlaybackState(
None, [], False, False, None, False, [
"param1", "param2"], False)
r = tutils.tflow(resp=True)
2015-09-05 18:45:58 +00:00
r.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
r.request.content = "paramx=x&param1=1"
r2 = tutils.tflow(resp=True)
2015-09-05 18:45:58 +00:00
r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
r2.request.content = "paramx=x&param1=1"
# same parameters
assert s._hash(r) == s._hash(r2)
# ignored parameters !=
r2.request.content = "paramx=x&param1=2"
assert s._hash(r) == s._hash(r2)
# missing parameter
2015-05-30 00:03:28 +00:00
r2.request.content = "paramx=x"
assert s._hash(r) == s._hash(r2)
# ignorable parameter added
2015-05-30 00:03:28 +00:00
r2.request.content = "paramx=x&param1=2"
assert s._hash(r) == s._hash(r2)
# not ignorable parameter changed
2015-05-30 00:03:28 +00:00
r2.request.content = "paramx=y&param1=1"
assert not s._hash(r) == s._hash(r2)
# not ignorable parameter missing
2015-05-30 00:03:28 +00:00
r2.request.content = "param1=1"
assert not s._hash(r) == s._hash(r2)
def test_ignore_payload_params_other_content_type(self):
2015-05-30 00:03:28 +00:00
s = flow.ServerPlaybackState(
None, [], False, False, None, False, [
"param1", "param2"], False)
r = tutils.tflow(resp=True)
2015-09-05 18:45:58 +00:00
r.request.headers["Content-Type"] = "application/json"
r.request.content = '{"param1":"1"}'
r2 = tutils.tflow(resp=True)
2015-09-05 18:45:58 +00:00
r2.request.headers["Content-Type"] = "application/json"
r2.request.content = '{"param1":"1"}'
# same content
assert s._hash(r) == s._hash(r2)
# distint content (note only x-www-form-urlencoded payload is analysed)
r2.request.content = '{"param1":"2"}'
assert not s._hash(r) == s._hash(r2)
def test_ignore_payload_wins_over_params(self):
2015-05-30 00:03:28 +00:00
# NOTE: parameters are mutually exclusive in options
s = flow.ServerPlaybackState(
None, [], False, False, None, True, [
"param1", "param2"], False)
r = tutils.tflow(resp=True)
2015-09-05 18:45:58 +00:00
r.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
r.request.content = "paramx=y"
r2 = tutils.tflow(resp=True)
2015-09-05 18:45:58 +00:00
r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
r2.request.content = "paramx=x"
# same parameters
assert s._hash(r) == s._hash(r2)
def test_ignore_content(self):
2015-05-30 00:03:28 +00:00
s = flow.ServerPlaybackState(
None,
[],
False,
False,
None,
False,
None,
False)
r = tutils.tflow(resp=True)
r2 = tutils.tflow(resp=True)
r.request.content = "foo"
r2.request.content = "foo"
assert s._hash(r) == s._hash(r2)
r2.request.content = "bar"
assert not s._hash(r) == s._hash(r2)
2015-05-30 00:03:28 +00:00
# now ignoring content
s = flow.ServerPlaybackState(
None,
[],
False,
False,
None,
True,
None,
False)
r = tutils.tflow(resp=True)
r2 = tutils.tflow(resp=True)
r.request.content = "foo"
r2.request.content = "foo"
assert s._hash(r) == s._hash(r2)
r2.request.content = "bar"
assert s._hash(r) == s._hash(r2)
r2.request.content = ""
assert s._hash(r) == s._hash(r2)
r2.request.content = None
assert s._hash(r) == s._hash(r2)
def test_ignore_host(self):
2015-05-30 00:03:28 +00:00
s = flow.ServerPlaybackState(
None,
[],
False,
False,
None,
False,
None,
True)
r = tutils.tflow(resp=True)
r2 = tutils.tflow(resp=True)
2015-05-30 00:03:28 +00:00
r.request.host = "address"
r2.request.host = "address"
assert s._hash(r) == s._hash(r2)
2015-05-30 00:03:28 +00:00
r2.request.host = "wrong_address"
assert s._hash(r) == s._hash(r2)
class TestFlow:
2012-02-18 10:56:40 +00:00
def test_copy(self):
f = tutils.tflow(resp=True)
a0 = f.get_state()
2012-02-18 10:56:40 +00:00
f2 = f.copy()
a = f.get_state()
b = f2.get_state()
2014-12-24 00:39:38 +00:00
del a["id"]
del b["id"]
assert a == b
assert not f == f2
2012-02-18 10:56:40 +00:00
assert not f is f2
assert f.request.get_state() == f2.request.get_state()
2012-02-18 10:56:40 +00:00
assert not f.request is f2.request
assert f.request.headers == f2.request.headers
assert not f.request.headers is f2.request.headers
assert f.response.get_state() == f2.response.get_state()
2012-02-24 23:19:54 +00:00
assert not f.response is f2.response
f = tutils.tflow(err=True)
2012-02-24 23:19:54 +00:00
f2 = f.copy()
assert not f is f2
assert not f.request is f2.request
assert f.request.headers == f2.request.headers
assert not f.request.headers is f2.request.headers
assert f.error.get_state() == f2.error.get_state()
2012-02-24 23:19:54 +00:00
assert not f.error is f2.error
2012-02-18 10:56:40 +00:00
def test_match(self):
f = tutils.tflow(resp=True)
assert not f.match("~b test")
assert f.match(None)
assert not f.match("~b test")
f = tutils.tflow(err=True)
assert f.match("~e")
tutils.raises(ValueError, f.match, "~")
def test_backup(self):
f = tutils.tflow()
2015-08-30 13:27:29 +00:00
f.response = HTTPResponse.wrap(netlib.tutils.tresp())
f.request.content = "foo"
assert not f.modified()
f.backup()
f.request.content = "bar"
assert f.modified()
f.revert()
assert f.request.content == "foo"
def test_backup_idempotence(self):
f = tutils.tflow(resp=True)
f.backup()
f.revert()
f.backup()
f.revert()
def test_getset_state(self):
f = tutils.tflow(resp=True)
state = f.get_state()
2015-08-30 13:27:29 +00:00
assert f.get_state() == HTTPFlow.from_state(
2015-05-30 00:03:28 +00:00
state).get_state()
2011-02-19 04:21:08 +00:00
f.response = None
2014-02-04 04:02:17 +00:00
f.error = Error("error")
state = f.get_state()
2015-08-30 13:27:29 +00:00
assert f.get_state() == HTTPFlow.from_state(
2015-05-30 00:03:28 +00:00
state).get_state()
2014-02-04 04:02:17 +00:00
f2 = f.copy()
2014-12-24 00:39:38 +00:00
f2.id = f.id # copy creates a different uuid
assert f.get_state() == f2.get_state()
assert not f == f2
2014-02-04 04:02:17 +00:00
f2.error = Error("e2")
assert not f == f2
f.load_state(f2.get_state())
assert f.get_state() == f2.get_state()
def test_kill(self):
2011-03-13 03:50:11 +00:00
s = flow.State()
fm = flow.FlowMaster(None, s)
f = tutils.tflow()
2014-12-23 19:33:42 +00:00
f.intercept(mock.Mock())
assert not f.reply.acked
2011-03-13 03:50:11 +00:00
f.kill(fm)
assert f.reply.acked
2011-08-02 04:52:47 +00:00
def test_killall(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
f = tutils.tflow()
fm.handle_request(f)
2011-08-02 04:52:47 +00:00
f = tutils.tflow()
fm.handle_request(f)
2011-08-02 04:52:47 +00:00
for i in s.view:
assert not i.reply.acked
2011-08-02 04:52:47 +00:00
s.killall(fm)
for i in s.view:
assert i.reply.acked
2011-08-02 04:52:47 +00:00
def test_accept_intercept(self):
f = tutils.tflow()
2014-12-23 19:33:42 +00:00
f.intercept(mock.Mock())
assert not f.reply.acked
2014-12-23 19:33:42 +00:00
f.accept_intercept(mock.Mock())
assert f.reply.acked
def test_replace_unicode(self):
f = tutils.tflow(resp=True)
f.response.content = "\xc2foo"
f.replace("foo", u"bar")
def test_replace(self):
f = tutils.tflow(resp=True)
2015-09-05 18:45:58 +00:00
f.request.headers["foo"] = "foo"
f.request.content = "afoob"
2015-09-05 18:45:58 +00:00
f.response.headers["foo"] = "foo"
f.response.content = "afoob"
assert f.replace("foo", "bar") == 6
2015-09-05 18:45:58 +00:00
assert f.request.headers["bar"] == "bar"
assert f.request.content == "abarb"
2015-09-05 18:45:58 +00:00
assert f.response.headers["bar"] == "bar"
assert f.response.content == "abarb"
def test_replace_encoded(self):
f = tutils.tflow(resp=True)
f.request.content = "afoob"
f.request.encode("gzip")
f.response.content = "afoob"
f.response.encode("gzip")
f.replace("foo", "bar")
assert f.request.content != "abarb"
f.request.decode()
assert f.request.content == "abarb"
assert f.response.content != "abarb"
f.response.decode()
assert f.response.content == "abarb"
class TestState:
2011-02-02 23:16:03 +00:00
def test_backup(self):
c = flow.State()
f = tutils.tflow()
c.add_flow(f)
2011-02-02 23:16:03 +00:00
f.backup()
c.revert(f)
def test_flow(self):
"""
normal flow:
connect -> request -> response
"""
c = flow.State()
f = tutils.tflow()
c.add_flow(f)
assert f
assert c.flow_count() == 1
assert c.active_flow_count() == 1
2011-02-02 23:16:03 +00:00
newf = tutils.tflow()
assert c.add_flow(newf)
assert c.active_flow_count() == 2
2011-02-02 23:16:03 +00:00
2015-08-30 13:27:29 +00:00
f.response = HTTPResponse.wrap(netlib.tutils.tresp())
assert c.update_flow(f)
assert c.flow_count() == 2
assert c.active_flow_count() == 1
2011-02-02 23:16:03 +00:00
2015-08-30 13:27:29 +00:00
_ = HTTPResponse.wrap(netlib.tutils.tresp())
assert not c.update_flow(None)
assert c.active_flow_count() == 1
2015-08-30 13:27:29 +00:00
newf.response = HTTPResponse.wrap(netlib.tutils.tresp())
assert c.update_flow(newf)
assert c.active_flow_count() == 0
2011-02-02 23:16:03 +00:00
def test_err(self):
c = flow.State()
f = tutils.tflow()
c.add_flow(f)
f.error = Error("message")
assert c.update_flow(f)
2011-02-02 23:16:03 +00:00
2012-02-10 02:04:20 +00:00
c = flow.State()
f = tutils.tflow()
c.add_flow(f)
c.set_limit("~e")
assert not c.view
f.error = tutils.terr()
assert c.update_flow(f)
assert c.view
2012-02-10 02:04:20 +00:00
2011-03-13 01:51:25 +00:00
def test_set_limit(self):
2011-02-02 23:16:03 +00:00
c = flow.State()
f = tutils.tflow()
assert len(c.view) == 0
c.add_flow(f)
2011-02-02 23:16:03 +00:00
assert len(c.view) == 1
c.set_limit("~s")
2011-08-02 04:52:47 +00:00
assert c.limit_txt == "~s"
2011-02-02 23:16:03 +00:00
assert len(c.view) == 0
2015-08-30 13:27:29 +00:00
f.response = HTTPResponse.wrap(netlib.tutils.tresp())
c.update_flow(f)
assert len(c.view) == 1
2011-02-02 23:16:03 +00:00
c.set_limit(None)
assert len(c.view) == 1
2011-02-02 23:16:03 +00:00
f = tutils.tflow()
c.add_flow(f)
2011-02-02 23:16:03 +00:00
assert len(c.view) == 2
c.set_limit("~q")
assert len(c.view) == 1
c.set_limit("~s")
assert len(c.view) == 1
2011-02-02 23:16:03 +00:00
2011-03-13 01:51:25 +00:00
assert "Invalid" in c.set_limit("~")
def test_set_intercept(self):
c = flow.State()
assert not c.set_intercept("~q")
assert c.intercept_txt == "~q"
assert "Invalid" in c.set_intercept("~")
assert not c.set_intercept(None)
2015-05-30 00:03:28 +00:00
assert c.intercept_txt is None
2011-03-13 01:51:25 +00:00
2011-02-02 23:16:03 +00:00
def _add_request(self, state):
f = tutils.tflow()
state.add_flow(f)
2011-02-02 23:16:03 +00:00
return f
def _add_response(self, state):
f = tutils.tflow()
state.add_flow(f)
2015-08-30 13:27:29 +00:00
f.response = HTTPResponse.wrap(netlib.tutils.tresp())
state.update_flow(f)
2011-02-02 23:16:03 +00:00
def _add_error(self, state):
f = tutils.tflow(err=True)
state.add_flow(f)
2011-02-02 23:16:03 +00:00
def test_clear(self):
c = flow.State()
f = self._add_request(c)
2014-12-23 19:33:42 +00:00
f.intercepted = True
2011-02-02 23:16:03 +00:00
c.clear()
assert c.flow_count() == 0
2011-02-02 23:16:03 +00:00
def test_dump_flows(self):
c = flow.State()
self._add_request(c)
self._add_response(c)
self._add_request(c)
self._add_response(c)
self._add_request(c)
self._add_response(c)
self._add_error(c)
flows = c.view[:]
2011-02-02 23:16:03 +00:00
c.clear()
c.load_flows(flows)
assert isinstance(c.flows[0], Flow)
2011-02-02 23:16:03 +00:00
def test_accept_all(self):
c = flow.State()
self._add_request(c)
self._add_response(c)
self._add_request(c)
2014-12-23 19:33:42 +00:00
c.accept_all(mock.Mock())
2011-02-02 23:16:03 +00:00
class TestSerialize:
def _treader(self):
sio = StringIO()
w = flow.FlowWriter(sio)
for i in range(3):
f = tutils.tflow(resp=True)
w.add(f)
for i in range(3):
f = tutils.tflow(err=True)
w.add(f)
sio.seek(0)
return flow.FlowReader(sio)
def test_roundtrip(self):
sio = StringIO()
f = tutils.tflow()
f.request.content = "".join(chr(i) for i in range(255))
w = flow.FlowWriter(sio)
w.add(f)
sio.seek(0)
r = flow.FlowReader(sio)
l = list(r.stream())
assert len(l) == 1
f2 = l[0]
assert f2.get_state() == f.get_state()
assert f2.request == f.request
def test_load_flows(self):
r = self._treader()
s = flow.State()
fm = flow.FlowMaster(None, s)
fm.load_flows(r)
assert len(s.flows) == 6
def test_load_flows_reverse(self):
r = self._treader()
s = flow.State()
2015-05-30 00:03:28 +00:00
conf = ProxyConfig(
mode="reverse",
upstream_server=("https", ("use-this-domain", 80))
)
fm = flow.FlowMaster(DummyServer(conf), s)
fm.load_flows(r)
assert s.flows[0].request.host == "use-this-domain"
def test_filter(self):
sio = StringIO()
fl = filt.parse("~c 200")
w = flow.FilteredFlowWriter(sio, fl)
f = tutils.tflow(resp=True)
f.response.code = 200
w.add(f)
f = tutils.tflow(resp=True)
f.response.code = 201
w.add(f)
sio.seek(0)
r = flow.FlowReader(sio)
assert len(list(r.stream()))
def test_error(self):
sio = StringIO()
sio.write("bogus")
sio.seek(0)
r = flow.FlowReader(sio)
tutils.raises(flow.FlowReadError, list, r.stream())
f = flow.FlowReadError("foo")
assert f.strerror == "foo"
2012-04-10 22:10:53 +00:00
def test_versioncheck(self):
f = tutils.tflow()
d = f.get_state()
2012-04-10 22:10:53 +00:00
d["version"] = (0, 0)
sio = StringIO()
tnetstring.dump(d, sio)
sio.seek(0)
r = flow.FlowReader(sio)
tutils.raises("version", list, r.stream())
2012-04-10 22:10:53 +00:00
class TestFlowMaster:
def test_load_script(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
assert not fm.load_script(tutils.test_data.path("scripts/a.py"))
assert not fm.load_script(tutils.test_data.path("scripts/a.py"))
assert not fm.unload_scripts()
assert fm.load_script("nonexistent")
2015-05-30 00:03:28 +00:00
assert "ValueError" in fm.load_script(
tutils.test_data.path("scripts/starterr.py"))
assert len(fm.scripts) == 0
2014-09-08 10:20:40 +00:00
def test_getset_ignore(self):
p = mock.Mock()
2014-10-18 16:29:35 +00:00
p.config.check_ignore = HostMatcher()
2014-09-08 10:20:40 +00:00
fm = flow.FlowMaster(p, flow.State())
2014-10-18 16:29:35 +00:00
assert not fm.get_ignore_filter()
fm.set_ignore_filter(["^apple\.com:", ":443$"])
assert fm.get_ignore_filter()
2014-09-08 10:20:40 +00:00
def test_replay(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
f = tutils.tflow(resp=True)
2014-03-09 20:51:24 +00:00
f.request.content = CONTENT_MISSING
assert "missing" in fm.replay_request(f)
2014-12-23 19:33:42 +00:00
f.intercepted = True
assert "intercepting" in fm.replay_request(f)
2015-02-07 18:33:36 +00:00
f.live = True
assert "live" in fm.replay_request(f, run_scripthooks=True)
2012-02-10 02:55:58 +00:00
def test_script_reqerr(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
assert not fm.load_script(tutils.test_data.path("scripts/reqerr.py"))
f = tutils.tflow()
fm.handle_clientconnect(f.client_conn)
assert fm.handle_request(f)
2012-02-10 02:55:58 +00:00
def test_script(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
assert not fm.load_script(tutils.test_data.path("scripts/all.py"))
f = tutils.tflow(resp=True)
fm.handle_clientconnect(f.client_conn)
assert fm.scripts[0].ns["log"][-1] == "clientconnect"
fm.handle_serverconnect(f.server_conn)
assert fm.scripts[0].ns["log"][-1] == "serverconnect"
fm.handle_request(f)
assert fm.scripts[0].ns["log"][-1] == "request"
fm.handle_response(f)
assert fm.scripts[0].ns["log"][-1] == "response"
2015-05-30 00:03:28 +00:00
# load second script
assert not fm.load_script(tutils.test_data.path("scripts/all.py"))
assert len(fm.scripts) == 2
fm.handle_clientdisconnect(f.server_conn)
assert fm.scripts[0].ns["log"][-1] == "clientdisconnect"
assert fm.scripts[1].ns["log"][-1] == "clientdisconnect"
2015-05-30 00:03:28 +00:00
# unload first script
fm.unload_scripts()
assert len(fm.scripts) == 0
assert not fm.load_script(tutils.test_data.path("scripts/all.py"))
f.error = tutils.terr()
fm.handle_error(f)
assert fm.scripts[0].ns["log"][-1] == "error"
2012-02-18 10:56:40 +00:00
def test_duplicate_flow(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
f = tutils.tflow(resp=True)
f = fm.load_flow(f)
2012-02-18 10:56:40 +00:00
assert s.flow_count() == 1
f2 = fm.duplicate_flow(f)
assert f2.response
2012-02-18 10:56:40 +00:00
assert s.flow_count() == 2
assert s.index(f2) == 1
2012-02-18 10:56:40 +00:00
2011-02-19 04:21:08 +00:00
def test_all(self):
2011-02-16 03:43:35 +00:00
s = flow.State()
2011-02-19 04:21:08 +00:00
fm = flow.FlowMaster(None, s)
fm.anticache = True
fm.anticomp = True
f = tutils.tflow(req=None)
fm.handle_clientconnect(f.client_conn)
2015-08-30 13:27:29 +00:00
f.request = HTTPRequest.wrap(netlib.tutils.treq())
fm.handle_request(f)
assert s.flow_count() == 1
2015-08-30 13:27:29 +00:00
f.response = HTTPResponse.wrap(netlib.tutils.tresp())
fm.handle_response(f)
assert not fm.handle_response(None)
assert s.flow_count() == 1
2011-02-19 04:21:08 +00:00
fm.handle_clientdisconnect(f.client_conn)
2011-02-19 04:21:08 +00:00
f.error = Error("msg")
f.error.reply = controller.DummyReply()
fm.handle_error(f)
2011-02-19 04:21:08 +00:00
fm.load_script(tutils.test_data.path("scripts/a.py"))
2012-02-10 02:55:58 +00:00
fm.shutdown()
def test_client_playback(self):
s = flow.State()
f = tutils.tflow(resp=True)
pb = [tutils.tflow(resp=True), f]
2014-11-11 12:09:05 +00:00
fm = flow.FlowMaster(DummyServer(ProxyConfig()), s)
2015-05-30 00:03:28 +00:00
assert not fm.start_server_playback(
pb,
False,
[],
False,
False,
None,
False,
None,
False)
assert not fm.start_client_playback(pb, False)
fm.client_playback.testing = True
q = Queue.Queue()
assert not fm.state.flow_count()
fm.tick(q, 0)
assert fm.state.flow_count()
f.error = Error("error")
fm.handle_error(f)
def test_server_playback(self):
s = flow.State()
f = tutils.tflow()
2015-08-30 13:27:29 +00:00
f.response = HTTPResponse.wrap(netlib.tutils.tresp(f.request))
pb = [f]
fm = flow.FlowMaster(None, s)
fm.refresh_server_playback = True
assert not fm.do_server_playback(tutils.tflow())
2015-05-30 00:03:28 +00:00
fm.start_server_playback(
pb,
False,
[],
False,
False,
None,
False,
None,
False)
assert fm.do_server_playback(tutils.tflow())
2015-05-30 00:03:28 +00:00
fm.start_server_playback(
pb,
False,
[],
True,
False,
None,
False,
None,
False)
r = tutils.tflow()
r.request.content = "gibble"
assert not fm.do_server_playback(r)
assert fm.do_server_playback(tutils.tflow())
2015-05-30 00:03:28 +00:00
fm.start_server_playback(
pb,
False,
[],
True,
False,
None,
False,
None,
False)
q = Queue.Queue()
fm.tick(q, 0)
assert fm.should_exit.is_set()
fm.stop_server_playback()
assert not fm.server_playback
def test_server_playback_kill(self):
s = flow.State()
f = tutils.tflow()
2015-08-30 13:27:29 +00:00
f.response = HTTPResponse.wrap(netlib.tutils.tresp(f.request))
pb = [f]
fm = flow.FlowMaster(None, s)
fm.refresh_server_playback = True
2015-05-30 00:03:28 +00:00
fm.start_server_playback(
pb,
True,
[],
False,
False,
None,
False,
None,
False)
f = tutils.tflow()
f.request.host = "nonexistent"
fm.process_new_request(f)
assert "killed" in f.error.msg
def test_stickycookie(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
assert "Invalid" in fm.set_stickycookie("~h")
fm.set_stickycookie(".*")
assert fm.stickycookie_state
fm.set_stickycookie(None)
assert not fm.stickycookie_state
fm.set_stickycookie(".*")
f = tutils.tflow(resp=True)
2015-09-05 18:45:58 +00:00
f.response.headers["set-cookie"] = "foo=bar"
fm.handle_request(f)
fm.handle_response(f)
assert fm.stickycookie_state.jar
assert not "cookie" in f.request.headers
f = f.copy()
fm.handle_request(f)
2015-09-05 18:45:58 +00:00
assert f.request.headers["cookie"] == "foo=bar"
def test_stickyauth(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
assert "Invalid" in fm.set_stickyauth("~h")
fm.set_stickyauth(".*")
assert fm.stickyauth_state
fm.set_stickyauth(None)
assert not fm.stickyauth_state
fm.set_stickyauth(".*")
f = tutils.tflow(resp=True)
2015-09-05 18:45:58 +00:00
f.request.headers["authorization"] = "foo"
fm.handle_request(f)
f = tutils.tflow(resp=True)
assert fm.stickyauth_state.hosts
assert not "authorization" in f.request.headers
fm.handle_request(f)
2015-09-05 18:45:58 +00:00
assert f.request.headers["authorization"] == "foo"
2012-07-08 22:18:37 +00:00
def test_stream(self):
with tutils.tmpdir() as tdir:
p = os.path.join(tdir, "foo")
2015-05-30 00:03:28 +00:00
2012-07-08 22:18:37 +00:00
def r():
2015-05-30 00:03:28 +00:00
r = flow.FlowReader(open(p, "rb"))
2012-07-08 22:18:37 +00:00
return list(r.stream())
s = flow.State()
fm = flow.FlowMaster(None, s)
f = tutils.tflow(resp=True)
2012-07-08 22:18:37 +00:00
fm.start_stream(file(p, "ab"), None)
fm.handle_request(f)
fm.handle_response(f)
2012-07-08 22:18:37 +00:00
fm.stop_stream()
assert r()[0].response
f = tutils.tflow()
fm.start_stream(file(p, "ab"), None)
fm.handle_request(f)
2012-07-08 22:18:37 +00:00
fm.shutdown()
assert not r()[1].response
2015-05-30 00:03:28 +00:00
class TestRequest:
def test_simple(self):
f = tutils.tflow()
r = f.request
2014-09-03 21:44:54 +00:00
u = r.url
r.url = u
tutils.raises(ValueError, setattr, r, "url", "")
assert r.url == u
r2 = r.copy()
assert r.get_state() == r2.get_state()
def test_get_url(self):
2015-08-30 13:27:29 +00:00
r = HTTPRequest.wrap(netlib.tutils.treq())
2014-09-03 21:44:54 +00:00
assert r.url == "http://address:22/path"
r.scheme = "https"
2014-09-03 21:44:54 +00:00
assert r.url == "https://address:22/path"
r.host = "host"
r.port = 42
2014-09-03 21:44:54 +00:00
assert r.url == "https://host:42/path"
r.host = "address"
r.port = 22
2015-05-30 00:03:28 +00:00
assert r.url == "https://address:22/path"
2014-09-03 21:44:54 +00:00
assert r.pretty_url(True) == "https://address:22/path"
2015-09-05 18:45:58 +00:00
r.headers["Host"] = "foo.com"
2014-09-03 21:44:54 +00:00
assert r.pretty_url(False) == "https://address:22/path"
assert r.pretty_url(True) == "https://foo.com:22/path"
def test_path_components(self):
2015-08-30 13:27:29 +00:00
r = HTTPRequest.wrap(netlib.tutils.treq())
r.path = "/"
assert r.get_path_components() == []
r.path = "/foo/bar"
assert r.get_path_components() == ["foo", "bar"]
q = odict.ODict()
q["test"] = ["123"]
r.set_query(q)
assert r.get_path_components() == ["foo", "bar"]
r.set_path_components([])
assert r.get_path_components() == []
r.set_path_components(["foo"])
assert r.get_path_components() == ["foo"]
r.set_path_components(["/oo"])
assert r.get_path_components() == ["/oo"]
assert "%2F" in r.path
def test_getset_form_urlencoded(self):
d = odict.ODict([("one", "two"), ("three", "four")])
2015-08-30 13:27:29 +00:00
r = HTTPRequest.wrap(netlib.tutils.treq(content=netlib.utils.urlencode(d.lst)))
2015-09-05 18:45:58 +00:00
r.headers["content-type"] = HDR_FORM_URLENCODED
assert r.get_form_urlencoded() == d
d = odict.ODict([("x", "y")])
r.set_form_urlencoded(d)
assert r.get_form_urlencoded() == d
2015-09-05 18:45:58 +00:00
r.headers["content-type"] = "foo"
assert not r.get_form_urlencoded()
def test_getset_query(self):
2015-08-30 13:27:29 +00:00
r = HTTPRequest.wrap(netlib.tutils.treq())
2014-09-03 21:44:54 +00:00
r.path = "/foo?x=y&a=b"
q = r.get_query()
assert q.lst == [("x", "y"), ("a", "b")]
2014-09-03 21:44:54 +00:00
r.path = "/"
q = r.get_query()
assert not q
2014-09-03 21:44:54 +00:00
r.path = "/?adsfa"
q = r.get_query()
assert q.lst == [("adsfa", "")]
2014-09-03 21:44:54 +00:00
r.path = "/foo?x=y&a=b"
assert r.get_query()
r.set_query(odict.ODict([]))
assert not r.get_query()
qv = odict.ODict([("a", "b"), ("c", "d")])
r.set_query(qv)
assert r.get_query() == qv
def test_anticache(self):
2015-08-30 13:27:29 +00:00
r = HTTPRequest.wrap(netlib.tutils.treq())
2015-09-05 18:45:58 +00:00
r.headers = Headers()
r.headers["if-modified-since"] = "test"
r.headers["if-none-match"] = "test"
r.anticache()
assert not "if-modified-since" in r.headers
assert not "if-none-match" in r.headers
def test_replace(self):
2015-08-30 13:27:29 +00:00
r = HTTPRequest.wrap(netlib.tutils.treq())
r.path = "path/foo"
2015-09-05 18:45:58 +00:00
r.headers["Foo"] = "fOo"
r.content = "afoob"
assert r.replace("foo(?i)", "boo") == 4
assert r.path == "path/boo"
assert not "foo" in r.content
2015-09-05 18:45:58 +00:00
assert r.headers["boo"] == "boo"
2012-02-10 02:04:20 +00:00
def test_constrain_encoding(self):
2015-08-30 13:27:29 +00:00
r = HTTPRequest.wrap(netlib.tutils.treq())
2015-09-05 18:45:58 +00:00
r.headers["accept-encoding"] = "gzip, oink"
r.constrain_encoding()
assert "oink" not in r.headers["accept-encoding"]
r.headers.set_all("accept-encoding", ["gzip", "oink"])
2012-02-10 02:04:20 +00:00
r.constrain_encoding()
assert "oink" not in r.headers["accept-encoding"]
def test_decodeencode(self):
2015-08-30 13:27:29 +00:00
r = HTTPRequest.wrap(netlib.tutils.treq())
2015-09-05 18:45:58 +00:00
r.headers["content-encoding"] = "identity"
r.content = "falafel"
r.decode()
2015-09-05 18:45:58 +00:00
assert "content-encoding" not in r.headers
assert r.content == "falafel"
2015-08-30 13:27:29 +00:00
r = HTTPRequest.wrap(netlib.tutils.treq())
2012-02-10 02:04:20 +00:00
r.content = "falafel"
assert not r.decode()
2015-08-30 13:27:29 +00:00
r = HTTPRequest.wrap(netlib.tutils.treq())
2015-09-05 18:45:58 +00:00
r.headers["content-encoding"] = "identity"
r.content = "falafel"
r.encode("identity")
2015-09-05 18:45:58 +00:00
assert r.headers["content-encoding"] == "identity"
assert r.content == "falafel"
2015-08-30 13:27:29 +00:00
r = HTTPRequest.wrap(netlib.tutils.treq())
2015-09-05 18:45:58 +00:00
r.headers["content-encoding"] = "identity"
r.content = "falafel"
r.encode("gzip")
2015-09-05 18:45:58 +00:00
assert r.headers["content-encoding"] == "gzip"
assert r.content != "falafel"
r.decode()
2015-09-05 18:45:58 +00:00
assert "content-encoding" not in r.headers
assert r.content == "falafel"
def test_get_decoded_content(self):
2015-08-30 13:27:29 +00:00
r = HTTPRequest.wrap(netlib.tutils.treq())
r.content = None
2015-09-05 18:45:58 +00:00
r.headers["content-encoding"] = "identity"
assert r.get_decoded_content() == None
r.content = "falafel"
r.encode("gzip")
assert r.get_decoded_content() == "falafel"
def test_get_content_type(self):
2015-08-30 13:27:29 +00:00
resp = HTTPResponse.wrap(netlib.tutils.tresp())
2015-09-05 18:45:58 +00:00
resp.headers = Headers(content_type="text/plain")
assert resp.headers["content-type"] == "text/plain"
2015-05-30 00:03:28 +00:00
class TestResponse:
def test_simple(self):
f = tutils.tflow(resp=True)
resp = f.response
resp2 = resp.copy()
assert resp2.get_state() == resp.get_state()
def test_refresh(self):
2015-08-30 13:27:29 +00:00
r = HTTPResponse.wrap(netlib.tutils.tresp())
n = time.time()
2015-09-05 18:45:58 +00:00
r.headers["date"] = email.utils.formatdate(n)
pre = r.headers["date"]
r.refresh(n)
assert pre == r.headers["date"]
2015-05-30 00:03:28 +00:00
r.refresh(n + 60)
2015-09-05 18:45:58 +00:00
d = email.utils.parsedate_tz(r.headers["date"])
d = email.utils.mktime_tz(d)
# Weird that this is not exact...
2015-05-30 00:03:28 +00:00
assert abs(60 - (d - n)) <= 1
2015-09-05 18:45:58 +00:00
r.headers["set-cookie"] = "MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure"
r.refresh()
def test_refresh_cookie(self):
2015-08-30 13:27:29 +00:00
r = HTTPResponse.wrap(netlib.tutils.tresp())
# Invalid expires format, sent to us by Reddit.
c = "rfoo=bar; Domain=reddit.com; expires=Thu, 31 Dec 2037 23:59:59 GMT; Path=/"
assert r._refresh_cookie(c, 60)
c = "MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure"
assert "00:21:38" in r._refresh_cookie(c, 60)
def test_replace(self):
2015-08-30 13:27:29 +00:00
r = HTTPResponse.wrap(netlib.tutils.tresp())
2015-09-05 18:45:58 +00:00
r.headers["Foo"] = "fOo"
r.content = "afoob"
assert r.replace("foo(?i)", "boo") == 3
assert not "foo" in r.content
2015-09-05 18:45:58 +00:00
assert r.headers["boo"] == "boo"
def test_decodeencode(self):
2015-08-30 13:27:29 +00:00
r = HTTPResponse.wrap(netlib.tutils.tresp())
2015-09-05 18:45:58 +00:00
r.headers["content-encoding"] = "identity"
r.content = "falafel"
2014-02-07 02:56:57 +00:00
assert r.decode()
2015-09-05 18:45:58 +00:00
assert "content-encoding" not in r.headers
assert r.content == "falafel"
2015-08-30 13:27:29 +00:00
r = HTTPResponse.wrap(netlib.tutils.tresp())
2015-09-05 18:45:58 +00:00
r.headers["content-encoding"] = "identity"
r.content = "falafel"
r.encode("identity")
2015-09-05 18:45:58 +00:00
assert r.headers["content-encoding"] == "identity"
assert r.content == "falafel"
2015-08-30 13:27:29 +00:00
r = HTTPResponse.wrap(netlib.tutils.tresp())
2015-09-05 18:45:58 +00:00
r.headers["content-encoding"] = "identity"
r.content = "falafel"
r.encode("gzip")
2015-09-05 18:45:58 +00:00
assert r.headers["content-encoding"] == "gzip"
assert r.content != "falafel"
2014-02-07 02:56:57 +00:00
assert r.decode()
2015-09-05 18:45:58 +00:00
assert "content-encoding" not in r.headers
assert r.content == "falafel"
2015-09-05 18:45:58 +00:00
r.headers["content-encoding"] = "gzip"
2014-02-07 02:56:57 +00:00
assert not r.decode()
assert r.content == "falafel"
def test_get_content_type(self):
2015-08-30 13:27:29 +00:00
resp = HTTPResponse.wrap(netlib.tutils.tresp())
2015-09-05 18:45:58 +00:00
resp.headers = Headers(content_type="text/plain")
assert resp.headers["content-type"] == "text/plain"
class TestError:
def test_getset_state(self):
2014-02-04 04:02:17 +00:00
e = Error("Error")
state = e.get_state()
assert Error.from_state(state).get_state() == e.get_state()
assert e.copy()
2014-02-04 04:02:17 +00:00
e2 = Error("bar")
assert not e == e2
e.load_state(e2.get_state())
assert e.get_state() == e2.get_state()
e3 = e.copy()
assert e3.get_state() == e.get_state()
2014-02-04 04:02:17 +00:00
class TestClientConnection:
def test_state(self):
2014-02-04 04:02:17 +00:00
c = tutils.tclient_conn()
assert ClientConnection.from_state(c.get_state()).get_state() ==\
2015-05-30 00:03:28 +00:00
c.get_state()
2014-02-04 04:02:17 +00:00
c2 = tutils.tclient_conn()
c2.address.address = (c2.address.host, 4242)
assert not c == c2
2014-02-04 04:02:17 +00:00
c2.timestamp_start = 42
c.load_state(c2.get_state())
2014-02-04 04:02:17 +00:00
assert c.timestamp_start == 42
c3 = c.copy()
assert c3.get_state() == c.get_state()
2013-04-29 21:13:33 +00:00
assert str(c)
def test_decoded():
2015-08-30 13:27:29 +00:00
r = HTTPRequest.wrap(netlib.tutils.treq())
assert r.content == "content"
2015-09-05 18:45:58 +00:00
assert "content-encoding" not in r.headers
r.encode("gzip")
assert r.headers["content-encoding"]
assert r.content != "content"
with decoded(r):
2015-09-05 18:45:58 +00:00
assert "content-encoding" not in r.headers
assert r.content == "content"
assert r.headers["content-encoding"]
assert r.content != "content"
with decoded(r):
r.content = "foo"
assert r.content != "foo"
r.decode()
assert r.content == "foo"
def test_replacehooks():
h = flow.ReplaceHooks()
h.add("~q", "foo", "bar")
assert h.lst
h.set(
[
(".*", "one", "two"),
(".*", "three", "four"),
]
)
assert h.count() == 2
h.clear()
assert not h.lst
h.add("~q", "foo", "bar")
h.add("~s", "foo", "bar")
v = h.get_specs()
assert v == [('~q', 'foo', 'bar'), ('~s', 'foo', 'bar')]
assert h.count() == 2
h.clear()
assert h.count() == 0
f = tutils.tflow()
f.request.content = "foo"
h.add("~s", "foo", "bar")
h.run(f)
assert f.request.content == "foo"
f = tutils.tflow(resp=True)
f.request.content = "foo"
f.response.content = "foo"
h.run(f)
assert f.response.content == "bar"
assert f.request.content == "foo"
f = tutils.tflow()
h.clear()
h.add("~q", "foo", "bar")
f.request.content = "foo"
h.run(f)
assert f.request.content == "bar"
assert not h.add("~", "foo", "bar")
assert not h.add("foo", "*", "bar")
def test_setheaders():
h = flow.SetHeaders()
h.add("~q", "foo", "bar")
assert h.lst
h.set(
[
(".*", "one", "two"),
(".*", "three", "four"),
]
)
assert h.count() == 2
h.clear()
assert not h.lst
h.add("~q", "foo", "bar")
h.add("~s", "foo", "bar")
v = h.get_specs()
assert v == [('~q', 'foo', 'bar'), ('~s', 'foo', 'bar')]
assert h.count() == 2
h.clear()
assert h.count() == 0
f = tutils.tflow()
f.request.content = "foo"
h.add("~s", "foo", "bar")
h.run(f)
assert f.request.content == "foo"
h.clear()
h.add("~s", "one", "two")
h.add("~s", "one", "three")
f = tutils.tflow(resp=True)
2015-09-05 18:45:58 +00:00
f.request.headers["one"] = "xxx"
f.response.headers["one"] = "xxx"
h.run(f)
2015-09-05 18:45:58 +00:00
assert f.request.headers["one"] == "xxx"
assert f.response.headers.get_all("one") == ["two", "three"]
h.clear()
h.add("~q", "one", "two")
h.add("~q", "one", "three")
f = tutils.tflow()
2015-09-05 18:45:58 +00:00
f.request.headers["one"] = "xxx"
h.run(f)
2015-09-05 18:45:58 +00:00
assert f.request.headers.get_all("one") == ["two", "three"]
assert not h.add("~", "foo", "bar")