diff --git a/CHANGELOG.md b/CHANGELOG.md index b111648e9..3369b1015 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ * Speculative fix for some rare HTTP/2 connection stalls (#5158, @EndUser509) * Add ability to specify custom ports with LDAP authentication (#5068, @demonoidvk) * Console Improvements on Windows (@mhils) +* Fix processing of `--set` options (#5067, @marwinxxii) ## 28 September 2021: mitmproxy 7.0.4 diff --git a/mitmproxy/addons/core.py b/mitmproxy/addons/core.py index d3e553dbc..97284c4cf 100644 --- a/mitmproxy/addons/core.py +++ b/mitmproxy/addons/core.py @@ -50,18 +50,19 @@ class Core: ) @command.command("set") - def set(self, option: str, value: str = "") -> None: + def set(self, option: str, *value: str) -> None: """ Set an option. When the value is omitted, booleans are set to true, strings and integers are set to None (if permitted), and sequences are emptied. Boolean values can be true, false or toggle. Multiple values are concatenated with a single space. - Sequences are set using multiple invocations to set for - the same option. """ - strspec = f"{option}={value}" + if value: + specs = [f"{option}={v}" for v in value] + else: + specs = [option] try: - ctx.options.set(strspec) + ctx.options.set(*specs) except exceptions.OptionsError as e: raise exceptions.CommandError(e) from e diff --git a/mitmproxy/optmanager.py b/mitmproxy/optmanager.py index 00374c4ea..367a78128 100644 --- a/mitmproxy/optmanager.py +++ b/mitmproxy/optmanager.py @@ -1,14 +1,14 @@ -import collections import contextlib -import blinker -import blinker._saferef -import pprint import copy +from dataclasses import dataclass import functools import os -import typing +import pprint import textwrap +import typing +import blinker +import blinker._saferef import ruamel.yaml from mitmproxy import exceptions @@ -79,6 +79,11 @@ class _Option: return o +@dataclass +class _UnconvertedStrings: + val: typing.List[str] + + class OptManager: """ OptManager is the base class from which Options objects are derived. @@ -92,7 +97,7 @@ class OptManager: mutation doesn't change the option state inadvertently. """ def __init__(self): - self.deferred: typing.Dict[str, typing.List[str]] = {} + self.deferred: typing.Dict[str, typing.Any] = {} self.changed = blinker.Signal() self.errored = blinker.Signal() # Options must be the last attribute here - after that, we raise an @@ -288,60 +293,84 @@ class OptManager: options=options ) - def set(self, *spec, defer=False): + def set(self, *specs: str, defer: bool = False) -> None: """ - Takes a list of set specification in standard form (option=value). - Options that are known are updated immediately. If defer is true, - options that are not known are deferred, and will be set once they - are added. - """ - vals = {} - unknown: typing.Dict[str, typing.List[str]] = collections.defaultdict(list) - for i in spec: - parts = i.split("=", maxsplit=1) - if len(parts) == 1: - optname, optval = parts[0], None - else: - optname, optval = parts[0], parts[1] - if optname in self._options: - vals[optname] = self.parse_setval(self._options[optname], optval, vals.get(optname)) - else: - unknown[optname].append(optval) - if defer: - self.deferred.update(unknown) - elif unknown: - raise exceptions.OptionsError("Unknown options: %s" % ", ".join(unknown.keys())) - self.update(**vals) + Takes a list of set specification in standard form (option=value). + Options that are known are updated immediately. If defer is true, + options that are not known are deferred, and will be set once they + are added. - def process_deferred(self): + May raise an `OptionsError` if a value is malformed or an option is unknown and defer is False. + """ + # First, group specs by option name. + unprocessed: typing.Dict[str, typing.List[str]] = {} + for spec in specs: + if "=" in spec: + name, value = spec.split("=", maxsplit=1) + unprocessed.setdefault(name, []).append(value) + else: + unprocessed.setdefault(spec, []) + + # Second, convert values to the correct type. + processed: typing.Dict[str, typing.Any] = {} + for name in list(unprocessed.keys()): + if name in self._options: + processed[name] = self._parse_setval(self._options[name], unprocessed.pop(name)) + + # Third, stash away unrecognized options or complain about them. + if defer: + self.deferred.update({ + k: _UnconvertedStrings(v) + for k, v in unprocessed.items() + }) + elif unprocessed: + raise exceptions.OptionsError(f"Unknown option(s): {', '.join(unprocessed)}") + + # Finally, apply updated options. + self.update(**processed) + + def process_deferred(self) -> None: """ Processes options that were deferred in previous calls to set, and have since been added. """ - update = {} - for optname, optvals in self.deferred.items(): + update: typing.Dict[str, typing.Any] = {} + for optname, value in self.deferred.items(): if optname in self._options: - for optval in optvals: - optval = self.parse_setval(self._options[optname], optval, update.get(optname)) - update[optname] = optval + if isinstance(value, _UnconvertedStrings): + value = self._parse_setval(self._options[optname], value.val) + update[optname] = value self.update(**update) for k in update.keys(): del self.deferred[k] - def parse_setval(self, o: _Option, optstr: typing.Optional[str], currentvalue: typing.Any) -> typing.Any: + def _parse_setval(self, o: _Option, values: typing.List[str]) -> typing.Any: """ Convert a string to a value appropriate for the option type. """ + if o.typespec == typing.Sequence[str]: + return values + if len(values) > 1: + raise exceptions.OptionsError(f"Received multiple values for {o.name}: {values}") + + optstr: typing.Optional[str] + if values: + optstr = values[0] + else: + optstr = None + if o.typespec in (str, typing.Optional[str]): + if o.typespec == str and optstr is None: + raise exceptions.OptionsError(f"Option is required: {o.name}") return optstr elif o.typespec in (int, typing.Optional[int]): if optstr: try: return int(optstr) except ValueError: - raise exceptions.OptionsError("Not an integer: %s" % optstr) + raise exceptions.OptionsError(f"Not an integer: {optstr}") elif o.typespec == int: - raise exceptions.OptionsError("Option is required: %s" % o.name) + raise exceptions.OptionsError(f"Option is required: {o.name}") else: return None elif o.typespec == bool: @@ -353,17 +382,9 @@ class OptManager: return False else: raise exceptions.OptionsError( - "Boolean must be \"true\", \"false\", or have the value " "omitted (a synonym for \"true\")." + 'Boolean must be "true", "false", or have the value omitted (a synonym for "true").' ) - elif o.typespec == typing.Sequence[str]: - if not optstr: - return [] - else: - if currentvalue: - return currentvalue + [optstr] - else: - return [optstr] - raise NotImplementedError("Unsupported option type: %s", o.typespec) + raise NotImplementedError(f"Unsupported option type: {o.typespec}") def make_parser(self, parser, optname, metavar=None, short=None): """ diff --git a/mitmproxy/tools/console/options.py b/mitmproxy/tools/console/options.py index 0f7ff0bff..f16a0b0e8 100644 --- a/mitmproxy/tools/console/options.py +++ b/mitmproxy/tools/console/options.py @@ -181,9 +181,7 @@ class OptionsList(urwid.ListBox): foc, idx = self.get_focus() v = self.walker.get_edit_text() try: - current = getattr(self.master.options, foc.opt.name) - d = self.master.options.parse_setval(foc.opt, v, current) - self.master.options.update(**{foc.opt.name: d}) + self.master.options.set(f"{foc.opt.name}={v}") except exceptions.OptionsError as v: signals.status_message.send(message=str(v)) self.walker.stop_editing() diff --git a/test/mitmproxy/test_optmanager.py b/test/mitmproxy/test_optmanager.py index d0f05678f..52b06ff14 100644 --- a/test/mitmproxy/test_optmanager.py +++ b/test/mitmproxy/test_optmanager.py @@ -71,7 +71,7 @@ def test_defaults(): def test_required_int(): o = TO() with pytest.raises(exceptions.OptionsError): - o.parse_setval(o._options["required_int"], None, None) + o._parse_setval(o._options["required_int"], []) def test_deepcopy(): @@ -402,13 +402,15 @@ def test_set(): opts.set("str=foo") assert opts.str == "foo" - with pytest.raises(TypeError): + with pytest.raises(exceptions.OptionsError): opts.set("str") opts.set("optstr=foo") assert opts.optstr == "foo" opts.set("optstr") assert opts.optstr is None + with pytest.raises(exceptions.OptionsError, match="Received multiple values"): + opts.set("optstr=foo", "optstr=bar") opts.set("bool=false") assert opts.bool is False diff --git a/test/mitmproxy/tools/console/test_commander.py b/test/mitmproxy/tools/console/test_commander.py index a297fcf7a..fd917f8c9 100644 --- a/test/mitmproxy/tools/console/test_commander.py +++ b/test/mitmproxy/tools/console/test_commander.py @@ -355,7 +355,7 @@ class TestCommandBuffer: ('commander_command', 'set'), ('text', ' '), ('commander_hint', 'option '), - ('commander_hint', 'value '), + ('commander_hint', '*value '), ] def test_render(self): @@ -380,5 +380,5 @@ class TestCommandBuffer: ('commander_command', 'set'), ('text', ' '), ('commander_hint', 'option '), - ('commander_hint', 'value '), + ('commander_hint', '*value '), ]