diff --git a/mitmproxy/addons/save.py b/mitmproxy/addons/save.py index 92b7c5bea..3dbef14e0 100644 --- a/mitmproxy/addons/save.py +++ b/mitmproxy/addons/save.py @@ -49,6 +49,10 @@ class Save: self.start_stream_to_path(ctx.options.save_stream_file, self.filt) def save(self, flows: typing.Sequence[flow.Flow], path: str) -> None: + """ + Save flows to a file. If the path starts with a +, flows are + appended to the file, otherwise it is over-written. + """ try: f = self.open_file(path) except IOError as v: diff --git a/mitmproxy/addons/view.py b/mitmproxy/addons/view.py index 63416b9fd..f4082abe4 100644 --- a/mitmproxy/addons/view.py +++ b/mitmproxy/addons/view.py @@ -323,6 +323,9 @@ class View(collections.Sequence): self.focus_follow = ctx.options.console_focus_follow def resolve(self, spec: str) -> typing.Sequence[mitmproxy.flow.Flow]: + """ + Resolve a flow list specification to an actual list of flows. + """ if spec == "@focus": return [self.focus.flow] if self.focus.flow else [] elif spec == "@shown": diff --git a/mitmproxy/command.py b/mitmproxy/command.py index acf938d5c..1c943cefe 100644 --- a/mitmproxy/command.py +++ b/mitmproxy/command.py @@ -1,6 +1,8 @@ import inspect import typing import shlex +import textwrap + from mitmproxy.utils import typecheck from mitmproxy import exceptions from mitmproxy import flow @@ -25,12 +27,24 @@ class Command: self.manager = manager self.func = func sig = inspect.signature(self.func) + self.help = None + if func.__doc__: + txt = func.__doc__.strip() + self.help = "\n".join(textwrap.wrap(txt)) self.paramtypes = [v.annotation for v in sig.parameters.values()] self.returntype = sig.return_annotation + def paramnames(self) -> typing.Sequence[str]: + return [typename(i, False) for i in self.paramtypes] + + def retname(self) -> str: + return typename(self.returntype, True) if self.returntype else "" + def signature_help(self) -> str: - params = " ".join([typename(i, False) for i in self.paramtypes]) - ret = " -> " + typename(self.returntype, True) if self.returntype else "" + params = " ".join(self.paramnames()) + ret = self.retname() + if ret: + ret = " -> " + ret return "%s %s%s" % (self.path, params, ret) def call(self, args: typing.Sequence[str]): diff --git a/mitmproxy/tools/console/command.py b/mitmproxy/tools/console/commandeditor.py similarity index 88% rename from mitmproxy/tools/console/command.py rename to mitmproxy/tools/console/commandeditor.py index 4cb4fe6df..fd7d12ac7 100644 --- a/mitmproxy/tools/console/command.py +++ b/mitmproxy/tools/console/commandeditor.py @@ -5,8 +5,8 @@ from mitmproxy.tools.console import signals class CommandEdit(urwid.Edit): - def __init__(self): - urwid.Edit.__init__(self, ":", "") + def __init__(self, partial): + urwid.Edit.__init__(self, ":", partial) def keypress(self, size, key): return urwid.Edit.keypress(self, size, key) diff --git a/mitmproxy/tools/console/commands.py b/mitmproxy/tools/console/commands.py new file mode 100644 index 000000000..689aa6372 --- /dev/null +++ b/mitmproxy/tools/console/commands.py @@ -0,0 +1,175 @@ +import urwid +import blinker +import textwrap +from mitmproxy.tools.console import common +from mitmproxy.tools.console import signals + +HELP_HEIGHT = 5 + + +footer = [ + ('heading_key', "enter"), ":edit ", + ('heading_key', "?"), ":help ", +] + + +def _mkhelp(): + text = [] + keys = [ + ("enter", "execute command"), + ] + text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) + return text + + +help_context = _mkhelp() + + +def fcol(s, width, attr): + s = str(s) + return ( + "fixed", + width, + urwid.Text((attr, s)) + ) + + +command_focus_change = blinker.Signal() + + +class CommandItem(urwid.WidgetWrap): + def __init__(self, walker, cmd, focused): + self.walker, self.cmd, self.focused = walker, cmd, focused + super().__init__(None) + self._w = self.get_widget() + + def get_widget(self): + parts = [ + ("focus", ">> " if self.focused else " "), + ("title", self.cmd.path), + ("text", " "), + ("text", " ".join(self.cmd.paramnames())), + ] + if self.cmd.returntype: + parts.append([ + ("title", " -> "), + ("text", self.cmd.retname()), + ]) + + return urwid.AttrMap( + urwid.Padding(urwid.Text(parts)), + "text" + ) + + def get_edit_text(self): + return self._w[1].get_edit_text() + + def selectable(self): + return True + + def keypress(self, size, key): + return key + + +class CommandListWalker(urwid.ListWalker): + def __init__(self, master): + self.master = master + + self.index = 0 + self.focusobj = None + self.cmds = list(master.commands.commands.values()) + self.cmds.sort(key=lambda x: x.signature_help()) + self.set_focus(0) + + def get_edit_text(self): + return self.focus_obj.get_edit_text() + + def _get(self, pos): + cmd = self.cmds[pos] + return CommandItem(self, cmd, pos == self.index) + + def get_focus(self): + return self.focus_obj, self.index + + def set_focus(self, index): + cmd = self.cmds[index] + self.index = index + self.focus_obj = self._get(self.index) + command_focus_change.send(cmd.help or "") + + def get_next(self, pos): + if pos >= len(self.cmds) - 1: + return None, None + pos = pos + 1 + return self._get(pos), pos + + def get_prev(self, pos): + pos = pos - 1 + if pos < 0: + return None, None + return self._get(pos), pos + + +class CommandsList(urwid.ListBox): + def __init__(self, master): + self.master = master + self.walker = CommandListWalker(master) + super().__init__(self.walker) + + def keypress(self, size, key): + if key == "enter": + foc, idx = self.get_focus() + signals.status_prompt_command.send(partial=foc.cmd.path + " ") + return super().keypress(size, key) + + +class CommandHelp(urwid.Frame): + def __init__(self, master): + self.master = master + super().__init__(self.widget("")) + self.set_active(False) + command_focus_change.connect(self.sig_mod) + + def set_active(self, val): + h = urwid.Text("Command Help") + style = "heading" if val else "heading_inactive" + self.header = urwid.AttrWrap(h, style) + + def widget(self, txt): + cols, _ = self.master.ui.get_cols_rows() + return urwid.ListBox( + [urwid.Text(i) for i in textwrap.wrap(txt, cols)] + ) + + def sig_mod(self, txt): + self.set_body(self.widget(txt)) + + +class Commands(urwid.Pile): + def __init__(self, master): + oh = CommandHelp(master) + super().__init__( + [ + CommandsList(master), + (HELP_HEIGHT, oh), + ] + ) + self.master = master + + def keypress(self, size, key): + key = common.shortcuts(key) + if key == "tab": + self.focus_position = ( + self.focus_position + 1 + ) % len(self.widget_list) + self.widget_list[1].set_active(self.focus_position == 1) + key = None + + # This is essentially a copypasta from urwid.Pile's keypress handler. + # So much for "closed for modification, but open for extension". + item_rows = None + if len(size) == 2: + item_rows = self.get_item_rows(size, focus = True) + i = self.widget_list.index(self.focus_item) + tsize = self.get_item_size(size, i, True, item_rows) + return self.focus_item.keypress(tsize, key) diff --git a/mitmproxy/tools/console/flowlist.py b/mitmproxy/tools/console/flowlist.py index 00e5cf4ed..e75708248 100644 --- a/mitmproxy/tools/console/flowlist.py +++ b/mitmproxy/tools/console/flowlist.py @@ -353,9 +353,7 @@ class FlowListBox(urwid.ListBox): def keypress(self, size, key): key = common.shortcuts(key) - if key == ":": - signals.status_prompt_command.send() - elif key == "A": + if key == "A": for f in self.master.view: if f.intercepted: f.resume() diff --git a/mitmproxy/tools/console/keymap.py b/mitmproxy/tools/console/keymap.py new file mode 100644 index 000000000..018d1bdec --- /dev/null +++ b/mitmproxy/tools/console/keymap.py @@ -0,0 +1,34 @@ +import typing +from mitmproxy.tools.console import commandeditor + + +class Keymap: + def __init__(self, master): + self.executor = commandeditor.CommandExecutor(master) + self.keys = {} + + def add(self, key: str, command: str, context: str = "") -> None: + """ + Add a key to the key map. If context is empty, it's considered to be + a global binding. + """ + d = self.keys.setdefault(context, {}) + d[key] = command + + def get(self, context: str, key: str) -> typing.Optional[str]: + if context in self.keys: + return self.keys[context].get(key, None) + return None + + def handle(self, context: str, key: str) -> typing.Optional[str]: + """ + Returns the key if it has not been handled, or None. + """ + cmd = self.get(context, key) + if cmd: + return self.executor(cmd) + if cmd != "": + cmd = self.get("", key) + if cmd: + return self.executor(cmd) + return key diff --git a/mitmproxy/tools/console/master.py b/mitmproxy/tools/console/master.py index 8c5376bdd..d65562de0 100644 --- a/mitmproxy/tools/console/master.py +++ b/mitmproxy/tools/console/master.py @@ -24,9 +24,10 @@ from mitmproxy.tools.console import flowlist from mitmproxy.tools.console import flowview from mitmproxy.tools.console import grideditor from mitmproxy.tools.console import help +from mitmproxy.tools.console import keymap from mitmproxy.tools.console import options +from mitmproxy.tools.console import commands from mitmproxy.tools.console import overlay -from mitmproxy.tools.console import palettepicker from mitmproxy.tools.console import palettes from mitmproxy.tools.console import signals from mitmproxy.tools.console import statusbar @@ -75,6 +76,58 @@ class UnsupportedLog: signals.add_log(strutils.bytes_to_escaped_str(message.content), "debug") +class ConsoleCommands: + """ + An addon that exposes console-specific commands. + """ + def __init__(self, master): + self.master = master + + def command(self) -> None: + """Prompt for a command.""" + signals.status_prompt_command.send() + + def view_commands(self) -> None: + """View the commands list.""" + self.master.view_commands() + + def view_options(self) -> None: + """View the options editor.""" + self.master.view_options() + + def view_help(self) -> None: + """View help.""" + self.master.view_help() + + def exit(self) -> None: + """Exit mitmproxy.""" + raise urwid.ExitMainLoop + + def view_pop(self) -> None: + """ + Pop a view off the console stack. At the top level, this prompts the + user to exit mitmproxy. + """ + signals.pop_view_state.send(self) + + def load(self, l): + l.add_command("console.command", self.command) + l.add_command("console.exit", self.exit) + l.add_command("console.view.commands", self.view_commands) + l.add_command("console.view.help", self.view_help) + l.add_command("console.view.options", self.view_options) + l.add_command("console.view.pop", self.view_pop) + + +def default_keymap(km): + km.add(":", "console.command") + km.add("?", "console.view.help") + km.add("C", "console.view.commands") + km.add("O", "console.view.options") + km.add("Q", "console.exit") + km.add("q", "console.view.pop") + + class ConsoleMaster(master.Master): def __init__(self, options, server): @@ -84,6 +137,8 @@ class ConsoleMaster(master.Master): self.stream_path = None # This line is just for type hinting self.options = self.options # type: Options + self.keymap = keymap.Keymap(self) + default_keymap(self.keymap) self.options.errored.connect(self.options_error) self.logbuffer = urwid.SimpleListWalker([]) @@ -102,6 +157,7 @@ class ConsoleMaster(master.Master): self.view, UnsupportedLog(), readfile.ReadFile(), + ConsoleCommands(self), ) def sigint_handler(*args, **kwargs): @@ -331,12 +387,13 @@ class ConsoleMaster(master.Master): ) ) - def view_help(self, helpctx): + def view_help(self): + hc = self.view_stack[0].helpctx signals.push_view_state.send( self, window = window.Window( self, - help.HelpView(helpctx), + help.HelpView(hc), None, statusbar.StatusBar(self, help.footer), None @@ -358,15 +415,18 @@ class ConsoleMaster(master.Master): ) ) - def view_palette_picker(self): + def view_commands(self): + for i in self.view_stack: + if isinstance(i["body"], commands.Commands): + return signals.push_view_state.send( self, window = window.Window( self, - palettepicker.PalettePicker(self), + commands.Commands(self), None, - statusbar.StatusBar(self, palettepicker.footer), - palettepicker.help_context, + statusbar.StatusBar(self, commands.footer), + options.help_context, ) ) diff --git a/mitmproxy/tools/console/palettepicker.py b/mitmproxy/tools/console/palettepicker.py deleted file mode 100644 index 1f238b0df..000000000 --- a/mitmproxy/tools/console/palettepicker.py +++ /dev/null @@ -1,78 +0,0 @@ -import urwid - -from mitmproxy.tools.console import common -from mitmproxy.tools.console import palettes -from mitmproxy.tools.console import select - -footer = [ - ('heading_key', "enter/space"), ":select", -] - - -def _mkhelp(): - text = [] - keys = [ - ("enter/space", "select"), - ] - text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) - return text - - -help_context = _mkhelp() - - -class PalettePicker(urwid.WidgetWrap): - - def __init__(self, master): - self.master = master - low, high = [], [] - for k, v in palettes.palettes.items(): - if v.high: - high.append(k) - else: - low.append(k) - high.sort() - low.sort() - - options = [ - select.Heading("High Colour") - ] - - def mkopt(name): - return select.Option( - i, - None, - lambda: self.master.options.console_palette == name, - lambda: setattr(self.master.options, "console_palette", name) - ) - - for i in high: - options.append(mkopt(i)) - options.append(select.Heading("Low Colour")) - for i in low: - options.append(mkopt(i)) - - options.extend( - [ - select.Heading("Options"), - select.Option( - "Transparent", - "T", - lambda: master.options.console_palette_transparent, - master.options.toggler("console_palette_transparent") - ) - ] - ) - - self.lb = select.Select(options) - title = urwid.Text("Palettes") - title = urwid.Padding(title, align="left", width=("relative", 100)) - title = urwid.AttrWrap(title, "heading") - self._w = urwid.Frame( - self.lb, - header = title - ) - master.options.changed.connect(self.sig_options_changed) - - def sig_options_changed(self, options, updated): - self.lb.walker._modified() diff --git a/mitmproxy/tools/console/statusbar.py b/mitmproxy/tools/console/statusbar.py index 1930fa2f6..8ded0cda2 100644 --- a/mitmproxy/tools/console/statusbar.py +++ b/mitmproxy/tools/console/statusbar.py @@ -5,7 +5,7 @@ import urwid from mitmproxy.tools.console import common from mitmproxy.tools.console import pathedit from mitmproxy.tools.console import signals -from mitmproxy.tools.console import command +from mitmproxy.tools.console import commandeditor import mitmproxy.tools.console.master # noqa @@ -69,10 +69,10 @@ class ActionBar(urwid.WidgetWrap): self._w = urwid.Edit(self.prep_prompt(prompt), text or "") self.prompting = PromptStub(callback, args) - def sig_prompt_command(self, sender): + def sig_prompt_command(self, sender, partial=""): signals.focus.send(self, section="footer") - self._w = command.CommandEdit() - self.prompting = command.CommandExecutor(self.master) + self._w = commandeditor.CommandEdit(partial) + self.prompting = commandeditor.CommandExecutor(self.master) def sig_path_prompt(self, sender, prompt, callback, args=()): signals.focus.send(self, section="footer") diff --git a/mitmproxy/tools/console/window.py b/mitmproxy/tools/console/window.py index 1c962e2fe..0e41fb2af 100644 --- a/mitmproxy/tools/console/window.py +++ b/mitmproxy/tools/console/window.py @@ -82,8 +82,9 @@ class Window(urwid.Frame): def keypress(self, size, k): k = super().keypress(size, k) - if k == "?": - self.master.view_help(self.helpctx) + k = self.master.keymap.handle("", k) + if not k: + return elif k == "i": signals.status_prompt.send( self, @@ -91,21 +92,5 @@ class Window(urwid.Frame): text = self.master.options.intercept, callback = self.master.options.setter("intercept") ) - elif k == "O": - self.master.view_options() - elif k == "Q": - raise urwid.ExitMainLoop - elif k == "q": - signals.pop_view_state.send(self) - elif k == "R": - signals.status_prompt_onekey.send( - self, - prompt = "Replay", - keys = ( - ("client", "c"), - ("server", "s"), - ), - callback = self.handle_replay, - ) else: return k diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py index 92d8c77bc..0c272a2c0 100644 --- a/test/mitmproxy/test_command.py +++ b/test/mitmproxy/test_command.py @@ -12,11 +12,15 @@ import pytest class TAddon: def cmd1(self, foo: str) -> str: + """cmd1 help""" return "ret " + foo def cmd2(self, foo: str) -> str: return 99 + def empty(self) -> None: + pass + class TestCommand: def test_call(self): @@ -40,6 +44,7 @@ def test_simple(): c = command.CommandManager(m) a = TAddon() c.add("one.two", a.cmd1) + assert c.commands["one.two"].help == "cmd1 help" assert(c.call("one.two foo") == "ret foo") with pytest.raises(exceptions.CommandError, match="Unknown"): c.call("nonexistent") @@ -48,6 +53,9 @@ def test_simple(): with pytest.raises(exceptions.CommandError, match="Usage"): c.call("one.two too many args") + c.add("empty", a.empty) + c.call("empty") + def test_typename(): assert command.typename(str, True) == "str"