mitmproxy/test/pathod/test_pathoc.py

269 lines
7.9 KiB
Python
Raw Normal View History

2016-10-17 03:38:31 +00:00
import io
from unittest.mock import Mock
2016-12-01 09:36:18 +00:00
import pytest
from mitmproxy.net import http
from mitmproxy.net.http import http1
from mitmproxy import exceptions
2015-07-15 20:04:25 +00:00
from pathod import pathoc, language
from pathod.protocols.http2 import HTTP2StateProtocol
from mitmproxy.test import tutils
from . import tservers
2016-12-01 09:36:18 +00:00
from ..conftest import requires_alpn
def test_response():
2016-06-15 05:03:56 +00:00
r = http.Response(b"HTTP/1.1", 200, b"Message", {}, None, None)
assert repr(r)
class PathocTestDaemon(tservers.DaemonTests):
def tval(self, requests, timeout=None, showssl=False, **kwargs):
2016-10-17 03:38:31 +00:00
s = io.StringIO()
c = pathoc.Pathoc(
("127.0.0.1", self.d.port),
2015-06-18 16:12:11 +00:00
ssl=self.ssl,
fp=s,
**kwargs
)
with c.connect(showssl=showssl, fp=s):
if timeout:
c.settimeout(timeout)
for i in requests:
2016-06-15 05:03:56 +00:00
r = next(language.parse_pathoc(i))
if kwargs.get("explain"):
r = r.freeze(language.Settings())
try:
c.request(r)
except exceptions.NetlibException:
pass
self.d.wait_for_silence()
return s.getvalue()
2013-01-03 21:37:26 +00:00
class TestDaemonSSL(PathocTestDaemon):
2013-01-03 21:37:26 +00:00
ssl = True
ssloptions = dict(
2015-06-18 16:12:11 +00:00
request_client_cert=True,
2016-06-15 05:03:56 +00:00
sans=[b"test1.com", b"test2.com"],
2015-09-26 15:40:22 +00:00
alpn_select=b'h2',
)
2013-01-03 21:37:26 +00:00
def test_sni(self):
self.tval(
["get:/p/200"],
2016-07-07 04:03:17 +00:00
sni="foobar.com"
2013-01-03 21:37:26 +00:00
)
log = self.d.log()
2016-07-07 04:03:17 +00:00
assert log[0]["request"]["sni"] == "foobar.com"
2013-01-03 21:37:26 +00:00
def test_showssl(self):
assert "certificate chain" in self.tval(["get:/p/200"], showssl=True)
def test_clientcert(self):
self.tval(
["get:/p/200"],
clientcert=tutils.test_data.path("pathod/data/clientcert/client.pem"),
)
log = self.d.log()
assert log[0]["request"]["clientcert"]["keyinfo"]
2015-06-08 13:28:24 +00:00
def test_http2_without_ssl(self):
2016-10-17 03:38:31 +00:00
fp = io.StringIO()
2015-06-08 13:28:24 +00:00
c = pathoc.Pathoc(
("127.0.0.1", self.d.port),
2015-06-18 16:12:11 +00:00
use_http2=True,
ssl=False,
2016-06-15 05:01:19 +00:00
fp=fp
2015-06-08 13:28:24 +00:00
)
tutils.raises(NotImplementedError, c.connect)
2013-01-03 21:37:26 +00:00
class TestDaemon(PathocTestDaemon):
2013-01-03 21:37:26 +00:00
ssl = False
2013-01-03 21:37:26 +00:00
def test_ssl_error(self):
2015-06-18 16:12:11 +00:00
c = pathoc.Pathoc(("127.0.0.1", self.d.port), ssl=True, fp=None)
try:
with c.connect():
pass
except Exception as e:
assert "SSL" in str(e)
else:
raise AssertionError("No exception raised.")
2013-01-03 21:37:26 +00:00
def test_showssl(self):
assert "certificate chain" not in self.tval(
2015-05-30 00:03:13 +00:00
["get:/p/200"],
showssl=True)
def test_ignorecodes(self):
assert "200" in self.tval(["get:'/p/200:b@1'"])
assert "200" in self.tval(["get:'/p/200:b@1'"])
assert "200" in self.tval(["get:'/p/200:b@1'"])
assert "200" not in self.tval(["get:'/p/200:b@1'"], ignorecodes=[200])
2015-05-30 00:03:13 +00:00
assert "200" not in self.tval(
["get:'/p/200:b@1'"],
ignorecodes=[
200,
201])
assert "202" in self.tval(["get:'/p/202:b@1'"], ignorecodes=[200, 201])
def _test_timeout(self):
2015-06-08 02:01:04 +00:00
assert "Timeout" in self.tval(["get:'/p/200:p0,100'"], timeout=0.01)
2015-05-30 00:03:13 +00:00
assert "HTTP" in self.tval(
["get:'/p/200:p5,100'"],
2015-05-30 00:03:13 +00:00
showresp=True,
2015-06-08 04:34:21 +00:00
timeout=1
)
assert "HTTP" not in self.tval(
2015-06-08 04:34:21 +00:00
["get:'/p/200:p3,100'"],
2015-05-30 00:03:13 +00:00
showresp=True,
2015-06-08 04:34:21 +00:00
timeout=1,
ignoretimeout=True
)
def test_showresp(self):
reqs = ["get:/p/200:da", "get:/p/200:da"]
assert self.tval(reqs).count("200 OK") == 2
assert self.tval(reqs, showresp=True).count("HTTP/1.1 200 OK") == 2
assert self.tval(
reqs, showresp=True, hexdump=True
).count("0000000000") == 2
def test_showresp_httperr(self):
v = self.tval(["get:'/p/200:d20'"], showresp=True, showsummary=True)
assert "Invalid header" in v
assert "HTTP/" in v
def test_explain(self):
reqs = ["get:/p/200:b@100"]
assert "b@100" not in self.tval(reqs, explain=True)
def test_showreq(self):
reqs = ["get:/p/200:da", "get:/p/200:da"]
assert self.tval(reqs, showreq=True).count("GET /p/200") == 2
assert self.tval(
reqs, showreq=True, hexdump=True
).count("0000000000") == 2
def test_conn_err(self):
assert "Invalid server response" in self.tval(["get:'/p/200:d2'"])
2015-06-08 04:25:33 +00:00
def test_websocket_shutdown(self):
self.tval(["ws:/"])
2015-06-08 04:25:33 +00:00
def test_wait_finish(self):
c = pathoc.Pathoc(
("127.0.0.1", self.d.port),
fp=None,
2015-06-18 16:12:11 +00:00
ws_read_limit=1
2015-06-08 04:25:33 +00:00
)
with c.connect():
c.request("ws:/")
c.request("wf:f'wf'")
# This should read a frame and close the websocket reader
assert len([i for i in c.wait(timeout=5, finish=False)]) == 1
assert not [i for i in c.wait(timeout=0)]
2015-06-08 04:25:33 +00:00
def test_connect_fail(self):
to = ("foobar", 80)
c = pathoc.Pathoc(("127.0.0.1", self.d.port), fp=None)
2016-10-17 03:38:31 +00:00
c.rfile, c.wfile = io.BytesIO(), io.BytesIO()
with tutils.raises("connect failed"):
2015-09-16 16:44:34 +00:00
c.http_connect(to)
2016-10-17 03:38:31 +00:00
c.rfile = io.BytesIO(
2016-06-15 05:03:56 +00:00
b"HTTP/1.1 500 OK\r\n"
)
with tutils.raises("connect failed"):
2015-09-16 16:44:34 +00:00
c.http_connect(to)
2016-10-17 03:38:31 +00:00
c.rfile = io.BytesIO(
2016-06-15 05:03:56 +00:00
b"HTTP/1.1 200 OK\r\n"
)
c.http_connect(to)
2015-06-08 13:28:24 +00:00
2015-07-03 00:48:35 +00:00
def test_socks_connect(self):
to = ("foobar", 80)
c = pathoc.Pathoc(("127.0.0.1", self.d.port), fp=None)
2016-10-17 03:38:31 +00:00
c.rfile, c.wfile = tutils.treader(b""), io.BytesIO()
2015-07-03 00:48:35 +00:00
tutils.raises(pathoc.PathocError, c.socks_connect, to)
c.rfile = tutils.treader(
2016-06-15 05:03:56 +00:00
b"\x05\xEE"
2015-07-03 00:48:35 +00:00
)
tutils.raises("SOCKS without authentication", c.socks_connect, ("example.com", 0xDEAD))
c.rfile = tutils.treader(
2016-06-15 05:03:56 +00:00
b"\x05\x00" +
b"\x05\xEE\x00\x03\x0bexample.com\xDE\xAD"
2015-07-03 00:48:35 +00:00
)
tutils.raises("SOCKS server error", c.socks_connect, ("example.com", 0xDEAD))
c.rfile = tutils.treader(
2016-06-15 05:03:56 +00:00
b"\x05\x00" +
b"\x05\x00\x00\x03\x0bexample.com\xDE\xAD"
2015-07-03 00:48:35 +00:00
)
c.socks_connect(("example.com", 0xDEAD))
2015-06-08 13:28:24 +00:00
class TestDaemonHTTP2(PathocTestDaemon):
2015-06-08 13:28:24 +00:00
ssl = True
explain = False
2015-06-08 13:28:24 +00:00
2016-12-01 09:36:18 +00:00
@requires_alpn
def test_http2(self):
c = pathoc.Pathoc(
("127.0.0.1", self.d.port),
fp=None,
ssl=True,
use_http2=True,
)
assert isinstance(c.protocol, HTTP2StateProtocol)
c = pathoc.Pathoc(
("127.0.0.1", self.d.port),
)
assert c.protocol == http1
@requires_alpn
def test_http2_alpn(self):
c = pathoc.Pathoc(
("127.0.0.1", self.d.port),
fp=None,
ssl=True,
use_http2=True,
http2_skip_connection_preface=True,
)
tmp_convert_to_ssl = c.convert_to_ssl
c.convert_to_ssl = Mock()
c.convert_to_ssl.side_effect = tmp_convert_to_ssl
with c.connect():
_, kwargs = c.convert_to_ssl.call_args
assert set(kwargs['alpn_protos']) == set([b'http/1.1', b'h2'])
@requires_alpn
def test_request(self):
c = pathoc.Pathoc(
("127.0.0.1", self.d.port),
fp=None,
ssl=True,
use_http2=True,
)
with c.connect():
resp = c.request("get:/p/200")
assert resp.status_code == 200
def test_failing_request(self, disable_alpn):
c = pathoc.Pathoc(
("127.0.0.1", self.d.port),
fp=None,
ssl=True,
use_http2=True,
)
with pytest.raises(NotImplementedError):
with c.connect():
2016-12-01 09:36:18 +00:00
c.request("get:/p/200")