From 1a7ce384dac5099308b68e629c88e7b81ad44866 Mon Sep 17 00:00:00 2001 From: Thomas Kriechbaumer Date: Sun, 17 Dec 2017 18:13:35 +0100 Subject: [PATCH] websocket: support domain and url filters --- mitmproxy/flowfilter.py | 11 +++++++---- mitmproxy/test/tflow.py | 2 +- test/mitmproxy/test_flowfilter.py | 14 ++++++++++++++ 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/mitmproxy/flowfilter.py b/mitmproxy/flowfilter.py index 23e47e2ba..12f2bac80 100644 --- a/mitmproxy/flowfilter.py +++ b/mitmproxy/flowfilter.py @@ -322,14 +322,15 @@ class FDomain(_Rex): flags = re.IGNORECASE is_binary = False - @only(http.HTTPFlow) + @only(http.HTTPFlow, websocket.WebSocketFlow) def __call__(self, f): + if isinstance(f, websocket.WebSocketFlow): + f = f.handshake_flow return bool( self.re.search(f.request.host) or self.re.search(f.request.pretty_host) ) - class FUrl(_Rex): code = "u" help = "URL" @@ -342,9 +343,11 @@ class FUrl(_Rex): toks = toks[1:] return klass(*toks) - @only(http.HTTPFlow) + @only(http.HTTPFlow, websocket.WebSocketFlow) def __call__(self, f): - if not f.request: + if isinstance(f, websocket.WebSocketFlow): + f = f.handshake_flow + if not f or not f.request: return False return self.re.search(f.request.pretty_url) diff --git a/mitmproxy/test/tflow.py b/mitmproxy/test/tflow.py index c3dab30ce..cb2b940a7 100644 --- a/mitmproxy/test/tflow.py +++ b/mitmproxy/test/tflow.py @@ -44,7 +44,7 @@ def twebsocketflow(client_conn=True, server_conn=True, messages=True, err=None, "GET", "http", "example.com", - "80", + 80, "/ws", "HTTP/1.1", headers=net_http.Headers( diff --git a/test/mitmproxy/test_flowfilter.py b/test/mitmproxy/test_flowfilter.py index c411258a6..4eb37d813 100644 --- a/test/mitmproxy/test_flowfilter.py +++ b/test/mitmproxy/test_flowfilter.py @@ -420,6 +420,20 @@ class TestMatchingWebSocketFlow: e = self.err() assert self.q("~e", e) + def test_domain(self): + q = self.flow() + assert self.q("~d example.com", q) + assert not self.q("~d none", q) + + def test_url(self): + q = self.flow() + assert self.q("~u example.com", q) + assert self.q("~u example.com/ws", q) + assert not self.q("~u moo/path", q) + + q.handshake_flow = None + assert not self.q("~u example.com", q) + def test_body(self): f = self.flow()