mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-27 02:24:18 +00:00
c2ae8285f4
- Move option parsing utiliities to proxy.py - Don't have a global config object. Pass it as an argument to ProxyServer. - Simplify certificate generation logic.
317 lines
8.6 KiB
Python
317 lines
8.6 KiB
Python
import threading, urllib, Queue, urllib2, cStringIO
|
|
import libpry
|
|
import serv, sslserv
|
|
from libmproxy import proxy, controller, utils, dump, script
|
|
import random
|
|
|
|
# Yes, the random ports are horrible. During development, sockets are often not
|
|
# properly closed during error conditions, which means you have to wait until
|
|
# you can re-bind to the same port. This is a pain in the ass, so we just pick
|
|
# a random port and keep moving.
|
|
PROXL_PORT = random.randint(10000, 20000)
|
|
HTTP_PORT = random.randint(20000, 30000)
|
|
HTTPS_PORT = random.randint(30000, 40000)
|
|
|
|
|
|
class TestMaster(controller.Master):
|
|
def __init__(self, port, testq):
|
|
serv = proxy.ProxyServer(proxy.Config("data/testkey.pem"), port)
|
|
controller.Master.__init__(self, serv)
|
|
self.testq = testq
|
|
self.log = []
|
|
|
|
def clear(self):
|
|
self.log = []
|
|
|
|
def handle(self, m):
|
|
self.log.append(m)
|
|
m.ack()
|
|
|
|
|
|
class ProxyThread(threading.Thread):
|
|
def __init__(self, port, testq):
|
|
self.tmaster = TestMaster(port, testq)
|
|
threading.Thread.__init__(self)
|
|
|
|
def run(self):
|
|
self.tmaster.run()
|
|
|
|
def shutdown(self):
|
|
self.tmaster.shutdown()
|
|
|
|
|
|
class ServerThread(threading.Thread):
|
|
def __init__(self, server):
|
|
self.server = server
|
|
threading.Thread.__init__(self)
|
|
|
|
def run(self):
|
|
self.server.serve_forever()
|
|
|
|
def shutdown(self):
|
|
self.server.shutdown()
|
|
|
|
|
|
class _TestServers(libpry.TestContainer):
|
|
def setUpAll(self):
|
|
self.tqueue = Queue.Queue()
|
|
# We don't make any concurrent requests, so we can access
|
|
# the attributes on this object safely.
|
|
self.proxthread = ProxyThread(PROXL_PORT, self.tqueue)
|
|
self.threads = [
|
|
ServerThread(serv.make(HTTP_PORT)),
|
|
ServerThread(sslserv.make(HTTPS_PORT)),
|
|
self.proxthread
|
|
]
|
|
for i in self.threads:
|
|
i.start()
|
|
|
|
def setUp(self):
|
|
self.proxthread.tmaster.clear()
|
|
|
|
def tearDownAll(self):
|
|
for i in self.threads:
|
|
i.shutdown()
|
|
|
|
|
|
class _ProxTests(libpry.AutoTree):
|
|
def log(self):
|
|
pthread = self.findAttr("proxthread")
|
|
return pthread.tmaster.log
|
|
|
|
|
|
class uSanity(_ProxTests):
|
|
def test_http(self):
|
|
"""
|
|
Just check that the HTTP server is running.
|
|
"""
|
|
f = urllib.urlopen("http://127.0.0.1:%s"%HTTP_PORT)
|
|
assert f.read()
|
|
|
|
def test_https(self):
|
|
"""
|
|
Just check that the HTTPS server is running.
|
|
"""
|
|
f = urllib.urlopen("https://127.0.0.1:%s"%HTTPS_PORT)
|
|
assert f.read()
|
|
|
|
|
|
class uProxy(_ProxTests):
|
|
HOST = "127.0.0.1"
|
|
def _get(self, host=HOST):
|
|
r = urllib2.Request("http://%s:%s"%(host, HTTP_PORT))
|
|
r.set_proxy("127.0.0.1:%s"%PROXL_PORT, "http")
|
|
return urllib2.urlopen(r)
|
|
|
|
def _sget(self, host=HOST):
|
|
proxy_support = urllib2.ProxyHandler(
|
|
{"https" : "https://127.0.0.1:%s"%PROXL_PORT}
|
|
)
|
|
opener = urllib2.build_opener(proxy_support)
|
|
r = urllib2.Request("https://%s:%s"%(host, HTTPS_PORT))
|
|
return opener.open(r)
|
|
|
|
def test_http(self):
|
|
f = self._get()
|
|
assert f.code == 200
|
|
assert f.read()
|
|
f.close()
|
|
|
|
l = self.log()
|
|
assert l[0].address
|
|
assert l[1].headers.has_key("host")
|
|
assert l[2].code == 200
|
|
|
|
def test_https(self):
|
|
f = self._sget()
|
|
assert f.code == 200
|
|
assert f.read()
|
|
f.close()
|
|
|
|
l = self.log()
|
|
assert l[0].address
|
|
assert l[1].headers.has_key("host")
|
|
assert l[2].code == 200
|
|
|
|
# Disable these two for now: they take a long time.
|
|
def _test_http_nonexistent(self):
|
|
f = self._get("nonexistent")
|
|
assert f.code == 200
|
|
assert "Error" in f.read()
|
|
|
|
def _test_https_nonexistent(self):
|
|
f = self._sget("nonexistent")
|
|
assert f.code == 200
|
|
assert "Error" in f.read()
|
|
|
|
|
|
|
|
class u_parse_request_line(libpry.AutoTree):
|
|
def test_simple(self):
|
|
libpry.raises(proxy.ProxyError, proxy.parse_request_line, "")
|
|
|
|
u = "GET ... HTTP/1.1"
|
|
libpry.raises("invalid url", proxy.parse_request_line, u)
|
|
|
|
u = "GET http://foo.com:8888/test HTTP/1.1"
|
|
m, s, h, po, pa, minor = proxy.parse_request_line(u)
|
|
assert m == "GET"
|
|
assert s == "http"
|
|
assert h == "foo.com"
|
|
assert po == 8888
|
|
assert pa == "/test"
|
|
assert minor == 1
|
|
|
|
def test_connect(self):
|
|
u = "CONNECT host.com:443 HTTP/1.0"
|
|
expected = ('CONNECT', None, 'host.com', 443, None, 0)
|
|
ret = proxy.parse_request_line(u)
|
|
assert expected == ret
|
|
|
|
def test_inner(self):
|
|
u = "GET / HTTP/1.1"
|
|
assert proxy.parse_request_line(u) == ('GET', None, None, None, '/', 1)
|
|
|
|
|
|
class u_parse_url(libpry.AutoTree):
|
|
def test_simple(self):
|
|
assert not proxy.parse_url("")
|
|
|
|
u = "http://foo.com:8888/test"
|
|
s, h, po, pa = proxy.parse_url(u)
|
|
assert s == "http"
|
|
assert h == "foo.com"
|
|
assert po == 8888
|
|
assert pa == "/test"
|
|
|
|
s, h, po, pa = proxy.parse_url("http://foo/bar")
|
|
assert s == "http"
|
|
assert h == "foo"
|
|
assert po == 80
|
|
assert pa == "/bar"
|
|
|
|
s, h, po, pa = proxy.parse_url("http://foo")
|
|
assert pa == "/"
|
|
|
|
s, h, po, pa = proxy.parse_url("https://foo")
|
|
assert po == 443
|
|
|
|
|
|
class uFileLike(libpry.AutoTree):
|
|
def test_wrap(self):
|
|
s = cStringIO.StringIO("foobar\nfoobar")
|
|
s = proxy.FileLike(s)
|
|
s.flush()
|
|
assert s.readline() == "foobar\n"
|
|
assert s.readline() == "foobar"
|
|
# Test __getattr__
|
|
assert s.isatty
|
|
|
|
|
|
class uRequest(libpry.AutoTree):
|
|
def test_simple(self):
|
|
h = utils.Headers()
|
|
h["test"] = ["test"]
|
|
c = proxy.ClientConnect(("addr", 2222))
|
|
r = proxy.Request(c, "host", 22, "https", "GET", "/", h, "content")
|
|
u = r.url()
|
|
assert r.set_url(u)
|
|
assert not r.set_url("")
|
|
assert r.url() == u
|
|
assert r.short()
|
|
assert r.assemble()
|
|
|
|
def test_getset_state(self):
|
|
h = utils.Headers()
|
|
h["test"] = ["test"]
|
|
c = proxy.ClientConnect(("addr", 2222))
|
|
r = proxy.Request(c, "host", 22, "https", "GET", "/", h, "content")
|
|
state = r.get_state()
|
|
assert proxy.Request.from_state(state) == r
|
|
|
|
r.client_conn = None
|
|
state = r.get_state()
|
|
assert proxy.Request.from_state(state) == r
|
|
|
|
r2 = proxy.Request(c, "testing", 20, "http", "PUT", "/foo", h, "test")
|
|
assert not r == r2
|
|
r.load_state(r2.get_state())
|
|
assert r == r2
|
|
|
|
|
|
class uResponse(libpry.AutoTree):
|
|
def test_simple(self):
|
|
h = utils.Headers()
|
|
h["test"] = ["test"]
|
|
c = proxy.ClientConnect(("addr", 2222))
|
|
req = proxy.Request(c, "host", 22, "https", "GET", "/", h, "content")
|
|
resp = proxy.Response(req, 200, "msg", h.copy(), "content")
|
|
assert resp.short()
|
|
assert resp.assemble()
|
|
|
|
def test_getset_state(self):
|
|
h = utils.Headers()
|
|
h["test"] = ["test"]
|
|
c = proxy.ClientConnect(("addr", 2222))
|
|
r = proxy.Request(c, "host", 22, "https", "GET", "/", h, "content")
|
|
req = proxy.Request(c, "host", 22, "https", "GET", "/", h, "content")
|
|
resp = proxy.Response(req, 200, "msg", h.copy(), "content")
|
|
|
|
state = resp.get_state()
|
|
assert proxy.Response.from_state(req, state) == resp
|
|
|
|
resp2 = proxy.Response(req, 220, "foo", h.copy(), "test")
|
|
assert not resp == resp2
|
|
resp.load_state(resp2.get_state())
|
|
assert resp == resp2
|
|
|
|
|
|
class uError(libpry.AutoTree):
|
|
def test_getset_state(self):
|
|
e = proxy.Error(None, "Error")
|
|
state = e.get_state()
|
|
assert proxy.Error.from_state(state) == e
|
|
|
|
assert e.copy()
|
|
|
|
e2 = proxy.Error(None, "bar")
|
|
assert not e == e2
|
|
e.load_state(e2.get_state())
|
|
assert e == e2
|
|
|
|
|
|
|
|
class uProxyError(libpry.AutoTree):
|
|
def test_simple(self):
|
|
p = proxy.ProxyError(111, "msg")
|
|
assert repr(p)
|
|
|
|
|
|
class uClientConnect(libpry.AutoTree):
|
|
def test_state(self):
|
|
c = proxy.ClientConnect(("a", 22))
|
|
assert proxy.ClientConnect.from_state(c.get_state()) == c
|
|
|
|
c2 = proxy.ClientConnect(("a", 25))
|
|
assert not c == c2
|
|
|
|
c.load_state(c2.get_state())
|
|
assert c == c2
|
|
|
|
|
|
|
|
tests = [
|
|
uProxyError(),
|
|
uRequest(),
|
|
uResponse(),
|
|
uFileLike(),
|
|
u_parse_request_line(),
|
|
u_parse_url(),
|
|
uError(),
|
|
uClientConnect(),
|
|
_TestServers(), [
|
|
uSanity(),
|
|
uProxy(),
|
|
],
|
|
]
|