diff --git a/examples/complex/remote_debug.py b/examples/complex/remote_debug.py index fa6f3d331..4b117bdbb 100644 --- a/examples/complex/remote_debug.py +++ b/examples/complex/remote_debug.py @@ -15,5 +15,5 @@ Usage: def load(l): - import pydevd - pydevd.settrace("localhost", port=5678, stdoutToServer=True, stderrToServer=True) + import pydevd_pycharm + pydevd_pycharm.settrace("localhost", port=5678, stdoutToServer=True, stderrToServer=True, suspend=False) diff --git a/mitmproxy/addons/core.py b/mitmproxy/addons/core.py index 5c9bbcd09..8a3acedbb 100644 --- a/mitmproxy/addons/core.py +++ b/mitmproxy/addons/core.py @@ -83,15 +83,14 @@ class Core: ) @command.command("set") - def set(self, *spec: str) -> None: + def set(self, option: str, value: str = "") -> None: """ - Set an option of the form "key[=value]". When the value is omitted, - booleans are set to true, strings and integers are set to None (if - permitted), and sequences are emptied. Boolean values can be true, - false or toggle. If multiple specs are passed, they are joined - into one separated by spaces. + Set an option. When the value is omitted, booleans are set to true, + strings and integers are set to None (if permitted), and sequences + are emptied. Boolean values can be true, false or toggle. + Multiple values are concatenated with a single space. """ - strspec = " ".join(spec) + strspec = f"{option}={value}" try: ctx.options.set(strspec) except exceptions.OptionsError as e: @@ -109,14 +108,14 @@ class Core: # FIXME: this will become view.mark later @command.command("flow.mark") - def mark(self, flows: typing.Sequence[flow.Flow], val: bool) -> None: + def mark(self, flows: typing.Sequence[flow.Flow], boolean: bool) -> None: """ Mark flows. """ updated = [] for i in flows: - if i.marked != val: - i.marked = val + if i.marked != boolean: + i.marked = boolean updated.append(i) ctx.master.addons.trigger("update", updated) @@ -169,18 +168,18 @@ class Core: ] @command.command("flow.set") - @command.argument("spec", type=mitmproxy.types.Choice("flow.set.options")) + @command.argument("attr", type=mitmproxy.types.Choice("flow.set.options")) def flow_set( self, flows: typing.Sequence[flow.Flow], - spec: str, - sval: str + attr: str, + value: str ) -> None: """ Quickly set a number of common values on flows. """ - val: typing.Union[int, str] = sval - if spec == "status_code": + val: typing.Union[int, str] = value + if attr == "status_code": try: val = int(val) # type: ignore except ValueError as v: @@ -193,13 +192,13 @@ class Core: req = getattr(f, "request", None) rupdate = True if req: - if spec == "method": + if attr == "method": req.method = val - elif spec == "host": + elif attr == "host": req.host = val - elif spec == "path": + elif attr == "path": req.path = val - elif spec == "url": + elif attr == "url": try: req.url = val except ValueError as e: @@ -212,11 +211,11 @@ class Core: resp = getattr(f, "response", None) supdate = True if resp: - if spec == "status_code": + if attr == "status_code": resp.status_code = val if val in status_codes.RESPONSES: resp.reason = status_codes.RESPONSES[val] # type: ignore - elif spec == "reason": + elif attr == "reason": resp.reason = val else: supdate = False @@ -225,7 +224,7 @@ class Core: updated.append(f) ctx.master.addons.trigger("update", updated) - ctx.log.alert("Set %s on %s flows." % (spec, len(updated))) + ctx.log.alert("Set %s on %s flows." % (attr, len(updated))) @command.command("flow.decode") def decode(self, flows: typing.Sequence[flow.Flow], part: str) -> None: @@ -262,12 +261,12 @@ class Core: ctx.log.alert("Toggled encoding on %s flows." % len(updated)) @command.command("flow.encode") - @command.argument("enc", type=mitmproxy.types.Choice("flow.encode.options")) + @command.argument("encoding", type=mitmproxy.types.Choice("flow.encode.options")) def encode( self, flows: typing.Sequence[flow.Flow], part: str, - enc: str, + encoding: str, ) -> None: """ Encode flows with a specified encoding. @@ -279,7 +278,7 @@ class Core: current_enc = p.headers.get("content-encoding", "identity") if current_enc == "identity": f.backup() - p.encode(enc) + p.encode(encoding) updated.append(f) ctx.master.addons.trigger("update", updated) ctx.log.alert("Encoded %s flows." % len(updated)) diff --git a/mitmproxy/addons/export.py b/mitmproxy/addons/export.py index 68df93744..d874c95a7 100644 --- a/mitmproxy/addons/export.py +++ b/mitmproxy/addons/export.py @@ -121,14 +121,14 @@ class Export(): return list(sorted(formats.keys())) @command.command("export.file") - def file(self, fmt: str, f: flow.Flow, path: mitmproxy.types.Path) -> None: + def file(self, format: str, flow: flow.Flow, path: mitmproxy.types.Path) -> None: """ Export a flow to path. """ - if fmt not in formats: - raise exceptions.CommandError("No such export format: %s" % fmt) - func: typing.Any = formats[fmt] - v = func(f) + if format not in formats: + raise exceptions.CommandError("No such export format: %s" % format) + func: typing.Any = formats[format] + v = func(flow) try: with open(path, "wb") as fp: if isinstance(v, bytes): @@ -139,14 +139,14 @@ class Export(): ctx.log.error(str(e)) @command.command("export.clip") - def clip(self, fmt: str, f: flow.Flow) -> None: + def clip(self, format: str, flow: flow.Flow) -> None: """ Export a flow to the system clipboard. """ - if fmt not in formats: - raise exceptions.CommandError("No such export format: %s" % fmt) - func: typing.Any = formats[fmt] - v = strutils.always_str(func(f)) + if format not in formats: + raise exceptions.CommandError("No such export format: %s" % format) + func: typing.Any = formats[format] + v = strutils.always_str(func(flow)) try: pyperclip.copy(v) except pyperclip.PyperclipException as e: diff --git a/mitmproxy/addons/view.py b/mitmproxy/addons/view.py index da9d19f91..1d57d7812 100644 --- a/mitmproxy/addons/view.py +++ b/mitmproxy/addons/view.py @@ -217,7 +217,7 @@ class View(collections.abc.Sequence): # Focus @command.command("view.focus.go") - def go(self, dst: int) -> None: + def go(self, offset: int) -> None: """ Go to a specified offset. Positive offests are from the beginning of the view, negative from the end of the view, so that 0 is the first @@ -225,13 +225,13 @@ class View(collections.abc.Sequence): """ if len(self) == 0: return - if dst < 0: - dst = len(self) + dst - if dst < 0: - dst = 0 - if dst > len(self) - 1: - dst = len(self) - 1 - self.focus.flow = self[dst] + if offset < 0: + offset = len(self) + offset + if offset < 0: + offset = 0 + if offset > len(self) - 1: + offset = len(self) - 1 + self.focus.flow = self[offset] @command.command("view.focus.next") def focus_next(self) -> None: @@ -266,20 +266,20 @@ class View(collections.abc.Sequence): return list(sorted(self.orders.keys())) @command.command("view.order.reverse") - def set_reversed(self, value: bool) -> None: - self.order_reversed = value + def set_reversed(self, boolean: bool) -> None: + self.order_reversed = boolean self.sig_view_refresh.send(self) @command.command("view.order.set") - def set_order(self, order: str) -> None: + def set_order(self, order_key: str) -> None: """ Sets the current view order. """ - if order not in self.orders: + if order_key not in self.orders: raise exceptions.CommandError( - "Unknown flow order: %s" % order + "Unknown flow order: %s" % order_key ) - order_key = self.orders[order] + order_key = self.orders[order_key] self.order_key = order_key newview = sortedcontainers.SortedListWithKey(key=order_key) newview.update(self._view) @@ -298,16 +298,16 @@ class View(collections.abc.Sequence): # Filter @command.command("view.filter.set") - def set_filter_cmd(self, f: str) -> None: + def set_filter_cmd(self, filter_expr: str) -> None: """ Sets the current view filter. """ filt = None - if f: - filt = flowfilter.parse(f) + if filter_expr: + filt = flowfilter.parse(filter_expr) if not filt: raise exceptions.CommandError( - "Invalid interception filter: %s" % f + "Invalid interception filter: %s" % filter_expr ) self.set_filter(filt) @@ -340,11 +340,11 @@ class View(collections.abc.Sequence): # View Settings @command.command("view.settings.getval") - def getvalue(self, f: mitmproxy.flow.Flow, key: str, default: str) -> str: + def getvalue(self, flow: mitmproxy.flow.Flow, key: str, default: str) -> str: """ Get a value from the settings store for the specified flow. """ - return self.settings[f].get(key, default) + return self.settings[flow].get(key, default) @command.command("view.settings.setval.toggle") def setvalue_toggle( @@ -412,26 +412,26 @@ class View(collections.abc.Sequence): ctx.log.alert("Removed %s flows" % len(flows)) @command.command("view.flows.resolve") - def resolve(self, spec: str) -> typing.Sequence[mitmproxy.flow.Flow]: + def resolve(self, flow_spec: str) -> typing.Sequence[mitmproxy.flow.Flow]: """ Resolve a flow list specification to an actual list of flows. """ - if spec == "@all": + if flow_spec == "@all": return [i for i in self._store.values()] - if spec == "@focus": + if flow_spec == "@focus": return [self.focus.flow] if self.focus.flow else [] - elif spec == "@shown": + elif flow_spec == "@shown": return [i for i in self] - elif spec == "@hidden": + elif flow_spec == "@hidden": return [i for i in self._store.values() if i not in self._view] - elif spec == "@marked": + elif flow_spec == "@marked": return [i for i in self._store.values() if i.marked] - elif spec == "@unmarked": + elif flow_spec == "@unmarked": return [i for i in self._store.values() if not i.marked] else: - filt = flowfilter.parse(spec) + filt = flowfilter.parse(flow_spec) if not filt: - raise exceptions.CommandError("Invalid flow filter: %s" % spec) + raise exceptions.CommandError("Invalid flow filter: %s" % flow_spec) return [i for i in self._store.values() if filt(i)] @command.command("view.flows.create") diff --git a/mitmproxy/command.py b/mitmproxy/command.py index 0998601ca..6977ff91e 100644 --- a/mitmproxy/command.py +++ b/mitmproxy/command.py @@ -1,20 +1,19 @@ """ This module manages and invokes typed commands. """ -import inspect -import types -import io -import typing -import shlex -import textwrap import functools +import inspect import sys +import textwrap +import types +import typing -from mitmproxy import exceptions import mitmproxy.types +from mitmproxy import exceptions, command_lexer +from mitmproxy.command_lexer import unquote -def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None: +def verify_arg_signature(f: typing.Callable, args: typing.Iterable[typing.Any], kwargs: dict) -> None: sig = inspect.signature(f) try: sig.bind(*args, **kwargs) @@ -22,15 +21,6 @@ def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None: raise exceptions.CommandError("command argument mismatch: %s" % v.args[0]) -def lexer(s): - # mypy mis-identifies shlex.shlex as abstract - lex = shlex.shlex(s, posix=True) # type: ignore - lex.wordchars += "." - lex.whitespace_split = True - lex.commenters = '' - return lex - - def typename(t: type) -> str: """ Translates a type to an explanatory string. @@ -43,208 +33,234 @@ def typename(t: type) -> str: return to.display -class Command: - returntype: typing.Optional[typing.Type] +def _empty_as_none(x: typing.Any) -> typing.Any: + if x == inspect.Signature.empty: + return None + return x - def __init__(self, manager, path, func) -> None: - self.path = path + +class CommandParameter(typing.NamedTuple): + name: str + type: typing.Type + kind: inspect._ParameterKind = inspect.Parameter.POSITIONAL_OR_KEYWORD + + def __str__(self): + if self.kind is inspect.Parameter.VAR_POSITIONAL: + return f"*{self.name}" + else: + return self.name + + +class Command: + name: str + manager: "CommandManager" + signature: inspect.Signature + help: typing.Optional[str] + + def __init__(self, manager: "CommandManager", name: str, func: typing.Callable) -> None: + self.name = name self.manager = manager self.func = func - sig = inspect.signature(self.func) - self.help = None + self.signature = inspect.signature(self.func) + if func.__doc__: txt = func.__doc__.strip() self.help = "\n".join(textwrap.wrap(txt)) - - self.has_positional = False - for i in sig.parameters.values(): - # This is the kind for *args parameters - if i.kind == i.VAR_POSITIONAL: - self.has_positional = True - self.paramtypes = [v.annotation for v in sig.parameters.values()] - if sig.return_annotation == inspect._empty: # type: ignore - self.returntype = None else: - self.returntype = sig.return_annotation + self.help = None + # This fails with a CommandException if types are invalid - self.signature_help() + for name, parameter in self.signature.parameters.items(): + t = parameter.annotation + if not mitmproxy.types.CommandTypes.get(parameter.annotation, None): + raise exceptions.CommandError(f"Argument {name} has an unknown type ({_empty_as_none(t)}) in {func}.") + if self.return_type and not mitmproxy.types.CommandTypes.get(self.return_type, None): + raise exceptions.CommandError(f"Return type has an unknown type ({self.return_type}) in {func}.") - def paramnames(self) -> typing.Sequence[str]: - v = [typename(i) for i in self.paramtypes] - if self.has_positional: - v[-1] = "*" + v[-1] - return v + @property + def return_type(self) -> typing.Optional[typing.Type]: + return _empty_as_none(self.signature.return_annotation) - def retname(self) -> str: - return typename(self.returntype) if self.returntype else "" + @property + def parameters(self) -> typing.List[CommandParameter]: + """Returns a list of CommandParameters.""" + ret = [] + for name, param in self.signature.parameters.items(): + ret.append(CommandParameter(name, param.annotation, param.kind)) + return ret def signature_help(self) -> str: - params = " ".join(self.paramnames()) - ret = self.retname() - if ret: - ret = " -> " + ret - return "%s %s%s" % (self.path, params, ret) + params = " ".join(str(param) for param in self.parameters) + if self.return_type: + ret = f" -> {typename(self.return_type)}" + else: + ret = "" + return f"{self.name} {params}{ret}" - def prepare_args(self, args: typing.Sequence[str]) -> typing.List[typing.Any]: - verify_arg_signature(self.func, list(args), {}) + def prepare_args(self, args: typing.Sequence[str]) -> inspect.BoundArguments: + try: + bound_arguments = self.signature.bind(*args) + except TypeError as v: + raise exceptions.CommandError(f"Command argument mismatch: {v.args[0]}") - remainder: typing.Sequence[str] = [] - if self.has_positional: - remainder = args[len(self.paramtypes) - 1:] - args = args[:len(self.paramtypes) - 1] + for name, value in bound_arguments.arguments.items(): + convert_to = self.signature.parameters[name].annotation + bound_arguments.arguments[name] = parsearg(self.manager, value, convert_to) - pargs = [] - for arg, paramtype in zip(args, self.paramtypes): - pargs.append(parsearg(self.manager, arg, paramtype)) - pargs.extend(remainder) - return pargs + bound_arguments.apply_defaults() + + return bound_arguments def call(self, args: typing.Sequence[str]) -> typing.Any: """ - Call the command with a list of arguments. At this point, all - arguments are strings. + Call the command with a list of arguments. At this point, all + arguments are strings. """ - ret = self.func(*self.prepare_args(args)) - if ret is None and self.returntype is None: + bound_args = self.prepare_args(args) + ret = self.func(*bound_args.args, **bound_args.kwargs) + if ret is None and self.return_type is None: return - typ = mitmproxy.types.CommandTypes.get(self.returntype) + typ = mitmproxy.types.CommandTypes.get(self.return_type) + assert typ if not typ.is_valid(self.manager, typ, ret): raise exceptions.CommandError( - "%s returned unexpected data - expected %s" % ( - self.path, typ.display - ) + f"{self.name} returned unexpected data - expected {typ.display}" ) return ret -ParseResult = typing.NamedTuple( - "ParseResult", - [ - ("value", str), - ("type", typing.Type), - ("valid", bool), - ], -) +class ParseResult(typing.NamedTuple): + value: str + type: typing.Type + valid: bool -class CommandManager(mitmproxy.types._CommandBase): +class CommandManager: + commands: typing.Dict[str, Command] + def __init__(self, master): self.master = master - self.commands: typing.Dict[str, Command] = {} + self.commands = {} def collect_commands(self, addon): for i in dir(addon): if not i.startswith("__"): o = getattr(addon, i) try: - is_command = hasattr(o, "command_path") + is_command = hasattr(o, "command_name") except Exception: pass # hasattr may raise if o implements __getattr__. else: if is_command: try: - self.add(o.command_path, o) + self.add(o.command_name, o) except exceptions.CommandError as e: self.master.log.warn( - "Could not load command %s: %s" % (o.command_path, e) + "Could not load command %s: %s" % (o.command_name, e) ) def add(self, path: str, func: typing.Callable): self.commands[path] = Command(self, path, func) + @functools.lru_cache(maxsize=128) def parse_partial( - self, - cmdstr: str - ) -> typing.Tuple[typing.Sequence[ParseResult], typing.Sequence[str]]: + self, + cmdstr: str + ) -> typing.Tuple[typing.Sequence[ParseResult], typing.Sequence[CommandParameter]]: """ - Parse a possibly partial command. Return a sequence of ParseResults and a sequence of remainder type help items. + Parse a possibly partial command. Return a sequence of ParseResults and a sequence of remainder type help items. """ - buf = io.StringIO(cmdstr) - parts: typing.List[str] = [] - lex = lexer(buf) - while 1: - remainder = cmdstr[buf.tell():] - try: - t = lex.get_token() - except ValueError: - parts.append(remainder) - break - if not t: - break - parts.append(t) - if not parts: - parts = [""] - elif cmdstr.endswith(" "): - parts.append("") - parse: typing.List[ParseResult] = [] - params: typing.List[type] = [] - typ: typing.Type - for i in range(len(parts)): - if i == 0: - typ = mitmproxy.types.Cmd - if parts[i] in self.commands: - params.extend(self.commands[parts[i]].paramtypes) - elif params: - typ = params.pop(0) - if typ == mitmproxy.types.Cmd and params and params[0] == mitmproxy.types.Arg: - if parts[i] in self.commands: - params[:] = self.commands[parts[i]].paramtypes + parts: typing.List[str] = command_lexer.expr.parseString(cmdstr, parseAll=True) + + parsed: typing.List[ParseResult] = [] + next_params: typing.List[CommandParameter] = [ + CommandParameter("", mitmproxy.types.Cmd), + CommandParameter("", mitmproxy.types.CmdArgs), + ] + expected: typing.Optional[CommandParameter] = None + for part in parts: + if part.isspace(): + parsed.append( + ParseResult( + value=part, + type=mitmproxy.types.Space, + valid=True, + ) + ) + continue + + if expected and expected.kind is inspect.Parameter.VAR_POSITIONAL: + assert not next_params + elif next_params: + expected = next_params.pop(0) else: - typ = mitmproxy.types.Unknown + expected = CommandParameter("", mitmproxy.types.Unknown) - to = mitmproxy.types.CommandTypes.get(typ, None) + arg_is_known_command = ( + expected.type == mitmproxy.types.Cmd and part in self.commands + ) + arg_is_unknown_command = ( + expected.type == mitmproxy.types.Cmd and part not in self.commands + ) + command_args_following = ( + next_params and next_params[0].type == mitmproxy.types.CmdArgs + ) + if arg_is_known_command and command_args_following: + next_params = self.commands[part].parameters + next_params[1:] + if arg_is_unknown_command and command_args_following: + next_params.pop(0) + + to = mitmproxy.types.CommandTypes.get(expected.type, None) valid = False if to: try: - to.parse(self, typ, parts[i]) + to.parse(self, expected.type, part) except exceptions.TypeError: valid = False else: valid = True - parse.append( + parsed.append( ParseResult( - value=parts[i], - type=typ, + value=part, + type=expected.type, valid=valid, ) ) - remhelp: typing.List[str] = [] - for x in params: - remt = mitmproxy.types.CommandTypes.get(x, None) - remhelp.append(remt.display) + return parsed, next_params - return parse, remhelp + def call(self, command_name: str, *args: typing.Sequence[typing.Any]) -> typing.Any: + """ + Call a command with native arguments. May raise CommandError. + """ + if command_name not in self.commands: + raise exceptions.CommandError("Unknown command: %s" % command_name) + return self.commands[command_name].func(*args) - def call(self, path: str, *args: typing.Sequence[typing.Any]) -> typing.Any: + def _call_strings(self, command_name: str, args: typing.Sequence[str]) -> typing.Any: """ - Call a command with native arguments. May raise CommandError. + Call a command using a list of string arguments. May raise CommandError. """ - if path not in self.commands: - raise exceptions.CommandError("Unknown command: %s" % path) - return self.commands[path].func(*args) + if command_name not in self.commands: + raise exceptions.CommandError("Unknown command: %s" % command_name) - def call_strings(self, path: str, args: typing.Sequence[str]) -> typing.Any: - """ - Call a command using a list of string arguments. May raise CommandError. - """ - if path not in self.commands: - raise exceptions.CommandError("Unknown command: %s" % path) - return self.commands[path].call(args) + return self.commands[command_name].call(args) - def execute(self, cmdstr: str): + def execute(self, cmdstr: str) -> typing.Any: """ - Execute a command string. May raise CommandError. + Execute a command string. May raise CommandError. """ - try: - parts = list(lexer(cmdstr)) - except ValueError as e: - raise exceptions.CommandError("Command error: %s" % e) - if not len(parts) >= 1: - raise exceptions.CommandError("Invalid command: %s" % cmdstr) - return self.call_strings(parts[0], parts[1:]) + parts, _ = self.parse_partial(cmdstr) + if not parts: + raise exceptions.CommandError(f"Invalid command: {cmdstr!r}") + command_name, *args = [ + unquote(part.value) + for part in parts + if part.type != mitmproxy.types.Space + ] + return self._call_strings(command_name, args) def dump(self, out=sys.stdout) -> None: cmds = list(self.commands.values()) @@ -262,21 +278,23 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any: """ t = mitmproxy.types.CommandTypes.get(argtype, None) if not t: - raise exceptions.CommandError("Unsupported argument type: %s" % argtype) + raise exceptions.CommandError(f"Unsupported argument type: {argtype}") try: - return t.parse(manager, argtype, spec) # type: ignore + return t.parse(manager, argtype, spec) except exceptions.TypeError as e: raise exceptions.CommandError from e -def command(path): +def command(name: typing.Optional[str] = None): def decorator(function): @functools.wraps(function) def wrapper(*args, **kwargs): verify_arg_signature(function, args, kwargs) return function(*args, **kwargs) - wrapper.__dict__["command_path"] = path + + wrapper.__dict__["command_name"] = name or function.__name__.replace("_", ".") return wrapper + return decorator @@ -286,8 +304,10 @@ def argument(name, type): specific types such as mitmproxy.types.Choice, which we cannot annotate directly as mypy does not like that. """ + def decorator(f: types.FunctionType) -> types.FunctionType: assert name in f.__annotations__ f.__annotations__[name] = type return f + return decorator diff --git a/mitmproxy/command_lexer.py b/mitmproxy/command_lexer.py new file mode 100644 index 000000000..f042f3c95 --- /dev/null +++ b/mitmproxy/command_lexer.py @@ -0,0 +1,49 @@ +import ast +import re + +import pyparsing + +# TODO: There is a lot of work to be done here. +# The current implementation is written in a way that _any_ input is valid, +# which does not make sense once things get more complex. + +PartialQuotedString = pyparsing.Regex( + re.compile( + r''' + (["']) # start quote + (?: + (?!\1)[^\\] # unescaped character that is not our quote nor the begin of an escape sequence. We can't use \1 in [] + | + (?:\\.) # escape sequence + )* + (?:\1|$) # end quote + ''', + re.VERBOSE + ) +) + +expr = pyparsing.ZeroOrMore( + PartialQuotedString + | pyparsing.Word(" \r\n\t") + | pyparsing.CharsNotIn("""'" \r\n\t""") +).leaveWhitespace() + + +def quote(val: str) -> str: + if val and all(char not in val for char in "'\" \r\n\t"): + return val + return repr(val) # TODO: More of a hack. + + +def unquote(x: str) -> str: + quoted = ( + (x.startswith('"') and x.endswith('"')) + or + (x.startswith("'") and x.endswith("'")) + ) + if quoted: + try: + x = ast.literal_eval(x) + except Exception: + x = x[1:-1] + return x diff --git a/mitmproxy/tools/console/commander/commander.py b/mitmproxy/tools/console/commander/commander.py index f291b8fd9..d751422b2 100644 --- a/mitmproxy/tools/console/commander/commander.py +++ b/mitmproxy/tools/console/commander/commander.py @@ -1,58 +1,55 @@ import abc +import collections import copy import typing -import collections import urwid from urwid.text_layout import calc_coords +import mitmproxy.command import mitmproxy.flow import mitmproxy.master -import mitmproxy.command import mitmproxy.types -class Completer: # pragma: no cover +class Completer: @abc.abstractmethod - def cycle(self) -> str: - pass + def cycle(self, forward: bool = True) -> str: + raise NotImplementedError() class ListCompleter(Completer): def __init__( - self, - start: str, - options: typing.Sequence[str], + self, + start: str, + options: typing.Sequence[str], ) -> None: self.start = start - self.options: typing.Sequence[str] = [] + self.options: typing.List[str] = [] for o in options: if o.startswith(start): self.options.append(o) self.options.sort() self.offset = 0 - def cycle(self) -> str: + def cycle(self, forward: bool = True) -> str: if not self.options: return self.start ret = self.options[self.offset] - self.offset = (self.offset + 1) % len(self.options) + delta = 1 if forward else -1 + self.offset = (self.offset + delta) % len(self.options) return ret -CompletionState = typing.NamedTuple( - "CompletionState", - [ - ("completer", Completer), - ("parse", typing.Sequence[mitmproxy.command.ParseResult]) - ] -) +class CompletionState(typing.NamedTuple): + completer: Completer + parsed: typing.Sequence[mitmproxy.command.ParseResult] class CommandBuffer: def __init__(self, master: mitmproxy.master.Master, start: str = "") -> None: self.master = master - self.text = self.flatten(start) + self.text = start # Cursor is always within the range [0:len(buffer)]. self._cursor = len(self.text) self.completion: typing.Optional[CompletionState] = None @@ -70,51 +67,30 @@ class CommandBuffer: else: self._cursor = x - def maybequote(self, value): - if " " in value and not value.startswith("\""): - return "\"%s\"" % value - return value - - def parse_quoted(self, txt): - parts, remhelp = self.master.commands.parse_partial(txt) - for i, p in enumerate(parts): - parts[i] = mitmproxy.command.ParseResult( - value = self.maybequote(p.value), - type = p.type, - valid = p.valid - ) - return parts, remhelp - def render(self): - """ - This function is somewhat tricky - in order to make the cursor - position valid, we have to make sure there is a - character-for-character offset match in the rendered output, up - to the cursor. Beyond that, we can add stuff. - """ - parts, remhelp = self.parse_quoted(self.text) + parts, remaining = self.master.commands.parse_partial(self.text) ret = [] - for p in parts: - if p.valid: - if p.type == mitmproxy.types.Cmd: - ret.append(("commander_command", p.value)) - else: - ret.append(("text", p.value)) - elif p.value: - ret.append(("commander_invalid", p.value)) - else: - ret.append(("text", "")) - ret.append(("text", " ")) - if remhelp: - ret.append(("text", " ")) - for v in remhelp: - ret.append(("commander_hint", "%s " % v)) - return ret + if not parts: + # Means we just received the leader, so we need to give a blank + # text to the widget to render or it crashes + ret.append(("text", "")) + else: + for p in parts: + if p.valid: + if p.type == mitmproxy.types.Cmd: + ret.append(("commander_command", p.value)) + else: + ret.append(("text", p.value)) + elif p.value: + ret.append(("commander_invalid", p.value)) - def flatten(self, txt): - parts, _ = self.parse_quoted(txt) - ret = [x.value for x in parts] - return " ".join(ret) + if remaining: + if parts[-1].type != mitmproxy.types.Space: + ret.append(("text", " ")) + for param in remaining: + ret.append(("commander_hint", f"{param} ")) + + return ret def left(self) -> None: self.cursor = self.cursor - 1 @@ -122,30 +98,38 @@ class CommandBuffer: def right(self) -> None: self.cursor = self.cursor + 1 - def cycle_completion(self) -> None: + def cycle_completion(self, forward: bool = True) -> None: if not self.completion: - parts, remainhelp = self.master.commands.parse_partial(self.text[:self.cursor]) - last = parts[-1] - ct = mitmproxy.types.CommandTypes.get(last.type, None) + parts, remaining = self.master.commands.parse_partial(self.text[:self.cursor]) + if parts and parts[-1].type != mitmproxy.types.Space: + type_to_complete = parts[-1].type + cycle_prefix = parts[-1].value + parsed = parts[:-1] + elif remaining: + type_to_complete = remaining[0].type + cycle_prefix = "" + parsed = parts + else: + return + ct = mitmproxy.types.CommandTypes.get(type_to_complete, None) if ct: self.completion = CompletionState( - completer = ListCompleter( - parts[-1].value, - ct.completion(self.master.commands, last.type, parts[-1].value) + completer=ListCompleter( + cycle_prefix, + ct.completion(self.master.commands, type_to_complete, cycle_prefix) ), - parse = parts, + parsed=parsed, ) if self.completion: - nxt = self.completion.completer.cycle() - buf = " ".join([i.value for i in self.completion.parse[:-1]]) + " " + nxt - buf = buf.strip() - self.text = self.flatten(buf) + nxt = self.completion.completer.cycle(forward) + buf = "".join([i.value for i in self.completion.parsed]) + nxt + self.text = buf self.cursor = len(self.text) def backspace(self) -> None: if self.cursor == 0: return - self.text = self.flatten(self.text[:self.cursor - 1] + self.text[self.cursor:]) + self.text = self.text[:self.cursor - 1] + self.text[self.cursor:] self.cursor = self.cursor - 1 self.completion = None @@ -153,13 +137,18 @@ class CommandBuffer: """ Inserts text at the cursor. """ - self.text = self.flatten(self.text[:self.cursor] + k + self.text[self.cursor:]) - self.cursor += 1 + + # We don't want to insert a space before the command + if k == ' ' and self.text[0:self.cursor].strip() == '': + return + + self.text = self.text[:self.cursor] + k + self.text[self.cursor:] + self.cursor += len(k) self.completion = None class CommandHistory: - def __init__(self, master: mitmproxy.master.Master, size: int=30) -> None: + def __init__(self, master: mitmproxy.master.Master, size: int = 30) -> None: self.saved_commands: collections.deque = collections.deque( [CommandBuffer(master, "")], maxlen=size @@ -182,7 +171,7 @@ class CommandHistory: return self.saved_commands[self.index] return None - def add_command(self, command: CommandBuffer, execution: bool=False) -> None: + def add_command(self, command: CommandBuffer, execution: bool = False) -> None: if self.index == self.last_index or execution: last_item = self.saved_commands[-1] last_item_empty = not last_item.text @@ -207,7 +196,7 @@ class CommandEdit(urwid.WidgetWrap): self.history = history self.update() - def keypress(self, size, key): + def keypress(self, size, key) -> None: if key == "backspace": self.cbuf.backspace() elif key == "left": @@ -219,27 +208,29 @@ class CommandEdit(urwid.WidgetWrap): self.cbuf = self.history.get_prev() or self.cbuf elif key == "down": self.cbuf = self.history.get_next() or self.cbuf + elif key == "shift tab": + self.cbuf.cycle_completion(False) elif key == "tab": self.cbuf.cycle_completion() elif len(key) == 1: self.cbuf.insert(key) self.update() - def update(self): + def update(self) -> None: self._w.set_text([self.leader, self.cbuf.render()]) - def render(self, size, focus=False): + def render(self, size, focus=False) -> urwid.Canvas: (maxcol,) = size canv = self._w.render((maxcol,)) canv = urwid.CompositeCanvas(canv) canv.cursor = self.get_cursor_coords((maxcol,)) return canv - def get_cursor_coords(self, size): + def get_cursor_coords(self, size) -> typing.Tuple[int, int]: p = self.cbuf.cursor + len(self.leader) trans = self._w.get_line_translation(size[0]) x, y = calc_coords(self._w.get_text()[0], trans, p) return x, y - def get_edit_text(self): + def get_edit_text(self) -> str: return self.cbuf.text diff --git a/mitmproxy/tools/console/commands.py b/mitmproxy/tools/console/commands.py index 0f35742b3..26a99b14d 100644 --- a/mitmproxy/tools/console/commands.py +++ b/mitmproxy/tools/console/commands.py @@ -1,6 +1,8 @@ import urwid import blinker import textwrap + +from mitmproxy import command from mitmproxy.tools.console import layoutwidget from mitmproxy.tools.console import signals @@ -10,7 +12,7 @@ command_focus_change = blinker.Signal() class CommandItem(urwid.WidgetWrap): - def __init__(self, walker, cmd, focused): + def __init__(self, walker, cmd: command.Command, focused: bool): self.walker, self.cmd, self.focused = walker, cmd, focused super().__init__(None) self._w = self.get_widget() @@ -18,15 +20,18 @@ class CommandItem(urwid.WidgetWrap): def get_widget(self): parts = [ ("focus", ">> " if self.focused else " "), - ("title", self.cmd.path), - ("text", " "), - ("text", " ".join(self.cmd.paramnames())), + ("title", self.cmd.name) ] - if self.cmd.returntype: - parts.append([ + if self.cmd.parameters: + parts += [ + ("text", " "), + ("text", " ".join(str(param) for param in self.cmd.parameters)), + ] + if self.cmd.return_type: + parts += [ ("title", " -> "), - ("text", self.cmd.retname()), - ]) + ("text", command.typename(self.cmd.return_type)), + ] return urwid.AttrMap( urwid.Padding(urwid.Text(parts)), @@ -92,7 +97,7 @@ class CommandsList(urwid.ListBox): def keypress(self, size, key): if key == "m_select": foc, idx = self.get_focus() - signals.status_prompt_command.send(partial=foc.cmd.path + " ") + signals.status_prompt_command.send(partial=foc.cmd.name + " ") elif key == "m_start": self.set_focus(0) self.walker._modified() diff --git a/mitmproxy/tools/console/consoleaddons.py b/mitmproxy/tools/console/consoleaddons.py index 9f595b422..7fcd9b484 100644 --- a/mitmproxy/tools/console/consoleaddons.py +++ b/mitmproxy/tools/console/consoleaddons.py @@ -1,21 +1,18 @@ import csv -import shlex import typing +import mitmproxy.types +from mitmproxy import command, command_lexer +from mitmproxy import contentviews from mitmproxy import ctx -from mitmproxy import command from mitmproxy import exceptions from mitmproxy import flow from mitmproxy import http from mitmproxy import log -from mitmproxy import contentviews -from mitmproxy.utils import strutils -import mitmproxy.types - - +from mitmproxy.tools.console import keymap from mitmproxy.tools.console import overlay from mitmproxy.tools.console import signals -from mitmproxy.tools.console import keymap +from mitmproxy.utils import strutils console_palettes = [ "lowlight", @@ -48,10 +45,12 @@ class UnsupportedLog: """ A small addon to dump info on flow types we don't support yet. """ + def websocket_message(self, f): message = f.messages[-1] ctx.log.info(f.message_info(message)) - ctx.log.debug(message.content if isinstance(message.content, str) else strutils.bytes_to_escaped_str(message.content)) + ctx.log.debug( + message.content if isinstance(message.content, str) else strutils.bytes_to_escaped_str(message.content)) def websocket_end(self, f): ctx.log.info("WebSocket connection closed by {}: {} {}, {}".format( @@ -78,6 +77,7 @@ class ConsoleAddon: An addon that exposes console-specific commands, and hooks into required events. """ + def __init__(self, master): self.master = master self.started = False @@ -86,7 +86,7 @@ class ConsoleAddon: loader.add_option( "console_default_contentview", str, "auto", "The default content view mode.", - choices = [i.name.lower() for i in contentviews.views] + choices=[i.name.lower() for i in contentviews.views] ) loader.add_option( "console_eventlog_verbosity", str, 'info', @@ -142,7 +142,7 @@ class ConsoleAddon: opts = self.layout_options() off = self.layout_options().index(ctx.options.console_layout) ctx.options.update( - console_layout = opts[(off + 1) % len(opts)] + console_layout=opts[(off + 1) % len(opts)] ) @command.command("console.panes.next") @@ -234,17 +234,18 @@ class ConsoleAddon: @command.command("console.choose") def console_choose( - self, - prompt: str, - choices: typing.Sequence[str], - cmd: mitmproxy.types.Cmd, - *args: mitmproxy.types.Arg + self, + prompt: str, + choices: typing.Sequence[str], + cmd: mitmproxy.types.Cmd, + *args: mitmproxy.types.CmdArgs ) -> None: """ Prompt the user to choose from a specified list of strings, then invoke another command with all occurrences of {choice} replaced by the choice the user made. """ + def callback(opt): # We're now outside of the call context... repl = cmd + " " + " ".join(args) @@ -260,22 +261,22 @@ class ConsoleAddon: @command.command("console.choose.cmd") def console_choose_cmd( - self, - prompt: str, - choicecmd: mitmproxy.types.Cmd, - subcmd: mitmproxy.types.Cmd, - *args: mitmproxy.types.Arg + self, + prompt: str, + choicecmd: mitmproxy.types.Cmd, + subcmd: mitmproxy.types.Cmd, + *args: mitmproxy.types.CmdArgs ) -> None: """ Prompt the user to choose from a list of strings returned by a command, then invoke another command with all occurrences of {choice} replaced by the choice the user made. """ - choices = ctx.master.commands.call_strings(choicecmd, []) + choices = ctx.master.commands.execute(choicecmd) def callback(opt): # We're now outside of the call context... - repl = shlex.quote(" ".join(args)) + repl = " ".join(command_lexer.quote(x) for x in args) repl = repl.replace("{choice}", opt) try: self.master.commands.execute(subcmd + " " + repl) @@ -287,21 +288,24 @@ class ConsoleAddon: ) @command.command("console.command") - def console_command(self, *partial: str) -> None: + def console_command(self, *command_str: str) -> None: """ Prompt the user to edit a command with a (possibly empty) starting value. """ - signals.status_prompt_command.send(partial=" ".join(partial)) # type: ignore + quoted = " ".join(command_lexer.quote(x) for x in command_str) + signals.status_prompt_command.send(partial=quoted) @command.command("console.command.set") - def console_command_set(self, option: str) -> None: + def console_command_set(self, option_name: str) -> None: """ - Prompt the user to set an option of the form "key[=value]". + Prompt the user to set an option. """ - option_value = getattr(self.master.options, option, None) - current_value = option_value if option_value else "" - self.master.commands.execute( - "console.command set %s=%s" % (option, current_value) + option_value = getattr(self.master.options, option_name, None) or "" + set_command = f"set {option_name} {option_value!r}" + cursor = len(set_command) - 1 + signals.status_prompt_command.send( + partial=set_command, + cursor=cursor ) @command.command("console.view.keybindings") @@ -351,14 +355,14 @@ class ConsoleAddon: @command.command("console.bodyview") @command.argument("part", type=mitmproxy.types.Choice("console.bodyview.options")) - def bodyview(self, f: flow.Flow, part: str) -> None: + def bodyview(self, flow: flow.Flow, part: str) -> None: """ Spawn an external viewer for a flow request or response body based on the detected MIME type. We use the mailcap system to find the correct viewier, and fall back to the programs in $PAGER or $EDITOR if necessary. """ - fpart = getattr(f, part, None) + fpart = getattr(flow, part, None) if not fpart: raise exceptions.CommandError("Part must be either request or response, not %s." % part) t = fpart.headers.get("content-type") @@ -397,8 +401,8 @@ class ConsoleAddon: ] @command.command("console.edit.focus") - @command.argument("part", type=mitmproxy.types.Choice("console.edit.focus.options")) - def edit_focus(self, part: str) -> None: + @command.argument("flow_part", type=mitmproxy.types.Choice("console.edit.focus.options")) + def edit_focus(self, flow_part: str) -> None: """ Edit a component of the currently focused flow. """ @@ -410,27 +414,27 @@ class ConsoleAddon: flow.backup() require_dummy_response = ( - part in ("response-headers", "response-body", "set-cookies") and - flow.response is None + flow_part in ("response-headers", "response-body", "set-cookies") and + flow.response is None ) if require_dummy_response: flow.response = http.HTTPResponse.make() - if part == "cookies": + if flow_part == "cookies": self.master.switch_view("edit_focus_cookies") - elif part == "urlencoded form": + elif flow_part == "urlencoded form": self.master.switch_view("edit_focus_urlencoded_form") - elif part == "multipart form": + elif flow_part == "multipart form": self.master.switch_view("edit_focus_multipart_form") - elif part == "path": + elif flow_part == "path": self.master.switch_view("edit_focus_path") - elif part == "query": + elif flow_part == "query": self.master.switch_view("edit_focus_query") - elif part == "request-headers": + elif flow_part == "request-headers": self.master.switch_view("edit_focus_request_headers") - elif part == "response-headers": + elif flow_part == "response-headers": self.master.switch_view("edit_focus_response_headers") - elif part in ("request-body", "response-body"): - if part == "request-body": + elif flow_part in ("request-body", "response-body"): + if flow_part == "request-body": message = flow.request else: message = flow.response @@ -442,16 +446,16 @@ class ConsoleAddon: # just strip the newlines off the end of the body when we return # from an editor. message.content = c.rstrip(b"\n") - elif part == "set-cookies": + elif flow_part == "set-cookies": self.master.switch_view("edit_focus_setcookies") - elif part == "url": + elif flow_part == "url": url = flow.request.url.encode() edited_url = self.master.spawn_editor(url) url = edited_url.rstrip(b"\n") flow.request.url = url.decode() - elif part in ["method", "status_code", "reason"]: + elif flow_part in ["method", "status_code", "reason"]: self.master.commands.execute( - "console.command flow.set @focus %s " % part + "console.command flow.set @focus %s " % flow_part ) def _grideditor(self): @@ -535,10 +539,8 @@ class ConsoleAddon: raise exceptions.CommandError("Invalid flowview mode.") try: - self.master.commands.call_strings( - "view.settings.setval", - ["@focus", "flowview_mode_%s" % idx, mode] - ) + cmd = 'view.settings.setval @focus flowview_mode_%s %s' % (idx, mode) + self.master.commands.execute(cmd) except exceptions.CommandError as e: signals.status_message.send(message=str(e)) @@ -558,14 +560,9 @@ class ConsoleAddon: if not fv: raise exceptions.CommandError("Not viewing a flow.") idx = fv.body.tab_offset - return self.master.commands.call_strings( - "view.settings.getval", - [ - "@focus", - "flowview_mode_%s" % idx, - self.master.options.console_default_contentview, - ] - ) + + cmd = 'view.settings.getval @focus flowview_mode_%s %s' % (idx, self.master.options.console_default_contentview) + return self.master.commands.execute(cmd) @command.command("console.key.contexts") def key_contexts(self) -> typing.Sequence[str]: @@ -576,11 +573,11 @@ class ConsoleAddon: @command.command("console.key.bind") def key_bind( - self, - contexts: typing.Sequence[str], - key: str, - cmd: mitmproxy.types.Cmd, - *args: mitmproxy.types.Arg + self, + contexts: typing.Sequence[str], + key: str, + cmd: mitmproxy.types.Cmd, + *args: mitmproxy.types.CmdArgs ) -> None: """ Bind a shortcut key. diff --git a/mitmproxy/tools/console/defaultkeys.py b/mitmproxy/tools/console/defaultkeys.py index 0a6c55619..a0f276252 100644 --- a/mitmproxy/tools/console/defaultkeys.py +++ b/mitmproxy/tools/console/defaultkeys.py @@ -26,7 +26,7 @@ def map(km): km.add("ctrl f", "console.nav.pagedown", ["global"], "Page down") km.add("ctrl b", "console.nav.pageup", ["global"], "Page up") - km.add("I", "set intercept_active=toggle", ["global"], "Toggle intercept") + km.add("I", "set intercept_active toggle", ["global"], "Toggle intercept") km.add("i", "console.command.set intercept", ["global"], "Set intercept") km.add("W", "console.command.set save_stream_file", ["global"], "Stream to file") km.add("A", "flow.resume @all", ["flowlist", "flowview"], "Resume all intercepted flows") @@ -48,14 +48,14 @@ def map(km): "Export this flow to file" ) km.add("f", "console.command.set view_filter", ["flowlist"], "Set view filter") - km.add("F", "set console_focus_follow=toggle", ["flowlist"], "Set focus follow") + km.add("F", "set console_focus_follow toggle", ["flowlist"], "Set focus follow") km.add( "ctrl l", "console.command cut.clip ", ["flowlist", "flowview"], "Send cuts to clipboard" ) - km.add("L", "console.command view.load ", ["flowlist"], "Load flows from file") + km.add("L", "console.command view.flows.load ", ["flowlist"], "Load flows from file") km.add("m", "flow.mark.toggle @focus", ["flowlist"], "Toggle mark on this flow") km.add("M", "view.properties.marked.toggle", ["flowlist"], "Toggle viewing marked flows") km.add( @@ -68,14 +68,14 @@ def map(km): "o", """ console.choose.cmd Order view.order.options - set view_order={choice} + set view_order {choice} """, ["flowlist"], "Set flow list order" ) km.add("r", "replay.client @focus", ["flowlist", "flowview"], "Replay this flow") km.add("S", "console.command replay.server ", ["flowlist"], "Start server replay") - km.add("v", "set view_order_reversed=toggle", ["flowlist"], "Reverse flow list order") + km.add("v", "set view_order_reversed toggle", ["flowlist"], "Reverse flow list order") km.add("U", "flow.mark @all false", ["flowlist"], "Un-set all marks") km.add("w", "console.command save.file @shown ", ["flowlist"], "Save listed flows to file") km.add("V", "flow.revert @focus", ["flowlist", "flowview"], "Revert changes to this flow") diff --git a/mitmproxy/tools/console/statusbar.py b/mitmproxy/tools/console/statusbar.py index 56f0674f2..43f5170d0 100644 --- a/mitmproxy/tools/console/statusbar.py +++ b/mitmproxy/tools/console/statusbar.py @@ -1,4 +1,5 @@ import os.path +from typing import Optional import urwid @@ -98,10 +99,15 @@ class ActionBar(urwid.WidgetWrap): self._w = urwid.Edit(self.prep_prompt(prompt), text or "") self.prompting = PromptStub(callback, args) - def sig_prompt_command(self, sender, partial=""): + def sig_prompt_command(self, sender, partial: str = "", cursor: Optional[int] = None): signals.focus.send(self, section="footer") - self._w = commander.CommandEdit(self.master, partial, - self.command_history) + self._w = commander.CommandEdit( + self.master, + partial, + self.command_history, + ) + if cursor is not None: + self._w.cbuf.cursor = cursor self.prompting = commandexecutor.CommandExecutor(self.master) def sig_prompt_onekey(self, sender, prompt, keys, callback, args=()): diff --git a/mitmproxy/types.py b/mitmproxy/types.py index 0634e4d77..ac9922172 100644 --- a/mitmproxy/types.py +++ b/mitmproxy/types.py @@ -5,6 +5,9 @@ import typing from mitmproxy import exceptions from mitmproxy import flow +if typing.TYPE_CHECKING: # pragma: no cover + from mitmproxy.command import CommandManager + class Path(str): pass @@ -14,7 +17,7 @@ class Cmd(str): pass -class Arg(str): +class CmdArgs(str): pass @@ -22,6 +25,10 @@ class Unknown(str): pass +class Space(str): + pass + + class CutSpec(typing.Sequence[str]): pass @@ -40,27 +47,11 @@ class Choice: return False -# One of the many charming things about mypy is that introducing type -# annotations can cause circular dependencies where there were none before. -# Rather than putting types and the CommandManger in the same file, we introduce -# a stub type with the signature we use. -class _CommandBase: - commands: typing.MutableMapping[str, typing.Any] = {} - - def call_strings(self, path: str, args: typing.Sequence[str]) -> typing.Any: - raise NotImplementedError - - def execute(self, cmd: str) -> typing.Any: - raise NotImplementedError - - class _BaseType: typ: typing.Type = object display: str = "" - def completion( - self, manager: _CommandBase, t: typing.Any, s: str - ) -> typing.Sequence[str]: + def completion(self, manager: "CommandManager", t: typing.Any, s: str) -> typing.Sequence[str]: """ Returns a list of completion strings for a given prefix. The strings returned don't necessarily need to be suffixes of the prefix, since @@ -68,9 +59,7 @@ class _BaseType: """ raise NotImplementedError - def parse( - self, manager: _CommandBase, typ: typing.Any, s: str - ) -> typing.Any: + def parse(self, manager: "CommandManager", typ: typing.Any, s: str) -> typing.Any: """ Parse a string, given the specific type instance (to allow rich type annotations like Choice) and a string. @@ -78,7 +67,7 @@ class _BaseType: """ raise NotImplementedError - def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool: """ Check if data is valid for this type. """ @@ -89,10 +78,10 @@ class _BoolType(_BaseType): typ = bool display = "bool" - def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]: return ["false", "true"] - def parse(self, manager: _CommandBase, t: type, s: str) -> bool: + def parse(self, manager: "CommandManager", t: type, s: str) -> bool: if s == "true": return True elif s == "false": @@ -102,7 +91,7 @@ class _BoolType(_BaseType): "Booleans are 'true' or 'false', got %s" % s ) - def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool: return val in [True, False] @@ -110,13 +99,13 @@ class _StrType(_BaseType): typ = str display = "str" - def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]: return [] - def parse(self, manager: _CommandBase, t: type, s: str) -> str: + def parse(self, manager: "CommandManager", t: type, s: str) -> str: return s - def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool: return isinstance(val, str) @@ -124,13 +113,13 @@ class _UnknownType(_BaseType): typ = Unknown display = "unknown" - def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]: return [] - def parse(self, manager: _CommandBase, t: type, s: str) -> str: + def parse(self, manager: "CommandManager", t: type, s: str) -> str: return s - def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool: return False @@ -138,16 +127,16 @@ class _IntType(_BaseType): typ = int display = "int" - def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]: return [] - def parse(self, manager: _CommandBase, t: type, s: str) -> int: + def parse(self, manager: "CommandManager", t: type, s: str) -> int: try: return int(s) except ValueError as e: raise exceptions.TypeError from e - def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool: return isinstance(val, int) @@ -155,7 +144,7 @@ class _PathType(_BaseType): typ = Path display = "path" - def completion(self, manager: _CommandBase, t: type, start: str) -> typing.Sequence[str]: + def completion(self, manager: "CommandManager", t: type, start: str) -> typing.Sequence[str]: if not start: start = "./" path = os.path.expanduser(start) @@ -177,10 +166,10 @@ class _PathType(_BaseType): ret.sort() return ret - def parse(self, manager: _CommandBase, t: type, s: str) -> str: + def parse(self, manager: "CommandManager", t: type, s: str) -> str: return os.path.expanduser(s) - def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool: return isinstance(val, str) @@ -188,43 +177,43 @@ class _CmdType(_BaseType): typ = Cmd display = "cmd" - def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]: return list(manager.commands.keys()) - def parse(self, manager: _CommandBase, t: type, s: str) -> str: + def parse(self, manager: "CommandManager", t: type, s: str) -> str: if s not in manager.commands: raise exceptions.TypeError("Unknown command: %s" % s) return s - def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool: return val in manager.commands class _ArgType(_BaseType): - typ = Arg + typ = CmdArgs display = "arg" - def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]: return [] - def parse(self, manager: _CommandBase, t: type, s: str) -> str: + def parse(self, manager: "CommandManager", t: type, s: str) -> str: return s - def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool: return isinstance(val, str) class _StrSeqType(_BaseType): typ = typing.Sequence[str] - display = "[str]" + display = "str[]" - def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]: return [] - def parse(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + def parse(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]: return [x.strip() for x in s.split(",")] - def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool: if isinstance(val, str) or isinstance(val, bytes): return False try: @@ -238,7 +227,7 @@ class _StrSeqType(_BaseType): class _CutSpecType(_BaseType): typ = CutSpec - display = "[cut]" + display = "cut[]" valid_prefixes = [ "request.method", "request.scheme", @@ -277,7 +266,7 @@ class _CutSpecType(_BaseType): "server_conn.tls_established", ] - def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]: spec = s.split(",") opts = [] for pref in self.valid_prefixes: @@ -285,11 +274,11 @@ class _CutSpecType(_BaseType): opts.append(",".join(spec)) return opts - def parse(self, manager: _CommandBase, t: type, s: str) -> CutSpec: + def parse(self, manager: "CommandManager", t: type, s: str) -> CutSpec: parts: typing.Any = s.split(",") return parts - def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool: if not isinstance(val, str): return False parts = [x.strip() for x in val.split(",")] @@ -327,7 +316,7 @@ class _BaseFlowType(_BaseType): "~c", ] - def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: + def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]: return self.valid_prefixes @@ -335,9 +324,9 @@ class _FlowType(_BaseFlowType): typ = flow.Flow display = "flow" - def parse(self, manager: _CommandBase, t: type, s: str) -> flow.Flow: + def parse(self, manager: "CommandManager", t: type, s: str) -> flow.Flow: try: - flows = manager.call_strings("view.flows.resolve", [s]) + flows = manager.execute("view.flows.resolve %s" % (s)) except exceptions.CommandError as e: raise exceptions.TypeError from e if len(flows) != 1: @@ -346,21 +335,21 @@ class _FlowType(_BaseFlowType): ) return flows[0] - def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool: return isinstance(val, flow.Flow) class _FlowsType(_BaseFlowType): typ = typing.Sequence[flow.Flow] - display = "[flow]" + display = "flow[]" - def parse(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[flow.Flow]: + def parse(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[flow.Flow]: try: - return manager.call_strings("view.flows.resolve", [s]) + return manager.execute("view.flows.resolve %s" % (s)) except exceptions.CommandError as e: raise exceptions.TypeError from e - def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool: try: for v in val: if not isinstance(v, flow.Flow): @@ -372,19 +361,19 @@ class _FlowsType(_BaseFlowType): class _DataType(_BaseType): typ = Data - display = "[data]" + display = "data[][]" def completion( - self, manager: _CommandBase, t: type, s: str + self, manager: "CommandManager", t: type, s: str ) -> typing.Sequence[str]: # pragma: no cover raise exceptions.TypeError("data cannot be passed as argument") def parse( - self, manager: _CommandBase, t: type, s: str + self, manager: "CommandManager", t: type, s: str ) -> typing.Any: # pragma: no cover raise exceptions.TypeError("data cannot be passed as argument") - def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool: # FIXME: validate that all rows have equal length, and all columns have equal types try: for row in val: @@ -400,16 +389,16 @@ class _ChoiceType(_BaseType): typ = Choice display = "choice" - def completion(self, manager: _CommandBase, t: Choice, s: str) -> typing.Sequence[str]: + def completion(self, manager: "CommandManager", t: Choice, s: str) -> typing.Sequence[str]: return manager.execute(t.options_command) - def parse(self, manager: _CommandBase, t: Choice, s: str) -> str: + def parse(self, manager: "CommandManager", t: Choice, s: str) -> str: opts = manager.execute(t.options_command) if s not in opts: raise exceptions.TypeError("Invalid choice.") return s - def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: + def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool: try: opts = manager.execute(typ.options_command) except exceptions.CommandError: @@ -423,7 +412,7 @@ class TypeManager: for t in types: self.typemap[t.typ] = t() - def get(self, t: typing.Optional[typing.Type], default=None) -> _BaseType: + def get(self, t: typing.Optional[typing.Type], default=None) -> typing.Optional[_BaseType]: if type(t) in self.typemap: return self.typemap[type(t)] return self.typemap.get(t, default) diff --git a/setup.cfg b/setup.cfg index a2c49f48d..d0dcc2df4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -18,6 +18,8 @@ show_missing = True exclude_lines = pragma: no cover raise NotImplementedError() + if typing.TYPE_CHECKING: + if TYPE_CHECKING: [mypy] ignore_missing_imports = True diff --git a/test/mitmproxy/addons/test_core.py b/test/mitmproxy/addons/test_core.py index 59875c2b6..e6924ead8 100644 --- a/test/mitmproxy/addons/test_core.py +++ b/test/mitmproxy/addons/test_core.py @@ -11,7 +11,7 @@ def test_set(): sa = core.Core() with taddons.context(loadcore=False) as tctx: assert tctx.master.options.server - tctx.command(sa.set, "server=false") + tctx.command(sa.set, "server", "false") assert not tctx.master.options.server with pytest.raises(exceptions.CommandError): diff --git a/test/mitmproxy/addons/test_save.py b/test/mitmproxy/addons/test_save.py index 4aa1f6488..6727a96f7 100644 --- a/test/mitmproxy/addons/test_save.py +++ b/test/mitmproxy/addons/test_save.py @@ -73,7 +73,7 @@ def test_save_command(tmpdir): v = view.View() tctx.master.addons.add(v) tctx.master.addons.add(sa) - tctx.master.commands.call_strings("save.file", ["@shown", p]) + tctx.master.commands.execute("save.file @shown %s" % p) def test_simple(tmpdir): diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py index d9dcf5f9d..a432f9e38 100644 --- a/test/mitmproxy/test_command.py +++ b/test/mitmproxy/test_command.py @@ -1,14 +1,16 @@ -import typing import inspect -from mitmproxy import command -from mitmproxy import flow -from mitmproxy import exceptions -from mitmproxy.test import tflow -from mitmproxy.test import taddons -import mitmproxy.types import io +import typing + import pytest +import mitmproxy.types +from mitmproxy import command +from mitmproxy import exceptions +from mitmproxy import flow +from mitmproxy.test import taddons +from mitmproxy.test import tflow + class TAddon: @command.command("cmd1") @@ -29,7 +31,7 @@ class TAddon: return "ok" @command.command("subcommand") - def subcommand(self, cmd: mitmproxy.types.Cmd, *args: mitmproxy.types.Arg) -> str: + def subcommand(self, cmd: mitmproxy.types.Cmd, *args: mitmproxy.types.CmdArgs) -> str: return "ok" @command.command("empty") @@ -83,17 +85,15 @@ class TestCommand: with pytest.raises(exceptions.CommandError): command.Command(cm, "invalidret", a.invalidret) with pytest.raises(exceptions.CommandError): - command.Command(cm, "invalidarg", a.invalidarg) + assert command.Command(cm, "invalidarg", a.invalidarg) def test_varargs(self): with taddons.context() as tctx: cm = command.CommandManager(tctx.master) a = TAddon() c = command.Command(cm, "varargs", a.varargs) - assert c.signature_help() == "varargs str *str -> [str]" + assert c.signature_help() == "varargs one *var -> str[]" assert c.call(["one", "two", "three"]) == ["two", "three"] - with pytest.raises(exceptions.CommandError): - c.call(["one", "two", 3]) def test_call(self): with taddons.context() as tctx: @@ -101,7 +101,7 @@ class TestCommand: a = TAddon() c = command.Command(cm, "cmd.path", a.cmd1) assert c.call(["foo"]) == "ret foo" - assert c.signature_help() == "cmd.path str -> str" + assert c.signature_help() == "cmd.path foo -> str" c = command.Command(cm, "cmd.two", a.cmd2) with pytest.raises(exceptions.CommandError): @@ -115,154 +115,305 @@ class TestCommand: [ "foo bar", [ - command.ParseResult( - value = "foo", type = mitmproxy.types.Cmd, valid = False - ), - command.ParseResult( - value = "bar", type = mitmproxy.types.Unknown, valid = False - ) + command.ParseResult(value="foo", type=mitmproxy.types.Cmd, valid=False), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="bar", type=mitmproxy.types.Unknown, valid=False) ], [], ], [ "cmd1 'bar", [ - command.ParseResult(value = "cmd1", type = mitmproxy.types.Cmd, valid = True), - command.ParseResult(value = "'bar", type = str, valid = True) + command.ParseResult(value="cmd1", type=mitmproxy.types.Cmd, valid=True), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="'bar", type=str, valid=True) ], [], ], [ "a", - [command.ParseResult(value = "a", type = mitmproxy.types.Cmd, valid = False)], + [command.ParseResult(value="a", type=mitmproxy.types.Cmd, valid=False)], [], ], [ "", - [command.ParseResult(value = "", type = mitmproxy.types.Cmd, valid = False)], - [] + [], + [ + command.CommandParameter("", mitmproxy.types.Cmd), + command.CommandParameter("", mitmproxy.types.CmdArgs) + ] ], [ "cmd3 1", [ - command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd, valid = True), - command.ParseResult(value = "1", type = int, valid = True), + command.ParseResult(value="cmd3", type=mitmproxy.types.Cmd, valid=True), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="1", type=int, valid=True), ], [] ], [ "cmd3 ", [ - command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd, valid = True), - command.ParseResult(value = "", type = int, valid = False), + command.ParseResult(value="cmd3", type=mitmproxy.types.Cmd, valid=True), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), ], - [] + [command.CommandParameter('foo', int)] ], [ "subcommand ", [ - command.ParseResult( - value = "subcommand", type = mitmproxy.types.Cmd, valid = True, - ), - command.ParseResult(value = "", type = mitmproxy.types.Cmd, valid = False), + command.ParseResult(value="subcommand", type=mitmproxy.types.Cmd, valid=True, ), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), ], - ["arg"], + [ + command.CommandParameter('cmd', mitmproxy.types.Cmd), + command.CommandParameter('args', mitmproxy.types.CmdArgs, kind=inspect.Parameter.VAR_POSITIONAL), + ], + ], + [ + "varargs one", + [ + command.ParseResult(value="varargs", type=mitmproxy.types.Cmd, valid=True), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="one", type=str, valid=True), + ], + [command.CommandParameter('var', str, kind=inspect.Parameter.VAR_POSITIONAL)] + ], + [ + "varargs one two three", + [ + command.ParseResult(value="varargs", type=mitmproxy.types.Cmd, valid=True), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="one", type=str, valid=True), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="two", type=str, valid=True), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="three", type=str, valid=True), + ], + [], ], [ "subcommand cmd3 ", [ - command.ParseResult(value = "subcommand", type = mitmproxy.types.Cmd, valid = True), - command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd, valid = True), - command.ParseResult(value = "", type = int, valid = False), + command.ParseResult(value="subcommand", type=mitmproxy.types.Cmd, valid=True), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="cmd3", type=mitmproxy.types.Cmd, valid=True), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), ], - [] + [command.CommandParameter('foo', int)] ], [ "cmd4", [ - command.ParseResult(value = "cmd4", type = mitmproxy.types.Cmd, valid = True), + command.ParseResult(value="cmd4", type=mitmproxy.types.Cmd, valid=True), ], - ["int", "str", "path"] + [ + command.CommandParameter('a', int), + command.CommandParameter('b', str), + command.CommandParameter('c', mitmproxy.types.Path), + ] ], [ "cmd4 ", [ - command.ParseResult(value = "cmd4", type = mitmproxy.types.Cmd, valid = True), - command.ParseResult(value = "", type = int, valid = False), + command.ParseResult(value="cmd4", type=mitmproxy.types.Cmd, valid=True), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), ], - ["str", "path"] + [ + command.CommandParameter('a', int), + command.CommandParameter('b', str), + command.CommandParameter('c', mitmproxy.types.Path), + ] ], [ "cmd4 1", [ - command.ParseResult(value = "cmd4", type = mitmproxy.types.Cmd, valid = True), - command.ParseResult(value = "1", type = int, valid = True), + command.ParseResult(value="cmd4", type=mitmproxy.types.Cmd, valid=True), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="1", type=int, valid=True), ], - ["str", "path"] - ], - [ - "cmd4 1", [ - command.ParseResult(value = "cmd4", type = mitmproxy.types.Cmd, valid = True), - command.ParseResult(value = "1", type = int, valid = True), - ], - ["str", "path"] + command.CommandParameter('b', str), + command.CommandParameter('c', mitmproxy.types.Path), + ] ], [ "flow", [ - command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True), + command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True), ], - ["flow", "str"] + [ + command.CommandParameter('f', flow.Flow), + command.CommandParameter('s', str), + ] ], [ "flow ", [ - command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True), - command.ParseResult(value = "", type = flow.Flow, valid = False), + command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), ], - ["str"] + [ + command.CommandParameter('f', flow.Flow), + command.CommandParameter('s', str), + ] ], [ "flow x", [ - command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True), - command.ParseResult(value = "x", type = flow.Flow, valid = False), + command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="x", type=flow.Flow, valid=False), ], - ["str"] + [ + command.CommandParameter('s', str), + ] ], [ "flow x ", [ - command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True), - command.ParseResult(value = "x", type = flow.Flow, valid = False), - command.ParseResult(value = "", type = str, valid = True), + command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="x", type=flow.Flow, valid=False), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), ], - [] + [ + command.CommandParameter('s', str), + ] ], [ "flow \"one two", [ - command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True), - command.ParseResult(value = "\"one two", type = flow.Flow, valid = False), + command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="\"one two", type=flow.Flow, valid=False), ], - ["str"] + [ + command.CommandParameter('s', str), + ] ], [ - "flow \"one two\"", + "flow \"three four\"", [ - command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True), - command.ParseResult(value = "one two", type = flow.Flow, valid = False), + command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value='"three four"', type=flow.Flow, valid=False), ], - ["str"] + [ + command.CommandParameter('s', str), + ] ], + [ + "spaces ' '", + [ + command.ParseResult(value="spaces", type=mitmproxy.types.Cmd, valid=False), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="' '", type=mitmproxy.types.Unknown, valid=False) + ], + [], + ], + [ + 'spaces2 " "', + [ + command.ParseResult(value="spaces2", type=mitmproxy.types.Cmd, valid=False), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value='" "', type=mitmproxy.types.Unknown, valid=False) + ], + [], + ], + [ + '"abc"', + [ + command.ParseResult(value='"abc"', type=mitmproxy.types.Cmd, valid=False), + ], + [], + ], + [ + "'def'", + [ + command.ParseResult(value="'def'", type=mitmproxy.types.Cmd, valid=False), + ], + [], + ], + [ + "cmd10 'a' \"b\" c", + [ + command.ParseResult(value="cmd10", type=mitmproxy.types.Cmd, valid=False), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="'a'", type=mitmproxy.types.Unknown, valid=False), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value='"b"', type=mitmproxy.types.Unknown, valid=False), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="c", type=mitmproxy.types.Unknown, valid=False), + ], + [], + ], + [ + "cmd11 'a \"b\" c'", + [ + command.ParseResult(value="cmd11", type=mitmproxy.types.Cmd, valid=False), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="'a \"b\" c'", type=mitmproxy.types.Unknown, valid=False), + ], + [], + ], + [ + 'cmd12 "a \'b\' c"', + [ + command.ParseResult(value="cmd12", type=mitmproxy.types.Cmd, valid=False), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value='"a \'b\' c"', type=mitmproxy.types.Unknown, valid=False), + ], + [], + ], + [ + r'cmd13 "a \"b\" c"', + [ + command.ParseResult(value="cmd13", type=mitmproxy.types.Cmd, valid=False), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value=r'"a \"b\" c"', type=mitmproxy.types.Unknown, valid=False), + ], + [], + ], + [ + r"cmd14 'a \'b\' c'", + [ + command.ParseResult(value="cmd14", type=mitmproxy.types.Cmd, valid=False), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value=r"'a \'b\' c'", type=mitmproxy.types.Unknown, valid=False), + ], + [], + ], + [ + " spaces_at_the_begining_are_not_stripped", + [ + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="spaces_at_the_begining_are_not_stripped", type=mitmproxy.types.Cmd, + valid=False), + ], + [], + ], + [ + " spaces_at_the_begining_are_not_stripped neither_at_the_end ", + [ + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="spaces_at_the_begining_are_not_stripped", type=mitmproxy.types.Cmd, + valid=False), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + command.ParseResult(value="neither_at_the_end", type=mitmproxy.types.Unknown, valid=False), + command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), + ], + [], + ], + ] + with taddons.context() as tctx: tctx.master.addons.add(TAddon()) for s, expected, expectedremain in tests: current, remain = tctx.master.commands.parse_partial(s) - assert current == expected - assert expectedremain == remain + assert (s, current, expectedremain) == (s, expected, remain) def test_simple(): @@ -270,9 +421,11 @@ def test_simple(): c = command.CommandManager(tctx.master) a = TAddon() c.add("one.two", a.cmd1) - assert c.commands["one.two"].help == "cmd1 help" - assert(c.execute("one.two foo") == "ret foo") - assert(c.call("one.two", "foo") == "ret foo") + assert (c.commands["one.two"].help == "cmd1 help") + assert (c.execute("one.two foo") == "ret foo") + assert (c.execute("one.two \"foo\"") == "ret foo") + assert (c.execute("one.two 'foo bar'") == "ret foo bar") + assert (c.call("one.two", "foo") == "ret foo") with pytest.raises(exceptions.CommandError, match="Unknown"): c.execute("nonexistent") with pytest.raises(exceptions.CommandError, match="Invalid"): @@ -281,8 +434,14 @@ def test_simple(): c.execute("one.two too many args") with pytest.raises(exceptions.CommandError, match="Unknown"): c.call("nonexistent") - with pytest.raises(exceptions.CommandError, match="No escaped"): + with pytest.raises(exceptions.CommandError, match="Unknown"): c.execute("\\") + with pytest.raises(exceptions.CommandError, match="Unknown"): + c.execute(r"\'") + with pytest.raises(exceptions.CommandError, match="Unknown"): + c.execute(r"\"") + with pytest.raises(exceptions.CommandError, match="Unknown"): + c.execute(r"\"") c.add("empty", a.empty) c.execute("empty") @@ -294,13 +453,13 @@ def test_simple(): def test_typename(): assert command.typename(str) == "str" - assert command.typename(typing.Sequence[flow.Flow]) == "[flow]" + assert command.typename(typing.Sequence[flow.Flow]) == "flow[]" - assert command.typename(mitmproxy.types.Data) == "[data]" - assert command.typename(mitmproxy.types.CutSpec) == "[cut]" + assert command.typename(mitmproxy.types.Data) == "data[][]" + assert command.typename(mitmproxy.types.CutSpec) == "cut[]" assert command.typename(flow.Flow) == "flow" - assert command.typename(typing.Sequence[str]) == "[str]" + assert command.typename(typing.Sequence[str]) == "str[]" assert command.typename(mitmproxy.types.Choice("foo")) == "choice" assert command.typename(mitmproxy.types.Path) == "path" diff --git a/test/mitmproxy/test_command_lexer.py b/test/mitmproxy/test_command_lexer.py new file mode 100644 index 000000000..3f009f88c --- /dev/null +++ b/test/mitmproxy/test_command_lexer.py @@ -0,0 +1,38 @@ +import pyparsing +import pytest + +from mitmproxy import command_lexer + + +@pytest.mark.parametrize( + "test_input,valid", [ + ("'foo'", True), + ('"foo"', True), + ("'foo' bar'", False), + ("'foo\\' bar'", True), + ("'foo' 'bar'", False), + ("'foo'x", False), + ('''"foo ''', True), + ('''"foo 'bar' ''', True), + ] +) +def test_partial_quoted_string(test_input, valid): + if valid: + assert command_lexer.PartialQuotedString.parseString(test_input, parseAll=True)[0] == test_input + else: + with pytest.raises(pyparsing.ParseException): + command_lexer.PartialQuotedString.parseString(test_input, parseAll=True) + + +@pytest.mark.parametrize( + "test_input,expected", [ + ("'foo'", ["'foo'"]), + ('"foo"', ['"foo"']), + ("'foo' 'bar'", ["'foo'", ' ', "'bar'"]), + ("'foo'x", ["'foo'", 'x']), + ('''"foo''', ['"foo']), + ('''"foo 'bar' ''', ['''"foo 'bar' ''']), + ] +) +def test_expr(test_input, expected): + assert list(command_lexer.expr.parseString(test_input, parseAll=True)) == expected diff --git a/test/mitmproxy/test_types.py b/test/mitmproxy/test_types.py index 571985fba..2cd17d87f 100644 --- a/test/mitmproxy/test_types.py +++ b/test/mitmproxy/test_types.py @@ -2,7 +2,6 @@ import pytest import os import typing import contextlib -from unittest import mock import mitmproxy.exceptions import mitmproxy.types @@ -64,13 +63,14 @@ def test_int(): b.parse(tctx.master.commands, int, "foo") -def test_path(tdata): +def test_path(tdata, monkeypatch): with taddons.context() as tctx: b = mitmproxy.types._PathType() assert b.parse(tctx.master.commands, mitmproxy.types.Path, "/foo") == "/foo" assert b.parse(tctx.master.commands, mitmproxy.types.Path, "/bar") == "/bar" - with mock.patch.dict("os.environ", {"HOME": "/home/test"}): - assert b.parse(tctx.master.commands, mitmproxy.types.Path, "~/mitm") == "/home/test/mitm" + monkeypatch.setenv("HOME", "/home/test") + monkeypatch.setenv("USERPROFILE", "/home/test") + assert b.parse(tctx.master.commands, mitmproxy.types.Path, "~/mitm") == "/home/test/mitm" assert b.is_valid(tctx.master.commands, mitmproxy.types.Path, "foo") is True assert b.is_valid(tctx.master.commands, mitmproxy.types.Path, "~/mitm") is True assert b.is_valid(tctx.master.commands, mitmproxy.types.Path, 3) is False @@ -127,10 +127,9 @@ def test_cutspec(): def test_arg(): with taddons.context() as tctx: b = mitmproxy.types._ArgType() - assert b.completion(tctx.master.commands, mitmproxy.types.Arg, "") == [] - assert b.parse(tctx.master.commands, mitmproxy.types.Arg, "foo") == "foo" - assert b.is_valid(tctx.master.commands, mitmproxy.types.Arg, "foo") is True - assert b.is_valid(tctx.master.commands, mitmproxy.types.Arg, 1) is False + assert b.completion(tctx.master.commands, mitmproxy.types.CmdArgs, "") == [] + assert b.parse(tctx.master.commands, mitmproxy.types.CmdArgs, "foo") == "foo" + assert b.is_valid(tctx.master.commands, mitmproxy.types.CmdArgs, 1) is False def test_strseq(): diff --git a/test/mitmproxy/tools/console/test_commander.py b/test/mitmproxy/tools/console/test_commander.py index b5e226fe7..a77be0432 100644 --- a/test/mitmproxy/tools/console/test_commander.py +++ b/test/mitmproxy/tools/console/test_commander.py @@ -1,6 +1,7 @@ +import pytest -from mitmproxy.tools.console.commander import commander from mitmproxy.test import taddons +from mitmproxy.tools.console.commander import commander class TestListCompleter: @@ -28,6 +29,112 @@ class TestListCompleter: assert c.cycle() == expected +class TestCommandEdit: + def test_open_command_bar(self): + with taddons.context() as tctx: + history = commander.CommandHistory(tctx.master, size=3) + edit = commander.CommandEdit(tctx.master, '', history) + + try: + edit.update() + except IndexError: + pytest.faied("Unexpected IndexError") + + def test_insert(self): + with taddons.context() as tctx: + history = commander.CommandHistory(tctx.master, size=3) + edit = commander.CommandEdit(tctx.master, '', history) + edit.keypress(1, 'a') + assert edit.get_edit_text() == 'a' + + # Don't let users type a space before starting a command + # as a usability feature + history = commander.CommandHistory(tctx.master, size=3) + edit = commander.CommandEdit(tctx.master, '', history) + edit.keypress(1, ' ') + assert edit.get_edit_text() == '' + + def test_backspace(self): + with taddons.context() as tctx: + history = commander.CommandHistory(tctx.master, size=3) + edit = commander.CommandEdit(tctx.master, '', history) + edit.keypress(1, 'a') + edit.keypress(1, 'b') + assert edit.get_edit_text() == 'ab' + edit.keypress(1, 'backspace') + assert edit.get_edit_text() == 'a' + + def test_left(self): + with taddons.context() as tctx: + history = commander.CommandHistory(tctx.master, size=3) + edit = commander.CommandEdit(tctx.master, '', history) + edit.keypress(1, 'a') + assert edit.cbuf.cursor == 1 + edit.keypress(1, 'left') + assert edit.cbuf.cursor == 0 + + # Do it again to make sure it won't go negative + edit.keypress(1, 'left') + assert edit.cbuf.cursor == 0 + + def test_right(self): + with taddons.context() as tctx: + history = commander.CommandHistory(tctx.master, size=3) + edit = commander.CommandEdit(tctx.master, '', history) + edit.keypress(1, 'a') + assert edit.cbuf.cursor == 1 + + # Make sure cursor won't go past the text + edit.keypress(1, 'right') + assert edit.cbuf.cursor == 1 + + # Make sure cursor goes left and then back right + edit.keypress(1, 'left') + assert edit.cbuf.cursor == 0 + edit.keypress(1, 'right') + assert edit.cbuf.cursor == 1 + + def test_up_and_down(self): + with taddons.context() as tctx: + history = commander.CommandHistory(tctx.master, size=3) + edit = commander.CommandEdit(tctx.master, '', history) + + buf = commander.CommandBuffer(tctx.master, 'cmd1') + history.add_command(buf) + buf = commander.CommandBuffer(tctx.master, 'cmd2') + history.add_command(buf) + + edit.keypress(1, 'up') + assert edit.get_edit_text() == 'cmd2' + edit.keypress(1, 'up') + assert edit.get_edit_text() == 'cmd1' + edit.keypress(1, 'up') + assert edit.get_edit_text() == 'cmd1' + + history = commander.CommandHistory(tctx.master, size=5) + edit = commander.CommandEdit(tctx.master, '', history) + edit.keypress(1, 'a') + edit.keypress(1, 'b') + edit.keypress(1, 'c') + assert edit.get_edit_text() == 'abc' + edit.keypress(1, 'up') + assert edit.get_edit_text() == '' + edit.keypress(1, 'down') + assert edit.get_edit_text() == 'abc' + edit.keypress(1, 'down') + assert edit.get_edit_text() == 'abc' + + history = commander.CommandHistory(tctx.master, size=5) + edit = commander.CommandEdit(tctx.master, '', history) + buf = commander.CommandBuffer(tctx.master, 'cmd3') + history.add_command(buf) + edit.keypress(1, 'z') + edit.keypress(1, 'up') + assert edit.get_edit_text() == 'cmd3' + edit.keypress(1, 'down') + assert edit.get_edit_text() == 'z' + + class TestCommandHistory: def fill_history(self, commands): with taddons.context() as tctx: @@ -148,13 +255,39 @@ class TestCommandBuffer: cb.cursor = len(cb.text) cb.cycle_completion() + ch = commander.CommandHistory(tctx.master, 30) + ce = commander.CommandEdit(tctx.master, "se", ch) + ce.keypress(1, 'tab') + ce.update() + ret = ce.cbuf.render() + assert ret == [ + ('commander_command', 'set'), + ('text', ' '), + ('commander_hint', 'option '), + ('commander_hint', 'value '), + ] + def test_render(self): with taddons.context() as tctx: cb = commander.CommandBuffer(tctx.master) cb.text = "foo" assert cb.render() - def test_flatten(self): - with taddons.context() as tctx: - cb = commander.CommandBuffer(tctx.master) - assert cb.flatten("foo bar") == "foo bar" + cb.text = "set view_filter '~bq test'" + ret = cb.render() + assert ret == [ + ('commander_command', 'set'), + ('text', ' '), + ('text', 'view_filter'), + ('text', ' '), + ('text', "'~bq test'"), + ] + + cb.text = "set" + ret = cb.render() + assert ret == [ + ('commander_command', 'set'), + ('text', ' '), + ('commander_hint', 'option '), + ('commander_hint', 'value '), + ] diff --git a/test/mitmproxy/tools/console/test_defaultkeys.py b/test/mitmproxy/tools/console/test_defaultkeys.py index 52075c845..58a0a585d 100644 --- a/test/mitmproxy/tools/console/test_defaultkeys.py +++ b/test/mitmproxy/tools/console/test_defaultkeys.py @@ -1,14 +1,18 @@ +import pytest + +import mitmproxy.types +from mitmproxy import command +from mitmproxy import ctx from mitmproxy.test.tflow import tflow from mitmproxy.tools.console import defaultkeys from mitmproxy.tools.console import keymap from mitmproxy.tools.console import master -from mitmproxy import command - -import pytest @pytest.mark.asyncio async def test_commands_exist(): + command_manager = command.CommandManager(ctx) + km = keymap.Keymap(None) defaultkeys.map(km) assert km.bindings @@ -16,7 +20,14 @@ async def test_commands_exist(): await m.load_flow(tflow()) for binding in km.bindings: - cmd, *args = command.lexer(binding.command) + parsed, _ = command_manager.parse_partial(binding.command.strip()) + + cmd = parsed[0].value + args = [ + a.value for a in parsed[1:] + if a.type != mitmproxy.types.Space + ] + assert cmd in m.commands.commands cmd_obj = m.commands.commands[cmd]