import os, cStringIO from libpathod import language, utils import tutils language.TESTING = True class TestValueNakedLiteral: def test_expr(self): v = language.ValueNakedLiteral("foo") assert v.expr() def test_spec(self): v = language.ValueNakedLiteral("foo") assert v.spec() == repr(v) == "foo" v = language.ValueNakedLiteral("f\x00oo") assert v.spec() == repr(v) == r"f\x00oo" class TestValueLiteral: def test_espr(self): v = language.ValueLiteral("foo") assert v.expr() assert v.val == "foo" v = language.ValueLiteral(r"foo\n") assert v.expr() assert v.val == "foo\n" assert repr(v) def test_spec(self): v = language.ValueLiteral("foo") assert v.spec() == r'"foo"' v = language.ValueLiteral("f\x00oo") assert v.spec() == repr(v) == r'"f\x00oo"' def test_freeze(self): v = language.ValueLiteral("foo") assert v.freeze({}).val == v.val class TestValueGenerate: def test_basic(self): v = language.Value.parseString("@10b")[0] assert v.usize == 10 assert v.unit == "b" assert v.bytes() == 10 v = language.Value.parseString("@10")[0] assert v.unit == "b" v = language.Value.parseString("@10k")[0] assert v.bytes() == 10240 v = language.Value.parseString("@10g")[0] assert v.bytes() == 1024**3 * 10 v = language.Value.parseString("@10g,digits")[0] assert v.datatype == "digits" g = v.get_generator({}) assert g[:100] v = language.Value.parseString("@10,digits")[0] assert v.unit == "b" assert v.datatype == "digits" def test_spec(self): v = language.ValueGenerate(1, "b", "bytes") assert v.spec() == repr(v) == "@1" v = language.ValueGenerate(1, "k", "bytes") assert v.spec() == repr(v) == "@1k" v = language.ValueGenerate(1, "k", "ascii") assert v.spec() == repr(v) == "@1k,ascii" v = language.ValueGenerate(1, "b", "ascii") assert v.spec() == repr(v) == "@1,ascii" def test_freeze(self): v = language.ValueGenerate(100, "b", "ascii") f = v.freeze({}) assert len(f.val) == 100 class TestValueFile: def test_file_value(self): v = language.Value.parseString("<'one two'")[0] assert str(v) assert v.path 
== "one two" v = language.Value.parseString(" 100 def test_path_generator(self): r = language.parse_request({}, "GET:@100").freeze({}) assert len(r.spec()) > 100 class TestWriteValues: def test_send_chunk(self): v = "foobarfoobar" for bs in range(1, len(v)+2): s = cStringIO.StringIO() language.send_chunk(s, v, bs, 0, len(v)) assert s.getvalue() == v for start in range(len(v)): for end in range(len(v)): s = cStringIO.StringIO() language.send_chunk(s, v, bs, start, end) assert s.getvalue() == v[start:end] def test_write_values_inject(self): tst = "foo" s = cStringIO.StringIO() language.write_values(s, [tst], [(0, "inject", "aaa")], blocksize=5) assert s.getvalue() == "aaafoo" s = cStringIO.StringIO() language.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5) assert s.getvalue() == "faaaoo" s = cStringIO.StringIO() language.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5) assert s.getvalue() == "faaaoo" def test_write_values_disconnects(self): s = cStringIO.StringIO() tst = "foo"*100 language.write_values(s, [tst], [(0, "disconnect")], blocksize=5) assert not s.getvalue() def test_write_values(self): tst = "foobarvoing" s = cStringIO.StringIO() language.write_values(s, [tst], []) assert s.getvalue() == tst for bs in range(1, len(tst) + 2): for off in range(len(tst)): s = cStringIO.StringIO() language.write_values(s, [tst], [(off, "disconnect")], blocksize=bs) assert s.getvalue() == tst[:off] def test_write_values_pauses(self): tst = "".join(str(i) for i in range(10)) for i in range(2, 10): s = cStringIO.StringIO() language.write_values(s, [tst], [(2, "pause", 0), (1, "pause", 0)], blocksize=i) assert s.getvalue() == tst for i in range(2, 10): s = cStringIO.StringIO() language.write_values(s, [tst], [(1, "pause", 0)], blocksize=i) assert s.getvalue() == tst tst = ["".join(str(i) for i in range(10))]*5 for i in range(2, 10): s = cStringIO.StringIO() language.write_values(s, tst[:], [(1, "pause", 0)], blocksize=i) assert s.getvalue() == "".join(tst) 
class TestWriteValuesAfter:
    """Serve responses whose actions use the ``a`` offset.

    Split out of TestWriteValues so that this block is self-contained; the
    test name is unchanged.
    """

    def test_write_values_after(self):
        def served(spec):
            # Parse the spec, serve it into an in-memory buffer and return
            # everything that was written.
            out = cStringIO.StringIO()
            language.serve(language.parse_response({}, spec), out, {})
            return out.getvalue()

        # The first two specs are smoke tests: serving must simply not raise.
        served("400:da")
        served("400:pa,0")
        # The injected value must be the last thing written.
        assert served("400:ia,'xx'").endswith('xx')


class TestResponse:
    """Tests for language.parse_response and for serving parsed responses."""

    def dummy_response(self):
        """Helper (not collected as a test): parse the spec "400'msg'"."""
        return language.parse_response({}, "400'msg'")

    def test_file(self):
        # "+response" is resolved against the test data directory.
        settings = dict(staticdir=tutils.test_data.path("data"))
        r = language.parse_response(settings, "+response")
        assert r.code.string() == "202"

    def test_response(self):
        r = language.parse_response({}, "400:m'msg'")
        assert r.code.string() == "400"
        assert r.reason.string() == "msg"

        r = language.parse_response({}, "400:m'msg':b@100b")
        assert r.reason.string() == "msg"
        assert r.body.values({})
        assert str(r)

        # No explicit reason given: the attribute is falsy, but the preamble
        # still contains "OK".
        r = language.parse_response({}, "200")
        assert r.code.string() == "200"
        assert not r.reason
        assert "OK" in [i[:] for i in r.preamble({})]

    def test_render(self):
        out = cStringIO.StringIO()
        r = language.parse_response({}, "400:m'msg'")
        assert language.serve(r, out, {})

    def test_raw(self):
        out = cStringIO.StringIO()
        r = language.parse_response({}, "400:b'foo'")
        language.serve(r, out, {})
        data = out.getvalue()
        assert "Content-Length" in data
        assert "Date" in data

        # With the "r" flag neither header is written.
        out = cStringIO.StringIO()
        r = language.parse_response({}, "400:b'foo':r")
        language.serve(r, out, {})
        data = out.getvalue()
        assert "Content-Length" not in data
        assert "Date" not in data

    def test_length(self):
        def testlen(x):
            # length() must equal the number of bytes actually served.
            out = cStringIO.StringIO()
            language.serve(x, out, {})
            assert x.length({}) == len(out.getvalue())

        for spec in (
            "400:m'msg':r",
            "400:m'msg':h'foo'='bar':r",
            "400:m'msg':h'foo'='bar':b@100b:r",
        ):
            testlen(language.parse_response({}, spec))

    def test_maximum_length(self):
        def testlen(x):
            # maximum_length() is an upper bound on what is actually served.
            out = cStringIO.StringIO()
            m = x.maximum_length({})
            language.serve(x, out, {})
            assert m >= len(out.getvalue())

        for spec in (
            "400:m'msg':b@100:d0",
            "400:m'msg':b@100:d0:i0,'foo'",
        ):
            testlen(language.parse_response({}, spec))

    def test_preview_safe(self):
        # Previously a second "test_render", which shadowed the one above so
        # that it never ran.
        r = language.parse_response({}, "400:p0,100:dr")
        assert "p0" in r.spec()
        safe = r.preview_safe()
        assert "p0" not in safe.spec()

    def test_parse_err(self):
        tutils.raises(
            language.ParseException,
            language.parse_response, {}, "400:msg,b:"
        )
        try:
            language.parse_response({}, "400'msg':b:")
        except language.ParseException as v:
            # The exception must be able to render itself.
            assert v.marked()
            assert str(v)
        else:
            # Without this the test would pass silently if nothing was raised.
            raise AssertionError("ParseException not raised")

    def test_nonascii(self):
        tutils.raises("ascii", language.parse_response, {}, "foo:b\xf0")

    def test_parse_header(self):
        r = language.parse_response({}, '400:h"foo"="bar"')
        assert utils.get_header("foo", r.headers)

    def test_parse_pause_before(self):
        r = language.parse_response({}, "400:p0,10")
        assert r.actions[0].spec() == "p0,10"

    def test_parse_pause_after(self):
        r = language.parse_response({}, "400:pa,10")
        assert r.actions[0].spec() == "pa,10"

    def test_parse_pause_random(self):
        r = language.parse_response({}, "400:pr,10")
        assert r.actions[0].spec() == "pr,10"

    def test_parse_stress(self):
        r = language.parse_response({}, "400:b@100g")
        assert r.length({})

    def test_spec(self):
        def rt(s):
            # Round trip: the spec of a parsed spec must re-parse to itself.
            s = language.parse_response({}, s).spec()
            assert language.parse_response({}, s).spec() == s

        for spec in ("400:b@100g", "400", "400:da"):
            rt(spec)


def test_read_file():
    """Check read_file's access control relative to ``staticdir``."""
    denied = language.FileAccessDenied
    tutils.raises(denied, language.read_file, {}, "=/foo")

    settings = dict(staticdir=tutils.test_data.path("data"))
    for spec in ("+./file", "+file"):
        assert language.read_file(settings, spec).strip() == "testfile"

    for spec in ("+./nonexistent", "+/nonexistent", "+../test_language.py"):
        tutils.raises(denied, language.read_file, settings, spec)

    # The path that was refused above becomes readable once unconstrained
    # access is switched on.
    settings["unconstrained_file_access"] = True
    assert language.read_file(settings, "+../test_language.py")