2016-10-17 03:38:31 +00:00
|
|
|
import io
|
2015-06-08 13:28:24 +00:00
|
|
|
|
2016-10-19 22:56:38 +00:00
|
|
|
from mitmproxy.net import tcp
|
|
|
|
from mitmproxy.net.http import user_agents
|
2015-07-15 20:04:25 +00:00
|
|
|
|
2016-02-16 19:59:33 +00:00
|
|
|
from pathod import language
|
2016-05-28 12:36:43 +00:00
|
|
|
from pathod.language import http2
|
2016-06-17 12:15:48 +00:00
|
|
|
from pathod.protocols.http2 import HTTP2StateProtocol
|
|
|
|
|
|
|
|
from . import tutils
|
2015-06-08 13:28:24 +00:00
|
|
|
|
|
|
|
|
|
|
|
def parse_request(s):
    """Return the first request produced by ``language.parse_pathoc(s, True)``.

    NOTE(review): the positional ``True`` presumably switches the parser to
    HTTP/2 mode -- confirm against ``language.parse_pathoc``.
    """
    parsed = language.parse_pathoc(s, True)
    return next(parsed)
|
2015-06-08 13:28:24 +00:00
|
|
|
|
2015-06-18 16:12:11 +00:00
|
|
|
|
2015-06-16 11:52:41 +00:00
|
|
|
def parse_response(s):
    """Return the first response produced by ``language.parse_pathod(s, True)``.

    NOTE(review): the positional ``True`` presumably switches the parser to
    HTTP/2 mode -- confirm against ``language.parse_pathod``.
    """
    parsed = language.parse_pathod(s, True)
    return next(parsed)
|
2015-06-16 11:52:41 +00:00
|
|
|
|
2015-06-18 16:12:11 +00:00
|
|
|
|
2015-06-16 11:52:41 +00:00
|
|
|
def default_settings():
    """Build the ``language.Settings`` object shared by the tests in this module.

    The settings use ``foo.com`` as the request host and an
    ``HTTP2StateProtocol`` wrapped around a TCP client addressed at
    ``localhost:1234``.
    """
    client = tcp.TCPClient(('localhost', 1234))
    http2_protocol = HTTP2StateProtocol(client)
    return language.Settings(
        request_host="foo.com",
        protocol=http2_protocol,
    )
|
|
|
|
|
|
|
|
|
|
|
|
def test_make_error_response():
    """Smoke test: serving the result of ``http2.make_error_response`` does not raise."""
    sink = io.BytesIO()
    error_response = http2.make_error_response("foo", "bar")
    language.serve(error_response, sink, default_settings())
|
|
|
|
|
2015-06-08 13:28:24 +00:00
|
|
|
|
|
|
|
class TestRequest:
    """Parsing and rendering checks for request specs."""

    def test_cached_values(self):
        """``resolve`` must hand back the very same object; ``values`` must be repeatable."""
        request = parse_request("get:/")
        assert request.resolve(default_settings()) is request
        first = request.values(default_settings())
        second = request.values(default_settings())
        assert first == second

    def test_nonascii(self):
        """A non-ASCII character in the spec is rejected.

        NOTE(review): ``tutils.raises`` with a string presumably matches on the
        exception message -- confirm against ``tutils``.
        """
        tutils.raises("ascii", parse_request, "get:\xf0")

    def test_err(self):
        """A bare method without a path is a parse error."""
        tutils.raises(language.ParseException, parse_request, 'GET')

    def test_simple(self):
        """Method and path are extracted from quoted and unquoted specs."""
        quoted = parse_request('GET:"/foo"')
        assert quoted.method.string() == b"GET"
        assert quoted.path.string() == b"/foo"

        unquoted = parse_request('GET:/foo')
        assert unquoted.path.string() == b"/foo"

    def test_multiple(self):
        """Several requests in one spec string are all parsed, in order."""
        requests = list(language.parse_pathoc("GET:/ PUT:/"))
        assert requests[0].method.string() == b"GET"
        assert requests[1].method.string() == b"PUT"
        assert len(requests) == 2

        # Components spread over several lines, including a quoted path
        # that itself spans lines.
        spec = """
            GET
            "/foo"

            PUT

            "/foo



            bar"
        """
        requests = list(language.parse_pathoc(spec, True))
        assert len(requests) == 2
        assert requests[0].method.string() == b"GET"
        assert requests[1].method.string() == b"PUT"

        # Lower-case methods with absolute URLs, one request per line.
        spec = """
            get:"http://localhost:9999/p/200"
            get:"http://localhost:9999/p/200"
        """
        requests = list(language.parse_pathoc(spec, True))
        assert len(requests) == 2
        assert requests[0].method.string() == b"GET"
        assert requests[1].method.string() == b"GET"

    def test_render_simple(self):
        """A minimal request can be served into a byte buffer."""
        out = io.BytesIO()
        request = parse_request("GET:'/foo'")
        assert language.serve(request, out, default_settings())

    def test_raw_content_length(self):
        """Check which content-length header is expected with and without ``r``."""
        # With ``r`` and no explicit header, no headers are expected.
        request = parse_request('GET:/:r')
        assert len(request.headers) == 0

        request = parse_request('GET:/:r:b"foobar"')
        assert len(request.headers) == 0

        # Without ``r`` a content-length header matching the body is expected.
        request = parse_request('GET:/')
        assert len(request.headers) == 1
        header = request.headers[0].values(default_settings())
        assert header == (b"content-length", b"0")

        request = parse_request('GET:/:b"foobar"')
        assert len(request.headers) == 1
        header = request.headers[0].values(default_settings())
        assert header == (b"content-length", b"6")

        # An explicit content-length is kept as given, with or without ``r``.
        request = parse_request('GET:/:b"foobar":h"content-length"="42"')
        assert len(request.headers) == 1
        header = request.headers[0].values(default_settings())
        assert header == (b"content-length", b"42")

        request = parse_request('GET:/:r:b"foobar":h"content-length"="42"')
        assert len(request.headers) == 1
        header = request.headers[0].values(default_settings())
        assert header == (b"content-length", b"42")

    def test_content_type(self):
        """The ``c`` shortcut yields a content-type header."""
        request = parse_request('GET:/:r:c"foobar"')
        assert len(request.headers) == 1
        header = request.headers[0].values(default_settings())
        assert header == (b"content-type", b"foobar")

    def test_user_agent(self):
        """The ``u`` shortcut yields the user-agent string registered for ``a``."""
        request = parse_request('GET:/:r:ua')
        assert len(request.headers) == 1
        header = request.headers[0].values(default_settings())
        expected_agent = user_agents.get_by_shortcut('a')[2].encode()
        assert header == (b"user-agent", expected_agent)

    def test_render_with_headers(self):
        """A request carrying a custom header can be served."""
        out = io.BytesIO()
        request = parse_request('GET:/foo:h"foo"="bar"')
        assert language.serve(request, out, default_settings())

    def test_nested_response(self):
        """An embedded response spec is parsed into a ``NestedResponse`` token."""
        spec = "get:/p/:s'200'"
        request = parse_request(spec)
        assert len(request.tokens) == 3
        assert isinstance(request.tokens[2], http2.NestedResponse)
        assert request.values(default_settings())

    def test_render_with_body(self):
        """A request carrying a body can be served."""
        out = io.BytesIO()
        request = parse_request("GET:'/foo':bfoobar")
        assert language.serve(request, out, default_settings())

    def test_spec(self):
        """``spec()`` output re-parses to a request with the same ``spec()``."""
        def roundtrip(text):
            canonical = parse_request(text).spec()
            assert parse_request(canonical).spec() == canonical

        roundtrip("get:/foo")
|
2015-06-16 11:52:41 +00:00
|
|
|
|
|
|
|
|
|
|
|
class TestResponse:
    """Parsing and rendering checks for response specs."""

    def test_cached_values(self):
        """``resolve`` must hand back the very same object; ``values`` must be repeatable."""
        response = parse_response("200")
        assert response.resolve(default_settings()) is response
        first = response.values(default_settings())
        second = response.values(default_settings())
        assert first == second

    def test_nonascii(self):
        """A non-ASCII character in the spec is rejected.

        NOTE(review): ``tutils.raises`` with a string presumably matches on the
        exception message -- confirm against ``tutils``.
        """
        tutils.raises("ascii", parse_response, "200:\xf0")

    def test_err(self):
        """A request-style spec is a parse error for the response parser."""
        tutils.raises(language.ParseException, parse_response, 'GET:/')

    def test_raw_content_length(self):
        """With ``r`` no header is expected; without it, ``content-length: 0``."""
        response = parse_response('200:r')
        assert len(response.headers) == 0

        response = parse_response('200')
        assert len(response.headers) == 1
        header = response.headers[0].values(default_settings())
        assert header == (b"content-length", b"0")

    def test_content_type(self):
        """The ``c`` shortcut yields a content-type header."""
        response = parse_response('200:r:c"foobar"')
        assert len(response.headers) == 1
        header = response.headers[0].values(default_settings())
        assert header == (b"content-type", b"foobar")

    def test_simple(self):
        """Status code, headers and body are extracted from the spec."""
        settings_for = default_settings

        # One header, no body.
        response = parse_response('200:r:h"foo"="bar"')
        assert response.status_code.string() == b"200"
        assert len(response.headers) == 1
        assert response.headers[0].values(settings_for()) == (b"foo", b"bar")
        assert response.body is None

        # Two headers around a body.
        response = parse_response('200:r:h"foo"="bar":bfoobar:h"bla"="fasel"')
        assert response.status_code.string() == b"200"
        assert len(response.headers) == 2
        assert response.headers[0].values(settings_for()) == (b"foo", b"bar")
        assert response.headers[1].values(settings_for()) == (b"bla", b"fasel")
        assert response.body.string() == b"foobar"

    def test_render_simple(self):
        """A bare status-code response can be served into a byte buffer."""
        out = io.BytesIO()
        response = parse_response('200')
        assert language.serve(response, out, default_settings())

    def test_render_with_headers(self):
        """A response carrying a custom header can be served."""
        out = io.BytesIO()
        response = parse_response('200:h"foo"="bar"')
        assert language.serve(response, out, default_settings())

    def test_render_with_body(self):
        """A response carrying a body can be served."""
        out = io.BytesIO()
        response = parse_response('200:bfoobar')
        assert language.serve(response, out, default_settings())

    def test_spec(self):
        """``spec()`` output re-parses to a response with the same ``spec()``."""
        def roundtrip(text):
            canonical = parse_response(text).spec()
            assert parse_response(canonical).spec() == canonical

        roundtrip("200:bfoobar")
|