mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-26 18:18:25 +00:00
Basic outline of the command subsystem
- Add a command manager - Sketch out the type system with a few simple supported types
This commit is contained in:
parent
2a46f3851a
commit
169baabcab
@ -93,6 +93,9 @@ class Loader:
|
||||
choices
|
||||
)
|
||||
|
||||
def add_command(self, path: str, func: typing.Callable) -> None:
|
||||
self.master.commands.add_command(path, func)
|
||||
|
||||
|
||||
def traverse(chain):
|
||||
"""
|
||||
|
78
mitmproxy/command.py
Normal file
78
mitmproxy/command.py
Normal file
@ -0,0 +1,78 @@
|
||||
import inspect
|
||||
import typing
|
||||
import shlex
|
||||
from mitmproxy.utils import typecheck
|
||||
|
||||
|
||||
class CommandError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def typename(t: type) -> str:
|
||||
if t in (str, int, bool):
|
||||
return t.__name__
|
||||
else: # pragma: no cover
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
def parsearg(spec: str, argtype: type) -> typing.Any:
|
||||
"""
|
||||
Convert a string to a argument to the appropriate type.
|
||||
"""
|
||||
if argtype == str:
|
||||
return spec
|
||||
else:
|
||||
raise CommandError("Unsupported argument type: %s" % argtype)
|
||||
|
||||
|
||||
class Command:
|
||||
def __init__(self, manager, path, func) -> None:
|
||||
self.path = path
|
||||
self.manager = manager
|
||||
self.func = func
|
||||
sig = inspect.signature(self.func)
|
||||
self.paramtypes = [v.annotation for v in sig.parameters.values()]
|
||||
self.returntype = sig.return_annotation
|
||||
|
||||
def signature_help(self) -> str:
|
||||
params = " ".join([typename(i) for i in self.paramtypes])
|
||||
ret = " -> " + typename(self.returntype) if self.returntype else ""
|
||||
return "%s %s%s" % (self.path, params, ret)
|
||||
|
||||
def call(self, args: typing.Sequence[str]):
|
||||
"""
|
||||
Call the command with a set of arguments. At this point, all argumets are strings.
|
||||
"""
|
||||
if len(self.paramtypes) != len(args):
|
||||
raise CommandError("SIGNATURE")
|
||||
|
||||
args = [parsearg(args[i], self.paramtypes[i]) for i in range(len(args))]
|
||||
|
||||
with self.manager.master.handlecontext():
|
||||
ret = self.func(*args)
|
||||
|
||||
if not typecheck.check_command_return_type(ret, self.returntype):
|
||||
raise CommandError("Command returned unexpected data")
|
||||
|
||||
return ret
|
||||
|
||||
|
||||
class CommandManager:
|
||||
def __init__(self, master):
|
||||
self.master = master
|
||||
self.commands = {}
|
||||
|
||||
def add(self, path: str, func: typing.Callable):
|
||||
self.commands[path] = Command(self, path, func)
|
||||
|
||||
def call(self, cmdstr: str):
|
||||
"""
|
||||
Call a command using a string. May raise CommandError.
|
||||
"""
|
||||
parts = shlex.split(cmdstr)
|
||||
if not len(parts) >= 1:
|
||||
raise CommandError("Invalid command: %s" % cmdstr)
|
||||
path = parts[0]
|
||||
if path not in self.commands:
|
||||
raise CommandError("Unknown command: %s" % path)
|
||||
return self.commands[path].call(parts[1:])
|
@ -8,6 +8,7 @@ from mitmproxy import controller
|
||||
from mitmproxy import eventsequence
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import connections
|
||||
from mitmproxy import command
|
||||
from mitmproxy import http
|
||||
from mitmproxy import log
|
||||
from mitmproxy.proxy.protocol import http_replay
|
||||
@ -34,6 +35,7 @@ class Master:
|
||||
"""
|
||||
def __init__(self, opts, server):
|
||||
self.options = opts or options.Options()
|
||||
self.commands = command.CommandManager(self)
|
||||
self.addons = addonmanager.AddonManager(self)
|
||||
self.event_queue = queue.Queue()
|
||||
self.should_exit = threading.Event()
|
||||
|
@ -31,7 +31,7 @@ class _Option:
|
||||
help: str,
|
||||
choices: typing.Optional[typing.Sequence[str]]
|
||||
) -> None:
|
||||
typecheck.check_type(name, default, typespec)
|
||||
typecheck.check_option_type(name, default, typespec)
|
||||
self.name = name
|
||||
self.typespec = typespec
|
||||
self._default = default
|
||||
@ -54,7 +54,7 @@ class _Option:
|
||||
return copy.deepcopy(v)
|
||||
|
||||
def set(self, value: typing.Any) -> None:
|
||||
typecheck.check_type(self.name, value, self.typespec)
|
||||
typecheck.check_option_type(self.name, value, self.typespec)
|
||||
self.value = value
|
||||
|
||||
def reset(self) -> None:
|
||||
|
@ -1,20 +1,21 @@
|
||||
import typing
|
||||
|
||||
|
||||
def check_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None:
|
||||
def check_command_return_type(value: typing.Any, typeinfo: typing.Any) -> bool:
|
||||
"""
|
||||
This function checks if the provided value is an instance of typeinfo
|
||||
and raises a TypeError otherwise.
|
||||
|
||||
The following types from the typing package have specialized support:
|
||||
|
||||
- Union
|
||||
- Tuple
|
||||
- IO
|
||||
Check if the provided value is an instance of typeinfo. Returns True if the
|
||||
types match, False otherwise. This function supports only those types
|
||||
required for command return values.
|
||||
"""
|
||||
# If we realize that we need to extend this list substantially, it may make sense
|
||||
# to use typeguard for this, but right now it's not worth the hassle for 16 lines of code.
|
||||
return True
|
||||
|
||||
|
||||
def check_option_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None:
|
||||
"""
|
||||
Check if the provided value is an instance of typeinfo and raises a
|
||||
TypeError otherwise. This function supports only those types required for
|
||||
options.
|
||||
"""
|
||||
e = TypeError("Expected {} for {}, but got {}.".format(
|
||||
typeinfo,
|
||||
name,
|
||||
@ -32,7 +33,7 @@ def check_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None:
|
||||
|
||||
for T in types:
|
||||
try:
|
||||
check_type(name, value, T)
|
||||
check_option_type(name, value, T)
|
||||
except TypeError:
|
||||
pass
|
||||
else:
|
||||
@ -50,7 +51,7 @@ def check_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None:
|
||||
if len(types) != len(value):
|
||||
raise e
|
||||
for i, (x, T) in enumerate(zip(value, types)):
|
||||
check_type("{}[{}]".format(name, i), x, T)
|
||||
check_option_type("{}[{}]".format(name, i), x, T)
|
||||
return
|
||||
elif typename.startswith("typing.Sequence"):
|
||||
try:
|
||||
@ -58,11 +59,10 @@ def check_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None:
|
||||
except AttributeError:
|
||||
# Python 3.5.0
|
||||
T = typeinfo.__parameters__[0] # type: ignore
|
||||
|
||||
if not isinstance(value, (tuple, list)):
|
||||
raise e
|
||||
for v in value:
|
||||
check_type(name, v, T)
|
||||
check_option_type(name, v, T)
|
||||
elif typename.startswith("typing.IO"):
|
||||
if hasattr(value, "read"):
|
||||
return
|
||||
|
35
test/mitmproxy/test_command.py
Normal file
35
test/mitmproxy/test_command.py
Normal file
@ -0,0 +1,35 @@
|
||||
from mitmproxy import command
|
||||
from mitmproxy import master
|
||||
from mitmproxy import options
|
||||
from mitmproxy import proxy
|
||||
import pytest
|
||||
|
||||
|
||||
class TAddon:
|
||||
def cmd1(self, foo: str) -> str:
|
||||
return "ret " + foo
|
||||
|
||||
|
||||
class TestCommand:
|
||||
def test_call(self):
|
||||
o = options.Options()
|
||||
m = master.Master(o, proxy.DummyServer(o))
|
||||
cm = command.CommandManager(m)
|
||||
|
||||
a = TAddon()
|
||||
c = command.Command(cm, "cmd.path", a.cmd1)
|
||||
assert c.call(["foo"]) == "ret foo"
|
||||
assert c.signature_help() == "cmd.path str -> str"
|
||||
|
||||
|
||||
def test_simple():
|
||||
o = options.Options()
|
||||
m = master.Master(o, proxy.DummyServer(o))
|
||||
c = command.CommandManager(m)
|
||||
a = TAddon()
|
||||
c.add("one.two", a.cmd1)
|
||||
assert(c.call("one.two foo") == "ret foo")
|
||||
with pytest.raises(command.CommandError, match="Unknown"):
|
||||
c.call("nonexistent")
|
||||
with pytest.raises(command.CommandError, match="Invalid"):
|
||||
c.call("")
|
@ -16,72 +16,72 @@ class T(TBase):
|
||||
super(T, self).__init__(42)
|
||||
|
||||
|
||||
def test_check_type():
|
||||
typecheck.check_type("foo", 42, int)
|
||||
def test_check_option_type():
|
||||
typecheck.check_option_type("foo", 42, int)
|
||||
with pytest.raises(TypeError):
|
||||
typecheck.check_type("foo", 42, str)
|
||||
typecheck.check_option_type("foo", 42, str)
|
||||
with pytest.raises(TypeError):
|
||||
typecheck.check_type("foo", None, str)
|
||||
typecheck.check_option_type("foo", None, str)
|
||||
with pytest.raises(TypeError):
|
||||
typecheck.check_type("foo", b"foo", str)
|
||||
typecheck.check_option_type("foo", b"foo", str)
|
||||
|
||||
|
||||
def test_check_union():
|
||||
typecheck.check_type("foo", 42, typing.Union[int, str])
|
||||
typecheck.check_type("foo", "42", typing.Union[int, str])
|
||||
typecheck.check_option_type("foo", 42, typing.Union[int, str])
|
||||
typecheck.check_option_type("foo", "42", typing.Union[int, str])
|
||||
with pytest.raises(TypeError):
|
||||
typecheck.check_type("foo", [], typing.Union[int, str])
|
||||
typecheck.check_option_type("foo", [], typing.Union[int, str])
|
||||
|
||||
# Python 3.5 only defines __union_params__
|
||||
m = mock.Mock()
|
||||
m.__str__ = lambda self: "typing.Union"
|
||||
m.__union_params__ = (int,)
|
||||
typecheck.check_type("foo", 42, m)
|
||||
typecheck.check_option_type("foo", 42, m)
|
||||
|
||||
|
||||
def test_check_tuple():
|
||||
typecheck.check_type("foo", (42, "42"), typing.Tuple[int, str])
|
||||
typecheck.check_option_type("foo", (42, "42"), typing.Tuple[int, str])
|
||||
with pytest.raises(TypeError):
|
||||
typecheck.check_type("foo", None, typing.Tuple[int, str])
|
||||
typecheck.check_option_type("foo", None, typing.Tuple[int, str])
|
||||
with pytest.raises(TypeError):
|
||||
typecheck.check_type("foo", (), typing.Tuple[int, str])
|
||||
typecheck.check_option_type("foo", (), typing.Tuple[int, str])
|
||||
with pytest.raises(TypeError):
|
||||
typecheck.check_type("foo", (42, 42), typing.Tuple[int, str])
|
||||
typecheck.check_option_type("foo", (42, 42), typing.Tuple[int, str])
|
||||
with pytest.raises(TypeError):
|
||||
typecheck.check_type("foo", ("42", 42), typing.Tuple[int, str])
|
||||
typecheck.check_option_type("foo", ("42", 42), typing.Tuple[int, str])
|
||||
|
||||
# Python 3.5 only defines __tuple_params__
|
||||
m = mock.Mock()
|
||||
m.__str__ = lambda self: "typing.Tuple"
|
||||
m.__tuple_params__ = (int, str)
|
||||
typecheck.check_type("foo", (42, "42"), m)
|
||||
typecheck.check_option_type("foo", (42, "42"), m)
|
||||
|
||||
|
||||
def test_check_sequence():
|
||||
typecheck.check_type("foo", [10], typing.Sequence[int])
|
||||
typecheck.check_option_type("foo", [10], typing.Sequence[int])
|
||||
with pytest.raises(TypeError):
|
||||
typecheck.check_type("foo", ["foo"], typing.Sequence[int])
|
||||
typecheck.check_option_type("foo", ["foo"], typing.Sequence[int])
|
||||
with pytest.raises(TypeError):
|
||||
typecheck.check_type("foo", [10, "foo"], typing.Sequence[int])
|
||||
typecheck.check_option_type("foo", [10, "foo"], typing.Sequence[int])
|
||||
with pytest.raises(TypeError):
|
||||
typecheck.check_type("foo", [b"foo"], typing.Sequence[str])
|
||||
typecheck.check_option_type("foo", [b"foo"], typing.Sequence[str])
|
||||
with pytest.raises(TypeError):
|
||||
typecheck.check_type("foo", "foo", typing.Sequence[str])
|
||||
typecheck.check_option_type("foo", "foo", typing.Sequence[str])
|
||||
|
||||
# Python 3.5 only defines __parameters__
|
||||
m = mock.Mock()
|
||||
m.__str__ = lambda self: "typing.Sequence"
|
||||
m.__parameters__ = (int,)
|
||||
typecheck.check_type("foo", [10], m)
|
||||
typecheck.check_option_type("foo", [10], m)
|
||||
|
||||
|
||||
def test_check_io():
|
||||
typecheck.check_type("foo", io.StringIO(), typing.IO[str])
|
||||
typecheck.check_option_type("foo", io.StringIO(), typing.IO[str])
|
||||
with pytest.raises(TypeError):
|
||||
typecheck.check_type("foo", "foo", typing.IO[str])
|
||||
typecheck.check_option_type("foo", "foo", typing.IO[str])
|
||||
|
||||
|
||||
def test_check_any():
|
||||
typecheck.check_type("foo", 42, typing.Any)
|
||||
typecheck.check_type("foo", object(), typing.Any)
|
||||
typecheck.check_type("foo", None, typing.Any)
|
||||
typecheck.check_option_type("foo", 42, typing.Any)
|
||||
typecheck.check_option_type("foo", object(), typing.Any)
|
||||
typecheck.check_option_type("foo", None, typing.Any)
|
||||
|
Loading…
Reference in New Issue
Block a user