mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-27 02:24:18 +00:00
tests: extract language.writer and language.websocket
This commit is contained in:
parent
88eabfd8ef
commit
a6dbb82936
@ -470,99 +470,3 @@ class TestPauses:
|
||||
def test_freeze(self):
|
||||
l = base.PauseAt("r", 5)
|
||||
assert l.freeze({}).spec() == l.spec()
|
||||
|
||||
|
||||
class TestWebsocketFrame:
|
||||
def test_spec(self):
|
||||
e = websockets.WebsocketFrame.expr()
|
||||
wf = e.parseString("wf:b'foo'")
|
||||
assert wf
|
||||
|
||||
assert parse_request("wf:b'foo'")
|
||||
|
||||
def test_values(self):
|
||||
r = parse_request("wf:b'foo'")
|
||||
assert r.values(language.Settings())
|
||||
|
||||
|
||||
class TestWriteValues:
|
||||
def test_send_chunk(self):
|
||||
v = "foobarfoobar"
|
||||
for bs in range(1, len(v) + 2):
|
||||
s = cStringIO.StringIO()
|
||||
writer.send_chunk(s, v, bs, 0, len(v))
|
||||
assert s.getvalue() == v
|
||||
for start in range(len(v)):
|
||||
for end in range(len(v)):
|
||||
s = cStringIO.StringIO()
|
||||
writer.send_chunk(s, v, bs, start, end)
|
||||
assert s.getvalue() == v[start:end]
|
||||
|
||||
def test_write_values_inject(self):
|
||||
tst = "foo"
|
||||
|
||||
s = cStringIO.StringIO()
|
||||
writer.write_values(s, [tst], [(0, "inject", "aaa")], blocksize=5)
|
||||
assert s.getvalue() == "aaafoo"
|
||||
|
||||
s = cStringIO.StringIO()
|
||||
writer.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5)
|
||||
assert s.getvalue() == "faaaoo"
|
||||
|
||||
s = cStringIO.StringIO()
|
||||
writer.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5)
|
||||
assert s.getvalue() == "faaaoo"
|
||||
|
||||
def test_write_values_disconnects(self):
|
||||
s = cStringIO.StringIO()
|
||||
tst = "foo" * 100
|
||||
writer.write_values(s, [tst], [(0, "disconnect")], blocksize=5)
|
||||
assert not s.getvalue()
|
||||
|
||||
def test_write_values(self):
|
||||
tst = "foobarvoing"
|
||||
s = cStringIO.StringIO()
|
||||
writer.write_values(s, [tst], [])
|
||||
assert s.getvalue() == tst
|
||||
|
||||
for bs in range(1, len(tst) + 2):
|
||||
for off in range(len(tst)):
|
||||
s = cStringIO.StringIO()
|
||||
writer.write_values(
|
||||
s, [tst], [(off, "disconnect")], blocksize=bs
|
||||
)
|
||||
assert s.getvalue() == tst[:off]
|
||||
|
||||
def test_write_values_pauses(self):
|
||||
tst = "".join(str(i) for i in range(10))
|
||||
for i in range(2, 10):
|
||||
s = cStringIO.StringIO()
|
||||
writer.write_values(
|
||||
s, [tst], [(2, "pause", 0), (1, "pause", 0)], blocksize=i
|
||||
)
|
||||
assert s.getvalue() == tst
|
||||
|
||||
for i in range(2, 10):
|
||||
s = cStringIO.StringIO()
|
||||
writer.write_values(s, [tst], [(1, "pause", 0)], blocksize=i)
|
||||
assert s.getvalue() == tst
|
||||
|
||||
tst = ["".join(str(i) for i in range(10))] * 5
|
||||
for i in range(2, 10):
|
||||
s = cStringIO.StringIO()
|
||||
writer.write_values(s, tst[:], [(1, "pause", 0)], blocksize=i)
|
||||
assert s.getvalue() == "".join(tst)
|
||||
|
||||
def test_write_values_after(self):
|
||||
s = cStringIO.StringIO()
|
||||
r = language.parse_response("400:da")
|
||||
language.serve(r, s, {})
|
||||
|
||||
s = cStringIO.StringIO()
|
||||
r = language.parse_response("400:pa,0")
|
||||
language.serve(r, s, {})
|
||||
|
||||
s = cStringIO.StringIO()
|
||||
r = language.parse_response("400:ia,'xx'")
|
||||
language.serve(r, s, {})
|
||||
assert s.getvalue().endswith('xx')
|
||||
|
20
test/test_language_websocket.py
Normal file
20
test/test_language_websocket.py
Normal file
@ -0,0 +1,20 @@
|
||||
|
||||
from libpathod import language
|
||||
from libpathod.language import websockets
|
||||
|
||||
|
||||
def parse_request(s):
|
||||
return language.parse_requests(s)[0]
|
||||
|
||||
|
||||
class TestWebsocketFrame:
|
||||
def test_spec(self):
|
||||
e = websockets.WebsocketFrame.expr()
|
||||
wf = e.parseString("wf:b'foo'")
|
||||
assert wf
|
||||
|
||||
assert parse_request("wf:b'foo'")
|
||||
|
||||
def test_values(self):
|
||||
r = parse_request("wf:b'foo'")
|
||||
assert r.values(language.Settings())
|
91
test/test_language_writer.py
Normal file
91
test/test_language_writer.py
Normal file
@ -0,0 +1,91 @@
|
||||
import cStringIO
|
||||
|
||||
from libpathod import language
|
||||
from libpathod.language import writer
|
||||
|
||||
|
||||
def test_send_chunk():
|
||||
v = "foobarfoobar"
|
||||
for bs in range(1, len(v) + 2):
|
||||
s = cStringIO.StringIO()
|
||||
writer.send_chunk(s, v, bs, 0, len(v))
|
||||
assert s.getvalue() == v
|
||||
for start in range(len(v)):
|
||||
for end in range(len(v)):
|
||||
s = cStringIO.StringIO()
|
||||
writer.send_chunk(s, v, bs, start, end)
|
||||
assert s.getvalue() == v[start:end]
|
||||
|
||||
|
||||
def test_write_values_inject():
|
||||
tst = "foo"
|
||||
|
||||
s = cStringIO.StringIO()
|
||||
writer.write_values(s, [tst], [(0, "inject", "aaa")], blocksize=5)
|
||||
assert s.getvalue() == "aaafoo"
|
||||
|
||||
s = cStringIO.StringIO()
|
||||
writer.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5)
|
||||
assert s.getvalue() == "faaaoo"
|
||||
|
||||
s = cStringIO.StringIO()
|
||||
writer.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5)
|
||||
assert s.getvalue() == "faaaoo"
|
||||
|
||||
|
||||
def test_write_values_disconnects():
|
||||
s = cStringIO.StringIO()
|
||||
tst = "foo" * 100
|
||||
writer.write_values(s, [tst], [(0, "disconnect")], blocksize=5)
|
||||
assert not s.getvalue()
|
||||
|
||||
|
||||
def test_write_values():
|
||||
tst = "foobarvoing"
|
||||
s = cStringIO.StringIO()
|
||||
writer.write_values(s, [tst], [])
|
||||
assert s.getvalue() == tst
|
||||
|
||||
for bs in range(1, len(tst) + 2):
|
||||
for off in range(len(tst)):
|
||||
s = cStringIO.StringIO()
|
||||
writer.write_values(
|
||||
s, [tst], [(off, "disconnect")], blocksize=bs
|
||||
)
|
||||
assert s.getvalue() == tst[:off]
|
||||
|
||||
|
||||
def test_write_values_pauses():
|
||||
tst = "".join(str(i) for i in range(10))
|
||||
for i in range(2, 10):
|
||||
s = cStringIO.StringIO()
|
||||
writer.write_values(
|
||||
s, [tst], [(2, "pause", 0), (1, "pause", 0)], blocksize=i
|
||||
)
|
||||
assert s.getvalue() == tst
|
||||
|
||||
for i in range(2, 10):
|
||||
s = cStringIO.StringIO()
|
||||
writer.write_values(s, [tst], [(1, "pause", 0)], blocksize=i)
|
||||
assert s.getvalue() == tst
|
||||
|
||||
tst = ["".join(str(i) for i in range(10))] * 5
|
||||
for i in range(2, 10):
|
||||
s = cStringIO.StringIO()
|
||||
writer.write_values(s, tst[:], [(1, "pause", 0)], blocksize=i)
|
||||
assert s.getvalue() == "".join(tst)
|
||||
|
||||
|
||||
def test_write_values_after():
|
||||
s = cStringIO.StringIO()
|
||||
r = language.parse_response("400:da")
|
||||
language.serve(r, s, {})
|
||||
|
||||
s = cStringIO.StringIO()
|
||||
r = language.parse_response("400:pa,0")
|
||||
language.serve(r, s, {})
|
||||
|
||||
s = cStringIO.StringIO()
|
||||
r = language.parse_response("400:ia,'xx'")
|
||||
language.serve(r, s, {})
|
||||
assert s.getvalue().endswith('xx')
|
Loading…
Reference in New Issue
Block a user