mitmproxy/test/test_pathoc.py

275 lines
7.9 KiB
Python
Raw Normal View History

import json
import cStringIO
import re
2015-06-08 13:28:24 +00:00
from mock import Mock
2015-06-08 13:28:24 +00:00
from netlib import tcp, http, http2
from libpathod import pathoc, test, version, pathod, language
import tutils
def test_response():
    # A Response built from plain values must produce a non-empty repr.
    response = pathoc.Response("1.1", 200, "Message", {}, None, None)
    assert repr(response)
2013-01-03 21:37:26 +00:00
class _TestDaemon:
    # Shared fixture: runs pathoc clients against a test.Daemon instance.
    # The ``ssl`` attribute read below is not defined on this class; the
    # subclasses in this file set it.
    ssloptions = pathod.SSLOptions()

    @classmethod
    def setUpAll(cls):
        # Start the daemon once and keep it on the class as ``d``.
        cls.d = test.Daemon(
            ssl=cls.ssl,
            ssloptions=cls.ssloptions,
            staticdir=tutils.test_data.path("data"),
            anchors=[
                (re.compile("/anchor/.*"), language.parse_pathod("202")),
            ],
        )

    @classmethod
    def tearDownAll(cls):
        # Stop the daemon started in setUpAll.
        cls.d.shutdown()

    def setUp(self):
        # Clear the daemon log before each test.
        self.d.clear_log()

    def test_info(self):
        # The version list returned by /api/info must equal version.IVERSION.
        client = pathoc.Pathoc(
            ("127.0.0.1", self.d.port),
            ssl=self.ssl,
            fp=None,
        )
        client.connect()
        info = json.loads(client.request("get:/api/info").content)
        assert tuple(info["version"]) == version.IVERSION

    def tval(
        self,
        requests,
        showreq=False,
        showresp=False,
        explain=False,
        showssl=False,
        hexdump=False,
        timeout=None,
        ignorecodes=(),
        ignoretimeout=None,
        showsummary=True
    ):
        """
            Send each spec in ``requests`` through a new Pathoc client and
            return the text collected in the StringIO handed to it as ``fp``.

            showreq, showresp, explain, hexdump, ignorecodes, ignoretimeout
            and showsummary are forwarded unchanged to pathoc.Pathoc;
            showssl is forwarded to connect(). A truthy timeout is applied
            with settimeout() after connecting. When explain is set, every
            parsed request is frozen with default language.Settings() before
            it is sent.

            http.HttpError and tcp.NetLibError raised by a request are
            swallowed, so the captured output is returned either way.
        """
        captured = cStringIO.StringIO()
        client = pathoc.Pathoc(
            ("127.0.0.1", self.d.port),
            ssl=self.ssl,
            showreq=showreq,
            showresp=showresp,
            explain=explain,
            hexdump=hexdump,
            ignorecodes=ignorecodes,
            ignoretimeout=ignoretimeout,
            showsummary=showsummary,
            fp=captured,
        )
        client.connect(showssl=showssl, fp=captured)
        if timeout:
            client.settimeout(timeout)

        for spec in requests:
            # Only the first request produced by the parser is used.
            parsed = next(language.parse_pathoc(spec))
            if explain:
                parsed = parsed.freeze(language.Settings())
            try:
                client.request(parsed)
            except (http.HttpError, tcp.NetLibError):
                # Callers only inspect the captured output.
                pass

        return captured.getvalue()
2013-01-03 21:37:26 +00:00
class TestDaemonSSL(_TestDaemon):
    # Runs against a daemon started with ssl=True and the options below.
    ssl = True
    ssloptions = pathod.SSLOptions(
        request_client_cert=True,
        sans=["test1.com", "test2.com"],
        alpn_select=http2.HTTP2Protocol.ALPN_PROTO_H2,
    )

    def test_sni(self):
        # The sni passed to the client must show up in the first entry of
        # the daemon's request log.
        client = pathoc.Pathoc(
            ("127.0.0.1", self.d.port),
            ssl=True,
            sni="foobar.com",
            fp=None,
        )
        client.connect()
        client.request("get:/p/200")
        log_resp = client.request("get:/api/log")
        logged = json.loads(log_resp.content)
        assert logged["log"][0]["request"]["sni"] == "foobar.com"

    def test_showssl(self):
        # With showssl=True the captured output must mention the chain.
        output = self.tval(["get:/p/200"], showssl=True)
        assert "certificate chain" in output

    def test_clientcert(self):
        # After connecting with a client certificate, the first logged
        # request must carry a truthy clientcert "keyinfo" entry.
        cert_path = tutils.test_data.path("data/clientcert/client.pem")
        client = pathoc.Pathoc(
            ("127.0.0.1", self.d.port),
            ssl=True,
            clientcert=cert_path,
            fp=None,
        )
        client.connect()
        client.request("get:/p/200")
        log_resp = client.request("get:/api/log")
        logged = json.loads(log_resp.content)
        assert logged["log"][0]["request"]["clientcert"]["keyinfo"]

    def test_http2_without_ssl(self):
        # connect() with use_http2=True and ssl=False must raise
        # NotImplementedError.
        client = pathoc.Pathoc(
            ("127.0.0.1", self.d.port),
            use_http2=True,
            ssl=False,
        )
        tutils.raises(NotImplementedError, client.connect)
2013-01-03 21:37:26 +00:00
class TestDaemon(_TestDaemon):
    # Runs against a daemon started with ssl=False.
    ssl = False

    def test_ssl_error(self):
        # An ssl=True client connecting to this daemon is expected to fail
        # with "ssl handshake".
        client = pathoc.Pathoc(("127.0.0.1", self.d.port), ssl=True, fp=None)
        tutils.raises("ssl handshake", client.connect)

    def test_showssl(self):
        # showssl=True must not print a certificate chain here.
        output = self.tval(["get:/p/200"], showssl=True)
        assert "certificate chain" not in output

    def test_ignorecodes(self):
        # The same positive check is deliberately run three times.
        for _ in range(3):
            assert "200" in self.tval(["get:'/p/200:b@1'"])

        # Codes listed in ignorecodes must not appear in the output.
        assert "200" not in self.tval(["get:'/p/200:b@1'"], ignorecodes=[200])
        assert "200" not in self.tval(
            ["get:'/p/200:b@1'"],
            ignorecodes=[200, 201],
        )
        # A code that is not listed is still reported.
        assert "202" in self.tval(["get:'/p/202:b@1'"], ignorecodes=[200, 201])

    def test_timeout(self):
        assert "Timeout" in self.tval(["get:'/p/200:p0,100'"], timeout=0.01)

        with_resp = self.tval(
            ["get:'/p/200:p5,100'"],
            showresp=True,
            timeout=1,
        )
        assert "HTTP" in with_resp

        # With ignoretimeout=True nothing containing "HTTP" is printed.
        ignored = self.tval(
            ["get:'/p/200:p3,100'"],
            showresp=True,
            timeout=1,
            ignoretimeout=True,
        )
        assert "HTTP" not in ignored

    def test_showresp(self):
        # Two requests must yield two of each expected marker.
        specs = ["get:/api/info:p0,0", "get:/api/info:p0,0"]
        assert self.tval(specs).count("200") == 2

        shown = self.tval(specs, showresp=True)
        assert shown.count("HTTP/1.1 200 OK") == 2

        dumped = self.tval(specs, showresp=True, hexdump=True)
        assert dumped.count("0000000000") == 2

    def test_showresp_httperr(self):
        output = self.tval(
            ["get:'/p/200:d20'"],
            showresp=True,
            showsummary=True,
        )
        assert "Invalid headers" in output
        assert "HTTP/" in output

    def test_explain(self):
        # With explain=True the literal "b@100" must not be in the output.
        specs = ["get:/p/200:b@100"]
        output = self.tval(specs, explain=True)
        assert "b@100" not in output

    def test_showreq(self):
        specs = ["get:/api/info:p0,0", "get:/api/info:p0,0"]

        shown = self.tval(specs, showreq=True)
        assert shown.count("GET /api") == 2

        dumped = self.tval(specs, showreq=True, hexdump=True)
        assert dumped.count("0000000000") == 2

    def test_conn_err(self):
        output = self.tval(["get:'/p/200:d2'"])
        assert "Invalid server response" in output

    def test_websocket_shutdown(self):
        # stop() after a websocket request must complete without raising.
        client = pathoc.Pathoc(("127.0.0.1", self.d.port), fp=None)
        client.connect()
        client.request("ws:/")
        client.stop()

    def test_wait_finish(self):
        client = pathoc.Pathoc(
            ("127.0.0.1", self.d.port),
            fp=None,
            ws_read_limit=1,
        )
        client.connect()
        client.request("ws:/")
        client.request("wf:f'wf:x100'")
        # Drain both iterables; the yielded items are not inspected.
        list(client.wait(timeout=0, finish=False))
        list(client.wait(timeout=0))

    def test_connect_fail(self):
        # Drives http_connect() against canned rfile contents instead of a
        # real connection.
        target = ("foobar", 80)
        client = pathoc.Pathoc(("127.0.0.1", self.d.port), fp=None)

        # Empty input is expected to fail with "connect failed".
        client.rfile, client.wfile = cStringIO.StringIO(), cStringIO.StringIO()
        tutils.raises("connect failed", client.http_connect, target)

        # A 500 status line is expected to fail the same way.
        client.rfile = cStringIO.StringIO(
            "HTTP/1.1 500 OK\r\n"
        )
        tutils.raises("connect failed", client.http_connect, target)

        # A 200 status line must be accepted without raising.
        client.rfile = cStringIO.StringIO(
            "HTTP/1.1 200 OK\r\n"
        )
        client.http_connect(target)
2015-06-08 13:28:24 +00:00
class TestDaemonHTTP2(_TestDaemon):
    # Runs against an ssl daemon whose ALPN selection is
    # http2.HTTP2Protocol.ALPN_PROTO_H2.
    ssl = True
    ssloptions = pathod.SSLOptions(
        alpn_select=http2.HTTP2Protocol.ALPN_PROTO_H2,
    )

    def test_http2(self):
        # use_http2=True must give the client an HTTP2Protocol instance.
        c = pathoc.Pathoc(
            ("127.0.0.1", self.d.port),
            use_http2=True,
            ssl=True,
        )
        assert isinstance(c.protocol, http2.HTTP2Protocol)

        # Without use_http2 no protocol object is set. Use an identity
        # check: "== None" could be satisfied by a custom __eq__.
        c = pathoc.Pathoc(
            ("127.0.0.1", self.d.port),
        )
        # TODO: change if other protocols get implemented
        assert c.protocol is None

    def test_http2_alpn(self):
        c = pathoc.Pathoc(
            ("127.0.0.1", self.d.port),
            ssl=True,
            use_http2=True,
            http2_skip_connection_preface=True,
        )
        # Replace convert_to_ssl with a Mock that still calls the real
        # method, so the keyword arguments of the call can be inspected.
        real_convert_to_ssl = c.convert_to_ssl
        c.convert_to_ssl = Mock()
        c.convert_to_ssl.side_effect = real_convert_to_ssl
        c.connect()
        _, kwargs = c.convert_to_ssl.call_args
        assert set(kwargs['alpn_protos']) == set([b'http1.1', b'h2'])

    def test_request(self):
        # The version list returned by /api/info must equal
        # version.IVERSION when requested with use_http2=True.
        c = pathoc.Pathoc(
            ("127.0.0.1", self.d.port),
            ssl=True,
            use_http2=True,
        )
        c.connect()
        resp = c.request("get:/api/info")
        assert tuple(json.loads(resp.content)["version"]) == version.IVERSION