mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-22 07:08:10 +00:00
Add metadata filter syntax: ~meta
This commit is contained in:
parent
69f555f9bb
commit
da07cb78a6
@ -61,6 +61,7 @@ If you depend on these features, please raise your voice in
|
||||
* Multiple Browsers: The `browser.start` command may be executed more than once to start additional
|
||||
browser sessions. (@rbdixon)
|
||||
* Improve readability of SHA256 fingerprint. (@wrekone)
|
||||
* Metadata and Replay Flow Filters: Flows may be filtered based on metadata and replay status. (@rbdixon)
|
||||
* --- TODO: add new PRs above this line ---
|
||||
* ... and various other fixes, documentation improvements, dependency version bumps, etc.
|
||||
|
||||
|
@ -36,7 +36,6 @@ import functools
|
||||
import re
|
||||
import sys
|
||||
from typing import Callable, ClassVar, Optional, Sequence, Type
|
||||
|
||||
import pyparsing as pp
|
||||
|
||||
from mitmproxy import flow, http, tcp
|
||||
@ -406,6 +405,17 @@ class FReplayServer(_Action):
|
||||
return f.is_replay == 'response'
|
||||
|
||||
|
||||
class FMeta(_Rex):
|
||||
code = "meta"
|
||||
help = "Flow metadata"
|
||||
flags = re.MULTILINE
|
||||
is_binary = False
|
||||
|
||||
def __call__(self, f):
|
||||
m = "\n".join([f"{key}: {value}" for key, value in f.metadata.items()])
|
||||
return self.re.search(m)
|
||||
|
||||
|
||||
class _Int(_Action):
|
||||
|
||||
def __init__(self, num):
|
||||
@ -491,6 +501,7 @@ filter_rex: Sequence[Type[_Rex]] = [
|
||||
FMethod,
|
||||
FSrc,
|
||||
FUrl,
|
||||
FMeta,
|
||||
]
|
||||
filter_int = [
|
||||
FCode
|
||||
|
@ -301,15 +301,28 @@ class TestMatchingHTTPFlow:
|
||||
|
||||
def test_replay(self):
|
||||
f = tflow.tflow()
|
||||
assert not self.q("~r", f)
|
||||
assert not self.q("~replay", f)
|
||||
f.is_replay = "request"
|
||||
assert self.q("~r", f)
|
||||
assert self.q("~rc", f)
|
||||
assert not self.q("~rs", f)
|
||||
assert self.q("~replay", f)
|
||||
assert self.q("~replayq", f)
|
||||
assert not self.q("~replays", f)
|
||||
f.is_replay = "response"
|
||||
assert self.q("~r", f)
|
||||
assert not self.q("~rc", f)
|
||||
assert self.q("~rs", f)
|
||||
assert self.q("~replay", f)
|
||||
assert not self.q("~replayq", f)
|
||||
assert self.q("~replays", f)
|
||||
|
||||
def test_metadata(self):
|
||||
f = tflow.tflow()
|
||||
f.metadata["a"] = 1
|
||||
f.metadata["b"] = "string"
|
||||
f.metadata["c"] = {"key": "value"}
|
||||
assert self.q("~meta a", f)
|
||||
assert not self.q("~meta no", f)
|
||||
assert self.q("~meta string", f)
|
||||
assert self.q("~meta key", f)
|
||||
assert self.q("~meta value", f)
|
||||
assert self.q("~meta \"b: string\"", f)
|
||||
assert self.q("~meta \"'key': 'value'\"", f)
|
||||
|
||||
|
||||
class TestMatchingTCPFlow:
|
||||
|
Loading…
Reference in New Issue
Block a user