2017-02-01 15:48:46 +00:00
|
|
|
import pytest
|
|
|
|
|
2016-02-16 19:59:33 +00:00
|
|
|
from pathod import language
|
|
|
|
from pathod.language import websockets
|
2016-10-19 22:56:38 +00:00
|
|
|
import mitmproxy.net.websockets
|
2016-06-17 12:15:48 +00:00
|
|
|
|
2016-11-01 21:06:25 +00:00
|
|
|
from . import tservers
|
2015-05-02 05:10:25 +00:00
|
|
|
|
|
|
|
|
|
|
|
def parse_request(s):
|
2016-06-05 19:22:37 +00:00
|
|
|
return next(language.parse_pathoc(s))
|
2015-05-02 05:10:25 +00:00
|
|
|
|
|
|
|
|
|
|
|
class TestWebsocketFrame:
|
2015-06-18 16:12:11 +00:00
|
|
|
|
2015-06-04 06:14:25 +00:00
|
|
|
def _test_messages(self, specs, message_klass):
|
|
|
|
for i in specs:
|
|
|
|
wf = parse_request(i)
|
|
|
|
assert isinstance(wf, message_klass)
|
|
|
|
assert wf
|
|
|
|
assert wf.values(language.Settings())
|
|
|
|
assert wf.resolve(language.Settings())
|
|
|
|
|
|
|
|
spec = wf.spec()
|
|
|
|
wf2 = parse_request(spec)
|
|
|
|
assert wf2.spec() == spec
|
|
|
|
|
|
|
|
def test_server_values(self):
|
2015-05-02 05:19:48 +00:00
|
|
|
specs = [
|
|
|
|
"wf",
|
2015-05-04 23:16:29 +00:00
|
|
|
"wf:dr",
|
2015-05-03 01:54:52 +00:00
|
|
|
"wf:b'foo'",
|
2015-05-17 03:38:13 +00:00
|
|
|
"wf:mask:r'foo'",
|
2015-05-16 23:31:02 +00:00
|
|
|
"wf:l1024:b'foo'",
|
2015-05-03 01:54:52 +00:00
|
|
|
"wf:cbinary",
|
2015-05-03 22:48:35 +00:00
|
|
|
"wf:c1",
|
2015-05-16 23:04:53 +00:00
|
|
|
"wf:mask:knone",
|
2015-05-03 22:48:35 +00:00
|
|
|
"wf:fin",
|
|
|
|
"wf:fin:rsv1:rsv2:rsv3:mask",
|
|
|
|
"wf:-fin:-rsv1:-rsv2:-rsv3:-mask",
|
2015-05-15 21:42:47 +00:00
|
|
|
"wf:k@4",
|
2015-06-07 22:58:12 +00:00
|
|
|
"wf:x10",
|
2015-05-02 05:19:48 +00:00
|
|
|
]
|
2015-06-04 06:14:25 +00:00
|
|
|
self._test_messages(specs, websockets.WebsocketFrame)
|
2015-05-02 05:19:48 +00:00
|
|
|
|
2015-06-07 22:58:12 +00:00
|
|
|
def test_parse_websocket_frames(self):
|
|
|
|
wf = language.parse_websocket_frame("wf:x10")
|
|
|
|
assert len(list(wf)) == 10
|
2017-02-01 15:48:46 +00:00
|
|
|
with pytest.raises(language.ParseException):
|
|
|
|
language.parse_websocket_frame("wf:x")
|
2015-06-07 22:58:12 +00:00
|
|
|
|
2015-06-04 06:14:25 +00:00
|
|
|
def test_client_values(self):
|
|
|
|
specs = [
|
|
|
|
"wf:f'wf'",
|
|
|
|
]
|
|
|
|
self._test_messages(specs, websockets.WebsocketClientFrame)
|
2015-05-03 01:54:52 +00:00
|
|
|
|
2015-06-04 08:36:50 +00:00
|
|
|
def test_nested_frame(self):
|
|
|
|
wf = parse_request("wf:f'wf'")
|
|
|
|
assert wf.nested_frame
|
|
|
|
|
2015-05-03 22:48:35 +00:00
|
|
|
def test_flags(self):
|
|
|
|
wf = parse_request("wf:fin:mask:rsv1:rsv2:rsv3")
|
2016-11-01 21:06:25 +00:00
|
|
|
frm = mitmproxy.net.websockets.Frame.from_bytes(tservers.render(wf))
|
2015-05-03 22:48:35 +00:00
|
|
|
assert frm.header.fin
|
|
|
|
assert frm.header.mask
|
|
|
|
assert frm.header.rsv1
|
|
|
|
assert frm.header.rsv2
|
|
|
|
assert frm.header.rsv3
|
|
|
|
|
|
|
|
wf = parse_request("wf:-fin:-mask:-rsv1:-rsv2:-rsv3")
|
2016-11-01 21:06:25 +00:00
|
|
|
frm = mitmproxy.net.websockets.Frame.from_bytes(tservers.render(wf))
|
2015-05-03 22:48:35 +00:00
|
|
|
assert not frm.header.fin
|
|
|
|
assert not frm.header.mask
|
|
|
|
assert not frm.header.rsv1
|
|
|
|
assert not frm.header.rsv2
|
|
|
|
assert not frm.header.rsv3
|
|
|
|
|
2015-05-15 23:31:53 +00:00
|
|
|
def fr(self, spec, **kwargs):
|
|
|
|
settings = language.base.Settings(**kwargs)
|
|
|
|
wf = parse_request(spec)
|
2016-11-01 21:06:25 +00:00
|
|
|
return mitmproxy.net.websockets.Frame.from_bytes(tservers.render(wf, settings))
|
2015-05-03 01:54:52 +00:00
|
|
|
|
2015-05-15 23:31:53 +00:00
|
|
|
def test_construction(self):
|
|
|
|
assert self.fr("wf:c1").header.opcode == 1
|
|
|
|
assert self.fr("wf:c0").header.opcode == 0
|
|
|
|
assert self.fr("wf:cbinary").header.opcode ==\
|
2016-10-19 22:56:38 +00:00
|
|
|
mitmproxy.net.websockets.OPCODE.BINARY
|
2015-05-15 23:31:53 +00:00
|
|
|
assert self.fr("wf:ctext").header.opcode ==\
|
2016-10-19 22:56:38 +00:00
|
|
|
mitmproxy.net.websockets.OPCODE.TEXT
|
2015-05-15 21:42:47 +00:00
|
|
|
|
2015-05-17 03:38:13 +00:00
|
|
|
def test_rawbody(self):
|
|
|
|
frm = self.fr("wf:mask:r'foo'")
|
|
|
|
assert len(frm.payload) == 3
|
2016-06-05 19:23:03 +00:00
|
|
|
assert frm.payload != b"foo"
|
2015-05-17 03:38:13 +00:00
|
|
|
|
2016-06-05 19:23:03 +00:00
|
|
|
assert self.fr("wf:r'foo'").payload == b"foo"
|
2015-05-17 03:38:13 +00:00
|
|
|
|
2016-05-15 17:39:39 +00:00
|
|
|
def test_construction_2(self):
|
2015-05-15 23:31:53 +00:00
|
|
|
# Simple server frame
|
|
|
|
frm = self.fr("wf:b'foo'")
|
|
|
|
assert not frm.header.mask
|
|
|
|
assert not frm.header.masking_key
|
|
|
|
|
|
|
|
# Simple client frame
|
|
|
|
frm = self.fr("wf:b'foo'", is_client=True)
|
|
|
|
assert frm.header.mask
|
|
|
|
assert frm.header.masking_key
|
|
|
|
frm = self.fr("wf:b'foo':k'abcd'", is_client=True)
|
|
|
|
assert frm.header.mask
|
2016-06-05 19:23:03 +00:00
|
|
|
assert frm.header.masking_key == b'abcd'
|
2015-05-15 23:31:53 +00:00
|
|
|
|
|
|
|
# Server frame, mask explicitly set
|
|
|
|
frm = self.fr("wf:b'foo':mask")
|
|
|
|
assert frm.header.mask
|
|
|
|
assert frm.header.masking_key
|
|
|
|
frm = self.fr("wf:b'foo':k'abcd'")
|
|
|
|
assert frm.header.mask
|
2016-06-05 19:23:03 +00:00
|
|
|
assert frm.header.masking_key == b'abcd'
|
2015-05-15 23:31:53 +00:00
|
|
|
|
|
|
|
# Client frame, mask explicitly unset
|
|
|
|
frm = self.fr("wf:b'foo':-mask", is_client=True)
|
|
|
|
assert not frm.header.mask
|
|
|
|
assert not frm.header.masking_key
|
|
|
|
|
|
|
|
frm = self.fr("wf:b'foo':-mask:k'abcd'", is_client=True)
|
|
|
|
assert not frm.header.mask
|
|
|
|
# We're reading back a corrupted frame - the first 3 characters of the
|
|
|
|
# mask is mis-interpreted as the payload
|
2016-06-05 19:23:03 +00:00
|
|
|
assert frm.payload == b"abc"
|
2015-05-16 23:04:53 +00:00
|
|
|
|
|
|
|
def test_knone(self):
|
2017-02-06 16:48:44 +00:00
|
|
|
with pytest.raises(Exception, match="Expected 4 bytes"):
|
2015-09-21 21:03:45 +00:00
|
|
|
self.fr("wf:b'foo':mask:knone")
|
2015-05-16 23:31:02 +00:00
|
|
|
|
|
|
|
def test_length(self):
|
|
|
|
assert self.fr("wf:l3:b'foo'").header.payload_length == 3
|
|
|
|
frm = self.fr("wf:l2:b'foo'")
|
|
|
|
assert frm.header.payload_length == 2
|
2016-06-05 19:23:03 +00:00
|
|
|
assert frm.payload == b"fo"
|
2017-02-06 16:48:44 +00:00
|
|
|
with pytest.raises(Exception, match="Expected 1024 bytes"):
|
2017-02-01 15:48:46 +00:00
|
|
|
self.fr("wf:l1024:b'foo'")
|