improve flowfilter

This commit is contained in:
Maximilian Hils 2019-11-15 02:27:33 +01:00
parent cd660a035f
commit 2239c49e18

View File

@ -32,19 +32,17 @@
rex Equivalent to ~u rex rex Equivalent to ~u rex
""" """
import functools
import re import re
import sys import sys
import functools from typing import Callable, ClassVar, Optional, Sequence, Type
from mitmproxy import http
from mitmproxy import websocket
from mitmproxy import tcp
from mitmproxy import flow
from mitmproxy.utils import strutils
import pyparsing as pp import pyparsing as pp
from typing import Callable, Sequence, Type, Optional, ClassVar
from mitmproxy import flow
from mitmproxy import http
from mitmproxy import tcp
from mitmproxy import websocket
def only(*types): def only(*types):
@ -54,7 +52,9 @@ def only(*types):
if isinstance(flow, types): if isinstance(flow, types):
return fn(self, flow) return fn(self, flow)
return False return False
return filter_types return filter_types
return decorator return decorator
@ -146,10 +146,10 @@ class _Rex(_Action):
def __init__(self, expr): def __init__(self, expr):
self.expr = expr self.expr = expr
if self.is_binary: if self.is_binary:
expr = strutils.escaped_str_to_bytes(expr) expr = expr.encode()
try: try:
self.re = re.compile(expr, self.flags) self.re = re.compile(expr, self.flags)
except: except Exception:
raise ValueError("Cannot compile expression.") raise ValueError("Cannot compile expression.")
@ -336,6 +336,7 @@ class FUrl(_Rex):
code = "u" code = "u"
help = "URL" help = "URL"
is_binary = False is_binary = False
# FUrl is special, because it can be "naked". # FUrl is special, because it can be "naked".
@classmethod @classmethod
@ -469,68 +470,51 @@ def _make():
# Order is important - multi-char expressions need to come before narrow # Order is important - multi-char expressions need to come before narrow
# ones. # ones.
parts = [] parts = []
for klass in filter_unary: for cls in filter_unary:
f = pp.Literal("~%s" % klass.code) + pp.WordEnd() f = pp.Literal(f"~{cls.code}") + pp.WordEnd()
f.setParseAction(klass.make) f.setParseAction(cls.make)
parts.append(f) parts.append(f)
simplerex = "".join(c for c in pp.printables if c not in "()~'\"") # This is a bit of a hack to simulate Word(pyparsing_unicode.printables),
alphdevanagari = pp.pyparsing_unicode.Devanagari.alphas # which has a horrible performance with len(pyparsing.pyparsing_unicode.printables) == 1114060
alphcyrillic = pp.pyparsing_unicode.Cyrillic.alphas unicode_words = pp.CharsNotIn("()~'\"" + pp.ParserElement.DEFAULT_WHITE_CHARS)
alphgreek = pp.pyparsing_unicode.Greek.alphas unicode_words.skipWhitespace = True
alphchinese = pp.pyparsing_unicode.Chinese.alphas regex = (
alpharabic = pp.pyparsing_unicode.Arabic.alphas unicode_words
alphhebrew = pp.pyparsing_unicode.Hebrew.alphas | pp.QuotedString('"', escChar='\\')
alphjapanese = pp.pyparsing_unicode.Japanese.alphas | pp.QuotedString("'", escChar='\\')
alphkorean = pp.pyparsing_unicode.Korean.alphas )
alphlatin1 = pp.pyparsing_unicode.Latin1.alphas for cls in filter_rex:
alphlatinA = pp.pyparsing_unicode.LatinA.alphas f = pp.Literal(f"~{cls.code}") + pp.WordEnd() + regex.copy()
alphlatinB = pp.pyparsing_unicode.LatinB.alphas f.setParseAction(cls.make)
rex = pp.Word(simplerex) |\
pp.Word(alphcyrillic) |\
pp.Word(alphgreek) |\
pp.Word(alphchinese) |\
pp.Word(alpharabic) |\
pp.Word(alphdevanagari) |\
pp.Word(alphhebrew) |\
pp.Word(alphjapanese) |\
pp.Word(alphkorean) |\
pp.Word(alphlatin1) |\
pp.Word(alphlatinA) |\
pp.Word(alphlatinB) |\
pp.QuotedString("\"", escChar='\\') |\
pp.QuotedString("'", escChar='\\')
for klass in filter_rex:
f = pp.Literal("~%s" % klass.code) + pp.WordEnd() + rex.copy()
f.setParseAction(klass.make)
parts.append(f) parts.append(f)
for klass in filter_int: for cls in filter_int:
f = pp.Literal("~%s" % klass.code) + pp.WordEnd() + pp.Word(pp.nums) f = pp.Literal(f"~{cls.code}") + pp.WordEnd() + pp.Word(pp.nums)
f.setParseAction(klass.make) f.setParseAction(cls.make)
parts.append(f) parts.append(f)
# A naked rex is a URL rex: # A naked rex is a URL rex:
f = rex.copy() f = regex.copy()
f.setParseAction(FUrl.make) f.setParseAction(FUrl.make)
parts.append(f) parts.append(f)
atom = pp.MatchFirst(parts) atom = pp.MatchFirst(parts)
expr = pp.operatorPrecedence(atom, expr = pp.infixNotation(
[(pp.Literal("!").suppress(), atom,
1, [(pp.Literal("!").suppress(),
pp.opAssoc.RIGHT, 1,
lambda x: FNot(*x)), pp.opAssoc.RIGHT,
(pp.Literal("&").suppress(), lambda x: FNot(*x)),
2, (pp.Literal("&").suppress(),
pp.opAssoc.LEFT, 2,
lambda x: FAnd(*x)), pp.opAssoc.LEFT,
(pp.Literal("|").suppress(), lambda x: FAnd(*x)),
2, (pp.Literal("|").suppress(),
pp.opAssoc.LEFT, 2,
lambda x: FOr(*x)), pp.opAssoc.LEFT,
]) lambda x: FOr(*x)),
])
expr = pp.OneOrMore(expr) expr = pp.OneOrMore(expr)
return expr.setParseAction(lambda x: FAnd(x) if len(x) != 1 else x) return expr.setParseAction(lambda x: FAnd(x) if len(x) != 1 else x)
@ -570,15 +554,15 @@ def match(flt, flow):
help = [] help = []
for a in filter_unary: for a in filter_unary:
help.append( help.append(
("~%s" % a.code, a.help) (f"~{a.code}", a.help)
) )
for b in filter_rex: for b in filter_rex:
help.append( help.append(
("~%s regex" % b.code, b.help) (f"~{b.code} regex", b.help)
) )
for c in filter_int: for c in filter_int:
help.append( help.append(
("~%s int" % c.code, c.help) (f"~{c.code} int", c.help)
) )
help.sort() help.sort()
help.extend( help.extend(