mitmproxy/examples/complex/webscanner_helper/urldict.py
anneborcherding 7fdcbb09e6
added add-ons that enhance the performance of web application scanners. (#3961)
* added add-ons that enhance the performance of web application scanners.

Co-authored-by: weichweich <14820950+weichweich@users.noreply.github.com>
2020-05-04 10:37:13 +02:00

91 lines
2.6 KiB
Python

import itertools
import json
import typing
from collections.abc import MutableMapping
from typing import Any, Dict, Generator, List, TextIO, Callable
from mitmproxy import flowfilter
from mitmproxy.http import HTTPFlow
def f_id(x):
    """Identity function: return the argument exactly as it was passed in.

    Serves as a no-op default wherever a conversion callable is optional.
    """
    return x
class URLDict(MutableMapping):
    """Data structure to store information using filters as keys.

    Values are registered under filter expressions (strings that are handed
    to ``flowfilter.parse``) and looked up with a flow: a lookup yields the
    values of *every* stored filter that matches that flow.
    """

    def __init__(self):
        # Maps parsed filter objects to the values registered for them.
        self.store: Dict[flowfilter.TFilter, Any] = {}

    def __getitem__(self, key, *, count=0):
        """Return the values of all stored filters that match the flow ``key``.

        Args:
            key: The flow that is matched against every stored filter.
            count: Upper bound on the number of values returned; ``0`` (the
                default) returns every match. Subscript syntax cannot pass
                keyword arguments, so this is only reachable by calling the
                method directly or through ``get``.

        Returns:
            A non-empty list of matching values, in the store's iteration
            order.

        Raises:
            KeyError: If no stored filter matches ``key``.
        """
        matches = self.get_generator(key)
        if count:
            # Materialise the slice: a bare ``islice`` object is always
            # truthy, so the emptiness check below could never fail for it
            # and a lookup without matches would not raise ``KeyError``.
            ret = list(itertools.islice(matches, count))
        else:
            ret = list(matches)

        if not ret:
            raise KeyError(key)
        return ret

    def __setitem__(self, key: str, value):
        """Parse ``key`` as a filter expression and store ``value`` under it.

        Raises:
            ValueError: If ``flowfilter.parse`` returns a falsy result for
                ``key``.
        """
        fltr = flowfilter.parse(key)
        if not fltr:
            raise ValueError("Not a valid filter")
        self.store[fltr] = value

    def __delitem__(self, key):
        """Remove the entry stored under ``key``.

        ``key`` is looked up in the underlying store as-is, i.e. it has to be
        one of the parsed filter objects that iterating this mapping yields.
        """
        del self.store[key]

    def __iter__(self):
        """Iterate over the stored (parsed) filter objects."""
        return iter(self.store)

    def __len__(self):
        """Return the number of stored filters."""
        return len(self.store)

    def get_generator(self, flow: HTTPFlow) -> Generator[Any, None, None]:
        """Lazily yield the value of each stored filter that matches ``flow``."""
        for fltr, value in self.store.items():
            if flowfilter.match(fltr, flow):
                yield value

    def get(self, flow: HTTPFlow, default=None, *, count=0) -> List[Any]:
        """Return the values matching ``flow``, or ``default`` if none match.

        Args:
            flow: The flow that is matched against every stored filter.
            default: Returned unchanged when no stored filter matches.
            count: Upper bound on the number of values returned; ``0`` means
                no limit.
        """
        try:
            return self.__getitem__(flow, count=count)
        except KeyError:
            return default

    @classmethod
    def _load(cls, json_obj, value_loader: Callable = f_id):
        """Build an instance from a mapping of filter expression to raw value.

        Each raw value is passed through ``value_loader`` before it is stored.
        """
        url_dict = cls()
        for fltr, value in json_obj.items():
            url_dict[fltr] = value_loader(value)
        return url_dict

    @classmethod
    def load(cls, f: TextIO, value_loader: Callable = f_id):
        """Build an instance from the JSON document read from file ``f``."""
        json_obj = json.load(f)
        return cls._load(json_obj, value_loader)

    @classmethod
    def loads(cls, json_str: str, value_loader: Callable = f_id):
        """Build an instance from the JSON document in ``json_str``."""
        json_obj = json.loads(json_str)
        return cls._load(json_obj, value_loader)

    def _dump(self, value_dumper: Callable = f_id) -> Dict:
        """Return the stored entries as a plain dict.

        A filter's ``pattern`` attribute is used as the key when the filter
        has one, otherwise ``str(filter)``. Every value is passed through
        ``value_dumper``.
        """
        dumped: Dict[typing.Union[flowfilter.TFilter, str], Any] = {}
        for fltr, value in self.store.items():
            if hasattr(fltr, 'pattern'):
                # cast necessary for mypy
                dumped[typing.cast(Any, fltr).pattern] = value_dumper(value)
            else:
                dumped[str(fltr)] = value_dumper(value)
        return dumped

    def dump(self, f: TextIO, value_dumper: Callable = f_id):
        """Write the stored entries as JSON to file ``f``."""
        json.dump(self._dump(value_dumper), f)

    def dumps(self, value_dumper: Callable = f_id):
        """Return the stored entries serialised as a JSON string."""
        return json.dumps(self._dump(value_dumper))