mitmproxy/mitmproxy/command.py
Maximilian Hils bdc15cbe0c update mypy
2019-11-12 04:38:13 +01:00

294 lines
9.4 KiB
Python

"""
This module manages and invokes typed commands.
"""
import inspect
import types
import io
import typing
import shlex
import textwrap
import functools
import sys
from mitmproxy import exceptions
import mitmproxy.types
def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None:
sig = inspect.signature(f)
try:
sig.bind(*args, **kwargs)
except TypeError as v:
raise exceptions.CommandError("command argument mismatch: %s" % v.args[0])
def lexer(s):
# mypy mis-identifies shlex.shlex as abstract
lex = shlex.shlex(s, posix=True) # type: ignore
lex.wordchars += "."
lex.whitespace_split = True
lex.commenters = ''
return lex
def typename(t: type) -> str:
"""
Translates a type to an explanatory string.
"""
if t == inspect._empty: # type: ignore
raise exceptions.CommandError("missing type annotation")
to = mitmproxy.types.CommandTypes.get(t, None)
if not to:
raise exceptions.CommandError("unsupported type: %s" % getattr(t, "__name__", t))
return to.display
class Command:
returntype: typing.Optional[typing.Type]
def __init__(self, manager, path, func) -> None:
self.path = path
self.manager = manager
self.func = func
sig = inspect.signature(self.func)
self.help = None
if func.__doc__:
txt = func.__doc__.strip()
self.help = "\n".join(textwrap.wrap(txt))
self.has_positional = False
for i in sig.parameters.values():
# This is the kind for *args parameters
if i.kind == i.VAR_POSITIONAL:
self.has_positional = True
self.paramtypes = [v.annotation for v in sig.parameters.values()]
if sig.return_annotation == inspect._empty: # type: ignore
self.returntype = None
else:
self.returntype = sig.return_annotation
# This fails with a CommandException if types are invalid
self.signature_help()
def paramnames(self) -> typing.Sequence[str]:
v = [typename(i) for i in self.paramtypes]
if self.has_positional:
v[-1] = "*" + v[-1]
return v
def retname(self) -> str:
return typename(self.returntype) if self.returntype else ""
def signature_help(self) -> str:
params = " ".join(self.paramnames())
ret = self.retname()
if ret:
ret = " -> " + ret
return "%s %s%s" % (self.path, params, ret)
def prepare_args(self, args: typing.Sequence[str]) -> typing.List[typing.Any]:
verify_arg_signature(self.func, list(args), {})
remainder: typing.Sequence[str] = []
if self.has_positional:
remainder = args[len(self.paramtypes) - 1:]
args = args[:len(self.paramtypes) - 1]
pargs = []
for arg, paramtype in zip(args, self.paramtypes):
pargs.append(parsearg(self.manager, arg, paramtype))
pargs.extend(remainder)
return pargs
def call(self, args: typing.Sequence[str]) -> typing.Any:
"""
Call the command with a list of arguments. At this point, all
arguments are strings.
"""
ret = self.func(*self.prepare_args(args))
if ret is None and self.returntype is None:
return
typ = mitmproxy.types.CommandTypes.get(self.returntype)
if not typ.is_valid(self.manager, typ, ret):
raise exceptions.CommandError(
"%s returned unexpected data - expected %s" % (
self.path, typ.display
)
)
return ret
ParseResult = typing.NamedTuple(
"ParseResult",
[
("value", str),
("type", typing.Type),
("valid", bool),
],
)
class CommandManager(mitmproxy.types._CommandBase):
def __init__(self, master):
self.master = master
self.commands: typing.Dict[str, Command] = {}
def collect_commands(self, addon):
for i in dir(addon):
if not i.startswith("__"):
o = getattr(addon, i)
try:
is_command = hasattr(o, "command_path")
except Exception:
pass # hasattr may raise if o implements __getattr__.
else:
if is_command:
try:
self.add(o.command_path, o)
except exceptions.CommandError as e:
self.master.log.warn(
"Could not load command %s: %s" % (o.command_path, e)
)
def add(self, path: str, func: typing.Callable):
self.commands[path] = Command(self, path, func)
def parse_partial(
self,
cmdstr: str
) -> typing.Tuple[typing.Sequence[ParseResult], typing.Sequence[str]]:
"""
Parse a possibly partial command. Return a sequence of ParseResults and a sequence of remainder type help items.
"""
buf = io.StringIO(cmdstr)
parts: typing.List[str] = []
lex = lexer(buf)
while 1:
remainder = cmdstr[buf.tell():]
try:
t = lex.get_token()
except ValueError:
parts.append(remainder)
break
if not t:
break
parts.append(t)
if not parts:
parts = [""]
elif cmdstr.endswith(" "):
parts.append("")
parse: typing.List[ParseResult] = []
params: typing.List[type] = []
typ: typing.Type
for i in range(len(parts)):
if i == 0:
typ = mitmproxy.types.Cmd
if parts[i] in self.commands:
params.extend(self.commands[parts[i]].paramtypes)
elif params:
typ = params.pop(0)
if typ == mitmproxy.types.Cmd and params and params[0] == mitmproxy.types.Arg:
if parts[i] in self.commands:
params[:] = self.commands[parts[i]].paramtypes
else:
typ = mitmproxy.types.Unknown
to = mitmproxy.types.CommandTypes.get(typ, None)
valid = False
if to:
try:
to.parse(self, typ, parts[i])
except exceptions.TypeError:
valid = False
else:
valid = True
parse.append(
ParseResult(
value=parts[i],
type=typ,
valid=valid,
)
)
remhelp: typing.List[str] = []
for x in params:
remt = mitmproxy.types.CommandTypes.get(x, None)
remhelp.append(remt.display)
return parse, remhelp
def call(self, path: str, *args: typing.Sequence[typing.Any]) -> typing.Any:
"""
Call a command with native arguments. May raise CommandError.
"""
if path not in self.commands:
raise exceptions.CommandError("Unknown command: %s" % path)
return self.commands[path].func(*args)
def call_strings(self, path: str, args: typing.Sequence[str]) -> typing.Any:
"""
Call a command using a list of string arguments. May raise CommandError.
"""
if path not in self.commands:
raise exceptions.CommandError("Unknown command: %s" % path)
return self.commands[path].call(args)
def execute(self, cmdstr: str):
"""
Execute a command string. May raise CommandError.
"""
try:
parts = list(lexer(cmdstr))
except ValueError as e:
raise exceptions.CommandError("Command error: %s" % e)
if not len(parts) >= 1:
raise exceptions.CommandError("Invalid command: %s" % cmdstr)
return self.call_strings(parts[0], parts[1:])
def dump(self, out=sys.stdout) -> None:
cmds = list(self.commands.values())
cmds.sort(key=lambda x: x.signature_help())
for c in cmds:
for hl in (c.help or "").splitlines():
print("# " + hl, file=out)
print(c.signature_help(), file=out)
print(file=out)
def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
"""
Convert a string to a argument to the appropriate type.
"""
t = mitmproxy.types.CommandTypes.get(argtype, None)
if not t:
raise exceptions.CommandError("Unsupported argument type: %s" % argtype)
try:
return t.parse(manager, argtype, spec) # type: ignore
except exceptions.TypeError as e:
raise exceptions.CommandError from e
def command(path):
def decorator(function):
@functools.wraps(function)
def wrapper(*args, **kwargs):
verify_arg_signature(function, args, kwargs)
return function(*args, **kwargs)
wrapper.__dict__["command_path"] = path
return wrapper
return decorator
def argument(name, type):
"""
Set the type of a command argument at runtime. This is useful for more
specific types such as mitmproxy.types.Choice, which we cannot annotate
directly as mypy does not like that.
"""
def decorator(f: types.FunctionType) -> types.FunctionType:
assert name in f.__annotations__
f.__annotations__[name] = type
return f
return decorator