mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-26 18:18:25 +00:00
minor addon improvements, fix tests
This commit is contained in:
parent
cf158022d9
commit
0ee0cf5668
@ -1,8 +1,7 @@
|
|||||||
import os
|
|
||||||
import re
|
import re
|
||||||
import typing
|
import typing
|
||||||
|
|
||||||
from mitmproxy import exceptions
|
from mitmproxy import exceptions, http
|
||||||
from mitmproxy import ctx
|
from mitmproxy import ctx
|
||||||
from mitmproxy.addons.modifyheaders import parse_modify_spec, ModifySpec
|
from mitmproxy.addons.modifyheaders import parse_modify_spec, ModifySpec
|
||||||
|
|
||||||
@ -26,40 +25,26 @@ class MapRemote:
|
|||||||
self.replacements = []
|
self.replacements = []
|
||||||
for option in ctx.options.map_remote:
|
for option in ctx.options.map_remote:
|
||||||
try:
|
try:
|
||||||
spec = parse_modify_spec(option)
|
spec = parse_modify_spec(option, True)
|
||||||
try:
|
|
||||||
re.compile(spec.subject)
|
|
||||||
except re.error:
|
|
||||||
raise ValueError(f"Invalid regular expression: {spec.subject}")
|
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
raise exceptions.OptionsError(
|
raise exceptions.OptionsError(f"Cannot parse map_remote option {option}: {e}") from e
|
||||||
f"Cannot parse map_remote option {option}: {e}"
|
|
||||||
) from e
|
|
||||||
|
|
||||||
self.replacements.append(spec)
|
self.replacements.append(spec)
|
||||||
|
|
||||||
def request(self, flow):
|
def request(self, flow: http.HTTPFlow) -> None:
|
||||||
if not flow.reply.has_message:
|
if flow.reply and flow.reply.has_message:
|
||||||
for spec in self.replacements:
|
return
|
||||||
if spec.matches(flow):
|
for spec in self.replacements:
|
||||||
self.replace(flow.request, spec.subject, spec.replacement)
|
if spec.matches(flow):
|
||||||
|
try:
|
||||||
|
replacement = spec.read_replacement()
|
||||||
|
except IOError as e:
|
||||||
|
ctx.log.warn(f"Could not read replacement file: {e}")
|
||||||
|
continue
|
||||||
|
|
||||||
def replace(self, obj, search, repl):
|
url = flow.request.pretty_url.encode("utf8", "surrogateescape")
|
||||||
"""
|
new_url = re.sub(spec.subject, replacement, url)
|
||||||
Replaces all matches of the regex search in the url of the request with repl.
|
# this is a bit messy: setting .url also updates the host header,
|
||||||
|
# so we really only do that if the replacement affected the URL.
|
||||||
Returns:
|
if url != new_url:
|
||||||
The number of replacements made.
|
flow.request.url = new_url
|
||||||
"""
|
|
||||||
if repl.startswith(b"@"):
|
|
||||||
path = os.path.expanduser(repl[1:])
|
|
||||||
try:
|
|
||||||
with open(path, "rb") as f:
|
|
||||||
repl = f.read()
|
|
||||||
except IOError:
|
|
||||||
ctx.log.warn("Could not read replacement file: %s" % repl)
|
|
||||||
return
|
|
||||||
|
|
||||||
replacements = 0
|
|
||||||
obj.url, replacements = re.subn(search, repl, obj.pretty_url.encode("utf8", "surrogateescape"), flags=re.DOTALL)
|
|
||||||
return replacements
|
|
||||||
|
@ -1,4 +1,3 @@
|
|||||||
import os
|
|
||||||
import re
|
import re
|
||||||
import typing
|
import typing
|
||||||
|
|
||||||
@ -26,26 +25,12 @@ class ModifyBody:
|
|||||||
self.replacements = []
|
self.replacements = []
|
||||||
for option in ctx.options.modify_body:
|
for option in ctx.options.modify_body:
|
||||||
try:
|
try:
|
||||||
spec = parse_modify_spec(option)
|
spec = parse_modify_spec(option, True)
|
||||||
try:
|
|
||||||
re.compile(spec.subject)
|
|
||||||
except re.error:
|
|
||||||
raise ValueError(f"Invalid regular expression: {spec.subject}")
|
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
raise exceptions.OptionsError(
|
raise exceptions.OptionsError(f"Cannot parse modify_body option {option}: {e}") from e
|
||||||
f"Cannot parse modify_body option {option}: {e}"
|
|
||||||
) from e
|
|
||||||
|
|
||||||
self.replacements.append(spec)
|
self.replacements.append(spec)
|
||||||
|
|
||||||
def run(self, flow):
|
|
||||||
for spec in self.replacements:
|
|
||||||
if spec.matches(flow):
|
|
||||||
if flow.response:
|
|
||||||
self.replace(flow.response, spec.subject, spec.replacement)
|
|
||||||
else:
|
|
||||||
self.replace(flow.request, spec.subject, spec.replacement)
|
|
||||||
|
|
||||||
def request(self, flow):
|
def request(self, flow):
|
||||||
if not flow.reply.has_message:
|
if not flow.reply.has_message:
|
||||||
self.run(flow)
|
self.run(flow)
|
||||||
@ -54,23 +39,15 @@ class ModifyBody:
|
|||||||
if not flow.reply.has_message:
|
if not flow.reply.has_message:
|
||||||
self.run(flow)
|
self.run(flow)
|
||||||
|
|
||||||
def replace(self, obj, search, repl):
|
def run(self, flow):
|
||||||
"""
|
for spec in self.replacements:
|
||||||
Replaces all matches of the regex search in the body of the message with repl.
|
if spec.matches(flow):
|
||||||
|
try:
|
||||||
Returns:
|
replacement = spec.read_replacement()
|
||||||
The number of replacements made.
|
except IOError as e:
|
||||||
"""
|
ctx.log.warn(f"Could not read replacement file: {e}")
|
||||||
if repl.startswith(b"@"):
|
continue
|
||||||
repl = os.path.expanduser(repl[1:])
|
if flow.response:
|
||||||
try:
|
flow.response.content = re.sub(spec.subject, replacement, flow.response.content, flags=re.DOTALL)
|
||||||
with open(repl, "rb") as f:
|
else:
|
||||||
repl = f.read()
|
flow.request.content = re.sub(spec.subject, replacement, flow.request.content, flags=re.DOTALL)
|
||||||
except IOError:
|
|
||||||
ctx.log.warn("Could not read replacement file: %s" % repl)
|
|
||||||
return
|
|
||||||
|
|
||||||
replacements = 0
|
|
||||||
if obj.content:
|
|
||||||
obj.content, replacements = re.subn(search, repl, obj.content, flags=re.DOTALL)
|
|
||||||
return replacements
|
|
||||||
|
@ -1,26 +1,39 @@
|
|||||||
import os
|
import re
|
||||||
import typing
|
import typing
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
from mitmproxy import exceptions
|
from mitmproxy import exceptions, http
|
||||||
from mitmproxy import flowfilter
|
from mitmproxy import flowfilter
|
||||||
|
from mitmproxy.net.http import Headers
|
||||||
from mitmproxy.utils import strutils
|
from mitmproxy.utils import strutils
|
||||||
from mitmproxy import ctx
|
from mitmproxy import ctx
|
||||||
|
|
||||||
|
|
||||||
class ModifySpec(typing.NamedTuple):
|
class ModifySpec(typing.NamedTuple):
|
||||||
"""
|
|
||||||
match_str: a string specifying a flow filter pattern.
|
|
||||||
matches: the parsed match_str as a flowfilter.TFilter object
|
|
||||||
subject: a header name for ModifyHeaders and a regex pattern for ModifyBody
|
|
||||||
replacement: the replacement string
|
|
||||||
"""
|
|
||||||
match_str: str
|
|
||||||
matches: flowfilter.TFilter
|
matches: flowfilter.TFilter
|
||||||
subject: bytes
|
subject: bytes
|
||||||
replacement: bytes
|
replacement_str: str
|
||||||
|
|
||||||
|
def read_replacement(self) -> bytes:
|
||||||
|
"""
|
||||||
|
Process the replacement str. This usually just involves converting it to bytes.
|
||||||
|
However, if it starts with `@`, we interpret the rest as a file path to read from.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
- IOError if the file cannot be read.
|
||||||
|
"""
|
||||||
|
if self.replacement_str.startswith("@"):
|
||||||
|
return Path(self.replacement_str[1:]).expanduser().read_bytes()
|
||||||
|
else:
|
||||||
|
# We could cache this at some point, but unlikely to be a problem.
|
||||||
|
return strutils.escaped_str_to_bytes(self.replacement_str)
|
||||||
|
|
||||||
|
|
||||||
def parse_modify_spec(option) -> ModifySpec:
|
def _match_all(flow) -> bool:
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def parse_modify_spec(option, subject_is_regex: bool) -> ModifySpec:
|
||||||
"""
|
"""
|
||||||
The form for the modify_* options is as follows:
|
The form for the modify_* options is as follows:
|
||||||
|
|
||||||
@ -48,24 +61,31 @@ def parse_modify_spec(option) -> ModifySpec:
|
|||||||
sep, rem = option[0], option[1:]
|
sep, rem = option[0], option[1:]
|
||||||
parts = rem.split(sep, 2)
|
parts = rem.split(sep, 2)
|
||||||
if len(parts) == 2:
|
if len(parts) == 2:
|
||||||
flow_filter_pattern = ".*"
|
flow_filter = _match_all
|
||||||
subject, replacement = parts
|
subject, replacement = parts
|
||||||
elif len(parts) == 3:
|
elif len(parts) == 3:
|
||||||
flow_filter_pattern, subject, replacement = parts
|
flow_filter_pattern, subject, replacement = parts
|
||||||
|
flow_filter = flowfilter.parse(flow_filter_pattern) # type: ignore
|
||||||
|
if not flow_filter:
|
||||||
|
raise ValueError(f"Invalid filter pattern: {flow_filter_pattern}")
|
||||||
else:
|
else:
|
||||||
raise ValueError("Invalid number of parameters (2 or 3 are expected)")
|
raise ValueError("Invalid number of parameters (2 or 3 are expected)")
|
||||||
|
|
||||||
flow_filter = flowfilter.parse(flow_filter_pattern)
|
|
||||||
if not flow_filter:
|
|
||||||
raise ValueError(f"Invalid filter pattern: {flow_filter_pattern}")
|
|
||||||
|
|
||||||
subject = strutils.escaped_str_to_bytes(subject)
|
subject = strutils.escaped_str_to_bytes(subject)
|
||||||
replacement = strutils.escaped_str_to_bytes(replacement)
|
if subject_is_regex:
|
||||||
|
try:
|
||||||
|
re.compile(subject)
|
||||||
|
except re.error as e:
|
||||||
|
raise ValueError(f"Invalid regular expression {subject!r} ({e})")
|
||||||
|
|
||||||
if replacement.startswith(b"@") and not os.path.isfile(os.path.expanduser(replacement[1:])):
|
spec = ModifySpec(flow_filter, subject, replacement)
|
||||||
raise ValueError(f"Invalid file path: {replacement[1:]}")
|
|
||||||
|
|
||||||
return ModifySpec(flow_filter_pattern, flow_filter, subject, replacement)
|
try:
|
||||||
|
spec.read_replacement()
|
||||||
|
except IOError as e:
|
||||||
|
raise ValueError(f"Invalid file path: {replacement[1:]} ({e})")
|
||||||
|
|
||||||
|
return spec
|
||||||
|
|
||||||
|
|
||||||
class ModifyHeaders:
|
class ModifyHeaders:
|
||||||
@ -87,35 +107,11 @@ class ModifyHeaders:
|
|||||||
if "modify_headers" in updated:
|
if "modify_headers" in updated:
|
||||||
for option in ctx.options.modify_headers:
|
for option in ctx.options.modify_headers:
|
||||||
try:
|
try:
|
||||||
spec = parse_modify_spec(option)
|
spec = parse_modify_spec(option, False)
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
raise exceptions.OptionsError(
|
raise exceptions.OptionsError(f"Cannot parse modify_headers option {option}: {e}") from e
|
||||||
f"Cannot parse modify_headers option {option}: {e}"
|
|
||||||
) from e
|
|
||||||
self.replacements.append(spec)
|
self.replacements.append(spec)
|
||||||
|
|
||||||
def run(self, flow, hdrs):
|
|
||||||
# unset all specified headers
|
|
||||||
for spec in self.replacements:
|
|
||||||
if spec.matches(flow):
|
|
||||||
hdrs.pop(spec.subject, None)
|
|
||||||
|
|
||||||
# set all specified headers if the replacement string is not empty
|
|
||||||
for spec in self.replacements:
|
|
||||||
if spec.replacement.startswith(b"@"):
|
|
||||||
path = os.path.expanduser(spec.replacement[1:])
|
|
||||||
try:
|
|
||||||
with open(path, "rb") as file:
|
|
||||||
replacement = file.read()
|
|
||||||
except IOError:
|
|
||||||
ctx.log.warn(f"Could not read replacement file {path}")
|
|
||||||
return
|
|
||||||
else:
|
|
||||||
replacement = spec.replacement
|
|
||||||
|
|
||||||
if spec.matches(flow) and replacement:
|
|
||||||
hdrs.add(spec.subject, replacement)
|
|
||||||
|
|
||||||
def request(self, flow):
|
def request(self, flow):
|
||||||
if not flow.reply.has_message:
|
if not flow.reply.has_message:
|
||||||
self.run(flow, flow.request.headers)
|
self.run(flow, flow.request.headers)
|
||||||
@ -123,3 +119,21 @@ class ModifyHeaders:
|
|||||||
def response(self, flow):
|
def response(self, flow):
|
||||||
if not flow.reply.has_message:
|
if not flow.reply.has_message:
|
||||||
self.run(flow, flow.response.headers)
|
self.run(flow, flow.response.headers)
|
||||||
|
|
||||||
|
def run(self, flow: http.HTTPFlow, hdrs: Headers) -> None:
|
||||||
|
# unset all specified headers
|
||||||
|
for spec in self.replacements:
|
||||||
|
if spec.matches(flow):
|
||||||
|
hdrs.pop(spec.subject, None)
|
||||||
|
|
||||||
|
# set all specified headers if the replacement string is not empty
|
||||||
|
for spec in self.replacements:
|
||||||
|
if spec.matches(flow):
|
||||||
|
try:
|
||||||
|
replacement = spec.read_replacement()
|
||||||
|
except IOError as e:
|
||||||
|
ctx.log.warn(f"Could not read replacement file: {e}")
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
if replacement:
|
||||||
|
hdrs.add(spec.subject, replacement)
|
||||||
|
@ -12,7 +12,7 @@ class TestMapRemote:
|
|||||||
with taddons.context(mr) as tctx:
|
with taddons.context(mr) as tctx:
|
||||||
tctx.configure(mr, map_remote=["one/two/three"])
|
tctx.configure(mr, map_remote=["one/two/three"])
|
||||||
with pytest.raises(Exception, match="Cannot parse map_remote .* Invalid number"):
|
with pytest.raises(Exception, match="Cannot parse map_remote .* Invalid number"):
|
||||||
tctx.configure(mr, map_remote = ["/"])
|
tctx.configure(mr, map_remote=["/"])
|
||||||
with pytest.raises(Exception, match="Cannot parse map_remote .* Invalid filter"):
|
with pytest.raises(Exception, match="Cannot parse map_remote .* Invalid filter"):
|
||||||
tctx.configure(mr, map_remote=["/~b/two/three"])
|
tctx.configure(mr, map_remote=["/~b/two/three"])
|
||||||
with pytest.raises(Exception, match="Cannot parse map_remote .* Invalid regular expression"):
|
with pytest.raises(Exception, match="Cannot parse map_remote .* Invalid regular expression"):
|
||||||
@ -33,6 +33,16 @@ class TestMapRemote:
|
|||||||
mr.request(f)
|
mr.request(f)
|
||||||
assert f.request.url == "https://mitmproxy.org/img/test.jpg"
|
assert f.request.url == "https://mitmproxy.org/img/test.jpg"
|
||||||
|
|
||||||
|
def test_has_reply(self):
|
||||||
|
mr = mapremote.MapRemote()
|
||||||
|
with taddons.context(mr) as tctx:
|
||||||
|
tctx.configure(mr, map_remote=[":example.org:mitmproxy.org"])
|
||||||
|
f = tflow.tflow()
|
||||||
|
f.request.url = b"https://example.org/images/test.jpg"
|
||||||
|
f.kill()
|
||||||
|
mr.request(f)
|
||||||
|
assert f.request.url == "https://example.org/images/test.jpg"
|
||||||
|
|
||||||
|
|
||||||
class TestMapRemoteFile:
|
class TestMapRemoteFile:
|
||||||
def test_simple(self, tmpdir):
|
def test_simple(self, tmpdir):
|
||||||
@ -42,7 +52,7 @@ class TestMapRemoteFile:
|
|||||||
tmpfile.write("mitmproxy.org")
|
tmpfile.write("mitmproxy.org")
|
||||||
tctx.configure(
|
tctx.configure(
|
||||||
mr,
|
mr,
|
||||||
map_remote=[":example.org:@" + str(tmpfile)]
|
map_remote=["|example.org|@" + str(tmpfile)]
|
||||||
)
|
)
|
||||||
f = tflow.tflow()
|
f = tflow.tflow()
|
||||||
f.request.url = b"https://example.org/test"
|
f.request.url = b"https://example.org/test"
|
||||||
@ -63,7 +73,7 @@ class TestMapRemoteFile:
|
|||||||
tmpfile.write("mitmproxy.org")
|
tmpfile.write("mitmproxy.org")
|
||||||
tctx.configure(
|
tctx.configure(
|
||||||
mr,
|
mr,
|
||||||
map_remote=[":example.org:@" + str(tmpfile)]
|
map_remote=["|example.org|@" + str(tmpfile)]
|
||||||
)
|
)
|
||||||
tmpfile.remove()
|
tmpfile.remove()
|
||||||
f = tflow.tflow()
|
f = tflow.tflow()
|
||||||
|
@ -1,38 +1,17 @@
|
|||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from mitmproxy.addons import modifybody
|
from mitmproxy.addons import modifybody
|
||||||
from mitmproxy.addons.modifyheaders import parse_modify_spec
|
|
||||||
from mitmproxy.test import taddons
|
from mitmproxy.test import taddons
|
||||||
from mitmproxy.test import tflow
|
from mitmproxy.test import tflow
|
||||||
|
|
||||||
|
|
||||||
class TestModifyBody:
|
class TestModifyBody:
|
||||||
def test_parse_modify_spec(self):
|
|
||||||
x = parse_modify_spec("/foo/bar/voing")
|
|
||||||
assert [x[0], x[2], x[3]] == ["foo", b"bar", b"voing"]
|
|
||||||
|
|
||||||
x = parse_modify_spec("/foo/bar/vo/ing/")
|
|
||||||
assert [x[0], x[2], x[3]] == ["foo", b"bar", b"vo/ing/"]
|
|
||||||
|
|
||||||
x = parse_modify_spec("/bar/voing")
|
|
||||||
assert [x[0], x[2], x[3]] == [".*", b"bar", b"voing"]
|
|
||||||
|
|
||||||
with pytest.raises(Exception, match="Invalid number of parameters"):
|
|
||||||
parse_modify_spec("/")
|
|
||||||
|
|
||||||
with pytest.raises(Exception, match="Invalid filter pattern"):
|
|
||||||
parse_modify_spec("/~b/one/two")
|
|
||||||
|
|
||||||
def test_configure(self):
|
def test_configure(self):
|
||||||
mb = modifybody.ModifyBody()
|
mb = modifybody.ModifyBody()
|
||||||
with taddons.context(mb) as tctx:
|
with taddons.context(mb) as tctx:
|
||||||
tctx.configure(mb, modify_body=["one/two/three"])
|
tctx.configure(mb, modify_body=["one/two/three"])
|
||||||
with pytest.raises(Exception, match="Cannot parse modify_body .* Invalid number"):
|
with pytest.raises(Exception, match="Cannot parse modify_body"):
|
||||||
tctx.configure(mb, modify_body = ["/"])
|
tctx.configure(mb, modify_body=["/"])
|
||||||
with pytest.raises(Exception, match="Cannot parse modify_body .* Invalid filter"):
|
|
||||||
tctx.configure(mb, modify_body=["/~b/two/three"])
|
|
||||||
with pytest.raises(Exception, match="Cannot parse modify_body .* Invalid regular expression"):
|
|
||||||
tctx.configure(mb, modify_body=["/foo/+/three"])
|
|
||||||
tctx.configure(mb, modify_body=["/a/b/c/"])
|
tctx.configure(mb, modify_body=["/a/b/c/"])
|
||||||
|
|
||||||
def test_simple(self):
|
def test_simple(self):
|
||||||
|
@ -3,49 +3,53 @@ import pytest
|
|||||||
from mitmproxy.test import tflow
|
from mitmproxy.test import tflow
|
||||||
from mitmproxy.test import taddons
|
from mitmproxy.test import taddons
|
||||||
|
|
||||||
from mitmproxy.addons import modifyheaders
|
from mitmproxy.addons.modifyheaders import parse_modify_spec, ModifyHeaders
|
||||||
|
|
||||||
|
|
||||||
|
def test_parse_modify_spec():
|
||||||
|
spec = parse_modify_spec("/foo/bar/voing", True)
|
||||||
|
assert spec.matches.pattern == "foo"
|
||||||
|
assert spec.subject == b"bar"
|
||||||
|
assert spec.read_replacement() == b"voing"
|
||||||
|
|
||||||
|
spec = parse_modify_spec("/foo/bar/vo/ing/", False)
|
||||||
|
assert spec.matches.pattern == "foo"
|
||||||
|
assert spec.subject == b"bar"
|
||||||
|
assert spec.read_replacement() == b"vo/ing/"
|
||||||
|
|
||||||
|
spec = parse_modify_spec("/bar/voing", False)
|
||||||
|
assert spec.matches(tflow.tflow())
|
||||||
|
assert spec.subject == b"bar"
|
||||||
|
assert spec.read_replacement() == b"voing"
|
||||||
|
|
||||||
|
with pytest.raises(ValueError, match="Invalid number of parameters"):
|
||||||
|
parse_modify_spec("/", False)
|
||||||
|
|
||||||
|
with pytest.raises(ValueError, match="Invalid filter pattern"):
|
||||||
|
parse_modify_spec("/~b/one/two", False)
|
||||||
|
|
||||||
|
with pytest.raises(ValueError, match="Invalid filter pattern"):
|
||||||
|
parse_modify_spec("/~b/one/two", False)
|
||||||
|
|
||||||
|
with pytest.raises(ValueError, match="Invalid regular expression"):
|
||||||
|
parse_modify_spec("/[/two", True)
|
||||||
|
|
||||||
|
|
||||||
class TestModifyHeaders:
|
class TestModifyHeaders:
|
||||||
def test_parse_modify_spec(self):
|
|
||||||
x = modifyheaders.parse_modify_spec("/foo/bar/voing")
|
|
||||||
assert [x[0], x[2], x[3]] == ["foo", b"bar", b"voing"]
|
|
||||||
|
|
||||||
x = modifyheaders.parse_modify_spec("/foo/bar/vo/ing/")
|
|
||||||
assert [x[0], x[2], x[3]] == ["foo", b"bar", b"vo/ing/"]
|
|
||||||
|
|
||||||
x = modifyheaders.parse_modify_spec("/bar/voing")
|
|
||||||
assert [x[0], x[2], x[3]] == [".*", b"bar", b"voing"]
|
|
||||||
|
|
||||||
with pytest.raises(Exception, match="Invalid number of parameters"):
|
|
||||||
modifyheaders.parse_modify_spec("/")
|
|
||||||
|
|
||||||
with pytest.raises(Exception, match="Invalid filter pattern"):
|
|
||||||
modifyheaders.parse_modify_spec("/~b/one/two")
|
|
||||||
|
|
||||||
with pytest.raises(Exception, match="Invalid file path"):
|
|
||||||
modifyheaders.parse_modify_spec("/~q/foo/@nonexistent")
|
|
||||||
|
|
||||||
def test_configure(self):
|
def test_configure(self):
|
||||||
mh = modifyheaders.ModifyHeaders()
|
mh = ModifyHeaders()
|
||||||
with taddons.context(mh) as tctx:
|
with taddons.context(mh) as tctx:
|
||||||
with pytest.raises(Exception, match="Cannot parse modify_headers .* Invalid number"):
|
with pytest.raises(Exception, match="Cannot parse modify_headers"):
|
||||||
tctx.configure(mh, modify_headers = ["/"])
|
tctx.configure(mh, modify_headers=["/"])
|
||||||
|
tctx.configure(mh, modify_headers=["/foo/bar/voing"])
|
||||||
with pytest.raises(Exception, match="Cannot parse modify_headers .* Invalid filter"):
|
|
||||||
tctx.configure(mh, modify_headers = ["/~b/one/two"])
|
|
||||||
|
|
||||||
with pytest.raises(Exception, match="Cannot parse modify_headers .* Invalid file"):
|
|
||||||
tctx.configure(mh, modify_headers = ["/~q/foo/@nonexistent"])
|
|
||||||
|
|
||||||
tctx.configure(mh, modify_headers = ["/foo/bar/voing"])
|
|
||||||
|
|
||||||
def test_modify_headers(self):
|
def test_modify_headers(self):
|
||||||
mh = modifyheaders.ModifyHeaders()
|
mh = ModifyHeaders()
|
||||||
with taddons.context(mh) as tctx:
|
with taddons.context(mh) as tctx:
|
||||||
tctx.configure(
|
tctx.configure(
|
||||||
mh,
|
mh,
|
||||||
modify_headers = [
|
modify_headers=[
|
||||||
"/~q/one/two",
|
"/~q/one/two",
|
||||||
"/~s/one/three"
|
"/~s/one/three"
|
||||||
]
|
]
|
||||||
@ -62,7 +66,7 @@ class TestModifyHeaders:
|
|||||||
|
|
||||||
tctx.configure(
|
tctx.configure(
|
||||||
mh,
|
mh,
|
||||||
modify_headers = [
|
modify_headers=[
|
||||||
"/~s/one/two",
|
"/~s/one/two",
|
||||||
"/~s/one/three"
|
"/~s/one/three"
|
||||||
]
|
]
|
||||||
@ -75,7 +79,7 @@ class TestModifyHeaders:
|
|||||||
|
|
||||||
tctx.configure(
|
tctx.configure(
|
||||||
mh,
|
mh,
|
||||||
modify_headers = [
|
modify_headers=[
|
||||||
"/~q/one/two",
|
"/~q/one/two",
|
||||||
"/~q/one/three"
|
"/~q/one/three"
|
||||||
]
|
]
|
||||||
@ -88,7 +92,7 @@ class TestModifyHeaders:
|
|||||||
# test removal of existing headers
|
# test removal of existing headers
|
||||||
tctx.configure(
|
tctx.configure(
|
||||||
mh,
|
mh,
|
||||||
modify_headers = [
|
modify_headers=[
|
||||||
"/~q/one/",
|
"/~q/one/",
|
||||||
"/~s/one/"
|
"/~s/one/"
|
||||||
]
|
]
|
||||||
@ -105,7 +109,7 @@ class TestModifyHeaders:
|
|||||||
|
|
||||||
tctx.configure(
|
tctx.configure(
|
||||||
mh,
|
mh,
|
||||||
modify_headers = [
|
modify_headers=[
|
||||||
"/one/"
|
"/one/"
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
@ -122,7 +126,7 @@ class TestModifyHeaders:
|
|||||||
|
|
||||||
class TestModifyHeadersFile:
|
class TestModifyHeadersFile:
|
||||||
def test_simple(self, tmpdir):
|
def test_simple(self, tmpdir):
|
||||||
mh = modifyheaders.ModifyHeaders()
|
mh = ModifyHeaders()
|
||||||
with taddons.context(mh) as tctx:
|
with taddons.context(mh) as tctx:
|
||||||
tmpfile = tmpdir.join("replacement")
|
tmpfile = tmpdir.join("replacement")
|
||||||
tmpfile.write("two")
|
tmpfile.write("two")
|
||||||
@ -137,7 +141,7 @@ class TestModifyHeadersFile:
|
|||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_nonexistent(self, tmpdir):
|
async def test_nonexistent(self, tmpdir):
|
||||||
mh = modifyheaders.ModifyHeaders()
|
mh = ModifyHeaders()
|
||||||
with taddons.context(mh) as tctx:
|
with taddons.context(mh) as tctx:
|
||||||
with pytest.raises(Exception, match="Cannot parse modify_headers .* Invalid file path"):
|
with pytest.raises(Exception, match="Cannot parse modify_headers .* Invalid file path"):
|
||||||
tctx.configure(
|
tctx.configure(
|
||||||
|
Loading…
Reference in New Issue
Block a user