mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-30 03:14:22 +00:00
Merge pull request #1842 from cortesi/optsave
Save options to file from console & related work
This commit is contained in:
commit
5cf268b012
@ -212,7 +212,7 @@ class OptManager(metaclass=_DefaultsMeta):
|
||||
if not text:
|
||||
return {}
|
||||
try:
|
||||
data = ruamel.yaml.load(text, ruamel.yaml.Loader)
|
||||
data = ruamel.yaml.load(text, ruamel.yaml.RoundTripLoader)
|
||||
except ruamel.yaml.error.YAMLError as v:
|
||||
snip = v.problem_mark.get_snippet()
|
||||
raise exceptions.OptionsError(
|
||||
|
@ -150,9 +150,6 @@ def save_data(path, data):
|
||||
|
||||
|
||||
def ask_save_overwrite(path, data):
|
||||
if not path:
|
||||
return
|
||||
path = os.path.expanduser(path)
|
||||
if os.path.exists(path):
|
||||
def save_overwrite(k):
|
||||
if k == "y":
|
||||
|
@ -5,8 +5,8 @@ from mitmproxy import flowfilter
|
||||
from mitmproxy.addons import script
|
||||
from mitmproxy.tools.console import common
|
||||
from mitmproxy.tools.console.grideditor import base
|
||||
from mitmproxy.tools.console.grideditor import col_bytes
|
||||
from mitmproxy.tools.console.grideditor import col_text
|
||||
from mitmproxy.tools.console.grideditor import col_bytes
|
||||
from mitmproxy.tools.console.grideditor import col_subgrid
|
||||
from mitmproxy.tools.console import signals
|
||||
from mitmproxy.net.http import user_agents
|
||||
@ -74,8 +74,8 @@ class ReplaceEditor(base.GridEditor):
|
||||
title = "Editing replacement patterns"
|
||||
columns = [
|
||||
col_text.Column("Filter"),
|
||||
col_bytes.Column("Regex"),
|
||||
col_bytes.Column("Replacement"),
|
||||
col_text.Column("Regex"),
|
||||
col_text.Column("Replacement"),
|
||||
]
|
||||
|
||||
def is_error(self, col, val):
|
||||
@ -94,8 +94,8 @@ class SetHeadersEditor(base.GridEditor):
|
||||
title = "Editing header set patterns"
|
||||
columns = [
|
||||
col_text.Column("Filter"),
|
||||
col_bytes.Column("Header"),
|
||||
col_bytes.Column("Value"),
|
||||
col_text.Column("Header"),
|
||||
col_text.Column("Value"),
|
||||
]
|
||||
|
||||
def is_error(self, col, val):
|
||||
|
@ -386,17 +386,10 @@ class ConsoleMaster(master.Master):
|
||||
)
|
||||
|
||||
def _write_flows(self, path, flows):
|
||||
if not path:
|
||||
return
|
||||
path = os.path.expanduser(path)
|
||||
try:
|
||||
f = open(path, "wb")
|
||||
with open(path, "wb") as f:
|
||||
fw = io.FlowWriter(f)
|
||||
for i in flows:
|
||||
fw.add(i)
|
||||
f.close()
|
||||
except IOError as v:
|
||||
signals.status_message.send(message=v.strerror)
|
||||
|
||||
def save_one_flow(self, path, flow):
|
||||
return self._write_flows(path, [flow])
|
||||
@ -405,8 +398,6 @@ class ConsoleMaster(master.Master):
|
||||
return self._write_flows(path, self.view)
|
||||
|
||||
def load_flows_callback(self, path):
|
||||
if not path:
|
||||
return
|
||||
ret = self.load_flows_path(path)
|
||||
return ret or "Flows loaded from %s" % path
|
||||
|
||||
|
@ -9,6 +9,7 @@ from mitmproxy.tools.console import signals
|
||||
footer = [
|
||||
('heading_key', "enter/space"), ":toggle ",
|
||||
('heading_key', "C"), ":clear all ",
|
||||
('heading_key', "W"), ":save ",
|
||||
]
|
||||
|
||||
|
||||
@ -17,6 +18,7 @@ def _mkhelp():
|
||||
keys = [
|
||||
("enter/space", "activate option"),
|
||||
("C", "clear all options"),
|
||||
("w", "save options"),
|
||||
]
|
||||
text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
|
||||
return text
|
||||
@ -162,8 +164,21 @@ class Options(urwid.WidgetWrap):
|
||||
if key == "C":
|
||||
self.clearall()
|
||||
return None
|
||||
if key == "W":
|
||||
self.save()
|
||||
return None
|
||||
return super().keypress(size, key)
|
||||
|
||||
def do_save(self, path):
|
||||
self.master.options.save(path)
|
||||
return "Saved"
|
||||
|
||||
def save(self):
|
||||
signals.status_prompt_path.send(
|
||||
prompt = "Save options to file",
|
||||
callback = self.do_save
|
||||
)
|
||||
|
||||
def clearall(self):
|
||||
self.master.options.reset()
|
||||
signals.update_settings.send(self)
|
||||
|
@ -57,8 +57,8 @@ class _PathCompleter:
|
||||
|
||||
class PathEdit(urwid.Edit, _PathCompleter):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
urwid.Edit.__init__(self, *args, **kwargs)
|
||||
def __init__(self, prompt, last_path):
|
||||
urwid.Edit.__init__(self, prompt, last_path)
|
||||
_PathCompleter.__init__(self)
|
||||
|
||||
def keypress(self, size, key):
|
||||
|
@ -9,6 +9,28 @@ from mitmproxy.tools.console import signals
|
||||
from mitmproxy.utils import human
|
||||
|
||||
|
||||
class PromptPath:
|
||||
def __init__(self, callback, args):
|
||||
self.callback, self.args = callback, args
|
||||
|
||||
def __call__(self, pth):
|
||||
if not pth:
|
||||
return
|
||||
pth = os.path.expanduser(pth)
|
||||
try:
|
||||
return self.callback(pth, *self.args)
|
||||
except IOError as v:
|
||||
signals.status_message.send(message=v.strerror)
|
||||
|
||||
|
||||
class PromptStub:
|
||||
def __init__(self, callback, args):
|
||||
self.callback, self.args = callback, args
|
||||
|
||||
def __call__(self, txt):
|
||||
return self.callback(txt, *self.args)
|
||||
|
||||
|
||||
class ActionBar(urwid.WidgetWrap):
|
||||
|
||||
def __init__(self):
|
||||
@ -21,7 +43,8 @@ class ActionBar(urwid.WidgetWrap):
|
||||
|
||||
self.last_path = ""
|
||||
|
||||
self.prompting = False
|
||||
self.prompting = None
|
||||
|
||||
self.onekey = False
|
||||
self.pathprompt = False
|
||||
|
||||
@ -42,7 +65,7 @@ class ActionBar(urwid.WidgetWrap):
|
||||
def sig_prompt(self, sender, prompt, text, callback, args=()):
|
||||
signals.focus.send(self, section="footer")
|
||||
self._w = urwid.Edit(self.prep_prompt(prompt), text or "")
|
||||
self.prompting = (callback, args)
|
||||
self.prompting = PromptStub(callback, args)
|
||||
|
||||
def sig_path_prompt(self, sender, prompt, callback, args=()):
|
||||
signals.focus.send(self, section="footer")
|
||||
@ -51,7 +74,7 @@ class ActionBar(urwid.WidgetWrap):
|
||||
os.path.dirname(self.last_path)
|
||||
)
|
||||
self.pathprompt = True
|
||||
self.prompting = (callback, args)
|
||||
self.prompting = PromptPath(callback, args)
|
||||
|
||||
def sig_prompt_onekey(self, sender, prompt, keys, callback, args=()):
|
||||
"""
|
||||
@ -69,7 +92,7 @@ class ActionBar(urwid.WidgetWrap):
|
||||
prompt.append(")? ")
|
||||
self.onekey = set(i[1] for i in keys)
|
||||
self._w = urwid.Edit(prompt, "")
|
||||
self.prompting = (callback, args)
|
||||
self.prompting = PromptStub(callback, args)
|
||||
|
||||
def selectable(self):
|
||||
return True
|
||||
@ -93,10 +116,10 @@ class ActionBar(urwid.WidgetWrap):
|
||||
|
||||
def clear(self):
|
||||
self._w = urwid.Text("")
|
||||
self.prompting = False
|
||||
self.prompting = None
|
||||
|
||||
def prompt_done(self):
|
||||
self.prompting = False
|
||||
self.prompting = None
|
||||
self.onekey = False
|
||||
self.pathprompt = False
|
||||
signals.status_message.send(message="")
|
||||
@ -105,9 +128,9 @@ class ActionBar(urwid.WidgetWrap):
|
||||
def prompt_execute(self, txt):
|
||||
if self.pathprompt:
|
||||
self.last_path = txt
|
||||
p, args = self.prompting
|
||||
p = self.prompting
|
||||
self.prompt_done()
|
||||
msg = p(txt, *args)
|
||||
msg = p(txt)
|
||||
if msg:
|
||||
signals.status_message.send(message=msg, expire=1)
|
||||
|
||||
|
@ -21,7 +21,7 @@ def check_type(attr_name: str, value: typing.Any, typeinfo: type) -> None:
|
||||
type(value)
|
||||
))
|
||||
|
||||
if isinstance(typeinfo, typing.UnionMeta):
|
||||
if typeinfo.__qualname__ == "Union":
|
||||
for T in typeinfo.__union_params__:
|
||||
try:
|
||||
check_type(attr_name, value, T)
|
||||
@ -30,18 +30,24 @@ def check_type(attr_name: str, value: typing.Any, typeinfo: type) -> None:
|
||||
else:
|
||||
return
|
||||
raise e
|
||||
if isinstance(typeinfo, typing.TupleMeta):
|
||||
check_type(attr_name, value, tuple)
|
||||
elif typeinfo.__qualname__ == "Tuple":
|
||||
if not isinstance(value, (tuple, list)):
|
||||
raise e
|
||||
if len(typeinfo.__tuple_params__) != len(value):
|
||||
raise e
|
||||
for i, (x, T) in enumerate(zip(value, typeinfo.__tuple_params__)):
|
||||
check_type("{}[{}]".format(attr_name, i), x, T)
|
||||
return
|
||||
if issubclass(typeinfo, typing.IO):
|
||||
elif typeinfo.__qualname__ == "Sequence":
|
||||
T = typeinfo.__args__[0]
|
||||
if not isinstance(value, (tuple, list)):
|
||||
raise e
|
||||
for v in value:
|
||||
check_type(attr_name, v, T)
|
||||
elif typeinfo.__qualname__ == "IO":
|
||||
if hasattr(value, "read"):
|
||||
return
|
||||
|
||||
if not isinstance(value, typeinfo):
|
||||
elif not isinstance(value, typeinfo):
|
||||
raise e
|
||||
|
||||
|
||||
|
@ -54,7 +54,7 @@ class TestPathEdit:
|
||||
|
||||
def test_keypress(self):
|
||||
|
||||
pe = pathedit.PathEdit()
|
||||
pe = pathedit.PathEdit("", "")
|
||||
|
||||
with patch('urwid.widget.Edit.get_edit_text') as get_text, \
|
||||
patch('urwid.widget.Edit.set_edit_text') as set_text:
|
||||
|
@ -26,6 +26,8 @@ def test_check_type():
|
||||
typecheck.check_type("foo", 42, str)
|
||||
with pytest.raises(TypeError):
|
||||
typecheck.check_type("foo", None, str)
|
||||
with pytest.raises(TypeError):
|
||||
typecheck.check_type("foo", b"foo", str)
|
||||
|
||||
|
||||
def test_check_union():
|
||||
@ -44,5 +46,14 @@ def test_check_tuple():
|
||||
typecheck.check_type("foo", (42, 42), typing.Tuple[int, str])
|
||||
with pytest.raises(TypeError):
|
||||
typecheck.check_type("foo", ("42", 42), typing.Tuple[int, str])
|
||||
|
||||
typecheck.check_type("foo", (42, "42"), typing.Tuple[int, str])
|
||||
|
||||
|
||||
def test_check_sequence():
|
||||
typecheck.check_type("foo", [10], typing.Sequence[int])
|
||||
with pytest.raises(TypeError):
|
||||
typecheck.check_type("foo", ["foo"], typing.Sequence[int])
|
||||
with pytest.raises(TypeError):
|
||||
typecheck.check_type("foo", [10, "foo"], typing.Sequence[int])
|
||||
with pytest.raises(TypeError):
|
||||
typecheck.check_type("foo", [b"foo"], typing.Sequence[str])
|
||||
|
Loading…
Reference in New Issue
Block a user