mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2025-01-30 14:58:38 +00:00
commands: add a command.command decorator
Use this for our built-ins and the console commands.
This commit is contained in:
parent
7ff84673fd
commit
6af1a49464
@ -153,6 +153,8 @@ class AddonManager:
|
||||
for a in traverse([addon]):
|
||||
name = _get_name(a)
|
||||
self.lookup[name] = a
|
||||
for a in traverse([addon]):
|
||||
self.master.commands.collect_commands(a)
|
||||
return addon
|
||||
|
||||
def add(self, *addons):
|
||||
|
@ -1,8 +1,10 @@
|
||||
from mitmproxy import ctx
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import command
|
||||
|
||||
|
||||
class Core:
|
||||
@command.command("set")
|
||||
def set(self, spec: str) -> None:
|
||||
"""
|
||||
Set an option of the form "key[=value]". When the value is omitted,
|
||||
@ -14,6 +16,3 @@ class Core:
|
||||
ctx.options.set(spec)
|
||||
except exceptions.OptionsError as e:
|
||||
raise exceptions.CommandError(e) from e
|
||||
|
||||
def load(self, l):
|
||||
l.add_command("set", self.set)
|
||||
|
@ -2,6 +2,7 @@ import inspect
|
||||
import typing
|
||||
import shlex
|
||||
import textwrap
|
||||
import functools
|
||||
|
||||
from mitmproxy.utils import typecheck
|
||||
from mitmproxy import exceptions
|
||||
@ -74,6 +75,13 @@ class CommandManager:
|
||||
self.master = master
|
||||
self.commands = {}
|
||||
|
||||
def collect_commands(self, addon):
|
||||
for i in dir(addon):
|
||||
if not i.startswith("__"):
|
||||
o = getattr(addon, i)
|
||||
if hasattr(o, "command_path"):
|
||||
self.add(o.command_path, o)
|
||||
|
||||
def add(self, path: str, func: typing.Callable):
|
||||
self.commands[path] = Command(self, path, func)
|
||||
|
||||
@ -112,3 +120,13 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
|
||||
return flows[0]
|
||||
else:
|
||||
raise exceptions.CommandError("Unsupported argument type: %s" % argtype)
|
||||
|
||||
|
||||
def command(path):
|
||||
def decorator(function):
|
||||
@functools.wraps(function)
|
||||
def wrapper(*args, **kwargs):
|
||||
return function(*args, **kwargs)
|
||||
wrapper.__dict__["command_path"] = path
|
||||
return wrapper
|
||||
return decorator
|
||||
|
@ -14,6 +14,7 @@ import urwid
|
||||
|
||||
from mitmproxy import addons
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import command
|
||||
from mitmproxy import master
|
||||
from mitmproxy import io
|
||||
from mitmproxy import log
|
||||
@ -85,34 +86,41 @@ class ConsoleCommands:
|
||||
self.master = master
|
||||
self.started = False
|
||||
|
||||
def command(self, partial: str) -> None:
|
||||
@command.command("console.command")
|
||||
def console_command(self, partial: str) -> None:
|
||||
"""
|
||||
Prompt the user to edit a command with a (possilby empty) starting value.
|
||||
"""
|
||||
signals.status_prompt_command.send(partial=partial)
|
||||
|
||||
@command.command("console.view.commands")
|
||||
def view_commands(self) -> None:
|
||||
"""View the commands list."""
|
||||
self.master.view_commands()
|
||||
|
||||
@command.command("console.view.options")
|
||||
def view_options(self) -> None:
|
||||
"""View the options editor."""
|
||||
self.master.view_options()
|
||||
|
||||
@command.command("console.view.help")
|
||||
def view_help(self) -> None:
|
||||
"""View help."""
|
||||
self.master.view_help()
|
||||
|
||||
@command.command("console.view.flow")
|
||||
def view_flow(self, flow: flow.Flow) -> None:
|
||||
"""View a flow."""
|
||||
if hasattr(flow, "request"):
|
||||
# FIME: Also set focus?
|
||||
self.master.view_flow(flow)
|
||||
|
||||
@command.command("console.exit")
|
||||
def exit(self) -> None:
|
||||
"""Exit mitmproxy."""
|
||||
raise urwid.ExitMainLoop
|
||||
|
||||
@command.command("console.view.pop")
|
||||
def view_pop(self) -> None:
|
||||
"""
|
||||
Pop a view off the console stack. At the top level, this prompts the
|
||||
@ -120,15 +128,6 @@ class ConsoleCommands:
|
||||
"""
|
||||
signals.pop_view_state.send(self)
|
||||
|
||||
def load(self, l):
|
||||
l.add_command("console.command", self.command)
|
||||
l.add_command("console.exit", self.exit)
|
||||
l.add_command("console.view.commands", self.view_commands)
|
||||
l.add_command("console.view.help", self.view_help)
|
||||
l.add_command("console.view.options", self.view_options)
|
||||
l.add_command("console.view.pop", self.view_pop)
|
||||
l.add_command("console.view.flow", self.view_flow)
|
||||
|
||||
def running(self):
|
||||
self.started = True
|
||||
|
||||
|
@ -4,6 +4,7 @@ from mitmproxy import addons
|
||||
from mitmproxy import addonmanager
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import options
|
||||
from mitmproxy import command
|
||||
from mitmproxy import master
|
||||
from mitmproxy import proxy
|
||||
from mitmproxy.test import taddons
|
||||
@ -18,6 +19,10 @@ class TAddon:
|
||||
if addons:
|
||||
self.addons = addons
|
||||
|
||||
@command.command("test.command")
|
||||
def testcommand(self) -> str:
|
||||
return "here"
|
||||
|
||||
def __repr__(self):
|
||||
return "Addon(%s)" % self.name
|
||||
|
||||
@ -38,6 +43,12 @@ class AOption:
|
||||
l.add_option("custom_option", bool, False, "help")
|
||||
|
||||
|
||||
def test_command():
|
||||
with taddons.context() as tctx:
|
||||
tctx.master.addons.add(TAddon("test"))
|
||||
assert tctx.master.commands.call("test.command") == "here"
|
||||
|
||||
|
||||
def test_halt():
|
||||
o = options.Options()
|
||||
m = master.Master(o, proxy.DummyServer(o))
|
||||
|
@ -39,22 +39,21 @@ class TestCommand:
|
||||
|
||||
|
||||
def test_simple():
|
||||
o = options.Options()
|
||||
m = master.Master(o, proxy.DummyServer(o))
|
||||
c = command.CommandManager(m)
|
||||
a = TAddon()
|
||||
c.add("one.two", a.cmd1)
|
||||
assert c.commands["one.two"].help == "cmd1 help"
|
||||
assert(c.call("one.two foo") == "ret foo")
|
||||
with pytest.raises(exceptions.CommandError, match="Unknown"):
|
||||
c.call("nonexistent")
|
||||
with pytest.raises(exceptions.CommandError, match="Invalid"):
|
||||
c.call("")
|
||||
with pytest.raises(exceptions.CommandError, match="Usage"):
|
||||
c.call("one.two too many args")
|
||||
with taddons.context() as tctx:
|
||||
c = command.CommandManager(tctx.master)
|
||||
a = TAddon()
|
||||
c.add("one.two", a.cmd1)
|
||||
assert c.commands["one.two"].help == "cmd1 help"
|
||||
assert(c.call("one.two foo") == "ret foo")
|
||||
with pytest.raises(exceptions.CommandError, match="Unknown"):
|
||||
c.call("nonexistent")
|
||||
with pytest.raises(exceptions.CommandError, match="Invalid"):
|
||||
c.call("")
|
||||
with pytest.raises(exceptions.CommandError, match="Usage"):
|
||||
c.call("one.two too many args")
|
||||
|
||||
c.add("empty", a.empty)
|
||||
c.call("empty")
|
||||
c.add("empty", a.empty)
|
||||
c.call("empty")
|
||||
|
||||
|
||||
def test_typename():
|
||||
@ -87,3 +86,33 @@ def test_parsearg():
|
||||
command.parsearg(tctx.master.commands, "0", flow.Flow)
|
||||
with pytest.raises(exceptions.CommandError):
|
||||
command.parsearg(tctx.master.commands, "foo", Exception)
|
||||
|
||||
|
||||
class TDec:
|
||||
@command.command("cmd1")
|
||||
def cmd1(self, foo: str) -> str:
|
||||
"""cmd1 help"""
|
||||
return "ret " + foo
|
||||
|
||||
@command.command("cmd2")
|
||||
def cmd2(self, foo: str) -> str:
|
||||
return 99
|
||||
|
||||
@command.command("empty")
|
||||
def empty(self) -> None:
|
||||
pass
|
||||
|
||||
|
||||
def test_decorator():
|
||||
with taddons.context() as tctx:
|
||||
c = command.CommandManager(tctx.master)
|
||||
a = TDec()
|
||||
c.collect_commands(a)
|
||||
assert "cmd1" in c.commands
|
||||
assert c.call("cmd1 bar") == "ret bar"
|
||||
assert "empty" in c.commands
|
||||
assert c.call("empty") is None
|
||||
|
||||
with taddons.context() as tctx:
|
||||
tctx.master.addons.add(a)
|
||||
assert tctx.master.commands.call("cmd1 bar") == "ret bar"
|
||||
|
Loading…
Reference in New Issue
Block a user