mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2025-02-01 15:55:28 +00:00
Merge pull request #2282 from cortesi/cmddec
Add a command.command decorator
This commit is contained in:
commit
a781bab7db
@ -153,6 +153,8 @@ class AddonManager:
|
||||
for a in traverse([addon]):
|
||||
name = _get_name(a)
|
||||
self.lookup[name] = a
|
||||
for a in traverse([addon]):
|
||||
self.master.commands.collect_commands(a)
|
||||
return addon
|
||||
|
||||
def add(self, *addons):
|
||||
|
@ -1,18 +1,18 @@
|
||||
from mitmproxy import ctx
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import command
|
||||
|
||||
|
||||
class Core:
|
||||
@command.command("set")
|
||||
def set(self, spec: str) -> None:
|
||||
"""
|
||||
Set an option of the form "key[=value]". When the value is omitted,
|
||||
booleans are set to true, strings and integers are set to None (if
|
||||
permitted), and sequences are emptied.
|
||||
permitted), and sequences are emptied. Boolean values can be true,
|
||||
false or toggle.
|
||||
"""
|
||||
try:
|
||||
ctx.options.set(spec)
|
||||
except exceptions.OptionsError as e:
|
||||
raise exceptions.CommandError(e) from e
|
||||
|
||||
def load(self, l):
|
||||
l.add_command("set", self.set)
|
||||
|
@ -2,6 +2,7 @@ import inspect
|
||||
import typing
|
||||
import shlex
|
||||
import textwrap
|
||||
import functools
|
||||
|
||||
from mitmproxy.utils import typecheck
|
||||
from mitmproxy import exceptions
|
||||
@ -15,8 +16,10 @@ def typename(t: type, ret: bool) -> str:
|
||||
"""
|
||||
if t in (str, int, bool):
|
||||
return t.__name__
|
||||
if t == typing.Sequence[flow.Flow]:
|
||||
elif t == typing.Sequence[flow.Flow]:
|
||||
return "[flow]" if ret else "flowspec"
|
||||
elif t == flow.Flow:
|
||||
return "flow"
|
||||
else: # pragma: no cover
|
||||
raise NotImplementedError(t)
|
||||
|
||||
@ -72,6 +75,13 @@ class CommandManager:
|
||||
self.master = master
|
||||
self.commands = {}
|
||||
|
||||
def collect_commands(self, addon):
|
||||
for i in dir(addon):
|
||||
if not i.startswith("__"):
|
||||
o = getattr(addon, i)
|
||||
if hasattr(o, "command_path"):
|
||||
self.add(o.command_path, o)
|
||||
|
||||
def add(self, path: str, func: typing.Callable):
|
||||
self.commands[path] = Command(self, path, func)
|
||||
|
||||
@ -101,5 +111,22 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
|
||||
return spec
|
||||
elif argtype == typing.Sequence[flow.Flow]:
|
||||
return manager.call_args("console.resolve", [spec])
|
||||
elif argtype == flow.Flow:
|
||||
flows = manager.call_args("console.resolve", [spec])
|
||||
if len(flows) != 1:
|
||||
raise exceptions.CommandError(
|
||||
"Command requires one flow, specification matched %s." % len(flows)
|
||||
)
|
||||
return flows[0]
|
||||
else:
|
||||
raise exceptions.CommandError("Unsupported argument type: %s" % argtype)
|
||||
|
||||
|
||||
def command(path):
|
||||
def decorator(function):
|
||||
@functools.wraps(function)
|
||||
def wrapper(*args, **kwargs):
|
||||
return function(*args, **kwargs)
|
||||
wrapper.__dict__["command_path"] = path
|
||||
return wrapper
|
||||
return decorator
|
||||
|
@ -293,6 +293,8 @@ class OptManager:
|
||||
else:
|
||||
return None
|
||||
elif o.typespec == bool:
|
||||
if optstr == "toggle":
|
||||
return not o.current()
|
||||
if not optstr or optstr == "true":
|
||||
return True
|
||||
elif optstr == "false":
|
||||
|
@ -44,7 +44,7 @@ def common_options(parser, opts):
|
||||
help="""
|
||||
Set an option. When the value is omitted, booleans are set to true,
|
||||
strings and integers are set to None (if permitted), and sequences
|
||||
are emptied.
|
||||
are emptied. Boolean values can be true, false or toggle.
|
||||
"""
|
||||
)
|
||||
parser.add_argument(
|
||||
|
@ -70,9 +70,6 @@ class LogBufferBox(urwid.ListBox):
|
||||
self.set_focus(len(self.master.logbuffer) - 1)
|
||||
elif key == "g":
|
||||
self.set_focus(0)
|
||||
elif key == "F":
|
||||
o = self.master.options
|
||||
o.console_focus_follow = not o.console_focus_follow
|
||||
return urwid.ListBox.keypress(self, size, key)
|
||||
|
||||
|
||||
@ -106,9 +103,6 @@ class BodyPile(urwid.Pile):
|
||||
else:
|
||||
self.widget_list[1].header = self.inactive_header
|
||||
key = None
|
||||
elif key == "e":
|
||||
self.master.toggle_eventlog()
|
||||
key = None
|
||||
|
||||
# This is essentially a copypasta from urwid.Pile's keypress handler.
|
||||
# So much for "closed for modification, but open for extension".
|
||||
@ -139,19 +133,6 @@ class FlowItem(urwid.WidgetWrap):
|
||||
def selectable(self):
|
||||
return True
|
||||
|
||||
def save_flows_prompt(self, k):
|
||||
if k == "l":
|
||||
signals.status_prompt_path.send(
|
||||
prompt = "Save listed flows to",
|
||||
callback = self.master.save_flows
|
||||
)
|
||||
else:
|
||||
signals.status_prompt_path.send(
|
||||
prompt = "Save this flow to",
|
||||
callback = self.master.save_one_flow,
|
||||
args = (self.flow,)
|
||||
)
|
||||
|
||||
def server_replay_prompt(self, k):
|
||||
a = self.master.addons.get("serverplayback")
|
||||
if k == "a":
|
||||
@ -223,23 +204,10 @@ class FlowItem(urwid.WidgetWrap):
|
||||
self.flow.revert()
|
||||
signals.flowlist_change.send(self)
|
||||
signals.status_message.send(message="Reverted.")
|
||||
elif key == "w":
|
||||
signals.status_prompt_onekey.send(
|
||||
self,
|
||||
prompt = "Save",
|
||||
keys = (
|
||||
("listed flows", "l"),
|
||||
("this flow", "t"),
|
||||
),
|
||||
callback = self.save_flows_prompt,
|
||||
)
|
||||
elif key == "X":
|
||||
if self.flow.killable:
|
||||
self.flow.kill()
|
||||
self.master.view.update(self.flow)
|
||||
elif key == "enter":
|
||||
if self.flow.request:
|
||||
self.master.view_flow(self.flow)
|
||||
elif key == "|":
|
||||
signals.status_prompt_path.send(
|
||||
prompt = "Send flow to script",
|
||||
@ -362,20 +330,12 @@ class FlowListBox(urwid.ListBox):
|
||||
self.master.view.clear()
|
||||
elif key == "Z":
|
||||
self.master.view.clear_not_marked()
|
||||
elif key == "e":
|
||||
self.master.toggle_eventlog()
|
||||
elif key == "g":
|
||||
if len(self.master.view):
|
||||
self.master.view.focus.index = 0
|
||||
elif key == "G":
|
||||
if len(self.master.view):
|
||||
self.master.view.focus.index = len(self.master.view) - 1
|
||||
elif key == "f":
|
||||
signals.status_prompt.send(
|
||||
prompt = "Filter View",
|
||||
text = self.master.options.view_filter,
|
||||
callback = self.master.options.setter("view_filter")
|
||||
)
|
||||
elif key == "L":
|
||||
signals.status_prompt_path.send(
|
||||
self,
|
||||
@ -402,20 +362,5 @@ class FlowListBox(urwid.ListBox):
|
||||
keys = orders,
|
||||
callback = change_order
|
||||
)
|
||||
elif key == "F":
|
||||
o = self.master.options
|
||||
o.console_focus_follow = not o.console_focus_follow
|
||||
elif key == "v":
|
||||
val = not self.master.options.console_order_reversed
|
||||
self.master.options.console_order_reversed = val
|
||||
elif key == "W":
|
||||
if self.master.options.save_stream_file:
|
||||
self.master.options.save_stream_file = None
|
||||
else:
|
||||
signals.status_prompt_path.send(
|
||||
self,
|
||||
prompt="Stream flows to",
|
||||
callback= lambda path: self.master.options.update(save_stream_file=path)
|
||||
)
|
||||
else:
|
||||
return urwid.ListBox.keypress(self, size, key)
|
||||
|
@ -2,16 +2,29 @@ import typing
|
||||
from mitmproxy.tools.console import commandeditor
|
||||
|
||||
|
||||
contexts = {
|
||||
"commands",
|
||||
"flowlist",
|
||||
"flowview",
|
||||
"global",
|
||||
"grideditor",
|
||||
"help",
|
||||
"options",
|
||||
}
|
||||
|
||||
|
||||
class Keymap:
|
||||
def __init__(self, master):
|
||||
self.executor = commandeditor.CommandExecutor(master)
|
||||
self.keys = {}
|
||||
|
||||
def add(self, key: str, command: str, context: str = "") -> None:
|
||||
def add(self, key: str, command: str, context: str = "global") -> None:
|
||||
"""
|
||||
Add a key to the key map. If context is empty, it's considered to be
|
||||
a global binding.
|
||||
"""
|
||||
if context not in contexts:
|
||||
raise ValueError("Unsupported context: %s" % context)
|
||||
d = self.keys.setdefault(context, {})
|
||||
d[key] = command
|
||||
|
||||
@ -25,10 +38,8 @@ class Keymap:
|
||||
Returns the key if it has not been handled, or None.
|
||||
"""
|
||||
cmd = self.get(context, key)
|
||||
if not cmd:
|
||||
cmd = self.get("global", key)
|
||||
if cmd:
|
||||
return self.executor(cmd)
|
||||
if cmd != "":
|
||||
cmd = self.get("", key)
|
||||
if cmd:
|
||||
return self.executor(cmd)
|
||||
return key
|
||||
|
@ -14,9 +14,11 @@ import urwid
|
||||
|
||||
from mitmproxy import addons
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import command
|
||||
from mitmproxy import master
|
||||
from mitmproxy import io
|
||||
from mitmproxy import log
|
||||
from mitmproxy import flow
|
||||
from mitmproxy.addons import intercept
|
||||
from mitmproxy.addons import readfile
|
||||
from mitmproxy.addons import view
|
||||
@ -82,27 +84,43 @@ class ConsoleCommands:
|
||||
"""
|
||||
def __init__(self, master):
|
||||
self.master = master
|
||||
self.started = False
|
||||
|
||||
def command(self, partial: str) -> None:
|
||||
"""Prompt for a command."""
|
||||
@command.command("console.command")
|
||||
def console_command(self, partial: str) -> None:
|
||||
"""
|
||||
Prompt the user to edit a command with a (possilby empty) starting value.
|
||||
"""
|
||||
signals.status_prompt_command.send(partial=partial)
|
||||
|
||||
@command.command("console.view.commands")
|
||||
def view_commands(self) -> None:
|
||||
"""View the commands list."""
|
||||
self.master.view_commands()
|
||||
|
||||
@command.command("console.view.options")
|
||||
def view_options(self) -> None:
|
||||
"""View the options editor."""
|
||||
self.master.view_options()
|
||||
|
||||
@command.command("console.view.help")
|
||||
def view_help(self) -> None:
|
||||
"""View help."""
|
||||
self.master.view_help()
|
||||
|
||||
@command.command("console.view.flow")
|
||||
def view_flow(self, flow: flow.Flow) -> None:
|
||||
"""View a flow."""
|
||||
if hasattr(flow, "request"):
|
||||
# FIME: Also set focus?
|
||||
self.master.view_flow(flow)
|
||||
|
||||
@command.command("console.exit")
|
||||
def exit(self) -> None:
|
||||
"""Exit mitmproxy."""
|
||||
raise urwid.ExitMainLoop
|
||||
|
||||
@command.command("console.view.pop")
|
||||
def view_pop(self) -> None:
|
||||
"""
|
||||
Pop a view off the console stack. At the top level, this prompts the
|
||||
@ -110,13 +128,13 @@ class ConsoleCommands:
|
||||
"""
|
||||
signals.pop_view_state.send(self)
|
||||
|
||||
def load(self, l):
|
||||
l.add_command("console.command", self.command)
|
||||
l.add_command("console.exit", self.exit)
|
||||
l.add_command("console.view.commands", self.view_commands)
|
||||
l.add_command("console.view.help", self.view_help)
|
||||
l.add_command("console.view.options", self.view_options)
|
||||
l.add_command("console.view.pop", self.view_pop)
|
||||
def running(self):
|
||||
self.started = True
|
||||
|
||||
def configure(self, updated):
|
||||
if self.started:
|
||||
if "console_eventlog" in updated:
|
||||
self.master.refresh_view()
|
||||
|
||||
|
||||
def default_keymap(km):
|
||||
@ -127,6 +145,14 @@ def default_keymap(km):
|
||||
km.add("Q", "console.exit")
|
||||
km.add("q", "console.view.pop")
|
||||
km.add("i", "console.command 'set intercept='")
|
||||
km.add("W", "console.command 'set save_stream_file='")
|
||||
|
||||
km.add("F", "set console_focus_follow=toggle", context="flowlist")
|
||||
km.add("v", "set console_order_reversed=toggle", context="flowlist")
|
||||
km.add("f", "console.command 'set view_filter='", context="flowlist")
|
||||
km.add("e", "set console_eventlog=toggle", context="flowlist")
|
||||
km.add("w", "console.command 'save.file @shown '", context="flowlist")
|
||||
km.add("enter", "console.view.flow @focus", context="flowlist")
|
||||
|
||||
|
||||
class ConsoleMaster(master.Master):
|
||||
@ -212,7 +238,7 @@ class ConsoleMaster(master.Master):
|
||||
def sig_replace_view_state(self, sender):
|
||||
"""
|
||||
A view has been pushed onto the stack, and is intended to replace
|
||||
the current view rather tha creating a new stack entry.
|
||||
the current view rather than creating a new stack entry.
|
||||
"""
|
||||
if len(self.view_stack) > 1:
|
||||
del self.view_stack[1]
|
||||
@ -244,8 +270,7 @@ class ConsoleMaster(master.Master):
|
||||
except ValueError as e:
|
||||
signals.add_log("Input error: %s" % e, "warn")
|
||||
|
||||
def toggle_eventlog(self):
|
||||
self.options.console_eventlog = not self.options.console_eventlog
|
||||
def refresh_view(self):
|
||||
self.view_flowlist()
|
||||
signals.replace_view_state.send(self)
|
||||
|
||||
@ -389,7 +414,7 @@ class ConsoleMaster(master.Master):
|
||||
)
|
||||
|
||||
def view_help(self):
|
||||
hc = self.view_stack[0].helpctx
|
||||
hc = self.view_stack[-1].helpctx
|
||||
signals.push_view_state.send(
|
||||
self,
|
||||
window = window.Window(
|
||||
@ -397,7 +422,8 @@ class ConsoleMaster(master.Master):
|
||||
help.HelpView(hc),
|
||||
None,
|
||||
statusbar.StatusBar(self, help.footer),
|
||||
None
|
||||
None,
|
||||
"help"
|
||||
)
|
||||
)
|
||||
|
||||
@ -413,6 +439,7 @@ class ConsoleMaster(master.Master):
|
||||
None,
|
||||
statusbar.StatusBar(self, options.footer),
|
||||
options.help_context,
|
||||
"options"
|
||||
)
|
||||
)
|
||||
|
||||
@ -427,7 +454,8 @@ class ConsoleMaster(master.Master):
|
||||
commands.Commands(self),
|
||||
None,
|
||||
statusbar.StatusBar(self, commands.footer),
|
||||
options.help_context,
|
||||
commands.help_context,
|
||||
"commands"
|
||||
)
|
||||
)
|
||||
|
||||
@ -439,7 +467,8 @@ class ConsoleMaster(master.Master):
|
||||
ge,
|
||||
None,
|
||||
statusbar.StatusBar(self, grideditor.base.FOOTER),
|
||||
ge.make_help()
|
||||
ge.make_help(),
|
||||
"grideditor"
|
||||
)
|
||||
)
|
||||
|
||||
@ -459,7 +488,8 @@ class ConsoleMaster(master.Master):
|
||||
body,
|
||||
None,
|
||||
statusbar.StatusBar(self, flowlist.footer),
|
||||
flowlist.help_context
|
||||
flowlist.help_context,
|
||||
"flowlist"
|
||||
)
|
||||
)
|
||||
|
||||
@ -472,7 +502,8 @@ class ConsoleMaster(master.Master):
|
||||
flowview.FlowView(self, self.view, flow, tab_offset),
|
||||
flowview.FlowViewHeader(self, flow),
|
||||
statusbar.StatusBar(self, flowview.footer),
|
||||
flowview.help_context
|
||||
flowview.help_context,
|
||||
"flowview"
|
||||
)
|
||||
)
|
||||
|
||||
|
@ -5,7 +5,7 @@ from mitmproxy.tools.console import signals
|
||||
|
||||
class Window(urwid.Frame):
|
||||
|
||||
def __init__(self, master, body, header, footer, helpctx):
|
||||
def __init__(self, master, body, header, footer, helpctx, keyctx):
|
||||
urwid.Frame.__init__(
|
||||
self,
|
||||
urwid.AttrWrap(body, "background"),
|
||||
@ -14,6 +14,7 @@ class Window(urwid.Frame):
|
||||
)
|
||||
self.master = master
|
||||
self.helpctx = helpctx
|
||||
self.keyctx = keyctx
|
||||
signals.focus.connect(self.sig_focus)
|
||||
|
||||
def sig_focus(self, sender, section):
|
||||
@ -82,4 +83,4 @@ class Window(urwid.Frame):
|
||||
|
||||
def keypress(self, size, k):
|
||||
k = super().keypress(size, k)
|
||||
return self.master.keymap.handle("", k)
|
||||
return self.master.keymap.handle(self.keyctx, k)
|
||||
|
@ -4,6 +4,7 @@ from mitmproxy import addons
|
||||
from mitmproxy import addonmanager
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import options
|
||||
from mitmproxy import command
|
||||
from mitmproxy import master
|
||||
from mitmproxy import proxy
|
||||
from mitmproxy.test import taddons
|
||||
@ -18,6 +19,10 @@ class TAddon:
|
||||
if addons:
|
||||
self.addons = addons
|
||||
|
||||
@command.command("test.command")
|
||||
def testcommand(self) -> str:
|
||||
return "here"
|
||||
|
||||
def __repr__(self):
|
||||
return "Addon(%s)" % self.name
|
||||
|
||||
@ -38,6 +43,12 @@ class AOption:
|
||||
l.add_option("custom_option", bool, False, "help")
|
||||
|
||||
|
||||
def test_command():
|
||||
with taddons.context() as tctx:
|
||||
tctx.master.addons.add(TAddon("test"))
|
||||
assert tctx.master.commands.call("test.command") == "here"
|
||||
|
||||
|
||||
def test_halt():
|
||||
o = options.Options()
|
||||
m = master.Master(o, proxy.DummyServer(o))
|
||||
|
@ -39,28 +39,28 @@ class TestCommand:
|
||||
|
||||
|
||||
def test_simple():
|
||||
o = options.Options()
|
||||
m = master.Master(o, proxy.DummyServer(o))
|
||||
c = command.CommandManager(m)
|
||||
a = TAddon()
|
||||
c.add("one.two", a.cmd1)
|
||||
assert c.commands["one.two"].help == "cmd1 help"
|
||||
assert(c.call("one.two foo") == "ret foo")
|
||||
with pytest.raises(exceptions.CommandError, match="Unknown"):
|
||||
c.call("nonexistent")
|
||||
with pytest.raises(exceptions.CommandError, match="Invalid"):
|
||||
c.call("")
|
||||
with pytest.raises(exceptions.CommandError, match="Usage"):
|
||||
c.call("one.two too many args")
|
||||
with taddons.context() as tctx:
|
||||
c = command.CommandManager(tctx.master)
|
||||
a = TAddon()
|
||||
c.add("one.two", a.cmd1)
|
||||
assert c.commands["one.two"].help == "cmd1 help"
|
||||
assert(c.call("one.two foo") == "ret foo")
|
||||
with pytest.raises(exceptions.CommandError, match="Unknown"):
|
||||
c.call("nonexistent")
|
||||
with pytest.raises(exceptions.CommandError, match="Invalid"):
|
||||
c.call("")
|
||||
with pytest.raises(exceptions.CommandError, match="Usage"):
|
||||
c.call("one.two too many args")
|
||||
|
||||
c.add("empty", a.empty)
|
||||
c.call("empty")
|
||||
c.add("empty", a.empty)
|
||||
c.call("empty")
|
||||
|
||||
|
||||
def test_typename():
|
||||
assert command.typename(str, True) == "str"
|
||||
assert command.typename(typing.Sequence[flow.Flow], True) == "[flow]"
|
||||
assert command.typename(typing.Sequence[flow.Flow], False) == "flowspec"
|
||||
assert command.typename(flow.Flow, False) == "flow"
|
||||
|
||||
|
||||
class DummyConsole:
|
||||
@ -68,7 +68,8 @@ class DummyConsole:
|
||||
l.add_command("console.resolve", self.resolve)
|
||||
|
||||
def resolve(self, spec: str) -> typing.Sequence[flow.Flow]:
|
||||
return [tflow.tflow(resp=True)]
|
||||
n = int(spec)
|
||||
return [tflow.tflow(resp=True)] * n
|
||||
|
||||
|
||||
def test_parsearg():
|
||||
@ -76,7 +77,42 @@ def test_parsearg():
|
||||
tctx.master.addons.add(DummyConsole())
|
||||
assert command.parsearg(tctx.master.commands, "foo", str) == "foo"
|
||||
assert len(command.parsearg(
|
||||
tctx.master.commands, "~b", typing.Sequence[flow.Flow]
|
||||
)) == 1
|
||||
tctx.master.commands, "2", typing.Sequence[flow.Flow]
|
||||
)) == 2
|
||||
assert command.parsearg(tctx.master.commands, "1", flow.Flow)
|
||||
with pytest.raises(exceptions.CommandError):
|
||||
command.parsearg(tctx.master.commands, "2", flow.Flow)
|
||||
with pytest.raises(exceptions.CommandError):
|
||||
command.parsearg(tctx.master.commands, "0", flow.Flow)
|
||||
with pytest.raises(exceptions.CommandError):
|
||||
command.parsearg(tctx.master.commands, "foo", Exception)
|
||||
|
||||
|
||||
class TDec:
|
||||
@command.command("cmd1")
|
||||
def cmd1(self, foo: str) -> str:
|
||||
"""cmd1 help"""
|
||||
return "ret " + foo
|
||||
|
||||
@command.command("cmd2")
|
||||
def cmd2(self, foo: str) -> str:
|
||||
return 99
|
||||
|
||||
@command.command("empty")
|
||||
def empty(self) -> None:
|
||||
pass
|
||||
|
||||
|
||||
def test_decorator():
|
||||
with taddons.context() as tctx:
|
||||
c = command.CommandManager(tctx.master)
|
||||
a = TDec()
|
||||
c.collect_commands(a)
|
||||
assert "cmd1" in c.commands
|
||||
assert c.call("cmd1 bar") == "ret bar"
|
||||
assert "empty" in c.commands
|
||||
assert c.call("empty") is None
|
||||
|
||||
with taddons.context() as tctx:
|
||||
tctx.master.addons.add(a)
|
||||
assert tctx.master.commands.call("cmd1 bar") == "ret bar"
|
||||
|
@ -381,6 +381,11 @@ def test_set():
|
||||
with pytest.raises(exceptions.OptionsError):
|
||||
opts.set("bool=wobble")
|
||||
|
||||
opts.set("bool=toggle")
|
||||
assert opts.bool is False
|
||||
opts.set("bool=toggle")
|
||||
assert opts.bool is True
|
||||
|
||||
opts.set("int=1")
|
||||
assert opts.int == 1
|
||||
with pytest.raises(exceptions.OptionsError):
|
||||
|
Loading…
Reference in New Issue
Block a user