From da07cb78a6659328ef4a8aba4ef3ac141f6859bb Mon Sep 17 00:00:00 2001 From: Brad Dixon Date: Mon, 10 May 2021 10:05:04 -0400 Subject: [PATCH] Add metadata filter syntax: ~meta --- CHANGELOG.md | 1 + mitmproxy/flowfilter.py | 13 ++++++++++++- test/mitmproxy/test_flowfilter.py | 27 ++++++++++++++++++++------- 3 files changed, 33 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 98b8203cf..8707e3e58 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -61,6 +61,7 @@ If you depend on these features, please raise your voice in * Multiple Browsers: The `browser.start` command may be executed more than once to start additional browser sessions. (@rbdixon) * Improve readability of SHA256 fingerprint. (@wrekone) +* Metadata and Replay Flow Filters: Flows may be filtered based on metadata and replay status. (@rbdixon) * --- TODO: add new PRs above this line --- * ... and various other fixes, documentation improvements, dependency version bumps, etc. diff --git a/mitmproxy/flowfilter.py b/mitmproxy/flowfilter.py index 2e619397c..6435a73fe 100644 --- a/mitmproxy/flowfilter.py +++ b/mitmproxy/flowfilter.py @@ -36,7 +36,6 @@ import functools import re import sys from typing import Callable, ClassVar, Optional, Sequence, Type - import pyparsing as pp from mitmproxy import flow, http, tcp @@ -406,6 +405,17 @@ class FReplayServer(_Action): return f.is_replay == 'response' +class FMeta(_Rex): + code = "meta" + help = "Flow metadata" + flags = re.MULTILINE + is_binary = False + + def __call__(self, f): + m = "\n".join([f"{key}: {value}" for key, value in f.metadata.items()]) + return self.re.search(m) + + class _Int(_Action): def __init__(self, num): @@ -491,6 +501,7 @@ filter_rex: Sequence[Type[_Rex]] = [ FMethod, FSrc, FUrl, + FMeta, ] filter_int = [ FCode diff --git a/test/mitmproxy/test_flowfilter.py b/test/mitmproxy/test_flowfilter.py index aa579d6b0..7c6a93526 100644 --- a/test/mitmproxy/test_flowfilter.py +++ b/test/mitmproxy/test_flowfilter.py @@ -301,15 +301,28 @@ class TestMatchingHTTPFlow: def test_replay(self): f = tflow.tflow() - assert not self.q("~r", f) + assert not self.q("~replay", f) f.is_replay = "request" - assert self.q("~r", f) - assert self.q("~rc", f) - assert not self.q("~rs", f) + assert self.q("~replay", f) + assert self.q("~replayq", f) + assert not self.q("~replays", f) f.is_replay = "response" - assert self.q("~r", f) - assert not self.q("~rc", f) - assert self.q("~rs", f) + assert self.q("~replay", f) + assert not self.q("~replayq", f) + assert self.q("~replays", f) + + def test_metadata(self): + f = tflow.tflow() + f.metadata["a"] = 1 + f.metadata["b"] = "string" + f.metadata["c"] = {"key": "value"} + assert self.q("~meta a", f) + assert not self.q("~meta no", f) + assert self.q("~meta string", f) + assert self.q("~meta key", f) + assert self.q("~meta value", f) + assert self.q("~meta \"b: string\"", f) + assert self.q("~meta \"'key': 'value'\"", f) class TestMatchingTCPFlow: