Reorganize test suite to remove confusion between test utils and libmproxy utils.

This commit is contained in:
Aldo Cortesi 2011-03-05 15:58:48 +13:00
parent 5da4099ddf
commit d9cb083809
6 changed files with 191 additions and 187 deletions

View File

@ -1,5 +1,5 @@
from libmproxy import console, proxy, filt, flow
import utils
import tutils
import libpry
@ -46,18 +46,18 @@ class uState(libpry.AutoTree):
assert c.get_focus() == (None, None)
def _add_request(self, state):
r = utils.treq()
r = tutils.treq()
return state.add_request(r)
def _add_response(self, state):
f = self._add_request(state)
r = utils.tresp(f.request)
r = tutils.tresp(f.request)
state.add_response(r)
def test_add_response(self):
c = console.ConsoleState()
f = self._add_request(c)
r = utils.tresp(f.request)
r = tutils.tresp(f.request)
c.focus = None
c.add_response(r)
@ -87,7 +87,7 @@ class uformat_keyvals(libpry.AutoTree):
class uformat_flow(libpry.AutoTree):
def test_simple(self):
f = utils.tflow()
f = tutils.tflow()
foc = ('focus', '>>')
assert foc not in console.format_flow(f, False)
assert foc in console.format_flow(f, True)
@ -95,7 +95,7 @@ class uformat_flow(libpry.AutoTree):
assert foc not in console.format_flow(f, False, True)
assert foc in console.format_flow(f, True, True)
f.response = utils.tresp()
f.response = tutils.tresp()
f.request = f.response.request
f.backup()

View File

@ -2,25 +2,25 @@ import os
from cStringIO import StringIO
import libpry
from libmproxy import dump, flow
import utils
import tutils
class uStrFuncs(libpry.AutoTree):
def test_all(self):
t = utils.tresp()
t = tutils.tresp()
t.set_replay()
dump.str_response(t)
t = utils.treq()
t = tutils.treq()
t.stickycookie = True
assert "stickycookie" in dump.str_request(t)
class uDumpMaster(libpry.AutoTree):
def _cycle(self, m, content):
req = utils.treq()
req = tutils.treq()
req.content = content
cc = req.client_conn
resp = utils.tresp(req)
resp = tutils.tresp(req)
resp.content = content
m.handle_clientconnect(cc)
m.handle_request(req)
@ -43,8 +43,8 @@ class uDumpMaster(libpry.AutoTree):
p = os.path.join(t, "rep")
f = open(p, "w")
fw = flow.FlowWriter(f)
t = utils.tflow_full()
t.response = utils.tresp(t.request)
t = tutils.tflow_full()
t.response = tutils.tresp(t.request)
fw.add(t)
f.close()

View File

@ -1,13 +1,13 @@
from cStringIO import StringIO
from libmproxy import console, proxy, filt, flow
import utils
import tutils
import libpry
class uStickyCookieState(libpry.AutoTree):
def _response(self, cookie, host):
s = flow.StickyCookieState(filt.parse(".*"))
f = utils.tflow_full()
f = tutils.tflow_full()
f.request.host = host
f.response.headers["Set-Cookie"] = [cookie]
s.handle_response(f)
@ -35,9 +35,9 @@ class uStickyCookieState(libpry.AutoTree):
class uClientPlaybackState(libpry.AutoTree):
def test_tick(self):
first = utils.tflow()
first = tutils.tflow()
c = flow.ClientPlaybackState(
[first, utils.tflow()]
[first, tutils.tflow()]
)
s = flow.State()
fm = flow.FlowMaster(None, s)
@ -59,8 +59,8 @@ class uClientPlaybackState(libpry.AutoTree):
class uServerPlaybackState(libpry.AutoTree):
def test_hash(self):
s = flow.ServerPlaybackState(None, [])
r = utils.tflow()
r2 = utils.tflow()
r = tutils.tflow()
r2 = tutils.tflow()
assert s._hash(r)
assert s._hash(r) == s._hash(r2)
@ -71,24 +71,24 @@ class uServerPlaybackState(libpry.AutoTree):
def test_headers(self):
s = flow.ServerPlaybackState(["foo"], [])
r = utils.tflow_full()
r = tutils.tflow_full()
r.request.headers["foo"] = ["bar"]
r2 = utils.tflow_full()
r2 = tutils.tflow_full()
assert not s._hash(r) == s._hash(r2)
r2.request.headers["foo"] = ["bar"]
assert s._hash(r) == s._hash(r2)
r2.request.headers["oink"] = ["bar"]
assert s._hash(r) == s._hash(r2)
r = utils.tflow_full()
r2 = utils.tflow_full()
r = tutils.tflow_full()
r2 = tutils.tflow_full()
assert s._hash(r) == s._hash(r2)
def test_load(self):
r = utils.tflow_full()
r = tutils.tflow_full()
r.request.headers["key"] = ["one"]
r2 = utils.tflow_full()
r2 = tutils.tflow_full()
r2.request.headers["key"] = ["two"]
s = flow.ServerPlaybackState(None, [r, r2])
@ -108,16 +108,16 @@ class uServerPlaybackState(libpry.AutoTree):
class uFlow(libpry.AutoTree):
def test_run_script(self):
f = utils.tflow()
f.response = utils.tresp()
f = tutils.tflow()
f.response = tutils.tresp()
f.request = f.response.request
se = f.run_script("scripts/a")
assert "DEBUG" == se.strip()
assert f.request.host == "TESTOK"
def test_run_script_err(self):
f = utils.tflow()
f.response = utils.tresp()
f = tutils.tflow()
f.response = tutils.tresp()
f.request = f.response.request
libpry.raises("returned error", f.run_script,"scripts/err_return")
libpry.raises("invalid response", f.run_script,"scripts/err_data")
@ -125,15 +125,15 @@ class uFlow(libpry.AutoTree):
libpry.raises("permission denied", f.run_script,"scripts/nonexecutable")
def test_match(self):
f = utils.tflow()
f.response = utils.tresp()
f = tutils.tflow()
f.response = tutils.tresp()
f.request = f.response.request
assert not f.match(filt.parse("~b test"))
assert not f.match(None)
def test_backup(self):
f = utils.tflow()
f.response = utils.tresp()
f = tutils.tflow()
f.response = tutils.tresp()
f.request = f.response.request
f.request.content = "foo"
assert not f.modified()
@ -144,8 +144,8 @@ class uFlow(libpry.AutoTree):
assert f.request.content == "foo"
def test_getset_state(self):
f = utils.tflow()
f.response = utils.tresp(f.request)
f = tutils.tflow()
f.response = tutils.tresp(f.request)
state = f.get_state()
assert f == flow.Flow.from_state(state)
@ -154,7 +154,7 @@ class uFlow(libpry.AutoTree):
state = f.get_state()
assert f == flow.Flow.from_state(state)
f2 = utils.tflow()
f2 = tutils.tflow()
f2.error = proxy.Error(f.request, "e2")
assert not f == f2
f.load_state(f2.get_state())
@ -163,14 +163,14 @@ class uFlow(libpry.AutoTree):
def test_kill(self):
f = utils.tflow()
f.request = utils.treq()
f = tutils.tflow()
f.request = tutils.treq()
f.intercept()
assert not f.request.acked
f.kill()
assert f.request.acked
f.intercept()
f.response = utils.tresp()
f.response = tutils.tresp()
f.request = f.response.request
f.request.ack()
assert not f.response.acked
@ -178,13 +178,13 @@ class uFlow(libpry.AutoTree):
assert f.response.acked
def test_accept_intercept(self):
f = utils.tflow()
f.request = utils.treq()
f = tutils.tflow()
f.request = tutils.treq()
f.intercept()
assert not f.request.acked
f.accept_intercept()
assert f.request.acked
f.response = utils.tresp()
f.response = tutils.tresp()
f.request = f.response.request
f.intercept()
f.request.ack()
@ -194,14 +194,14 @@ class uFlow(libpry.AutoTree):
def test_serialization(self):
f = flow.Flow(None)
f.request = utils.treq()
f.request = tutils.treq()
class uState(libpry.AutoTree):
def test_backup(self):
bc = proxy.ClientConnect(("address", 22))
c = flow.State()
req = utils.treq()
req = tutils.treq()
f = c.add_request(req)
f.backup()
@ -218,22 +218,22 @@ class uState(libpry.AutoTree):
c.clientconnect(bc)
assert len(c.client_connections) == 1
req = utils.treq(bc)
req = tutils.treq(bc)
f = c.add_request(req)
assert f
assert len(c.flow_list) == 1
assert c.flow_map.get(req)
newreq = utils.treq()
newreq = tutils.treq()
assert c.add_request(newreq)
assert c.flow_map.get(newreq)
resp = utils.tresp(req)
resp = tutils.tresp(req)
assert c.add_response(resp)
assert len(c.flow_list) == 2
assert c.flow_map.get(resp.request)
newresp = utils.tresp()
newresp = tutils.tresp()
assert not c.add_response(newresp)
assert not c.flow_map.get(newresp.request)
@ -244,18 +244,18 @@ class uState(libpry.AutoTree):
def test_err(self):
bc = proxy.ClientConnect(("address", 22))
c = flow.State()
req = utils.treq()
req = tutils.treq()
f = c.add_request(req)
e = proxy.Error(f.request, "message")
assert c.add_error(e)
e = proxy.Error(utils.tflow().request, "message")
e = proxy.Error(tutils.tflow().request, "message")
assert not c.add_error(e)
def test_view(self):
c = flow.State()
req = utils.treq()
req = tutils.treq()
c.clientconnect(req.client_conn)
assert len(c.view) == 0
@ -264,13 +264,13 @@ class uState(libpry.AutoTree):
c.set_limit(filt.parse("~s"))
assert len(c.view) == 0
resp = utils.tresp(req)
resp = tutils.tresp(req)
c.add_response(resp)
assert len(c.view) == 1
c.set_limit(None)
assert len(c.view) == 1
req = utils.treq()
req = tutils.treq()
c.clientconnect(req.client_conn)
c.add_request(req)
assert len(c.view) == 2
@ -280,24 +280,24 @@ class uState(libpry.AutoTree):
assert len(c.view) == 1
def _add_request(self, state):
req = utils.treq()
req = tutils.treq()
f = state.add_request(req)
return f
def _add_response(self, state):
req = utils.treq()
req = tutils.treq()
f = state.add_request(req)
resp = utils.tresp(req)
resp = tutils.tresp(req)
state.add_response(resp)
def _add_error(self, state):
req = utils.treq()
req = tutils.treq()
f = state.add_request(req)
f.error = proxy.Error(f.request, "msg")
def test_kill_flow(self):
c = flow.State()
req = utils.treq()
req = tutils.treq()
f = c.add_request(req)
c.kill_flow(f)
assert not c.flow_list
@ -340,7 +340,7 @@ class uState(libpry.AutoTree):
class uSerialize(libpry.AutoTree):
def test_roundtrip(self):
sio = StringIO()
f = utils.tflow()
f = tutils.tflow()
w = flow.FlowWriter(sio)
w.add(f)
@ -355,18 +355,18 @@ class uFlowMaster(libpry.AutoTree):
def test_all(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
req = utils.treq()
req = tutils.treq()
fm.handle_clientconnect(req.client_conn)
f = fm.handle_request(req)
assert len(s.flow_list) == 1
resp = utils.tresp(req)
resp = tutils.tresp(req)
fm.handle_response(resp)
assert len(s.flow_list) == 1
rx = utils.tresp()
rx = tutils.tresp()
assert not fm.handle_response(rx)
dc = proxy.ClientDisconnect(req.client_conn)
@ -378,25 +378,25 @@ class uFlowMaster(libpry.AutoTree):
def test_server_playback(self):
s = flow.State()
f = utils.tflow()
f.response = utils.tresp(f.request)
f = tutils.tflow()
f.response = tutils.tresp(f.request)
pb = [f]
fm = flow.FlowMaster(None, s)
assert not fm.do_server_playback(utils.tflow())
assert not fm.do_server_playback(tutils.tflow())
fm.start_server_playback(pb, False, [])
assert fm.do_server_playback(utils.tflow())
assert fm.do_server_playback(tutils.tflow())
fm.start_server_playback(pb, False, [])
r = utils.tflow()
r = tutils.tflow()
r.request.content = "gibble"
assert not fm.do_server_playback(r)
def test_client_playback(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
pb = [utils.tflow_full()]
pb = [tutils.tflow_full()]
fm.start_client_playback(pb)
def test_stickycookie(self):
@ -409,7 +409,7 @@ class uFlowMaster(libpry.AutoTree):
assert not fm.stickycookie_state
fm.set_stickycookie(".*")
tf = utils.tflow_full()
tf = tutils.tflow_full()
tf.response.headers["set-cookie"] = ["foo=bar"]
fm.handle_request(tf.request)
f = fm.handle_response(tf.response)

View File

@ -1,114 +1,38 @@
import threading, urllib, Queue, urllib2, cStringIO
import urllib, urllib2, cStringIO
import libpry
import serv, sslserv
from libmproxy import proxy, controller, utils, dump, script
import random
# Yes, the random ports are horrible. During development, sockets are often not
# properly closed during error conditions, which means you have to wait until
# you can re-bind to the same port. This is a pain in the ass, so we just pick
# a random port and keep moving.
PROXL_PORT = random.randint(10000, 20000)
HTTP_PORT = random.randint(20000, 30000)
HTTPS_PORT = random.randint(30000, 40000)
import tutils
class TestMaster(controller.Master):
def __init__(self, port, testq):
serv = proxy.ProxyServer(proxy.Config("data/testkey.pem"), port)
controller.Master.__init__(self, serv)
self.testq = testq
self.log = []
def clear(self):
self.log = []
def handle(self, m):
self.log.append(m)
m.ack()
class ProxyThread(threading.Thread):
def __init__(self, port, testq):
self.tmaster = TestMaster(port, testq)
threading.Thread.__init__(self)
def run(self):
self.tmaster.run()
def shutdown(self):
self.tmaster.shutdown()
class ServerThread(threading.Thread):
def __init__(self, server):
self.server = server
threading.Thread.__init__(self)
def run(self):
self.server.serve_forever()
def shutdown(self):
self.server.shutdown()
class _TestServers(libpry.TestContainer):
def setUpAll(self):
self.tqueue = Queue.Queue()
# We don't make any concurrent requests, so we can access
# the attributes on this object safely.
self.proxthread = ProxyThread(PROXL_PORT, self.tqueue)
self.threads = [
ServerThread(serv.make(HTTP_PORT)),
ServerThread(sslserv.make(HTTPS_PORT)),
self.proxthread
]
for i in self.threads:
i.start()
def setUp(self):
self.proxthread.tmaster.clear()
def tearDownAll(self):
for i in self.threads:
i.shutdown()
class _ProxTests(libpry.AutoTree):
def log(self):
pthread = self.findAttr("proxthread")
return pthread.tmaster.log
class uSanity(_ProxTests):
class uSanity(tutils.ProxTest):
def test_http(self):
"""
Just check that the HTTP server is running.
"""
f = urllib.urlopen("http://127.0.0.1:%s"%HTTP_PORT)
f = urllib.urlopen("http://127.0.0.1:%s"%tutils.HTTP_PORT)
assert f.read()
def test_https(self):
"""
Just check that the HTTPS server is running.
"""
f = urllib.urlopen("https://127.0.0.1:%s"%HTTPS_PORT)
f = urllib.urlopen("https://127.0.0.1:%s"%tutils.HTTPS_PORT)
assert f.read()
class uProxy(_ProxTests):
class uProxy(tutils.ProxTest):
HOST = "127.0.0.1"
def _get(self, host=HOST):
r = urllib2.Request("http://%s:%s"%(host, HTTP_PORT))
r.set_proxy("127.0.0.1:%s"%PROXL_PORT, "http")
r = urllib2.Request("http://%s:%s"%(host, tutils.HTTP_PORT))
r.set_proxy("127.0.0.1:%s"%tutils.PROXL_PORT, "http")
return urllib2.urlopen(r)
def _sget(self, host=HOST):
proxy_support = urllib2.ProxyHandler(
{"https" : "https://127.0.0.1:%s"%PROXL_PORT}
{"https" : "https://127.0.0.1:%s"%tutils.PROXL_PORT}
)
opener = urllib2.build_opener(proxy_support)
r = urllib2.Request("https://%s:%s"%(host, HTTPS_PORT))
r = urllib2.Request("https://%s:%s"%(host, tutils.HTTPS_PORT))
return opener.open(r)
def test_http(self):
@ -307,7 +231,7 @@ tests = [
u_parse_url(),
uError(),
uClientConnect(),
_TestServers(), [
tutils.TestServers(), [
uSanity(),
uProxy(),
],

111
test/tutils.py Normal file
View File

@ -0,0 +1,111 @@
import os.path, threading, Queue
import libpry
from libmproxy import proxy, utils, filt, flow, controller
import serv, sslserv
import random
def treq(conn=None):
if not conn:
conn = proxy.ClientConnect(("address", 22))
headers = utils.Headers()
headers["header"] = ["qvalue"]
return proxy.Request(conn, "host", 80, "http", "GET", "/path", headers, "content")
def tresp(req=None):
if not req:
req = treq()
headers = utils.Headers()
headers["header_response"] = ["svalue"]
return proxy.Response(req, 200, "message", headers, "content_response")
def tflow():
r = treq()
return flow.Flow(r)
def tflow_full():
r = treq()
f = flow.Flow(r)
f.response = tresp(r)
return f
# Yes, the random ports are horrible. During development, sockets are often not
# properly closed during error conditions, which means you have to wait until
# you can re-bind to the same port. This is a pain in the ass, so we just pick
# a random port and keep moving.
PROXL_PORT = random.randint(10000, 20000)
HTTP_PORT = random.randint(20000, 30000)
HTTPS_PORT = random.randint(30000, 40000)
class TestMaster(controller.Master):
def __init__(self, port, testq):
serv = proxy.ProxyServer(proxy.Config("data/testkey.pem"), port)
controller.Master.__init__(self, serv)
self.testq = testq
self.log = []
def clear(self):
self.log = []
def handle(self, m):
self.log.append(m)
m.ack()
class ProxyThread(threading.Thread):
def __init__(self, port, testq):
self.tmaster = TestMaster(port, testq)
threading.Thread.__init__(self)
def run(self):
self.tmaster.run()
def shutdown(self):
self.tmaster.shutdown()
class ServerThread(threading.Thread):
def __init__(self, server):
self.server = server
threading.Thread.__init__(self)
def run(self):
self.server.serve_forever()
def shutdown(self):
self.server.shutdown()
class TestServers(libpry.TestContainer):
def setUpAll(self):
self.tqueue = Queue.Queue()
# We don't make any concurrent requests, so we can access
# the attributes on this object safely.
self.proxthread = ProxyThread(PROXL_PORT, self.tqueue)
self.threads = [
ServerThread(serv.make(HTTP_PORT)),
ServerThread(sslserv.make(HTTPS_PORT)),
self.proxthread
]
for i in self.threads:
i.start()
def setUp(self):
self.proxthread.tmaster.clear()
def tearDownAll(self):
for i in self.threads:
i.shutdown()
class ProxTest(libpry.AutoTree):
def log(self):
pthread = self.findAttr("proxthread")
return pthread.tmaster.log

View File

@ -1,31 +0,0 @@
import os.path
from libmproxy import proxy, utils, filt, flow
def treq(conn=None):
if not conn:
conn = proxy.ClientConnect(("address", 22))
headers = utils.Headers()
headers["header"] = ["qvalue"]
return proxy.Request(conn, "host", 80, "http", "GET", "/path", headers, "content")
def tresp(req=None):
if not req:
req = treq()
headers = utils.Headers()
headers["header_response"] = ["svalue"]
return proxy.Response(req, 200, "message", headers, "content_response")
def tflow():
r = treq()
return flow.Flow(r)
def tflow_full():
r = treq()
f = flow.Flow(r)
f.response = tresp(r)
return f