mitmproxy/libpathod/language/websockets.py

167 lines
3.3 KiB
Python
Raw Normal View History

import netlib.websockets
import pyparsing as pp
from . import base, generators, actions, message
"""
wf:ctext:b'foo'
wf:c15:r'foo'
wf:fin:rsv1:rsv2:rsv3:mask
wf:-fin:-rsv1:-rsv2:-rsv3:-mask
wf:k"mask"
wf:l234
"""
class WF(base.CaselessLiteral):
TOK = "wf"
class OpCode(base.IntField):
2015-05-03 01:54:52 +00:00
names = {
"continue": netlib.websockets.OPCODE.CONTINUE,
"text": netlib.websockets.OPCODE.TEXT,
"binary": netlib.websockets.OPCODE.BINARY,
"close": netlib.websockets.OPCODE.CLOSE,
"ping": netlib.websockets.OPCODE.PING,
"pong": netlib.websockets.OPCODE.PONG,
}
max = 15
preamble = "c"
2015-05-03 00:54:25 +00:00
class Body(base.Value):
preamble = "b"
class Raw(base.CaselessLiteral):
TOK = "r"
class Fin(base.Boolean):
name = "fin"
class RSV1(base.Boolean):
name = "rsv1"
class RSV2(base.Boolean):
name = "rsv2"
class RSV3(base.Boolean):
name = "rsv3"
class Mask(base.Boolean):
name = "mask"
class Key(base.FixedLengthValue):
preamble = "k"
length = 4
class WebsocketFrame(message.Message):
comps = (
Body,
OpCode,
# Bit flags
Fin,
RSV1,
RSV2,
RSV3,
Mask,
actions.PauseAt,
actions.DisconnectAt,
actions.InjectAt,
Key,
Raw,
)
logattrs = ["body"]
@property
def actions(self):
return self.toks(actions._Action)
@property
def body(self):
return self.tok(Body)
2015-05-03 01:54:52 +00:00
@property
def opcode(self):
return self.tok(OpCode)
@property
def fin(self):
return self.tok(Fin)
@property
def rsv1(self):
return self.tok(RSV1)
@property
def rsv2(self):
return self.tok(RSV2)
@property
def rsv3(self):
return self.tok(RSV3)
@property
def mask(self):
return self.tok(Mask)
2015-05-03 01:54:52 +00:00
@classmethod
def expr(klass):
parts = [i.expr() for i in klass.comps]
atom = pp.MatchFirst(parts)
resp = pp.And(
[
WF.expr(),
base.Sep,
pp.ZeroOrMore(base.Sep + atom)
]
)
resp = resp.setParseAction(klass)
return resp
def values(self, settings):
vals = []
if self.body:
bodygen = self.body.value.get_generator(settings)
length = len(self.body.value.get_generator(settings))
else:
bodygen = None
length = 0
2015-05-03 01:54:52 +00:00
frameparts = dict(
mask = True,
payload_length = length
)
for i in ["opcode", "fin", "rsv1", "rsv2", "rsv3", "mask"]:
v = getattr(self, i, None)
if v is not None:
frameparts[i] = v.value
2015-05-03 01:54:52 +00:00
frame = netlib.websockets.FrameHeader(**frameparts)
vals = [frame.to_bytes()]
if self.body:
masker = netlib.websockets.Masker(frame.masking_key)
vals.append(
generators.TransformGenerator(
bodygen,
masker.mask
)
)
return vals
def resolve(self, settings, msg=None):
return self.__class__(
2015-05-04 23:16:29 +00:00
[i.resolve(settings, self) for i in self.tokens]
)
def spec(self):
return ":".join([i.spec() for i in self.tokens])