Add client replay options to mitmdump.

This commit is contained in:
Aldo Cortesi 2011-03-06 11:21:31 +13:00
parent d9cb083809
commit bad77f3470
8 changed files with 115 additions and 101 deletions

View File

@ -10,6 +10,7 @@ class Options(object):
"request_script",
"response_script",
"server_replay",
"client_replay",
"verbosity",
"wfile",
"rheaders",
@ -31,7 +32,11 @@ def str_response(resp):
def str_request(req):
r = "%s %s"%(req.method, req.url())
if req.client_conn:
c = req.client_conn.address[0]
else:
c = "[replay]"
r = "%s %s %s"%(c, req.method, req.url())
if req.stickycookie:
r = "[stickycookie] " + r
return r
@ -65,14 +70,22 @@ class DumpMaster(flow.FlowMaster):
raise DumpError(v.strerror)
if options.server_replay:
path = os.path.expanduser(options.server_replay)
try:
f = file(path, "r")
flows = list(flow.FlowReader(f).stream())
except IOError, v:
raise DumpError(v.strerror)
self.start_server_playback(flows, options.kill, options.rheaders)
self.start_server_playback(
self._readflow(options.server_replay),
options.kill, options.rheaders
)
if options.client_replay:
self.start_client_playback(self._readflow(options.client_replay))
def _readflow(self, path):
path = os.path.expanduser(path)
try:
f = file(path, "r")
flows = list(flow.FlowReader(f).stream())
except IOError, v:
raise DumpError(v.strerror)
return flows
def _runscript(self, f, script):
try:
@ -92,6 +105,7 @@ class DumpMaster(flow.FlowMaster):
f = flow.FlowMaster.handle_request(self, r)
if f:
r.ack()
return f
def indent(self, n, t):
l = str(t).strip().split("\n")
@ -105,12 +119,10 @@ class DumpMaster(flow.FlowMaster):
return
sz = utils.pretty_size(len(f.response.content))
if self.o.verbosity == 1:
print >> self.outfile, f.request.client_conn.address[0],
print >> self.outfile, str_request(f.request)
print >> self.outfile, " <<",
print >> self.outfile, str_response(f.response), sz
elif self.o.verbosity == 2:
print >> self.outfile, f.request.client_conn.address[0],
print >> self.outfile, str_request(f.request)
print >> self.outfile, self.indent(4, f.request.headers)
print >> self.outfile
@ -118,7 +130,6 @@ class DumpMaster(flow.FlowMaster):
print >> self.outfile, self.indent(4, f.response.headers)
print >> self.outfile, "\n"
elif self.o.verbosity == 3:
print >> self.outfile, f.request.client_conn.address[0],
print >> self.outfile, str_request(f.request)
print >> self.outfile, self.indent(4, f.request.headers)
if utils.isBin(f.request.content):
@ -136,6 +147,7 @@ class DumpMaster(flow.FlowMaster):
self.state.delete_flow(f)
if self.o.wfile:
self.fwriter.add(f)
return f
# begin nocover
def run(self):

View File

@ -52,9 +52,8 @@ class ClientPlaybackState:
testing: Disables actual replay for testing.
"""
if self.flows and not self.current:
self.current = self.flows.pop(0)
self.current.response = None
master.handle_request(self.current.request)
n = self.flows.pop(0)
self.current = master.handle_request(n.request)
if not testing:
#begin nocover
master.state.replay_request(self.current, master.masterq)
@ -467,7 +466,7 @@ class FlowMaster(controller.Master):
def tick(self, q):
if self.client_playback:
self.client_playback.tick()
self.client_playback.tick(self)
controller.Master.tick(self, q)
def handle_clientconnect(self, r):

View File

@ -34,10 +34,10 @@ if __name__ == '__main__':
type = "int", dest="port", default=8080,
help = "Proxy service port."
)
parser.add_option("-c",
parser.add_option("-i",
action="store_true", dest="stickycookie_all", default=None,
help="Set sticky cookie for all requests.")
parser.add_option("-C",
parser.add_option("-I",
action="store", dest="stickycookie_filt", default=None, metavar="FILTER",
help="Set sticky cookie filter. Matched against requests.")
parser.add_option("-q",
@ -58,7 +58,7 @@ if __name__ == '__main__':
group = OptionGroup(parser, "Server Replay")
group.add_option("-r", action="store", dest="server_replay", default=None, metavar="PATH",
group.add_option("-s", action="store", dest="server_replay", default=None, metavar="PATH",
help="Replay server responses from a saved file.")
group.add_option("-k", "--kill",
action="store_true", dest="kill", default=False,
@ -70,6 +70,11 @@ if __name__ == '__main__':
parser.add_option_group(group)
group = OptionGroup(parser, "Client Replay")
group.add_option("-c", action="store", dest="client_replay", default=None, metavar="PATH",
help="Replay client requests from a saved file.")
parser.add_option_group(group)
options, args = parser.parse_args()
@ -92,7 +97,8 @@ if __name__ == '__main__':
server_replay = options.server_replay,
kill = options.kill,
rheaders = options.rheaders,
stickycookie = stickycookie
client_replay = options.client_replay,
stickycookie = stickycookie,
)
if args:
filt = " ".join(args)

View File

@ -10,6 +10,7 @@ class TestRequestHandler(BaseHTTPRequestHandler):
self.wfile = socket._fileobject(self.request, "wb", self.wbufsize)
def log_message(self, *args, **kwargs):
# Silence output
pass
def do_GET(self):
@ -21,5 +22,3 @@ class TestRequestHandler(BaseHTTPRequestHandler):
self.end_headers()
self.wfile.write(data)

View File

@ -50,7 +50,7 @@ class uClientPlaybackState(libpry.AutoTree):
c.tick(fm, testing=True)
assert c.count() == 1
c.clear(first)
c.clear(c.current)
c.tick(fm, testing=True)
assert c.count() == 0
@ -419,9 +419,6 @@ class uFlowMaster(libpry.AutoTree):
assert tf.request.headers["cookie"] == ["foo=bar"]
tests = [
uStickyCookieState(),
uServerPlaybackState(),
@ -429,5 +426,5 @@ tests = [
uFlow(),
uState(),
uSerialize(),
uFlowMaster()
uFlowMaster(),
]

View File

@ -1,75 +1,9 @@
import urllib, urllib2, cStringIO
import cStringIO
import libpry
from libmproxy import proxy, controller, utils, dump, script
import tutils
class uSanity(tutils.ProxTest):
def test_http(self):
"""
Just check that the HTTP server is running.
"""
f = urllib.urlopen("http://127.0.0.1:%s"%tutils.HTTP_PORT)
assert f.read()
def test_https(self):
"""
Just check that the HTTPS server is running.
"""
f = urllib.urlopen("https://127.0.0.1:%s"%tutils.HTTPS_PORT)
assert f.read()
class uProxy(tutils.ProxTest):
HOST = "127.0.0.1"
def _get(self, host=HOST):
r = urllib2.Request("http://%s:%s"%(host, tutils.HTTP_PORT))
r.set_proxy("127.0.0.1:%s"%tutils.PROXL_PORT, "http")
return urllib2.urlopen(r)
def _sget(self, host=HOST):
proxy_support = urllib2.ProxyHandler(
{"https" : "https://127.0.0.1:%s"%tutils.PROXL_PORT}
)
opener = urllib2.build_opener(proxy_support)
r = urllib2.Request("https://%s:%s"%(host, tutils.HTTPS_PORT))
return opener.open(r)
def test_http(self):
f = self._get()
assert f.code == 200
assert f.read()
f.close()
l = self.log()
assert l[0].address
assert l[1].headers.has_key("host")
assert l[2].code == 200
def test_https(self):
f = self._sget()
assert f.code == 200
assert f.read()
f.close()
l = self.log()
assert l[0].address
assert l[1].headers.has_key("host")
assert l[2].code == 200
# Disable these two for now: they take a long time.
def _test_http_nonexistent(self):
f = self._get("nonexistent")
assert f.code == 200
assert "Error" in f.read()
def _test_https_nonexistent(self):
f = self._sget("nonexistent")
assert f.code == 200
assert "Error" in f.read()
class u_parse_request_line(libpry.AutoTree):
def test_simple(self):
libpry.raises(proxy.ProxyError, proxy.parse_request_line, "")
@ -202,7 +136,6 @@ class uError(libpry.AutoTree):
assert e == e2
class uProxyError(libpry.AutoTree):
def test_simple(self):
p = proxy.ProxyError(111, "msg")
@ -221,7 +154,6 @@ class uClientConnect(libpry.AutoTree):
assert c == c2
tests = [
uProxyError(),
uRequest(),
@ -231,8 +163,4 @@ tests = [
u_parse_url(),
uError(),
uClientConnect(),
tutils.TestServers(), [
uSanity(),
uProxy(),
],
]

75
test/test_server.py Normal file
View File

@ -0,0 +1,75 @@
import urllib, urllib2
from libmproxy import flow
import tutils
class uSanity(tutils.ProxTest):
def test_http(self):
"""
Just check that the HTTP server is running.
"""
f = urllib.urlopen("http://127.0.0.1:%s"%tutils.HTTP_PORT)
assert f.read()
def test_https(self):
"""
Just check that the HTTPS server is running.
"""
f = urllib.urlopen("https://127.0.0.1:%s"%tutils.HTTPS_PORT)
assert f.read()
class uProxy(tutils.ProxTest):
HOST = "127.0.0.1"
def _get(self, host=HOST):
r = urllib2.Request("http://%s:%s"%(host, tutils.HTTP_PORT))
r.set_proxy("127.0.0.1:%s"%tutils.PROXL_PORT, "http")
return urllib2.urlopen(r)
def _sget(self, host=HOST):
proxy_support = urllib2.ProxyHandler(
{"https" : "https://127.0.0.1:%s"%tutils.PROXL_PORT}
)
opener = urllib2.build_opener(proxy_support)
r = urllib2.Request("https://%s:%s"%(host, tutils.HTTPS_PORT))
return opener.open(r)
def test_http(self):
f = self._get()
assert f.code == 200
assert f.read()
f.close()
l = self.log()
assert l[0].address
assert l[1].headers.has_key("host")
assert l[2].code == 200
def test_https(self):
f = self._sget()
assert f.code == 200
assert f.read()
f.close()
l = self.log()
assert l[0].address
assert l[1].headers.has_key("host")
assert l[2].code == 200
# Disable these two for now: they take a long time.
def _test_http_nonexistent(self):
f = self._get("nonexistent")
assert f.code == 200
assert "Error" in f.read()
def _test_https_nonexistent(self):
f = self._sget("nonexistent")
assert f.code == 200
assert "Error" in f.read()
tests = [
tutils.TestServers(), [
uSanity(),
uProxy(),
]
]

View File

@ -32,8 +32,6 @@ def tflow_full():
return f
# Yes, the random ports are horrible. During development, sockets are often not
# properly closed during error conditions, which means you have to wait until
# you can re-bind to the same port. This is a pain in the ass, so we just pick