mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2025-02-01 07:49:10 +00:00
Add an injection operator.
This commit is contained in:
parent
2bdbbaa8af
commit
d7841898e3
@ -66,6 +66,8 @@ def write_values(fp, vals, actions, sofar=0, skip=0, blocksize=BLOCKSIZE):
|
||||
offset += send_chunk(fp, v, blocksize, offset, a[0]-sofar-offset)
|
||||
if a[1] == "pause":
|
||||
time.sleep(a[2])
|
||||
elif a[1] == "inject":
|
||||
send_chunk(fp, a[2], blocksize, 0, len(a[2]))
|
||||
elif a[1] == "disconnect":
|
||||
return True
|
||||
send_chunk(fp, v, blocksize, offset, len(v))
|
||||
@ -409,6 +411,27 @@ class DisconnectAt:
|
||||
return e.setParseAction(lambda x: klass(*x))
|
||||
|
||||
|
||||
class InjectAt:
|
||||
def __init__(self, offset, value):
|
||||
self.offset, self.value = offset, value
|
||||
|
||||
@classmethod
|
||||
def expr(klass):
|
||||
e = pp.Literal("i").suppress()
|
||||
e = e + pp.MatchFirst(
|
||||
[
|
||||
v_integer,
|
||||
pp.Literal("r")
|
||||
]
|
||||
)
|
||||
e += pp.Literal(",").suppress()
|
||||
e += Value
|
||||
return e.setParseAction(lambda x: klass(*x))
|
||||
|
||||
def accept(self, settings, r):
|
||||
r.actions.append((self.offset, "inject", self.value))
|
||||
|
||||
|
||||
class Header:
|
||||
def __init__(self, key, value):
|
||||
self.key, self.value = key, value
|
||||
@ -512,6 +535,7 @@ class Response(Message):
|
||||
Header,
|
||||
PauseAt,
|
||||
DisconnectAt,
|
||||
InjectAt,
|
||||
ShortcutContentType,
|
||||
ShortcutLocation,
|
||||
)
|
||||
@ -551,6 +575,7 @@ class Request(Message):
|
||||
Header,
|
||||
PauseAt,
|
||||
DisconnectAt,
|
||||
InjectAt,
|
||||
ShortcutContentType,
|
||||
)
|
||||
logattrs = ["method", "path"]
|
||||
|
@ -169,6 +169,23 @@ class TestDisconnects:
|
||||
assert v.value == "r"
|
||||
|
||||
|
||||
class TestInject:
|
||||
def test_parse_response(self):
|
||||
a = rparse.parse_response({}, "400:ir,@100").actions[0]
|
||||
assert a[0] == "r"
|
||||
assert a[1] == "inject"
|
||||
|
||||
def test_at(self):
|
||||
e = rparse.InjectAt.expr()
|
||||
v = e.parseString("i0,'foo'")[0]
|
||||
assert v.value.val == "foo"
|
||||
assert v.offset == 0
|
||||
assert isinstance(v, rparse.InjectAt)
|
||||
|
||||
v = e.parseString("ir,'foo'")[0]
|
||||
assert v.offset == "r"
|
||||
|
||||
|
||||
class TestShortcuts:
|
||||
def test_parse_response(self):
|
||||
assert rparse.parse_response({}, "400:c'foo'").headers[0][0][:] == "Content-Type"
|
||||
@ -262,6 +279,21 @@ class TestWriteValues:
|
||||
rparse.send_chunk(s, v, bs, start, end)
|
||||
assert s.getvalue() == v[start:end]
|
||||
|
||||
def test_write_values_inject(self):
|
||||
tst = "foo"
|
||||
|
||||
s = cStringIO.StringIO()
|
||||
rparse.write_values(s, [tst], [(0, "inject", "aaa")], blocksize=5)
|
||||
assert s.getvalue() == "aaafoo"
|
||||
|
||||
s = cStringIO.StringIO()
|
||||
rparse.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5)
|
||||
assert s.getvalue() == "faaaoo"
|
||||
|
||||
s = cStringIO.StringIO()
|
||||
rparse.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5)
|
||||
assert s.getvalue() == "faaaoo"
|
||||
|
||||
def test_write_values_disconnects(self):
|
||||
s = cStringIO.StringIO()
|
||||
tst = "foo"*100
|
||||
|
Loading…
Reference in New Issue
Block a user