From 301f8cf79d1793826a9b73fca6406c005a1c1638 Mon Sep 17 00:00:00 2001 From: Marius Date: Tue, 16 Nov 2021 05:47:34 -0600 Subject: [PATCH] Create search.py (#4900) * Create search.py * Address linter findings * Address linter findings --- examples/contrib/search.py | 94 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) create mode 100644 examples/contrib/search.py diff --git a/examples/contrib/search.py b/examples/contrib/search.py new file mode 100644 index 000000000..76a4dff95 --- /dev/null +++ b/examples/contrib/search.py @@ -0,0 +1,94 @@ +import re +import typing + +from json import dumps + +from mitmproxy import command, ctx, flow + + +MARKER = ':mag:' +RESULTS_STR = 'Search Results: ' + + +class Search: + def __init__(self): + self.exp = None + + @command.command('search') + def _search(self, + flows: typing.Sequence[flow.Flow], + regex: str) -> None: + """ + Defines a command named "search" that matches + the given regular expression against most parts + of each request/response included in the selected flows. + + Usage: from the flow list view, type ":search" followed by + a space, then a flow selection expression; e.g., "@shown", + then the desired regular expression to perform the search. + + Alternatively, define a custom shortcut in keys.yaml; e.g.: + - + key: "/" + ctx: ["flowlist"] + cmd: "console.command search @shown " + + Flows containing matches to the expression will be marked + with the magnifying glass emoji, and their comments will + contain JSON-formatted search results. + + To view flow comments, enter the flow view + and navigate to the detail tab. + """ + + try: + self.exp = re.compile(regex) + except re.error as e: + ctx.log.error(e) + return + + for _flow in flows: + # Erase previous results while preserving other comments: + comments = list() + for c in _flow.comment.split('\n'): + if c.startswith(RESULTS_STR): + break + comments.append(c) + _flow.comment = '\n'.join(comments) + + if _flow.marked == MARKER: + _flow.marked = False + + results = {k: v for k, v in self.flow_results(_flow).items() if v} + if results: + comments.append(RESULTS_STR) + comments.append(dumps(results, indent=2)) + _flow.comment = '\n'.join(comments) + _flow.marked = MARKER + + def header_results(self, message): + results = {k: self.exp.findall(v) for k, v in message.headers.items()} + return {k: v for k, v in results.items() if v} + + def flow_results(self, _flow): + results = dict() + results.update( + {'flow_comment': self.exp.findall(_flow.comment)}) + if _flow.request is not None: + results.update( + {'request_path': self.exp.findall(_flow.request.path)}) + results.update( + {'request_headers': self.header_results(_flow.request)}) + if _flow.request.text: + results.update( + {'request_body': self.exp.findall(_flow.request.text)}) + if _flow.response is not None: + results.update( + {'response_headers': self.header_results(_flow.response)}) + if _flow.response.text: + results.update( + {'response_body': self.exp.findall(_flow.response.text)}) + return results + + +addons = [Search()]