2012-06-20 22:56:30 +00:00
|
|
|
import os, cStringIO
|
2012-10-04 21:30:32 +00:00
|
|
|
from libpathod import language, utils
|
2012-06-09 03:08:51 +00:00
|
|
|
import tutils
|
2012-04-28 00:42:03 +00:00
|
|
|
|
2012-10-04 21:30:32 +00:00
|
|
|
# Set the TESTING flag on the language module at import time, before any test
# in this file runs.
# NOTE(review): presumably this switches libpathod.language into a
# test-friendly mode -- confirm against language.py.
language.TESTING = True
|
2012-04-28 00:42:03 +00:00
|
|
|
|
2012-04-28 10:28:28 +00:00
|
|
|
|
2012-10-23 22:32:53 +00:00
|
|
|
class TestValueNakedLiteral:
    """Checks for language.ValueNakedLiteral."""

    def test_expr(self):
        """expr() on a naked literal returns something truthy."""
        literal = language.ValueNakedLiteral("foo")
        assert literal.expr()

    def test_spec(self):
        """spec() and repr() agree; a NUL byte is rendered as an escape."""
        cases = (
            ("foo", "foo"),
            ("f\x00oo", r"f\x00oo"),
        )
        for raw, rendered in cases:
            literal = language.ValueNakedLiteral(raw)
            assert literal.spec() == repr(literal) == rendered
|
2012-04-28 00:42:03 +00:00
|
|
|
|
2012-04-28 01:16:51 +00:00
|
|
|
|
2012-10-23 22:32:53 +00:00
|
|
|
class TestValueLiteral:
    """Checks for language.ValueLiteral."""

    def test_espr(self):
        """expr() is truthy and .val holds the unescaped text."""
        simple = language.ValueLiteral("foo")
        assert simple.expr()
        assert simple.val == "foo"

        # A raw backslash-n in the input is expected to come back as a
        # real newline in .val.
        escaped = language.ValueLiteral(r"foo\n")
        assert escaped.expr()
        assert escaped.val == "foo\n"
        assert repr(escaped)

    def test_spec(self):
        """spec() wraps the value in double quotes and escapes NUL."""
        quoted = language.ValueLiteral("foo")
        assert quoted.spec() == r'"foo"'

        with_nul = language.ValueLiteral("f\x00oo")
        assert with_nul.spec() == repr(with_nul) == r'"f\x00oo"'
|
|
|
|
|
|
|
|
|
|
|
|
class TestValueGenerate:
    """Checks for generated values (the "@..." syntax)."""

    def test_basic(self):
        """Parse size, unit and datatype out of "@" specifications."""
        parse = language.Value.parseString

        explicit_bytes = parse("@10b")[0]
        assert explicit_bytes.usize == 10
        assert explicit_bytes.unit == "b"
        assert explicit_bytes.bytes() == 10

        no_unit = parse("@10")[0]
        assert no_unit.unit == "b"

        kilo = parse("@10k")[0]
        assert kilo.bytes() == 10240

        giga = parse("@10g")[0]
        assert giga.bytes() == 1024**3 * 10

        giga_digits = parse("@10g,digits")[0]
        assert giga_digits.datatype == "digits"
        generator = giga_digits.get_generator({})
        assert generator[:100]

        small_digits = parse("@10,digits")[0]
        assert small_digits.unit == "b"
        assert small_digits.datatype == "digits"

    def test_spec(self):
        """spec() and repr() agree for each (size, unit, datatype) case."""
        cases = [
            ((1, "b", "bytes"), "@1"),
            ((1, "k", "bytes"), "@1k"),
            ((1, "k", "ascii"), "@1k,ascii"),
            ((1, "b", "ascii"), "@1,ascii"),
        ]
        for args, expected in cases:
            generated = language.ValueGenerate(*args)
            assert generated.spec() == repr(generated) == expected
|
|
|
|
|
|
|
|
|
|
|
|
class TestValueFile:
    """Checks for file-backed values (the "<path" syntax)."""

    def test_file_value(self):
        """Quoted and bare paths are both parsed into .path."""
        v = language.Value.parseString("<'one two'")[0]
        assert str(v)
        assert v.path == "one two"

        v = language.Value.parseString("<path")[0]
        assert v.path == "path"

    def test_access_control(self):
        """get_generator() works inside staticdir and refuses other cases."""
        v = language.Value.parseString("<path")[0]
        with tutils.tmpdir() as t:
            p = os.path.join(t, "path")
            # Context manager so the handle is closed even if write() raises.
            with open(p, "w") as f:
                f.write("x"*10000)

            assert v.get_generator(dict(staticdir=t))

            # A name with no file behind it in staticdir is rejected.
            v = language.Value.parseString("<path2")[0]
            tutils.raises(language.FileAccessDenied, v.get_generator, dict(staticdir=t))
            # With no staticdir setting at all, an "access disabled" error
            # is expected.
            tutils.raises("access disabled", v.get_generator, dict())

            # An absolute path is expected to be reported as "outside".
            v = language.Value.parseString("</outside")[0]
            tutils.raises("outside", v.get_generator, dict(staticdir=t))

    def test_spec(self):
        """spec() output parses back to a value with the same path."""
        v = language.Value.parseString("<'one two'")[0]
        v2 = language.Value.parseString(v.spec())[0]
        assert v2.path == "one two"
|
2012-04-28 00:42:03 +00:00
|
|
|
|
2012-10-23 22:32:53 +00:00
|
|
|
|
|
|
|
class TestMisc:
    """Assorted checks for generators and individual grammar components."""

    def test_generators(self):
        """A quoted literal value yields a generator over its text."""
        v = language.Value.parseString("'val'")[0]
        g = v.get_generator({})
        assert g[:] == "val"

    def test_randomgenerator(self):
        """Slices of a 100-item RandomGenerator are clipped to its length."""
        g = language.RandomGenerator("bytes", 100)
        assert repr(g)
        assert len(g[:10]) == 10
        assert len(g[1:10]) == 9
        assert len(g[:1000]) == 100
        assert len(g[1000:1001]) == 0
        assert g[0]

    def test_literalgenerator(self):
        """LiteralGenerator supports slicing and indexing like its string."""
        g = language.LiteralGenerator("one")
        assert repr(g)
        assert g[:] == "one"
        assert g[1] == "n"

    def test_filegenerator(self):
        """FileGenerator exposes length, indexing and slicing of a file."""
        with tutils.tmpdir() as t:
            path = os.path.join(t, "foo")
            # Context manager so the handle is closed even if write() raises.
            with open(path, "w") as f:
                f.write("x"*10000)
            g = language.FileGenerator(path)
            assert len(g) == 10000
            assert g[0] == "x"
            assert g[-1] == "x"
            assert g[0:5] == "xxxxx"
            assert repr(g)

    def test_value(self):
        """Single- and double-quoted literals parse to the inner text."""
        assert language.Value.parseString("'val'")[0].val == "val"
        assert language.Value.parseString('"val"')[0].val == "val"
        assert language.Value.parseString('"\'val\'"')[0].val == "'val'"

    def test_path(self):
        """Path parses from a quoted spec and can be built directly."""
        e = language.Path.expr()
        assert e.parseString('"/foo"')[0].value.val == "/foo"
        e = language.Path("/foo")
        assert e.value.val == "/foo"

    def test_method(self):
        """A bare method name is upper-cased; quoted ones are kept as-is."""
        e = language.Method.expr()
        assert e.parseString("get")[0].value.val == "GET"
        assert e.parseString("'foo'")[0].value.val == "foo"
        assert e.parseString("'get'")[0].value.val == "get"

    def test_raw(self):
        """The "r" flag parses to a truthy object."""
        e = language.Raw.expr()
        assert e.parseString("r")[0]

    def test_body(self):
        """Body accepts literal and generated values."""
        e = language.Body.expr()
        v = e.parseString("b'foo'")[0]
        assert v.value.val == "foo"

        v = e.parseString("b@100")[0]
        assert str(v.value) == "@100"

        v = e.parseString("b@100g,digits", parseAll=True)[0]
        assert v.value.datatype == "digits"
        assert str(v.value) == "@100g,digits"

    def test_header(self):
        """Header parses into a key value and a value value."""
        e = language.Header.expr()
        v = e.parseString("h'foo'='bar'")[0]
        assert v.key.val == "foo"
        assert v.value.val == "bar"

    def test_code(self):
        """A bare status code parses and renders back as the same string."""
        e = language.Code.expr()
        v = e.parseString("200")[0]
        assert v.string() == "200"

    def _test_reason(self):
        # NOTE(review): the leading underscore means test runners that collect
        # "test*" names will skip this method -- presumably disabled on
        # purpose. `e` is never bound in this scope, so the body would raise
        # NameError if called. Left as found; confirm before re-enabling.
        v = e.parseString("404'msg'")[0]
        assert v.code.string() == "404"
        assert v.reason == "msg"

        r = e.parseString("200'foo'")[0]
        assert r.msg.val == "foo"

        r = e.parseString("200'\"foo\"'")[0]
        assert r.msg.val == "\"foo\""

        r = e.parseString('200"foo"')[0]
        assert r.msg.val == "foo"

        r = e.parseString('404')[0]
        assert r.msg.val == "Not Found"

        r = e.parseString('10')[0]
        assert r.msg.val == "Unknown code"

    def test_internal_response(self):
        """PathodErrorResponse can be served to a buffer without raising."""
        d = cStringIO.StringIO()
        s = language.PathodErrorResponse("foo")
        s.serve(d, {})
|
2012-04-28 00:42:03 +00:00
|
|
|
|
|
|
|
|
2012-10-04 22:23:30 +00:00
|
|
|
class Test_Action:
    """Checks for the language._Action base behaviour."""

    def test_cmp(self):
        """Actions compare and sort by their offset."""
        first = language._Action(0)
        later = language._Action(1)
        same_as_first = language._Action(0)
        assert first < later
        assert first == same_as_first

        ordered = sorted([later, first])
        assert ordered[0].offset == 0

    def test_resolve_offset(self):
        """Resolving a random ("r") offset yields an integer offset."""
        request = language.parse_request({}, 'GET:"/foo"')
        action = language.DisconnectAt("r")
        resolved = action.resolve_offset(request, {}, None)
        assert isinstance(resolved.offset, int)

    def test_repr(self):
        """repr() of an action is non-empty."""
        action = language.DisconnectAt("r")
        assert repr(action)
|
|
|
|
|
2012-10-04 22:23:30 +00:00
|
|
|
|
2012-06-09 03:08:51 +00:00
|
|
|
class TestDisconnects:
    """Checks for the disconnect ("d<offset>") action."""

    def test_parse_response(self):
        """Disconnect actions in a response spec round-trip through spec()."""
        for offset_spec in ("d0", "dr"):
            response = language.parse_response({}, "400:" + offset_spec)
            action = response.actions[0]
            assert action.spec() == offset_spec

    def test_at(self):
        """Numeric offsets parse to ints; "r" is kept as a string."""
        grammar = language.DisconnectAt.expr()
        at_zero = grammar.parseString("d0")[0]
        assert isinstance(at_zero, language.DisconnectAt)
        assert at_zero.offset == 0

        at_hundred = grammar.parseString("d100")[0]
        assert at_hundred.offset == 100

        grammar = language.DisconnectAt.expr()
        at_random = grammar.parseString("dr")[0]
        assert at_random.offset == "r"

    def test_spec(self):
        """spec() renders "d" followed by the offset."""
        random_disconnect = language.DisconnectAt("r")
        assert random_disconnect.spec() == "dr"
        fixed_disconnect = language.DisconnectAt(10)
        assert fixed_disconnect.spec() == "d10"
|
|
|
|
|
2012-04-28 00:42:03 +00:00
|
|
|
|
2012-07-20 11:36:39 +00:00
|
|
|
class TestInject:
    """Checks for the inject ("i<offset>,<value>") action."""

    def test_parse_response(self):
        """Inject actions in a response spec expose offset and value."""
        response = language.parse_response({}, "400:ir,@100")
        action = response.actions[0]
        assert action.offset == "r"
        assert action.value.datatype == "bytes"
        assert action.value.usize == 100

        response = language.parse_response({}, "400:ia,@100")
        action = response.actions[0]
        assert action.offset == "a"

    def test_at(self):
        """InjectAt parses numeric and random offsets with a literal value."""
        grammar = language.InjectAt.expr()
        at_zero = grammar.parseString("i0,'foo'")[0]
        assert at_zero.value.val == "foo"
        assert at_zero.offset == 0
        assert isinstance(at_zero, language.InjectAt)

        at_random = grammar.parseString("ir,'foo'")[0]
        assert at_random.offset == "r"

    def test_serve(self):
        """Serving a response with an inject action returns a truthy value."""
        sink = cStringIO.StringIO()
        response = language.parse_response({}, "400:i0,'foo'")
        assert response.serve(sink, {})

    def test_spec(self):
        """spec() renders the injected literal with double quotes."""
        grammar = language.InjectAt.expr()
        parsed = grammar.parseString("i0,'foo'")[0]
        assert parsed.spec() == 'i0,"foo"'
|
2012-10-24 21:59:18 +00:00
|
|
|
|
2012-04-28 10:51:36 +00:00
|
|
|
|
2012-06-09 03:08:51 +00:00
|
|
|
class TestPauses:
    """Checks for the pause ("p<offset>,<seconds>") action."""

    def test_parse_response(self):
        """PauseAt parses numeric and symbolic offsets and durations."""
        grammar = language.PauseAt.expr()

        numeric = grammar.parseString("p10,10")[0]
        assert numeric.seconds == 10
        assert numeric.offset == 10

        forever = grammar.parseString("p10,f")[0]
        assert forever.seconds == "f"

        at_random = grammar.parseString("pr,f")[0]
        assert at_random.offset == "r"

        at_end = grammar.parseString("pa,f")[0]
        assert at_end.offset == "a"

    def test_request(self):
        """A pause in a response spec round-trips through spec()."""
        response = language.parse_response({}, '400:p10,10')
        first_action = response.actions[0]
        assert first_action.spec() == "p10,10"

    def test_spec(self):
        """spec() renders "p<offset>,<seconds>"."""
        random_pause = language.PauseAt("r", 5)
        assert random_pause.spec() == "pr,5"
        start_pause = language.PauseAt(0, 5)
        assert start_pause.spec() == "p0,5"
        forever_pause = language.PauseAt(0, "f")
        assert forever_pause.spec() == "p0,f"
|
|
|
|
|
|
|
|
|
|
|
|
class TestShortcuts:
    """Checks for the header shortcut components."""

    def test_parse_response(self):
        """The "c" and "l" shortcuts produce the named headers."""
        content_type = language.parse_response({}, "400:c'foo'")
        assert content_type.headers[0].key.val == "Content-Type"

        location = language.parse_response({}, "400:l'foo'")
        assert location.headers[0].key.val == "Location"
|
2012-10-23 22:32:53 +00:00
|
|
|
|
2012-04-28 10:28:28 +00:00
|
|
|
|
2012-06-24 05:47:55 +00:00
|
|
|
class TestParseRequest:
    """Checks for language.parse_request."""

    def test_file(self):
        """A "+name" spec is loaded from a file under staticdir."""
        data_dir = tutils.test_data.path("data")
        settings = {"staticdir": data_dir}
        request = language.parse_request(settings, "+request")
        assert request.path.values({})[0][:] == "/foo"

    def test_nonascii(self):
        """A non-ASCII byte in the spec produces an "ascii" error."""
        tutils.raises("ascii", language.parse_request, {}, "get:\xf0")

    def test_err(self):
        """A method with no path is a parse error."""
        tutils.raises(language.ParseException, language.parse_request, {}, 'GET')

    def test_simple(self):
        """Method and path parse from quoted, bare and generated forms."""
        quoted = language.parse_request({}, 'GET:"/foo"')
        assert quoted.method.string() == "GET"
        assert quoted.path.string() == "/foo"

        bare = language.parse_request({}, 'GET:/foo')
        assert bare.path.string() == "/foo"

        generated = language.parse_request({}, 'GET:@1k')
        assert len(generated.path.string()) == 1024

    def test_render(self):
        """Serving a parsed request returns a truthy value."""
        sink = cStringIO.StringIO()
        request = language.parse_request({}, "GET:'/foo'")
        assert request.serve(sink, {}, "foo.com")

    def test_multiline(self):
        """Specs may be spread across lines, including inside a quoted path."""
        spec = """
            GET
            "/foo"
            ir,@1
        """
        request = language.parse_request({}, spec)
        assert request.method.string() == "GET"
        assert request.path.string() == "/foo"
        assert request.actions

        # Blank lines between components and inside the quoted path.
        spec = """
            GET

            "/foo



            bar"

            ir,@1
        """
        request = language.parse_request({}, spec)
        assert request.method.string() == "GET"
        assert request.path.string().endswith("bar")
        assert request.actions
|
|
|
|
|
2012-06-24 07:12:52 +00:00
|
|
|
|
2012-06-24 05:47:55 +00:00
|
|
|
class TestParseResponse:
    """Checks for language.parse_response."""

    def test_parse_err(self):
        """Malformed specs raise ParseException with usable diagnostics."""
        tutils.raises(language.ParseException, language.parse_response, {}, "400:msg,b:")
        try:
            language.parse_response({}, "400'msg':b:")
        # "as" form instead of the Python-2-only comma form; valid on
        # Python 2.6+ and Python 3.
        except language.ParseException as v:
            assert v.marked()
            assert str(v)

    def test_nonascii(self):
        """A non-ASCII byte in the spec produces an "ascii" error."""
        tutils.raises("ascii", language.parse_response, {}, "foo:b\xf0")

    def test_parse_header(self):
        """A header component ends up in the response's headers."""
        r = language.parse_response({}, '400:h"foo"="bar"')
        assert utils.get_header("foo", r.headers)

    def test_parse_pause_before(self):
        """A pause at offset 0 round-trips through spec()."""
        r = language.parse_response({}, "400:p0,10")
        assert r.actions[0].spec() == "p0,10"

    def test_parse_pause_after(self):
        """A pause at the "a" offset round-trips through spec()."""
        r = language.parse_response({}, "400:pa,10")
        assert r.actions[0].spec() == "pa,10"

    def test_parse_pause_random(self):
        """A pause at the "r" offset round-trips through spec()."""
        r = language.parse_response({}, "400:pr,10")
        assert r.actions[0].spec() == "pr,10"

    def test_parse_stress(self):
        """length() is truthy for a response with a 100g generated body."""
        r = language.parse_response({}, "400:b@100g")
        assert r.length({}, None)
|
2012-04-29 09:15:02 +00:00
|
|
|
|
2012-04-28 00:42:03 +00:00
|
|
|
|
2012-06-24 05:01:04 +00:00
|
|
|
class TestWriteValues:
    """Checks for language.send_chunk and language.write_values."""

    def test_send_chunk(self):
        """send_chunk writes exactly v[start:end] for every block size."""
        payload = "foobarfoobar"
        size = len(payload)
        for blocksize in range(1, size+2):
            sink = cStringIO.StringIO()
            language.send_chunk(sink, payload, blocksize, 0, size)
            assert sink.getvalue() == payload
            for lo in range(size):
                for hi in range(size):
                    sink = cStringIO.StringIO()
                    language.send_chunk(sink, payload, blocksize, lo, hi)
                    assert sink.getvalue() == payload[lo:hi]

    def test_write_values_inject(self):
        """An inject action splices its text in at the given offset."""
        text = "foo"
        # The offset-1 case is intentionally listed twice, as in the original.
        expectations = ((0, "aaafoo"), (1, "faaaoo"), (1, "faaaoo"))
        for offset, expected in expectations:
            sink = cStringIO.StringIO()
            # Fresh lists on every call, matching the original call pattern.
            language.write_values(sink, [text], [(offset, "inject", "aaa")], blocksize=5)
            assert sink.getvalue() == expected

    def test_write_values_disconnects(self):
        """A disconnect at offset 0 means nothing is written."""
        sink = cStringIO.StringIO()
        text = "foo"*100
        language.write_values(sink, [text], [(0, "disconnect")], blocksize=5)
        assert not sink.getvalue()

    def test_write_values(self):
        """Output is cut at the disconnect offset for every block size."""
        text = "foobarvoing"
        sink = cStringIO.StringIO()
        language.write_values(sink, [text], [])
        assert sink.getvalue() == text

        for blocksize in range(1, len(text) + 2):
            for cut in range(len(text)):
                sink = cStringIO.StringIO()
                language.write_values(sink, [text], [(cut, "disconnect")], blocksize=blocksize)
                assert sink.getvalue() == text[:cut]

    def test_write_values_pauses(self):
        """Zero-length pauses do not change what is written."""
        digits = "".join(str(i) for i in range(10))
        for blocksize in range(2, 10):
            sink = cStringIO.StringIO()
            language.write_values(sink, [digits], [(2, "pause", 0), (1, "pause", 0)], blocksize=blocksize)
            assert sink.getvalue() == digits

        for blocksize in range(2, 10):
            sink = cStringIO.StringIO()
            language.write_values(sink, [digits], [(1, "pause", 0)], blocksize=blocksize)
            assert sink.getvalue() == digits

        chunks = ["".join(str(i) for i in range(10))]*5
        for blocksize in range(2, 10):
            sink = cStringIO.StringIO()
            # Pass a copy of the list, as the original does.
            language.write_values(sink, chunks[:], [(1, "pause", 0)], blocksize=blocksize)
            assert sink.getvalue() == "".join(chunks)

    def test_write_values_after(self):
        """Actions at the "a" offset can be served; injected text ends the output."""
        sink = cStringIO.StringIO()
        response = language.parse_response({}, "400:da")
        response.serve(sink, {})

        sink = cStringIO.StringIO()
        response = language.parse_response({}, "400:pa,0")
        response.serve(sink, {})

        sink = cStringIO.StringIO()
        response = language.parse_response({}, "400:ia,'xx'")
        response.serve(sink, {})
        assert sink.getvalue().endswith('xx')
|
|
|
|
|
2012-06-24 05:01:04 +00:00
|
|
|
|
|
|
|
class TestResponse:
    """Checks for parsed response objects."""

    def dummy_response(self):
        """Return a minimal parsed response (status 400, reason "msg")."""
        return language.parse_response({}, "400'msg'")

    def test_file(self):
        """A "+name" spec is loaded from a file under staticdir."""
        p = tutils.test_data.path("data")
        d = dict(staticdir=p)
        r = language.parse_response(d, "+response")
        assert r.code.string() == "202"

    def test_response(self):
        """Code, reason, body and preamble are exposed on the response."""
        r = language.parse_response({}, "400'msg'")
        assert r.code.string() == "400"
        assert r.reason.string() == "msg"

        r = language.parse_response({}, "400'msg':b@100b")
        assert r.reason.string() == "msg"
        assert r.body.values({})
        assert str(r)

        # With no explicit reason, "OK" is expected among the preamble values.
        r = language.parse_response({}, "200")
        assert r.code.string() == "200"
        assert not r.reason
        assert "OK" in [i[:] for i in r.preamble({})]

    def test_render(self):
        """Serving a parsed response returns a truthy value."""
        s = cStringIO.StringIO()
        r = language.parse_response({}, "400'msg'")
        assert r.serve(s, {})

    def test_raw(self):
        """The "r" flag suppresses the Content-Length and Date headers."""
        s = cStringIO.StringIO()
        r = language.parse_response({}, "400:b'foo'")
        r.serve(s, {})
        v = s.getvalue()
        assert "Content-Length" in v
        assert "Date" in v

        s = cStringIO.StringIO()
        r = language.parse_response({}, "400:b'foo':r")
        r.serve(s, {})
        v = s.getvalue()
        assert "Content-Length" not in v
        assert "Date" not in v

    def test_length(self):
        """length() equals the number of bytes actually served."""
        def testlen(x):
            s = cStringIO.StringIO()
            x.serve(s, {})
            assert x.length({}, None) == len(s.getvalue())

        testlen(language.parse_response({}, "400'msg'"))
        testlen(language.parse_response({}, "400'msg':h'foo'='bar'"))
        testlen(language.parse_response({}, "400'msg':h'foo'='bar':b@100b"))

    def test_maximum_length(self):
        """maximum_length() is an upper bound on the bytes served."""
        def testlen(x):
            s = cStringIO.StringIO()
            # Take the bound before serving, as the original does.
            m = x.maximum_length({}, None)
            x.serve(s, {})
            assert m >= len(s.getvalue())

        r = language.parse_response({}, "400'msg':b@100")

        r.actions = [
            language.DisconnectAt(0)
        ]
        testlen(r)

        r.actions = [
            language.DisconnectAt(0),
            language.InjectAt(0, language.ValueLiteral("foo"))
        ]
        testlen(r)

        r.actions = [
            language.InjectAt(0, language.ValueLiteral("foo"))
        ]
        testlen(r)

    def test_preview_safe(self):
        """After preview_safe(), the first action is no longer the pause.

        This method was previously also named test_render. Python keeps only
        the last definition of a name in a class body, so the serve test
        above was silently replaced and never ran.
        """
        r = language.parse_response({}, "400:p0,100:dr")
        assert r.actions[0].spec() == "p0,100"
        assert len(r.preview_safe()) == 1
        assert not r.actions[0].spec().startswith("p")
|
2012-07-24 10:38:48 +00:00
|
|
|
|
2012-07-22 22:47:33 +00:00
|
|
|
|
2012-07-22 11:37:46 +00:00
|
|
|
|
|
|
|
def test_read_file():
    """read_file honours staticdir and the unconstrained_file_access flag."""
    denied = language.FileAccessDenied

    # With no settings, the read is refused.
    tutils.raises(denied, language.read_file, {}, "=/foo")

    data_dir = tutils.test_data.path("data")
    settings = {"staticdir": data_dir}

    for spec in ("+./file", "+file"):
        assert language.read_file(settings, spec).strip() == "testfile"

    for spec in ("+./nonexistent", "+/nonexistent"):
        tutils.raises(denied, language.read_file, settings, spec)

    # A path that climbs out of staticdir is refused until the
    # unconstrained_file_access setting is switched on.
    escaping = "+../test_language.py"
    tutils.raises(denied, language.read_file, settings, escaping)
    settings["unconstrained_file_access"] = True
    assert language.read_file(settings, escaping)
|