mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-23 00:01:36 +00:00
refactor grideditor for py3 compatibility
This commit is contained in:
parent
56796aeda2
commit
480ae46b88
@ -13,8 +13,8 @@ class Replace:
|
|||||||
.replacements is a list of tuples (fpat, rex, s):
|
.replacements is a list of tuples (fpat, rex, s):
|
||||||
|
|
||||||
fpatt: a string specifying a filter pattern.
|
fpatt: a string specifying a filter pattern.
|
||||||
rex: a regular expression.
|
rex: a regular expression, as bytes.
|
||||||
s: the replacement string
|
s: the replacement string, as bytes
|
||||||
"""
|
"""
|
||||||
lst = []
|
lst = []
|
||||||
for fpatt, rex, s in options.replacements:
|
for fpatt, rex, s in options.replacements:
|
||||||
|
@ -1,719 +0,0 @@
|
|||||||
from __future__ import absolute_import, print_function, division
|
|
||||||
|
|
||||||
import copy
|
|
||||||
import os
|
|
||||||
import re
|
|
||||||
|
|
||||||
import urwid
|
|
||||||
|
|
||||||
from mitmproxy import exceptions
|
|
||||||
from mitmproxy import filt
|
|
||||||
from mitmproxy.builtins import script
|
|
||||||
from mitmproxy.console import common
|
|
||||||
from mitmproxy.console import signals
|
|
||||||
from netlib import strutils
|
|
||||||
from netlib.http import cookies
|
|
||||||
from netlib.http import user_agents
|
|
||||||
|
|
||||||
FOOTER = [
|
|
||||||
('heading_key', "enter"), ":edit ",
|
|
||||||
('heading_key', "q"), ":back ",
|
|
||||||
]
|
|
||||||
FOOTER_EDITING = [
|
|
||||||
('heading_key', "esc"), ":stop editing ",
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
class TextColumn:
|
|
||||||
subeditor = None
|
|
||||||
|
|
||||||
def __init__(self, heading):
|
|
||||||
self.heading = heading
|
|
||||||
|
|
||||||
def text(self, obj):
|
|
||||||
return SEscaped(obj or "")
|
|
||||||
|
|
||||||
def blank(self):
|
|
||||||
return ""
|
|
||||||
|
|
||||||
def keypress(self, key, editor):
|
|
||||||
if key == "r":
|
|
||||||
if editor.walker.get_current_value() is not None:
|
|
||||||
signals.status_prompt_path.send(
|
|
||||||
self,
|
|
||||||
prompt = "Read file",
|
|
||||||
callback = editor.read_file
|
|
||||||
)
|
|
||||||
elif key == "R":
|
|
||||||
if editor.walker.get_current_value() is not None:
|
|
||||||
signals.status_prompt_path.send(
|
|
||||||
editor,
|
|
||||||
prompt = "Read unescaped file",
|
|
||||||
callback = editor.read_file,
|
|
||||||
args = (True,)
|
|
||||||
)
|
|
||||||
elif key == "e":
|
|
||||||
o = editor.walker.get_current_value()
|
|
||||||
if o is not None:
|
|
||||||
n = editor.master.spawn_editor(o.encode("string-escape"))
|
|
||||||
n = strutils.clean_hanging_newline(n)
|
|
||||||
editor.walker.set_current_value(n, False)
|
|
||||||
editor.walker._modified()
|
|
||||||
elif key in ["enter"]:
|
|
||||||
editor.walker.start_edit()
|
|
||||||
else:
|
|
||||||
return key
|
|
||||||
|
|
||||||
|
|
||||||
class SubgridColumn:
|
|
||||||
|
|
||||||
def __init__(self, heading, subeditor):
|
|
||||||
self.heading = heading
|
|
||||||
self.subeditor = subeditor
|
|
||||||
|
|
||||||
def text(self, obj):
|
|
||||||
p = cookies._format_pairs(obj, sep="\n")
|
|
||||||
return urwid.Text(p)
|
|
||||||
|
|
||||||
def blank(self):
|
|
||||||
return []
|
|
||||||
|
|
||||||
def keypress(self, key, editor):
|
|
||||||
if key in "rRe":
|
|
||||||
signals.status_message.send(
|
|
||||||
self,
|
|
||||||
message = "Press enter to edit this field.",
|
|
||||||
expire = 1000
|
|
||||||
)
|
|
||||||
return
|
|
||||||
elif key in ["enter"]:
|
|
||||||
editor.master.view_grideditor(
|
|
||||||
self.subeditor(
|
|
||||||
editor.master,
|
|
||||||
editor.walker.get_current_value(),
|
|
||||||
editor.set_subeditor_value,
|
|
||||||
editor.walker.focus,
|
|
||||||
editor.walker.focus_col
|
|
||||||
)
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
return key
|
|
||||||
|
|
||||||
|
|
||||||
class SEscaped(urwid.WidgetWrap):
|
|
||||||
|
|
||||||
def __init__(self, txt):
|
|
||||||
txt = txt.encode("string-escape")
|
|
||||||
w = urwid.Text(txt, wrap="any")
|
|
||||||
urwid.WidgetWrap.__init__(self, w)
|
|
||||||
|
|
||||||
def get_text(self):
|
|
||||||
return self._w.get_text()[0]
|
|
||||||
|
|
||||||
def keypress(self, size, key):
|
|
||||||
return key
|
|
||||||
|
|
||||||
def selectable(self):
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
class SEdit(urwid.WidgetWrap):
|
|
||||||
|
|
||||||
def __init__(self, txt):
|
|
||||||
txt = txt.encode("string-escape")
|
|
||||||
w = urwid.Edit(edit_text=txt, wrap="any", multiline=True)
|
|
||||||
w = urwid.AttrWrap(w, "editfield")
|
|
||||||
urwid.WidgetWrap.__init__(self, w)
|
|
||||||
|
|
||||||
def get_text(self):
|
|
||||||
return self._w.get_text()[0].strip()
|
|
||||||
|
|
||||||
def selectable(self):
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
class GridRow(urwid.WidgetWrap):
|
|
||||||
|
|
||||||
def __init__(self, focused, editing, editor, values):
|
|
||||||
self.focused, self.editing, self.editor = focused, editing, editor
|
|
||||||
|
|
||||||
errors = values[1]
|
|
||||||
self.fields = []
|
|
||||||
for i, v in enumerate(values[0]):
|
|
||||||
if focused == i and editing:
|
|
||||||
self.editing = SEdit(v)
|
|
||||||
self.fields.append(self.editing)
|
|
||||||
else:
|
|
||||||
w = self.editor.columns[i].text(v)
|
|
||||||
if focused == i:
|
|
||||||
if i in errors:
|
|
||||||
w = urwid.AttrWrap(w, "focusfield_error")
|
|
||||||
else:
|
|
||||||
w = urwid.AttrWrap(w, "focusfield")
|
|
||||||
elif i in errors:
|
|
||||||
w = urwid.AttrWrap(w, "field_error")
|
|
||||||
self.fields.append(w)
|
|
||||||
|
|
||||||
fspecs = self.fields[:]
|
|
||||||
if len(self.fields) > 1:
|
|
||||||
fspecs[0] = ("fixed", self.editor.first_width + 2, fspecs[0])
|
|
||||||
w = urwid.Columns(
|
|
||||||
fspecs,
|
|
||||||
dividechars = 2
|
|
||||||
)
|
|
||||||
if focused is not None:
|
|
||||||
w.set_focus_column(focused)
|
|
||||||
urwid.WidgetWrap.__init__(self, w)
|
|
||||||
|
|
||||||
def get_edit_value(self):
|
|
||||||
return self.editing.get_text()
|
|
||||||
|
|
||||||
def keypress(self, s, k):
|
|
||||||
if self.editing:
|
|
||||||
w = self._w.column_widths(s)[self.focused]
|
|
||||||
k = self.editing.keypress((w,), k)
|
|
||||||
return k
|
|
||||||
|
|
||||||
def selectable(self):
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
class GridWalker(urwid.ListWalker):
|
|
||||||
|
|
||||||
"""
|
|
||||||
Stores rows as a list of (rows, errors) tuples, where rows is a list
|
|
||||||
and errors is a set with an entry of each offset in rows that is an
|
|
||||||
error.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, lst, editor):
|
|
||||||
self.lst = [(i, set([])) for i in lst]
|
|
||||||
self.editor = editor
|
|
||||||
self.focus = 0
|
|
||||||
self.focus_col = 0
|
|
||||||
self.editing = False
|
|
||||||
|
|
||||||
def _modified(self):
|
|
||||||
self.editor.show_empty_msg()
|
|
||||||
return urwid.ListWalker._modified(self)
|
|
||||||
|
|
||||||
def add_value(self, lst):
|
|
||||||
self.lst.append((lst[:], set([])))
|
|
||||||
self._modified()
|
|
||||||
|
|
||||||
def get_current_value(self):
|
|
||||||
if self.lst:
|
|
||||||
return self.lst[self.focus][0][self.focus_col]
|
|
||||||
|
|
||||||
def set_current_value(self, val, unescaped):
|
|
||||||
if not unescaped:
|
|
||||||
try:
|
|
||||||
val = val.decode("string-escape")
|
|
||||||
except ValueError:
|
|
||||||
signals.status_message.send(
|
|
||||||
self,
|
|
||||||
message = "Invalid Python-style string encoding.",
|
|
||||||
expire = 1000
|
|
||||||
)
|
|
||||||
return
|
|
||||||
errors = self.lst[self.focus][1]
|
|
||||||
emsg = self.editor.is_error(self.focus_col, val)
|
|
||||||
if emsg:
|
|
||||||
signals.status_message.send(message = emsg, expire = 1)
|
|
||||||
errors.add(self.focus_col)
|
|
||||||
else:
|
|
||||||
errors.discard(self.focus_col)
|
|
||||||
self.set_value(val, self.focus, self.focus_col, errors)
|
|
||||||
|
|
||||||
def set_value(self, val, focus, focus_col, errors=None):
|
|
||||||
if not errors:
|
|
||||||
errors = set([])
|
|
||||||
row = list(self.lst[focus][0])
|
|
||||||
row[focus_col] = val
|
|
||||||
self.lst[focus] = [tuple(row), errors]
|
|
||||||
self._modified()
|
|
||||||
|
|
||||||
def delete_focus(self):
|
|
||||||
if self.lst:
|
|
||||||
del self.lst[self.focus]
|
|
||||||
self.focus = min(len(self.lst) - 1, self.focus)
|
|
||||||
self._modified()
|
|
||||||
|
|
||||||
def _insert(self, pos):
|
|
||||||
self.focus = pos
|
|
||||||
self.lst.insert(
|
|
||||||
self.focus,
|
|
||||||
[
|
|
||||||
[c.blank() for c in self.editor.columns], set([])
|
|
||||||
]
|
|
||||||
)
|
|
||||||
self.focus_col = 0
|
|
||||||
self.start_edit()
|
|
||||||
|
|
||||||
def insert(self):
|
|
||||||
return self._insert(self.focus)
|
|
||||||
|
|
||||||
def add(self):
|
|
||||||
return self._insert(min(self.focus + 1, len(self.lst)))
|
|
||||||
|
|
||||||
def start_edit(self):
|
|
||||||
col = self.editor.columns[self.focus_col]
|
|
||||||
if self.lst and not col.subeditor:
|
|
||||||
self.editing = GridRow(
|
|
||||||
self.focus_col, True, self.editor, self.lst[self.focus]
|
|
||||||
)
|
|
||||||
self.editor.master.loop.widget.footer.update(FOOTER_EDITING)
|
|
||||||
self._modified()
|
|
||||||
|
|
||||||
def stop_edit(self):
|
|
||||||
if self.editing:
|
|
||||||
self.editor.master.loop.widget.footer.update(FOOTER)
|
|
||||||
self.set_current_value(self.editing.get_edit_value(), False)
|
|
||||||
self.editing = False
|
|
||||||
self._modified()
|
|
||||||
|
|
||||||
def left(self):
|
|
||||||
self.focus_col = max(self.focus_col - 1, 0)
|
|
||||||
self._modified()
|
|
||||||
|
|
||||||
def right(self):
|
|
||||||
self.focus_col = min(self.focus_col + 1, len(self.editor.columns) - 1)
|
|
||||||
self._modified()
|
|
||||||
|
|
||||||
def tab_next(self):
|
|
||||||
self.stop_edit()
|
|
||||||
if self.focus_col < len(self.editor.columns) - 1:
|
|
||||||
self.focus_col += 1
|
|
||||||
elif self.focus != len(self.lst) - 1:
|
|
||||||
self.focus_col = 0
|
|
||||||
self.focus += 1
|
|
||||||
self._modified()
|
|
||||||
|
|
||||||
def get_focus(self):
|
|
||||||
if self.editing:
|
|
||||||
return self.editing, self.focus
|
|
||||||
elif self.lst:
|
|
||||||
return GridRow(
|
|
||||||
self.focus_col,
|
|
||||||
False,
|
|
||||||
self.editor,
|
|
||||||
self.lst[self.focus]
|
|
||||||
), self.focus
|
|
||||||
else:
|
|
||||||
return None, None
|
|
||||||
|
|
||||||
def set_focus(self, focus):
|
|
||||||
self.stop_edit()
|
|
||||||
self.focus = focus
|
|
||||||
self._modified()
|
|
||||||
|
|
||||||
def get_next(self, pos):
|
|
||||||
if pos + 1 >= len(self.lst):
|
|
||||||
return None, None
|
|
||||||
return GridRow(None, False, self.editor, self.lst[pos + 1]), pos + 1
|
|
||||||
|
|
||||||
def get_prev(self, pos):
|
|
||||||
if pos - 1 < 0:
|
|
||||||
return None, None
|
|
||||||
return GridRow(None, False, self.editor, self.lst[pos - 1]), pos - 1
|
|
||||||
|
|
||||||
|
|
||||||
class GridListBox(urwid.ListBox):
|
|
||||||
|
|
||||||
def __init__(self, lw):
|
|
||||||
urwid.ListBox.__init__(self, lw)
|
|
||||||
|
|
||||||
|
|
||||||
FIRST_WIDTH_MAX = 40
|
|
||||||
FIRST_WIDTH_MIN = 20
|
|
||||||
|
|
||||||
|
|
||||||
class GridEditor(urwid.WidgetWrap):
|
|
||||||
title = None
|
|
||||||
columns = None
|
|
||||||
|
|
||||||
def __init__(self, master, value, callback, *cb_args, **cb_kwargs):
|
|
||||||
value = self.data_in(copy.deepcopy(value))
|
|
||||||
self.master, self.value, self.callback = master, value, callback
|
|
||||||
self.cb_args, self.cb_kwargs = cb_args, cb_kwargs
|
|
||||||
|
|
||||||
first_width = 20
|
|
||||||
if value:
|
|
||||||
for r in value:
|
|
||||||
assert len(r) == len(self.columns)
|
|
||||||
first_width = max(len(r), first_width)
|
|
||||||
self.first_width = min(first_width, FIRST_WIDTH_MAX)
|
|
||||||
|
|
||||||
title = urwid.Text(self.title)
|
|
||||||
title = urwid.Padding(title, align="left", width=("relative", 100))
|
|
||||||
title = urwid.AttrWrap(title, "heading")
|
|
||||||
|
|
||||||
headings = []
|
|
||||||
for i, col in enumerate(self.columns):
|
|
||||||
c = urwid.Text(col.heading)
|
|
||||||
if i == 0 and len(self.columns) > 1:
|
|
||||||
headings.append(("fixed", first_width + 2, c))
|
|
||||||
else:
|
|
||||||
headings.append(c)
|
|
||||||
h = urwid.Columns(
|
|
||||||
headings,
|
|
||||||
dividechars = 2
|
|
||||||
)
|
|
||||||
h = urwid.AttrWrap(h, "heading")
|
|
||||||
|
|
||||||
self.walker = GridWalker(self.value, self)
|
|
||||||
self.lb = GridListBox(self.walker)
|
|
||||||
self._w = urwid.Frame(
|
|
||||||
self.lb,
|
|
||||||
header = urwid.Pile([title, h])
|
|
||||||
)
|
|
||||||
self.master.loop.widget.footer.update("")
|
|
||||||
self.show_empty_msg()
|
|
||||||
|
|
||||||
def show_empty_msg(self):
|
|
||||||
if self.walker.lst:
|
|
||||||
self._w.set_footer(None)
|
|
||||||
else:
|
|
||||||
self._w.set_footer(
|
|
||||||
urwid.Text(
|
|
||||||
[
|
|
||||||
("highlight", "No values. Press "),
|
|
||||||
("key", "a"),
|
|
||||||
("highlight", " to add some."),
|
|
||||||
]
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
def encode(self, s):
|
|
||||||
if not self.encoding:
|
|
||||||
return s
|
|
||||||
try:
|
|
||||||
return s.encode(self.encoding)
|
|
||||||
except ValueError:
|
|
||||||
return None
|
|
||||||
|
|
||||||
def read_file(self, p, unescaped=False):
|
|
||||||
if p:
|
|
||||||
try:
|
|
||||||
p = os.path.expanduser(p)
|
|
||||||
d = open(p, "rb").read()
|
|
||||||
self.walker.set_current_value(d, unescaped)
|
|
||||||
self.walker._modified()
|
|
||||||
except IOError as v:
|
|
||||||
return str(v)
|
|
||||||
|
|
||||||
def set_subeditor_value(self, val, focus, focus_col):
|
|
||||||
self.walker.set_value(val, focus, focus_col)
|
|
||||||
|
|
||||||
def keypress(self, size, key):
|
|
||||||
if self.walker.editing:
|
|
||||||
if key in ["esc"]:
|
|
||||||
self.walker.stop_edit()
|
|
||||||
elif key == "tab":
|
|
||||||
pf, pfc = self.walker.focus, self.walker.focus_col
|
|
||||||
self.walker.tab_next()
|
|
||||||
if self.walker.focus == pf and self.walker.focus_col != pfc:
|
|
||||||
self.walker.start_edit()
|
|
||||||
else:
|
|
||||||
self._w.keypress(size, key)
|
|
||||||
return None
|
|
||||||
|
|
||||||
key = common.shortcuts(key)
|
|
||||||
column = self.columns[self.walker.focus_col]
|
|
||||||
if key in ["q", "esc"]:
|
|
||||||
res = []
|
|
||||||
for i in self.walker.lst:
|
|
||||||
if not i[1] and any([x for x in i[0]]):
|
|
||||||
res.append(i[0])
|
|
||||||
self.callback(self.data_out(res), *self.cb_args, **self.cb_kwargs)
|
|
||||||
signals.pop_view_state.send(self)
|
|
||||||
elif key == "g":
|
|
||||||
self.walker.set_focus(0)
|
|
||||||
elif key == "G":
|
|
||||||
self.walker.set_focus(len(self.walker.lst) - 1)
|
|
||||||
elif key in ["h", "left"]:
|
|
||||||
self.walker.left()
|
|
||||||
elif key in ["l", "right"]:
|
|
||||||
self.walker.right()
|
|
||||||
elif key == "tab":
|
|
||||||
self.walker.tab_next()
|
|
||||||
elif key == "a":
|
|
||||||
self.walker.add()
|
|
||||||
elif key == "A":
|
|
||||||
self.walker.insert()
|
|
||||||
elif key == "d":
|
|
||||||
self.walker.delete_focus()
|
|
||||||
elif column.keypress(key, self) and not self.handle_key(key):
|
|
||||||
return self._w.keypress(size, key)
|
|
||||||
|
|
||||||
def data_out(self, data):
|
|
||||||
"""
|
|
||||||
Called on raw list data, before data is returned through the
|
|
||||||
callback.
|
|
||||||
"""
|
|
||||||
return data
|
|
||||||
|
|
||||||
def data_in(self, data):
|
|
||||||
"""
|
|
||||||
Called to prepare provided data.
|
|
||||||
"""
|
|
||||||
return data
|
|
||||||
|
|
||||||
def is_error(self, col, val):
|
|
||||||
"""
|
|
||||||
Return False, or a string error message.
|
|
||||||
"""
|
|
||||||
return False
|
|
||||||
|
|
||||||
def handle_key(self, key):
|
|
||||||
return False
|
|
||||||
|
|
||||||
def make_help(self):
|
|
||||||
text = []
|
|
||||||
text.append(urwid.Text([("text", "Editor control:\n")]))
|
|
||||||
keys = [
|
|
||||||
("A", "insert row before cursor"),
|
|
||||||
("a", "add row after cursor"),
|
|
||||||
("d", "delete row"),
|
|
||||||
("e", "spawn external editor on current field"),
|
|
||||||
("q", "save changes and exit editor"),
|
|
||||||
("r", "read value from file"),
|
|
||||||
("R", "read unescaped value from file"),
|
|
||||||
("esc", "save changes and exit editor"),
|
|
||||||
("tab", "next field"),
|
|
||||||
("enter", "edit field"),
|
|
||||||
]
|
|
||||||
text.extend(
|
|
||||||
common.format_keyvals(keys, key="key", val="text", indent=4)
|
|
||||||
)
|
|
||||||
text.append(
|
|
||||||
urwid.Text(
|
|
||||||
[
|
|
||||||
"\n",
|
|
||||||
("text", "Values are escaped Python-style strings.\n"),
|
|
||||||
]
|
|
||||||
)
|
|
||||||
)
|
|
||||||
return text
|
|
||||||
|
|
||||||
|
|
||||||
class QueryEditor(GridEditor):
|
|
||||||
title = "Editing query"
|
|
||||||
columns = [
|
|
||||||
TextColumn("Key"),
|
|
||||||
TextColumn("Value")
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
class HeaderEditor(GridEditor):
|
|
||||||
title = "Editing headers"
|
|
||||||
columns = [
|
|
||||||
TextColumn("Key"),
|
|
||||||
TextColumn("Value")
|
|
||||||
]
|
|
||||||
|
|
||||||
def make_help(self):
|
|
||||||
h = GridEditor.make_help(self)
|
|
||||||
text = []
|
|
||||||
text.append(urwid.Text([("text", "Special keys:\n")]))
|
|
||||||
keys = [
|
|
||||||
("U", "add User-Agent header"),
|
|
||||||
]
|
|
||||||
text.extend(
|
|
||||||
common.format_keyvals(keys, key="key", val="text", indent=4)
|
|
||||||
)
|
|
||||||
text.append(urwid.Text([("text", "\n")]))
|
|
||||||
text.extend(h)
|
|
||||||
return text
|
|
||||||
|
|
||||||
def set_user_agent(self, k):
|
|
||||||
ua = user_agents.get_by_shortcut(k)
|
|
||||||
if ua:
|
|
||||||
self.walker.add_value(
|
|
||||||
[
|
|
||||||
"User-Agent",
|
|
||||||
ua[2]
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
def handle_key(self, key):
|
|
||||||
if key == "U":
|
|
||||||
signals.status_prompt_onekey.send(
|
|
||||||
prompt = "Add User-Agent header:",
|
|
||||||
keys = [(i[0], i[1]) for i in user_agents.UASTRINGS],
|
|
||||||
callback = self.set_user_agent,
|
|
||||||
)
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
class URLEncodedFormEditor(GridEditor):
|
|
||||||
title = "Editing URL-encoded form"
|
|
||||||
columns = [
|
|
||||||
TextColumn("Key"),
|
|
||||||
TextColumn("Value")
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
class ReplaceEditor(GridEditor):
|
|
||||||
title = "Editing replacement patterns"
|
|
||||||
columns = [
|
|
||||||
TextColumn("Filter"),
|
|
||||||
TextColumn("Regex"),
|
|
||||||
TextColumn("Replacement"),
|
|
||||||
]
|
|
||||||
|
|
||||||
def is_error(self, col, val):
|
|
||||||
if col == 0:
|
|
||||||
if not filt.parse(val):
|
|
||||||
return "Invalid filter specification."
|
|
||||||
elif col == 1:
|
|
||||||
try:
|
|
||||||
re.compile(val)
|
|
||||||
except re.error:
|
|
||||||
return "Invalid regular expression."
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
class SetHeadersEditor(GridEditor):
|
|
||||||
title = "Editing header set patterns"
|
|
||||||
columns = [
|
|
||||||
TextColumn("Filter"),
|
|
||||||
TextColumn("Header"),
|
|
||||||
TextColumn("Value"),
|
|
||||||
]
|
|
||||||
|
|
||||||
def is_error(self, col, val):
|
|
||||||
if col == 0:
|
|
||||||
if not filt.parse(val):
|
|
||||||
return "Invalid filter specification"
|
|
||||||
return False
|
|
||||||
|
|
||||||
def make_help(self):
|
|
||||||
h = GridEditor.make_help(self)
|
|
||||||
text = []
|
|
||||||
text.append(urwid.Text([("text", "Special keys:\n")]))
|
|
||||||
keys = [
|
|
||||||
("U", "add User-Agent header"),
|
|
||||||
]
|
|
||||||
text.extend(
|
|
||||||
common.format_keyvals(keys, key="key", val="text", indent=4)
|
|
||||||
)
|
|
||||||
text.append(urwid.Text([("text", "\n")]))
|
|
||||||
text.extend(h)
|
|
||||||
return text
|
|
||||||
|
|
||||||
def set_user_agent(self, k):
|
|
||||||
ua = user_agents.get_by_shortcut(k)
|
|
||||||
if ua:
|
|
||||||
self.walker.add_value(
|
|
||||||
[
|
|
||||||
".*",
|
|
||||||
"User-Agent",
|
|
||||||
ua[2]
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
def handle_key(self, key):
|
|
||||||
if key == "U":
|
|
||||||
signals.status_prompt_onekey.send(
|
|
||||||
prompt = "Add User-Agent header:",
|
|
||||||
keys = [(i[0], i[1]) for i in user_agents.UASTRINGS],
|
|
||||||
callback = self.set_user_agent,
|
|
||||||
)
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
class PathEditor(GridEditor):
|
|
||||||
title = "Editing URL path components"
|
|
||||||
columns = [
|
|
||||||
TextColumn("Component"),
|
|
||||||
]
|
|
||||||
|
|
||||||
def data_in(self, data):
|
|
||||||
return [[i] for i in data]
|
|
||||||
|
|
||||||
def data_out(self, data):
|
|
||||||
return [i[0] for i in data]
|
|
||||||
|
|
||||||
|
|
||||||
class ScriptEditor(GridEditor):
|
|
||||||
title = "Editing scripts"
|
|
||||||
columns = [
|
|
||||||
TextColumn("Command"),
|
|
||||||
]
|
|
||||||
|
|
||||||
def is_error(self, col, val):
|
|
||||||
try:
|
|
||||||
script.parse_command(val)
|
|
||||||
except exceptions.AddonError as e:
|
|
||||||
return str(e)
|
|
||||||
|
|
||||||
|
|
||||||
class HostPatternEditor(GridEditor):
|
|
||||||
title = "Editing host patterns"
|
|
||||||
columns = [
|
|
||||||
TextColumn("Regex (matched on hostname:port / ip:port)")
|
|
||||||
]
|
|
||||||
|
|
||||||
def is_error(self, col, val):
|
|
||||||
try:
|
|
||||||
re.compile(val, re.IGNORECASE)
|
|
||||||
except re.error as e:
|
|
||||||
return "Invalid regex: %s" % str(e)
|
|
||||||
|
|
||||||
def data_in(self, data):
|
|
||||||
return [[i] for i in data]
|
|
||||||
|
|
||||||
def data_out(self, data):
|
|
||||||
return [i[0] for i in data]
|
|
||||||
|
|
||||||
|
|
||||||
class CookieEditor(GridEditor):
|
|
||||||
title = "Editing request Cookie header"
|
|
||||||
columns = [
|
|
||||||
TextColumn("Name"),
|
|
||||||
TextColumn("Value"),
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
class CookieAttributeEditor(GridEditor):
|
|
||||||
title = "Editing Set-Cookie attributes"
|
|
||||||
columns = [
|
|
||||||
TextColumn("Name"),
|
|
||||||
TextColumn("Value"),
|
|
||||||
]
|
|
||||||
|
|
||||||
def data_out(self, data):
|
|
||||||
ret = []
|
|
||||||
for i in data:
|
|
||||||
if not i[1]:
|
|
||||||
ret.append([i[0], None])
|
|
||||||
else:
|
|
||||||
ret.append(i)
|
|
||||||
return ret
|
|
||||||
|
|
||||||
|
|
||||||
class SetCookieEditor(GridEditor):
|
|
||||||
title = "Editing response SetCookie header"
|
|
||||||
columns = [
|
|
||||||
TextColumn("Name"),
|
|
||||||
TextColumn("Value"),
|
|
||||||
SubgridColumn("Attributes", CookieAttributeEditor),
|
|
||||||
]
|
|
||||||
|
|
||||||
def data_in(self, data):
|
|
||||||
flattened = []
|
|
||||||
for key, (value, attrs) in data:
|
|
||||||
flattened.append([key, value, attrs.items(multi=True)])
|
|
||||||
return flattened
|
|
||||||
|
|
||||||
def data_out(self, data):
|
|
||||||
vals = []
|
|
||||||
for key, value, attrs in data:
|
|
||||||
vals.append(
|
|
||||||
[
|
|
||||||
key,
|
|
||||||
(value, attrs)
|
|
||||||
]
|
|
||||||
)
|
|
||||||
return vals
|
|
2
mitmproxy/console/grideditor/__init__.py
Normal file
2
mitmproxy/console/grideditor/__init__.py
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
from .editors import * # noqa
|
||||||
|
from . import base # noqa
|
427
mitmproxy/console/grideditor/base.py
Normal file
427
mitmproxy/console/grideditor/base.py
Normal file
@ -0,0 +1,427 @@
|
|||||||
|
from __future__ import absolute_import, print_function, division
|
||||||
|
import abc
|
||||||
|
import copy
|
||||||
|
|
||||||
|
import six
|
||||||
|
import urwid
|
||||||
|
from mitmproxy.console import common
|
||||||
|
from mitmproxy.console import signals
|
||||||
|
|
||||||
|
from typing import Any # noqa
|
||||||
|
from typing import Callable # noqa
|
||||||
|
from typing import Container # noqa
|
||||||
|
from typing import Iterable # noqa
|
||||||
|
from typing import Optional # noqa
|
||||||
|
from typing import Sequence # noqa
|
||||||
|
from typing import Tuple # noqa
|
||||||
|
|
||||||
|
FOOTER = [
|
||||||
|
('heading_key', "enter"), ":edit ",
|
||||||
|
('heading_key', "q"), ":back ",
|
||||||
|
]
|
||||||
|
FOOTER_EDITING = [
|
||||||
|
('heading_key', "esc"), ":stop editing ",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@six.add_metaclass(abc.ABCMeta)
|
||||||
|
class Column(object):
|
||||||
|
subeditor = None
|
||||||
|
|
||||||
|
def __init__(self, heading):
|
||||||
|
self.heading = heading
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def Display(self, data):
|
||||||
|
# type: () -> Cell
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def Edit(self, data):
|
||||||
|
# type: () -> Cell
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def blank(self):
|
||||||
|
# type: () -> Any
|
||||||
|
pass
|
||||||
|
|
||||||
|
def keypress(self, key, editor):
|
||||||
|
# type: (str, GridEditor) -> Optional[str]
|
||||||
|
return key
|
||||||
|
|
||||||
|
|
||||||
|
class Cell(urwid.WidgetWrap):
|
||||||
|
__metaclass__ = abc.ABCMeta
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def get_data(self):
|
||||||
|
"""
|
||||||
|
Raises:
|
||||||
|
ValueError, if the current content is invalid.
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
def selectable(self):
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
class GridRow(urwid.WidgetWrap):
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
focused, # type: Optional[int]
|
||||||
|
editing, # type: bool
|
||||||
|
editor, # type: GridEditor
|
||||||
|
values # type: Tuple[Iterable[bytes], Container[int]
|
||||||
|
):
|
||||||
|
self.focused = focused
|
||||||
|
self.editor = editor
|
||||||
|
self.edit_col = None # type: Optional[Cell]
|
||||||
|
|
||||||
|
errors = values[1]
|
||||||
|
self.fields = []
|
||||||
|
for i, v in enumerate(values[0]):
|
||||||
|
if focused == i and editing:
|
||||||
|
self.edit_col = self.editor.columns[i].Edit(v)
|
||||||
|
self.fields.append(self.edit_col)
|
||||||
|
else:
|
||||||
|
w = self.editor.columns[i].Display(v)
|
||||||
|
if focused == i:
|
||||||
|
if i in errors:
|
||||||
|
w = urwid.AttrWrap(w, "focusfield_error")
|
||||||
|
else:
|
||||||
|
w = urwid.AttrWrap(w, "focusfield")
|
||||||
|
elif i in errors:
|
||||||
|
w = urwid.AttrWrap(w, "field_error")
|
||||||
|
self.fields.append(w)
|
||||||
|
|
||||||
|
fspecs = self.fields[:]
|
||||||
|
if len(self.fields) > 1:
|
||||||
|
fspecs[0] = ("fixed", self.editor.first_width + 2, fspecs[0])
|
||||||
|
w = urwid.Columns(
|
||||||
|
fspecs,
|
||||||
|
dividechars=2
|
||||||
|
)
|
||||||
|
if focused is not None:
|
||||||
|
w.set_focus_column(focused)
|
||||||
|
super(GridRow, self).__init__(w)
|
||||||
|
|
||||||
|
def keypress(self, s, k):
|
||||||
|
if self.edit_col:
|
||||||
|
w = self._w.column_widths(s)[self.focused]
|
||||||
|
k = self.edit_col.keypress((w,), k)
|
||||||
|
return k
|
||||||
|
|
||||||
|
def selectable(self):
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
class GridWalker(urwid.ListWalker):
|
||||||
|
"""
|
||||||
|
Stores rows as a list of (rows, errors) tuples, where rows is a list
|
||||||
|
and errors is a set with an entry of each offset in rows that is an
|
||||||
|
error.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
lst, # type: Iterable[list]
|
||||||
|
editor # type: GridEditor
|
||||||
|
):
|
||||||
|
self.lst = [(i, set()) for i in lst]
|
||||||
|
self.editor = editor
|
||||||
|
self.focus = 0
|
||||||
|
self.focus_col = 0
|
||||||
|
self.edit_row = None # type: Optional[GridRow]
|
||||||
|
|
||||||
|
def _modified(self):
|
||||||
|
self.editor.show_empty_msg()
|
||||||
|
return super(GridWalker, self)._modified()
|
||||||
|
|
||||||
|
def add_value(self, lst):
|
||||||
|
self.lst.append(
|
||||||
|
(lst[:], set())
|
||||||
|
)
|
||||||
|
self._modified()
|
||||||
|
|
||||||
|
def get_current_value(self):
|
||||||
|
if self.lst:
|
||||||
|
return self.lst[self.focus][0][self.focus_col]
|
||||||
|
|
||||||
|
def set_current_value(self, val):
|
||||||
|
errors = self.lst[self.focus][1]
|
||||||
|
emsg = self.editor.is_error(self.focus_col, val)
|
||||||
|
if emsg:
|
||||||
|
signals.status_message.send(message=emsg, expire=5)
|
||||||
|
errors.add(self.focus_col)
|
||||||
|
else:
|
||||||
|
errors.discard(self.focus_col)
|
||||||
|
self.set_value(val, self.focus, self.focus_col, errors)
|
||||||
|
|
||||||
|
def set_value(self, val, focus, focus_col, errors=None):
|
||||||
|
if not errors:
|
||||||
|
errors = set([])
|
||||||
|
row = list(self.lst[focus][0])
|
||||||
|
row[focus_col] = val
|
||||||
|
self.lst[focus] = [tuple(row), errors]
|
||||||
|
self._modified()
|
||||||
|
|
||||||
|
def delete_focus(self):
|
||||||
|
if self.lst:
|
||||||
|
del self.lst[self.focus]
|
||||||
|
self.focus = min(len(self.lst) - 1, self.focus)
|
||||||
|
self._modified()
|
||||||
|
|
||||||
|
def _insert(self, pos):
|
||||||
|
self.focus = pos
|
||||||
|
self.lst.insert(
|
||||||
|
self.focus,
|
||||||
|
([c.blank() for c in self.editor.columns], set([]))
|
||||||
|
)
|
||||||
|
self.focus_col = 0
|
||||||
|
self.start_edit()
|
||||||
|
|
||||||
|
def insert(self):
|
||||||
|
return self._insert(self.focus)
|
||||||
|
|
||||||
|
def add(self):
|
||||||
|
return self._insert(min(self.focus + 1, len(self.lst)))
|
||||||
|
|
||||||
|
def start_edit(self):
|
||||||
|
col = self.editor.columns[self.focus_col]
|
||||||
|
if self.lst and not col.subeditor:
|
||||||
|
self.edit_row = GridRow(
|
||||||
|
self.focus_col, True, self.editor, self.lst[self.focus]
|
||||||
|
)
|
||||||
|
self.editor.master.loop.widget.footer.update(FOOTER_EDITING)
|
||||||
|
self._modified()
|
||||||
|
|
||||||
|
def stop_edit(self):
|
||||||
|
if self.edit_row:
|
||||||
|
self.editor.master.loop.widget.footer.update(FOOTER)
|
||||||
|
try:
|
||||||
|
val = self.edit_row.edit_col.get_data()
|
||||||
|
except ValueError:
|
||||||
|
return
|
||||||
|
self.edit_row = None
|
||||||
|
self.set_current_value(val)
|
||||||
|
|
||||||
|
def left(self):
|
||||||
|
self.focus_col = max(self.focus_col - 1, 0)
|
||||||
|
self._modified()
|
||||||
|
|
||||||
|
def right(self):
|
||||||
|
self.focus_col = min(self.focus_col + 1, len(self.editor.columns) - 1)
|
||||||
|
self._modified()
|
||||||
|
|
||||||
|
def tab_next(self):
|
||||||
|
self.stop_edit()
|
||||||
|
if self.focus_col < len(self.editor.columns) - 1:
|
||||||
|
self.focus_col += 1
|
||||||
|
elif self.focus != len(self.lst) - 1:
|
||||||
|
self.focus_col = 0
|
||||||
|
self.focus += 1
|
||||||
|
self._modified()
|
||||||
|
|
||||||
|
def get_focus(self):
|
||||||
|
if self.edit_row:
|
||||||
|
return self.edit_row, self.focus
|
||||||
|
elif self.lst:
|
||||||
|
return GridRow(
|
||||||
|
self.focus_col,
|
||||||
|
False,
|
||||||
|
self.editor,
|
||||||
|
self.lst[self.focus]
|
||||||
|
), self.focus
|
||||||
|
else:
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
def set_focus(self, focus):
|
||||||
|
self.stop_edit()
|
||||||
|
self.focus = focus
|
||||||
|
self._modified()
|
||||||
|
|
||||||
|
def get_next(self, pos):
|
||||||
|
if pos + 1 >= len(self.lst):
|
||||||
|
return None, None
|
||||||
|
return GridRow(None, False, self.editor, self.lst[pos + 1]), pos + 1
|
||||||
|
|
||||||
|
def get_prev(self, pos):
|
||||||
|
if pos - 1 < 0:
|
||||||
|
return None, None
|
||||||
|
return GridRow(None, False, self.editor, self.lst[pos - 1]), pos - 1
|
||||||
|
|
||||||
|
|
||||||
|
class GridListBox(urwid.ListBox):
|
||||||
|
def __init__(self, lw):
|
||||||
|
super(GridListBox, self).__init__(lw)
|
||||||
|
|
||||||
|
|
||||||
|
FIRST_WIDTH_MAX = 40
|
||||||
|
FIRST_WIDTH_MIN = 20
|
||||||
|
|
||||||
|
|
||||||
|
class GridEditor(urwid.WidgetWrap):
|
||||||
|
title = None # type: str
|
||||||
|
columns = None # type: Sequence[Column]
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
master, # type: "mitmproxy.console.master.ConsoleMaster"
|
||||||
|
value, # type: Any
|
||||||
|
callback, # type: Callable[..., None]
|
||||||
|
*cb_args,
|
||||||
|
**cb_kwargs
|
||||||
|
):
|
||||||
|
value = self.data_in(copy.deepcopy(value))
|
||||||
|
self.master = master
|
||||||
|
self.value = value
|
||||||
|
self.callback = callback
|
||||||
|
self.cb_args = cb_args
|
||||||
|
self.cb_kwargs = cb_kwargs
|
||||||
|
|
||||||
|
first_width = 20
|
||||||
|
if value:
|
||||||
|
for r in value:
|
||||||
|
assert len(r) == len(self.columns)
|
||||||
|
first_width = max(len(r), first_width)
|
||||||
|
self.first_width = min(first_width, FIRST_WIDTH_MAX)
|
||||||
|
|
||||||
|
title = urwid.Text(self.title)
|
||||||
|
title = urwid.Padding(title, align="left", width=("relative", 100))
|
||||||
|
title = urwid.AttrWrap(title, "heading")
|
||||||
|
|
||||||
|
headings = []
|
||||||
|
for i, col in enumerate(self.columns):
|
||||||
|
c = urwid.Text(col.heading)
|
||||||
|
if i == 0 and len(self.columns) > 1:
|
||||||
|
headings.append(("fixed", first_width + 2, c))
|
||||||
|
else:
|
||||||
|
headings.append(c)
|
||||||
|
h = urwid.Columns(
|
||||||
|
headings,
|
||||||
|
dividechars=2
|
||||||
|
)
|
||||||
|
h = urwid.AttrWrap(h, "heading")
|
||||||
|
|
||||||
|
self.walker = GridWalker(self.value, self)
|
||||||
|
self.lb = GridListBox(self.walker)
|
||||||
|
w = urwid.Frame(
|
||||||
|
self.lb,
|
||||||
|
header=urwid.Pile([title, h])
|
||||||
|
)
|
||||||
|
super(GridEditor, self).__init__(w)
|
||||||
|
self.master.loop.widget.footer.update("")
|
||||||
|
self.show_empty_msg()
|
||||||
|
|
||||||
|
def show_empty_msg(self):
|
||||||
|
if self.walker.lst:
|
||||||
|
self._w.set_footer(None)
|
||||||
|
else:
|
||||||
|
self._w.set_footer(
|
||||||
|
urwid.Text(
|
||||||
|
[
|
||||||
|
("highlight", "No values. Press "),
|
||||||
|
("key", "a"),
|
||||||
|
("highlight", " to add some."),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
def set_subeditor_value(self, val, focus, focus_col):
|
||||||
|
self.walker.set_value(val, focus, focus_col)
|
||||||
|
|
||||||
|
def keypress(self, size, key):
|
||||||
|
if self.walker.edit_row:
|
||||||
|
if key in ["esc"]:
|
||||||
|
self.walker.stop_edit()
|
||||||
|
elif key == "tab":
|
||||||
|
pf, pfc = self.walker.focus, self.walker.focus_col
|
||||||
|
self.walker.tab_next()
|
||||||
|
if self.walker.focus == pf and self.walker.focus_col != pfc:
|
||||||
|
self.walker.start_edit()
|
||||||
|
else:
|
||||||
|
self._w.keypress(size, key)
|
||||||
|
return None
|
||||||
|
|
||||||
|
key = common.shortcuts(key)
|
||||||
|
column = self.columns[self.walker.focus_col]
|
||||||
|
if key in ["q", "esc"]:
|
||||||
|
res = []
|
||||||
|
for i in self.walker.lst:
|
||||||
|
if not i[1] and any([x for x in i[0]]):
|
||||||
|
res.append(i[0])
|
||||||
|
self.callback(self.data_out(res), *self.cb_args, **self.cb_kwargs)
|
||||||
|
signals.pop_view_state.send(self)
|
||||||
|
elif key == "g":
|
||||||
|
self.walker.set_focus(0)
|
||||||
|
elif key == "G":
|
||||||
|
self.walker.set_focus(len(self.walker.lst) - 1)
|
||||||
|
elif key in ["h", "left"]:
|
||||||
|
self.walker.left()
|
||||||
|
elif key in ["l", "right"]:
|
||||||
|
self.walker.right()
|
||||||
|
elif key == "tab":
|
||||||
|
self.walker.tab_next()
|
||||||
|
elif key == "a":
|
||||||
|
self.walker.add()
|
||||||
|
elif key == "A":
|
||||||
|
self.walker.insert()
|
||||||
|
elif key == "d":
|
||||||
|
self.walker.delete_focus()
|
||||||
|
elif column.keypress(key, self) and not self.handle_key(key):
|
||||||
|
return self._w.keypress(size, key)
|
||||||
|
|
||||||
|
def data_out(self, data):
|
||||||
|
# type: (Sequence[list]) -> Any
|
||||||
|
"""
|
||||||
|
Called on raw list data, before data is returned through the
|
||||||
|
callback.
|
||||||
|
"""
|
||||||
|
return data
|
||||||
|
|
||||||
|
def data_in(self, data):
|
||||||
|
# type: (Any) -> Iterable[list]
|
||||||
|
"""
|
||||||
|
Called to prepare provided data.
|
||||||
|
"""
|
||||||
|
return data
|
||||||
|
|
||||||
|
def is_error(self, col, val):
|
||||||
|
# type: (int, Any) -> Optional[str]
|
||||||
|
"""
|
||||||
|
Return None, or a string error message.
|
||||||
|
"""
|
||||||
|
return False
|
||||||
|
|
||||||
|
def handle_key(self, key):
|
||||||
|
return False
|
||||||
|
|
||||||
|
def make_help(self):
|
||||||
|
text = [
|
||||||
|
urwid.Text([("text", "Editor control:\n")])
|
||||||
|
]
|
||||||
|
keys = [
|
||||||
|
("A", "insert row before cursor"),
|
||||||
|
("a", "add row after cursor"),
|
||||||
|
("d", "delete row"),
|
||||||
|
("e", "spawn external editor on current field"),
|
||||||
|
("q", "save changes and exit editor"),
|
||||||
|
("r", "read value from file"),
|
||||||
|
("R", "read unescaped value from file"),
|
||||||
|
("esc", "save changes and exit editor"),
|
||||||
|
("tab", "next field"),
|
||||||
|
("enter", "edit field"),
|
||||||
|
]
|
||||||
|
text.extend(
|
||||||
|
common.format_keyvals(keys, key="key", val="text", indent=4)
|
||||||
|
)
|
||||||
|
text.append(
|
||||||
|
urwid.Text(
|
||||||
|
[
|
||||||
|
"\n",
|
||||||
|
("text", "Values are escaped Python-style strings.\n"),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return text
|
103
mitmproxy/console/grideditor/col_bytes.py
Normal file
103
mitmproxy/console/grideditor/col_bytes.py
Normal file
@ -0,0 +1,103 @@
|
|||||||
|
from __future__ import absolute_import, print_function, division
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
|
import urwid
|
||||||
|
from mitmproxy.console import signals
|
||||||
|
from mitmproxy.console.grideditor import base
|
||||||
|
from netlib import strutils
|
||||||
|
|
||||||
|
|
||||||
|
def read_file(filename, callback, escaped):
|
||||||
|
# type: (str, Callable[...,None], bool) -> Optional[str]
|
||||||
|
if not filename:
|
||||||
|
return
|
||||||
|
|
||||||
|
filename = os.path.expanduser(filename)
|
||||||
|
try:
|
||||||
|
with open(filename, "r" if escaped else "rb") as f:
|
||||||
|
d = f.read()
|
||||||
|
except IOError as v:
|
||||||
|
return str(v)
|
||||||
|
|
||||||
|
if escaped:
|
||||||
|
try:
|
||||||
|
d = strutils.escaped_str_to_bytes(d)
|
||||||
|
except ValueError:
|
||||||
|
return "Invalid Python-style string encoding."
|
||||||
|
# TODO: Refactor the status_prompt_path signal so that we
|
||||||
|
# can raise exceptions here and return the content instead.
|
||||||
|
callback(d)
|
||||||
|
|
||||||
|
|
||||||
|
class Column(base.Column):
|
||||||
|
def Display(self, data):
|
||||||
|
return Display(data)
|
||||||
|
|
||||||
|
def Edit(self, data):
|
||||||
|
return Edit(data)
|
||||||
|
|
||||||
|
def blank(self):
|
||||||
|
return b""
|
||||||
|
|
||||||
|
def keypress(self, key, editor):
|
||||||
|
if key == "r":
|
||||||
|
if editor.walker.get_current_value() is not None:
|
||||||
|
signals.status_prompt_path.send(
|
||||||
|
self,
|
||||||
|
prompt="Read file",
|
||||||
|
callback=read_file,
|
||||||
|
args=(editor.walker.set_current_value, True)
|
||||||
|
)
|
||||||
|
elif key == "R":
|
||||||
|
if editor.walker.get_current_value() is not None:
|
||||||
|
signals.status_prompt_path.send(
|
||||||
|
self,
|
||||||
|
prompt="Read unescaped file",
|
||||||
|
callback=read_file,
|
||||||
|
args=(editor.walker.set_current_value, False)
|
||||||
|
)
|
||||||
|
elif key == "e":
|
||||||
|
o = editor.walker.get_current_value()
|
||||||
|
if o is not None:
|
||||||
|
n = editor.master.spawn_editor(o)
|
||||||
|
n = strutils.clean_hanging_newline(n)
|
||||||
|
editor.walker.set_current_value(n)
|
||||||
|
elif key in ["enter"]:
|
||||||
|
editor.walker.start_edit()
|
||||||
|
else:
|
||||||
|
return key
|
||||||
|
|
||||||
|
|
||||||
|
class Display(base.Cell):
|
||||||
|
def __init__(self, data):
|
||||||
|
# type: (bytes) -> Display
|
||||||
|
self.data = data
|
||||||
|
escaped = strutils.bytes_to_escaped_str(data)
|
||||||
|
w = urwid.Text(escaped, wrap="any")
|
||||||
|
super(Display, self).__init__(w)
|
||||||
|
|
||||||
|
def get_data(self):
|
||||||
|
return self.data
|
||||||
|
|
||||||
|
|
||||||
|
class Edit(base.Cell):
|
||||||
|
def __init__(self, data):
|
||||||
|
# type: (bytes) -> Edit
|
||||||
|
data = strutils.bytes_to_escaped_str(data)
|
||||||
|
w = urwid.Edit(edit_text=data, wrap="any", multiline=True)
|
||||||
|
w = urwid.AttrWrap(w, "editfield")
|
||||||
|
super(Edit, self).__init__(w)
|
||||||
|
|
||||||
|
def get_data(self):
|
||||||
|
# type: () -> bytes
|
||||||
|
txt = self._w.get_text()[0].strip()
|
||||||
|
try:
|
||||||
|
return strutils.escaped_str_to_bytes(txt)
|
||||||
|
except ValueError:
|
||||||
|
signals.status_message.send(
|
||||||
|
self,
|
||||||
|
message="Invalid Python-style string encoding.",
|
||||||
|
expire=1000
|
||||||
|
)
|
||||||
|
raise
|
51
mitmproxy/console/grideditor/col_subgrid.py
Normal file
51
mitmproxy/console/grideditor/col_subgrid.py
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
from __future__ import absolute_import, print_function, division
|
||||||
|
import urwid
|
||||||
|
from mitmproxy.console.grideditor import base
|
||||||
|
from mitmproxy.console import signals
|
||||||
|
from netlib.http import cookies
|
||||||
|
|
||||||
|
|
||||||
|
class Column(base.Column):
|
||||||
|
def __init__(self, heading, subeditor):
|
||||||
|
super(Column, self).__init__(heading)
|
||||||
|
self.subeditor = subeditor
|
||||||
|
|
||||||
|
def Edit(self, data):
|
||||||
|
raise RuntimeError("SubgridColumn should handle edits itself")
|
||||||
|
|
||||||
|
def Display(self, data):
|
||||||
|
return Display(data)
|
||||||
|
|
||||||
|
def blank(self):
|
||||||
|
return []
|
||||||
|
|
||||||
|
def keypress(self, key, editor):
|
||||||
|
if key in "rRe":
|
||||||
|
signals.status_message.send(
|
||||||
|
self,
|
||||||
|
message="Press enter to edit this field.",
|
||||||
|
expire=1000
|
||||||
|
)
|
||||||
|
return
|
||||||
|
elif key in ["enter"]:
|
||||||
|
editor.master.view_grideditor(
|
||||||
|
self.subeditor(
|
||||||
|
editor.master,
|
||||||
|
editor.walker.get_current_value(),
|
||||||
|
editor.set_subeditor_value,
|
||||||
|
editor.walker.focus,
|
||||||
|
editor.walker.focus_col
|
||||||
|
)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
return key
|
||||||
|
|
||||||
|
|
||||||
|
class Display(base.Cell):
|
||||||
|
def __init__(self, data):
|
||||||
|
p = cookies._format_pairs(data, sep="\n")
|
||||||
|
w = urwid.Text(p)
|
||||||
|
super(Display, self).__init__(w)
|
||||||
|
|
||||||
|
def get_data(self):
|
||||||
|
pass
|
55
mitmproxy/console/grideditor/col_text.py
Normal file
55
mitmproxy/console/grideditor/col_text.py
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
"""
|
||||||
|
Welcome to the encoding dance!
|
||||||
|
|
||||||
|
In a nutshell, text columns are actually a proxy class for byte columns,
|
||||||
|
which just encode/decodes contents.
|
||||||
|
"""
|
||||||
|
from __future__ import absolute_import, print_function, division
|
||||||
|
|
||||||
|
from mitmproxy.console import signals
|
||||||
|
from mitmproxy.console.grideditor import col_bytes
|
||||||
|
|
||||||
|
|
||||||
|
class Column(col_bytes.Column):
|
||||||
|
def __init__(self, heading, encoding="utf8", errors="surrogateescape"):
|
||||||
|
super(Column, self).__init__(heading)
|
||||||
|
self.encoding_args = encoding, errors
|
||||||
|
|
||||||
|
def Display(self, data):
|
||||||
|
return TDisplay(data, self.encoding_args)
|
||||||
|
|
||||||
|
def Edit(self, data):
|
||||||
|
return TEdit(data, self.encoding_args)
|
||||||
|
|
||||||
|
def blank(self):
|
||||||
|
return u""
|
||||||
|
|
||||||
|
|
||||||
|
# This is the same for both edit and display.
|
||||||
|
class EncodingMixin(object):
|
||||||
|
def __init__(self, data, encoding_args):
|
||||||
|
# type: (str) -> TDisplay
|
||||||
|
self.encoding_args = encoding_args
|
||||||
|
data = data.encode(*self.encoding_args)
|
||||||
|
super(EncodingMixin, self).__init__(data)
|
||||||
|
|
||||||
|
def get_data(self):
|
||||||
|
data = super(EncodingMixin, self).get_data()
|
||||||
|
try:
|
||||||
|
return data.decode(*self.encoding_args)
|
||||||
|
except ValueError:
|
||||||
|
signals.status_message.send(
|
||||||
|
self,
|
||||||
|
message="Invalid encoding.",
|
||||||
|
expire=1000
|
||||||
|
)
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
# urwid forces a different name for a subclass.
|
||||||
|
class TDisplay(EncodingMixin, col_bytes.Display):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class TEdit(EncodingMixin, col_bytes.Edit):
|
||||||
|
pass
|
239
mitmproxy/console/grideditor/editors.py
Normal file
239
mitmproxy/console/grideditor/editors.py
Normal file
@ -0,0 +1,239 @@
|
|||||||
|
from __future__ import absolute_import, print_function, division
|
||||||
|
import re
|
||||||
|
import urwid
|
||||||
|
from mitmproxy import filt
|
||||||
|
from mitmproxy.builtins import script
|
||||||
|
from mitmproxy import exceptions
|
||||||
|
from mitmproxy.console import common
|
||||||
|
from mitmproxy.console.grideditor import base
|
||||||
|
from mitmproxy.console.grideditor import col_bytes
|
||||||
|
from mitmproxy.console.grideditor import col_text
|
||||||
|
from mitmproxy.console.grideditor import col_subgrid
|
||||||
|
from mitmproxy.console import signals
|
||||||
|
from netlib.http import user_agents
|
||||||
|
|
||||||
|
|
||||||
|
class QueryEditor(base.GridEditor):
|
||||||
|
title = "Editing query"
|
||||||
|
columns = [
|
||||||
|
col_text.Column("Key"),
|
||||||
|
col_text.Column("Value")
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class HeaderEditor(base.GridEditor):
|
||||||
|
title = "Editing headers"
|
||||||
|
columns = [
|
||||||
|
col_bytes.Column("Key"),
|
||||||
|
col_bytes.Column("Value")
|
||||||
|
]
|
||||||
|
|
||||||
|
def make_help(self):
|
||||||
|
h = super(HeaderEditor, self).make_help()
|
||||||
|
text = [
|
||||||
|
urwid.Text([("text", "Special keys:\n")])
|
||||||
|
]
|
||||||
|
keys = [
|
||||||
|
("U", "add User-Agent header"),
|
||||||
|
]
|
||||||
|
text.extend(
|
||||||
|
common.format_keyvals(keys, key="key", val="text", indent=4)
|
||||||
|
)
|
||||||
|
text.append(urwid.Text([("text", "\n")]))
|
||||||
|
text.extend(h)
|
||||||
|
return text
|
||||||
|
|
||||||
|
def set_user_agent(self, k):
|
||||||
|
ua = user_agents.get_by_shortcut(k)
|
||||||
|
if ua:
|
||||||
|
self.walker.add_value(
|
||||||
|
[
|
||||||
|
b"User-Agent",
|
||||||
|
ua[2].encode()
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
def handle_key(self, key):
|
||||||
|
if key == "U":
|
||||||
|
signals.status_prompt_onekey.send(
|
||||||
|
prompt="Add User-Agent header:",
|
||||||
|
keys=[(i[0], i[1]) for i in user_agents.UASTRINGS],
|
||||||
|
callback=self.set_user_agent,
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
class URLEncodedFormEditor(base.GridEditor):
|
||||||
|
title = "Editing URL-encoded form"
|
||||||
|
columns = [
|
||||||
|
col_bytes.Column("Key"),
|
||||||
|
col_bytes.Column("Value")
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class ReplaceEditor(base.GridEditor):
|
||||||
|
title = "Editing replacement patterns"
|
||||||
|
columns = [
|
||||||
|
col_text.Column("Filter"),
|
||||||
|
col_bytes.Column("Regex"),
|
||||||
|
col_bytes.Column("Replacement"),
|
||||||
|
]
|
||||||
|
|
||||||
|
def is_error(self, col, val):
|
||||||
|
if col == 0:
|
||||||
|
if not filt.parse(val):
|
||||||
|
return "Invalid filter specification."
|
||||||
|
elif col == 1:
|
||||||
|
try:
|
||||||
|
re.compile(val)
|
||||||
|
except re.error:
|
||||||
|
return "Invalid regular expression."
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
class SetHeadersEditor(base.GridEditor):
|
||||||
|
title = "Editing header set patterns"
|
||||||
|
columns = [
|
||||||
|
col_text.Column("Filter"),
|
||||||
|
col_bytes.Column("Header"),
|
||||||
|
col_bytes.Column("Value"),
|
||||||
|
]
|
||||||
|
|
||||||
|
def is_error(self, col, val):
|
||||||
|
if col == 0:
|
||||||
|
if not filt.parse(val):
|
||||||
|
return "Invalid filter specification"
|
||||||
|
return False
|
||||||
|
|
||||||
|
def make_help(self):
|
||||||
|
h = super(SetHeadersEditor, self).make_help()
|
||||||
|
text = [
|
||||||
|
urwid.Text([("text", "Special keys:\n")])
|
||||||
|
]
|
||||||
|
keys = [
|
||||||
|
("U", "add User-Agent header"),
|
||||||
|
]
|
||||||
|
text.extend(
|
||||||
|
common.format_keyvals(keys, key="key", val="text", indent=4)
|
||||||
|
)
|
||||||
|
text.append(urwid.Text([("text", "\n")]))
|
||||||
|
text.extend(h)
|
||||||
|
return text
|
||||||
|
|
||||||
|
def set_user_agent(self, k):
|
||||||
|
ua = user_agents.get_by_shortcut(k)
|
||||||
|
if ua:
|
||||||
|
self.walker.add_value(
|
||||||
|
[
|
||||||
|
".*",
|
||||||
|
b"User-Agent",
|
||||||
|
ua[2].encode()
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
def handle_key(self, key):
|
||||||
|
if key == "U":
|
||||||
|
signals.status_prompt_onekey.send(
|
||||||
|
prompt="Add User-Agent header:",
|
||||||
|
keys=[(i[0], i[1]) for i in user_agents.UASTRINGS],
|
||||||
|
callback=self.set_user_agent,
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
class PathEditor(base.GridEditor):
|
||||||
|
# TODO: Next row on enter?
|
||||||
|
|
||||||
|
title = "Editing URL path components"
|
||||||
|
columns = [
|
||||||
|
col_text.Column("Component"),
|
||||||
|
]
|
||||||
|
|
||||||
|
def data_in(self, data):
|
||||||
|
return [[i] for i in data]
|
||||||
|
|
||||||
|
def data_out(self, data):
|
||||||
|
return [i[0] for i in data]
|
||||||
|
|
||||||
|
|
||||||
|
class ScriptEditor(base.GridEditor):
|
||||||
|
title = "Editing scripts"
|
||||||
|
columns = [
|
||||||
|
col_text.Column("Command"),
|
||||||
|
]
|
||||||
|
|
||||||
|
def is_error(self, col, val):
|
||||||
|
try:
|
||||||
|
script.parse_command(val)
|
||||||
|
except exceptions.AddonError as e:
|
||||||
|
return str(e)
|
||||||
|
|
||||||
|
|
||||||
|
class HostPatternEditor(base.GridEditor):
|
||||||
|
title = "Editing host patterns"
|
||||||
|
columns = [
|
||||||
|
col_text.Column("Regex (matched on hostname:port / ip:port)")
|
||||||
|
]
|
||||||
|
|
||||||
|
def is_error(self, col, val):
|
||||||
|
try:
|
||||||
|
re.compile(val, re.IGNORECASE)
|
||||||
|
except re.error as e:
|
||||||
|
return "Invalid regex: %s" % str(e)
|
||||||
|
|
||||||
|
def data_in(self, data):
|
||||||
|
return [[i] for i in data]
|
||||||
|
|
||||||
|
def data_out(self, data):
|
||||||
|
return [i[0] for i in data]
|
||||||
|
|
||||||
|
|
||||||
|
class CookieEditor(base.GridEditor):
|
||||||
|
title = "Editing request Cookie header"
|
||||||
|
columns = [
|
||||||
|
col_text.Column("Name"),
|
||||||
|
col_text.Column("Value"),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class CookieAttributeEditor(base.GridEditor):
|
||||||
|
title = "Editing Set-Cookie attributes"
|
||||||
|
columns = [
|
||||||
|
col_text.Column("Name"),
|
||||||
|
col_text.Column("Value"),
|
||||||
|
]
|
||||||
|
|
||||||
|
def data_out(self, data):
|
||||||
|
ret = []
|
||||||
|
for i in data:
|
||||||
|
if not i[1]:
|
||||||
|
ret.append([i[0], None])
|
||||||
|
else:
|
||||||
|
ret.append(i)
|
||||||
|
return ret
|
||||||
|
|
||||||
|
|
||||||
|
class SetCookieEditor(base.GridEditor):
|
||||||
|
title = "Editing response SetCookie header"
|
||||||
|
columns = [
|
||||||
|
col_text.Column("Name"),
|
||||||
|
col_text.Column("Value"),
|
||||||
|
col_subgrid.Column("Attributes", CookieAttributeEditor),
|
||||||
|
]
|
||||||
|
|
||||||
|
def data_in(self, data):
|
||||||
|
flattened = []
|
||||||
|
for key, (value, attrs) in data:
|
||||||
|
flattened.append([key, value, attrs.items(multi=True)])
|
||||||
|
return flattened
|
||||||
|
|
||||||
|
def data_out(self, data):
|
||||||
|
vals = []
|
||||||
|
for key, value, attrs in data:
|
||||||
|
vals.append(
|
||||||
|
[
|
||||||
|
key,
|
||||||
|
(value, attrs)
|
||||||
|
]
|
||||||
|
)
|
||||||
|
return vals
|
@ -390,13 +390,12 @@ class ConsoleMaster(flow.FlowMaster):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def spawn_editor(self, data):
|
def spawn_editor(self, data):
|
||||||
fd, name = tempfile.mkstemp('', "mproxy")
|
text = not isinstance(data, bytes)
|
||||||
|
fd, name = tempfile.mkstemp('', "mproxy", text=text)
|
||||||
os.write(fd, data)
|
os.write(fd, data)
|
||||||
os.close(fd)
|
os.close(fd)
|
||||||
c = os.environ.get("EDITOR")
|
|
||||||
# if no EDITOR is set, assume 'vi'
|
# if no EDITOR is set, assume 'vi'
|
||||||
if not c:
|
c = os.environ.get("EDITOR") or "vi"
|
||||||
c = "vi"
|
|
||||||
cmd = shlex.split(c)
|
cmd = shlex.split(c)
|
||||||
cmd.append(name)
|
cmd.append(name)
|
||||||
self.ui.stop()
|
self.ui.stop()
|
||||||
@ -404,10 +403,11 @@ class ConsoleMaster(flow.FlowMaster):
|
|||||||
subprocess.call(cmd)
|
subprocess.call(cmd)
|
||||||
except:
|
except:
|
||||||
signals.status_message.send(
|
signals.status_message.send(
|
||||||
message = "Can't start editor: %s" % " ".join(c)
|
message="Can't start editor: %s" % " ".join(c)
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
data = open(name, "rb").read()
|
with open(name, "r" if text else "rb") as f:
|
||||||
|
data = f.read()
|
||||||
self.ui.start()
|
self.ui.start()
|
||||||
os.unlink(name)
|
os.unlink(name)
|
||||||
return data
|
return data
|
||||||
@ -570,7 +570,7 @@ class ConsoleMaster(flow.FlowMaster):
|
|||||||
self,
|
self,
|
||||||
ge,
|
ge,
|
||||||
None,
|
None,
|
||||||
statusbar.StatusBar(self, grideditor.FOOTER),
|
statusbar.StatusBar(self, grideditor.base.FOOTER),
|
||||||
ge.make_help()
|
ge.make_help()
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
Loading…
Reference in New Issue
Block a user