mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2025-01-30 14:58:38 +00:00
Merge pull request #2397 from cortesi/neverenoughconsole
console: keymap-related improvements
This commit is contained in:
commit
309274689c
@ -74,7 +74,8 @@ class Command:
|
||||
|
||||
def call(self, args: typing.Sequence[str]):
|
||||
"""
|
||||
Call the command with a set of arguments. At this point, all argumets are strings.
|
||||
Call the command with a list of arguments. At this point, all
|
||||
arguments are strings.
|
||||
"""
|
||||
if not self.has_positional and (len(self.paramtypes) != len(args)):
|
||||
raise exceptions.CommandError("Usage: %s" % self.signature_help())
|
||||
|
500
mitmproxy/tools/console/consoleaddons.py
Normal file
500
mitmproxy/tools/console/consoleaddons.py
Normal file
@ -0,0 +1,500 @@
|
||||
import typing
|
||||
|
||||
from mitmproxy import ctx
|
||||
from mitmproxy import command
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import flow
|
||||
from mitmproxy import contentviews
|
||||
from mitmproxy.utils import strutils
|
||||
|
||||
from mitmproxy.tools.console import overlay
|
||||
from mitmproxy.tools.console import signals
|
||||
from mitmproxy.tools.console import keymap
|
||||
|
||||
|
||||
class Logger:
|
||||
def log(self, evt):
|
||||
signals.add_log(evt.msg, evt.level)
|
||||
if evt.level == "alert":
|
||||
signals.status_message.send(
|
||||
message=str(evt.msg),
|
||||
expire=2
|
||||
)
|
||||
|
||||
|
||||
class UnsupportedLog:
|
||||
"""
|
||||
A small addon to dump info on flow types we don't support yet.
|
||||
"""
|
||||
def websocket_message(self, f):
|
||||
message = f.messages[-1]
|
||||
signals.add_log(f.message_info(message), "info")
|
||||
signals.add_log(strutils.bytes_to_escaped_str(message.content), "debug")
|
||||
|
||||
def websocket_end(self, f):
|
||||
signals.add_log("WebSocket connection closed by {}: {} {}, {}".format(
|
||||
f.close_sender,
|
||||
f.close_code,
|
||||
f.close_message,
|
||||
f.close_reason), "info")
|
||||
|
||||
def tcp_message(self, f):
|
||||
message = f.messages[-1]
|
||||
direction = "->" if message.from_client else "<-"
|
||||
signals.add_log("{client_host}:{client_port} {direction} tcp {direction} {server_host}:{server_port}".format(
|
||||
client_host=f.client_conn.address[0],
|
||||
client_port=f.client_conn.address[1],
|
||||
server_host=f.server_conn.address[0],
|
||||
server_port=f.server_conn.address[1],
|
||||
direction=direction,
|
||||
), "info")
|
||||
signals.add_log(strutils.bytes_to_escaped_str(message.content), "debug")
|
||||
|
||||
|
||||
class ConsoleAddon:
|
||||
"""
|
||||
An addon that exposes console-specific commands, and hooks into required
|
||||
events.
|
||||
"""
|
||||
def __init__(self, master):
|
||||
self.master = master
|
||||
self.started = False
|
||||
|
||||
@command.command("console.layout.options")
|
||||
def layout_options(self) -> typing.Sequence[str]:
|
||||
"""
|
||||
Returns the valid options for console layout. Use these by setting
|
||||
the console_layout option.
|
||||
"""
|
||||
return ["single", "vertical", "horizontal"]
|
||||
|
||||
@command.command("console.layout.cycle")
|
||||
def layout_cycle(self) -> None:
|
||||
"""
|
||||
Cycle through the console layout options.
|
||||
"""
|
||||
opts = self.layout_options()
|
||||
off = self.layout_options().index(ctx.options.console_layout)
|
||||
ctx.options.update(
|
||||
console_layout = opts[(off + 1) % len(opts)]
|
||||
)
|
||||
|
||||
@command.command("console.panes.next")
|
||||
def panes_next(self) -> None:
|
||||
"""
|
||||
Go to the next layout pane.
|
||||
"""
|
||||
self.master.window.switch()
|
||||
|
||||
@command.command("console.options.reset.focus")
|
||||
def options_reset_current(self) -> None:
|
||||
"""
|
||||
Reset the current option in the options editor.
|
||||
"""
|
||||
fv = self.master.window.current("options")
|
||||
if not fv:
|
||||
raise exceptions.CommandError("Not viewing options.")
|
||||
self.master.commands.call("options.reset.one %s" % fv.current_name())
|
||||
|
||||
@command.command("console.nav.start")
|
||||
def nav_start(self) -> None:
|
||||
"""
|
||||
Go to the start of a list or scrollable.
|
||||
"""
|
||||
self.master.inject_key("m_start")
|
||||
|
||||
@command.command("console.nav.end")
|
||||
def nav_end(self) -> None:
|
||||
"""
|
||||
Go to the end of a list or scrollable.
|
||||
"""
|
||||
self.master.inject_key("m_end")
|
||||
|
||||
@command.command("console.nav.next")
|
||||
def nav_next(self) -> None:
|
||||
"""
|
||||
Go to the next navigatable item.
|
||||
"""
|
||||
self.master.inject_key("m_next")
|
||||
|
||||
@command.command("console.nav.select")
|
||||
def nav_select(self) -> None:
|
||||
"""
|
||||
Select a navigable item for viewing or editing.
|
||||
"""
|
||||
self.master.inject_key("m_select")
|
||||
|
||||
@command.command("console.nav.up")
|
||||
def nav_up(self) -> None:
|
||||
"""
|
||||
Go up.
|
||||
"""
|
||||
self.master.inject_key("up")
|
||||
|
||||
@command.command("console.nav.down")
|
||||
def nav_down(self) -> None:
|
||||
"""
|
||||
Go down.
|
||||
"""
|
||||
self.master.inject_key("down")
|
||||
|
||||
@command.command("console.nav.pageup")
|
||||
def nav_pageup(self) -> None:
|
||||
"""
|
||||
Go up.
|
||||
"""
|
||||
self.master.inject_key("page up")
|
||||
|
||||
@command.command("console.nav.pagedown")
|
||||
def nav_pagedown(self) -> None:
|
||||
"""
|
||||
Go down.
|
||||
"""
|
||||
self.master.inject_key("page down")
|
||||
|
||||
@command.command("console.nav.left")
|
||||
def nav_left(self) -> None:
|
||||
"""
|
||||
Go left.
|
||||
"""
|
||||
self.master.inject_key("left")
|
||||
|
||||
@command.command("console.nav.right")
|
||||
def nav_right(self) -> None:
|
||||
"""
|
||||
Go right.
|
||||
"""
|
||||
self.master.inject_key("right")
|
||||
|
||||
@command.command("console.choose")
|
||||
def console_choose(
|
||||
self, prompt: str, choices: typing.Sequence[str], *cmd: str
|
||||
) -> None:
|
||||
"""
|
||||
Prompt the user to choose from a specified list of strings, then
|
||||
invoke another command with all occurances of {choice} replaced by
|
||||
the choice the user made.
|
||||
"""
|
||||
def callback(opt):
|
||||
# We're now outside of the call context...
|
||||
repl = " ".join(cmd)
|
||||
repl = repl.replace("{choice}", opt)
|
||||
try:
|
||||
self.master.commands.call(repl)
|
||||
except exceptions.CommandError as e:
|
||||
signals.status_message.send(message=str(e))
|
||||
|
||||
self.master.overlay(
|
||||
overlay.Chooser(self.master, prompt, choices, "", callback)
|
||||
)
|
||||
|
||||
@command.command("console.choose.cmd")
|
||||
def console_choose_cmd(
|
||||
self, prompt: str, choicecmd: str, *cmd: str
|
||||
) -> None:
|
||||
"""
|
||||
Prompt the user to choose from a list of strings returned by a
|
||||
command, then invoke another command with all occurances of {choice}
|
||||
replaced by the choice the user made.
|
||||
"""
|
||||
choices = ctx.master.commands.call_args(choicecmd, [])
|
||||
|
||||
def callback(opt):
|
||||
# We're now outside of the call context...
|
||||
repl = " ".join(cmd)
|
||||
repl = repl.replace("{choice}", opt)
|
||||
try:
|
||||
self.master.commands.call(repl)
|
||||
except exceptions.CommandError as e:
|
||||
signals.status_message.send(message=str(e))
|
||||
|
||||
self.master.overlay(
|
||||
overlay.Chooser(self.master, prompt, choices, "", callback)
|
||||
)
|
||||
|
||||
@command.command("console.command")
|
||||
def console_command(self, *partial: str) -> None:
|
||||
"""
|
||||
Prompt the user to edit a command with a (possilby empty) starting value.
|
||||
"""
|
||||
signals.status_prompt_command.send(partial=" ".join(partial)) # type: ignore
|
||||
|
||||
@command.command("console.view.keybindings")
|
||||
def view_keybindings(self) -> None:
|
||||
"""View the commands list."""
|
||||
self.master.switch_view("keybindings")
|
||||
|
||||
@command.command("console.view.commands")
|
||||
def view_commands(self) -> None:
|
||||
"""View the commands list."""
|
||||
self.master.switch_view("commands")
|
||||
|
||||
@command.command("console.view.options")
|
||||
def view_options(self) -> None:
|
||||
"""View the options editor."""
|
||||
self.master.switch_view("options")
|
||||
|
||||
@command.command("console.view.eventlog")
|
||||
def view_eventlog(self) -> None:
|
||||
"""View the options editor."""
|
||||
self.master.switch_view("eventlog")
|
||||
|
||||
@command.command("console.view.help")
|
||||
def view_help(self) -> None:
|
||||
"""View help."""
|
||||
self.master.switch_view("help")
|
||||
|
||||
@command.command("console.view.flow")
|
||||
def view_flow(self, flow: flow.Flow) -> None:
|
||||
"""View a flow."""
|
||||
if hasattr(flow, "request"):
|
||||
# FIME: Also set focus?
|
||||
self.master.switch_view("flowview")
|
||||
|
||||
@command.command("console.exit")
|
||||
def exit(self) -> None:
|
||||
"""Exit mitmproxy."""
|
||||
self.master.shutdown()
|
||||
|
||||
@command.command("console.view.pop")
|
||||
def view_pop(self) -> None:
|
||||
"""
|
||||
Pop a view off the console stack. At the top level, this prompts the
|
||||
user to exit mitmproxy.
|
||||
"""
|
||||
signals.pop_view_state.send(self)
|
||||
|
||||
@command.command("console.bodyview")
|
||||
def bodyview(self, f: flow.Flow, part: str) -> None:
|
||||
"""
|
||||
Spawn an external viewer for a flow request or response body based
|
||||
on the detected MIME type. We use the mailcap system to find the
|
||||
correct viewier, and fall back to the programs in $PAGER or $EDITOR
|
||||
if necessary.
|
||||
"""
|
||||
fpart = getattr(f, part)
|
||||
if not fpart:
|
||||
raise exceptions.CommandError("Could not view part %s." % part)
|
||||
t = fpart.headers.get("content-type")
|
||||
content = fpart.get_content(strict=False)
|
||||
if not content:
|
||||
raise exceptions.CommandError("No content to view.")
|
||||
self.master.spawn_external_viewer(content, t)
|
||||
|
||||
@command.command("console.edit.focus.options")
|
||||
def edit_focus_options(self) -> typing.Sequence[str]:
|
||||
return [
|
||||
"cookies",
|
||||
"form",
|
||||
"path",
|
||||
"method",
|
||||
"query",
|
||||
"reason",
|
||||
"request-headers",
|
||||
"response-headers",
|
||||
"status_code",
|
||||
"set-cookies",
|
||||
"url",
|
||||
]
|
||||
|
||||
@command.command("console.edit.focus")
|
||||
def edit_focus(self, part: str) -> None:
|
||||
"""
|
||||
Edit the query of the current focus.
|
||||
"""
|
||||
if part == "cookies":
|
||||
self.master.switch_view("edit_focus_cookies")
|
||||
elif part == "form":
|
||||
self.master.switch_view("edit_focus_form")
|
||||
elif part == "path":
|
||||
self.master.switch_view("edit_focus_path")
|
||||
elif part == "query":
|
||||
self.master.switch_view("edit_focus_query")
|
||||
elif part == "request-headers":
|
||||
self.master.switch_view("edit_focus_request_headers")
|
||||
elif part == "response-headers":
|
||||
self.master.switch_view("edit_focus_response_headers")
|
||||
elif part == "set-cookies":
|
||||
self.master.switch_view("edit_focus_setcookies")
|
||||
elif part in ["url", "method", "status_code", "reason"]:
|
||||
self.master.commands.call(
|
||||
"console.command flow.set @focus %s " % part
|
||||
)
|
||||
|
||||
def _grideditor(self):
|
||||
gewidget = self.master.window.current("grideditor")
|
||||
if not gewidget:
|
||||
raise exceptions.CommandError("Not in a grideditor.")
|
||||
return gewidget.key_responder()
|
||||
|
||||
@command.command("console.grideditor.add")
|
||||
def grideditor_add(self) -> None:
|
||||
"""
|
||||
Add a row after the cursor.
|
||||
"""
|
||||
self._grideditor().cmd_add()
|
||||
|
||||
@command.command("console.grideditor.insert")
|
||||
def grideditor_insert(self) -> None:
|
||||
"""
|
||||
Insert a row before the cursor.
|
||||
"""
|
||||
self._grideditor().cmd_insert()
|
||||
|
||||
@command.command("console.grideditor.delete")
|
||||
def grideditor_delete(self) -> None:
|
||||
"""
|
||||
Delete row
|
||||
"""
|
||||
self._grideditor().cmd_delete()
|
||||
|
||||
@command.command("console.grideditor.readfile")
|
||||
def grideditor_readfile(self, path: str) -> None:
|
||||
"""
|
||||
Read a file into the currrent cell.
|
||||
"""
|
||||
self._grideditor().cmd_read_file(path)
|
||||
|
||||
@command.command("console.grideditor.readfile_escaped")
|
||||
def grideditor_readfile_escaped(self, path: str) -> None:
|
||||
"""
|
||||
Read a file containing a Python-style escaped stringinto the
|
||||
currrent cell.
|
||||
"""
|
||||
self._grideditor().cmd_read_file_escaped(path)
|
||||
|
||||
@command.command("console.grideditor.editor")
|
||||
def grideditor_editor(self) -> None:
|
||||
"""
|
||||
Spawn an external editor on the current cell.
|
||||
"""
|
||||
self._grideditor().cmd_spawn_editor()
|
||||
|
||||
@command.command("console.flowview.mode.set")
|
||||
def flowview_mode_set(self) -> None:
|
||||
"""
|
||||
Set the display mode for the current flow view.
|
||||
"""
|
||||
fv = self.master.window.current("flowview")
|
||||
if not fv:
|
||||
raise exceptions.CommandError("Not viewing a flow.")
|
||||
idx = fv.body.tab_offset
|
||||
|
||||
def callback(opt):
|
||||
try:
|
||||
self.master.commands.call_args(
|
||||
"view.setval",
|
||||
["@focus", "flowview_mode_%s" % idx, opt]
|
||||
)
|
||||
except exceptions.CommandError as e:
|
||||
signals.status_message.send(message=str(e))
|
||||
|
||||
opts = [i.name.lower() for i in contentviews.views]
|
||||
self.master.overlay(overlay.Chooser(self.master, "Mode", opts, "", callback))
|
||||
|
||||
@command.command("console.flowview.mode")
|
||||
def flowview_mode(self) -> str:
|
||||
"""
|
||||
Get the display mode for the current flow view.
|
||||
"""
|
||||
fv = self.master.window.current_window("flowview")
|
||||
if not fv:
|
||||
raise exceptions.CommandError("Not viewing a flow.")
|
||||
idx = fv.body.tab_offset
|
||||
return self.master.commands.call_args(
|
||||
"view.getval",
|
||||
[
|
||||
"@focus",
|
||||
"flowview_mode_%s" % idx,
|
||||
self.master.options.default_contentview,
|
||||
]
|
||||
)
|
||||
|
||||
@command.command("console.eventlog.clear")
|
||||
def eventlog_clear(self) -> None:
|
||||
"""
|
||||
Clear the event log.
|
||||
"""
|
||||
signals.sig_clear_log.send(self)
|
||||
|
||||
@command.command("console.key.contexts")
|
||||
def key_contexts(self) -> typing.Sequence[str]:
|
||||
"""
|
||||
The available contexts for key binding.
|
||||
"""
|
||||
return list(sorted(keymap.Contexts))
|
||||
|
||||
@command.command("console.key.bind")
|
||||
def key_bind(self, contexts: typing.Sequence[str], key: str, *command: str) -> None:
|
||||
"""
|
||||
Bind a shortcut key.
|
||||
"""
|
||||
try:
|
||||
self.master.keymap.add(
|
||||
key,
|
||||
" ".join(command),
|
||||
contexts,
|
||||
""
|
||||
)
|
||||
except ValueError as v:
|
||||
raise exceptions.CommandError(v)
|
||||
|
||||
@command.command("console.key.unbind")
|
||||
def key_unbind(self, contexts: typing.Sequence[str], key: str) -> None:
|
||||
"""
|
||||
Un-bind a shortcut key.
|
||||
"""
|
||||
try:
|
||||
self.master.keymap.remove(key, contexts)
|
||||
except ValueError as v:
|
||||
raise exceptions.CommandError(v)
|
||||
|
||||
def _keyfocus(self):
|
||||
kwidget = self.master.window.current("keybindings")
|
||||
if not kwidget:
|
||||
raise exceptions.CommandError("Not viewing key bindings.")
|
||||
f = kwidget.focus()
|
||||
if not f:
|
||||
raise exceptions.CommandError("No key binding focused")
|
||||
return f
|
||||
|
||||
@command.command("console.key.unbind.focus")
|
||||
def key_unbind_focus(self) -> None:
|
||||
"""
|
||||
Un-bind the shortcut key currently focused in the key binding viewer.
|
||||
"""
|
||||
b = self._keyfocus()
|
||||
try:
|
||||
self.master.keymap.remove(b.key, b.contexts)
|
||||
except ValueError as v:
|
||||
raise exceptions.CommandError(v)
|
||||
|
||||
@command.command("console.key.execute.focus")
|
||||
def key_execute_focus(self) -> None:
|
||||
"""
|
||||
Execute the currently focused key binding.
|
||||
"""
|
||||
b = self._keyfocus()
|
||||
self.console_command(b.command)
|
||||
|
||||
@command.command("console.key.edit.focus")
|
||||
def key_edit_focus(self) -> None:
|
||||
"""
|
||||
Execute the currently focused key binding.
|
||||
"""
|
||||
b = self._keyfocus()
|
||||
self.console_command(
|
||||
"console.key.bind",
|
||||
",".join(b.contexts),
|
||||
b.key,
|
||||
b.command,
|
||||
)
|
||||
|
||||
def running(self):
|
||||
self.started = True
|
||||
|
||||
def update(self, flows):
|
||||
if not flows:
|
||||
signals.update_settings.send(self)
|
||||
for f in flows:
|
||||
signals.flow_change.send(self, flow=f)
|
@ -1,6 +1,6 @@
|
||||
|
||||
def map(km):
|
||||
km.add(":", "console.command ''", ["global"], "Command prompt")
|
||||
km.add(":", "console.command ", ["global"], "Command prompt")
|
||||
km.add("?", "console.view.help", ["global"], "View help")
|
||||
km.add("C", "console.view.commands", ["global"], "View commands")
|
||||
km.add("K", "console.view.keybindings", ["global"], "View key bindings")
|
||||
@ -20,7 +20,7 @@ def map(km):
|
||||
km.add("h", "console.nav.left", ["global"], "Left")
|
||||
km.add("tab", "console.nav.next", ["global"], "Next")
|
||||
km.add("enter", "console.nav.select", ["global"], "Select")
|
||||
km.add(" ", "console.nav.pagedown", ["global"], "Page down")
|
||||
km.add("space", "console.nav.pagedown", ["global"], "Page down")
|
||||
km.add("ctrl f", "console.nav.pagedown", ["global"], "Page down")
|
||||
km.add("ctrl b", "console.nav.pageup", ["global"], "Page up")
|
||||
|
||||
@ -102,7 +102,7 @@ def map(km):
|
||||
"Toggle viewing full contents on this flow",
|
||||
)
|
||||
km.add("w", "console.command save.file @focus ", ["flowview"], "Save flow to file")
|
||||
km.add(" ", "view.focus.next", ["flowview"], "Go to next flow")
|
||||
km.add("space", "view.focus.next", ["flowview"], "Go to next flow")
|
||||
|
||||
km.add(
|
||||
"v",
|
||||
@ -128,7 +128,7 @@ def map(km):
|
||||
km.add("L", "console.command options.load ", ["options"], "Load from file")
|
||||
km.add("S", "console.command options.save ", ["options"], "Save to file")
|
||||
km.add("D", "options.reset", ["options"], "Reset all options")
|
||||
km.add("d", "console.options.reset.current", ["options"], "Reset this option")
|
||||
km.add("d", "console.options.reset.focus", ["options"], "Reset this option")
|
||||
|
||||
km.add("a", "console.grideditor.add", ["grideditor"], "Add a row after cursor")
|
||||
km.add("A", "console.grideditor.insert", ["grideditor"], "Insert a row before cursor")
|
||||
@ -148,3 +148,31 @@ def map(km):
|
||||
km.add("e", "console.grideditor.editor", ["grideditor"], "Edit in external editor")
|
||||
|
||||
km.add("z", "console.eventlog.clear", ["eventlog"], "Clear")
|
||||
|
||||
km.add(
|
||||
"a",
|
||||
"""
|
||||
console.choose.cmd "Context" console.key.contexts
|
||||
console.command console.key.bind {choice}
|
||||
""",
|
||||
["keybindings"],
|
||||
"Add a key binding"
|
||||
)
|
||||
km.add(
|
||||
"d",
|
||||
"console.key.unbind.focus",
|
||||
["keybindings"],
|
||||
"Unbind the currently focused key binding"
|
||||
)
|
||||
km.add(
|
||||
"x",
|
||||
"console.key.execute.focus",
|
||||
["keybindings"],
|
||||
"Execute the currently focused key binding"
|
||||
)
|
||||
km.add(
|
||||
"enter",
|
||||
"console.key.edit.focus",
|
||||
["keybindings"],
|
||||
"Edit the currently focused key binding"
|
||||
)
|
||||
|
@ -2,6 +2,7 @@ import urwid
|
||||
import blinker
|
||||
import textwrap
|
||||
from mitmproxy.tools.console import layoutwidget
|
||||
from mitmproxy.tools.console import signals
|
||||
|
||||
HELP_HEIGHT = 5
|
||||
|
||||
@ -43,6 +44,12 @@ class KeyListWalker(urwid.ListWalker):
|
||||
self.focusobj = None
|
||||
self.bindings = list(master.keymap.list("all"))
|
||||
self.set_focus(0)
|
||||
signals.keybindings_change.connect(self.sig_modified)
|
||||
|
||||
def sig_modified(self, sender):
|
||||
self.bindings = list(self.master.keymap.list("all"))
|
||||
self.set_focus(min(self.index, len(self.bindings) - 1))
|
||||
self._modified()
|
||||
|
||||
def get_edit_text(self):
|
||||
return self.focus_obj.get_edit_text()
|
||||
@ -128,6 +135,12 @@ class KeyBindings(urwid.Pile, layoutwidget.LayoutWidget):
|
||||
)
|
||||
self.master = master
|
||||
|
||||
def focus(self):
|
||||
if self.focus_position != 0:
|
||||
return None
|
||||
f = self.widget_list[0]
|
||||
return f.walker.get_focus()[0].binding
|
||||
|
||||
def keypress(self, size, key):
|
||||
if key == "m_next":
|
||||
self.focus_position = (
|
||||
|
@ -1,9 +1,9 @@
|
||||
import typing
|
||||
import collections
|
||||
from mitmproxy.tools.console import commandeditor
|
||||
from mitmproxy.tools.console import signals
|
||||
|
||||
|
||||
SupportedContexts = {
|
||||
Contexts = {
|
||||
"chooser",
|
||||
"commands",
|
||||
"eventlog",
|
||||
@ -12,59 +12,113 @@ SupportedContexts = {
|
||||
"global",
|
||||
"grideditor",
|
||||
"help",
|
||||
"keybindings",
|
||||
"options",
|
||||
}
|
||||
|
||||
|
||||
Binding = collections.namedtuple(
|
||||
"Binding",
|
||||
["key", "command", "contexts", "help"]
|
||||
)
|
||||
class Binding:
|
||||
def __init__(self, key, command, contexts, help):
|
||||
self.key, self.command, self.contexts = key, command, sorted(contexts)
|
||||
self.help = help
|
||||
|
||||
def keyspec(self):
|
||||
"""
|
||||
Translate the key spec from a convenient user specification to one
|
||||
Urwid understands.
|
||||
"""
|
||||
return self.key.replace("space", " ")
|
||||
|
||||
def sortkey(self):
|
||||
return self.key + ",".join(self.contexts)
|
||||
|
||||
|
||||
class Keymap:
|
||||
def __init__(self, master):
|
||||
self.executor = commandeditor.CommandExecutor(master)
|
||||
self.keys = {}
|
||||
for c in Contexts:
|
||||
self.keys[c] = {}
|
||||
self.bindings = []
|
||||
|
||||
def add(self, key: str, command: str, contexts: typing.Sequence[str], help="") -> None:
|
||||
"""
|
||||
Add a key to the key map. If context is empty, it's considered to be
|
||||
a global binding.
|
||||
"""
|
||||
def _check_contexts(self, contexts):
|
||||
if not contexts:
|
||||
raise ValueError("Must specify at least one context.")
|
||||
for c in contexts:
|
||||
if c not in SupportedContexts:
|
||||
if c not in Contexts:
|
||||
raise ValueError("Unsupported context: %s" % c)
|
||||
|
||||
b = Binding(key=key, command=command, contexts=contexts, help=help)
|
||||
self.bindings.append(b)
|
||||
self.bind(b)
|
||||
def add(
|
||||
self,
|
||||
key: str,
|
||||
command: str,
|
||||
contexts: typing.Sequence[str],
|
||||
help=""
|
||||
) -> None:
|
||||
"""
|
||||
Add a key to the key map.
|
||||
"""
|
||||
self._check_contexts(contexts)
|
||||
|
||||
def bind(self, binding):
|
||||
for b in self.bindings:
|
||||
if b.key == key and b.command.strip() == command.strip():
|
||||
b.contexts = sorted(list(set(b.contexts + contexts)))
|
||||
if help:
|
||||
b.help = help
|
||||
self.bind(b)
|
||||
break
|
||||
else:
|
||||
self.remove(key, contexts)
|
||||
b = Binding(key=key, command=command, contexts=contexts, help=help)
|
||||
self.bindings.append(b)
|
||||
self.bind(b)
|
||||
signals.keybindings_change.send(self)
|
||||
|
||||
def remove(self, key: str, contexts: typing.Sequence[str]) -> None:
|
||||
"""
|
||||
Remove a key from the key map.
|
||||
"""
|
||||
self._check_contexts(contexts)
|
||||
for c in contexts:
|
||||
b = self.get(c, key)
|
||||
if b:
|
||||
self.unbind(b)
|
||||
b.contexts = [x for x in b.contexts if x != c]
|
||||
if b.contexts:
|
||||
self.bindings.append(b)
|
||||
self.bind(b)
|
||||
signals.keybindings_change.send(self)
|
||||
|
||||
def bind(self, binding: Binding) -> None:
|
||||
for c in binding.contexts:
|
||||
d = self.keys.setdefault(c, {})
|
||||
d[binding.key] = binding.command
|
||||
self.keys[c][binding.keyspec()] = binding
|
||||
|
||||
def get(self, context: str, key: str) -> typing.Optional[str]:
|
||||
def unbind(self, binding: Binding) -> None:
|
||||
"""
|
||||
Unbind also removes the binding from the list.
|
||||
"""
|
||||
for c in binding.contexts:
|
||||
del self.keys[c][binding.keyspec()]
|
||||
self.bindings = [b for b in self.bindings if b != binding]
|
||||
|
||||
def get(self, context: str, key: str) -> typing.Optional[Binding]:
|
||||
if context in self.keys:
|
||||
return self.keys[context].get(key, None)
|
||||
return None
|
||||
|
||||
def list(self, context: str) -> typing.Sequence[Binding]:
|
||||
b = [b for b in self.bindings if context in b.contexts or context == "all"]
|
||||
b.sort(key=lambda x: x.key)
|
||||
return b
|
||||
b = [x for x in self.bindings if context in x.contexts or context == "all"]
|
||||
single = [x for x in b if len(x.key.split()) == 1]
|
||||
multi = [x for x in b if len(x.key.split()) != 1]
|
||||
single.sort(key=lambda x: x.sortkey())
|
||||
multi.sort(key=lambda x: x.sortkey())
|
||||
return single + multi
|
||||
|
||||
def handle(self, context: str, key: str) -> typing.Optional[str]:
|
||||
"""
|
||||
Returns the key if it has not been handled, or None.
|
||||
"""
|
||||
cmd = self.get(context, key)
|
||||
if not cmd:
|
||||
cmd = self.get("global", key)
|
||||
if cmd:
|
||||
return self.executor(cmd)
|
||||
b = self.get(context, key) or self.get("global", key)
|
||||
if b:
|
||||
return self.executor(b.command)
|
||||
return key
|
||||
|
@ -9,443 +9,21 @@ import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import traceback
|
||||
import typing
|
||||
|
||||
import urwid
|
||||
|
||||
from mitmproxy import ctx
|
||||
from mitmproxy import addons
|
||||
from mitmproxy import command
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import master
|
||||
from mitmproxy import log
|
||||
from mitmproxy import flow
|
||||
from mitmproxy.addons import intercept
|
||||
from mitmproxy.addons import readfile
|
||||
from mitmproxy.addons import view
|
||||
from mitmproxy.tools.console import consoleaddons
|
||||
from mitmproxy.tools.console import defaultkeys
|
||||
from mitmproxy.tools.console import keymap
|
||||
from mitmproxy.tools.console import overlay
|
||||
from mitmproxy.tools.console import palettes
|
||||
from mitmproxy.tools.console import signals
|
||||
from mitmproxy.tools.console import window
|
||||
from mitmproxy import contentviews
|
||||
from mitmproxy.utils import strutils
|
||||
|
||||
|
||||
class Logger:
|
||||
def log(self, evt):
|
||||
signals.add_log(evt.msg, evt.level)
|
||||
if evt.level == "alert":
|
||||
signals.status_message.send(
|
||||
message=str(evt.msg),
|
||||
expire=2
|
||||
)
|
||||
|
||||
|
||||
class UnsupportedLog:
|
||||
"""
|
||||
A small addon to dump info on flow types we don't support yet.
|
||||
"""
|
||||
def websocket_message(self, f):
|
||||
message = f.messages[-1]
|
||||
signals.add_log(f.message_info(message), "info")
|
||||
signals.add_log(strutils.bytes_to_escaped_str(message.content), "debug")
|
||||
|
||||
def websocket_end(self, f):
|
||||
signals.add_log("WebSocket connection closed by {}: {} {}, {}".format(
|
||||
f.close_sender,
|
||||
f.close_code,
|
||||
f.close_message,
|
||||
f.close_reason), "info")
|
||||
|
||||
def tcp_message(self, f):
|
||||
message = f.messages[-1]
|
||||
direction = "->" if message.from_client else "<-"
|
||||
signals.add_log("{client_host}:{client_port} {direction} tcp {direction} {server_host}:{server_port}".format(
|
||||
client_host=f.client_conn.address[0],
|
||||
client_port=f.client_conn.address[1],
|
||||
server_host=f.server_conn.address[0],
|
||||
server_port=f.server_conn.address[1],
|
||||
direction=direction,
|
||||
), "info")
|
||||
signals.add_log(strutils.bytes_to_escaped_str(message.content), "debug")
|
||||
|
||||
|
||||
class ConsoleAddon:
|
||||
"""
|
||||
An addon that exposes console-specific commands, and hooks into required
|
||||
events.
|
||||
"""
|
||||
def __init__(self, master):
|
||||
self.master = master
|
||||
self.started = False
|
||||
|
||||
@command.command("console.layout.options")
|
||||
def layout_options(self) -> typing.Sequence[str]:
|
||||
"""
|
||||
Returns the valid options for console layout. Use these by setting
|
||||
the console_layout option.
|
||||
"""
|
||||
return ["single", "vertical", "horizontal"]
|
||||
|
||||
@command.command("console.layout.cycle")
|
||||
def layout_cycle(self) -> None:
|
||||
"""
|
||||
Cycle through the console layout options.
|
||||
"""
|
||||
opts = self.layout_options()
|
||||
off = self.layout_options().index(ctx.options.console_layout)
|
||||
ctx.options.update(
|
||||
console_layout = opts[(off + 1) % len(opts)]
|
||||
)
|
||||
|
||||
@command.command("console.panes.next")
|
||||
def panes_next(self) -> None:
|
||||
"""
|
||||
Go to the next layout pane.
|
||||
"""
|
||||
self.master.window.switch()
|
||||
|
||||
@command.command("console.options.reset.current")
|
||||
def options_reset_current(self) -> None:
|
||||
"""
|
||||
Reset the current option in the options editor.
|
||||
"""
|
||||
fv = self.master.window.current("options")
|
||||
if not fv:
|
||||
raise exceptions.CommandError("Not viewing options.")
|
||||
self.master.commands.call("options.reset.one %s" % fv.current_name())
|
||||
|
||||
@command.command("console.nav.start")
|
||||
def nav_start(self) -> None:
|
||||
"""
|
||||
Go to the start of a list or scrollable.
|
||||
"""
|
||||
self.master.inject_key("m_start")
|
||||
|
||||
@command.command("console.nav.end")
|
||||
def nav_end(self) -> None:
|
||||
"""
|
||||
Go to the end of a list or scrollable.
|
||||
"""
|
||||
self.master.inject_key("m_end")
|
||||
|
||||
@command.command("console.nav.next")
|
||||
def nav_next(self) -> None:
|
||||
"""
|
||||
Go to the next navigatable item.
|
||||
"""
|
||||
self.master.inject_key("m_next")
|
||||
|
||||
@command.command("console.nav.select")
|
||||
def nav_select(self) -> None:
|
||||
"""
|
||||
Select a navigable item for viewing or editing.
|
||||
"""
|
||||
self.master.inject_key("m_select")
|
||||
|
||||
@command.command("console.nav.up")
|
||||
def nav_up(self) -> None:
|
||||
"""
|
||||
Go up.
|
||||
"""
|
||||
self.master.inject_key("up")
|
||||
|
||||
@command.command("console.nav.down")
|
||||
def nav_down(self) -> None:
|
||||
"""
|
||||
Go down.
|
||||
"""
|
||||
self.master.inject_key("down")
|
||||
|
||||
@command.command("console.nav.pageup")
|
||||
def nav_pageup(self) -> None:
|
||||
"""
|
||||
Go up.
|
||||
"""
|
||||
self.master.inject_key("page up")
|
||||
|
||||
@command.command("console.nav.pagedown")
|
||||
def nav_pagedown(self) -> None:
|
||||
"""
|
||||
Go down.
|
||||
"""
|
||||
self.master.inject_key("page down")
|
||||
|
||||
@command.command("console.nav.left")
|
||||
def nav_left(self) -> None:
|
||||
"""
|
||||
Go left.
|
||||
"""
|
||||
self.master.inject_key("left")
|
||||
|
||||
@command.command("console.nav.right")
|
||||
def nav_right(self) -> None:
|
||||
"""
|
||||
Go right.
|
||||
"""
|
||||
self.master.inject_key("right")
|
||||
|
||||
@command.command("console.choose")
|
||||
def console_choose(
|
||||
self, prompt: str, choices: typing.Sequence[str], *cmd: str
|
||||
) -> None:
|
||||
"""
|
||||
Prompt the user to choose from a specified list of strings, then
|
||||
invoke another command with all occurances of {choice} replaced by
|
||||
the choice the user made.
|
||||
"""
|
||||
def callback(opt):
|
||||
# We're now outside of the call context...
|
||||
repl = " ".join(cmd)
|
||||
repl = repl.replace("{choice}", opt)
|
||||
try:
|
||||
self.master.commands.call(repl)
|
||||
except exceptions.CommandError as e:
|
||||
signals.status_message.send(message=str(e))
|
||||
|
||||
self.master.overlay(
|
||||
overlay.Chooser(self.master, prompt, choices, "", callback)
|
||||
)
|
||||
|
||||
@command.command("console.choose.cmd")
|
||||
def console_choose_cmd(
|
||||
self, prompt: str, choicecmd: str, *cmd: str
|
||||
) -> None:
|
||||
"""
|
||||
Prompt the user to choose from a list of strings returned by a
|
||||
command, then invoke another command with all occurances of {choice}
|
||||
replaced by the choice the user made.
|
||||
"""
|
||||
choices = ctx.master.commands.call_args(choicecmd, [])
|
||||
|
||||
def callback(opt):
|
||||
# We're now outside of the call context...
|
||||
repl = " ".join(cmd)
|
||||
repl = repl.replace("{choice}", opt)
|
||||
try:
|
||||
self.master.commands.call(repl)
|
||||
except exceptions.CommandError as e:
|
||||
signals.status_message.send(message=str(e))
|
||||
|
||||
self.master.overlay(
|
||||
overlay.Chooser(self.master, prompt, choices, "", callback)
|
||||
)
|
||||
|
||||
@command.command("console.command")
|
||||
def console_command(self, *partial: str) -> None:
|
||||
"""
|
||||
Prompt the user to edit a command with a (possilby empty) starting value.
|
||||
"""
|
||||
signals.status_prompt_command.send(partial=" ".join(partial)) # type: ignore
|
||||
|
||||
@command.command("console.view.keybindings")
|
||||
def view_keybindings(self) -> None:
|
||||
"""View the commands list."""
|
||||
self.master.switch_view("keybindings")
|
||||
|
||||
@command.command("console.view.commands")
|
||||
def view_commands(self) -> None:
|
||||
"""View the commands list."""
|
||||
self.master.switch_view("commands")
|
||||
|
||||
@command.command("console.view.options")
|
||||
def view_options(self) -> None:
|
||||
"""View the options editor."""
|
||||
self.master.switch_view("options")
|
||||
|
||||
@command.command("console.view.eventlog")
|
||||
def view_eventlog(self) -> None:
|
||||
"""View the options editor."""
|
||||
self.master.switch_view("eventlog")
|
||||
|
||||
@command.command("console.view.help")
|
||||
def view_help(self) -> None:
|
||||
"""View help."""
|
||||
self.master.switch_view("help")
|
||||
|
||||
@command.command("console.view.flow")
|
||||
def view_flow(self, flow: flow.Flow) -> None:
|
||||
"""View a flow."""
|
||||
if hasattr(flow, "request"):
|
||||
# FIME: Also set focus?
|
||||
self.master.switch_view("flowview")
|
||||
|
||||
@command.command("console.exit")
|
||||
def exit(self) -> None:
|
||||
"""Exit mitmproxy."""
|
||||
raise urwid.ExitMainLoop
|
||||
|
||||
@command.command("console.view.pop")
|
||||
def view_pop(self) -> None:
|
||||
"""
|
||||
Pop a view off the console stack. At the top level, this prompts the
|
||||
user to exit mitmproxy.
|
||||
"""
|
||||
signals.pop_view_state.send(self)
|
||||
|
||||
@command.command("console.bodyview")
|
||||
def bodyview(self, f: flow.Flow, part: str) -> None:
|
||||
"""
|
||||
Spawn an external viewer for a flow request or response body based
|
||||
on the detected MIME type. We use the mailcap system to find the
|
||||
correct viewier, and fall back to the programs in $PAGER or $EDITOR
|
||||
if necessary.
|
||||
"""
|
||||
fpart = getattr(f, part)
|
||||
if not fpart:
|
||||
raise exceptions.CommandError("Could not view part %s." % part)
|
||||
t = fpart.headers.get("content-type")
|
||||
content = fpart.get_content(strict=False)
|
||||
if not content:
|
||||
raise exceptions.CommandError("No content to view.")
|
||||
self.master.spawn_external_viewer(content, t)
|
||||
|
||||
@command.command("console.edit.focus.options")
|
||||
def edit_focus_options(self) -> typing.Sequence[str]:
|
||||
return [
|
||||
"cookies",
|
||||
"form",
|
||||
"path",
|
||||
"method",
|
||||
"query",
|
||||
"reason",
|
||||
"request-headers",
|
||||
"response-headers",
|
||||
"status_code",
|
||||
"set-cookies",
|
||||
"url",
|
||||
]
|
||||
|
||||
@command.command("console.edit.focus")
|
||||
def edit_focus(self, part: str) -> None:
|
||||
"""
|
||||
Edit the query of the current focus.
|
||||
"""
|
||||
if part == "cookies":
|
||||
self.master.switch_view("edit_focus_cookies")
|
||||
elif part == "form":
|
||||
self.master.switch_view("edit_focus_form")
|
||||
elif part == "path":
|
||||
self.master.switch_view("edit_focus_path")
|
||||
elif part == "query":
|
||||
self.master.switch_view("edit_focus_query")
|
||||
elif part == "request-headers":
|
||||
self.master.switch_view("edit_focus_request_headers")
|
||||
elif part == "response-headers":
|
||||
self.master.switch_view("edit_focus_response_headers")
|
||||
elif part == "set-cookies":
|
||||
self.master.switch_view("edit_focus_setcookies")
|
||||
elif part in ["url", "method", "status_code", "reason"]:
|
||||
self.master.commands.call(
|
||||
"console.command flow.set @focus %s " % part
|
||||
)
|
||||
|
||||
def _grideditor(self):
|
||||
gewidget = self.master.window.current("grideditor")
|
||||
if not gewidget:
|
||||
raise exceptions.CommandError("Not in a grideditor.")
|
||||
return gewidget.key_responder()
|
||||
|
||||
@command.command("console.grideditor.add")
|
||||
def grideditor_add(self) -> None:
|
||||
"""
|
||||
Add a row after the cursor.
|
||||
"""
|
||||
self._grideditor().cmd_add()
|
||||
|
||||
@command.command("console.grideditor.insert")
|
||||
def grideditor_insert(self) -> None:
|
||||
"""
|
||||
Insert a row before the cursor.
|
||||
"""
|
||||
self._grideditor().cmd_insert()
|
||||
|
||||
@command.command("console.grideditor.delete")
|
||||
def grideditor_delete(self) -> None:
|
||||
"""
|
||||
Delete row
|
||||
"""
|
||||
self._grideditor().cmd_delete()
|
||||
|
||||
@command.command("console.grideditor.readfile")
|
||||
def grideditor_readfile(self, path: str) -> None:
|
||||
"""
|
||||
Read a file into the currrent cell.
|
||||
"""
|
||||
self._grideditor().cmd_read_file(path)
|
||||
|
||||
@command.command("console.grideditor.readfile_escaped")
|
||||
def grideditor_readfile_escaped(self, path: str) -> None:
|
||||
"""
|
||||
Read a file containing a Python-style escaped stringinto the
|
||||
currrent cell.
|
||||
"""
|
||||
self._grideditor().cmd_read_file_escaped(path)
|
||||
|
||||
@command.command("console.grideditor.editor")
|
||||
def grideditor_editor(self) -> None:
|
||||
"""
|
||||
Spawn an external editor on the current cell.
|
||||
"""
|
||||
self._grideditor().cmd_spawn_editor()
|
||||
|
||||
@command.command("console.flowview.mode.set")
|
||||
def flowview_mode_set(self) -> None:
|
||||
"""
|
||||
Set the display mode for the current flow view.
|
||||
"""
|
||||
fv = self.master.window.current("flowview")
|
||||
if not fv:
|
||||
raise exceptions.CommandError("Not viewing a flow.")
|
||||
idx = fv.body.tab_offset
|
||||
|
||||
def callback(opt):
|
||||
try:
|
||||
self.master.commands.call_args(
|
||||
"view.setval",
|
||||
["@focus", "flowview_mode_%s" % idx, opt]
|
||||
)
|
||||
except exceptions.CommandError as e:
|
||||
signals.status_message.send(message=str(e))
|
||||
|
||||
opts = [i.name.lower() for i in contentviews.views]
|
||||
self.master.overlay(overlay.Chooser(self.master, "Mode", opts, "", callback))
|
||||
|
||||
@command.command("console.flowview.mode")
|
||||
def flowview_mode(self) -> str:
|
||||
"""
|
||||
Get the display mode for the current flow view.
|
||||
"""
|
||||
fv = self.master.window.current_window("flowview")
|
||||
if not fv:
|
||||
raise exceptions.CommandError("Not viewing a flow.")
|
||||
idx = fv.body.tab_offset
|
||||
return self.master.commands.call_args(
|
||||
"view.getval",
|
||||
[
|
||||
"@focus",
|
||||
"flowview_mode_%s" % idx,
|
||||
self.master.options.default_contentview,
|
||||
]
|
||||
)
|
||||
|
||||
@command.command("console.eventlog.clear")
|
||||
def eventlog_clear(self) -> None:
|
||||
"""
|
||||
Clear the event log.
|
||||
"""
|
||||
signals.sig_clear_log.send(self)
|
||||
|
||||
def running(self):
|
||||
self.started = True
|
||||
|
||||
def update(self, flows):
|
||||
if not flows:
|
||||
signals.update_settings.send(self)
|
||||
for f in flows:
|
||||
signals.flow_change.send(self, flow=f)
|
||||
|
||||
|
||||
class ConsoleMaster(master.Master):
|
||||
@ -470,14 +48,14 @@ class ConsoleMaster(master.Master):
|
||||
|
||||
signals.call_in.connect(self.sig_call_in)
|
||||
signals.sig_add_log.connect(self.sig_add_log)
|
||||
self.addons.add(Logger())
|
||||
self.addons.add(consoleaddons.Logger())
|
||||
self.addons.add(*addons.default_addons())
|
||||
self.addons.add(
|
||||
intercept.Intercept(),
|
||||
self.view,
|
||||
UnsupportedLog(),
|
||||
consoleaddons.UnsupportedLog(),
|
||||
readfile.ReadFile(),
|
||||
ConsoleAddon(self),
|
||||
consoleaddons.ConsoleAddon(self),
|
||||
)
|
||||
|
||||
def sigint_handler(*args, **kwargs):
|
||||
|
@ -48,3 +48,6 @@ flowlist_change = blinker.Signal()
|
||||
# Pop and push view state onto a stack
|
||||
pop_view_state = blinker.Signal()
|
||||
push_view_state = blinker.Signal()
|
||||
|
||||
# Fired when the key bindings change
|
||||
keybindings_change = blinker.Signal()
|
||||
|
@ -4,6 +4,11 @@ from unittest import mock
|
||||
import pytest
|
||||
|
||||
|
||||
def test_binding():
|
||||
b = keymap.Binding("space", "cmd", ["options"], "")
|
||||
assert b.keyspec() == " "
|
||||
|
||||
|
||||
def test_bind():
|
||||
with taddons.context() as tctx:
|
||||
km = keymap.Keymap(tctx.master)
|
||||
@ -30,3 +35,38 @@ def test_bind():
|
||||
assert km.executor.called
|
||||
|
||||
assert len((km.list("global"))) == 1
|
||||
|
||||
|
||||
def test_join():
|
||||
with taddons.context() as tctx:
|
||||
km = keymap.Keymap(tctx.master)
|
||||
km.add("key", "str", ["options"], "help1")
|
||||
km.add("key", "str", ["commands"])
|
||||
return
|
||||
assert len(km.bindings) == 1
|
||||
assert len(km.bindings[0].contexts) == 2
|
||||
assert km.bindings[0].help == "help1"
|
||||
km.add("key", "str", ["commands"], "help2")
|
||||
assert len(km.bindings) == 1
|
||||
assert len(km.bindings[0].contexts) == 2
|
||||
assert km.bindings[0].help == "help2"
|
||||
|
||||
assert km.get("commands", "key")
|
||||
km.unbind(km.bindings[0])
|
||||
assert len(km.bindings) == 0
|
||||
assert not km.get("commands", "key")
|
||||
|
||||
|
||||
def test_remove():
|
||||
with taddons.context() as tctx:
|
||||
km = keymap.Keymap(tctx.master)
|
||||
km.add("key", "str", ["options", "commands"], "help1")
|
||||
assert len(km.bindings) == 1
|
||||
assert "options" in km.bindings[0].contexts
|
||||
|
||||
km.remove("key", ["options"])
|
||||
assert len(km.bindings) == 1
|
||||
assert "options" not in km.bindings[0].contexts
|
||||
|
||||
km.remove("key", ["commands"])
|
||||
assert len(km.bindings) == 0
|
||||
|
Loading…
Reference in New Issue
Block a user