diff --git a/mitmproxy/tools/console/consoleaddons.py b/mitmproxy/tools/console/consoleaddons.py new file mode 100644 index 000000000..5431f5be6 --- /dev/null +++ b/mitmproxy/tools/console/consoleaddons.py @@ -0,0 +1,426 @@ +import typing + +from mitmproxy import ctx +from mitmproxy import command +from mitmproxy import exceptions +from mitmproxy import flow +from mitmproxy.tools.console import overlay +from mitmproxy import contentviews +from mitmproxy.utils import strutils +from mitmproxy.tools.console import signals +import urwid + + +class Logger: + def log(self, evt): + signals.add_log(evt.msg, evt.level) + if evt.level == "alert": + signals.status_message.send( + message=str(evt.msg), + expire=2 + ) + + +class UnsupportedLog: + """ + A small addon to dump info on flow types we don't support yet. + """ + def websocket_message(self, f): + message = f.messages[-1] + signals.add_log(f.message_info(message), "info") + signals.add_log(strutils.bytes_to_escaped_str(message.content), "debug") + + def websocket_end(self, f): + signals.add_log("WebSocket connection closed by {}: {} {}, {}".format( + f.close_sender, + f.close_code, + f.close_message, + f.close_reason), "info") + + def tcp_message(self, f): + message = f.messages[-1] + direction = "->" if message.from_client else "<-" + signals.add_log("{client_host}:{client_port} {direction} tcp {direction} {server_host}:{server_port}".format( + client_host=f.client_conn.address[0], + client_port=f.client_conn.address[1], + server_host=f.server_conn.address[0], + server_port=f.server_conn.address[1], + direction=direction, + ), "info") + signals.add_log(strutils.bytes_to_escaped_str(message.content), "debug") + + +class ConsoleAddon: + """ + An addon that exposes console-specific commands, and hooks into required + events. + """ + def __init__(self, master): + self.master = master + self.started = False + + @command.command("console.layout.options") + def layout_options(self) -> typing.Sequence[str]: + """ + Returns the valid options for console layout. Use these by setting + the console_layout option. + """ + return ["single", "vertical", "horizontal"] + + @command.command("console.layout.cycle") + def layout_cycle(self) -> None: + """ + Cycle through the console layout options. + """ + opts = self.layout_options() + off = self.layout_options().index(ctx.options.console_layout) + ctx.options.update( + console_layout = opts[(off + 1) % len(opts)] + ) + + @command.command("console.panes.next") + def panes_next(self) -> None: + """ + Go to the next layout pane. + """ + self.master.window.switch() + + @command.command("console.options.reset.current") + def options_reset_current(self) -> None: + """ + Reset the current option in the options editor. + """ + fv = self.master.window.current("options") + if not fv: + raise exceptions.CommandError("Not viewing options.") + self.master.commands.call("options.reset.one %s" % fv.current_name()) + + @command.command("console.nav.start") + def nav_start(self) -> None: + """ + Go to the start of a list or scrollable. + """ + self.master.inject_key("m_start") + + @command.command("console.nav.end") + def nav_end(self) -> None: + """ + Go to the end of a list or scrollable. + """ + self.master.inject_key("m_end") + + @command.command("console.nav.next") + def nav_next(self) -> None: + """ + Go to the next navigatable item. + """ + self.master.inject_key("m_next") + + @command.command("console.nav.select") + def nav_select(self) -> None: + """ + Select a navigable item for viewing or editing. + """ + self.master.inject_key("m_select") + + @command.command("console.nav.up") + def nav_up(self) -> None: + """ + Go up. + """ + self.master.inject_key("up") + + @command.command("console.nav.down") + def nav_down(self) -> None: + """ + Go down. + """ + self.master.inject_key("down") + + @command.command("console.nav.pageup") + def nav_pageup(self) -> None: + """ + Go up. + """ + self.master.inject_key("page up") + + @command.command("console.nav.pagedown") + def nav_pagedown(self) -> None: + """ + Go down. + """ + self.master.inject_key("page down") + + @command.command("console.nav.left") + def nav_left(self) -> None: + """ + Go left. + """ + self.master.inject_key("left") + + @command.command("console.nav.right") + def nav_right(self) -> None: + """ + Go right. + """ + self.master.inject_key("right") + + @command.command("console.choose") + def console_choose( + self, prompt: str, choices: typing.Sequence[str], *cmd: str + ) -> None: + """ + Prompt the user to choose from a specified list of strings, then + invoke another command with all occurances of {choice} replaced by + the choice the user made. + """ + def callback(opt): + # We're now outside of the call context... + repl = " ".join(cmd) + repl = repl.replace("{choice}", opt) + try: + self.master.commands.call(repl) + except exceptions.CommandError as e: + signals.status_message.send(message=str(e)) + + self.master.overlay( + overlay.Chooser(self.master, prompt, choices, "", callback) + ) + + @command.command("console.choose.cmd") + def console_choose_cmd( + self, prompt: str, choicecmd: str, *cmd: str + ) -> None: + """ + Prompt the user to choose from a list of strings returned by a + command, then invoke another command with all occurances of {choice} + replaced by the choice the user made. + """ + choices = ctx.master.commands.call_args(choicecmd, []) + + def callback(opt): + # We're now outside of the call context... + repl = " ".join(cmd) + repl = repl.replace("{choice}", opt) + try: + self.master.commands.call(repl) + except exceptions.CommandError as e: + signals.status_message.send(message=str(e)) + + self.master.overlay( + overlay.Chooser(self.master, prompt, choices, "", callback) + ) + + @command.command("console.command") + def console_command(self, *partial: str) -> None: + """ + Prompt the user to edit a command with a (possilby empty) starting value. + """ + signals.status_prompt_command.send(partial=" ".join(partial)) # type: ignore + + @command.command("console.view.keybindings") + def view_keybindings(self) -> None: + """View the commands list.""" + self.master.switch_view("keybindings") + + @command.command("console.view.commands") + def view_commands(self) -> None: + """View the commands list.""" + self.master.switch_view("commands") + + @command.command("console.view.options") + def view_options(self) -> None: + """View the options editor.""" + self.master.switch_view("options") + + @command.command("console.view.eventlog") + def view_eventlog(self) -> None: + """View the options editor.""" + self.master.switch_view("eventlog") + + @command.command("console.view.help") + def view_help(self) -> None: + """View help.""" + self.master.switch_view("help") + + @command.command("console.view.flow") + def view_flow(self, flow: flow.Flow) -> None: + """View a flow.""" + if hasattr(flow, "request"): + # FIME: Also set focus? + self.master.switch_view("flowview") + + @command.command("console.exit") + def exit(self) -> None: + """Exit mitmproxy.""" + raise urwid.ExitMainLoop + + @command.command("console.view.pop") + def view_pop(self) -> None: + """ + Pop a view off the console stack. At the top level, this prompts the + user to exit mitmproxy. + """ + signals.pop_view_state.send(self) + + @command.command("console.bodyview") + def bodyview(self, f: flow.Flow, part: str) -> None: + """ + Spawn an external viewer for a flow request or response body based + on the detected MIME type. We use the mailcap system to find the + correct viewier, and fall back to the programs in $PAGER or $EDITOR + if necessary. + """ + fpart = getattr(f, part) + if not fpart: + raise exceptions.CommandError("Could not view part %s." % part) + t = fpart.headers.get("content-type") + content = fpart.get_content(strict=False) + if not content: + raise exceptions.CommandError("No content to view.") + self.master.spawn_external_viewer(content, t) + + @command.command("console.edit.focus.options") + def edit_focus_options(self) -> typing.Sequence[str]: + return [ + "cookies", + "form", + "path", + "method", + "query", + "reason", + "request-headers", + "response-headers", + "status_code", + "set-cookies", + "url", + ] + + @command.command("console.edit.focus") + def edit_focus(self, part: str) -> None: + """ + Edit the query of the current focus. + """ + if part == "cookies": + self.master.switch_view("edit_focus_cookies") + elif part == "form": + self.master.switch_view("edit_focus_form") + elif part == "path": + self.master.switch_view("edit_focus_path") + elif part == "query": + self.master.switch_view("edit_focus_query") + elif part == "request-headers": + self.master.switch_view("edit_focus_request_headers") + elif part == "response-headers": + self.master.switch_view("edit_focus_response_headers") + elif part == "set-cookies": + self.master.switch_view("edit_focus_setcookies") + elif part in ["url", "method", "status_code", "reason"]: + self.master.commands.call( + "console.command flow.set @focus %s " % part + ) + + def _grideditor(self): + gewidget = self.master.window.current("grideditor") + if not gewidget: + raise exceptions.CommandError("Not in a grideditor.") + return gewidget.key_responder() + + @command.command("console.grideditor.add") + def grideditor_add(self) -> None: + """ + Add a row after the cursor. + """ + self._grideditor().cmd_add() + + @command.command("console.grideditor.insert") + def grideditor_insert(self) -> None: + """ + Insert a row before the cursor. + """ + self._grideditor().cmd_insert() + + @command.command("console.grideditor.delete") + def grideditor_delete(self) -> None: + """ + Delete row + """ + self._grideditor().cmd_delete() + + @command.command("console.grideditor.readfile") + def grideditor_readfile(self, path: str) -> None: + """ + Read a file into the currrent cell. + """ + self._grideditor().cmd_read_file(path) + + @command.command("console.grideditor.readfile_escaped") + def grideditor_readfile_escaped(self, path: str) -> None: + """ + Read a file containing a Python-style escaped stringinto the + currrent cell. + """ + self._grideditor().cmd_read_file_escaped(path) + + @command.command("console.grideditor.editor") + def grideditor_editor(self) -> None: + """ + Spawn an external editor on the current cell. + """ + self._grideditor().cmd_spawn_editor() + + @command.command("console.flowview.mode.set") + def flowview_mode_set(self) -> None: + """ + Set the display mode for the current flow view. + """ + fv = self.master.window.current("flowview") + if not fv: + raise exceptions.CommandError("Not viewing a flow.") + idx = fv.body.tab_offset + + def callback(opt): + try: + self.master.commands.call_args( + "view.setval", + ["@focus", "flowview_mode_%s" % idx, opt] + ) + except exceptions.CommandError as e: + signals.status_message.send(message=str(e)) + + opts = [i.name.lower() for i in contentviews.views] + self.master.overlay(overlay.Chooser(self.master, "Mode", opts, "", callback)) + + @command.command("console.flowview.mode") + def flowview_mode(self) -> str: + """ + Get the display mode for the current flow view. + """ + fv = self.master.window.current_window("flowview") + if not fv: + raise exceptions.CommandError("Not viewing a flow.") + idx = fv.body.tab_offset + return self.master.commands.call_args( + "view.getval", + [ + "@focus", + "flowview_mode_%s" % idx, + self.master.options.default_contentview, + ] + ) + + @command.command("console.eventlog.clear") + def eventlog_clear(self) -> None: + """ + Clear the event log. + """ + signals.sig_clear_log.send(self) + + def running(self): + self.started = True + + def update(self, flows): + if not flows: + signals.update_settings.send(self) + for f in flows: + signals.flow_change.send(self, flow=f) \ No newline at end of file diff --git a/mitmproxy/tools/console/master.py b/mitmproxy/tools/console/master.py index 315fad94c..cd29dba91 100644 --- a/mitmproxy/tools/console/master.py +++ b/mitmproxy/tools/console/master.py @@ -9,443 +9,21 @@ import subprocess import sys import tempfile import traceback -import typing import urwid -from mitmproxy import ctx from mitmproxy import addons -from mitmproxy import command -from mitmproxy import exceptions from mitmproxy import master from mitmproxy import log -from mitmproxy import flow from mitmproxy.addons import intercept from mitmproxy.addons import readfile from mitmproxy.addons import view +from mitmproxy.tools.console import consoleaddons from mitmproxy.tools.console import defaultkeys from mitmproxy.tools.console import keymap -from mitmproxy.tools.console import overlay from mitmproxy.tools.console import palettes from mitmproxy.tools.console import signals from mitmproxy.tools.console import window -from mitmproxy import contentviews -from mitmproxy.utils import strutils - - -class Logger: - def log(self, evt): - signals.add_log(evt.msg, evt.level) - if evt.level == "alert": - signals.status_message.send( - message=str(evt.msg), - expire=2 - ) - - -class UnsupportedLog: - """ - A small addon to dump info on flow types we don't support yet. - """ - def websocket_message(self, f): - message = f.messages[-1] - signals.add_log(f.message_info(message), "info") - signals.add_log(strutils.bytes_to_escaped_str(message.content), "debug") - - def websocket_end(self, f): - signals.add_log("WebSocket connection closed by {}: {} {}, {}".format( - f.close_sender, - f.close_code, - f.close_message, - f.close_reason), "info") - - def tcp_message(self, f): - message = f.messages[-1] - direction = "->" if message.from_client else "<-" - signals.add_log("{client_host}:{client_port} {direction} tcp {direction} {server_host}:{server_port}".format( - client_host=f.client_conn.address[0], - client_port=f.client_conn.address[1], - server_host=f.server_conn.address[0], - server_port=f.server_conn.address[1], - direction=direction, - ), "info") - signals.add_log(strutils.bytes_to_escaped_str(message.content), "debug") - - -class ConsoleAddon: - """ - An addon that exposes console-specific commands, and hooks into required - events. - """ - def __init__(self, master): - self.master = master - self.started = False - - @command.command("console.layout.options") - def layout_options(self) -> typing.Sequence[str]: - """ - Returns the valid options for console layout. Use these by setting - the console_layout option. - """ - return ["single", "vertical", "horizontal"] - - @command.command("console.layout.cycle") - def layout_cycle(self) -> None: - """ - Cycle through the console layout options. - """ - opts = self.layout_options() - off = self.layout_options().index(ctx.options.console_layout) - ctx.options.update( - console_layout = opts[(off + 1) % len(opts)] - ) - - @command.command("console.panes.next") - def panes_next(self) -> None: - """ - Go to the next layout pane. - """ - self.master.window.switch() - - @command.command("console.options.reset.current") - def options_reset_current(self) -> None: - """ - Reset the current option in the options editor. - """ - fv = self.master.window.current("options") - if not fv: - raise exceptions.CommandError("Not viewing options.") - self.master.commands.call("options.reset.one %s" % fv.current_name()) - - @command.command("console.nav.start") - def nav_start(self) -> None: - """ - Go to the start of a list or scrollable. - """ - self.master.inject_key("m_start") - - @command.command("console.nav.end") - def nav_end(self) -> None: - """ - Go to the end of a list or scrollable. - """ - self.master.inject_key("m_end") - - @command.command("console.nav.next") - def nav_next(self) -> None: - """ - Go to the next navigatable item. - """ - self.master.inject_key("m_next") - - @command.command("console.nav.select") - def nav_select(self) -> None: - """ - Select a navigable item for viewing or editing. - """ - self.master.inject_key("m_select") - - @command.command("console.nav.up") - def nav_up(self) -> None: - """ - Go up. - """ - self.master.inject_key("up") - - @command.command("console.nav.down") - def nav_down(self) -> None: - """ - Go down. - """ - self.master.inject_key("down") - - @command.command("console.nav.pageup") - def nav_pageup(self) -> None: - """ - Go up. - """ - self.master.inject_key("page up") - - @command.command("console.nav.pagedown") - def nav_pagedown(self) -> None: - """ - Go down. - """ - self.master.inject_key("page down") - - @command.command("console.nav.left") - def nav_left(self) -> None: - """ - Go left. - """ - self.master.inject_key("left") - - @command.command("console.nav.right") - def nav_right(self) -> None: - """ - Go right. - """ - self.master.inject_key("right") - - @command.command("console.choose") - def console_choose( - self, prompt: str, choices: typing.Sequence[str], *cmd: str - ) -> None: - """ - Prompt the user to choose from a specified list of strings, then - invoke another command with all occurances of {choice} replaced by - the choice the user made. - """ - def callback(opt): - # We're now outside of the call context... - repl = " ".join(cmd) - repl = repl.replace("{choice}", opt) - try: - self.master.commands.call(repl) - except exceptions.CommandError as e: - signals.status_message.send(message=str(e)) - - self.master.overlay( - overlay.Chooser(self.master, prompt, choices, "", callback) - ) - - @command.command("console.choose.cmd") - def console_choose_cmd( - self, prompt: str, choicecmd: str, *cmd: str - ) -> None: - """ - Prompt the user to choose from a list of strings returned by a - command, then invoke another command with all occurances of {choice} - replaced by the choice the user made. - """ - choices = ctx.master.commands.call_args(choicecmd, []) - - def callback(opt): - # We're now outside of the call context... - repl = " ".join(cmd) - repl = repl.replace("{choice}", opt) - try: - self.master.commands.call(repl) - except exceptions.CommandError as e: - signals.status_message.send(message=str(e)) - - self.master.overlay( - overlay.Chooser(self.master, prompt, choices, "", callback) - ) - - @command.command("console.command") - def console_command(self, *partial: str) -> None: - """ - Prompt the user to edit a command with a (possilby empty) starting value. - """ - signals.status_prompt_command.send(partial=" ".join(partial)) # type: ignore - - @command.command("console.view.keybindings") - def view_keybindings(self) -> None: - """View the commands list.""" - self.master.switch_view("keybindings") - - @command.command("console.view.commands") - def view_commands(self) -> None: - """View the commands list.""" - self.master.switch_view("commands") - - @command.command("console.view.options") - def view_options(self) -> None: - """View the options editor.""" - self.master.switch_view("options") - - @command.command("console.view.eventlog") - def view_eventlog(self) -> None: - """View the options editor.""" - self.master.switch_view("eventlog") - - @command.command("console.view.help") - def view_help(self) -> None: - """View help.""" - self.master.switch_view("help") - - @command.command("console.view.flow") - def view_flow(self, flow: flow.Flow) -> None: - """View a flow.""" - if hasattr(flow, "request"): - # FIME: Also set focus? - self.master.switch_view("flowview") - - @command.command("console.exit") - def exit(self) -> None: - """Exit mitmproxy.""" - raise urwid.ExitMainLoop - - @command.command("console.view.pop") - def view_pop(self) -> None: - """ - Pop a view off the console stack. At the top level, this prompts the - user to exit mitmproxy. - """ - signals.pop_view_state.send(self) - - @command.command("console.bodyview") - def bodyview(self, f: flow.Flow, part: str) -> None: - """ - Spawn an external viewer for a flow request or response body based - on the detected MIME type. We use the mailcap system to find the - correct viewier, and fall back to the programs in $PAGER or $EDITOR - if necessary. - """ - fpart = getattr(f, part) - if not fpart: - raise exceptions.CommandError("Could not view part %s." % part) - t = fpart.headers.get("content-type") - content = fpart.get_content(strict=False) - if not content: - raise exceptions.CommandError("No content to view.") - self.master.spawn_external_viewer(content, t) - - @command.command("console.edit.focus.options") - def edit_focus_options(self) -> typing.Sequence[str]: - return [ - "cookies", - "form", - "path", - "method", - "query", - "reason", - "request-headers", - "response-headers", - "status_code", - "set-cookies", - "url", - ] - - @command.command("console.edit.focus") - def edit_focus(self, part: str) -> None: - """ - Edit the query of the current focus. - """ - if part == "cookies": - self.master.switch_view("edit_focus_cookies") - elif part == "form": - self.master.switch_view("edit_focus_form") - elif part == "path": - self.master.switch_view("edit_focus_path") - elif part == "query": - self.master.switch_view("edit_focus_query") - elif part == "request-headers": - self.master.switch_view("edit_focus_request_headers") - elif part == "response-headers": - self.master.switch_view("edit_focus_response_headers") - elif part == "set-cookies": - self.master.switch_view("edit_focus_setcookies") - elif part in ["url", "method", "status_code", "reason"]: - self.master.commands.call( - "console.command flow.set @focus %s " % part - ) - - def _grideditor(self): - gewidget = self.master.window.current("grideditor") - if not gewidget: - raise exceptions.CommandError("Not in a grideditor.") - return gewidget.key_responder() - - @command.command("console.grideditor.add") - def grideditor_add(self) -> None: - """ - Add a row after the cursor. - """ - self._grideditor().cmd_add() - - @command.command("console.grideditor.insert") - def grideditor_insert(self) -> None: - """ - Insert a row before the cursor. - """ - self._grideditor().cmd_insert() - - @command.command("console.grideditor.delete") - def grideditor_delete(self) -> None: - """ - Delete row - """ - self._grideditor().cmd_delete() - - @command.command("console.grideditor.readfile") - def grideditor_readfile(self, path: str) -> None: - """ - Read a file into the currrent cell. - """ - self._grideditor().cmd_read_file(path) - - @command.command("console.grideditor.readfile_escaped") - def grideditor_readfile_escaped(self, path: str) -> None: - """ - Read a file containing a Python-style escaped stringinto the - currrent cell. - """ - self._grideditor().cmd_read_file_escaped(path) - - @command.command("console.grideditor.editor") - def grideditor_editor(self) -> None: - """ - Spawn an external editor on the current cell. - """ - self._grideditor().cmd_spawn_editor() - - @command.command("console.flowview.mode.set") - def flowview_mode_set(self) -> None: - """ - Set the display mode for the current flow view. - """ - fv = self.master.window.current("flowview") - if not fv: - raise exceptions.CommandError("Not viewing a flow.") - idx = fv.body.tab_offset - - def callback(opt): - try: - self.master.commands.call_args( - "view.setval", - ["@focus", "flowview_mode_%s" % idx, opt] - ) - except exceptions.CommandError as e: - signals.status_message.send(message=str(e)) - - opts = [i.name.lower() for i in contentviews.views] - self.master.overlay(overlay.Chooser(self.master, "Mode", opts, "", callback)) - - @command.command("console.flowview.mode") - def flowview_mode(self) -> str: - """ - Get the display mode for the current flow view. - """ - fv = self.master.window.current_window("flowview") - if not fv: - raise exceptions.CommandError("Not viewing a flow.") - idx = fv.body.tab_offset - return self.master.commands.call_args( - "view.getval", - [ - "@focus", - "flowview_mode_%s" % idx, - self.master.options.default_contentview, - ] - ) - - @command.command("console.eventlog.clear") - def eventlog_clear(self) -> None: - """ - Clear the event log. - """ - signals.sig_clear_log.send(self) - - def running(self): - self.started = True - - def update(self, flows): - if not flows: - signals.update_settings.send(self) - for f in flows: - signals.flow_change.send(self, flow=f) class ConsoleMaster(master.Master): @@ -470,14 +48,14 @@ class ConsoleMaster(master.Master): signals.call_in.connect(self.sig_call_in) signals.sig_add_log.connect(self.sig_add_log) - self.addons.add(Logger()) + self.addons.add(consoleaddons.Logger()) self.addons.add(*addons.default_addons()) self.addons.add( intercept.Intercept(), self.view, - UnsupportedLog(), + consoleaddons.UnsupportedLog(), readfile.ReadFile(), - ConsoleAddon(self), + consoleaddons.ConsoleAddon(self), ) def sigint_handler(*args, **kwargs):