Extend Action and Value classes

- Values now know how to print their own specs
- Actions now know how to print their own specs
- Actions have a resolve_offset method that resolves relative and random offsets.
This commit is contained in:
Aldo Cortesi 2012-10-24 11:32:53 +13:00
parent e83392bfc8
commit c684f7417d
2 changed files with 171 additions and 71 deletions

View File

@ -1,4 +1,4 @@
import operator, string, random, mmap, os, time
import operator, string, random, mmap, os, time, copy
from email.utils import formatdate
import contrib.pyparsing as pp
from netlib import http_status, tcp
@ -189,7 +189,7 @@ class _Value:
return LiteralGenerator(self.val)
def __repr__(self):
return self.val
return self.spec()
class ValueLiteral(_Value):
@ -198,6 +198,9 @@ class ValueLiteral(_Value):
e = v_literal.copy()
return e.setParseAction(lambda x: klass(*x))
def spec(self):
return '"%s"'%self.val.encode("string_escape")
class ValueNakedLiteral(_Value):
@classmethod
@ -205,6 +208,9 @@ class ValueNakedLiteral(_Value):
e = v_naked_literal.copy()
return e.setParseAction(lambda x: klass(*x))
def spec(self):
return self.val.encode("string_escape")
class ValueGenerate:
def __init__(self, usize, unit, datatype):
@ -230,8 +236,16 @@ class ValueGenerate:
e += pp.Optional(s, default="bytes")
return e.setParseAction(lambda x: klass(*x))
def __str__(self):
return "@%s%s,%s"%(self.usize, self.unit, self.datatype)
def spec(self):
s = "@%s"%self.usize
if self.unit != "b":
s += self.unit
if self.datatype != "bytes":
s += ",%s"%self.datatype
return s
def __repr__(self):
return self.spec()
class ValueFile:
@ -259,8 +273,8 @@ class ValueFile:
raise FileAccessDenied("File not readable")
return FileGenerator(s)
def __str__(self):
return "<%s"%(self.path)
def spec(self):
return '<"%s"'%self.path.encode("string_escape")
Value = pp.MatchFirst(
@ -410,9 +424,24 @@ class _Action:
def __init__(self, offset):
self.offset = offset
def resolve_offset(self, msg):
"""
Resolves offset specifications to a numeric offset. Returns a copy
of the action object.
"""
c = copy.copy(self)
if c.offset == "r":
c.offset = random.randrange(msg.length())
elif c.offset == "a":
c.offset = msg.length() + 1
return c
def __cmp__(self, other):
return cmp(self.offset, other.offset)
def __repr__(self):
return self.spec()
class PauseAt(_Action):
def __init__(self, offset, seconds):
@ -432,6 +461,9 @@ class PauseAt(_Action):
)
return e.setParseAction(lambda x: klass(*x))
def spec(self):
return "p%s,%s"%(self.offset, self.seconds)
def accept(self, settings, r):
r.actions.append((self.offset, "pause", self.seconds))
@ -449,6 +481,9 @@ class DisconnectAt(_Action):
e += Offset
return e.setParseAction(lambda x: klass(*x))
def spec(self):
return "d%s"%self.offset
class InjectAt(_Action):
def __init__(self, offset, value):
@ -463,6 +498,9 @@ class InjectAt(_Action):
e += Value
return e.setParseAction(lambda x: klass(*x))
def spec(self):
return "i%s,%s"%(self.offset, self.value.spec())
def accept(self, settings, r):
r.actions.append(
(

View File

@ -5,6 +5,106 @@ import tutils
language.TESTING = True
class TestValueNakedLiteral:
def test_expr(self):
v = language.ValueNakedLiteral("foo")
assert v.expr()
def test_spec(self):
v = language.ValueNakedLiteral("foo")
assert v.spec() == repr(v) == "foo"
v = language.ValueNakedLiteral("f\x00oo")
assert v.spec() == repr(v) == r"f\x00oo"
class TestValueLiteral:
def test_espr(self):
v = language.ValueLiteral("foo")
assert v.expr()
assert v.val == "foo"
v = language.ValueLiteral(r"foo\n")
assert v.expr()
assert v.val == "foo\n"
assert repr(v)
def test_spec(self):
v = language.ValueLiteral("foo")
assert v.spec() == r'"foo"'
v = language.ValueLiteral("f\x00oo")
assert v.spec() == repr(v) == r'"f\x00oo"'
class TestValueGenerate:
def test_basic(self):
v = language.Value.parseString("@10b")[0]
assert v.usize == 10
assert v.unit == "b"
assert v.bytes() == 10
v = language.Value.parseString("@10")[0]
assert v.unit == "b"
v = language.Value.parseString("@10k")[0]
assert v.bytes() == 10240
v = language.Value.parseString("@10g")[0]
assert v.bytes() == 1024**3 * 10
v = language.Value.parseString("@10g,digits")[0]
assert v.datatype == "digits"
g = v.get_generator({})
assert g[:100]
v = language.Value.parseString("@10,digits")[0]
assert v.unit == "b"
assert v.datatype == "digits"
def test_spec(self):
v = language.ValueGenerate(1, "b", "bytes")
assert v.spec() == repr(v) == "@1"
v = language.ValueGenerate(1, "k", "bytes")
assert v.spec() == repr(v) == "@1k"
v = language.ValueGenerate(1, "k", "ascii")
assert v.spec() == repr(v) == "@1k,ascii"
v = language.ValueGenerate(1, "b", "ascii")
assert v.spec() == repr(v) == "@1,ascii"
class TestValueFile:
def test_file_value(self):
v = language.Value.parseString("<'one two'")[0]
assert str(v)
assert v.path == "one two"
v = language.Value.parseString("<path")[0]
assert v.path == "path"
def test_access_control(self):
v = language.Value.parseString("<path")[0]
with tutils.tmpdir() as t:
p = os.path.join(t, "path")
f = open(p, "w")
f.write("x"*10000)
f.close()
assert v.get_generator(dict(staticdir=t))
v = language.Value.parseString("<path2")[0]
tutils.raises(language.FileAccessDenied, v.get_generator, dict(staticdir=t))
tutils.raises("access disabled", v.get_generator, dict())
v = language.Value.parseString("</outside")[0]
tutils.raises("outside", v.get_generator, dict(staticdir=t))
def test_spec(self):
v = language.Value.parseString("<'one two'")[0]
v2 = language.Value.parseString(v.spec())[0]
assert v2.path == "one two"
class TestMisc:
def test_generators(self):
v = language.Value.parseString("'val'")[0]
@ -40,65 +140,6 @@ class TestMisc:
assert g[0:5] == "xxxxx"
assert repr(g)
def test_valueliteral(self):
v = language.ValueLiteral("foo")
assert v.expr()
assert v.val == "foo"
v = language.ValueLiteral(r"foo\n")
assert v.expr()
assert v.val == "foo\n"
assert repr(v)
def test_valuenakedliteral(self):
v = language.ValueNakedLiteral("foo")
assert v.expr()
assert repr(v)
def test_file_value(self):
v = language.Value.parseString("<'one two'")[0]
assert str(v)
assert v.path == "one two"
v = language.Value.parseString("<path")[0]
assert v.path == "path"
with tutils.tmpdir() as t:
p = os.path.join(t, "path")
f = open(p, "w")
f.write("x"*10000)
f.close()
assert v.get_generator(dict(staticdir=t))
v = language.Value.parseString("<path2")[0]
tutils.raises(language.FileAccessDenied, v.get_generator, dict(staticdir=t))
tutils.raises("access disabled", v.get_generator, dict())
v = language.Value.parseString("</outside")[0]
tutils.raises("outside", v.get_generator, dict(staticdir=t))
def test_generated_value(self):
v = language.Value.parseString("@10b")[0]
assert v.usize == 10
assert v.unit == "b"
assert v.bytes() == 10
v = language.Value.parseString("@10")[0]
assert v.unit == "b"
v = language.Value.parseString("@10k")[0]
assert v.bytes() == 10240
v = language.Value.parseString("@10g")[0]
assert v.bytes() == 1024**3 * 10
v = language.Value.parseString("@10g,digits")[0]
assert v.datatype == "digits"
g = v.get_generator({})
assert g[:100]
v = language.Value.parseString("@10,digits")[0]
assert v.unit == "b"
assert v.datatype == "digits"
def test_value(self):
assert language.Value.parseString("'val'")[0].val == "val"
assert language.Value.parseString('"val"')[0].val == "val"
@ -126,7 +167,7 @@ class TestMisc:
assert v.value.val == "foo"
v = e.parseString("b@100")[0]
assert str(v.value) == "@100b,bytes"
assert str(v.value) == "@100"
v = e.parseString("b@100g,digits", parseAll=True)[0]
assert v.value.datatype == "digits"
@ -179,6 +220,12 @@ class Test_Action:
l.sort()
assert l[0].offset == 0
def test_resolve_offset(self):
r = language.parse_request({}, 'GET:"/foo"')
e = language.DisconnectAt("r")
ret = e.resolve_offset(r)
assert isinstance(ret.offset, int)
class TestDisconnects:
def test_parse_response(self):
@ -198,6 +245,10 @@ class TestDisconnects:
v = e.parseString("dr")[0]
assert v.offset == "r"
def test_spec(self):
assert language.DisconnectAt("r").spec() == "dr"
assert language.DisconnectAt(10).spec() == "d10"
class TestInject:
def test_parse_response(self):
@ -224,11 +275,11 @@ class TestInject:
r = language.parse_response({}, "400:i0,'foo'")
assert r.serve(s, None)
class TestShortcuts:
def test_parse_response(self):
assert language.parse_response({}, "400:c'foo'").headers[0][0][:] == "Content-Type"
assert language.parse_response({}, "400:l'foo'").headers[0][0][:] == "Location"
def test_spec(self):
e = language.InjectAt.expr()
v = e.parseString("i0,'foo'")[0]
assert v.spec() == 'i0,"foo"'
class TestPauses:
@ -251,6 +302,17 @@ class TestPauses:
r = language.parse_response({}, '400:p10,10')
assert r.actions[0] == (10, "pause", 10)
def test_spec(self):
assert language.PauseAt("r", 5).spec() == "pr,5"
assert language.PauseAt(0, 5).spec() == "p0,5"
assert language.PauseAt(0, "f").spec() == "p0,f"
class TestShortcuts:
def test_parse_response(self):
assert language.parse_response({}, "400:c'foo'").headers[0][0][:] == "Content-Type"
assert language.parse_response({}, "400:l'foo'").headers[0][0][:] == "Location"
class TestParseRequest:
def test_file(self):