From 169baabcab33c1bd1ab7e33907794c7103f962d8 Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Thu, 27 Apr 2017 11:09:40 +1200 Subject: [PATCH] Basic outline of the command subsystem - Add a command manager - Sketch out the type system with a few simple supported types --- mitmproxy/addonmanager.py | 3 + mitmproxy/command.py | 78 ++++++++++++++++++++++++++ mitmproxy/master.py | 2 + mitmproxy/optmanager.py | 4 +- mitmproxy/utils/typecheck.py | 30 +++++----- test/mitmproxy/test_command.py | 35 ++++++++++++ test/mitmproxy/utils/test_typecheck.py | 52 ++++++++--------- 7 files changed, 161 insertions(+), 43 deletions(-) create mode 100644 mitmproxy/command.py create mode 100644 test/mitmproxy/test_command.py diff --git a/mitmproxy/addonmanager.py b/mitmproxy/addonmanager.py index 254613387..a9b76f7b4 100644 --- a/mitmproxy/addonmanager.py +++ b/mitmproxy/addonmanager.py @@ -93,6 +93,9 @@ class Loader: choices ) + def add_command(self, path: str, func: typing.Callable) -> None: + self.master.commands.add_command(path, func) + def traverse(chain): """ diff --git a/mitmproxy/command.py b/mitmproxy/command.py new file mode 100644 index 000000000..57ca05aef --- /dev/null +++ b/mitmproxy/command.py @@ -0,0 +1,78 @@ +import inspect +import typing +import shlex +from mitmproxy.utils import typecheck + + +class CommandError(Exception): + pass + + +def typename(t: type) -> str: + if t in (str, int, bool): + return t.__name__ + else: # pragma: no cover + raise NotImplementedError + + +def parsearg(spec: str, argtype: type) -> typing.Any: + """ + Convert a string to a argument to the appropriate type. + """ + if argtype == str: + return spec + else: + raise CommandError("Unsupported argument type: %s" % argtype) + + +class Command: + def __init__(self, manager, path, func) -> None: + self.path = path + self.manager = manager + self.func = func + sig = inspect.signature(self.func) + self.paramtypes = [v.annotation for v in sig.parameters.values()] + self.returntype = sig.return_annotation + + def signature_help(self) -> str: + params = " ".join([typename(i) for i in self.paramtypes]) + ret = " -> " + typename(self.returntype) if self.returntype else "" + return "%s %s%s" % (self.path, params, ret) + + def call(self, args: typing.Sequence[str]): + """ + Call the command with a set of arguments. At this point, all argumets are strings. + """ + if len(self.paramtypes) != len(args): + raise CommandError("SIGNATURE") + + args = [parsearg(args[i], self.paramtypes[i]) for i in range(len(args))] + + with self.manager.master.handlecontext(): + ret = self.func(*args) + + if not typecheck.check_command_return_type(ret, self.returntype): + raise CommandError("Command returned unexpected data") + + return ret + + +class CommandManager: + def __init__(self, master): + self.master = master + self.commands = {} + + def add(self, path: str, func: typing.Callable): + self.commands[path] = Command(self, path, func) + + def call(self, cmdstr: str): + """ + Call a command using a string. May raise CommandError. + """ + parts = shlex.split(cmdstr) + if not len(parts) >= 1: + raise CommandError("Invalid command: %s" % cmdstr) + path = parts[0] + if path not in self.commands: + raise CommandError("Unknown command: %s" % path) + return self.commands[path].call(parts[1:]) diff --git a/mitmproxy/master.py b/mitmproxy/master.py index 949009154..2a032c4a0 100644 --- a/mitmproxy/master.py +++ b/mitmproxy/master.py @@ -8,6 +8,7 @@ from mitmproxy import controller from mitmproxy import eventsequence from mitmproxy import exceptions from mitmproxy import connections +from mitmproxy import command from mitmproxy import http from mitmproxy import log from mitmproxy.proxy.protocol import http_replay @@ -34,6 +35,7 @@ class Master: """ def __init__(self, opts, server): self.options = opts or options.Options() + self.commands = command.CommandManager(self) self.addons = addonmanager.AddonManager(self) self.event_queue = queue.Queue() self.should_exit = threading.Event() diff --git a/mitmproxy/optmanager.py b/mitmproxy/optmanager.py index 8369a36ed..cf6e21b06 100644 --- a/mitmproxy/optmanager.py +++ b/mitmproxy/optmanager.py @@ -31,7 +31,7 @@ class _Option: help: str, choices: typing.Optional[typing.Sequence[str]] ) -> None: - typecheck.check_type(name, default, typespec) + typecheck.check_option_type(name, default, typespec) self.name = name self.typespec = typespec self._default = default @@ -54,7 +54,7 @@ class _Option: return copy.deepcopy(v) def set(self, value: typing.Any) -> None: - typecheck.check_type(self.name, value, self.typespec) + typecheck.check_option_type(self.name, value, self.typespec) self.value = value def reset(self) -> None: diff --git a/mitmproxy/utils/typecheck.py b/mitmproxy/utils/typecheck.py index 628ea642e..7199f2fb3 100644 --- a/mitmproxy/utils/typecheck.py +++ b/mitmproxy/utils/typecheck.py @@ -1,20 +1,21 @@ import typing -def check_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None: +def check_command_return_type(value: typing.Any, typeinfo: typing.Any) -> bool: """ - This function checks if the provided value is an instance of typeinfo - and raises a TypeError otherwise. - - The following types from the typing package have specialized support: - - - Union - - Tuple - - IO + Check if the provided value is an instance of typeinfo. Returns True if the + types match, False otherwise. This function supports only those types + required for command return values. """ - # If we realize that we need to extend this list substantially, it may make sense - # to use typeguard for this, but right now it's not worth the hassle for 16 lines of code. + return True + +def check_option_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None: + """ + Check if the provided value is an instance of typeinfo and raises a + TypeError otherwise. This function supports only those types required for + options. + """ e = TypeError("Expected {} for {}, but got {}.".format( typeinfo, name, @@ -32,7 +33,7 @@ def check_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None: for T in types: try: - check_type(name, value, T) + check_option_type(name, value, T) except TypeError: pass else: @@ -50,7 +51,7 @@ def check_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None: if len(types) != len(value): raise e for i, (x, T) in enumerate(zip(value, types)): - check_type("{}[{}]".format(name, i), x, T) + check_option_type("{}[{}]".format(name, i), x, T) return elif typename.startswith("typing.Sequence"): try: @@ -58,11 +59,10 @@ def check_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None: except AttributeError: # Python 3.5.0 T = typeinfo.__parameters__[0] # type: ignore - if not isinstance(value, (tuple, list)): raise e for v in value: - check_type(name, v, T) + check_option_type(name, v, T) elif typename.startswith("typing.IO"): if hasattr(value, "read"): return diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py new file mode 100644 index 000000000..d4da7c326 --- /dev/null +++ b/test/mitmproxy/test_command.py @@ -0,0 +1,35 @@ +from mitmproxy import command +from mitmproxy import master +from mitmproxy import options +from mitmproxy import proxy +import pytest + + +class TAddon: + def cmd1(self, foo: str) -> str: + return "ret " + foo + + +class TestCommand: + def test_call(self): + o = options.Options() + m = master.Master(o, proxy.DummyServer(o)) + cm = command.CommandManager(m) + + a = TAddon() + c = command.Command(cm, "cmd.path", a.cmd1) + assert c.call(["foo"]) == "ret foo" + assert c.signature_help() == "cmd.path str -> str" + + +def test_simple(): + o = options.Options() + m = master.Master(o, proxy.DummyServer(o)) + c = command.CommandManager(m) + a = TAddon() + c.add("one.two", a.cmd1) + assert(c.call("one.two foo") == "ret foo") + with pytest.raises(command.CommandError, match="Unknown"): + c.call("nonexistent") + with pytest.raises(command.CommandError, match="Invalid"): + c.call("") diff --git a/test/mitmproxy/utils/test_typecheck.py b/test/mitmproxy/utils/test_typecheck.py index fd0c6e0c2..388f96ca6 100644 --- a/test/mitmproxy/utils/test_typecheck.py +++ b/test/mitmproxy/utils/test_typecheck.py @@ -16,72 +16,72 @@ class T(TBase): super(T, self).__init__(42) -def test_check_type(): - typecheck.check_type("foo", 42, int) +def test_check_option_type(): + typecheck.check_option_type("foo", 42, int) with pytest.raises(TypeError): - typecheck.check_type("foo", 42, str) + typecheck.check_option_type("foo", 42, str) with pytest.raises(TypeError): - typecheck.check_type("foo", None, str) + typecheck.check_option_type("foo", None, str) with pytest.raises(TypeError): - typecheck.check_type("foo", b"foo", str) + typecheck.check_option_type("foo", b"foo", str) def test_check_union(): - typecheck.check_type("foo", 42, typing.Union[int, str]) - typecheck.check_type("foo", "42", typing.Union[int, str]) + typecheck.check_option_type("foo", 42, typing.Union[int, str]) + typecheck.check_option_type("foo", "42", typing.Union[int, str]) with pytest.raises(TypeError): - typecheck.check_type("foo", [], typing.Union[int, str]) + typecheck.check_option_type("foo", [], typing.Union[int, str]) # Python 3.5 only defines __union_params__ m = mock.Mock() m.__str__ = lambda self: "typing.Union" m.__union_params__ = (int,) - typecheck.check_type("foo", 42, m) + typecheck.check_option_type("foo", 42, m) def test_check_tuple(): - typecheck.check_type("foo", (42, "42"), typing.Tuple[int, str]) + typecheck.check_option_type("foo", (42, "42"), typing.Tuple[int, str]) with pytest.raises(TypeError): - typecheck.check_type("foo", None, typing.Tuple[int, str]) + typecheck.check_option_type("foo", None, typing.Tuple[int, str]) with pytest.raises(TypeError): - typecheck.check_type("foo", (), typing.Tuple[int, str]) + typecheck.check_option_type("foo", (), typing.Tuple[int, str]) with pytest.raises(TypeError): - typecheck.check_type("foo", (42, 42), typing.Tuple[int, str]) + typecheck.check_option_type("foo", (42, 42), typing.Tuple[int, str]) with pytest.raises(TypeError): - typecheck.check_type("foo", ("42", 42), typing.Tuple[int, str]) + typecheck.check_option_type("foo", ("42", 42), typing.Tuple[int, str]) # Python 3.5 only defines __tuple_params__ m = mock.Mock() m.__str__ = lambda self: "typing.Tuple" m.__tuple_params__ = (int, str) - typecheck.check_type("foo", (42, "42"), m) + typecheck.check_option_type("foo", (42, "42"), m) def test_check_sequence(): - typecheck.check_type("foo", [10], typing.Sequence[int]) + typecheck.check_option_type("foo", [10], typing.Sequence[int]) with pytest.raises(TypeError): - typecheck.check_type("foo", ["foo"], typing.Sequence[int]) + typecheck.check_option_type("foo", ["foo"], typing.Sequence[int]) with pytest.raises(TypeError): - typecheck.check_type("foo", [10, "foo"], typing.Sequence[int]) + typecheck.check_option_type("foo", [10, "foo"], typing.Sequence[int]) with pytest.raises(TypeError): - typecheck.check_type("foo", [b"foo"], typing.Sequence[str]) + typecheck.check_option_type("foo", [b"foo"], typing.Sequence[str]) with pytest.raises(TypeError): - typecheck.check_type("foo", "foo", typing.Sequence[str]) + typecheck.check_option_type("foo", "foo", typing.Sequence[str]) # Python 3.5 only defines __parameters__ m = mock.Mock() m.__str__ = lambda self: "typing.Sequence" m.__parameters__ = (int,) - typecheck.check_type("foo", [10], m) + typecheck.check_option_type("foo", [10], m) def test_check_io(): - typecheck.check_type("foo", io.StringIO(), typing.IO[str]) + typecheck.check_option_type("foo", io.StringIO(), typing.IO[str]) with pytest.raises(TypeError): - typecheck.check_type("foo", "foo", typing.IO[str]) + typecheck.check_option_type("foo", "foo", typing.IO[str]) def test_check_any(): - typecheck.check_type("foo", 42, typing.Any) - typecheck.check_type("foo", object(), typing.Any) - typecheck.check_type("foo", None, typing.Any) + typecheck.check_option_type("foo", 42, typing.Any) + typecheck.check_option_type("foo", object(), typing.Any) + typecheck.check_option_type("foo", None, typing.Any)