mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-12-04 04:37:15 +00:00
Script cleanup: editing in console, Python3 compatibility fixes
This commit is contained in:
parent
b94f5fd361
commit
deffed2196
@ -6,11 +6,12 @@ import re
|
|||||||
|
|
||||||
import urwid
|
import urwid
|
||||||
|
|
||||||
|
from mitmproxy import exceptions
|
||||||
from mitmproxy import filt
|
from mitmproxy import filt
|
||||||
from mitmproxy import script
|
from mitmproxy.builtins import script
|
||||||
from mitmproxy import utils
|
|
||||||
from mitmproxy.console import common
|
from mitmproxy.console import common
|
||||||
from mitmproxy.console import signals
|
from mitmproxy.console import signals
|
||||||
|
from netlib import strutils
|
||||||
from netlib.http import cookies
|
from netlib.http import cookies
|
||||||
from netlib.http import user_agents
|
from netlib.http import user_agents
|
||||||
|
|
||||||
@ -55,7 +56,7 @@ class TextColumn:
|
|||||||
o = editor.walker.get_current_value()
|
o = editor.walker.get_current_value()
|
||||||
if o is not None:
|
if o is not None:
|
||||||
n = editor.master.spawn_editor(o.encode("string-escape"))
|
n = editor.master.spawn_editor(o.encode("string-escape"))
|
||||||
n = utils.clean_hanging_newline(n)
|
n = strutils.clean_hanging_newline(n)
|
||||||
editor.walker.set_current_value(n, False)
|
editor.walker.set_current_value(n, False)
|
||||||
editor.walker._modified()
|
editor.walker._modified()
|
||||||
elif key in ["enter"]:
|
elif key in ["enter"]:
|
||||||
@ -643,8 +644,8 @@ class ScriptEditor(GridEditor):
|
|||||||
|
|
||||||
def is_error(self, col, val):
|
def is_error(self, col, val):
|
||||||
try:
|
try:
|
||||||
script.Script.parse_command(val)
|
script.parse_command(val)
|
||||||
except script.ScriptException as e:
|
except exceptions.AddonError as e:
|
||||||
return str(e)
|
return str(e)
|
||||||
|
|
||||||
|
|
||||||
|
@ -248,23 +248,6 @@ class ConsoleMaster(flow.FlowMaster):
|
|||||||
if options.server_replay:
|
if options.server_replay:
|
||||||
self.server_playback_path(options.server_replay)
|
self.server_playback_path(options.server_replay)
|
||||||
|
|
||||||
if options.scripts:
|
|
||||||
for i in options.scripts:
|
|
||||||
try:
|
|
||||||
self.load_script(i)
|
|
||||||
except exceptions.ScriptException as e:
|
|
||||||
print("Script load error: {}".format(e), file=sys.stderr)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
if options.outfile:
|
|
||||||
err = self.start_stream_to_path(
|
|
||||||
options.outfile[0],
|
|
||||||
options.outfile[1]
|
|
||||||
)
|
|
||||||
if err:
|
|
||||||
print("Stream file error: {}".format(err), file=sys.stderr)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
self.view_stack = []
|
self.view_stack = []
|
||||||
|
|
||||||
if options.app:
|
if options.app:
|
||||||
@ -685,20 +668,7 @@ class ConsoleMaster(flow.FlowMaster):
|
|||||||
self.refresh_focus()
|
self.refresh_focus()
|
||||||
|
|
||||||
def edit_scripts(self, scripts):
|
def edit_scripts(self, scripts):
|
||||||
commands = [x[0] for x in scripts] # remove outer array
|
self.options.scripts = [x[0] for x in scripts]
|
||||||
if commands == [s.command for s in self.scripts]:
|
|
||||||
return
|
|
||||||
|
|
||||||
self.unload_scripts()
|
|
||||||
for command in commands:
|
|
||||||
try:
|
|
||||||
self.load_script(command)
|
|
||||||
except exceptions.ScriptException as e:
|
|
||||||
signals.status_message.send(
|
|
||||||
message='Error loading "{}".'.format(command)
|
|
||||||
)
|
|
||||||
signals.add_event('Error loading "{}":\n{}'.format(command, e), "error")
|
|
||||||
signals.update_settings.send(self)
|
|
||||||
|
|
||||||
def stop_client_playback_prompt(self, a):
|
def stop_client_playback_prompt(self, a):
|
||||||
if a != "n":
|
if a != "n":
|
||||||
|
@ -54,7 +54,7 @@ class Options(urwid.WidgetWrap):
|
|||||||
select.Option(
|
select.Option(
|
||||||
"Scripts",
|
"Scripts",
|
||||||
"S",
|
"S",
|
||||||
lambda: master.scripts,
|
lambda: master.options.scripts,
|
||||||
self.scripts
|
self.scripts
|
||||||
),
|
),
|
||||||
|
|
||||||
@ -160,12 +160,14 @@ class Options(urwid.WidgetWrap):
|
|||||||
self.master.replacehooks.clear()
|
self.master.replacehooks.clear()
|
||||||
self.master.set_ignore_filter([])
|
self.master.set_ignore_filter([])
|
||||||
self.master.set_tcp_filter([])
|
self.master.set_tcp_filter([])
|
||||||
self.master.scripts = []
|
|
||||||
|
|
||||||
self.master.options.anticache = False
|
self.master.options.update(
|
||||||
self.master.options.anticomp = False
|
scripts = [],
|
||||||
self.master.options.stickyauth = None
|
anticache = False,
|
||||||
self.master.options.stickycookie = None
|
anticomp = False,
|
||||||
|
stickyauth = None,
|
||||||
|
stickycookie = None
|
||||||
|
)
|
||||||
|
|
||||||
self.master.state.default_body_view = contentviews.get("Auto")
|
self.master.state.default_body_view = contentviews.get("Auto")
|
||||||
|
|
||||||
@ -234,7 +236,7 @@ class Options(urwid.WidgetWrap):
|
|||||||
self.master.view_grideditor(
|
self.master.view_grideditor(
|
||||||
grideditor.ScriptEditor(
|
grideditor.ScriptEditor(
|
||||||
self.master,
|
self.master,
|
||||||
[[i.command] for i in self.master.scripts],
|
[[i] for i in self.master.options.scripts],
|
||||||
self.master.edit_scripts
|
self.master.edit_scripts
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
@ -218,14 +218,13 @@ class StatusBar(urwid.WidgetWrap):
|
|||||||
dst.address.host,
|
dst.address.host,
|
||||||
dst.address.port
|
dst.address.port
|
||||||
))
|
))
|
||||||
if self.master.scripts:
|
if self.master.options.scripts:
|
||||||
r.append("[")
|
r.append("[")
|
||||||
r.append(("heading_key", "s"))
|
r.append(("heading_key", "s"))
|
||||||
r.append("cripts:%s]" % len(self.master.scripts))
|
r.append("cripts:%s]" % len(self.master.options.scripts))
|
||||||
# r.append("[lt:%0.3f]"%self.master.looptime)
|
|
||||||
|
|
||||||
if self.master.stream:
|
if self.master.options.outfile:
|
||||||
r.append("[W:%s]" % self.master.stream_path)
|
r.append("[W:%s]" % self.master.outfile[0])
|
||||||
|
|
||||||
return r
|
return r
|
||||||
|
|
||||||
|
@ -1,11 +1,10 @@
|
|||||||
from test.mitmproxy import tutils
|
from test.mitmproxy import tutils, mastertest
|
||||||
from mitmproxy import controller
|
from mitmproxy import controller
|
||||||
from mitmproxy.builtins import script
|
from mitmproxy.builtins import script
|
||||||
from mitmproxy import options
|
from mitmproxy import options
|
||||||
from mitmproxy.flow import master
|
from mitmproxy.flow import master
|
||||||
from mitmproxy.flow import state
|
from mitmproxy.flow import state
|
||||||
import time
|
import time
|
||||||
from .. import mastertest, tutils
|
|
||||||
|
|
||||||
|
|
||||||
class Thing:
|
class Thing:
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
import json
|
import json
|
||||||
|
|
||||||
|
import sys
|
||||||
import os.path
|
import os.path
|
||||||
from mitmproxy.flow import master
|
from mitmproxy.flow import master
|
||||||
from mitmproxy.flow import state
|
from mitmproxy.flow import state
|
||||||
@ -42,19 +43,19 @@ class TestScripts(mastertest.MasterTest):
|
|||||||
def test_custom_contentviews(self):
|
def test_custom_contentviews(self):
|
||||||
m, sc = tscript("custom_contentviews.py")
|
m, sc = tscript("custom_contentviews.py")
|
||||||
pig = contentviews.get("pig_latin_HTML")
|
pig = contentviews.get("pig_latin_HTML")
|
||||||
_, fmt = pig("<html>test!</html>")
|
_, fmt = pig(b"<html>test!</html>")
|
||||||
assert any('esttay!' in val[0][1] for val in fmt)
|
assert any(b'esttay!' in val[0][1] for val in fmt)
|
||||||
assert not pig("gobbledygook")
|
assert not pig(b"gobbledygook")
|
||||||
|
|
||||||
def test_iframe_injector(self):
|
def test_iframe_injector(self):
|
||||||
with tutils.raises(ScriptError):
|
with tutils.raises(ScriptError):
|
||||||
tscript("iframe_injector.py")
|
tscript("iframe_injector.py")
|
||||||
|
|
||||||
m, sc = tscript("iframe_injector.py", "http://example.org/evil_iframe")
|
m, sc = tscript("iframe_injector.py", "http://example.org/evil_iframe")
|
||||||
flow = tutils.tflow(resp=netutils.tresp(content="<html>mitmproxy</html>"))
|
flow = tutils.tflow(resp=netutils.tresp(content=b"<html>mitmproxy</html>"))
|
||||||
self.invoke(m, "response", flow)
|
self.invoke(m, "response", flow)
|
||||||
content = flow.response.content
|
content = flow.response.content
|
||||||
assert 'iframe' in content and 'evil_iframe' in content
|
assert b'iframe' in content and b'evil_iframe' in content
|
||||||
|
|
||||||
def test_modify_form(self):
|
def test_modify_form(self):
|
||||||
m, sc = tscript("modify_form.py")
|
m, sc = tscript("modify_form.py")
|
||||||
@ -63,11 +64,11 @@ class TestScripts(mastertest.MasterTest):
|
|||||||
f = tutils.tflow(req=netutils.treq(headers=form_header))
|
f = tutils.tflow(req=netutils.treq(headers=form_header))
|
||||||
self.invoke(m, "request", f)
|
self.invoke(m, "request", f)
|
||||||
|
|
||||||
assert f.request.urlencoded_form["mitmproxy"] == "rocks"
|
assert f.request.urlencoded_form[b"mitmproxy"] == b"rocks"
|
||||||
|
|
||||||
f.request.headers["content-type"] = ""
|
f.request.headers["content-type"] = ""
|
||||||
self.invoke(m, "request", f)
|
self.invoke(m, "request", f)
|
||||||
assert list(f.request.urlencoded_form.items()) == [("foo", "bar")]
|
assert list(f.request.urlencoded_form.items()) == [(b"foo", b"bar")]
|
||||||
|
|
||||||
def test_modify_querystring(self):
|
def test_modify_querystring(self):
|
||||||
m, sc = tscript("modify_querystring.py")
|
m, sc = tscript("modify_querystring.py")
|
||||||
@ -85,9 +86,9 @@ class TestScripts(mastertest.MasterTest):
|
|||||||
tscript("modify_response_body.py")
|
tscript("modify_response_body.py")
|
||||||
|
|
||||||
m, sc = tscript("modify_response_body.py", "mitmproxy rocks")
|
m, sc = tscript("modify_response_body.py", "mitmproxy rocks")
|
||||||
f = tutils.tflow(resp=netutils.tresp(content="I <3 mitmproxy"))
|
f = tutils.tflow(resp=netutils.tresp(content=b"I <3 mitmproxy"))
|
||||||
self.invoke(m, "response", f)
|
self.invoke(m, "response", f)
|
||||||
assert f.response.content == "I <3 rocks"
|
assert f.response.content == b"I <3 rocks"
|
||||||
|
|
||||||
def test_redirect_requests(self):
|
def test_redirect_requests(self):
|
||||||
m, sc = tscript("redirect_requests.py")
|
m, sc = tscript("redirect_requests.py")
|
||||||
@ -96,6 +97,11 @@ class TestScripts(mastertest.MasterTest):
|
|||||||
assert f.request.host == "mitmproxy.org"
|
assert f.request.host == "mitmproxy.org"
|
||||||
|
|
||||||
def test_har_extractor(self):
|
def test_har_extractor(self):
|
||||||
|
if sys.version_info >= (3, 0):
|
||||||
|
with tutils.raises("does not work on Python 3"):
|
||||||
|
tscript("har_extractor.py")
|
||||||
|
return
|
||||||
|
|
||||||
with tutils.raises(ScriptError):
|
with tutils.raises(ScriptError):
|
||||||
tscript("har_extractor.py")
|
tscript("har_extractor.py")
|
||||||
|
|
||||||
|
@ -293,7 +293,7 @@ class TestHTTP(tservers.HTTPProxyTest, CommonMixin, AppMixin):
|
|||||||
)
|
)
|
||||||
self.master.addons.add(s)
|
self.master.addons.add(s)
|
||||||
d = self.pathod('200:b"foo"')
|
d = self.pathod('200:b"foo"')
|
||||||
assert d.content == "bar"
|
assert d.content == b"bar"
|
||||||
self.master.addons.remove(s)
|
self.master.addons.remove(s)
|
||||||
|
|
||||||
|
|
||||||
@ -523,7 +523,7 @@ class TestTransparent(tservers.TransparentProxyTest, CommonMixin, TcpMixin):
|
|||||||
self._tcpproxy_on()
|
self._tcpproxy_on()
|
||||||
d = self.pathod('200:b"foo"')
|
d = self.pathod('200:b"foo"')
|
||||||
self._tcpproxy_off()
|
self._tcpproxy_off()
|
||||||
assert d.content == "bar"
|
assert d.content == b"bar"
|
||||||
self.master.addons.remove(s)
|
self.master.addons.remove(s)
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user