refactor spec parsing, map_local candidate generation

This commit is contained in:
Maximilian Hils 2020-07-10 13:23:13 +02:00
parent 8942ae88d5
commit fe1b76bdef
5 changed files with 187 additions and 149 deletions

View File

@ -1,52 +1,86 @@
import mimetypes
import re
import typing
import urllib
from pathlib import Path
from werkzeug.security import safe_join
from mitmproxy import ctx, exceptions, http
from mitmproxy.addons.modifyheaders import parse_modify_spec, ModifySpec
from mitmproxy import ctx, exceptions, flowfilter, http
from mitmproxy.addons.modifyheaders import parse_spec
class MapLocalSpec(typing.NamedTuple):
matches: flowfilter.TFilter
regex: str
local_path: Path
def parse_map_local_spec(option: str) -> MapLocalSpec:
filter, regex, replacement = parse_spec(option)
try:
re.compile(regex)
except re.error as e:
raise ValueError(f"Invalid regular expression {regex!r} ({e})")
try:
path = Path(replacement).expanduser().resolve(strict=True)
except FileNotFoundError as e:
raise ValueError(f"Invalid file path: {replacement} ({e})")
return MapLocalSpec(filter, regex, path)
def get_mime_type(file_path: str) -> str:
mimetype = (
mimetypes.guess_type(file_path)[0]
or "application/octet-stream"
mimetypes.guess_type(file_path)[0]
or "application/octet-stream"
)
return mimetype
def file_candidates(url: str, base_path: str) -> typing.List[Path]:
def _safe_path_join(root: Path, untrusted: str) -> Path:
"""Join a Path element with an untrusted str.
This is just a convenience wrapper for werkzeug's safe_join."""
untrusted_parts = Path(untrusted).parts
joined = safe_join(
root.as_posix(),
*untrusted_parts
)
if joined is None:
raise ValueError("Untrusted paths.")
return Path(joined)
def file_candidates(url: str, spec: MapLocalSpec) -> typing.List[Path]:
candidates = []
parsed_url = urllib.parse.urlparse(url)
path_components = parsed_url.path.lstrip("/").split("/")
filename = path_components.pop()
# todo: we may want to consider other filenames such as index.htm)
if not filename:
filename = 'index.html'
m = re.search(spec.regex, url)
assert m
if m.groups():
suffix = m.group(1)
else:
suffix = re.split(spec.regex, url, maxsplit=1)[1]
suffix = suffix.split("?")[0] # remove query string
# construct all possible paths
while True:
components_with_filename = tuple(path_components + [filename])
candidate_path = safe_join(base_path, *components_with_filename)
if candidate_path:
candidates.append(
Path(candidate_path)
)
suffix = re.sub(r"[^0-9a-zA-Z-_.=(),/]", "_", suffix.strip("/"))
if not path_components:
break
path_components.pop()
if suffix:
try:
candidates.append(_safe_path_join(spec.local_path, suffix))
candidates.append(_safe_path_join(spec.local_path, f"{suffix}/index.html"))
except ValueError:
return []
else:
candidates.append(spec.local_path / "index.html")
return candidates
class MapLocal:
def __init__(self):
self.replacements: typing.List[ModifySpec] = []
self.replacements: typing.List[MapLocalSpec] = []
def load(self, loader):
loader.add_option(
@ -63,7 +97,7 @@ class MapLocal:
self.replacements = []
for option in ctx.options.map_local:
try:
spec = parse_modify_spec(option, True, True)
spec = parse_map_local_spec(option)
except ValueError as e:
raise exceptions.OptionsError(f"Cannot parse map_local option {option}: {e}") from e
@ -74,31 +108,25 @@ class MapLocal:
return
for spec in self.replacements:
req = flow.request
url = req.pretty_url
base_path = Path(spec.replacement)
url = flow.request.pretty_url
if spec.matches(flow) and re.search(spec.subject, url.encode("utf8", "surrogateescape")):
replacement_path = None
if base_path.is_file():
replacement_path = base_path
elif base_path.is_dir():
candidates = file_candidates(url, str(base_path))
for candidate in candidates:
# check that path is not outside of the user-defined base_path
if candidate.is_file() and base_path in candidate.parents:
replacement_path = candidate
if spec.matches(flow) and re.search(spec.regex, url):
local_file: typing.Optional[Path] = None
if spec.local_path.is_file():
local_file = spec.local_path
elif spec.local_path.is_dir():
for candidate in file_candidates(url, spec):
if candidate.is_file():
local_file = candidate
break
if replacement_path:
try:
flow.response = http.HTTPResponse.make(
200,
replacement_path.read_bytes(),
{"Content-Type": get_mime_type(str(replacement_path))}
)
# only set flow.response once, for the first matching rule
break
except IOError:
ctx.log.warn(f"Could not read replacement file {replacement_path}")
return
if local_file:
flow.response = http.HTTPResponse.make(
200,
local_file.read_bytes(),
{"Content-Type": get_mime_type(str(local_file))}
)
# only set flow.response once, for the first matching rule
return

View File

@ -1,22 +1,38 @@
import re
import typing
from mitmproxy import ctx, exceptions, http
from mitmproxy.addons.modifyheaders import parse_modify_spec, ModifySpec
from mitmproxy import ctx, exceptions, flowfilter, http
from mitmproxy.addons.modifyheaders import parse_spec
class MapRemoteSpec(typing.NamedTuple):
matches: flowfilter.TFilter
subject: str
replacement: str
def parse_map_remote_spec(option: str) -> MapRemoteSpec:
spec = MapRemoteSpec(*parse_spec(option))
try:
re.compile(spec.subject)
except re.error as e:
raise ValueError(f"Invalid regular expression {spec.subject!r} ({e})")
return spec
class MapRemote:
def __init__(self):
self.replacements: typing.List[ModifySpec] = []
self.replacements: typing.List[MapRemoteSpec] = []
def load(self, loader):
loader.add_option(
"map_remote", typing.Sequence[str], [],
"""
Map remote resources to another remote URL using a pattern of the form
"[/flow-filter]/url-regex/[@]replacement", where the separator can
be any character. The @ allows to provide a file path that is
used to read the replacement string.
"[/flow-filter]/url-regex/replacement", where the separator can
be any character.
"""
)
@ -25,7 +41,7 @@ class MapRemote:
self.replacements = []
for option in ctx.options.map_remote:
try:
spec = parse_modify_spec(option, True, False)
spec = parse_map_remote_spec(option)
except ValueError as e:
raise exceptions.OptionsError(f"Cannot parse map_remote option {option}: {e}") from e
@ -36,14 +52,8 @@ class MapRemote:
return
for spec in self.replacements:
if spec.matches(flow):
try:
replacement = spec.read_replacement()
except IOError as e:
ctx.log.warn(f"Could not read replacement file: {e}")
continue
url = flow.request.pretty_url.encode("utf8", "surrogateescape")
new_url = re.sub(spec.subject, replacement, url)
url = flow.request.pretty_url
new_url = re.sub(spec.subject, spec.replacement, url)
# this is a bit messy: setting .url also updates the host header,
# so we really only do that if the replacement affected the URL.
if url != new_url:

View File

@ -24,7 +24,7 @@ class ModifyBody:
self.replacements = []
for option in ctx.options.modify_body:
try:
spec = parse_modify_spec(option, True, False)
spec = parse_modify_spec(option, True)
except ValueError as e:
raise exceptions.OptionsError(f"Cannot parse modify_body option {option}: {e}") from e

View File

@ -7,10 +7,36 @@ from mitmproxy.net.http import Headers
from mitmproxy.utils import strutils
def _match_all(flow) -> bool:
return True
def parse_spec(option: str) -> typing.Tuple[flowfilter.TFilter, str, str]:
"""
Parse strings in the following format:
[/flow-filter]/subject/replacement
"""
sep, rem = option[0], option[1:]
parts = rem.split(sep, 2)
if len(parts) == 2:
subject, replacement = parts
return _match_all, subject, replacement
elif len(parts) == 3:
patt, subject, replacement = parts
flow_filter = flowfilter.parse(patt)
if not flow_filter:
raise ValueError(f"Invalid filter pattern: {patt}")
return flow_filter, subject, replacement
else:
raise ValueError("Invalid number of parameters (2 or 3 are expected)")
class ModifySpec(typing.NamedTuple):
matches: flowfilter.TFilter
subject: bytes
replacement: str
replacement_str: str
def read_replacement(self) -> bytes:
"""
@ -20,79 +46,29 @@ class ModifySpec(typing.NamedTuple):
Raises:
- IOError if the file cannot be read.
"""
if self.replacement.startswith("@"):
return Path(self.replacement[1:]).expanduser().read_bytes()
if self.replacement_str.startswith("@"):
return Path(self.replacement_str[1:]).expanduser().read_bytes()
else:
# We could cache this at some point, but unlikely to be a problem.
return strutils.escaped_str_to_bytes(self.replacement)
return strutils.escaped_str_to_bytes(self.replacement_str)
def _match_all(flow) -> bool:
return True
def parse_modify_spec(option: str, subject_is_regex: bool) -> ModifySpec:
flow_filter, subject_str, replacement = parse_spec(option)
def parse_modify_spec(option, subject_is_regex: bool, replacement_is_path: bool) -> ModifySpec:
"""
The form for the modify_*, map_remote, and map_local options is as follows:
* modify_body: [/flow-filter]/body-regex/[@]replacement
* modify_headers: [/flow-filter]/header-name/[@]header-value
* map_local: [:flow-filter]:url-regex:path
* map_remote: [:flow-filter]:url-regex:[@]replacement
The @ allows to provide a file path that is used to read the respective option.
The addons ModifyHeaders, ModifyBody, MapRemote, and MapLocal use ModifySpec
to represent a single rule.
The first character specifies the separator. Example:
:~q:foo:bar
If only two clauses are specified, the flow filter is set to
match universally (i.e. ".*"). Example:
/foo/bar
Clauses are parsed from left to right. Extra separators are taken to be
part of the final clause. For instance, the last parameter (header-value,
replace, or path) below is "foo/bar/":
/one/two/foo/bar/
"""
sep, rem = option[0], option[1:]
parts = rem.split(sep, 2)
if len(parts) == 2:
flow_filter = _match_all
subject, replacement = parts
elif len(parts) == 3:
flow_filter_pattern, subject, replacement = parts
flow_filter = flowfilter.parse(flow_filter_pattern) # type: ignore
if not flow_filter:
raise ValueError(f"Invalid filter pattern: {flow_filter_pattern}")
else:
raise ValueError("Invalid number of parameters (2 or 3 are expected)")
subject = strutils.escaped_str_to_bytes(subject)
subject = strutils.escaped_str_to_bytes(subject_str)
if subject_is_regex:
try:
re.compile(subject)
except re.error as e:
raise ValueError(f"Invalid regular expression {subject!r} ({e})")
if replacement_is_path:
path = Path(replacement)
try:
replacement = str(path.expanduser().resolve(strict=True))
except FileNotFoundError as e:
raise ValueError(f"Invalid file path: {replacement} ({e})")
spec = ModifySpec(flow_filter, subject, replacement)
if not replacement_is_path:
try:
spec.read_replacement()
except IOError as e:
raise ValueError(f"Invalid file path: {replacement[1:]} ({e})")
try:
spec.read_replacement()
except IOError as e:
raise ValueError(f"Invalid file path: {replacement[1:]} ({e})")
return spec
@ -116,7 +92,7 @@ class ModifyHeaders:
if "modify_headers" in updated:
for option in ctx.options.modify_headers:
try:
spec = parse_modify_spec(option, False, False)
spec = parse_modify_spec(option, False)
except ValueError as e:
raise exceptions.OptionsError(f"Cannot parse modify_headers option {option}: {e}") from e
self.replacements.append(spec)

View File

@ -1,35 +1,59 @@
import re
from pathlib import Path
import pytest
from mitmproxy.addons.maplocal import MapLocal, file_candidates
from mitmproxy.addons.maplocal import MapLocal, MapLocalSpec, file_candidates
from mitmproxy.addons.modifyheaders import parse_spec
from mitmproxy.test import taddons
from mitmproxy.test import tflow
from mitmproxy.addons.modifyheaders import parse_modify_spec
@pytest.mark.parametrize(
"url,spec,expected_candidates", [
"url,spec,expected_candidates",
[
# trailing slashes
("https://example.com/foo", ":example.com/foo:/tmp", ["/tmp/index.html"]),
("https://example.com/foo/", ":example.com/foo:/tmp", ["/tmp/index.html"]),
("https://example.com/foo", ":example.com/foo:/tmp/", ["/tmp/index.html"]),
] + [
# simple prefixes
("http://example/foo/bar.jpg", ":example/foo:/tmp", ["/tmp/bar.jpg", "/tmp/bar.jpg/index.html"]),
("https://example/foo/bar.jpg", ":example/foo:/tmp", ["/tmp/bar.jpg", "/tmp/bar.jpg/index.html"]),
("https://example/foo/bar.jpg?query", ":example/foo:/tmp", ["/tmp/bar.jpg", "/tmp/bar.jpg/index.html"]),
("https://example/foo/bar/baz.jpg", ":example/foo:/tmp", ["/tmp/bar/baz.jpg", "/tmp/bar/baz.jpg/index.html"]),
] + [
# index.html
("https://example.com/foo", ":example.com/foo:/tmp", ["/tmp/index.html"]),
("https://example.com/foo/", ":example.com/foo:/tmp", ["/tmp/index.html"]),
("https://example.com/foo/bar", ":example.com/foo:/tmp", ["/tmp/bar", "/tmp/bar/index.html"]),
("https://example.com/foo/bar/", ":example.com/foo:/tmp", ["/tmp/bar", "/tmp/bar/index.html"]),
] + [
# regex
(
"https://example.org/img/topic/subtopic/test.jpg",
":example.com/foo:/tmp",
["/tmp/img/topic/subtopic/test.jpg", "/tmp/img/topic/test.jpg", "/tmp/img/test.jpg", "/tmp/test.jpg"]
),
(
"https://example.org/img/topic/subtopic/",
":/img:/tmp",
["/tmp/img/topic/subtopic/index.html", "/tmp/img/topic/index.html", "/tmp/img/index.html", "/tmp/index.html"]
),
(
"https://example.org",
":org:/tmp",
["/tmp/index.html"]
"https://example/view.php?f=foo.jpg",
":example/view.php\\?f=(.+):/tmp",
["/tmp/foo.jpg", "/tmp/foo.jpg/index.html"]
), (
"https://example/results?id=1&foo=2",
":example/(results\\?id=.+):/tmp",
["/tmp/results_id=1_foo=2", "/tmp/results_id=1_foo=2/index.html"]
),
] + [
# test directory traversal detection
("https://example.com/../../../../../../etc/passwd", ":example.com:/tmp", []),
# those get already sanitized to benign versions before they reach our detection:
("https://example.com/C:\\foo.txt", ":example.com:/tmp", ["/tmp/C__foo.txt", "/tmp/C__foo.txt/index.html"]),
("https://example.com//etc/passwd", ":example.com:/tmp", ["/tmp/etc/passwd", "/tmp/etc/passwd/index.html"]),
]
)
def test_file_candidates(url, spec, expected_candidates):
spec = parse_modify_spec(spec, True, True)
candidates = file_candidates(url, spec.replacement)
assert [str(x) for x in candidates] == expected_candidates
# we circumvent the path existence checks here to simplify testing
filt, subj, repl = parse_spec(spec)
spec = MapLocalSpec(filt, subj, Path(repl))
candidates = file_candidates(url, spec)
assert [x.as_posix() for x in candidates] == expected_candidates
class TestMapLocal: