diff --git a/test/test_language.py b/test/test_language.py index 159a6d5d8..ffa5e82c8 100644 --- a/test/test_language.py +++ b/test/test_language.py @@ -470,99 +470,3 @@ class TestPauses: def test_freeze(self): l = base.PauseAt("r", 5) assert l.freeze({}).spec() == l.spec() - - -class TestWebsocketFrame: - def test_spec(self): - e = websockets.WebsocketFrame.expr() - wf = e.parseString("wf:b'foo'") - assert wf - - assert parse_request("wf:b'foo'") - - def test_values(self): - r = parse_request("wf:b'foo'") - assert r.values(language.Settings()) - - -class TestWriteValues: - def test_send_chunk(self): - v = "foobarfoobar" - for bs in range(1, len(v) + 2): - s = cStringIO.StringIO() - writer.send_chunk(s, v, bs, 0, len(v)) - assert s.getvalue() == v - for start in range(len(v)): - for end in range(len(v)): - s = cStringIO.StringIO() - writer.send_chunk(s, v, bs, start, end) - assert s.getvalue() == v[start:end] - - def test_write_values_inject(self): - tst = "foo" - - s = cStringIO.StringIO() - writer.write_values(s, [tst], [(0, "inject", "aaa")], blocksize=5) - assert s.getvalue() == "aaafoo" - - s = cStringIO.StringIO() - writer.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5) - assert s.getvalue() == "faaaoo" - - s = cStringIO.StringIO() - writer.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5) - assert s.getvalue() == "faaaoo" - - def test_write_values_disconnects(self): - s = cStringIO.StringIO() - tst = "foo" * 100 - writer.write_values(s, [tst], [(0, "disconnect")], blocksize=5) - assert not s.getvalue() - - def test_write_values(self): - tst = "foobarvoing" - s = cStringIO.StringIO() - writer.write_values(s, [tst], []) - assert s.getvalue() == tst - - for bs in range(1, len(tst) + 2): - for off in range(len(tst)): - s = cStringIO.StringIO() - writer.write_values( - s, [tst], [(off, "disconnect")], blocksize=bs - ) - assert s.getvalue() == tst[:off] - - def test_write_values_pauses(self): - tst = "".join(str(i) for i in range(10)) - for i in range(2, 10): - s = cStringIO.StringIO() - writer.write_values( - s, [tst], [(2, "pause", 0), (1, "pause", 0)], blocksize=i - ) - assert s.getvalue() == tst - - for i in range(2, 10): - s = cStringIO.StringIO() - writer.write_values(s, [tst], [(1, "pause", 0)], blocksize=i) - assert s.getvalue() == tst - - tst = ["".join(str(i) for i in range(10))] * 5 - for i in range(2, 10): - s = cStringIO.StringIO() - writer.write_values(s, tst[:], [(1, "pause", 0)], blocksize=i) - assert s.getvalue() == "".join(tst) - - def test_write_values_after(self): - s = cStringIO.StringIO() - r = language.parse_response("400:da") - language.serve(r, s, {}) - - s = cStringIO.StringIO() - r = language.parse_response("400:pa,0") - language.serve(r, s, {}) - - s = cStringIO.StringIO() - r = language.parse_response("400:ia,'xx'") - language.serve(r, s, {}) - assert s.getvalue().endswith('xx') diff --git a/test/test_language_websocket.py b/test/test_language_websocket.py new file mode 100644 index 000000000..8a683d813 --- /dev/null +++ b/test/test_language_websocket.py @@ -0,0 +1,20 @@ + +from libpathod import language +from libpathod.language import websockets + + +def parse_request(s): + return language.parse_requests(s)[0] + + +class TestWebsocketFrame: + def test_spec(self): + e = websockets.WebsocketFrame.expr() + wf = e.parseString("wf:b'foo'") + assert wf + + assert parse_request("wf:b'foo'") + + def test_values(self): + r = parse_request("wf:b'foo'") + assert r.values(language.Settings()) diff --git a/test/test_language_writer.py b/test/test_language_writer.py new file mode 100644 index 000000000..406655d9a --- /dev/null +++ b/test/test_language_writer.py @@ -0,0 +1,91 @@ +import cStringIO + +from libpathod import language +from libpathod.language import writer + + +def test_send_chunk(): + v = "foobarfoobar" + for bs in range(1, len(v) + 2): + s = cStringIO.StringIO() + writer.send_chunk(s, v, bs, 0, len(v)) + assert s.getvalue() == v + for start in range(len(v)): + for end in range(len(v)): + s = cStringIO.StringIO() + writer.send_chunk(s, v, bs, start, end) + assert s.getvalue() == v[start:end] + + +def test_write_values_inject(): + tst = "foo" + + s = cStringIO.StringIO() + writer.write_values(s, [tst], [(0, "inject", "aaa")], blocksize=5) + assert s.getvalue() == "aaafoo" + + s = cStringIO.StringIO() + writer.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5) + assert s.getvalue() == "faaaoo" + + s = cStringIO.StringIO() + writer.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5) + assert s.getvalue() == "faaaoo" + + +def test_write_values_disconnects(): + s = cStringIO.StringIO() + tst = "foo" * 100 + writer.write_values(s, [tst], [(0, "disconnect")], blocksize=5) + assert not s.getvalue() + + +def test_write_values(): + tst = "foobarvoing" + s = cStringIO.StringIO() + writer.write_values(s, [tst], []) + assert s.getvalue() == tst + + for bs in range(1, len(tst) + 2): + for off in range(len(tst)): + s = cStringIO.StringIO() + writer.write_values( + s, [tst], [(off, "disconnect")], blocksize=bs + ) + assert s.getvalue() == tst[:off] + + +def test_write_values_pauses(): + tst = "".join(str(i) for i in range(10)) + for i in range(2, 10): + s = cStringIO.StringIO() + writer.write_values( + s, [tst], [(2, "pause", 0), (1, "pause", 0)], blocksize=i + ) + assert s.getvalue() == tst + + for i in range(2, 10): + s = cStringIO.StringIO() + writer.write_values(s, [tst], [(1, "pause", 0)], blocksize=i) + assert s.getvalue() == tst + + tst = ["".join(str(i) for i in range(10))] * 5 + for i in range(2, 10): + s = cStringIO.StringIO() + writer.write_values(s, tst[:], [(1, "pause", 0)], blocksize=i) + assert s.getvalue() == "".join(tst) + + +def test_write_values_after(): + s = cStringIO.StringIO() + r = language.parse_response("400:da") + language.serve(r, s, {}) + + s = cStringIO.StringIO() + r = language.parse_response("400:pa,0") + language.serve(r, s, {}) + + s = cStringIO.StringIO() + r = language.parse_response("400:ia,'xx'") + language.serve(r, s, {}) + assert s.getvalue().endswith('xx')