mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-22 15:37:45 +00:00
replacements addon: rename to ModifyBody
This commit is contained in:
parent
96e756ead0
commit
b263b0dece
@ -11,9 +11,9 @@ from mitmproxy.addons import disable_h2c
|
||||
from mitmproxy.addons import export
|
||||
from mitmproxy.addons import onboarding
|
||||
from mitmproxy.addons import proxyauth
|
||||
from mitmproxy.addons import replace
|
||||
from mitmproxy.addons import script
|
||||
from mitmproxy.addons import serverplayback
|
||||
from mitmproxy.addons import modifybody
|
||||
from mitmproxy.addons import modifyheaders
|
||||
from mitmproxy.addons import stickyauth
|
||||
from mitmproxy.addons import stickycookie
|
||||
@ -37,9 +37,9 @@ def default_addons():
|
||||
export.Export(),
|
||||
onboarding.Onboarding(),
|
||||
proxyauth.ProxyAuth(),
|
||||
replace.Replace(),
|
||||
script.ScriptLoader(),
|
||||
serverplayback.ServerPlayback(),
|
||||
modifybody.ModifyBody(),
|
||||
modifyheaders.ModifyHeaders(),
|
||||
stickyauth.StickyAuth(),
|
||||
stickycookie.StickyCookie(),
|
||||
|
@ -8,11 +8,11 @@ from mitmproxy import ctx
|
||||
from mitmproxy.utils import strutils
|
||||
|
||||
|
||||
def parse_replacements(s):
|
||||
def parse_modify_body(s):
|
||||
"""
|
||||
Returns a (flow_filter, regex, replacement) tuple.
|
||||
|
||||
The general form for a replacements hook is as follows:
|
||||
The general form for a modify_body hook is as follows:
|
||||
|
||||
[/flow_filter]/regex/replacement
|
||||
|
||||
@ -40,41 +40,42 @@ def parse_replacements(s):
|
||||
flow_filter, regex, repl = parts
|
||||
else:
|
||||
raise exceptions.OptionsError(
|
||||
"Invalid replacements specifier: %s" % s
|
||||
"Invalid modify_body specifier: %s" % s
|
||||
)
|
||||
return flow_filter, regex, repl
|
||||
|
||||
|
||||
class Replace:
|
||||
class ModifyBody:
|
||||
def __init__(self):
|
||||
self.lst = []
|
||||
|
||||
def load(self, loader):
|
||||
loader.add_option(
|
||||
"replacements", typing.Sequence[str], [],
|
||||
"modify_body", typing.Sequence[str], [],
|
||||
"""
|
||||
Replacement pattern of the form "[/flow-filter]/regex/replacement", where
|
||||
the separator can be any character.
|
||||
Replacement pattern of the form "[/flow-filter]/regex/[@]replacement", where
|
||||
the separator can be any character. The @ allows to provide a file path that
|
||||
is used to read the replacement string.
|
||||
"""
|
||||
)
|
||||
|
||||
def configure(self, updated):
|
||||
"""
|
||||
.replacements is a list of tuples (flow_filter_pattern, regex, repl):
|
||||
.modify_body is a list of tuples (flow_filter_pattern, regex, repl):
|
||||
|
||||
flow_filter_pattern: a string specifying a flow filter pattern.
|
||||
regex: a regular expression, as string.
|
||||
repl: the replacement string
|
||||
"""
|
||||
if "replacements" in updated:
|
||||
if "modify_body" in updated:
|
||||
lst = []
|
||||
for rep in ctx.options.replacements:
|
||||
flow_filter_pattern, regex, repl = parse_replacements(rep)
|
||||
for rep in ctx.options.modify_body:
|
||||
flow_filter_pattern, regex, repl = parse_modify_body(rep)
|
||||
|
||||
flow_filter = flowfilter.parse(flow_filter_pattern)
|
||||
if not flow_filter:
|
||||
raise exceptions.OptionsError(
|
||||
"Invalid replacements flow filter: %s" % flow_filter_pattern
|
||||
"Invalid modify_body flow filter: %s" % flow_filter_pattern
|
||||
)
|
||||
try:
|
||||
# We should ideally escape here before trying to compile
|
@ -75,8 +75,7 @@ def run(
|
||||
|
||||
# To make migration from 2.x to 3.0 bearable.
|
||||
if "-R" in sys.argv and sys.argv[sys.argv.index("-R") + 1].startswith("http"):
|
||||
print("-R is used for specifying replacements.\n"
|
||||
"To use mitmproxy in reverse mode please use --mode reverse:SPEC instead")
|
||||
print("To use mitmproxy in reverse mode please use --mode reverse:SPEC instead")
|
||||
|
||||
try:
|
||||
args = parser.parse_args(arguments)
|
||||
|
@ -81,9 +81,9 @@ def common_options(parser, opts):
|
||||
opts.make_parser(group, "server_replay_nopop")
|
||||
opts.make_parser(group, "server_replay_refresh")
|
||||
|
||||
# Replacements
|
||||
group = parser.add_argument_group("Replacements")
|
||||
opts.make_parser(group, "replacements", metavar="PATTERN", short="R")
|
||||
# Modify Body
|
||||
group = parser.add_argument_group("Modify Body")
|
||||
opts.make_parser(group, "modify_body", metavar="PATTERN", short="B")
|
||||
|
||||
# Modify headers
|
||||
group = parser.add_argument_group("Modify Headers")
|
||||
|
@ -210,8 +210,8 @@ class StatusBar(urwid.WidgetWrap):
|
||||
r.append("[")
|
||||
r.append(("heading_key", "H"))
|
||||
r.append("eaders]")
|
||||
if len(self.master.options.replacements):
|
||||
r.append("[%d replacements]" % len(self.master.options.replacements))
|
||||
if len(self.master.options.modify_body):
|
||||
r.append("[%d body modifications]" % len(self.master.options.modify_body))
|
||||
if creplay:
|
||||
r.append("[")
|
||||
r.append(("heading_key", "cplayback"))
|
||||
|
@ -55,6 +55,7 @@ REPLACED = """
|
||||
--insecure
|
||||
-c
|
||||
--replace
|
||||
--replacements
|
||||
-i
|
||||
-f
|
||||
--filter
|
||||
@ -97,6 +98,7 @@ REPLACEMENTS = {
|
||||
"--insecure": "--ssl-insecure",
|
||||
"-c": "-C",
|
||||
"--replace": "--replacements",
|
||||
"--replacements": ["--modify-body", "--modify-headers"],
|
||||
"-i": "--intercept",
|
||||
"-f": "--view-filter",
|
||||
"--filter": "--view-filter",
|
||||
@ -129,11 +131,15 @@ def check():
|
||||
|
||||
for option in REPLACED.splitlines():
|
||||
if option in args:
|
||||
if isinstance(REPLACEMENTS.get(option), list):
|
||||
new_options = REPLACEMENTS.get(option)
|
||||
else:
|
||||
new_options = [REPLACEMENTS.get(option)]
|
||||
print(
|
||||
"{} is deprecated.\n"
|
||||
"Please use `{}` instead.".format(
|
||||
option,
|
||||
REPLACEMENTS.get(option)
|
||||
"` or `".join(new_options)
|
||||
)
|
||||
)
|
||||
|
||||
|
@ -1,57 +1,57 @@
|
||||
import pytest
|
||||
|
||||
from mitmproxy.addons import replace
|
||||
from mitmproxy.addons import modifybody
|
||||
from mitmproxy.test import taddons
|
||||
from mitmproxy.test import tflow
|
||||
|
||||
|
||||
class TestReplace:
|
||||
def test_parse_replacements(self):
|
||||
x = replace.parse_replacements("/foo/bar/voing")
|
||||
class TestModifyBody:
|
||||
def test_parse_modify_body(self):
|
||||
x = modifybody.parse_modify_body("/foo/bar/voing")
|
||||
assert x == ("foo", "bar", "voing")
|
||||
x = replace.parse_replacements("/foo/bar/vo/ing/")
|
||||
x = modifybody.parse_modify_body("/foo/bar/vo/ing/")
|
||||
assert x == ("foo", "bar", "vo/ing/")
|
||||
x = replace.parse_replacements("/bar/voing")
|
||||
x = modifybody.parse_modify_body("/bar/voing")
|
||||
assert x == (".*", "bar", "voing")
|
||||
with pytest.raises(Exception, match="Invalid replacements"):
|
||||
replace.parse_replacements("/")
|
||||
with pytest.raises(Exception, match="Invalid modify_body specifier"):
|
||||
modifybody.parse_modify_body("/")
|
||||
|
||||
def test_configure(self):
|
||||
r = replace.Replace()
|
||||
with taddons.context(r) as tctx:
|
||||
tctx.configure(r, replacements=["one/two/three"])
|
||||
with pytest.raises(Exception, match="Invalid replacements flow filter"):
|
||||
tctx.configure(r, replacements=["/~b/two/three"])
|
||||
mb = modifybody.ModifyBody()
|
||||
with taddons.context(mb) as tctx:
|
||||
tctx.configure(mb, modify_body=["one/two/three"])
|
||||
with pytest.raises(Exception, match="Invalid modify_body flow filter"):
|
||||
tctx.configure(mb, modify_body=["/~b/two/three"])
|
||||
with pytest.raises(Exception, match="Invalid regular expression"):
|
||||
tctx.configure(r, replacements=["/foo/+/three"])
|
||||
tctx.configure(r, replacements=["/a/b/c/"])
|
||||
tctx.configure(mb, modify_body=["/foo/+/three"])
|
||||
tctx.configure(mb, modify_body=["/a/b/c/"])
|
||||
|
||||
def test_simple(self):
|
||||
r = replace.Replace()
|
||||
with taddons.context(r) as tctx:
|
||||
mb = modifybody.ModifyBody()
|
||||
with taddons.context(mb) as tctx:
|
||||
tctx.configure(
|
||||
r,
|
||||
replacements=[
|
||||
mb,
|
||||
modify_body=[
|
||||
"/~q/foo/bar",
|
||||
"/~s/foo/bar",
|
||||
]
|
||||
)
|
||||
f = tflow.tflow()
|
||||
f.request.content = b"foo"
|
||||
r.request(f)
|
||||
mb.request(f)
|
||||
assert f.request.content == b"bar"
|
||||
|
||||
f = tflow.tflow(resp=True)
|
||||
f.response.content = b"foo"
|
||||
r.response(f)
|
||||
mb.response(f)
|
||||
assert f.response.content == b"bar"
|
||||
|
||||
def test_order(self):
|
||||
r = replace.Replace()
|
||||
with taddons.context(r) as tctx:
|
||||
mb = modifybody.ModifyBody()
|
||||
with taddons.context(mb) as tctx:
|
||||
tctx.configure(
|
||||
r,
|
||||
replacements=[
|
||||
mb,
|
||||
modify_body=[
|
||||
"/foo/bar",
|
||||
"/bar/baz",
|
||||
"/foo/oh noes!",
|
||||
@ -60,43 +60,43 @@ class TestReplace:
|
||||
)
|
||||
f = tflow.tflow()
|
||||
f.request.content = b"foo"
|
||||
r.request(f)
|
||||
mb.request(f)
|
||||
assert f.request.content == b"baz"
|
||||
|
||||
|
||||
class TestReplaceFile:
|
||||
class TestModifyBodyFile:
|
||||
def test_simple(self, tmpdir):
|
||||
r = replace.Replace()
|
||||
with taddons.context(r) as tctx:
|
||||
mb = modifybody.ModifyBody()
|
||||
with taddons.context(mb) as tctx:
|
||||
tmpfile = tmpdir.join("replacement")
|
||||
tmpfile.write("bar")
|
||||
tctx.configure(
|
||||
r,
|
||||
replacements=["/~q/foo/@" + str(tmpfile)]
|
||||
mb,
|
||||
modify_body=["/~q/foo/@" + str(tmpfile)]
|
||||
)
|
||||
f = tflow.tflow()
|
||||
f.request.content = b"foo"
|
||||
r.request(f)
|
||||
mb.request(f)
|
||||
assert f.request.content == b"bar"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_nonexistent(self, tmpdir):
|
||||
r = replace.Replace()
|
||||
with taddons.context(r) as tctx:
|
||||
mb = modifybody.ModifyBody()
|
||||
with taddons.context(mb) as tctx:
|
||||
with pytest.raises(Exception, match="Invalid file path"):
|
||||
tctx.configure(
|
||||
r,
|
||||
replacements=["/~q/foo/@nonexistent"]
|
||||
mb,
|
||||
modify_body=["/~q/foo/@nonexistent"]
|
||||
)
|
||||
|
||||
tmpfile = tmpdir.join("replacement")
|
||||
tmpfile.write("bar")
|
||||
tctx.configure(
|
||||
r,
|
||||
replacements=["/~q/foo/@" + str(tmpfile)]
|
||||
mb,
|
||||
modify_body=["/~q/foo/@" + str(tmpfile)]
|
||||
)
|
||||
tmpfile.remove()
|
||||
f = tflow.tflow()
|
||||
f.request.content = b"foo"
|
||||
r.request(f)
|
||||
mb.request(f)
|
||||
assert await tctx.master.await_log("could not read")
|
@ -9,7 +9,7 @@ def test_statusbar(monkeypatch):
|
||||
m = master.ConsoleMaster(o)
|
||||
m.options.update(
|
||||
modify_headers=[":~q:foo:bar"],
|
||||
replacements=[":~q:foo:bar"],
|
||||
modify_body=[":~q:foo:bar"],
|
||||
ignore_hosts=["example.com", "example.org"],
|
||||
tcp_hosts=["example.tcp"],
|
||||
intercept="~q",
|
||||
|
Loading…
Reference in New Issue
Block a user