import cStringIO, time, re import libpry from libmproxy import proxy, controller, utils, dump, script import email.utils import tutils class u_read_chunked(libpry.AutoTree): def test_all(self): s = cStringIO.StringIO("1\r\na\r\n0\r\n") libpry.raises(IOError, proxy.read_chunked, s) s = cStringIO.StringIO("1\r\na\r\n0\r\n\r\n") assert proxy.read_chunked(s) == "a" s = cStringIO.StringIO("\r\n") libpry.raises(IOError, proxy.read_chunked, s) s = cStringIO.StringIO("1\r\nfoo") libpry.raises(IOError, proxy.read_chunked, s) class u_parse_request_line(libpry.AutoTree): def test_simple(self): libpry.raises(proxy.ProxyError, proxy.parse_request_line, "") u = "GET ... HTTP/1.1" libpry.raises("invalid url", proxy.parse_request_line, u) u = "GET http://foo.com:8888/test HTTP/1.1" m, s, h, po, pa, minor = proxy.parse_request_line(u) assert m == "GET" assert s == "http" assert h == "foo.com" assert po == 8888 assert pa == "/test" assert minor == 1 def test_connect(self): u = "CONNECT host.com:443 HTTP/1.0" expected = ('CONNECT', None, 'host.com', 443, None, 0) ret = proxy.parse_request_line(u) assert expected == ret def test_inner(self): u = "GET / HTTP/1.1" assert proxy.parse_request_line(u) == ('GET', None, None, None, '/', 1) class u_parse_url(libpry.AutoTree): def test_simple(self): assert not proxy.parse_url("") u = "http://foo.com:8888/test" s, h, po, pa = proxy.parse_url(u) assert s == "http" assert h == "foo.com" assert po == 8888 assert pa == "/test" s, h, po, pa = proxy.parse_url("http://foo/bar") assert s == "http" assert h == "foo" assert po == 80 assert pa == "/bar" s, h, po, pa = proxy.parse_url("http://foo") assert pa == "/" s, h, po, pa = proxy.parse_url("https://foo") assert po == 443 class uFileLike(libpry.AutoTree): def test_wrap(self): s = cStringIO.StringIO("foobar\nfoobar") s = proxy.FileLike(s) s.flush() assert s.readline() == "foobar\n" assert s.readline() == "foobar" # Test __getattr__ assert s.isatty class uRequest(libpry.AutoTree): def test_simple(self): h = utils.Headers() h["test"] = ["test"] c = proxy.ClientConnect(("addr", 2222)) r = proxy.Request(c, "host", 22, "https", "GET", "/", h, "content") u = r.url() assert r.set_url(u) assert not r.set_url("") assert r.url() == u assert r.assemble() r2 = r.copy() assert r == r2 def test_anticache(self): h = utils.Headers() r = proxy.Request(None, "host", 22, "https", "GET", "/", h, "content") h["if-modified-since"] = ["test"] h["if-none-match"] = ["test"] r.anticache() assert not "if-modified-since" in r.headers assert not "if-none-match" in r.headers def test_getset_state(self): h = utils.Headers() h["test"] = ["test"] c = proxy.ClientConnect(("addr", 2222)) r = proxy.Request(c, "host", 22, "https", "GET", "/", h, "content") state = r.get_state() assert proxy.Request.from_state(state) == r r.client_conn = None state = r.get_state() assert proxy.Request.from_state(state) == r r2 = proxy.Request(c, "testing", 20, "http", "PUT", "/foo", h, "test") assert not r == r2 r.load_state(r2.get_state()) assert r == r2 r2.client_conn = None r.load_state(r2.get_state()) assert not r.client_conn def test_replace(self): r = tutils.treq() r.path = "path/foo" r.headers["Foo"] = ["fOo"] r.content = "afoob" assert r.replace("foo(?i)", "boo") == 4 assert r.path == "path/boo" assert not "foo" in r.content assert r.headers["boo"] == ["boo"] def test_decodeencode(self): r = tutils.treq() r.headers["content-encoding"] = ["identity"] r.content = "falafel" r.decode() assert not r.headers["content-encoding"] assert r.content == "falafel" r = tutils.treq() r.headers["content-encoding"] = ["identity"] r.content = "falafel" r.encode("identity") assert r.headers["content-encoding"] == ["identity"] assert r.content == "falafel" r = tutils.treq() r.headers["content-encoding"] = ["identity"] r.content = "falafel" r.encode("gzip") assert r.headers["content-encoding"] == ["gzip"] assert r.content != "falafel" r.decode() assert not r.headers["content-encoding"] assert r.content == "falafel" class uResponse(libpry.AutoTree): def test_simple(self): h = utils.Headers() h["test"] = ["test"] c = proxy.ClientConnect(("addr", 2222)) req = proxy.Request(c, "host", 22, "https", "GET", "/", h, "content") resp = proxy.Response(req, 200, "msg", h.copy(), "content") assert resp.assemble() resp2 = resp.copy() assert resp2 == resp def test_refresh(self): r = tutils.tresp() n = time.time() r.headers["date"] = [email.utils.formatdate(n)] pre = r.headers["date"] r.refresh(n) assert pre == r.headers["date"] r.refresh(n+60) d = email.utils.parsedate_tz(r.headers["date"][0]) d = email.utils.mktime_tz(d) # Weird that this is not exact... assert abs(60-(d-n)) <= 1 r.headers["set-cookie"] = ["MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure"] r.refresh() def test_refresh_cookie(self): r = tutils.tresp() # Invalid expires format, sent to us by Reddit. c = "rfoo=bar; Domain=reddit.com; expires=Thu, 31 Dec 2037 23:59:59 GMT; Path=/" assert r._refresh_cookie(c, 60) c = "MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure" assert "00:21:38" in r._refresh_cookie(c, 60) def test_getset_state(self): h = utils.Headers() h["test"] = ["test"] c = proxy.ClientConnect(("addr", 2222)) r = proxy.Request(c, "host", 22, "https", "GET", "/", h, "content") req = proxy.Request(c, "host", 22, "https", "GET", "/", h, "content") resp = proxy.Response(req, 200, "msg", h.copy(), "content") state = resp.get_state() assert proxy.Response.from_state(req, state) == resp resp2 = proxy.Response(req, 220, "foo", h.copy(), "test") assert not resp == resp2 resp.load_state(resp2.get_state()) assert resp == resp2 def test_replace(self): r = tutils.tresp() r.headers["Foo"] = ["fOo"] r.content = "afoob" assert r.replace("foo(?i)", "boo") == 3 assert not "foo" in r.content assert r.headers["boo"] == ["boo"] def test_decodeencode(self): r = tutils.tresp() r.headers["content-encoding"] = ["identity"] r.content = "falafel" r.decode() assert not r.headers["content-encoding"] assert r.content == "falafel" r = tutils.tresp() r.headers["content-encoding"] = ["identity"] r.content = "falafel" r.encode("identity") assert r.headers["content-encoding"] == ["identity"] assert r.content == "falafel" r = tutils.tresp() r.headers["content-encoding"] = ["identity"] r.content = "falafel" r.encode("gzip") assert r.headers["content-encoding"] == ["gzip"] assert r.content != "falafel" r.decode() assert not r.headers["content-encoding"] assert r.content == "falafel" class uError(libpry.AutoTree): def test_getset_state(self): e = proxy.Error(None, "Error") state = e.get_state() assert proxy.Error.from_state(state) == e assert e.copy() e2 = proxy.Error(None, "bar") assert not e == e2 e.load_state(e2.get_state()) assert e == e2 e3 = e.copy() assert e3 == e def test_replace(self): e = proxy.Error(None, "amoop") e.replace("moo", "bar") assert e.msg == "abarp" class uProxyError(libpry.AutoTree): def test_simple(self): p = proxy.ProxyError(111, "msg") assert repr(p) class uClientConnect(libpry.AutoTree): def test_state(self): c = proxy.ClientConnect(("a", 22)) assert proxy.ClientConnect.from_state(c.get_state()) == c c2 = proxy.ClientConnect(("a", 25)) assert not c == c2 c.load_state(c2.get_state()) assert c == c2 c3 = c.copy() assert c3 == c tests = [ uProxyError(), uRequest(), uResponse(), uFileLike(), u_parse_request_line(), u_parse_url(), uError(), uClientConnect(), u_read_chunked(), ]