mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2025-02-01 15:55:28 +00:00
Refactor console
- All top-level window objects are now persistent. It turns out that urwid keeps a reference to these even after they're no longer used, so they were leaking before. - Revamp editors to work with console commands, and start reworking bindings for flowview.
This commit is contained in:
parent
288448c575
commit
46373977e2
@ -202,6 +202,15 @@ class View(collections.Sequence):
|
||||
self.sig_view_refresh.send(self)
|
||||
|
||||
# API
|
||||
@command.command("view.focus.next")
|
||||
def focus_next(self) -> None:
|
||||
"""
|
||||
A list of all the orders we support.
|
||||
"""
|
||||
idx = self.focus.index + 1
|
||||
if self.inbounds(idx):
|
||||
self.focus.flow = self[idx]
|
||||
|
||||
@command.command("view.order.options")
|
||||
def order_options(self) -> typing.Sequence[str]:
|
||||
"""
|
||||
|
@ -146,6 +146,8 @@ class CommandHelp(urwid.Frame):
|
||||
|
||||
|
||||
class Commands(urwid.Pile):
|
||||
keyctx = "commands"
|
||||
|
||||
def __init__(self, master):
|
||||
oh = CommandHelp(master)
|
||||
super().__init__(
|
||||
|
@ -183,4 +183,4 @@ def flowdetails(state, flow: http.HTTPFlow):
|
||||
text.append(urwid.Text([("head", "Timing:")]))
|
||||
text.extend(common.format_keyvals(parts, key="key", val="text", indent=4))
|
||||
|
||||
return searchable.Searchable(state, text)
|
||||
return searchable.Searchable(text)
|
||||
|
@ -1,7 +1,6 @@
|
||||
import urwid
|
||||
|
||||
from mitmproxy.tools.console import common
|
||||
from mitmproxy.tools.console import signals
|
||||
import mitmproxy.tools.console.master # noqa
|
||||
|
||||
|
||||
@ -145,14 +144,8 @@ class FlowListWalker(urwid.ListWalker):
|
||||
|
||||
def __init__(self, master):
|
||||
self.master = master
|
||||
self.master.view.sig_view_refresh.connect(self.sig_mod)
|
||||
self.master.view.sig_view_add.connect(self.sig_mod)
|
||||
self.master.view.sig_view_remove.connect(self.sig_mod)
|
||||
self.master.view.sig_view_update.connect(self.sig_mod)
|
||||
self.master.view.focus.sig_change.connect(self.sig_mod)
|
||||
signals.flowlist_change.connect(self.sig_mod)
|
||||
|
||||
def sig_mod(self, *args, **kwargs):
|
||||
def view_changed(self):
|
||||
self._modified()
|
||||
|
||||
def get_focus(self):
|
||||
@ -164,7 +157,6 @@ class FlowListWalker(urwid.ListWalker):
|
||||
def set_focus(self, index):
|
||||
if self.master.view.inbounds(index):
|
||||
self.master.view.focus.index = index
|
||||
signals.flowlist_change.send(self)
|
||||
|
||||
def get_next(self, pos):
|
||||
pos = pos + 1
|
||||
@ -182,6 +174,7 @@ class FlowListWalker(urwid.ListWalker):
|
||||
|
||||
|
||||
class FlowListBox(urwid.ListBox):
|
||||
keyctx = "flowlist"
|
||||
|
||||
def __init__(
|
||||
self, master: "mitmproxy.tools.console.master.ConsoleMaster"
|
||||
@ -192,3 +185,6 @@ class FlowListBox(urwid.ListBox):
|
||||
def keypress(self, size, key):
|
||||
key = common.shortcuts(key)
|
||||
return urwid.ListBox.keypress(self, size, key)
|
||||
|
||||
def view_changed(self):
|
||||
self.body.view_changed()
|
||||
|
@ -9,11 +9,9 @@ import urwid
|
||||
from mitmproxy import contentviews
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import http
|
||||
from mitmproxy.net.http import Headers
|
||||
from mitmproxy.net.http import status_codes
|
||||
from mitmproxy.tools.console import common
|
||||
from mitmproxy.tools.console import flowdetailview
|
||||
from mitmproxy.tools.console import grideditor
|
||||
from mitmproxy.tools.console import overlay
|
||||
from mitmproxy.tools.console import searchable
|
||||
from mitmproxy.tools.console import signals
|
||||
@ -106,49 +104,51 @@ class FlowViewHeader(urwid.WidgetWrap):
|
||||
def __init__(
|
||||
self,
|
||||
master: "mitmproxy.tools.console.master.ConsoleMaster",
|
||||
f: http.HTTPFlow
|
||||
) -> None:
|
||||
self.master = master
|
||||
self.flow = f
|
||||
self._w = common.format_flow(
|
||||
f,
|
||||
False,
|
||||
extended=True,
|
||||
hostheader=self.master.options.showhost
|
||||
)
|
||||
signals.flow_change.connect(self.sig_flow_change)
|
||||
self.focus_changed()
|
||||
|
||||
def sig_flow_change(self, sender, flow):
|
||||
if flow == self.flow:
|
||||
def focus_changed(self):
|
||||
if self.master.view.focus.flow:
|
||||
self._w = common.format_flow(
|
||||
flow,
|
||||
self.master.view.focus.flow,
|
||||
False,
|
||||
extended=True,
|
||||
hostheader=self.master.options.showhost
|
||||
)
|
||||
else:
|
||||
self._w = urwid.Pile([])
|
||||
|
||||
|
||||
TAB_REQ = 0
|
||||
TAB_RESP = 1
|
||||
|
||||
|
||||
class FlowView(tabs.Tabs):
|
||||
class FlowDetails(tabs.Tabs):
|
||||
highlight_color = "focusfield"
|
||||
|
||||
def __init__(self, master, view, flow, tab_offset):
|
||||
self.master, self.view, self.flow = master, view, flow
|
||||
super().__init__(
|
||||
[
|
||||
def __init__(self, master, tab_offset):
|
||||
self.master = master
|
||||
super().__init__([], tab_offset)
|
||||
self.show()
|
||||
self.last_displayed_body = None
|
||||
|
||||
def focus_changed(self):
|
||||
if self.master.view.focus.flow:
|
||||
self.tabs = [
|
||||
(self.tab_request, self.view_request),
|
||||
(self.tab_response, self.view_response),
|
||||
(self.tab_details, self.view_details),
|
||||
],
|
||||
tab_offset
|
||||
)
|
||||
|
||||
]
|
||||
self.show()
|
||||
self.last_displayed_body = None
|
||||
signals.flow_change.connect(self.sig_flow_change)
|
||||
|
||||
@property
|
||||
def view(self):
|
||||
return self.master.view
|
||||
|
||||
@property
|
||||
def flow(self):
|
||||
return self.master.view.focus.flow
|
||||
|
||||
def tab_request(self):
|
||||
if self.flow.intercepted and not self.flow.response:
|
||||
@ -174,10 +174,6 @@ class FlowView(tabs.Tabs):
|
||||
def view_details(self):
|
||||
return flowdetailview.flowdetails(self.view, self.flow)
|
||||
|
||||
def sig_flow_change(self, sender, flow):
|
||||
if flow == self.flow:
|
||||
self.show()
|
||||
|
||||
def content_view(self, viewmode, message):
|
||||
if message.raw_content is None:
|
||||
msg, body = "", [urwid.Text([("error", "[content missing]")])]
|
||||
@ -288,7 +284,7 @@ class FlowView(tabs.Tabs):
|
||||
]
|
||||
)
|
||||
]
|
||||
return searchable.Searchable(self.view, txt)
|
||||
return searchable.Searchable(txt)
|
||||
|
||||
def set_method_raw(self, m):
|
||||
if m:
|
||||
@ -330,44 +326,6 @@ class FlowView(tabs.Tabs):
|
||||
self.flow.response.reason = reason
|
||||
signals.flow_change.send(self, flow = self.flow)
|
||||
|
||||
def set_headers(self, fields, conn):
|
||||
conn.headers = Headers(fields)
|
||||
signals.flow_change.send(self, flow = self.flow)
|
||||
|
||||
def set_query(self, lst, conn):
|
||||
conn.query = lst
|
||||
signals.flow_change.send(self, flow = self.flow)
|
||||
|
||||
def set_path_components(self, lst, conn):
|
||||
conn.path_components = lst
|
||||
signals.flow_change.send(self, flow = self.flow)
|
||||
|
||||
def set_form(self, lst, conn):
|
||||
conn.urlencoded_form = lst
|
||||
signals.flow_change.send(self, flow = self.flow)
|
||||
|
||||
def edit_form(self, conn):
|
||||
self.master.view_grideditor(
|
||||
grideditor.URLEncodedFormEditor(
|
||||
self.master,
|
||||
conn.urlencoded_form.items(multi=True),
|
||||
self.set_form,
|
||||
conn
|
||||
)
|
||||
)
|
||||
|
||||
def edit_form_confirm(self, key, conn):
|
||||
if key == "y":
|
||||
self.edit_form(conn)
|
||||
|
||||
def set_cookies(self, lst, conn):
|
||||
conn.cookies = lst
|
||||
signals.flow_change.send(self, flow = self.flow)
|
||||
|
||||
def set_setcookies(self, data, conn):
|
||||
conn.cookies = data
|
||||
signals.flow_change.send(self, flow = self.flow)
|
||||
|
||||
def edit(self, part):
|
||||
if self.tab_offset == TAB_REQ:
|
||||
message = self.flow.request
|
||||
@ -377,24 +335,6 @@ class FlowView(tabs.Tabs):
|
||||
message = self.flow.response
|
||||
|
||||
self.flow.backup()
|
||||
if message == self.flow.request and part == "c":
|
||||
self.master.view_grideditor(
|
||||
grideditor.CookieEditor(
|
||||
self.master,
|
||||
message.cookies.items(multi=True),
|
||||
self.set_cookies,
|
||||
message
|
||||
)
|
||||
)
|
||||
if message == self.flow.response and part == "c":
|
||||
self.master.view_grideditor(
|
||||
grideditor.SetCookieEditor(
|
||||
self.master,
|
||||
message.cookies.items(multi=True),
|
||||
self.set_setcookies,
|
||||
message
|
||||
)
|
||||
)
|
||||
if part == "r":
|
||||
# Fix an issue caused by some editors when editing a
|
||||
# request/response body. Many editors make it hard to save a
|
||||
@ -404,46 +344,6 @@ class FlowView(tabs.Tabs):
|
||||
# from an editor.
|
||||
c = self.master.spawn_editor(message.get_content(strict=False) or b"")
|
||||
message.content = c.rstrip(b"\n")
|
||||
elif part == "f":
|
||||
if not message.urlencoded_form and message.raw_content:
|
||||
signals.status_prompt_onekey.send(
|
||||
prompt = "Existing body is not a URL-encoded form. Clear and edit?",
|
||||
keys = [
|
||||
("yes", "y"),
|
||||
("no", "n"),
|
||||
],
|
||||
callback = self.edit_form_confirm,
|
||||
args = (message,)
|
||||
)
|
||||
else:
|
||||
self.edit_form(message)
|
||||
elif part == "h":
|
||||
self.master.view_grideditor(
|
||||
grideditor.HeaderEditor(
|
||||
self.master,
|
||||
message.headers.fields,
|
||||
self.set_headers,
|
||||
message
|
||||
)
|
||||
)
|
||||
elif part == "p":
|
||||
p = message.path_components
|
||||
self.master.view_grideditor(
|
||||
grideditor.PathEditor(
|
||||
self.master,
|
||||
p,
|
||||
self.set_path_components,
|
||||
message
|
||||
)
|
||||
)
|
||||
elif part == "q":
|
||||
self.master.view_grideditor(
|
||||
grideditor.QueryEditor(
|
||||
self.master,
|
||||
message.query.items(multi=True),
|
||||
self.set_query, message
|
||||
)
|
||||
)
|
||||
elif part == "u":
|
||||
signals.status_prompt.send(
|
||||
prompt = "URL",
|
||||
@ -500,12 +400,6 @@ class FlowView(tabs.Tabs):
|
||||
|
||||
key = super().keypress(size, key)
|
||||
|
||||
# Special case: Space moves over to the next flow.
|
||||
# We need to catch that before applying common.shortcuts()
|
||||
if key == " ":
|
||||
self.view_next_flow(self.flow)
|
||||
return
|
||||
|
||||
key = common.shortcuts(key)
|
||||
if key in ("up", "down", "page up", "page down"):
|
||||
# Pass scroll events to the wrapped widget
|
||||
@ -689,3 +583,18 @@ class FlowView(tabs.Tabs):
|
||||
}
|
||||
conn.encode(encoding_map[key])
|
||||
signals.flow_change.send(self, flow = self.flow)
|
||||
|
||||
|
||||
class FlowView(urwid.Frame):
|
||||
keyctx = "flowview"
|
||||
|
||||
def __init__(self, master):
|
||||
super().__init__(
|
||||
FlowDetails(master, 0),
|
||||
header = FlowViewHeader(master),
|
||||
)
|
||||
self.master = master
|
||||
|
||||
def focus_changed(self, *args, **kwargs):
|
||||
self.body.focus_changed()
|
||||
self.header.focus_changed()
|
||||
|
@ -252,13 +252,12 @@ FIRST_WIDTH_MAX = 40
|
||||
FIRST_WIDTH_MIN = 20
|
||||
|
||||
|
||||
class GridEditor(urwid.WidgetWrap):
|
||||
title = None # type: str
|
||||
columns = None # type: Sequence[Column]
|
||||
|
||||
class BaseGridEditor(urwid.WidgetWrap):
|
||||
def __init__(
|
||||
self,
|
||||
master: "mitmproxy.tools.console.master.ConsoleMaster",
|
||||
title,
|
||||
columns,
|
||||
value: Any,
|
||||
callback: Callable[..., None],
|
||||
*cb_args,
|
||||
@ -266,6 +265,8 @@ class GridEditor(urwid.WidgetWrap):
|
||||
) -> None:
|
||||
value = self.data_in(copy.deepcopy(value))
|
||||
self.master = master
|
||||
self.title = title
|
||||
self.columns = columns
|
||||
self.value = value
|
||||
self.callback = callback
|
||||
self.cb_args = cb_args
|
||||
@ -307,6 +308,13 @@ class GridEditor(urwid.WidgetWrap):
|
||||
signals.footer_help.send(self, helptext="")
|
||||
self.show_empty_msg()
|
||||
|
||||
def view_popping(self):
|
||||
res = []
|
||||
for i in self.walker.lst:
|
||||
if not i[1] and any([x for x in i[0]]):
|
||||
res.append(i[0])
|
||||
self.callback(self.data_out(res), *self.cb_args, **self.cb_kwargs)
|
||||
|
||||
def show_empty_msg(self):
|
||||
if self.walker.lst:
|
||||
self._w.set_footer(None)
|
||||
@ -339,14 +347,7 @@ class GridEditor(urwid.WidgetWrap):
|
||||
|
||||
key = common.shortcuts(key)
|
||||
column = self.columns[self.walker.focus_col]
|
||||
if key in ["q", "esc"]:
|
||||
res = []
|
||||
for i in self.walker.lst:
|
||||
if not i[1] and any([x for x in i[0]]):
|
||||
res.append(i[0])
|
||||
self.callback(self.data_out(res), *self.cb_args, **self.cb_kwargs)
|
||||
signals.pop_view_state.send(self)
|
||||
elif key == "g":
|
||||
if key == "g":
|
||||
self.walker.set_focus(0)
|
||||
elif key == "G":
|
||||
self.walker.set_focus(len(self.walker.lst) - 1)
|
||||
@ -415,3 +416,74 @@ class GridEditor(urwid.WidgetWrap):
|
||||
)
|
||||
)
|
||||
return text
|
||||
|
||||
|
||||
class GridEditor(urwid.WidgetWrap):
|
||||
title = None # type: str
|
||||
columns = None # type: Sequence[Column]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
master: "mitmproxy.tools.console.master.ConsoleMaster",
|
||||
value: Any,
|
||||
callback: Callable[..., None],
|
||||
*cb_args,
|
||||
**cb_kwargs
|
||||
) -> None:
|
||||
super().__init__(
|
||||
master,
|
||||
value,
|
||||
self.title,
|
||||
self.columns,
|
||||
callback,
|
||||
*cb_args,
|
||||
**cb_kwargs
|
||||
)
|
||||
|
||||
|
||||
class FocusEditor(urwid.WidgetWrap):
|
||||
"""
|
||||
A specialised GridEditor that edits the current focused flow.
|
||||
"""
|
||||
keyctx = "grideditor"
|
||||
|
||||
def __init__(self, master):
|
||||
self.master = master
|
||||
self.focus_changed()
|
||||
|
||||
def focus_changed(self):
|
||||
if self.master.view.focus.flow:
|
||||
self._w = BaseGridEditor(
|
||||
self.master.view.focus.flow,
|
||||
self.title,
|
||||
self.columns,
|
||||
self.get_data(self.master.view.focus.flow),
|
||||
self.set_data_update,
|
||||
self.master.view.focus.flow,
|
||||
)
|
||||
else:
|
||||
self._w = urwid.Pile([])
|
||||
|
||||
def call(self, v, name, *args, **kwargs):
|
||||
f = getattr(v, name, None)
|
||||
if f:
|
||||
f(*args, **kwargs)
|
||||
|
||||
def view_popping(self):
|
||||
self.call(self._w, "view_popping")
|
||||
|
||||
def get_data(self, flow):
|
||||
"""
|
||||
Retrieve the data to edit from the current flow.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def set_data_update(self, flow, vals):
|
||||
self.set_data(flow, vals)
|
||||
signals.flow_change.send(self, flow = flow)
|
||||
|
||||
def set_data(self, flow, vals):
|
||||
"""
|
||||
Set the current data on the flow.
|
||||
"""
|
||||
signals.flow_change.send(self, flow = self.flow)
|
||||
|
@ -1,4 +1,3 @@
|
||||
import os
|
||||
import re
|
||||
|
||||
import urwid
|
||||
@ -13,18 +12,24 @@ from mitmproxy.tools.console.grideditor import col_bytes
|
||||
from mitmproxy.tools.console.grideditor import col_subgrid
|
||||
from mitmproxy.tools.console import signals
|
||||
from mitmproxy.net.http import user_agents
|
||||
from mitmproxy.net.http import Headers
|
||||
|
||||
|
||||
class QueryEditor(base.GridEditor):
|
||||
class QueryEditor(base.FocusEditor):
|
||||
title = "Editing query"
|
||||
columns = [
|
||||
col_text.Column("Key"),
|
||||
col_text.Column("Value")
|
||||
]
|
||||
|
||||
def get_data(self, flow):
|
||||
return flow.request.query.items(multi=True)
|
||||
|
||||
class HeaderEditor(base.GridEditor):
|
||||
title = "Editing headers"
|
||||
def set_data(self, vals, flow):
|
||||
flow.request.query = vals
|
||||
|
||||
|
||||
class HeaderEditor(base.FocusEditor):
|
||||
columns = [
|
||||
col_bytes.Column("Key"),
|
||||
col_bytes.Column("Value")
|
||||
@ -65,35 +70,38 @@ class HeaderEditor(base.GridEditor):
|
||||
return True
|
||||
|
||||
|
||||
class URLEncodedFormEditor(base.GridEditor):
|
||||
class RequestHeaderEditor(HeaderEditor):
|
||||
title = "Editing request headers"
|
||||
|
||||
def get_data(self, flow):
|
||||
return flow.request.headers.fields
|
||||
|
||||
def set_data(self, vals, flow):
|
||||
flow.request.headers = Headers(vals)
|
||||
|
||||
|
||||
class ResponseHeaderEditor(HeaderEditor):
|
||||
title = "Editing response headers"
|
||||
|
||||
def get_data(self, flow):
|
||||
return flow.response.headers.fields
|
||||
|
||||
def set_data(self, vals, flow):
|
||||
flow.response.headers = Headers(vals)
|
||||
|
||||
|
||||
class RequestFormEditor(base.FocusEditor):
|
||||
title = "Editing URL-encoded form"
|
||||
columns = [
|
||||
col_text.Column("Key"),
|
||||
col_text.Column("Value")
|
||||
]
|
||||
|
||||
def get_data(self, flow):
|
||||
return flow.request.urlencoded_form.items(multi=True)
|
||||
|
||||
class ReplaceEditor(base.GridEditor):
|
||||
title = "Editing replacement patterns"
|
||||
columns = [
|
||||
col_text.Column("Filter"),
|
||||
col_text.Column("Regex"),
|
||||
col_text.Column("Replacement"),
|
||||
]
|
||||
|
||||
def is_error(self, col, val):
|
||||
if col == 0:
|
||||
if not flowfilter.parse(val):
|
||||
return "Invalid filter specification."
|
||||
elif col == 1:
|
||||
try:
|
||||
re.compile(val)
|
||||
except re.error:
|
||||
return "Invalid regular expression."
|
||||
elif col == 2:
|
||||
if val.startswith("@") and not os.path.isfile(os.path.expanduser(val[1:])):
|
||||
return "Invalid file path"
|
||||
return False
|
||||
def set_data(self, vals, flow):
|
||||
flow.request.urlencoded_form = vals
|
||||
|
||||
|
||||
class SetHeadersEditor(base.GridEditor):
|
||||
@ -146,7 +154,7 @@ class SetHeadersEditor(base.GridEditor):
|
||||
return True
|
||||
|
||||
|
||||
class PathEditor(base.GridEditor):
|
||||
class PathEditor(base.FocusEditor):
|
||||
# TODO: Next row on enter?
|
||||
|
||||
title = "Editing URL path components"
|
||||
@ -160,6 +168,12 @@ class PathEditor(base.GridEditor):
|
||||
def data_out(self, data):
|
||||
return [i[0] for i in data]
|
||||
|
||||
def get_data(self, flow):
|
||||
return self.data_in(flow.request.path_components)
|
||||
|
||||
def set_data(self, vals, flow):
|
||||
flow.request.path_components = self.data_out(vals)
|
||||
|
||||
|
||||
class ScriptEditor(base.GridEditor):
|
||||
title = "Editing scripts"
|
||||
@ -193,13 +207,19 @@ class HostPatternEditor(base.GridEditor):
|
||||
return [i[0] for i in data]
|
||||
|
||||
|
||||
class CookieEditor(base.GridEditor):
|
||||
class CookieEditor(base.FocusEditor):
|
||||
title = "Editing request Cookie header"
|
||||
columns = [
|
||||
col_text.Column("Name"),
|
||||
col_text.Column("Value"),
|
||||
]
|
||||
|
||||
def get_data(self, flow):
|
||||
return flow.request.cookies.items(multi=True)
|
||||
|
||||
def set_data(self, vals, flow):
|
||||
flow.request.cookies = vals
|
||||
|
||||
|
||||
class CookieAttributeEditor(base.GridEditor):
|
||||
title = "Editing Set-Cookie attributes"
|
||||
@ -221,7 +241,7 @@ class CookieAttributeEditor(base.GridEditor):
|
||||
return ret
|
||||
|
||||
|
||||
class SetCookieEditor(base.GridEditor):
|
||||
class SetCookieEditor(base.FocusEditor):
|
||||
title = "Editing response SetCookie header"
|
||||
columns = [
|
||||
col_text.Column("Name"),
|
||||
@ -246,6 +266,12 @@ class SetCookieEditor(base.GridEditor):
|
||||
)
|
||||
return vals
|
||||
|
||||
def get_data(self, flow):
|
||||
return self.data_in(flow.response.cookies.items(multi=True))
|
||||
|
||||
def set_data(self, vals, flow):
|
||||
flow.response.cookies = self.data_out(vals)
|
||||
|
||||
|
||||
class OptionsEditor(base.GridEditor):
|
||||
title = None # type: str
|
||||
|
@ -15,6 +15,7 @@ footer = [
|
||||
|
||||
|
||||
class HelpView(urwid.ListBox):
|
||||
keyctx = "help"
|
||||
|
||||
def __init__(self, help_context):
|
||||
self.help_context = help_context or []
|
||||
|
@ -22,13 +22,8 @@ from mitmproxy import flow
|
||||
from mitmproxy.addons import intercept
|
||||
from mitmproxy.addons import readfile
|
||||
from mitmproxy.addons import view
|
||||
from mitmproxy.tools.console import flowlist
|
||||
from mitmproxy.tools.console import flowview
|
||||
from mitmproxy.tools.console import grideditor
|
||||
from mitmproxy.tools.console import help
|
||||
from mitmproxy.tools.console import keymap
|
||||
from mitmproxy.tools.console import options
|
||||
from mitmproxy.tools.console import commands
|
||||
from mitmproxy.tools.console import overlay
|
||||
from mitmproxy.tools.console import palettes
|
||||
from mitmproxy.tools.console import signals
|
||||
@ -102,7 +97,7 @@ class ConsoleAddon:
|
||||
repl = repl.replace("{choice}", opt)
|
||||
self.master.commands.call(repl)
|
||||
|
||||
self.master.overlay(overlay.Chooser(choicecmd, choices, "", callback))
|
||||
self.master.overlay(overlay.Chooser(prompt, choices, "", callback))
|
||||
ctx.log.info(choices)
|
||||
|
||||
@command.command("console.command")
|
||||
@ -147,6 +142,38 @@ class ConsoleAddon:
|
||||
"""
|
||||
signals.pop_view_state.send(self)
|
||||
|
||||
@command.command("console.edit.focus.options")
|
||||
def edit_focus_options(self) -> typing.Sequence[str]:
|
||||
return [
|
||||
"cookies",
|
||||
"form",
|
||||
"path",
|
||||
"query",
|
||||
"request-headers",
|
||||
"response-headers",
|
||||
"set-cookies",
|
||||
]
|
||||
|
||||
@command.command("console.edit.focus")
|
||||
def edit_focus(self, part: str) -> None:
|
||||
"""
|
||||
Edit the query of the current focus.
|
||||
"""
|
||||
if part == "cookies":
|
||||
self.master.switch_view("edit_focus_cookies")
|
||||
elif part == "form":
|
||||
self.master.switch_view("edit_focus_form")
|
||||
elif part == "path":
|
||||
self.master.switch_view("edit_focus_path")
|
||||
elif part == "query":
|
||||
self.master.switch_view("edit_focus_query")
|
||||
elif part == "request-headers":
|
||||
self.master.switch_view("edit_focus_request_headers")
|
||||
elif part == "response-headers":
|
||||
self.master.switch_view("edit_focus_response_headers")
|
||||
elif part == "set-cookies":
|
||||
self.master.switch_view("edit_focus_setcookies")
|
||||
|
||||
def running(self):
|
||||
self.started = True
|
||||
|
||||
@ -157,7 +184,7 @@ class ConsoleAddon:
|
||||
def configure(self, updated):
|
||||
if self.started:
|
||||
if "console_eventlog" in updated:
|
||||
self.master.refresh_view()
|
||||
pass
|
||||
|
||||
|
||||
def default_keymap(km):
|
||||
@ -213,6 +240,16 @@ def default_keymap(km):
|
||||
km.add("|", "console.command 'script.run @focus '", context="flowlist")
|
||||
km.add("enter", "console.view.flow @focus", context="flowlist")
|
||||
|
||||
km.add(
|
||||
"t",
|
||||
"console.choose Part console.edit.focus.options "
|
||||
"console.edit.focus {choice}",
|
||||
context="flowlist"
|
||||
)
|
||||
|
||||
km.add(" ", "view.focus.next", context="flowview")
|
||||
km.add("X", "console.edit.focus.query", context="flowview")
|
||||
|
||||
|
||||
class ConsoleMaster(master.Master):
|
||||
|
||||
@ -232,9 +269,6 @@ class ConsoleMaster(master.Master):
|
||||
self.view_stack = []
|
||||
|
||||
signals.call_in.connect(self.sig_call_in)
|
||||
signals.pop_view_state.connect(self.sig_pop_view_state)
|
||||
signals.replace_view_state.connect(self.sig_replace_view_state)
|
||||
signals.push_view_state.connect(self.sig_push_view_state)
|
||||
signals.sig_add_log.connect(self.sig_add_log)
|
||||
self.addons.add(Logger())
|
||||
self.addons.add(*addons.default_addons())
|
||||
@ -251,6 +285,9 @@ class ConsoleMaster(master.Master):
|
||||
|
||||
signal.signal(signal.SIGINT, sigint_handler)
|
||||
|
||||
self.ab = None
|
||||
self.window = None
|
||||
|
||||
def __setattr__(self, name, value):
|
||||
self.__dict__[name] = value
|
||||
signals.update_settings.send(self)
|
||||
@ -294,37 +331,6 @@ class ConsoleMaster(master.Master):
|
||||
return callback(*args)
|
||||
self.loop.set_alarm_in(seconds, cb)
|
||||
|
||||
def sig_replace_view_state(self, sender):
|
||||
"""
|
||||
A view has been pushed onto the stack, and is intended to replace
|
||||
the current view rather than creating a new stack entry.
|
||||
"""
|
||||
if len(self.view_stack) > 1:
|
||||
del self.view_stack[1]
|
||||
|
||||
def sig_pop_view_state(self, sender):
|
||||
"""
|
||||
Pop the top view off the view stack. If no more views will be left
|
||||
after this, prompt for exit.
|
||||
"""
|
||||
if len(self.view_stack) > 1:
|
||||
self.view_stack.pop()
|
||||
self.loop.widget = self.view_stack[-1]
|
||||
else:
|
||||
self.prompt_for_exit()
|
||||
|
||||
def sig_push_view_state(self, sender, window):
|
||||
"""
|
||||
Push a new view onto the view stack.
|
||||
"""
|
||||
self.view_stack.append(window)
|
||||
self.loop.widget = window
|
||||
self.loop.draw_screen()
|
||||
|
||||
def refresh_view(self):
|
||||
self.view_flowlist()
|
||||
signals.replace_view_state.send(self)
|
||||
|
||||
def spawn_editor(self, data):
|
||||
text = not isinstance(data, bytes)
|
||||
fd, name = tempfile.mkstemp('', "mproxy", text=text)
|
||||
@ -413,7 +419,10 @@ class ConsoleMaster(master.Master):
|
||||
screen = self.ui,
|
||||
handle_mouse = self.options.console_mouse,
|
||||
)
|
||||
|
||||
self.ab = statusbar.ActionBar(self)
|
||||
self.window = window.Window(self)
|
||||
self.loop.widget = self.window
|
||||
|
||||
self.loop.set_alarm_in(0.01, self.ticker)
|
||||
self.loop.set_alarm_in(
|
||||
@ -439,63 +448,25 @@ class ConsoleMaster(master.Master):
|
||||
def shutdown(self):
|
||||
raise urwid.ExitMainLoop
|
||||
|
||||
def sig_exit_overlay(self, *args, **kwargs):
|
||||
self.loop.widget = self.window
|
||||
|
||||
def overlay(self, widget, **kwargs):
|
||||
signals.push_view_state.send(
|
||||
self,
|
||||
window = overlay.SimpleOverlay(
|
||||
self,
|
||||
widget,
|
||||
self.loop.widget,
|
||||
widget.width,
|
||||
**kwargs
|
||||
)
|
||||
self.loop.widget = overlay.SimpleOverlay(
|
||||
self, widget, self.loop.widget, widget.width, **kwargs
|
||||
)
|
||||
|
||||
def switch_view(self, name):
|
||||
self.window.push(name)
|
||||
|
||||
def view_help(self):
|
||||
hc = self.view_stack[-1].helpctx
|
||||
signals.push_view_state.send(
|
||||
self,
|
||||
window = window.Window(
|
||||
self,
|
||||
help.HelpView(hc),
|
||||
None,
|
||||
statusbar.StatusBar(self, help.footer),
|
||||
None,
|
||||
"help"
|
||||
)
|
||||
)
|
||||
self.window.push("help")
|
||||
|
||||
def view_options(self):
|
||||
for i in self.view_stack:
|
||||
if isinstance(i["body"], options.Options):
|
||||
return
|
||||
signals.push_view_state.send(
|
||||
self,
|
||||
window = window.Window(
|
||||
self,
|
||||
options.Options(self),
|
||||
None,
|
||||
statusbar.StatusBar(self, options.footer),
|
||||
options.help_context,
|
||||
"options"
|
||||
)
|
||||
)
|
||||
self.window.push("options")
|
||||
|
||||
def view_commands(self):
|
||||
for i in self.view_stack:
|
||||
if isinstance(i["body"], commands.Commands):
|
||||
return
|
||||
signals.push_view_state.send(
|
||||
self,
|
||||
window = window.Window(
|
||||
self,
|
||||
commands.Commands(self),
|
||||
None,
|
||||
statusbar.StatusBar(self, commands.footer),
|
||||
commands.help_context,
|
||||
"commands"
|
||||
)
|
||||
)
|
||||
self.window.push("commands")
|
||||
|
||||
def view_grideditor(self, ge):
|
||||
signals.push_view_state.send(
|
||||
@ -511,39 +482,10 @@ class ConsoleMaster(master.Master):
|
||||
)
|
||||
|
||||
def view_flowlist(self):
|
||||
if self.ui.started:
|
||||
self.ui.clear()
|
||||
|
||||
if self.options.console_eventlog:
|
||||
body = flowlist.BodyPile(self)
|
||||
else:
|
||||
body = flowlist.FlowListBox(self)
|
||||
|
||||
signals.push_view_state.send(
|
||||
self,
|
||||
window = window.Window(
|
||||
self,
|
||||
body,
|
||||
None,
|
||||
statusbar.StatusBar(self, flowlist.footer),
|
||||
flowlist.help_context,
|
||||
"flowlist"
|
||||
)
|
||||
)
|
||||
self.window.push("flowlist")
|
||||
|
||||
def view_flow(self, flow, tab_offset=0):
|
||||
self.view.focus.flow = flow
|
||||
signals.push_view_state.send(
|
||||
self,
|
||||
window = window.Window(
|
||||
self,
|
||||
flowview.FlowView(self, self.view, flow, tab_offset),
|
||||
flowview.FlowViewHeader(self, flow),
|
||||
statusbar.StatusBar(self, flowview.footer),
|
||||
flowview.help_context,
|
||||
"flowview"
|
||||
)
|
||||
)
|
||||
self.window.push("flowview")
|
||||
|
||||
def quit(self, a):
|
||||
if a != "n":
|
||||
|
@ -286,6 +286,8 @@ class OptionHelp(urwid.Frame):
|
||||
|
||||
|
||||
class Options(urwid.Pile):
|
||||
keyctx = "options"
|
||||
|
||||
def __init__(self, master):
|
||||
oh = OptionHelp(master)
|
||||
super().__init__(
|
||||
|
@ -84,7 +84,7 @@ class Chooser(urwid.WidgetWrap):
|
||||
self.choices = choices
|
||||
self.callback = callback
|
||||
choicewidth = max([len(i) for i in choices])
|
||||
self.width = max(choicewidth, len(title) + 5)
|
||||
self.width = max(choicewidth, len(title)) + 5
|
||||
self.walker = ChooserListWalker(choices, current)
|
||||
super().__init__(
|
||||
urwid.AttrWrap(
|
||||
|
@ -16,10 +16,9 @@ class Highlight(urwid.AttrMap):
|
||||
|
||||
class Searchable(urwid.ListBox):
|
||||
|
||||
def __init__(self, view, contents):
|
||||
def __init__(self, contents):
|
||||
self.walker = urwid.SimpleFocusListWalker(contents)
|
||||
urwid.ListBox.__init__(self, self.walker)
|
||||
self.view = view
|
||||
self.search_offset = 0
|
||||
self.current_highlight = None
|
||||
self.search_term = None
|
||||
|
@ -48,4 +48,6 @@ flowlist_change = blinker.Signal()
|
||||
# Pop and push view state onto a stack
|
||||
pop_view_state = blinker.Signal()
|
||||
push_view_state = blinker.Signal()
|
||||
replace_view_state = blinker.Signal()
|
||||
|
||||
# Exits overlay if there is one
|
||||
exit_overlay = blinker.Signal()
|
||||
|
@ -143,6 +143,7 @@ class ActionBar(urwid.WidgetWrap):
|
||||
|
||||
|
||||
class StatusBar(urwid.WidgetWrap):
|
||||
keyctx = ""
|
||||
|
||||
def __init__(
|
||||
self, master: "mitmproxy.tools.console.master.ConsoleMaster", helptext
|
||||
|
@ -27,6 +27,7 @@ class Tabs(urwid.WidgetWrap):
|
||||
self.tab_offset = tab_offset
|
||||
self.tabs = tabs
|
||||
self.show()
|
||||
self._w = urwid.Pile([])
|
||||
|
||||
def change_tab(self, offset):
|
||||
self.tab_offset = offset
|
||||
@ -41,6 +42,9 @@ class Tabs(urwid.WidgetWrap):
|
||||
return self._w.keypress(size, key)
|
||||
|
||||
def show(self):
|
||||
if not self.tabs:
|
||||
return
|
||||
|
||||
headers = []
|
||||
for i in range(len(self.tabs)):
|
||||
txt = self.tabs[i][0]()
|
||||
|
@ -1,22 +1,98 @@
|
||||
import urwid
|
||||
|
||||
from mitmproxy.tools.console import signals
|
||||
from mitmproxy.tools.console import statusbar
|
||||
from mitmproxy.tools.console import flowlist
|
||||
from mitmproxy.tools.console import flowview
|
||||
from mitmproxy.tools.console import commands
|
||||
from mitmproxy.tools.console import options
|
||||
from mitmproxy.tools.console import overlay
|
||||
from mitmproxy.tools.console import help
|
||||
from mitmproxy.tools.console import grideditor
|
||||
|
||||
|
||||
class Window(urwid.Frame):
|
||||
|
||||
def __init__(self, master, body, header, footer, helpctx, keyctx):
|
||||
urwid.Frame.__init__(
|
||||
self,
|
||||
urwid.AttrWrap(body, "background"),
|
||||
header = urwid.AttrWrap(header, "background") if header else None,
|
||||
footer = urwid.AttrWrap(footer, "background") if footer else None
|
||||
def __init__(self, master):
|
||||
super().__init__(
|
||||
None,
|
||||
header = None,
|
||||
footer = statusbar.StatusBar(master, ""),
|
||||
)
|
||||
self.master = master
|
||||
self.helpctx = helpctx
|
||||
self.keyctx = keyctx
|
||||
self.primary_stack = []
|
||||
self.master.view.sig_view_refresh.connect(self.view_changed)
|
||||
self.master.view.sig_view_add.connect(self.view_changed)
|
||||
self.master.view.sig_view_remove.connect(self.view_changed)
|
||||
self.master.view.sig_view_update.connect(self.view_changed)
|
||||
self.master.view.focus.sig_change.connect(self.view_changed)
|
||||
signals.focus.connect(self.sig_focus)
|
||||
|
||||
self.master.view.focus.sig_change.connect(self.focus_changed)
|
||||
|
||||
signals.pop_view_state.connect(self.pop)
|
||||
signals.push_view_state.connect(self.push)
|
||||
self.windows = dict(
|
||||
flowlist = flowlist.FlowListBox(self.master),
|
||||
flowview = flowview.FlowView(self.master),
|
||||
commands = commands.Commands(self.master),
|
||||
options = options.Options(self.master),
|
||||
help = help.HelpView(None),
|
||||
edit_focus_query = grideditor.QueryEditor(self.master),
|
||||
edit_focus_cookies = grideditor.CookieEditor(self.master),
|
||||
edit_focus_setcookies = grideditor.SetCookieEditor(self.master),
|
||||
edit_focus_form = grideditor.RequestFormEditor(self.master),
|
||||
edit_focus_path = grideditor.PathEditor(self.master),
|
||||
edit_focus_request_headers = grideditor.RequestHeaderEditor(self.master),
|
||||
edit_focus_response_headers = grideditor.ResponseHeaderEditor(self.master),
|
||||
)
|
||||
|
||||
def call(self, v, name, *args, **kwargs):
|
||||
f = getattr(v, name, None)
|
||||
if f:
|
||||
f(*args, **kwargs)
|
||||
|
||||
def focus_changed(self, *args, **kwargs):
|
||||
"""
|
||||
Triggered when the focus changes - either when it's modified, or
|
||||
when it changes to a different flow altogether.
|
||||
"""
|
||||
self.call(self.focus, "focus_changed")
|
||||
|
||||
def view_changed(self, *args, **kwargs):
|
||||
"""
|
||||
Triggered when the view list has changed.
|
||||
"""
|
||||
self.call(self.focus, "view_changed")
|
||||
|
||||
def view_popping(self, *args, **kwargs):
|
||||
"""
|
||||
Triggered when the view list has changed.
|
||||
"""
|
||||
self.call(self.focus, "view_popping")
|
||||
|
||||
def push(self, wname):
|
||||
self.primary_stack.append(wname)
|
||||
self.body = urwid.AttrWrap(
|
||||
self.windows[wname], "background"
|
||||
)
|
||||
self.view_changed()
|
||||
self.focus_changed()
|
||||
|
||||
def pop(self, *args, **kwargs):
|
||||
if isinstance(self.master.loop.widget, overlay.SimpleOverlay):
|
||||
self.master.loop.widget = self
|
||||
else:
|
||||
if len(self.primary_stack) > 1:
|
||||
self.view_popping()
|
||||
self.primary_stack.pop()
|
||||
self.body = urwid.AttrWrap(
|
||||
self.windows[self.primary_stack[-1]],
|
||||
"background",
|
||||
)
|
||||
self.view_changed()
|
||||
self.focus_changed()
|
||||
else:
|
||||
self.master.prompt_for_exit()
|
||||
|
||||
def sig_focus(self, sender, section):
|
||||
self.focus_position = section
|
||||
|
||||
@ -37,50 +113,8 @@ class Window(urwid.Frame):
|
||||
return False
|
||||
return True
|
||||
|
||||
def handle_replay(self, k):
|
||||
if k == "c":
|
||||
creplay = self.master.addons.get("clientplayback")
|
||||
if self.master.options.client_replay and creplay.count():
|
||||
def stop_client_playback_prompt(a):
|
||||
if a != "n":
|
||||
self.master.options.client_replay = None
|
||||
signals.status_prompt_onekey.send(
|
||||
self,
|
||||
prompt = "Stop current client replay?",
|
||||
keys = (
|
||||
("yes", "y"),
|
||||
("no", "n"),
|
||||
),
|
||||
callback = stop_client_playback_prompt
|
||||
)
|
||||
else:
|
||||
signals.status_prompt_path.send(
|
||||
self,
|
||||
prompt = "Client replay path",
|
||||
callback = lambda x: self.master.options.setter("client_replay")([x])
|
||||
)
|
||||
elif k == "s":
|
||||
a = self.master.addons.get("serverplayback")
|
||||
if a.count():
|
||||
def stop_server_playback(response):
|
||||
if response == "y":
|
||||
self.master.options.server_replay = []
|
||||
signals.status_prompt_onekey.send(
|
||||
self,
|
||||
prompt = "Stop current server replay?",
|
||||
keys = (
|
||||
("yes", "y"),
|
||||
("no", "n"),
|
||||
),
|
||||
callback = stop_server_playback
|
||||
)
|
||||
else:
|
||||
signals.status_prompt_path.send(
|
||||
self,
|
||||
prompt = "Server playback path",
|
||||
callback = lambda x: self.master.options.setter("server_replay")([x])
|
||||
)
|
||||
|
||||
def keypress(self, size, k):
|
||||
k = super().keypress(size, k)
|
||||
return self.master.keymap.handle(self.keyctx, k)
|
||||
if self.focus.keyctx:
|
||||
k = self.master.keymap.handle(self.focus.keyctx, k)
|
||||
if k:
|
||||
return super().keypress(size, k)
|
||||
|
@ -99,7 +99,7 @@ def run(MasterKlass, args, extra=None): # pragma: no cover
|
||||
except exceptions.OptionsError as e:
|
||||
print("%s: %s" % (sys.argv[0], e), file=sys.stderr)
|
||||
sys.exit(1)
|
||||
except (KeyboardInterrupt, RuntimeError):
|
||||
except (KeyboardInterrupt, RuntimeError) as e:
|
||||
pass
|
||||
return master
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user