diff --git a/mitmproxy/addons/clientplayback.py b/mitmproxy/addons/clientplayback.py index 9e012b674..fcc3209b9 100644 --- a/mitmproxy/addons/clientplayback.py +++ b/mitmproxy/addons/clientplayback.py @@ -37,7 +37,7 @@ class ClientPlayback: ctx.master.addons.trigger("update", []) @command.command("replay.client.file") - def load_file(self, path: str) -> None: + def load_file(self, path: command.Path) -> None: try: flows = io.read_flows_from_paths([path]) except exceptions.FlowReadException as e: diff --git a/mitmproxy/addons/serverplayback.py b/mitmproxy/addons/serverplayback.py index 927f6e15c..46968a8d6 100644 --- a/mitmproxy/addons/serverplayback.py +++ b/mitmproxy/addons/serverplayback.py @@ -31,7 +31,7 @@ class ServerPlayback: ctx.master.addons.trigger("update", []) @command.command("replay.server.file") - def load_file(self, path: str) -> None: + def load_file(self, path: command.Path) -> None: try: flows = io.read_flows_from_paths([path]) except exceptions.FlowReadException as e: diff --git a/mitmproxy/command.py b/mitmproxy/command.py index c48219730..087f77704 100644 --- a/mitmproxy/command.py +++ b/mitmproxy/command.py @@ -3,6 +3,7 @@ """ import inspect import types +import io import typing import shlex import textwrap @@ -14,6 +15,15 @@ from mitmproxy import exceptions from mitmproxy import flow +def lexer(s): + # mypy mis-identifies shlex.shlex as abstract + lex = shlex.shlex(s) # type: ignore + lex.wordchars += "." + lex.whitespace_split = True + lex.commenters = '' + return lex + + Cuts = typing.Sequence[ typing.Sequence[typing.Union[str, bytes]] ] @@ -23,6 +33,14 @@ class Path(str): pass +class Cmd(str): + pass + + +class Arg(str): + pass + + def typename(t: type, ret: bool) -> str: """ Translates a type to an explanatory string. If ret is True, we're @@ -118,6 +136,12 @@ class Command: return ret +ParseResult = typing.NamedTuple( + "ParseResult", + [("value", str), ("type", typing.Type)], +) + + class CommandManager: def __init__(self, master): self.master = master @@ -133,6 +157,42 @@ class CommandManager: def add(self, path: str, func: typing.Callable): self.commands[path] = Command(self, path, func) + def parse_partial(self, cmdstr: str) -> typing.Sequence[ParseResult]: + """ + Parse a possibly partial command. Return a sequence of (part, type) tuples. + """ + buf = io.StringIO(cmdstr) + parts = [] # type: typing.List[str] + lex = lexer(buf) + while 1: + remainder = cmdstr[buf.tell():] + try: + t = lex.get_token() + except ValueError: + parts.append(remainder) + break + if not t: + break + parts.append(t) + if not parts: + parts = [""] + elif cmdstr.endswith(" "): + parts.append("") + + parse = [] # type: typing.List[ParseResult] + params = [] # type: typing.List[type] + for i in range(len(parts)): + if i == 0: + params[:] = [Cmd] + if parts[i] in self.commands: + params.extend(self.commands[parts[i]].paramtypes) + if params: + typ = params.pop(0) + else: + typ = str + parse.append(ParseResult(value=parts[i], type=typ)) + return parse + def call_args(self, path, args): """ Call a command using a list of string arguments. May raise CommandError. @@ -145,7 +205,7 @@ class CommandManager: """ Call a command using a string. May raise CommandError. """ - parts = shlex.split(cmdstr) + parts = list(lexer(cmdstr)) if not len(parts) >= 1: raise exceptions.CommandError("Invalid command: %s" % cmdstr) return self.call_args(parts[0], parts[1:]) diff --git a/mitmproxy/tools/console/commandeditor.py b/mitmproxy/tools/console/commandeditor.py index 17d1506bd..e57ddbb4d 100644 --- a/mitmproxy/tools/console/commandeditor.py +++ b/mitmproxy/tools/console/commandeditor.py @@ -1,19 +1,10 @@ import typing -import urwid from mitmproxy import exceptions from mitmproxy import flow from mitmproxy.tools.console import signals -class CommandEdit(urwid.Edit): - def __init__(self, partial): - urwid.Edit.__init__(self, ":", partial) - - def keypress(self, size, key): - return urwid.Edit.keypress(self, size, key) - - class CommandExecutor: def __init__(self, master): self.master = master diff --git a/mitmproxy/tools/console/commander/__init__.py b/mitmproxy/tools/console/commander/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/mitmproxy/tools/console/commander/commander.py b/mitmproxy/tools/console/commander/commander.py new file mode 100644 index 000000000..dbbc8ff2a --- /dev/null +++ b/mitmproxy/tools/console/commander/commander.py @@ -0,0 +1,158 @@ +import urwid +from urwid.text_layout import calc_coords +import typing +import abc + +import mitmproxy.master +import mitmproxy.command + + +class Completer: + @abc.abstractmethod + def cycle(self) -> str: + pass + + +class ListCompleter(Completer): + def __init__( + self, + start: str, + options: typing.Sequence[str], + ) -> None: + self.start = start + self.options = [] # type: typing.Sequence[str] + for o in options: + if o.startswith(start): + self.options.append(o) + self.offset = 0 + + def cycle(self) -> str: + if not self.options: + return self.start + ret = self.options[self.offset] + self.offset = (self.offset + 1) % len(self.options) + return ret + + +CompletionState = typing.NamedTuple( + "CompletionState", + [ + ("completer", Completer), + ("parse", typing.Sequence[mitmproxy.command.ParseResult]) + ] +) + + +class CommandBuffer(): + def __init__(self, master: mitmproxy.master.Master, start: str = "") -> None: + self.master = master + self.buf = start + # Cursor is always within the range [0:len(buffer)]. + self._cursor = len(self.buf) + self.completion = None # type: CompletionState + + @property + def cursor(self) -> int: + return self._cursor + + @cursor.setter + def cursor(self, x) -> None: + if x < 0: + self._cursor = 0 + elif x > len(self.buf): + self._cursor = len(self.buf) + else: + self._cursor = x + + def render(self): + return self.buf + + def left(self) -> None: + self.cursor = self.cursor - 1 + + def right(self) -> None: + self.cursor = self.cursor + 1 + + def cycle_completion(self) -> None: + if not self.completion: + parts = self.master.commands.parse_partial(self.buf[:self.cursor]) + last = parts[-1] + if last.type == mitmproxy.command.Cmd: + self.completion = CompletionState( + completer = ListCompleter( + parts[-1].value, + self.master.commands.commands.keys(), + ), + parse = parts, + ) + elif isinstance(last.type, mitmproxy.command.Choice): + self.completion = CompletionState( + completer = ListCompleter( + parts[-1].value, + self.master.commands.call(last.type.options_command), + ), + parse = parts, + ) + if self.completion: + nxt = self.completion.completer.cycle() + buf = " ".join([i.value for i in self.completion.parse[:-1]]) + " " + nxt + buf = buf.strip() + self.buf = buf + self.cursor = len(self.buf) + + def backspace(self) -> None: + if self.cursor == 0: + return + self.buf = self.buf[:self.cursor - 1] + self.buf[self.cursor:] + self.cursor = self.cursor - 1 + self.completion = None + + def insert(self, k: str) -> None: + """ + Inserts text at the cursor. + """ + self.buf = self.buf = self.buf[:self.cursor] + k + self.buf[self.cursor:] + self.cursor += 1 + self.completion = None + + +class CommandEdit(urwid.WidgetWrap): + leader = ": " + + def __init__(self, master: mitmproxy.master.Master, text: str) -> None: + self.master = master + self.cbuf = CommandBuffer(master, text) + self._w = urwid.Text(self.leader) + self.update() + + def keypress(self, size, key): + if key == "backspace": + self.cbuf.backspace() + elif key == "left": + self.cbuf.left() + elif key == "right": + self.cbuf.right() + elif key == "tab": + self.cbuf.cycle_completion() + elif len(key) == 1: + self.cbuf.insert(key) + self.update() + + def update(self): + self._w.set_text([self.leader, self.cbuf.render()]) + + def render(self, size, focus=False): + (maxcol,) = size + canv = self._w.render((maxcol,)) + canv = urwid.CompositeCanvas(canv) + canv.cursor = self.get_cursor_coords((maxcol,)) + return canv + + def get_cursor_coords(self, size): + p = self.cbuf.cursor + len(self.leader) + trans = self._w.get_line_translation(size[0]) + x, y = calc_coords(self._w.get_text()[0], trans, p) + return x, y + + def get_value(self): + return self.cbuf.buf diff --git a/mitmproxy/tools/console/consoleaddons.py b/mitmproxy/tools/console/consoleaddons.py index 06ee33418..023cc5d9c 100644 --- a/mitmproxy/tools/console/consoleaddons.py +++ b/mitmproxy/tools/console/consoleaddons.py @@ -224,7 +224,11 @@ class ConsoleAddon: @command.command("console.choose") def console_choose( - self, prompt: str, choices: typing.Sequence[str], *cmd: str + self, + prompt: str, + choices: typing.Sequence[str], + cmd: command.Cmd, + *args: command.Arg ) -> None: """ Prompt the user to choose from a specified list of strings, then @@ -233,7 +237,7 @@ class ConsoleAddon: """ def callback(opt): # We're now outside of the call context... - repl = " ".join(cmd) + repl = cmd + " " + " ".join(args) repl = repl.replace("{choice}", opt) try: self.master.commands.call(repl) @@ -246,7 +250,7 @@ class ConsoleAddon: @command.command("console.choose.cmd") def console_choose_cmd( - self, prompt: str, choicecmd: str, *cmd: str + self, prompt: str, choicecmd: command.Cmd, *cmd: command.Arg ) -> None: """ Prompt the user to choose from a list of strings returned by a @@ -492,14 +496,20 @@ class ConsoleAddon: return list(sorted(keymap.Contexts)) @command.command("console.key.bind") - def key_bind(self, contexts: typing.Sequence[str], key: str, *command: str) -> None: + def key_bind( + self, + contexts: typing.Sequence[str], + key: str, + cmd: command.Cmd, + *args: command.Arg + ) -> None: """ Bind a shortcut key. """ try: self.master.keymap.add( key, - " ".join(command), + cmd + " " + " ".join(args), contexts, "" ) diff --git a/mitmproxy/tools/console/statusbar.py b/mitmproxy/tools/console/statusbar.py index 795b3d8a9..6a1f07a93 100644 --- a/mitmproxy/tools/console/statusbar.py +++ b/mitmproxy/tools/console/statusbar.py @@ -6,6 +6,7 @@ from mitmproxy.tools.console import common from mitmproxy.tools.console import signals from mitmproxy.tools.console import commandeditor import mitmproxy.tools.console.master # noqa +from mitmproxy.tools.console.commander import commander class PromptPath: @@ -66,7 +67,7 @@ class ActionBar(urwid.WidgetWrap): def sig_prompt_command(self, sender, partial=""): signals.focus.send(self, section="footer") - self._w = commandeditor.CommandEdit(partial) + self._w = commander.CommandEdit(self.master, partial) self.prompting = commandeditor.CommandExecutor(self.master) def sig_prompt_onekey(self, sender, prompt, keys, callback, args=()): @@ -100,7 +101,7 @@ class ActionBar(urwid.WidgetWrap): elif k in self.onekey: self.prompt_execute(k) elif k == "enter": - self.prompt_execute(self._w.get_edit_text()) + self.prompt_execute(self._w.get_value()) else: if common.is_keypress(k): self._w.keypress(size, k) diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py index e1879ba28..76ce22458 100644 --- a/test/mitmproxy/test_command.py +++ b/test/mitmproxy/test_command.py @@ -11,19 +11,24 @@ from mitmproxy.utils import typecheck class TAddon: + @command.command("cmd1") def cmd1(self, foo: str) -> str: """cmd1 help""" return "ret " + foo + @command.command("cmd2") def cmd2(self, foo: str) -> str: return 99 + @command.command("cmd3") def cmd3(self, foo: int) -> int: return foo + @command.command("empty") def empty(self) -> None: pass + @command.command("varargs") def varargs(self, one: str, *var: str) -> typing.Sequence[str]: return list(var) @@ -34,6 +39,7 @@ class TAddon: def choose(self, arg: str) -> typing.Sequence[str]: return ["one", "two", "three"] + @command.command("path") def path(self, arg: command.Path) -> None: pass @@ -64,6 +70,44 @@ class TestCommand: c = command.Command(cm, "cmd.three", a.cmd3) assert c.call(["1"]) == 1 + def test_parse_partial(self): + tests = [ + [ + "foo bar", + [ + command.ParseResult(value = "foo", type = command.Cmd), + command.ParseResult(value = "bar", type = str) + ], + ], + [ + "foo 'bar", + [ + command.ParseResult(value = "foo", type = command.Cmd), + command.ParseResult(value = "'bar", type = str) + ] + ], + ["a", [command.ParseResult(value = "a", type = command.Cmd)]], + ["", [command.ParseResult(value = "", type = command.Cmd)]], + [ + "cmd3 1", + [ + command.ParseResult(value = "cmd3", type = command.Cmd), + command.ParseResult(value = "1", type = int), + ] + ], + [ + "cmd3 ", + [ + command.ParseResult(value = "cmd3", type = command.Cmd), + command.ParseResult(value = "", type = int), + ] + ], + ] + with taddons.context() as tctx: + tctx.master.addons.add(TAddon()) + for s, expected in tests: + assert tctx.master.commands.parse_partial(s) == expected + def test_simple(): with taddons.context() as tctx: @@ -100,6 +144,7 @@ def test_typename(): assert command.typename(command.Choice("foo"), False) == "choice" assert command.typename(command.Path, False) == "path" + assert command.typename(command.Cmd, False) == "cmd" class DummyConsole: @@ -162,6 +207,9 @@ def test_parsearg(): assert command.parsearg( tctx.master.commands, "foo", command.Path ) == "foo" + assert command.parsearg( + tctx.master.commands, "foo", command.Cmd + ) == "foo" class TDec: diff --git a/test/mitmproxy/tools/console/test_commander.py b/test/mitmproxy/tools/console/test_commander.py new file mode 100644 index 000000000..1ac4c5c62 --- /dev/null +++ b/test/mitmproxy/tools/console/test_commander.py @@ -0,0 +1,68 @@ +from mitmproxy.tools.console.commander import commander +from mitmproxy.test import taddons + + +class TestListCompleter: + def test_cycle(self): + tests = [ + [ + "", + ["a", "b", "c"], + ["a", "b", "c", "a"] + ], + [ + "xxx", + ["a", "b", "c"], + ["xxx", "xxx", "xxx"] + ], + [ + "b", + ["a", "b", "ba", "bb", "c"], + ["b", "ba", "bb", "b"] + ], + ] + for start, options, cycle in tests: + c = commander.ListCompleter(start, options) + for expected in cycle: + assert c.cycle() == expected + + +class TestCommandBuffer: + + def test_backspace(self): + tests = [ + [("", 0), ("", 0)], + [("1", 0), ("1", 0)], + [("1", 1), ("", 0)], + [("123", 3), ("12", 2)], + [("123", 2), ("13", 1)], + [("123", 0), ("123", 0)], + ] + with taddons.context() as tctx: + for start, output in tests: + cb = commander.CommandBuffer(tctx.master) + cb.buf, cb.cursor = start[0], start[1] + cb.backspace() + assert cb.buf == output[0] + assert cb.cursor == output[1] + + def test_insert(self): + tests = [ + [("", 0), ("x", 1)], + [("a", 0), ("xa", 1)], + [("xa", 2), ("xax", 3)], + ] + with taddons.context() as tctx: + for start, output in tests: + cb = commander.CommandBuffer(tctx.master) + cb.buf, cb.cursor = start[0], start[1] + cb.insert("x") + assert cb.buf == output[0] + assert cb.cursor == output[1] + + def test_cycle_completion(self): + with taddons.context() as tctx: + cb = commander.CommandBuffer(tctx.master) + cb.buf = "foo bar" + cb.cursor = len(cb.buf) + cb.cycle_completion()