keymap: keys can now bind to multiple contexts

Use this to map the majority of the keys in flowview.
This commit is contained in:
Aldo Cortesi 2017-05-01 16:28:00 +12:00
parent 670d1e408b
commit 1ea4a5a48e
9 changed files with 135 additions and 186 deletions

View File

@ -6,6 +6,7 @@ import sys
from mitmproxy import exceptions
from mitmproxy import eventsequence
from mitmproxy import controller
from mitmproxy import flow
from . import ctx
import pprint
@ -215,6 +216,9 @@ class AddonManager:
if isinstance(message.reply, controller.DummyReply):
message.reply.mark_reset()
if isinstance(message, flow.Flow):
self.trigger("update", [message])
def invoke_addon(self, addon, name, *args, **kwargs):
"""
Invoke an event on an addon and all its children. This method must

View File

@ -205,12 +205,21 @@ class View(collections.Sequence):
@command.command("view.focus.next")
def focus_next(self) -> None:
"""
A list of all the orders we support.
Set focus to the next flow.
"""
idx = self.focus.index + 1
if self.inbounds(idx):
self.focus.flow = self[idx]
@command.command("view.focus.prev")
def focus_prev(self) -> None:
"""
Set focus to the previous flow.
"""
idx = self.focus.index - 1
if self.inbounds(idx):
self.focus.flow = self[idx]
@command.command("view.order.options")
def order_options(self) -> typing.Sequence[str]:
"""
@ -323,6 +332,7 @@ class View(collections.Sequence):
if dups:
self.add(dups)
self.focus.flow = dups[0]
ctx.log.alert("Duplicated %s flows" % len(dups))
@command.command("view.remove")
def remove(self, flows: typing.Sequence[mitmproxy.flow.Flow]) -> None:

View File

@ -7,7 +7,6 @@ from typing import Optional, Union # noqa
import urwid
from mitmproxy import contentviews
from mitmproxy import exceptions
from mitmproxy import http
from mitmproxy.tools.console import common
from mitmproxy.tools.console import flowdetailview
@ -285,26 +284,9 @@ class FlowDetails(tabs.Tabs):
]
return searchable.Searchable(txt)
def view_flow(self, flow):
signals.pop_view_state.send(self)
self.master.view_flow(flow, self.tab_offset)
def _view_nextprev_flow(self, idx, flow):
if not self.view.inbounds(idx):
signals.status_message.send(message="No more flows")
return
self.view_flow(self.view[idx])
def view_next_flow(self, flow):
return self._view_nextprev_flow(self.view.index(flow) + 1, flow)
def view_prev_flow(self, flow):
return self._view_nextprev_flow(self.view.index(flow) - 1, flow)
def change_this_display_mode(self, t):
view = contentviews.get(t)
self.view.settings[self.flow][(self.tab_offset, "prettyview")] = view.name.lower()
signals.flow_change.send(self, flow=self.flow)
def keypress(self, size, key):
conn = None # type: Optional[Union[http.HTTPRequest, http.HTTPResponse]]
@ -319,69 +301,8 @@ class FlowDetails(tabs.Tabs):
if key in ("up", "down", "page up", "page down"):
# Pass scroll events to the wrapped widget
self._w.keypress(size, key)
elif key == "a":
self.flow.resume()
self.master.view.update(self.flow)
elif key == "A":
for f in self.view:
if f.intercepted:
f.resume()
self.master.view.update(self.flow)
elif key == "d":
if self.flow.killable:
self.flow.kill()
self.view.remove(self.flow)
if not self.view.focus.flow:
self.master.view_flowlist()
else:
self.view_flow(self.view.focus.flow)
elif key == "D":
cp = self.flow.copy()
self.master.view.add(cp)
self.master.view.focus.flow = cp
self.view_flow(cp)
signals.status_message.send(message="Duplicated.")
elif key == "p":
self.view_prev_flow(self.flow)
elif key == "r":
try:
self.master.replay_request(self.flow)
except exceptions.ReplayException as e:
signals.add_log("Replay error: %s" % e, "warn")
signals.flow_change.send(self, flow = self.flow)
elif key == "V":
if self.flow.modified():
self.flow.revert()
signals.flow_change.send(self, flow = self.flow)
signals.status_message.send(message="Reverted.")
else:
signals.status_message.send(message="Flow not modified.")
elif key == "W":
signals.status_prompt_path.send(
prompt = "Save this flow",
callback = self.master.save_one_flow,
args = (self.flow,)
)
elif key == "|":
signals.status_prompt_path.send(
prompt = "Send flow to script",
callback = self.master.run_script_once,
args = (self.flow,)
)
elif key in set("bfgmxvzEC") and not conn:
signals.status_message.send(
message = "Tab to the request or response",
expire = 1
)
return
elif key == "b":
if self.tab_offset == TAB_REQ:
common.ask_save_body("q", self.flow)
else:
common.ask_save_body("s", self.flow)
elif key == "f":
self.view.settings[self.flow][(self.tab_offset, "fullcontents")] = True
signals.flow_change.send(self, flow = self.flow)
signals.status_message.send(message="Loading all body data...")
elif key == "m":
opts = [i.name.lower() for i in contentviews.views]
@ -393,35 +314,6 @@ class FlowDetails(tabs.Tabs):
self.change_this_display_mode
)
)
elif key == "E":
pass
# if self.tab_offset == TAB_REQ:
# scope = "q"
# else:
# scope = "s"
# signals.status_prompt_onekey.send(
# self,
# prompt = "Export to file",
# keys = [(e[0], e[1]) for e in export.EXPORTERS],
# callback = common.export_to_clip_or_file,
# args = (scope, self.flow, common.ask_save_path)
# )
elif key == "C":
pass
# if self.tab_offset == TAB_REQ:
# scope = "q"
# else:
# scope = "s"
# signals.status_prompt_onekey.send(
# self,
# prompt = "Export to clipboard",
# keys = [(e[0], e[1]) for e in export.EXPORTERS],
# callback = common.export_to_clip_or_file,
# args = (scope, self.flow, common.copy_to_clipboard_or_prompt)
# )
elif key == "x":
conn.content = None
signals.flow_change.send(self, flow=self.flow)
elif key == "v":
if conn.raw_content:
t = conn.headers.get("content-type")
@ -452,7 +344,6 @@ class FlowDetails(tabs.Tabs):
callback = self.encode_callback,
args = (conn,)
)
signals.flow_change.send(self, flow = self.flow)
else:
# Key is not handled here.
return key
@ -464,7 +355,6 @@ class FlowDetails(tabs.Tabs):
"b": "br",
}
conn.encode(encoding_map[key])
signals.flow_change.send(self, flow = self.flow)
class FlowView(urwid.Frame):

View File

@ -478,12 +478,12 @@ class FocusEditor(urwid.WidgetWrap):
"""
raise NotImplementedError
def set_data_update(self, flow, vals):
self.set_data(flow, vals)
signals.flow_change.send(self, flow = flow)
def set_data(self, flow, vals):
def set_data(self, vals, flow):
"""
Set the current data on the flow.
"""
signals.flow_change.send(self, flow = self.flow)
raise NotImplementedError
def set_data_update(self, vals, flow):
self.set_data(vals, flow)
signals.flow_change.send(self, flow = flow)

View File

@ -1,8 +1,9 @@
import typing
import collections
from mitmproxy.tools.console import commandeditor
contexts = {
SupportedContexts = {
"commands",
"flowlist",
"flowview",
@ -13,20 +14,34 @@ contexts = {
}
Binding = collections.namedtuple("Binding", ["key", "command", "contexts"])
class Keymap:
def __init__(self, master):
self.executor = commandeditor.CommandExecutor(master)
self.keys = {}
self.bindings = []
def add(self, key: str, command: str, context: str = "global") -> None:
def add(self, key: str, command: str, contexts: typing.Sequence[str]) -> None:
"""
Add a key to the key map. If context is empty, it's considered to be
a global binding.
"""
if context not in contexts:
raise ValueError("Unsupported context: %s" % context)
d = self.keys.setdefault(context, {})
d[key] = command
if not contexts:
raise ValueError("Must specify at least one context.")
for c in contexts:
if c not in SupportedContexts:
raise ValueError("Unsupported context: %s" % c)
b = Binding(key=key, command=command, contexts=contexts)
self.bindings.append(b)
self.bind(b)
def bind(self, binding):
for c in binding.contexts:
d = self.keys.setdefault(c, {})
d[binding.key] = binding.command
def get(self, context: str, key: str) -> typing.Optional[str]:
if context in self.keys:

View File

@ -74,7 +74,8 @@ class UnsupportedLog:
class ConsoleAddon:
"""
An addon that exposes console-specific commands.
An addon that exposes console-specific commands, and hooks into required
events.
"""
def __init__(self, master):
self.master = master
@ -109,24 +110,24 @@ class ConsoleAddon:
@command.command("console.view.commands")
def view_commands(self) -> None:
"""View the commands list."""
self.master.view_commands()
self.master.switch_view("commands")
@command.command("console.view.options")
def view_options(self) -> None:
"""View the options editor."""
self.master.view_options()
self.master.switch_view("options")
@command.command("console.view.help")
def view_help(self) -> None:
"""View help."""
self.master.view_help()
self.master.switch_view("help")
@command.command("console.view.flow")
def view_flow(self, flow: flow.Flow) -> None:
"""View a flow."""
if hasattr(flow, "request"):
# FIME: Also set focus?
self.master.view_flow(flow)
self.master.switch_view("flowview")
@command.command("console.exit")
def exit(self) -> None:
@ -187,6 +188,8 @@ class ConsoleAddon:
def update(self, flows):
if not flows:
signals.update_settings.send(self)
for f in flows:
signals.flow_change.send(self, flow=f)
def configure(self, updated):
if self.started:
@ -195,67 +198,70 @@ class ConsoleAddon:
def default_keymap(km):
km.add(":", "console.command ''")
km.add("?", "console.view.help")
km.add("C", "console.view.commands")
km.add("O", "console.view.options")
km.add("Q", "console.exit")
km.add("q", "console.view.pop")
km.add("i", "console.command set intercept=")
km.add("W", "console.command set save_stream_file=")
km.add(":", "console.command ''", ["global"])
km.add("?", "console.view.help", ["global"])
km.add("C", "console.view.commands", ["global"])
km.add("O", "console.view.options", ["global"])
km.add("Q", "console.exit", ["global"])
km.add("q", "console.view.pop", ["global"])
km.add("i", "console.command set intercept=", ["global"])
km.add("W", "console.command set save_stream_file=", ["global"])
km.add("A", "flow.resume @all", context="flowlist")
km.add("a", "flow.resume @focus", context="flowlist")
km.add("b", "console.command cut.save s.content|@focus ''", context="flowlist")
km.add("d", "view.remove @focus", context="flowlist")
km.add("D", "view.duplicate @focus", context="flowlist")
km.add("e", "set console_eventlog=toggle", context="flowlist")
km.add("A", "flow.resume @all", ["flowlist", "flowview"])
km.add("a", "flow.resume @focus", ["flowlist", "flowview"])
km.add(
"b", "console.command cut.save s.content|@focus ''",
["flowlist", "flowview"]
)
km.add("d", "view.remove @focus", ["flowlist", "flowview"])
km.add("D", "view.duplicate @focus", ["flowlist", "flowview"])
km.add("e", "set console_eventlog=toggle", ["flowlist"])
km.add(
"E",
"console.choose Format export.formats "
"console.command export.file {choice} @focus ''",
context="flowlist"
["flowlist", "flowview"]
)
km.add("f", "console.command 'set view_filter='", context="flowlist")
km.add("F", "set console_focus_follow=toggle", context="flowlist")
km.add("g", "view.go 0", context="flowlist")
km.add("G", "view.go -1", context="flowlist")
km.add("l", "console.command cut.clip ", context="flowlist")
km.add("L", "console.command view.load ", context="flowlist")
km.add("m", "flow.mark.toggle @focus", context="flowlist")
km.add("M", "view.marked.toggle", context="flowlist")
km.add("f", "console.command 'set view_filter='", ["flowlist"])
km.add("F", "set console_focus_follow=toggle", ["flowlist"])
km.add("g", "view.go 0", ["flowlist"])
km.add("G", "view.go -1", ["flowlist"])
km.add("l", "console.command cut.clip ", ["flowlist", "flowview"])
km.add("L", "console.command view.load ", ["flowlist"])
km.add("m", "flow.mark.toggle @focus", ["flowlist"])
km.add("M", "view.marked.toggle", ["flowlist"])
km.add(
"n",
"console.command view.create get https://google.com",
context="flowlist"
["flowlist"]
)
km.add(
"o",
"console.choose Order view.order.options "
"set console_order={choice}",
context="flowlist"
["flowlist"]
)
km.add("r", "replay.client @focus", context="flowlist")
km.add("S", "console.command 'replay.server '")
km.add("v", "set console_order_reversed=toggle", context="flowlist")
km.add("U", "flow.mark @all false", context="flowlist")
km.add("w", "console.command 'save.file @shown '", context="flowlist")
km.add("V", "flow.revert @focus", context="flowlist")
km.add("X", "flow.kill @focus", context="flowlist")
km.add("z", "view.remove @all", context="flowlist")
km.add("Z", "view.remove @hidden", context="flowlist")
km.add("|", "console.command 'script.run @focus '", context="flowlist")
km.add("enter", "console.view.flow @focus", context="flowlist")
km.add("r", "replay.client @focus", ["flowlist", "flowview"])
km.add("S", "console.command 'replay.server '", ["flowlist"])
km.add("v", "set console_order_reversed=toggle", ["flowlist"])
km.add("U", "flow.mark @all false", ["flowlist"])
km.add("w", "console.command 'save.file @shown '", ["flowlist"])
km.add("V", "flow.revert @focus", ["flowlist", "flowview"])
km.add("X", "flow.kill @focus", ["flowlist"])
km.add("z", "view.remove @all", ["flowlist"])
km.add("Z", "view.remove @hidden", ["flowlist"])
km.add("|", "console.command 'script.run @focus '", ["flowlist", "flowview"])
km.add("enter", "console.view.flow @focus", ["flowlist"])
km.add(
"e",
"console.choose Part console.edit.focus.options "
"console.edit.focus {choice}",
context="flowview"
["flowview"]
)
km.add(" ", "view.focus.next", context="flowview")
km.add("X", "console.edit.focus.query", context="flowview")
km.add("w", "console.command 'save.file @focus '", ["flowview"])
km.add(" ", "view.focus.next", ["flowview"])
km.add("p", "view.focus.prev", ["flowview"])
class ConsoleMaster(master.Master):
@ -263,7 +269,6 @@ class ConsoleMaster(master.Master):
def __init__(self, options, server):
super().__init__(options, server)
self.view = view.View() # type: view.View
self.view.sig_view_update.connect(signals.flow_change.send)
self.stream_path = None
# This line is just for type hinting
self.options = self.options # type: Options
@ -434,7 +439,7 @@ class ConsoleMaster(master.Master):
self.loop.set_alarm_in(0.01, self.ticker)
self.loop.set_alarm_in(
0.0001,
lambda *args: self.view_flowlist()
lambda *args: self.switch_view("flowlist")
)
self.start()
@ -466,21 +471,6 @@ class ConsoleMaster(master.Master):
def switch_view(self, name):
self.window.push(name)
def view_help(self):
self.window.push("help")
def view_options(self):
self.window.push("options")
def view_commands(self):
self.window.push("commands")
def view_flowlist(self):
self.window.push("flowlist")
def view_flow(self, flow, tab_offset=0):
self.window.push("flowview")
def quit(self, a):
if a != "n":
self.shutdown()

View File

@ -27,6 +27,7 @@ class Window(urwid.Frame):
signals.focus.connect(self.sig_focus)
self.master.view.focus.sig_change.connect(self.focus_changed)
signals.flow_change.connect(self.flow_changed)
signals.pop_view_state.connect(self.pop)
signals.push_view_state.connect(self.push)
@ -50,6 +51,11 @@ class Window(urwid.Frame):
if f:
f(*args, **kwargs)
def flow_changed(self, sender, flow):
if self.master.view.focus.flow:
if flow.id == self.master.view.focus.flow.id:
self.focus_changed()
def focus_changed(self, *args, **kwargs):
"""
Triggered when the focus changes - either when it's modified, or

View File

@ -218,7 +218,7 @@ def test_resolve():
tctx.command(v.resolve, "~")
def test_go():
def test_movement():
v = view.View()
with taddons.context():
v.add([
@ -240,6 +240,11 @@ def test_go():
v.go(-999)
assert v.focus.index == 0
v.focus_next()
assert v.focus.index == 1
v.focus_prev()
assert v.focus.index == 0
def test_duplicate():
v = view.View()

View File

@ -0,0 +1,29 @@
from mitmproxy.tools.console import keymap
from mitmproxy.test import taddons
from unittest import mock
import pytest
def test_bind():
with taddons.context() as tctx:
km = keymap.Keymap(tctx.master)
km.executor = mock.Mock()
with pytest.raises(ValueError):
km.add("foo", "bar", ["unsupported"])
km.add("key", "str", ["options", "commands"])
assert km.get("options", "key")
assert km.get("commands", "key")
assert not km.get("flowlist", "key")
km.handle("unknown", "unknown")
assert not km.executor.called
km.handle("options", "key")
assert km.executor.called
km.add("glob", "str", ["global"])
km.executor = mock.Mock()
km.handle("options", "glob")
assert km.executor.called