mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-23 00:01:36 +00:00
commit
29b3e787ca
@ -49,6 +49,10 @@ class Save:
|
||||
self.start_stream_to_path(ctx.options.save_stream_file, self.filt)
|
||||
|
||||
def save(self, flows: typing.Sequence[flow.Flow], path: str) -> None:
|
||||
"""
|
||||
Save flows to a file. If the path starts with a +, flows are
|
||||
appended to the file, otherwise it is over-written.
|
||||
"""
|
||||
try:
|
||||
f = self.open_file(path)
|
||||
except IOError as v:
|
||||
|
@ -323,6 +323,9 @@ class View(collections.Sequence):
|
||||
self.focus_follow = ctx.options.console_focus_follow
|
||||
|
||||
def resolve(self, spec: str) -> typing.Sequence[mitmproxy.flow.Flow]:
|
||||
"""
|
||||
Resolve a flow list specification to an actual list of flows.
|
||||
"""
|
||||
if spec == "@focus":
|
||||
return [self.focus.flow] if self.focus.flow else []
|
||||
elif spec == "@shown":
|
||||
|
@ -1,6 +1,8 @@
|
||||
import inspect
|
||||
import typing
|
||||
import shlex
|
||||
import textwrap
|
||||
|
||||
from mitmproxy.utils import typecheck
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import flow
|
||||
@ -25,12 +27,24 @@ class Command:
|
||||
self.manager = manager
|
||||
self.func = func
|
||||
sig = inspect.signature(self.func)
|
||||
self.help = None
|
||||
if func.__doc__:
|
||||
txt = func.__doc__.strip()
|
||||
self.help = "\n".join(textwrap.wrap(txt))
|
||||
self.paramtypes = [v.annotation for v in sig.parameters.values()]
|
||||
self.returntype = sig.return_annotation
|
||||
|
||||
def paramnames(self) -> typing.Sequence[str]:
|
||||
return [typename(i, False) for i in self.paramtypes]
|
||||
|
||||
def retname(self) -> str:
|
||||
return typename(self.returntype, True) if self.returntype else ""
|
||||
|
||||
def signature_help(self) -> str:
|
||||
params = " ".join([typename(i, False) for i in self.paramtypes])
|
||||
ret = " -> " + typename(self.returntype, True) if self.returntype else ""
|
||||
params = " ".join(self.paramnames())
|
||||
ret = self.retname()
|
||||
if ret:
|
||||
ret = " -> " + ret
|
||||
return "%s %s%s" % (self.path, params, ret)
|
||||
|
||||
def call(self, args: typing.Sequence[str]):
|
||||
|
@ -5,8 +5,8 @@ from mitmproxy.tools.console import signals
|
||||
|
||||
|
||||
class CommandEdit(urwid.Edit):
|
||||
def __init__(self):
|
||||
urwid.Edit.__init__(self, ":", "")
|
||||
def __init__(self, partial):
|
||||
urwid.Edit.__init__(self, ":", partial)
|
||||
|
||||
def keypress(self, size, key):
|
||||
return urwid.Edit.keypress(self, size, key)
|
175
mitmproxy/tools/console/commands.py
Normal file
175
mitmproxy/tools/console/commands.py
Normal file
@ -0,0 +1,175 @@
|
||||
import urwid
|
||||
import blinker
|
||||
import textwrap
|
||||
from mitmproxy.tools.console import common
|
||||
from mitmproxy.tools.console import signals
|
||||
|
||||
HELP_HEIGHT = 5
|
||||
|
||||
|
||||
footer = [
|
||||
('heading_key', "enter"), ":edit ",
|
||||
('heading_key', "?"), ":help ",
|
||||
]
|
||||
|
||||
|
||||
def _mkhelp():
|
||||
text = []
|
||||
keys = [
|
||||
("enter", "execute command"),
|
||||
]
|
||||
text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
|
||||
return text
|
||||
|
||||
|
||||
help_context = _mkhelp()
|
||||
|
||||
|
||||
def fcol(s, width, attr):
|
||||
s = str(s)
|
||||
return (
|
||||
"fixed",
|
||||
width,
|
||||
urwid.Text((attr, s))
|
||||
)
|
||||
|
||||
|
||||
command_focus_change = blinker.Signal()
|
||||
|
||||
|
||||
class CommandItem(urwid.WidgetWrap):
|
||||
def __init__(self, walker, cmd, focused):
|
||||
self.walker, self.cmd, self.focused = walker, cmd, focused
|
||||
super().__init__(None)
|
||||
self._w = self.get_widget()
|
||||
|
||||
def get_widget(self):
|
||||
parts = [
|
||||
("focus", ">> " if self.focused else " "),
|
||||
("title", self.cmd.path),
|
||||
("text", " "),
|
||||
("text", " ".join(self.cmd.paramnames())),
|
||||
]
|
||||
if self.cmd.returntype:
|
||||
parts.append([
|
||||
("title", " -> "),
|
||||
("text", self.cmd.retname()),
|
||||
])
|
||||
|
||||
return urwid.AttrMap(
|
||||
urwid.Padding(urwid.Text(parts)),
|
||||
"text"
|
||||
)
|
||||
|
||||
def get_edit_text(self):
|
||||
return self._w[1].get_edit_text()
|
||||
|
||||
def selectable(self):
|
||||
return True
|
||||
|
||||
def keypress(self, size, key):
|
||||
return key
|
||||
|
||||
|
||||
class CommandListWalker(urwid.ListWalker):
|
||||
def __init__(self, master):
|
||||
self.master = master
|
||||
|
||||
self.index = 0
|
||||
self.focusobj = None
|
||||
self.cmds = list(master.commands.commands.values())
|
||||
self.cmds.sort(key=lambda x: x.signature_help())
|
||||
self.set_focus(0)
|
||||
|
||||
def get_edit_text(self):
|
||||
return self.focus_obj.get_edit_text()
|
||||
|
||||
def _get(self, pos):
|
||||
cmd = self.cmds[pos]
|
||||
return CommandItem(self, cmd, pos == self.index)
|
||||
|
||||
def get_focus(self):
|
||||
return self.focus_obj, self.index
|
||||
|
||||
def set_focus(self, index):
|
||||
cmd = self.cmds[index]
|
||||
self.index = index
|
||||
self.focus_obj = self._get(self.index)
|
||||
command_focus_change.send(cmd.help or "")
|
||||
|
||||
def get_next(self, pos):
|
||||
if pos >= len(self.cmds) - 1:
|
||||
return None, None
|
||||
pos = pos + 1
|
||||
return self._get(pos), pos
|
||||
|
||||
def get_prev(self, pos):
|
||||
pos = pos - 1
|
||||
if pos < 0:
|
||||
return None, None
|
||||
return self._get(pos), pos
|
||||
|
||||
|
||||
class CommandsList(urwid.ListBox):
|
||||
def __init__(self, master):
|
||||
self.master = master
|
||||
self.walker = CommandListWalker(master)
|
||||
super().__init__(self.walker)
|
||||
|
||||
def keypress(self, size, key):
|
||||
if key == "enter":
|
||||
foc, idx = self.get_focus()
|
||||
signals.status_prompt_command.send(partial=foc.cmd.path + " ")
|
||||
return super().keypress(size, key)
|
||||
|
||||
|
||||
class CommandHelp(urwid.Frame):
|
||||
def __init__(self, master):
|
||||
self.master = master
|
||||
super().__init__(self.widget(""))
|
||||
self.set_active(False)
|
||||
command_focus_change.connect(self.sig_mod)
|
||||
|
||||
def set_active(self, val):
|
||||
h = urwid.Text("Command Help")
|
||||
style = "heading" if val else "heading_inactive"
|
||||
self.header = urwid.AttrWrap(h, style)
|
||||
|
||||
def widget(self, txt):
|
||||
cols, _ = self.master.ui.get_cols_rows()
|
||||
return urwid.ListBox(
|
||||
[urwid.Text(i) for i in textwrap.wrap(txt, cols)]
|
||||
)
|
||||
|
||||
def sig_mod(self, txt):
|
||||
self.set_body(self.widget(txt))
|
||||
|
||||
|
||||
class Commands(urwid.Pile):
|
||||
def __init__(self, master):
|
||||
oh = CommandHelp(master)
|
||||
super().__init__(
|
||||
[
|
||||
CommandsList(master),
|
||||
(HELP_HEIGHT, oh),
|
||||
]
|
||||
)
|
||||
self.master = master
|
||||
|
||||
def keypress(self, size, key):
|
||||
key = common.shortcuts(key)
|
||||
if key == "tab":
|
||||
self.focus_position = (
|
||||
self.focus_position + 1
|
||||
) % len(self.widget_list)
|
||||
self.widget_list[1].set_active(self.focus_position == 1)
|
||||
key = None
|
||||
|
||||
# This is essentially a copypasta from urwid.Pile's keypress handler.
|
||||
# So much for "closed for modification, but open for extension".
|
||||
item_rows = None
|
||||
if len(size) == 2:
|
||||
item_rows = self.get_item_rows(size, focus = True)
|
||||
i = self.widget_list.index(self.focus_item)
|
||||
tsize = self.get_item_size(size, i, True, item_rows)
|
||||
return self.focus_item.keypress(tsize, key)
|
@ -353,9 +353,7 @@ class FlowListBox(urwid.ListBox):
|
||||
|
||||
def keypress(self, size, key):
|
||||
key = common.shortcuts(key)
|
||||
if key == ":":
|
||||
signals.status_prompt_command.send()
|
||||
elif key == "A":
|
||||
if key == "A":
|
||||
for f in self.master.view:
|
||||
if f.intercepted:
|
||||
f.resume()
|
||||
|
34
mitmproxy/tools/console/keymap.py
Normal file
34
mitmproxy/tools/console/keymap.py
Normal file
@ -0,0 +1,34 @@
|
||||
import typing
|
||||
from mitmproxy.tools.console import commandeditor
|
||||
|
||||
|
||||
class Keymap:
|
||||
def __init__(self, master):
|
||||
self.executor = commandeditor.CommandExecutor(master)
|
||||
self.keys = {}
|
||||
|
||||
def add(self, key: str, command: str, context: str = "") -> None:
|
||||
"""
|
||||
Add a key to the key map. If context is empty, it's considered to be
|
||||
a global binding.
|
||||
"""
|
||||
d = self.keys.setdefault(context, {})
|
||||
d[key] = command
|
||||
|
||||
def get(self, context: str, key: str) -> typing.Optional[str]:
|
||||
if context in self.keys:
|
||||
return self.keys[context].get(key, None)
|
||||
return None
|
||||
|
||||
def handle(self, context: str, key: str) -> typing.Optional[str]:
|
||||
"""
|
||||
Returns the key if it has not been handled, or None.
|
||||
"""
|
||||
cmd = self.get(context, key)
|
||||
if cmd:
|
||||
return self.executor(cmd)
|
||||
if cmd != "":
|
||||
cmd = self.get("", key)
|
||||
if cmd:
|
||||
return self.executor(cmd)
|
||||
return key
|
@ -24,9 +24,10 @@ from mitmproxy.tools.console import flowlist
|
||||
from mitmproxy.tools.console import flowview
|
||||
from mitmproxy.tools.console import grideditor
|
||||
from mitmproxy.tools.console import help
|
||||
from mitmproxy.tools.console import keymap
|
||||
from mitmproxy.tools.console import options
|
||||
from mitmproxy.tools.console import commands
|
||||
from mitmproxy.tools.console import overlay
|
||||
from mitmproxy.tools.console import palettepicker
|
||||
from mitmproxy.tools.console import palettes
|
||||
from mitmproxy.tools.console import signals
|
||||
from mitmproxy.tools.console import statusbar
|
||||
@ -75,6 +76,58 @@ class UnsupportedLog:
|
||||
signals.add_log(strutils.bytes_to_escaped_str(message.content), "debug")
|
||||
|
||||
|
||||
class ConsoleCommands:
|
||||
"""
|
||||
An addon that exposes console-specific commands.
|
||||
"""
|
||||
def __init__(self, master):
|
||||
self.master = master
|
||||
|
||||
def command(self) -> None:
|
||||
"""Prompt for a command."""
|
||||
signals.status_prompt_command.send()
|
||||
|
||||
def view_commands(self) -> None:
|
||||
"""View the commands list."""
|
||||
self.master.view_commands()
|
||||
|
||||
def view_options(self) -> None:
|
||||
"""View the options editor."""
|
||||
self.master.view_options()
|
||||
|
||||
def view_help(self) -> None:
|
||||
"""View help."""
|
||||
self.master.view_help()
|
||||
|
||||
def exit(self) -> None:
|
||||
"""Exit mitmproxy."""
|
||||
raise urwid.ExitMainLoop
|
||||
|
||||
def view_pop(self) -> None:
|
||||
"""
|
||||
Pop a view off the console stack. At the top level, this prompts the
|
||||
user to exit mitmproxy.
|
||||
"""
|
||||
signals.pop_view_state.send(self)
|
||||
|
||||
def load(self, l):
|
||||
l.add_command("console.command", self.command)
|
||||
l.add_command("console.exit", self.exit)
|
||||
l.add_command("console.view.commands", self.view_commands)
|
||||
l.add_command("console.view.help", self.view_help)
|
||||
l.add_command("console.view.options", self.view_options)
|
||||
l.add_command("console.view.pop", self.view_pop)
|
||||
|
||||
|
||||
def default_keymap(km):
|
||||
km.add(":", "console.command")
|
||||
km.add("?", "console.view.help")
|
||||
km.add("C", "console.view.commands")
|
||||
km.add("O", "console.view.options")
|
||||
km.add("Q", "console.exit")
|
||||
km.add("q", "console.view.pop")
|
||||
|
||||
|
||||
class ConsoleMaster(master.Master):
|
||||
|
||||
def __init__(self, options, server):
|
||||
@ -84,6 +137,8 @@ class ConsoleMaster(master.Master):
|
||||
self.stream_path = None
|
||||
# This line is just for type hinting
|
||||
self.options = self.options # type: Options
|
||||
self.keymap = keymap.Keymap(self)
|
||||
default_keymap(self.keymap)
|
||||
self.options.errored.connect(self.options_error)
|
||||
|
||||
self.logbuffer = urwid.SimpleListWalker([])
|
||||
@ -102,6 +157,7 @@ class ConsoleMaster(master.Master):
|
||||
self.view,
|
||||
UnsupportedLog(),
|
||||
readfile.ReadFile(),
|
||||
ConsoleCommands(self),
|
||||
)
|
||||
|
||||
def sigint_handler(*args, **kwargs):
|
||||
@ -331,12 +387,13 @@ class ConsoleMaster(master.Master):
|
||||
)
|
||||
)
|
||||
|
||||
def view_help(self, helpctx):
|
||||
def view_help(self):
|
||||
hc = self.view_stack[0].helpctx
|
||||
signals.push_view_state.send(
|
||||
self,
|
||||
window = window.Window(
|
||||
self,
|
||||
help.HelpView(helpctx),
|
||||
help.HelpView(hc),
|
||||
None,
|
||||
statusbar.StatusBar(self, help.footer),
|
||||
None
|
||||
@ -358,15 +415,18 @@ class ConsoleMaster(master.Master):
|
||||
)
|
||||
)
|
||||
|
||||
def view_palette_picker(self):
|
||||
def view_commands(self):
|
||||
for i in self.view_stack:
|
||||
if isinstance(i["body"], commands.Commands):
|
||||
return
|
||||
signals.push_view_state.send(
|
||||
self,
|
||||
window = window.Window(
|
||||
self,
|
||||
palettepicker.PalettePicker(self),
|
||||
commands.Commands(self),
|
||||
None,
|
||||
statusbar.StatusBar(self, palettepicker.footer),
|
||||
palettepicker.help_context,
|
||||
statusbar.StatusBar(self, commands.footer),
|
||||
options.help_context,
|
||||
)
|
||||
)
|
||||
|
||||
|
@ -1,78 +0,0 @@
|
||||
import urwid
|
||||
|
||||
from mitmproxy.tools.console import common
|
||||
from mitmproxy.tools.console import palettes
|
||||
from mitmproxy.tools.console import select
|
||||
|
||||
footer = [
|
||||
('heading_key', "enter/space"), ":select",
|
||||
]
|
||||
|
||||
|
||||
def _mkhelp():
|
||||
text = []
|
||||
keys = [
|
||||
("enter/space", "select"),
|
||||
]
|
||||
text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
|
||||
return text
|
||||
|
||||
|
||||
help_context = _mkhelp()
|
||||
|
||||
|
||||
class PalettePicker(urwid.WidgetWrap):
|
||||
|
||||
def __init__(self, master):
|
||||
self.master = master
|
||||
low, high = [], []
|
||||
for k, v in palettes.palettes.items():
|
||||
if v.high:
|
||||
high.append(k)
|
||||
else:
|
||||
low.append(k)
|
||||
high.sort()
|
||||
low.sort()
|
||||
|
||||
options = [
|
||||
select.Heading("High Colour")
|
||||
]
|
||||
|
||||
def mkopt(name):
|
||||
return select.Option(
|
||||
i,
|
||||
None,
|
||||
lambda: self.master.options.console_palette == name,
|
||||
lambda: setattr(self.master.options, "console_palette", name)
|
||||
)
|
||||
|
||||
for i in high:
|
||||
options.append(mkopt(i))
|
||||
options.append(select.Heading("Low Colour"))
|
||||
for i in low:
|
||||
options.append(mkopt(i))
|
||||
|
||||
options.extend(
|
||||
[
|
||||
select.Heading("Options"),
|
||||
select.Option(
|
||||
"Transparent",
|
||||
"T",
|
||||
lambda: master.options.console_palette_transparent,
|
||||
master.options.toggler("console_palette_transparent")
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
self.lb = select.Select(options)
|
||||
title = urwid.Text("Palettes")
|
||||
title = urwid.Padding(title, align="left", width=("relative", 100))
|
||||
title = urwid.AttrWrap(title, "heading")
|
||||
self._w = urwid.Frame(
|
||||
self.lb,
|
||||
header = title
|
||||
)
|
||||
master.options.changed.connect(self.sig_options_changed)
|
||||
|
||||
def sig_options_changed(self, options, updated):
|
||||
self.lb.walker._modified()
|
@ -5,7 +5,7 @@ import urwid
|
||||
from mitmproxy.tools.console import common
|
||||
from mitmproxy.tools.console import pathedit
|
||||
from mitmproxy.tools.console import signals
|
||||
from mitmproxy.tools.console import command
|
||||
from mitmproxy.tools.console import commandeditor
|
||||
import mitmproxy.tools.console.master # noqa
|
||||
|
||||
|
||||
@ -69,10 +69,10 @@ class ActionBar(urwid.WidgetWrap):
|
||||
self._w = urwid.Edit(self.prep_prompt(prompt), text or "")
|
||||
self.prompting = PromptStub(callback, args)
|
||||
|
||||
def sig_prompt_command(self, sender):
|
||||
def sig_prompt_command(self, sender, partial=""):
|
||||
signals.focus.send(self, section="footer")
|
||||
self._w = command.CommandEdit()
|
||||
self.prompting = command.CommandExecutor(self.master)
|
||||
self._w = commandeditor.CommandEdit(partial)
|
||||
self.prompting = commandeditor.CommandExecutor(self.master)
|
||||
|
||||
def sig_path_prompt(self, sender, prompt, callback, args=()):
|
||||
signals.focus.send(self, section="footer")
|
||||
|
@ -82,8 +82,9 @@ class Window(urwid.Frame):
|
||||
|
||||
def keypress(self, size, k):
|
||||
k = super().keypress(size, k)
|
||||
if k == "?":
|
||||
self.master.view_help(self.helpctx)
|
||||
k = self.master.keymap.handle("", k)
|
||||
if not k:
|
||||
return
|
||||
elif k == "i":
|
||||
signals.status_prompt.send(
|
||||
self,
|
||||
@ -91,21 +92,5 @@ class Window(urwid.Frame):
|
||||
text = self.master.options.intercept,
|
||||
callback = self.master.options.setter("intercept")
|
||||
)
|
||||
elif k == "O":
|
||||
self.master.view_options()
|
||||
elif k == "Q":
|
||||
raise urwid.ExitMainLoop
|
||||
elif k == "q":
|
||||
signals.pop_view_state.send(self)
|
||||
elif k == "R":
|
||||
signals.status_prompt_onekey.send(
|
||||
self,
|
||||
prompt = "Replay",
|
||||
keys = (
|
||||
("client", "c"),
|
||||
("server", "s"),
|
||||
),
|
||||
callback = self.handle_replay,
|
||||
)
|
||||
else:
|
||||
return k
|
||||
|
@ -12,11 +12,15 @@ import pytest
|
||||
|
||||
class TAddon:
|
||||
def cmd1(self, foo: str) -> str:
|
||||
"""cmd1 help"""
|
||||
return "ret " + foo
|
||||
|
||||
def cmd2(self, foo: str) -> str:
|
||||
return 99
|
||||
|
||||
def empty(self) -> None:
|
||||
pass
|
||||
|
||||
|
||||
class TestCommand:
|
||||
def test_call(self):
|
||||
@ -40,6 +44,7 @@ def test_simple():
|
||||
c = command.CommandManager(m)
|
||||
a = TAddon()
|
||||
c.add("one.two", a.cmd1)
|
||||
assert c.commands["one.two"].help == "cmd1 help"
|
||||
assert(c.call("one.two foo") == "ret foo")
|
||||
with pytest.raises(exceptions.CommandError, match="Unknown"):
|
||||
c.call("nonexistent")
|
||||
@ -48,6 +53,9 @@ def test_simple():
|
||||
with pytest.raises(exceptions.CommandError, match="Usage"):
|
||||
c.call("one.two too many args")
|
||||
|
||||
c.add("empty", a.empty)
|
||||
c.call("empty")
|
||||
|
||||
|
||||
def test_typename():
|
||||
assert command.typename(str, True) == "str"
|
||||
|
Loading…
Reference in New Issue
Block a user