rparse.py -> language.py

This commit is contained in:
Aldo Cortesi 2012-10-05 10:30:32 +13:00
parent d3e1f8a014
commit f5d5cc4988
7 changed files with 128 additions and 128 deletions

View File

@ -1,6 +1,6 @@
import logging, pprint, cStringIO import logging, pprint, cStringIO
from flask import Flask, jsonify, render_template, request, abort, make_response from flask import Flask, jsonify, render_template, request, abort, make_response
import version, rparse, utils import version, language, utils
logging.basicConfig(level="DEBUG") logging.basicConfig(level="DEBUG")
app = Flask(__name__) app = Flask(__name__)
@ -116,14 +116,14 @@ def _preview(is_request):
try: try:
if is_request: if is_request:
r = rparse.parse_request(app.config["pathod"].request_settings, spec) r = language.parse_request(app.config["pathod"].request_settings, spec)
else: else:
r = rparse.parse_response(app.config["pathod"].request_settings, spec) r = language.parse_response(app.config["pathod"].request_settings, spec)
except rparse.ParseException, v: except language.ParseException, v:
args["syntaxerror"] = str(v) args["syntaxerror"] = str(v)
args["marked"] = v.marked() args["marked"] = v.marked()
return render(template, False, **args) return render(template, False, **args)
except rparse.FileAccessDenied: except language.FileAccessDenied:
args["error"] = "File access is disabled." args["error"] = "File access is disabled."
return render(template, False, **args) return render(template, False, **args)

View File

@ -1,7 +1,7 @@
import sys, os import sys, os
from netlib import tcp, http from netlib import tcp, http
import netlib.utils import netlib.utils
import rparse, utils import language, utils
class PathocError(Exception): pass class PathocError(Exception): pass
@ -18,10 +18,10 @@ class Pathoc(tcp.TCPClient):
""" """
Return an (httpversion, code, msg, headers, content) tuple. Return an (httpversion, code, msg, headers, content) tuple.
May raise rparse.ParseException, netlib.http.HttpError or May raise language.ParseException, netlib.http.HttpError or
rparse.FileAccessDenied. language.FileAccessDenied.
""" """
r = rparse.parse_request(self.settings, spec) r = language.parse_request(self.settings, spec)
ret = r.serve(self.wfile, None, self.host) ret = r.serve(self.wfile, None, self.host)
self.wfile.flush() self.wfile.flush()
return http.read_response(self.rfile, r.method, None) return http.read_response(self.rfile, r.method, None)
@ -53,23 +53,23 @@ class Pathoc(tcp.TCPClient):
Returns True if we have a non-ignored response. Returns True if we have a non-ignored response.
""" """
try: try:
r = rparse.parse_request(self.settings, spec) r = language.parse_request(self.settings, spec)
except rparse.ParseException, v: except language.ParseException, v:
print >> fp, "Error parsing request spec: %s"%v.msg print >> fp, "Error parsing request spec: %s"%v.msg
print >> fp, v.marked() print >> fp, v.marked()
return return
except rparse.FileAccessDenied, v: except language.FileAccessDenied, v:
print >> fp, "File access error: %s"%v print >> fp, "File access error: %s"%v
return return
resp, req = None, None resp, req = None, None
if showreq: if showreq:
self.wfile.start_log() self.wfile.start_log()
if showresp:
self.rfile.start_log()
try: try:
req = r.serve(self.wfile, None, self.host) req = r.serve(self.wfile, None, self.host)
self.wfile.flush() self.wfile.flush()
if showresp:
self.rfile.start_log()
resp = http.read_response(self.rfile, r.method, None) resp = http.read_response(self.rfile, r.method, None)
except http.HttpError, v: except http.HttpError, v:
print >> fp, "<< HTTP Error:", v.msg print >> fp, "<< HTTP Error:", v.msg

View File

@ -1,7 +1,7 @@
import urllib, threading, re, logging, socket, sys import urllib, threading, re, logging, socket, sys
from netlib import tcp, http, odict, wsgi from netlib import tcp, http, odict, wsgi
import netlib.utils import netlib.utils
import version, app, rparse import version, app, language
logger = logging.getLogger('pathod') logger = logging.getLogger('pathod')
@ -76,26 +76,26 @@ class PathodHandler(tcp.BaseHandler):
for i in self.server.anchors: for i in self.server.anchors:
if i[0].match(path): if i[0].match(path):
self.info("crafting anchor: %s"%path) self.info("crafting anchor: %s"%path)
aresp = rparse.parse_response(self.server.request_settings, i[1]) aresp = language.parse_response(self.server.request_settings, i[1])
return self.serve_crafted(aresp, request_log) return self.serve_crafted(aresp, request_log)
if not self.server.nocraft and path.startswith(self.server.craftanchor): if not self.server.nocraft and path.startswith(self.server.craftanchor):
spec = urllib.unquote(path)[len(self.server.craftanchor):] spec = urllib.unquote(path)[len(self.server.craftanchor):]
self.info("crafting spec: %s"%spec) self.info("crafting spec: %s"%spec)
try: try:
crafted = rparse.parse_response(self.server.request_settings, spec) crafted = language.parse_response(self.server.request_settings, spec)
except rparse.ParseException, v: except language.ParseException, v:
self.info("Parse error: %s"%v.msg) self.info("Parse error: %s"%v.msg)
crafted = rparse.PathodErrorResponse( crafted = language.PathodErrorResponse(
"Parse Error", "Parse Error",
"Error parsing response spec: %s\n"%v.msg + v.marked() "Error parsing response spec: %s\n"%v.msg + v.marked()
) )
except rparse.FileAccessDenied: except language.FileAccessDenied:
self.info("File access denied") self.info("File access denied")
crafted = rparse.PathodErrorResponse("Access Denied") crafted = language.PathodErrorResponse("Access Denied")
return self.serve_crafted(crafted, request_log) return self.serve_crafted(crafted, request_log)
elif self.server.noweb: elif self.server.noweb:
crafted = rparse.PathodErrorResponse("Access Denied") crafted = language.PathodErrorResponse("Access Denied")
crafted.serve(self.wfile, self.server.check_policy) crafted.serve(self.wfile, self.server.check_policy)
return False, dict(type = "error", msg="Access denied: web interface disabled") return False, dict(type = "error", msg="Access denied: web interface disabled")
else: else:
@ -200,8 +200,8 @@ class Pathod(tcp.TCPServer):
except re.error: except re.error:
raise PathodError("Invalid regex in anchor: %s"%i[0]) raise PathodError("Invalid regex in anchor: %s"%i[0])
try: try:
aresp = rparse.parse_response(self.request_settings, i[1]) aresp = language.parse_response(self.request_settings, i[1])
except rparse.ParseException, v: except language.ParseException, v:
raise PathodError("Invalid page spec in anchor: '%s', %s"%(i[1], str(v))) raise PathodError("Invalid page spec in anchor: '%s', %s"%(i[1], str(v)))
self.anchors.append((arex, i[1])) self.anchors.append((arex, i[1]))

2
pathoc
View File

@ -1,6 +1,6 @@
#!/usr/bin/env python #!/usr/bin/env python
import argparse, sys import argparse, sys
from libpathod import pathoc, version, rparse from libpathod import pathoc, version, language
from netlib import tcp from netlib import tcp
if __name__ == "__main__": if __name__ == "__main__":

4
pathod
View File

@ -1,7 +1,7 @@
#!/usr/bin/env python #!/usr/bin/env python
import argparse, sys, logging, logging.handlers import argparse, sys, logging, logging.handlers
import os import os
from libpathod import pathod, utils, version, rparse from libpathod import pathod, utils, version, language
def daemonize (stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'): def daemonize (stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
@ -96,7 +96,7 @@ def main(parser, args):
) )
except pathod.PathodError, v: except pathod.PathodError, v:
parser.error(str(v)) parser.error(str(v))
except rparse.FileAccessDenied, v: except language.FileAccessDenied, v:
parser.error("%s You probably want to a -d argument."%str(v)) parser.error("%s You probably want to a -d argument."%str(v))
try: try:

View File

@ -1,18 +1,18 @@
import os, cStringIO import os, cStringIO
from libpathod import rparse, utils from libpathod import language, utils
import tutils import tutils
rparse.TESTING = True language.TESTING = True
class TestMisc: class TestMisc:
def test_generators(self): def test_generators(self):
v = rparse.Value.parseString("'val'")[0] v = language.Value.parseString("'val'")[0]
g = v.get_generator({}) g = v.get_generator({})
assert g[:] == "val" assert g[:] == "val"
def test_randomgenerator(self): def test_randomgenerator(self):
g = rparse.RandomGenerator("bytes", 100) g = language.RandomGenerator("bytes", 100)
assert repr(g) assert repr(g)
assert len(g[:10]) == 10 assert len(g[:10]) == 10
assert len(g[1:10]) == 9 assert len(g[1:10]) == 9
@ -21,7 +21,7 @@ class TestMisc:
assert g[0] assert g[0]
def test_literalgenerator(self): def test_literalgenerator(self):
g = rparse.LiteralGenerator("one") g = language.LiteralGenerator("one")
assert repr(g) assert repr(g)
assert g == "one" assert g == "one"
assert g[:] == "one" assert g[:] == "one"
@ -33,7 +33,7 @@ class TestMisc:
f = open(path, "w") f = open(path, "w")
f.write("x"*10000) f.write("x"*10000)
f.close() f.close()
g = rparse.FileGenerator(path) g = language.FileGenerator(path)
assert len(g) == 10000 assert len(g) == 10000
assert g[0] == "x" assert g[0] == "x"
assert g[-1] == "x" assert g[-1] == "x"
@ -41,26 +41,26 @@ class TestMisc:
assert repr(g) assert repr(g)
def test_valueliteral(self): def test_valueliteral(self):
v = rparse.ValueLiteral("foo") v = language.ValueLiteral("foo")
assert v.expr() assert v.expr()
assert v.val == "foo" assert v.val == "foo"
v = rparse.ValueLiteral(r"foo\n") v = language.ValueLiteral(r"foo\n")
assert v.expr() assert v.expr()
assert v.val == "foo\n" assert v.val == "foo\n"
assert repr(v) assert repr(v)
def test_valuenakedliteral(self): def test_valuenakedliteral(self):
v = rparse.ValueNakedLiteral("foo") v = language.ValueNakedLiteral("foo")
assert v.expr() assert v.expr()
assert repr(v) assert repr(v)
def test_file_value(self): def test_file_value(self):
v = rparse.Value.parseString("<'one two'")[0] v = language.Value.parseString("<'one two'")[0]
assert str(v) assert str(v)
assert v.path == "one two" assert v.path == "one two"
v = rparse.Value.parseString("<path")[0] v = language.Value.parseString("<path")[0]
assert v.path == "path" assert v.path == "path"
with tutils.tmpdir() as t: with tutils.tmpdir() as t:
@ -71,57 +71,57 @@ class TestMisc:
assert v.get_generator(dict(staticdir=t)) assert v.get_generator(dict(staticdir=t))
v = rparse.Value.parseString("<path2")[0] v = language.Value.parseString("<path2")[0]
tutils.raises(rparse.FileAccessDenied, v.get_generator, dict(staticdir=t)) tutils.raises(language.FileAccessDenied, v.get_generator, dict(staticdir=t))
tutils.raises("access disabled", v.get_generator, dict()) tutils.raises("access disabled", v.get_generator, dict())
v = rparse.Value.parseString("</outside")[0] v = language.Value.parseString("</outside")[0]
tutils.raises("outside", v.get_generator, dict(staticdir=t)) tutils.raises("outside", v.get_generator, dict(staticdir=t))
def test_generated_value(self): def test_generated_value(self):
v = rparse.Value.parseString("@10b")[0] v = language.Value.parseString("@10b")[0]
assert v.usize == 10 assert v.usize == 10
assert v.unit == "b" assert v.unit == "b"
assert v.bytes() == 10 assert v.bytes() == 10
v = rparse.Value.parseString("@10")[0] v = language.Value.parseString("@10")[0]
assert v.unit == "b" assert v.unit == "b"
v = rparse.Value.parseString("@10k")[0] v = language.Value.parseString("@10k")[0]
assert v.bytes() == 10240 assert v.bytes() == 10240
v = rparse.Value.parseString("@10g")[0] v = language.Value.parseString("@10g")[0]
assert v.bytes() == 1024**3 * 10 assert v.bytes() == 1024**3 * 10
v = rparse.Value.parseString("@10g,digits")[0] v = language.Value.parseString("@10g,digits")[0]
assert v.datatype == "digits" assert v.datatype == "digits"
g = v.get_generator({}) g = v.get_generator({})
assert g[:100] assert g[:100]
v = rparse.Value.parseString("@10,digits")[0] v = language.Value.parseString("@10,digits")[0]
assert v.unit == "b" assert v.unit == "b"
assert v.datatype == "digits" assert v.datatype == "digits"
def test_value(self): def test_value(self):
assert rparse.Value.parseString("'val'")[0].val == "val" assert language.Value.parseString("'val'")[0].val == "val"
assert rparse.Value.parseString('"val"')[0].val == "val" assert language.Value.parseString('"val"')[0].val == "val"
assert rparse.Value.parseString('"\'val\'"')[0].val == "'val'" assert language.Value.parseString('"\'val\'"')[0].val == "'val'"
def test_path(self): def test_path(self):
e = rparse.Path.expr() e = language.Path.expr()
assert e.parseString('"/foo"')[0].value.val == "/foo" assert e.parseString('"/foo"')[0].value.val == "/foo"
e = rparse.Path("/foo") e = language.Path("/foo")
assert e.value.val == "/foo" assert e.value.val == "/foo"
def test_method(self): def test_method(self):
e = rparse.Method.expr() e = language.Method.expr()
assert e.parseString("get")[0].value.val == "GET" assert e.parseString("get")[0].value.val == "GET"
assert e.parseString("'foo'")[0].value.val == "foo" assert e.parseString("'foo'")[0].value.val == "foo"
assert e.parseString("'get'")[0].value.val == "get" assert e.parseString("'get'")[0].value.val == "get"
def test_raw(self): def test_raw(self):
e = rparse.Raw.expr() e = language.Raw.expr()
assert e.parseString("r")[0] assert e.parseString("r")[0]
def test_body(self): def test_body(self):
e = rparse.Body.expr() e = language.Body.expr()
v = e.parseString("b'foo'")[0] v = e.parseString("b'foo'")[0]
assert v.value.val == "foo" assert v.value.val == "foo"
@ -133,13 +133,13 @@ class TestMisc:
assert str(v.value) == "@100g,digits" assert str(v.value) == "@100g,digits"
def test_header(self): def test_header(self):
e = rparse.Header.expr() e = language.Header.expr()
v = e.parseString("h'foo'='bar'")[0] v = e.parseString("h'foo'='bar'")[0]
assert v.key.val == "foo" assert v.key.val == "foo"
assert v.value.val == "bar" assert v.value.val == "bar"
def test_code(self): def test_code(self):
e = rparse.Code.expr() e = language.Code.expr()
v = e.parseString("200")[0] v = e.parseString("200")[0]
assert v.code == 200 assert v.code == 200
@ -164,64 +164,64 @@ class TestMisc:
def test_internal_response(self): def test_internal_response(self):
d = cStringIO.StringIO() d = cStringIO.StringIO()
s = rparse.PathodErrorResponse("foo") s = language.PathodErrorResponse("foo")
s.serve(d) s.serve(d)
class TestDisconnects: class TestDisconnects:
def test_parse_response(self): def test_parse_response(self):
assert (0, "disconnect") in rparse.parse_response({}, "400:d0").actions assert (0, "disconnect") in language.parse_response({}, "400:d0").actions
assert ("r", "disconnect") in rparse.parse_response({}, "400:dr").actions assert ("r", "disconnect") in language.parse_response({}, "400:dr").actions
def test_at(self): def test_at(self):
e = rparse.DisconnectAt.expr() e = language.DisconnectAt.expr()
v = e.parseString("d0")[0] v = e.parseString("d0")[0]
assert isinstance(v, rparse.DisconnectAt) assert isinstance(v, language.DisconnectAt)
assert v.value == 0 assert v.value == 0
v = e.parseString("d100")[0] v = e.parseString("d100")[0]
assert v.value == 100 assert v.value == 100
e = rparse.DisconnectAt.expr() e = language.DisconnectAt.expr()
v = e.parseString("dr")[0] v = e.parseString("dr")[0]
assert v.value == "r" assert v.value == "r"
class TestInject: class TestInject:
def test_parse_response(self): def test_parse_response(self):
a = rparse.parse_response({}, "400:ir,@100").actions[0] a = language.parse_response({}, "400:ir,@100").actions[0]
assert a[0] == "r" assert a[0] == "r"
assert a[1] == "inject" assert a[1] == "inject"
a = rparse.parse_response({}, "400:ia,@100").actions[0] a = language.parse_response({}, "400:ia,@100").actions[0]
assert a[0] == "a" assert a[0] == "a"
assert a[1] == "inject" assert a[1] == "inject"
def test_at(self): def test_at(self):
e = rparse.InjectAt.expr() e = language.InjectAt.expr()
v = e.parseString("i0,'foo'")[0] v = e.parseString("i0,'foo'")[0]
assert v.value.val == "foo" assert v.value.val == "foo"
assert v.offset == 0 assert v.offset == 0
assert isinstance(v, rparse.InjectAt) assert isinstance(v, language.InjectAt)
v = e.parseString("ir,'foo'")[0] v = e.parseString("ir,'foo'")[0]
assert v.offset == "r" assert v.offset == "r"
def test_serve(self): def test_serve(self):
s = cStringIO.StringIO() s = cStringIO.StringIO()
r = rparse.parse_response({}, "400:i0,'foo'") r = language.parse_response({}, "400:i0,'foo'")
assert r.serve(s, None) assert r.serve(s, None)
class TestShortcuts: class TestShortcuts:
def test_parse_response(self): def test_parse_response(self):
assert rparse.parse_response({}, "400:c'foo'").headers[0][0][:] == "Content-Type" assert language.parse_response({}, "400:c'foo'").headers[0][0][:] == "Content-Type"
assert rparse.parse_response({}, "400:l'foo'").headers[0][0][:] == "Location" assert language.parse_response({}, "400:l'foo'").headers[0][0][:] == "Location"
class TestPauses: class TestPauses:
def test_parse_response(self): def test_parse_response(self):
e = rparse.PauseAt.expr() e = language.PauseAt.expr()
v = e.parseString("p10,10")[0] v = e.parseString("p10,10")[0]
assert v.seconds == 10 assert v.seconds == 10
assert v.offset == 10 assert v.offset == 10
@ -236,7 +236,7 @@ class TestPauses:
assert v.offset == "a" assert v.offset == "a"
def test_request(self): def test_request(self):
r = rparse.parse_response({}, '400:p10,10') r = language.parse_response({}, '400:p10,10')
assert r.actions[0] == (10, "pause", 10) assert r.actions[0] == (10, "pause", 10)
@ -244,28 +244,28 @@ class TestParseRequest:
def test_file(self): def test_file(self):
p = tutils.test_data.path("data") p = tutils.test_data.path("data")
d = dict(staticdir=p) d = dict(staticdir=p)
r = rparse.parse_request(d, "+request") r = language.parse_request(d, "+request")
assert r.path == "/foo" assert r.path == "/foo"
def test_err(self): def test_err(self):
tutils.raises(rparse.ParseException, rparse.parse_request, {}, 'GET') tutils.raises(language.ParseException, language.parse_request, {}, 'GET')
def test_simple(self): def test_simple(self):
r = rparse.parse_request({}, 'GET:"/foo"') r = language.parse_request({}, 'GET:"/foo"')
assert r.method == "GET" assert r.method == "GET"
assert r.path == "/foo" assert r.path == "/foo"
r = rparse.parse_request({}, 'GET:/foo') r = language.parse_request({}, 'GET:/foo')
assert r.path == "/foo" assert r.path == "/foo"
r = rparse.parse_request({}, 'GET:@1k') r = language.parse_request({}, 'GET:@1k')
assert len(r.path) == 1024 assert len(r.path) == 1024
def test_render(self): def test_render(self):
s = cStringIO.StringIO() s = cStringIO.StringIO()
r = rparse.parse_request({}, "GET:'/foo'") r = language.parse_request({}, "GET:'/foo'")
assert r.serve(s, None, "foo.com") assert r.serve(s, None, "foo.com")
def test_str(self): def test_str(self):
r = rparse.parse_request({}, 'GET:"/foo"') r = language.parse_request({}, 'GET:"/foo"')
assert str(r) assert str(r)
def test_multiline(self): def test_multiline(self):
@ -274,7 +274,7 @@ class TestParseRequest:
"/foo" "/foo"
ir,@1 ir,@1
""" """
r = rparse.parse_request({}, l) r = language.parse_request({}, l)
assert r.method == "GET" assert r.method == "GET"
assert r.path == "/foo" assert r.path == "/foo"
assert r.actions assert r.actions
@ -291,7 +291,7 @@ class TestParseRequest:
ir,@1 ir,@1
""" """
r = rparse.parse_request({}, l) r = language.parse_request({}, l)
assert r.method == "GET" assert r.method == "GET"
assert r.path.s.endswith("bar") assert r.path.s.endswith("bar")
assert r.actions assert r.actions
@ -299,31 +299,31 @@ class TestParseRequest:
class TestParseResponse: class TestParseResponse:
def test_parse_err(self): def test_parse_err(self):
tutils.raises(rparse.ParseException, rparse.parse_response, {}, "400:msg,b:") tutils.raises(language.ParseException, language.parse_response, {}, "400:msg,b:")
try: try:
rparse.parse_response({}, "400'msg':b:") language.parse_response({}, "400'msg':b:")
except rparse.ParseException, v: except language.ParseException, v:
assert v.marked() assert v.marked()
assert str(v) assert str(v)
def test_parse_header(self): def test_parse_header(self):
r = rparse.parse_response({}, '400:h"foo"="bar"') r = language.parse_response({}, '400:h"foo"="bar"')
assert utils.get_header("foo", r.headers) assert utils.get_header("foo", r.headers)
def test_parse_pause_before(self): def test_parse_pause_before(self):
r = rparse.parse_response({}, "400:p0,10") r = language.parse_response({}, "400:p0,10")
assert (0, "pause", 10) in r.actions assert (0, "pause", 10) in r.actions
def test_parse_pause_after(self): def test_parse_pause_after(self):
r = rparse.parse_response({}, "400:pa,10") r = language.parse_response({}, "400:pa,10")
assert ("a", "pause", 10) in r.actions assert ("a", "pause", 10) in r.actions
def test_parse_pause_random(self): def test_parse_pause_random(self):
r = rparse.parse_response({}, "400:pr,10") r = language.parse_response({}, "400:pr,10")
assert ("r", "pause", 10) in r.actions assert ("r", "pause", 10) in r.actions
def test_parse_stress(self): def test_parse_stress(self):
r = rparse.parse_response({}, "400:b@100g") r = language.parse_response({}, "400:b@100g")
assert r.length() assert r.length()
@ -332,138 +332,138 @@ class TestWriteValues:
v = "foobarfoobar" v = "foobarfoobar"
for bs in range(1, len(v)+2): for bs in range(1, len(v)+2):
s = cStringIO.StringIO() s = cStringIO.StringIO()
rparse.send_chunk(s, v, bs, 0, len(v)) language.send_chunk(s, v, bs, 0, len(v))
assert s.getvalue() == v assert s.getvalue() == v
for start in range(len(v)): for start in range(len(v)):
for end in range(len(v)): for end in range(len(v)):
s = cStringIO.StringIO() s = cStringIO.StringIO()
rparse.send_chunk(s, v, bs, start, end) language.send_chunk(s, v, bs, start, end)
assert s.getvalue() == v[start:end] assert s.getvalue() == v[start:end]
def test_write_values_inject(self): def test_write_values_inject(self):
tst = "foo" tst = "foo"
s = cStringIO.StringIO() s = cStringIO.StringIO()
rparse.write_values(s, [tst], [(0, "inject", "aaa")], blocksize=5) language.write_values(s, [tst], [(0, "inject", "aaa")], blocksize=5)
assert s.getvalue() == "aaafoo" assert s.getvalue() == "aaafoo"
s = cStringIO.StringIO() s = cStringIO.StringIO()
rparse.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5) language.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5)
assert s.getvalue() == "faaaoo" assert s.getvalue() == "faaaoo"
s = cStringIO.StringIO() s = cStringIO.StringIO()
rparse.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5) language.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5)
assert s.getvalue() == "faaaoo" assert s.getvalue() == "faaaoo"
def test_write_values_disconnects(self): def test_write_values_disconnects(self):
s = cStringIO.StringIO() s = cStringIO.StringIO()
tst = "foo"*100 tst = "foo"*100
rparse.write_values(s, [tst], [(0, "disconnect")], blocksize=5) language.write_values(s, [tst], [(0, "disconnect")], blocksize=5)
assert not s.getvalue() assert not s.getvalue()
def test_write_values(self): def test_write_values(self):
tst = "foobarvoing" tst = "foobarvoing"
s = cStringIO.StringIO() s = cStringIO.StringIO()
rparse.write_values(s, [tst], []) language.write_values(s, [tst], [])
assert s.getvalue() == tst assert s.getvalue() == tst
for bs in range(1, len(tst) + 2): for bs in range(1, len(tst) + 2):
for off in range(len(tst)): for off in range(len(tst)):
s = cStringIO.StringIO() s = cStringIO.StringIO()
rparse.write_values(s, [tst], [(off, "disconnect")], blocksize=bs) language.write_values(s, [tst], [(off, "disconnect")], blocksize=bs)
assert s.getvalue() == tst[:off] assert s.getvalue() == tst[:off]
def test_write_values_pauses(self): def test_write_values_pauses(self):
tst = "".join(str(i) for i in range(10)) tst = "".join(str(i) for i in range(10))
for i in range(2, 10): for i in range(2, 10):
s = cStringIO.StringIO() s = cStringIO.StringIO()
rparse.write_values(s, [tst], [(2, "pause", 0), (1, "pause", 0)], blocksize=i) language.write_values(s, [tst], [(2, "pause", 0), (1, "pause", 0)], blocksize=i)
assert s.getvalue() == tst assert s.getvalue() == tst
for i in range(2, 10): for i in range(2, 10):
s = cStringIO.StringIO() s = cStringIO.StringIO()
rparse.write_values(s, [tst], [(1, "pause", 0)], blocksize=i) language.write_values(s, [tst], [(1, "pause", 0)], blocksize=i)
assert s.getvalue() == tst assert s.getvalue() == tst
tst = ["".join(str(i) for i in range(10))]*5 tst = ["".join(str(i) for i in range(10))]*5
for i in range(2, 10): for i in range(2, 10):
s = cStringIO.StringIO() s = cStringIO.StringIO()
rparse.write_values(s, tst[:], [(1, "pause", 0)], blocksize=i) language.write_values(s, tst[:], [(1, "pause", 0)], blocksize=i)
assert s.getvalue() == "".join(tst) assert s.getvalue() == "".join(tst)
def test_write_values_after(self): def test_write_values_after(self):
s = cStringIO.StringIO() s = cStringIO.StringIO()
r = rparse.parse_response({}, "400:da") r = language.parse_response({}, "400:da")
r.serve(s, None) r.serve(s, None)
s = cStringIO.StringIO() s = cStringIO.StringIO()
r = rparse.parse_response({}, "400:pa,0") r = language.parse_response({}, "400:pa,0")
r.serve(s, None) r.serve(s, None)
s = cStringIO.StringIO() s = cStringIO.StringIO()
r = rparse.parse_response({}, "400:ia,'xx'") r = language.parse_response({}, "400:ia,'xx'")
r.serve(s, None) r.serve(s, None)
assert s.getvalue().endswith('xx') assert s.getvalue().endswith('xx')
def test_ready_actions(): def test_ready_actions():
x = [(0, 5)] x = [(0, 5)]
assert rparse.ready_actions(100, x) == x assert language.ready_actions(100, x) == x
x = [("r", 5)] x = [("r", 5)]
ret = rparse.ready_actions(100, x) ret = language.ready_actions(100, x)
assert 0 <= ret[0][0] < 100 assert 0 <= ret[0][0] < 100
x = [("a", "pause", 5)] x = [("a", "pause", 5)]
ret = rparse.ready_actions(100, x) ret = language.ready_actions(100, x)
assert ret[0][0] > 100 assert ret[0][0] > 100
x = [(1, 5), (0, 5)] x = [(1, 5), (0, 5)]
assert rparse.ready_actions(100, x) == sorted(x) assert language.ready_actions(100, x) == sorted(x)
class TestResponse: class TestResponse:
def dummy_response(self): def dummy_response(self):
return rparse.parse_response({}, "400'msg'") return language.parse_response({}, "400'msg'")
def test_file(self): def test_file(self):
p = tutils.test_data.path("data") p = tutils.test_data.path("data")
d = dict(staticdir=p) d = dict(staticdir=p)
r = rparse.parse_response(d, "+response") r = language.parse_response(d, "+response")
assert r.code == 202 assert r.code == 202
def test_response(self): def test_response(self):
r = rparse.parse_response({}, "400'msg'") r = language.parse_response({}, "400'msg'")
assert r.code == 400 assert r.code == 400
assert r.msg == "msg" assert r.msg == "msg"
r = rparse.parse_response({}, "400'msg':b@100b") r = language.parse_response({}, "400'msg':b@100b")
assert r.msg == "msg" assert r.msg == "msg"
assert r.body[:] assert r.body[:]
assert str(r) assert str(r)
def test_checkfunc(self): def test_checkfunc(self):
s = cStringIO.StringIO() s = cStringIO.StringIO()
r = rparse.parse_response({}, "400:b@100k") r = language.parse_response({}, "400:b@100k")
def check(req, acts): def check(req, acts):
return "errmsg" return "errmsg"
assert r.serve(s, check=check)["error"] == "errmsg" assert r.serve(s, check=check)["error"] == "errmsg"
def test_render(self): def test_render(self):
s = cStringIO.StringIO() s = cStringIO.StringIO()
r = rparse.parse_response({}, "400'msg'") r = language.parse_response({}, "400'msg'")
assert r.serve(s, None) assert r.serve(s, None)
def test_raw(self): def test_raw(self):
s = cStringIO.StringIO() s = cStringIO.StringIO()
r = rparse.parse_response({}, "400:b'foo'") r = language.parse_response({}, "400:b'foo'")
r.serve(s, None) r.serve(s, None)
v = s.getvalue() v = s.getvalue()
assert "Content-Length" in v assert "Content-Length" in v
assert "Date" in v assert "Date" in v
s = cStringIO.StringIO() s = cStringIO.StringIO()
r = rparse.parse_response({}, "400:b'foo':r") r = language.parse_response({}, "400:b'foo':r")
r.serve(s, None) r.serve(s, None)
v = s.getvalue() v = s.getvalue()
assert not "Content-Length" in v assert not "Content-Length" in v
@ -474,9 +474,9 @@ class TestResponse:
s = cStringIO.StringIO() s = cStringIO.StringIO()
x.serve(s, None) x.serve(s, None)
assert x.length() == len(s.getvalue()) assert x.length() == len(s.getvalue())
testlen(rparse.parse_response({}, "400'msg'")) testlen(language.parse_response({}, "400'msg'"))
testlen(rparse.parse_response({}, "400'msg':h'foo'='bar'")) testlen(language.parse_response({}, "400'msg':h'foo'='bar'"))
testlen(rparse.parse_response({}, "400'msg':h'foo'='bar':b@100b")) testlen(language.parse_response({}, "400'msg':h'foo'='bar':b@100b"))
def test_effective_length(self): def test_effective_length(self):
def testlen(x, actions): def testlen(x, actions):
@ -486,7 +486,7 @@ class TestResponse:
actions = [ actions = [
] ]
r = rparse.parse_response({}, "400'msg':b@100") r = language.parse_response({}, "400'msg':b@100")
actions = [ actions = [
(0, "disconnect"), (0, "disconnect"),
@ -508,7 +508,7 @@ class TestResponse:
testlen(r, actions) testlen(r, actions)
def test_render(self): def test_render(self):
r = rparse.parse_response({}, "400:p0,100:dr") r = language.parse_response({}, "400:p0,100:dr")
assert r.actions[0][1] == "pause" assert r.actions[0][1] == "pause"
assert len(r.preview_safe()) == 1 assert len(r.preview_safe()) == 1
assert not r.actions[0][1] == "pause" assert not r.actions[0][1] == "pause"
@ -516,14 +516,14 @@ class TestResponse:
def test_read_file(): def test_read_file():
tutils.raises(rparse.FileAccessDenied, rparse.read_file, {}, "=/foo") tutils.raises(language.FileAccessDenied, language.read_file, {}, "=/foo")
p = tutils.test_data.path("data") p = tutils.test_data.path("data")
d = dict(staticdir=p) d = dict(staticdir=p)
assert rparse.read_file(d, "+./file").strip() == "testfile" assert language.read_file(d, "+./file").strip() == "testfile"
assert rparse.read_file(d, "+file").strip() == "testfile" assert language.read_file(d, "+file").strip() == "testfile"
tutils.raises(rparse.FileAccessDenied, rparse.read_file, d, "+./nonexistent") tutils.raises(language.FileAccessDenied, language.read_file, d, "+./nonexistent")
tutils.raises(rparse.FileAccessDenied, rparse.read_file, d, "+/nonexistent") tutils.raises(language.FileAccessDenied, language.read_file, d, "+/nonexistent")
tutils.raises(rparse.FileAccessDenied, rparse.read_file, d, "+../test_rparse.py") tutils.raises(language.FileAccessDenied, language.read_file, d, "+../test_language.py")
d["unconstrained_file_access"] = True d["unconstrained_file_access"] = True
assert rparse.read_file(d, "+../test_rparse.py") assert language.read_file(d, "+../test_language.py")