Merge pull request #2173 from cortesi/coptions

Console options editor
This commit is contained in:
Aldo Cortesi 2017-03-19 12:27:41 +13:00 committed by GitHub
commit 1b330ba453
12 changed files with 563 additions and 336 deletions

View File

@ -26,19 +26,11 @@ APP_PORT = 80
CA_DIR = "~/.mitmproxy" CA_DIR = "~/.mitmproxy"
LISTEN_PORT = 8080 LISTEN_PORT = 8080
# We manually need to specify this, otherwise OpenSSL may select a non-HTTP2 cipher by default. # Some help text style guidelines:
# https://mozilla.github.io/server-side-tls/ssl-config-generator/?server=apache-2.2.15&openssl=1.0.2&hsts=yes&profile=old #
DEFAULT_CLIENT_CIPHERS = ( # - Should be a single paragraph with no linebreaks. Help will be reflowed by
"ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384:" # tools.
"ECDHE-ECDSA-AES256-GCM-SHA384:DHE-RSA-AES128-GCM-SHA256:DHE-DSS-AES128-GCM-SHA256:kEDH+AESGCM:" # - Avoid adding information about the data type - we can generate that.
"ECDHE-RSA-AES128-SHA256:ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA:ECDHE-ECDSA-AES128-SHA:"
"ECDHE-RSA-AES256-SHA384:ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA:ECDHE-ECDSA-AES256-SHA:"
"DHE-RSA-AES128-SHA256:DHE-RSA-AES128-SHA:DHE-DSS-AES128-SHA256:DHE-RSA-AES256-SHA256:DHE-DSS-AES256-SHA:"
"DHE-RSA-AES256-SHA:ECDHE-RSA-DES-CBC3-SHA:ECDHE-ECDSA-DES-CBC3-SHA:AES128-GCM-SHA256:AES256-GCM-SHA384:"
"AES128-SHA256:AES256-SHA256:AES128-SHA:AES256-SHA:AES:DES-CBC3-SHA:"
"HIGH:!aNULL:!eNULL:!EXPORT:!DES:!RC4:!MD5:!PSK:!aECDH:"
"!EDH-DSS-DES-CBC3-SHA:!EDH-RSA-DES-CBC3-SHA:!KRB5-DES-CBC3-SHA"
)
class Options(optmanager.OptManager): class Options(optmanager.OptManager):
@ -51,8 +43,9 @@ class Options(optmanager.OptManager):
self.add_option( self.add_option(
"onboarding_host", str, APP_HOST, "onboarding_host", str, APP_HOST,
""" """
Domain to serve the onboarding app from. For transparent mode, use Onboarding app domain. For transparent mode, use an IP when a DNS
an IP when a DNS entry for the app domain is not present. """ entry for the app domain is not present.
"""
) )
self.add_option( self.add_option(
"onboarding_port", int, APP_PORT, "onboarding_port", int, APP_PORT,
@ -80,8 +73,9 @@ class Options(optmanager.OptManager):
self.add_option( self.add_option(
"keepserving", bool, False, "keepserving", bool, False,
""" """
Instructs mitmdump to continue serving after client playback, Continue serving after client playback, server playback or file
server playback or file read. This option is ignored by interactive tools, which always keep serving. read. This option is ignored by interactive tools, which always keep
serving.
""" """
) )
self.add_option( self.add_option(
@ -91,8 +85,8 @@ class Options(optmanager.OptManager):
self.add_option( self.add_option(
"server_replay_nopop", bool, False, "server_replay_nopop", bool, False,
""" """
Disable response pop from response flow. This makes it possible to Don't remove flows from server replay state after use. This makes it
replay same response multiple times. possible to replay same response multiple times.
""" """
) )
self.add_option( self.add_option(
@ -174,7 +168,7 @@ class Options(optmanager.OptManager):
"server_replay_ignore_params", Sequence[str], [], "server_replay_ignore_params", Sequence[str], [],
""" """
Request's parameters to be ignored while searching for a saved flow Request's parameters to be ignored while searching for a saved flow
to replay. Can be passed multiple times. to replay.
""" """
) )
self.add_option( self.add_option(
@ -197,11 +191,10 @@ class Options(optmanager.OptManager):
self.add_option( self.add_option(
"proxyauth", Optional[str], None, "proxyauth", Optional[str], None,
""" """
Require authentication before proxying requests. If the value is Require proxy authentication. Value may be "any" to require
"any", we prompt for authentication, but permit any values. If it authenticaiton but accept any credentials, start with "@" to specify
starts with an "@", it is treated as a path to an Apache htpasswd a path to an Apache htpasswd file, or be of the form
file. If its is of the form "username:password", it is treated as a "username:password".
single-user credential.
""" """
) )
self.add_option( self.add_option(
@ -225,17 +218,16 @@ class Options(optmanager.OptManager):
self.add_option( self.add_option(
"certs", Sequence[str], [], "certs", Sequence[str], [],
""" """
SSL certificates. SPEC is of the form "[domain=]path". The SSL certificates of the form "[domain=]path". The domain may include
domain may include a wildcard, and is equal to "*" if not specified. a wildcard, and is equal to "*" if not specified. The file at path
The file at path is a certificate in PEM format. If a private key is is a certificate in PEM format. If a private key is included in the
included in the PEM, it is used, else the default key in the conf PEM, it is used, else the default key in the conf dir is used. The
dir is used. The PEM file should contain the full certificate chain, PEM file should contain the full certificate chain, with the leaf
with the leaf certificate as the first entry. Can be passed multiple certificate as the first entry.
times.
""" """
) )
self.add_option( self.add_option(
"ciphers_client", str, DEFAULT_CLIENT_CIPHERS, "ciphers_client", Optional[str], None,
"Set supported ciphers for client connections using OpenSSL syntax." "Set supported ciphers for client connections using OpenSSL syntax."
) )
self.add_option( self.add_option(

View File

@ -1,9 +1,9 @@
import contextlib import contextlib
import blinker import blinker
import blinker._saferef
import pprint import pprint
import copy import copy
import functools import functools
import weakref
import os import os
import typing import typing
import textwrap import textwrap
@ -36,7 +36,7 @@ class _Option:
self.typespec = typespec self.typespec = typespec
self._default = default self._default = default
self.value = unset self.value = unset
self.help = help self.help = textwrap.dedent(help).strip().replace("\n", " ")
self.choices = choices self.choices = choices
def __repr__(self): def __repr__(self):
@ -61,7 +61,7 @@ class _Option:
self.value = unset self.value = unset
def has_changed(self) -> bool: def has_changed(self) -> bool:
return self.value is not unset return self.current() != self.default
def __eq__(self, other) -> bool: def __eq__(self, other) -> bool:
for i in self.__slots__: for i in self.__slots__:
@ -127,15 +127,24 @@ class OptManager:
Subscribe a callable to the .changed signal, but only for a Subscribe a callable to the .changed signal, but only for a
specified list of options. The callable should accept arguments specified list of options. The callable should accept arguments
(options, updated), and may raise an OptionsError. (options, updated), and may raise an OptionsError.
The event will automatically be unsubscribed if the callable goes out of scope.
""" """
func = weakref.proxy(func) for i in opts:
if i not in self._options:
raise exceptions.OptionsError("No such option: %s" % i)
# We reuse blinker's safe reference functionality to cope with weakrefs
# to bound methods.
func = blinker._saferef.safe_ref(func)
@functools.wraps(func) @functools.wraps(func)
def _call(options, updated): def _call(options, updated):
if updated.intersection(set(opts)): if updated.intersection(set(opts)):
try: f = func()
func(options, updated) if f:
except ReferenceError: f(options, updated)
else:
self.changed.disconnect(_call) self.changed.disconnect(_call)
# Our wrapper function goes out of scope immediately, so we have to set # Our wrapper function goes out of scope immediately, so we have to set
@ -172,7 +181,7 @@ class OptManager:
""" """
for o in self._options.values(): for o in self._options.values():
o.reset() o.reset()
self.changed.send(self._options.keys()) self.changed.send(self, updated=set(self._options.keys()))
def update_known(self, **kwargs): def update_known(self, **kwargs):
""" """
@ -265,44 +274,50 @@ class OptManager:
vals.update(self._setspec(i)) vals.update(self._setspec(i))
self.update(**vals) self.update(**vals)
def _setspec(self, spec): def parse_setval(self, optname: str, optstr: typing.Optional[str]) -> typing.Any:
d = {} """
Convert a string to a value appropriate for the option type.
parts = spec.split("=", maxsplit=1) """
if len(parts) == 1:
optname, optval = parts[0], None
else:
optname, optval = parts[0], parts[1]
if optname not in self._options: if optname not in self._options:
raise exceptions.OptionsError("No such option %s" % optname) raise exceptions.OptionsError("No such option %s" % optname)
o = self._options[optname] o = self._options[optname]
if o.typespec in (str, typing.Optional[str]): if o.typespec in (str, typing.Optional[str]):
d[optname] = optval return optstr
elif o.typespec in (int, typing.Optional[int]): elif o.typespec in (int, typing.Optional[int]):
if optval: if optstr:
try: try:
optval = int(optval) return int(optstr)
except ValueError: except ValueError:
raise exceptions.OptionsError("Not an integer: %s" % optval) raise exceptions.OptionsError("Not an integer: %s" % optstr)
d[optname] = optval elif o.typespec == int:
raise exceptions.OptionsError("Option is required: %s" % optname)
else:
return None
elif o.typespec == bool: elif o.typespec == bool:
if not optval or optval == "true": if not optstr or optstr == "true":
v = True return True
elif optval == "false": elif optstr == "false":
v = False return False
else: else:
raise exceptions.OptionsError( raise exceptions.OptionsError(
"Boolean must be \"true\", \"false\", or have the value " "omitted (a synonym for \"true\")." "Boolean must be \"true\", \"false\", or have the value " "omitted (a synonym for \"true\")."
) )
d[optname] = v
elif o.typespec == typing.Sequence[str]: elif o.typespec == typing.Sequence[str]:
if not optval: if not optstr:
d[optname] = [] return []
else: else:
d[optname] = getattr(self, optname) + [optval] return getattr(self, optname) + [optstr]
else: # pragma: no cover
raise NotImplementedError("Unsupported option type: %s", o.typespec) raise NotImplementedError("Unsupported option type: %s", o.typespec)
def _setspec(self, spec):
d = {}
parts = spec.split("=", maxsplit=1)
if len(parts) == 1:
optname, optval = parts[0], None
else:
optname, optval = parts[0], parts[1]
d[optname] = self.parse_setval(optname, optval)
return d return d
def make_parser(self, parser, optname, metavar=None, short=None): def make_parser(self, parser, optname, metavar=None, short=None):
@ -396,11 +411,7 @@ def dump_defaults(opts):
raise NotImplementedError raise NotImplementedError
txt += " Type %s." % t txt += " Type %s." % t
txt = "\n".join( txt = "\n".join(textwrap.wrap(txt))
textwrap.wrap(
textwrap.dedent(txt)
)
)
s.yaml_set_comment_before_after_key(k, before = "\n" + txt) s.yaml_set_comment_before_after_key(k, before = "\n" + txt)
return ruamel.yaml.round_trip_dump(s) return ruamel.yaml.round_trip_dump(s)

View File

@ -200,6 +200,21 @@ CIPHER_ID_NAME_MAP = {
} }
# We manually need to specify this, otherwise OpenSSL may select a non-HTTP2 cipher by default.
# https://mozilla.github.io/server-side-tls/ssl-config-generator/?server=apache-2.2.15&openssl=1.0.2&hsts=yes&profile=old
DEFAULT_CLIENT_CIPHERS = (
"ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384:"
"ECDHE-ECDSA-AES256-GCM-SHA384:DHE-RSA-AES128-GCM-SHA256:DHE-DSS-AES128-GCM-SHA256:kEDH+AESGCM:"
"ECDHE-RSA-AES128-SHA256:ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA:ECDHE-ECDSA-AES128-SHA:"
"ECDHE-RSA-AES256-SHA384:ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA:ECDHE-ECDSA-AES256-SHA:"
"DHE-RSA-AES128-SHA256:DHE-RSA-AES128-SHA:DHE-DSS-AES128-SHA256:DHE-RSA-AES256-SHA256:DHE-DSS-AES256-SHA:"
"DHE-RSA-AES256-SHA:ECDHE-RSA-DES-CBC3-SHA:ECDHE-ECDSA-DES-CBC3-SHA:AES128-GCM-SHA256:AES256-GCM-SHA384:"
"AES128-SHA256:AES256-SHA256:AES128-SHA:AES256-SHA:AES:DES-CBC3-SHA:"
"HIGH:!aNULL:!eNULL:!EXPORT:!DES:!RC4:!MD5:!PSK:!aECDH:"
"!EDH-DSS-DES-CBC3-SHA:!EDH-RSA-DES-CBC3-SHA:!KRB5-DES-CBC3-SHA"
)
def is_tls_record_magic(d): def is_tls_record_magic(d):
""" """
Returns: Returns:
@ -475,7 +490,7 @@ class TlsLayer(base.Layer):
cert, key, cert, key,
method=self.config.openssl_method_client, method=self.config.openssl_method_client,
options=self.config.openssl_options_client, options=self.config.openssl_options_client,
cipher_list=self.config.options.ciphers_client, cipher_list=self.config.options.ciphers_client or DEFAULT_CLIENT_CIPHERS,
dhparams=self.config.certstore.dhparams, dhparams=self.config.certstore.dhparams,
chain_file=chain_file, chain_file=chain_file,
alpn_select_callback=self.__alpn_select_callback, alpn_select_callback=self.__alpn_select_callback,

View File

@ -182,12 +182,12 @@ class GridWalker(urwid.ListWalker):
self.edit_row = GridRow( self.edit_row = GridRow(
self.focus_col, True, self.editor, self.lst[self.focus] self.focus_col, True, self.editor, self.lst[self.focus]
) )
self.editor.master.loop.widget.footer.update(FOOTER_EDITING) signals.footer_help.send(self, helptext=FOOTER_EDITING)
self._modified() self._modified()
def stop_edit(self): def stop_edit(self):
if self.edit_row: if self.edit_row:
self.editor.master.loop.widget.footer.update(FOOTER) signals.footer_help.send(self, helptext=FOOTER)
try: try:
val = self.edit_row.edit_col.get_data() val = self.edit_row.edit_col.get_data()
except ValueError: except ValueError:
@ -276,6 +276,8 @@ class GridEditor(urwid.WidgetWrap):
first_width = max(len(r), first_width) first_width = max(len(r), first_width)
self.first_width = min(first_width, FIRST_WIDTH_MAX) self.first_width = min(first_width, FIRST_WIDTH_MAX)
title = None
if self.title:
title = urwid.Text(self.title) title = urwid.Text(self.title)
title = urwid.Padding(title, align="left", width=("relative", 100)) title = urwid.Padding(title, align="left", width=("relative", 100))
title = urwid.AttrWrap(title, "heading") title = urwid.AttrWrap(title, "heading")
@ -297,10 +299,10 @@ class GridEditor(urwid.WidgetWrap):
self.lb = GridListBox(self.walker) self.lb = GridListBox(self.walker)
w = urwid.Frame( w = urwid.Frame(
self.lb, self.lb,
header=urwid.Pile([title, h]) header=urwid.Pile([title, h]) if title else None
) )
super().__init__(w) super().__init__(w)
self.master.loop.widget.footer.update("") signals.footer_help.send(self, helptext="")
self.show_empty_msg() self.show_empty_msg()
def show_empty_msg(self): def show_empty_msg(self):

View File

@ -245,3 +245,20 @@ class SetCookieEditor(base.GridEditor):
] ]
) )
return vals return vals
class OptionsEditor(base.GridEditor):
title = None
columns = [
col_text.Column("")
]
def __init__(self, master, name, vals):
self.name = name
super().__init__(master, [[i] for i in vals], self.callback)
def callback(self, vals):
setattr(self.master.options, self.name, [i[0] for i in vals])
def is_error(self, col, val):
pass

View File

@ -24,6 +24,7 @@ from mitmproxy.tools.console import flowview
from mitmproxy.tools.console import grideditor from mitmproxy.tools.console import grideditor
from mitmproxy.tools.console import help from mitmproxy.tools.console import help
from mitmproxy.tools.console import options from mitmproxy.tools.console import options
from mitmproxy.tools.console import overlay
from mitmproxy.tools.console import palettepicker from mitmproxy.tools.console import palettepicker
from mitmproxy.tools.console import palettes from mitmproxy.tools.console import palettes
from mitmproxy.tools.console import signals from mitmproxy.tools.console import signals
@ -275,7 +276,7 @@ class ConsoleMaster(master.Master):
self.set_palette(self.options, None) self.set_palette(self.options, None)
self.options.subscribe( self.options.subscribe(
self.set_palette, self.set_palette,
["palette", "palette_transparent"] ["console_palette", "console_palette_transparent"]
) )
self.loop = urwid.MainLoop( self.loop = urwid.MainLoop(
urwid.SolidFill("x"), urwid.SolidFill("x"),
@ -285,7 +286,6 @@ class ConsoleMaster(master.Master):
self.ab = statusbar.ActionBar() self.ab = statusbar.ActionBar()
self.loop.set_alarm_in(0.01, self.ticker) self.loop.set_alarm_in(0.01, self.ticker)
self.loop.set_alarm_in( self.loop.set_alarm_in(
0.0001, 0.0001,
lambda *args: self.view_flowlist() lambda *args: self.view_flowlist()
@ -309,6 +309,18 @@ class ConsoleMaster(master.Master):
def shutdown(self): def shutdown(self):
raise urwid.ExitMainLoop raise urwid.ExitMainLoop
def overlay(self, widget, **kwargs):
signals.push_view_state.send(
self,
window = overlay.SimpleOverlay(
self,
widget,
self.loop.widget,
widget.width,
**kwargs
)
)
def view_help(self, helpctx): def view_help(self, helpctx):
signals.push_view_state.send( signals.push_view_state.send(
self, self,

View File

@ -1,27 +1,35 @@
import urwid import urwid
import blinker
import textwrap
import pprint
from typing import Optional, Sequence
from mitmproxy import contentviews from mitmproxy import exceptions
from mitmproxy import optmanager
from mitmproxy.tools.console import common from mitmproxy.tools.console import common
from mitmproxy.tools.console import grideditor
from mitmproxy.tools.console import select
from mitmproxy.tools.console import signals from mitmproxy.tools.console import signals
from mitmproxy.tools.console import overlay
HELP_HEIGHT = 5
def can_edit_inplace(opt):
if opt.choices:
return False
if opt.typespec in [str, int, Optional[str], Optional[int]]:
return True
from mitmproxy.addons import replace
from mitmproxy.addons import setheaders
footer = [ footer = [
('heading_key', "enter/space"), ":toggle ", ('heading_key', "enter"), ":edit ",
('heading_key', "C"), ":clear all ", ('heading_key', "?"), ":help ",
('heading_key', "W"), ":save ",
] ]
def _mkhelp(): def _mkhelp():
text = [] text = []
keys = [ keys = [
("enter/space", "activate option"), ("enter", "edit option"),
("C", "clear all options"), ("D", "reset all to defaults"),
("w", "save options"), ("w", "save options"),
] ]
text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
@ -31,251 +39,248 @@ def _mkhelp():
help_context = _mkhelp() help_context = _mkhelp()
def checker(opt, options): def fcol(s, width, attr):
def _check(): s = str(s)
return options.has_changed(opt) return (
return _check "fixed",
width,
urwid.Text((attr, s))
)
class Options(urwid.WidgetWrap): option_focus_change = blinker.Signal()
def __init__(self, master):
self.master = master class OptionItem(urwid.WidgetWrap):
self.lb = select.Select( def __init__(self, walker, opt, focused, namewidth, editing):
self.walker, self.opt, self.focused = walker, opt, focused
self.namewidth = namewidth
self.editing = editing
super().__init__(None)
self._w = self.get_widget()
def get_widget(self):
val = self.opt.current()
if self.opt.typespec == bool:
displayval = "true" if val else "false"
elif not val:
displayval = ""
elif self.opt.typespec == Sequence[str]:
displayval = pprint.pformat(val, indent=1)
else:
displayval = str(val)
changed = self.walker.master.options.has_changed(self.opt.name)
if self.focused:
valstyle = "option_active_selected" if changed else "option_selected"
else:
valstyle = "option_active" if changed else "text"
if self.editing:
valw = urwid.Edit(edit_text=displayval)
else:
valw = urwid.AttrMap(
urwid.Padding(
urwid.Text([(valstyle, displayval)])
),
valstyle
)
return urwid.Columns(
[ [
select.Heading("Traffic Manipulation"), (
select.Option( self.namewidth,
"Header Set Patterns", urwid.Text([("title", self.opt.name.ljust(self.namewidth))])
"H",
checker("setheaders", master.options),
self.setheaders
), ),
select.Option( valw
"Ignore Patterns", ],
"I", dividechars=2,
checker("ignore_hosts", master.options), focus_column=1
self.ignore_hosts
),
select.Option(
"Replacement Patterns",
"R",
checker("replacements", master.options),
self.replacepatterns
),
select.Option(
"Scripts",
"S",
checker("scripts", master.options),
self.scripts
),
select.Heading("Interface"),
select.Option(
"Default Display Mode",
"M",
checker("default_contentview", master.options),
self.default_displaymode
),
select.Option(
"Palette",
"P",
checker("console_palette", master.options),
self.palette
),
select.Option(
"Show Host",
"w",
checker("showhost", master.options),
master.options.toggler("showhost")
),
select.Heading("Network"),
select.Option(
"Upstream Certs",
"U",
checker("upstream_cert", master.options),
master.options.toggler("upstream_cert")
),
select.Option(
"TCP Proxying",
"T",
checker("tcp_hosts", master.options),
self.tcp_hosts
),
select.Option(
"Don't Verify SSL/TLS Certificates",
"V",
checker("ssl_insecure", master.options),
master.options.toggler("ssl_insecure")
),
select.Heading("Utility"),
select.Option(
"Anti-Cache",
"a",
checker("anticache", master.options),
master.options.toggler("anticache")
),
select.Option(
"Anti-Compression",
"o",
checker("anticomp", master.options),
master.options.toggler("anticomp")
),
select.Option(
"Kill Extra",
"x",
checker("replay_kill_extra", master.options),
master.options.toggler("replay_kill_extra")
),
select.Option(
"No Refresh",
"f",
checker("refresh_server_playback", master.options),
master.options.toggler("refresh_server_playback")
),
select.Option(
"Sticky Auth",
"A",
checker("stickyauth", master.options),
self.sticky_auth
),
select.Option(
"Sticky Cookies",
"t",
checker("stickycookie", master.options),
self.sticky_cookie
),
]
) )
title = urwid.Text("Options")
title = urwid.Padding(title, align="left", width=("relative", 100))
title = urwid.AttrWrap(title, "heading")
w = urwid.Frame(
self.lb,
header = title
)
super().__init__(w)
self.master.loop.widget.footer.update("") def get_edit_text(self):
signals.update_settings.connect(self.sig_update_settings) return self._w[1].get_edit_text()
master.options.changed.connect(self.sig_update_settings)
def sig_update_settings(self, sender, updated=None): def selectable(self):
self.lb.walker._modified() return True
def keypress(self, size, key): def keypress(self, size, key):
if key == "C": if self.editing:
self.clearall() self._w[1].keypress(size, key)
return None return key
if key == "W":
self.save()
return None class OptionListWalker(urwid.ListWalker):
def __init__(self, master):
self.master = master
self.index = 0
self.focusobj = None
self.opts = sorted(master.options.keys())
self.maxlen = max(len(i) for i in self.opts)
self.editing = False
self.set_focus(0)
self.master.options.changed.connect(self.sig_mod)
def sig_mod(self, *args, **kwargs):
self._modified()
self.set_focus(self.index)
def start_editing(self):
self.editing = True
self.focus_obj = self._get(self.index, True)
self._modified()
def stop_editing(self):
self.editing = False
self.focus_obj = self._get(self.index, False)
self._modified()
def get_edit_text(self):
return self.focus_obj.get_edit_text()
def _get(self, pos, editing):
name = self.opts[pos]
opt = self.master.options._options[name]
return OptionItem(
self, opt, pos == self.index, self.maxlen, editing
)
def get_focus(self):
return self.focus_obj, self.index
def set_focus(self, index):
self.editing = False
name = self.opts[index]
opt = self.master.options._options[name]
self.index = index
self.focus_obj = self._get(self.index, self.editing)
option_focus_change.send(opt.help)
def get_next(self, pos):
if pos >= len(self.opts) - 1:
return None, None
pos = pos + 1
return self._get(pos, False), pos
def get_prev(self, pos):
pos = pos - 1
if pos < 0:
return None, None
return self._get(pos, False), pos
class OptionsList(urwid.ListBox):
def __init__(self, master):
self.master = master
self.walker = OptionListWalker(master)
super().__init__(self.walker)
def keypress(self, size, key):
if self.walker.editing:
if key == "enter":
foc, idx = self.get_focus()
v = self.walker.get_edit_text()
try:
d = self.master.options.parse_setval(foc.opt.name, v)
except exceptions.OptionsError as v:
signals.status_message.send(message=str(v))
else:
self.master.options.update(**{foc.opt.name: d})
self.walker.stop_editing()
elif key == "esc":
self.walker.stop_editing()
else:
if key == "g":
self.set_focus(0)
self.walker._modified()
elif key == "G":
self.set_focus(len(self.walker.opts) - 1)
self.walker._modified()
elif key == "enter":
foc, idx = self.get_focus()
if foc.opt.typespec == bool:
self.master.options.toggler(foc.opt.name)()
# Bust the focus widget cache
self.set_focus(self.walker.index)
elif can_edit_inplace(foc.opt):
self.walker.start_editing()
self.walker._modified()
elif foc.opt.choices:
self.master.overlay(
overlay.Chooser(
foc.opt.name,
foc.opt.choices,
foc.opt.current(),
self.master.options.setter(foc.opt.name)
)
)
elif foc.opt.typespec == Sequence[str]:
self.master.overlay(
overlay.OptionsOverlay(
self.master,
foc.opt.name,
foc.opt.current(),
HELP_HEIGHT + 5
),
valign="top"
)
else:
raise NotImplementedError()
return super().keypress(size, key) return super().keypress(size, key)
def do_save(self, path):
optmanager.save(self.master.options, path)
return "Saved"
def save(self): class OptionHelp(urwid.Frame):
signals.status_prompt_path.send( def __init__(self, master):
prompt = "Save options to file", self.master = master
callback = self.do_save super().__init__(self.widget(""))
self.set_active(False)
option_focus_change.connect(self.sig_mod)
def set_active(self, val):
h = urwid.Text("Option Help")
style = "heading" if val else "heading_inactive"
self.header = urwid.AttrWrap(h, style)
def widget(self, txt):
cols, _ = self.master.ui.get_cols_rows()
return urwid.ListBox(
[urwid.Text(i) for i in textwrap.wrap(txt, cols)]
) )
def clearall(self): def sig_mod(self, txt):
self.set_body(self.widget(txt))
class Options(urwid.Pile):
def __init__(self, master):
oh = OptionHelp(master)
super().__init__(
[
OptionsList(master),
(HELP_HEIGHT, oh),
]
)
self.master = master
def keypress(self, size, key):
key = common.shortcuts(key)
if key == "tab":
self.focus_position = (
self.focus_position + 1
) % len(self.widget_list)
self.widget_list[1].set_active(self.focus_position == 1)
key = None
elif key == "D":
self.master.options.reset() self.master.options.reset()
signals.update_settings.send(self) key = None
signals.status_message.send(
message = "Options cleared",
expire = 1
)
def setheaders(self): # This is essentially a copypasta from urwid.Pile's keypress handler.
data = [] # So much for "closed for modification, but open for extension".
for d in self.master.options.setheaders: item_rows = None
if isinstance(d, str): if len(size) == 2:
data.append(setheaders.parse_setheader(d)) item_rows = self.get_item_rows(size, focus = True)
else: i = self.widget_list.index(self.focus_item)
data.append(d) tsize = self.get_item_size(size, i, True, item_rows)
self.master.view_grideditor( return self.focus_item.keypress(tsize, key)
grideditor.SetHeadersEditor(
self.master,
data,
self.master.options.setter("setheaders")
)
)
def tcp_hosts(self):
self.master.view_grideditor(
grideditor.HostPatternEditor(
self.master,
self.master.options.tcp_hosts,
self.master.options.setter("tcp_hosts")
)
)
def ignore_hosts(self):
self.master.view_grideditor(
grideditor.HostPatternEditor(
self.master,
self.master.options.ignore_hosts,
self.master.options.setter("ignore_hosts")
)
)
def replacepatterns(self):
data = []
for d in self.master.options.replacements:
if isinstance(d, str):
data.append(replace.parse_hook(d))
else:
data.append(d)
self.master.view_grideditor(
grideditor.ReplaceEditor(
self.master,
data,
self.master.options.setter("replacements")
)
)
def scripts(self):
def edit_scripts(scripts):
self.master.options.scripts = [x[0] for x in scripts]
self.master.view_grideditor(
grideditor.ScriptEditor(
self.master,
[[i] for i in self.master.options.scripts],
edit_scripts
)
)
def default_displaymode(self):
signals.status_prompt_onekey.send(
prompt = "Global default display mode",
keys = contentviews.view_prompts,
callback = self.change_default_display_mode
)
def change_default_display_mode(self, t):
v = contentviews.get_by_shortcut(t)
self.master.options.default_contentview = v.name
if self.master.view.focus.flow:
signals.flow_change.send(self, flow = self.master.view.focus.flow)
def sticky_auth(self):
signals.status_prompt.send(
prompt = "Sticky auth filter",
text = self.master.options.stickyauth,
callback = self.master.options.setter("stickyauth")
)
def sticky_cookie(self):
signals.status_prompt.send(
prompt = "Sticky cookie filter",
text = self.master.options.stickycookie,
callback = self.master.options.setter("stickycookie")
)
def palette(self):
self.master.view_palette_picker()

View File

@ -0,0 +1,141 @@
import math
import urwid
from mitmproxy.tools.console import common
from mitmproxy.tools.console import signals
from mitmproxy.tools.console import grideditor
class SimpleOverlay(urwid.Overlay):
def __init__(self, master, widget, parent, width, valign="middle"):
self.widget = widget
self.master = master
super().__init__(
widget,
parent,
align="center",
width=width,
valign=valign,
height="pack"
)
def keypress(self, size, key):
key = super().keypress(size, key)
if key == "esc":
signals.pop_view_state.send(self)
if key == "?":
self.master.view_help(self.widget.make_help())
else:
return key
class Choice(urwid.WidgetWrap):
def __init__(self, txt, focus, current):
if current:
s = "option_active_selected" if focus else "option_active"
else:
s = "option_selected" if focus else "text"
return super().__init__(
urwid.AttrWrap(
urwid.Padding(urwid.Text(txt)),
s,
)
)
def selectable(self):
return True
def keypress(self, size, key):
return key
class ChooserListWalker(urwid.ListWalker):
def __init__(self, choices, current):
self.index = 0
self.choices = choices
self.current = current
def _get(self, idx, focus):
c = self.choices[idx]
return Choice(c, focus, c == self.current)
def set_focus(self, index):
self.index = index
def get_focus(self):
return self._get(self.index, True), self.index
def get_next(self, pos):
if pos >= len(self.choices) - 1:
return None, None
pos = pos + 1
return self._get(pos, False), pos
def get_prev(self, pos):
pos = pos - 1
if pos < 0:
return None, None
return self._get(pos, False), pos
class Chooser(urwid.WidgetWrap):
def __init__(self, title, choices, current, callback):
self.choices = choices
self.callback = callback
choicewidth = max([len(i) for i in choices])
self.width = max(choicewidth, len(title) + 5)
self.walker = ChooserListWalker(choices, current)
super().__init__(
urwid.AttrWrap(
urwid.LineBox(
urwid.BoxAdapter(
urwid.ListBox(self.walker),
len(choices)
),
title= title
),
"background"
)
)
def selectable(self):
return True
def keypress(self, size, key):
key = common.shortcuts(key)
if key == "enter":
self.callback(self.choices[self.walker.index])
signals.pop_view_state.send(self)
return super().keypress(size, key)
def make_help(self):
text = []
keys = [
("enter", "choose option"),
("esc", "exit chooser"),
]
text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
return text
class OptionsOverlay(urwid.WidgetWrap):
def __init__(self, master, name, vals, vspace):
"""
vspace: how much vertical space to keep clear
"""
cols, rows = master.ui.get_cols_rows()
self.ge = grideditor.OptionsEditor(master, name, vals)
super().__init__(
urwid.AttrWrap(
urwid.LineBox(
urwid.BoxAdapter(self.ge, rows - vspace),
title=name
),
"background"
)
)
self.width = math.ceil(cols * 0.8)
def make_help(self):
return self.ge.make_help()

View File

@ -30,6 +30,9 @@ call_in = blinker.Signal()
# Focus the body, footer or header of the main window # Focus the body, footer or header of the main window
focus = blinker.Signal() focus = blinker.Signal()
# Set the mini help text in the footer of the main window
footer_help = blinker.Signal()
# Fired when settings change # Fired when settings change
update_settings = blinker.Signal() update_settings = blinker.Signal()

View File

@ -5,7 +5,6 @@ import urwid
from mitmproxy.tools.console import common from mitmproxy.tools.console import common
from mitmproxy.tools.console import pathedit from mitmproxy.tools.console import pathedit
from mitmproxy.tools.console import signals from mitmproxy.tools.console import signals
from mitmproxy.utils import human
class PromptPath: class PromptPath:
@ -143,10 +142,15 @@ class StatusBar(urwid.WidgetWrap):
super().__init__(urwid.Pile([self.ib, self.master.ab])) super().__init__(urwid.Pile([self.ib, self.master.ab]))
signals.update_settings.connect(self.sig_update) signals.update_settings.connect(self.sig_update)
signals.flowlist_change.connect(self.sig_update) signals.flowlist_change.connect(self.sig_update)
signals.footer_help.connect(self.sig_footer_help)
master.options.changed.connect(self.sig_update) master.options.changed.connect(self.sig_update)
master.view.focus.sig_change.connect(self.sig_update) master.view.focus.sig_change.connect(self.sig_update)
self.redraw() self.redraw()
def sig_footer_help(self, sender, helptext):
self.helptext = helptext
self.redraw()
def sig_update(self, sender, updated=None): def sig_update(self, sender, updated=None):
self.redraw() self.redraw()
@ -224,11 +228,7 @@ class StatusBar(urwid.WidgetWrap):
if self.master.options.console_focus_follow: if self.master.options.console_focus_follow:
opts.append("following") opts.append("following")
if self.master.options.stream_large_bodies: if self.master.options.stream_large_bodies:
opts.append( opts.append(self.master.options.stream_large_bodies)
"stream:%s" % human.pretty_size(
self.master.options.stream_large_bodies
)
)
if opts: if opts:
r.append("[%s]" % (":".join(opts))) r.append("[%s]" % (":".join(opts)))
@ -285,10 +285,5 @@ class StatusBar(urwid.WidgetWrap):
]), "heading") ]), "heading")
self.ib._w = status self.ib._w = status
def update(self, text):
self.helptext = text
self.redraw()
self.master.loop.draw_screen()
def selectable(self): def selectable(self):
return True return True

View File

@ -1,7 +1,7 @@
import typing import typing
def check_type(name: str, value: typing.Any, typeinfo: type) -> None: def check_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None:
""" """
This function checks if the provided value is an instance of typeinfo This function checks if the provided value is an instance of typeinfo
and raises a TypeError otherwise. and raises a TypeError otherwise.

View File

@ -14,6 +14,7 @@ class TO(optmanager.OptManager):
self.add_option("one", typing.Optional[int], None, "help") self.add_option("one", typing.Optional[int], None, "help")
self.add_option("two", typing.Optional[int], 2, "help") self.add_option("two", typing.Optional[int], 2, "help")
self.add_option("bool", bool, False, "help") self.add_option("bool", bool, False, "help")
self.add_option("required_int", int, 2, "help")
class TD(optmanager.OptManager): class TD(optmanager.OptManager):
@ -72,9 +73,15 @@ def test_defaults():
assert not o.has_changed(k) assert not o.has_changed(k)
def test_required_int():
o = TO()
with pytest.raises(exceptions.OptionsError):
o.parse_setval("required_int", None)
def test_options(): def test_options():
o = TO() o = TO()
assert o.keys() == {"bool", "one", "two"} assert o.keys() == {"bool", "one", "two", "required_int"}
assert o.one is None assert o.one is None
assert o.two == 2 assert o.two == 2
@ -140,6 +147,18 @@ class Rec():
def test_subscribe(): def test_subscribe():
o = TO() o = TO()
r = Rec() r = Rec()
# pytest.raises keeps a reference here that interferes with the cleanup test
# further down.
try:
o.subscribe(r, ["unknown"])
except exceptions.OptionsError:
pass
else:
raise AssertionError
assert len(o.changed.receivers) == 0
o.subscribe(r, ["two"]) o.subscribe(r, ["two"])
o.one = 2 o.one = 2
assert not r.called assert not r.called
@ -151,6 +170,21 @@ def test_subscribe():
o.two = 4 o.two = 4
assert len(o.changed.receivers) == 0 assert len(o.changed.receivers) == 0
class binder:
def __init__(self):
self.o = TO()
self.called = False
self.o.subscribe(self.bound, ["two"])
def bound(self, *args, **kwargs):
self.called = True
t = binder()
t.o.one = 3
assert not t.called
t.o.two = 3
assert t.called
def test_rollback(): def test_rollback():
o = TO() o = TO()
@ -270,14 +304,14 @@ def test_merge():
def test_option(): def test_option():
o = optmanager._Option("test", int, 1, None, None) o = optmanager._Option("test", int, 1, "help", None)
assert o.current() == 1 assert o.current() == 1
with pytest.raises(TypeError): with pytest.raises(TypeError):
o.set("foo") o.set("foo")
with pytest.raises(TypeError): with pytest.raises(TypeError):
optmanager._Option("test", str, 1, None, None) optmanager._Option("test", str, 1, "help", None)
o2 = optmanager._Option("test", int, 1, None, None) o2 = optmanager._Option("test", int, 1, "help", None)
assert o2 == o assert o2 == o
o2.set(5) o2.set(5)
assert o2 != o assert o2 != o