mitmproxy/pathod/language/message.py

145 lines
4.1 KiB
Python
Raw Normal View History

import abc
import typing # noqa
2017-05-26 14:22:13 +00:00
import pyparsing as pp
from mitmproxy.utils import strutils
from . import actions, exceptions, base
LOG_TRUNCATE = 1024
2016-10-17 04:29:45 +00:00
class Message:
__metaclass__ = abc.ABCMeta
logattrs: typing.List[str] = []
def __init__(self, tokens):
track = set([])
for i in tokens:
if i.unique_name:
if i.unique_name in track:
raise exceptions.ParseException(
"Message has multiple %s clauses, "
"but should only have one." % i.unique_name,
0, 0
)
else:
track.add(i.unique_name)
self.tokens = tokens
2015-05-17 04:42:59 +00:00
def strike_token(self, name):
toks = [i for i in self.tokens if i.unique_name != name]
return self.__class__(toks)
def toks(self, klass):
"""
Fetch all tokens that are instances of klass
"""
return [i for i in self.tokens if isinstance(i, klass)]
def tok(self, klass):
"""
Fetch first token that is an instance of klass
"""
l = self.toks(klass)
if l:
return l[0]
def length(self, settings):
"""
Calculate the length of the base message without any applied
actions.
"""
return sum(len(x) for x in self.values(settings))
def preview_safe(self):
"""
Return a copy of this message that is safe for previews.
"""
tokens = [i for i in self.tokens if not isinstance(i, actions.PauseAt)]
return self.__class__(tokens)
def maximum_length(self, settings):
"""
Calculate the maximum length of the base message with all applied
actions.
"""
l = self.length(settings)
for i in self.actions:
if isinstance(i, actions.InjectAt):
l += len(i.value.get_generator(settings))
return l
@classmethod
2015-06-18 09:07:33 +00:00
def expr(cls): # pragma: no cover
pass
def log(self, settings):
"""
A dictionary that should be logged if this message is served.
"""
ret = {}
for i in self.logattrs:
v = getattr(self, i)
# Careful not to log any VALUE specs without sanitizing them first.
# We truncate at 1k.
if hasattr(v, "values"):
v = [x[:LOG_TRUNCATE] for x in v.values(settings)]
v = strutils.bytes_to_escaped_str(b"".join(v))
elif hasattr(v, "__len__"):
v = v[:LOG_TRUNCATE]
v = strutils.bytes_to_escaped_str(v)
ret[i] = v
ret["spec"] = self.spec()
return ret
def freeze(self, settings):
r = self.resolve(settings)
return self.__class__([i.freeze(settings) for i in r.tokens])
def __repr__(self):
return self.spec()
2017-05-26 14:22:13 +00:00
class NestedMessage(base.Token):
"""
A nested message, as an escaped string with a preamble.
"""
preamble = ""
nest_type: typing.Optional[typing.Type[Message]] = None
2017-05-26 14:22:13 +00:00
def __init__(self, value):
super().__init__()
self.value = value
try:
self.parsed = self.nest_type(
self.nest_type.expr().parseString(
value.val.decode(),
parseAll=True
)
)
except pp.ParseException as v:
raise exceptions.ParseException(v.msg, v.line, v.col)
@classmethod
def expr(cls):
e = pp.Literal(cls.preamble).suppress()
e = e + base.TokValueLiteral.expr()
return e.setParseAction(lambda x: cls(*x))
def values(self, settings):
return [
self.value.get_generator(settings),
]
def spec(self):
return "%s%s" % (self.preamble, self.value.spec())
def freeze(self, settings):
f = self.parsed.freeze(settings).spec()
return self.__class__(
base.TokValueLiteral(
strutils.bytes_to_escaped_str(f.encode(), escape_single_quotes=True)
)
)