mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-25 09:37:37 +00:00
Refactor how we process --set
options (#5067)
* refactor how we process `--set` options * minor improvements based on @marwinxxii's review * fix nits * update changelog
This commit is contained in:
parent
8e1adbc5df
commit
6f0587734e
@ -33,6 +33,7 @@
|
||||
* Speculative fix for some rare HTTP/2 connection stalls (#5158, @EndUser509)
|
||||
* Add ability to specify custom ports with LDAP authentication (#5068, @demonoidvk)
|
||||
* Console Improvements on Windows (@mhils)
|
||||
* Fix processing of `--set` options (#5067, @marwinxxii)
|
||||
|
||||
## 28 September 2021: mitmproxy 7.0.4
|
||||
|
||||
|
@ -50,18 +50,19 @@ class Core:
|
||||
)
|
||||
|
||||
@command.command("set")
|
||||
def set(self, option: str, value: str = "") -> None:
|
||||
def set(self, option: str, *value: str) -> None:
|
||||
"""
|
||||
Set an option. When the value is omitted, booleans are set to true,
|
||||
strings and integers are set to None (if permitted), and sequences
|
||||
are emptied. Boolean values can be true, false or toggle.
|
||||
Multiple values are concatenated with a single space.
|
||||
Sequences are set using multiple invocations to set for
|
||||
the same option.
|
||||
"""
|
||||
strspec = f"{option}={value}"
|
||||
if value:
|
||||
specs = [f"{option}={v}" for v in value]
|
||||
else:
|
||||
specs = [option]
|
||||
try:
|
||||
ctx.options.set(strspec)
|
||||
ctx.options.set(*specs)
|
||||
except exceptions.OptionsError as e:
|
||||
raise exceptions.CommandError(e) from e
|
||||
|
||||
|
@ -1,14 +1,14 @@
|
||||
import collections
|
||||
import contextlib
|
||||
import blinker
|
||||
import blinker._saferef
|
||||
import pprint
|
||||
import copy
|
||||
from dataclasses import dataclass
|
||||
import functools
|
||||
import os
|
||||
import typing
|
||||
import pprint
|
||||
import textwrap
|
||||
import typing
|
||||
|
||||
import blinker
|
||||
import blinker._saferef
|
||||
import ruamel.yaml
|
||||
|
||||
from mitmproxy import exceptions
|
||||
@ -79,6 +79,11 @@ class _Option:
|
||||
return o
|
||||
|
||||
|
||||
@dataclass
|
||||
class _UnconvertedStrings:
|
||||
val: typing.List[str]
|
||||
|
||||
|
||||
class OptManager:
|
||||
"""
|
||||
OptManager is the base class from which Options objects are derived.
|
||||
@ -92,7 +97,7 @@ class OptManager:
|
||||
mutation doesn't change the option state inadvertently.
|
||||
"""
|
||||
def __init__(self):
|
||||
self.deferred: typing.Dict[str, typing.List[str]] = {}
|
||||
self.deferred: typing.Dict[str, typing.Any] = {}
|
||||
self.changed = blinker.Signal()
|
||||
self.errored = blinker.Signal()
|
||||
# Options must be the last attribute here - after that, we raise an
|
||||
@ -288,60 +293,84 @@ class OptManager:
|
||||
options=options
|
||||
)
|
||||
|
||||
def set(self, *spec, defer=False):
|
||||
def set(self, *specs: str, defer: bool = False) -> None:
|
||||
"""
|
||||
Takes a list of set specification in standard form (option=value).
|
||||
Options that are known are updated immediately. If defer is true,
|
||||
options that are not known are deferred, and will be set once they
|
||||
are added.
|
||||
"""
|
||||
vals = {}
|
||||
unknown: typing.Dict[str, typing.List[str]] = collections.defaultdict(list)
|
||||
for i in spec:
|
||||
parts = i.split("=", maxsplit=1)
|
||||
if len(parts) == 1:
|
||||
optname, optval = parts[0], None
|
||||
else:
|
||||
optname, optval = parts[0], parts[1]
|
||||
if optname in self._options:
|
||||
vals[optname] = self.parse_setval(self._options[optname], optval, vals.get(optname))
|
||||
else:
|
||||
unknown[optname].append(optval)
|
||||
if defer:
|
||||
self.deferred.update(unknown)
|
||||
elif unknown:
|
||||
raise exceptions.OptionsError("Unknown options: %s" % ", ".join(unknown.keys()))
|
||||
self.update(**vals)
|
||||
|
||||
def process_deferred(self):
|
||||
May raise an `OptionsError` if a value is malformed or an option is unknown and defer is False.
|
||||
"""
|
||||
# First, group specs by option name.
|
||||
unprocessed: typing.Dict[str, typing.List[str]] = {}
|
||||
for spec in specs:
|
||||
if "=" in spec:
|
||||
name, value = spec.split("=", maxsplit=1)
|
||||
unprocessed.setdefault(name, []).append(value)
|
||||
else:
|
||||
unprocessed.setdefault(spec, [])
|
||||
|
||||
# Second, convert values to the correct type.
|
||||
processed: typing.Dict[str, typing.Any] = {}
|
||||
for name in list(unprocessed.keys()):
|
||||
if name in self._options:
|
||||
processed[name] = self._parse_setval(self._options[name], unprocessed.pop(name))
|
||||
|
||||
# Third, stash away unrecognized options or complain about them.
|
||||
if defer:
|
||||
self.deferred.update({
|
||||
k: _UnconvertedStrings(v)
|
||||
for k, v in unprocessed.items()
|
||||
})
|
||||
elif unprocessed:
|
||||
raise exceptions.OptionsError(f"Unknown option(s): {', '.join(unprocessed)}")
|
||||
|
||||
# Finally, apply updated options.
|
||||
self.update(**processed)
|
||||
|
||||
def process_deferred(self) -> None:
|
||||
"""
|
||||
Processes options that were deferred in previous calls to set, and
|
||||
have since been added.
|
||||
"""
|
||||
update = {}
|
||||
for optname, optvals in self.deferred.items():
|
||||
update: typing.Dict[str, typing.Any] = {}
|
||||
for optname, value in self.deferred.items():
|
||||
if optname in self._options:
|
||||
for optval in optvals:
|
||||
optval = self.parse_setval(self._options[optname], optval, update.get(optname))
|
||||
update[optname] = optval
|
||||
if isinstance(value, _UnconvertedStrings):
|
||||
value = self._parse_setval(self._options[optname], value.val)
|
||||
update[optname] = value
|
||||
self.update(**update)
|
||||
for k in update.keys():
|
||||
del self.deferred[k]
|
||||
|
||||
def parse_setval(self, o: _Option, optstr: typing.Optional[str], currentvalue: typing.Any) -> typing.Any:
|
||||
def _parse_setval(self, o: _Option, values: typing.List[str]) -> typing.Any:
|
||||
"""
|
||||
Convert a string to a value appropriate for the option type.
|
||||
"""
|
||||
if o.typespec == typing.Sequence[str]:
|
||||
return values
|
||||
if len(values) > 1:
|
||||
raise exceptions.OptionsError(f"Received multiple values for {o.name}: {values}")
|
||||
|
||||
optstr: typing.Optional[str]
|
||||
if values:
|
||||
optstr = values[0]
|
||||
else:
|
||||
optstr = None
|
||||
|
||||
if o.typespec in (str, typing.Optional[str]):
|
||||
if o.typespec == str and optstr is None:
|
||||
raise exceptions.OptionsError(f"Option is required: {o.name}")
|
||||
return optstr
|
||||
elif o.typespec in (int, typing.Optional[int]):
|
||||
if optstr:
|
||||
try:
|
||||
return int(optstr)
|
||||
except ValueError:
|
||||
raise exceptions.OptionsError("Not an integer: %s" % optstr)
|
||||
raise exceptions.OptionsError(f"Not an integer: {optstr}")
|
||||
elif o.typespec == int:
|
||||
raise exceptions.OptionsError("Option is required: %s" % o.name)
|
||||
raise exceptions.OptionsError(f"Option is required: {o.name}")
|
||||
else:
|
||||
return None
|
||||
elif o.typespec == bool:
|
||||
@ -353,17 +382,9 @@ class OptManager:
|
||||
return False
|
||||
else:
|
||||
raise exceptions.OptionsError(
|
||||
"Boolean must be \"true\", \"false\", or have the value " "omitted (a synonym for \"true\")."
|
||||
'Boolean must be "true", "false", or have the value omitted (a synonym for "true").'
|
||||
)
|
||||
elif o.typespec == typing.Sequence[str]:
|
||||
if not optstr:
|
||||
return []
|
||||
else:
|
||||
if currentvalue:
|
||||
return currentvalue + [optstr]
|
||||
else:
|
||||
return [optstr]
|
||||
raise NotImplementedError("Unsupported option type: %s", o.typespec)
|
||||
raise NotImplementedError(f"Unsupported option type: {o.typespec}")
|
||||
|
||||
def make_parser(self, parser, optname, metavar=None, short=None):
|
||||
"""
|
||||
|
@ -181,9 +181,7 @@ class OptionsList(urwid.ListBox):
|
||||
foc, idx = self.get_focus()
|
||||
v = self.walker.get_edit_text()
|
||||
try:
|
||||
current = getattr(self.master.options, foc.opt.name)
|
||||
d = self.master.options.parse_setval(foc.opt, v, current)
|
||||
self.master.options.update(**{foc.opt.name: d})
|
||||
self.master.options.set(f"{foc.opt.name}={v}")
|
||||
except exceptions.OptionsError as v:
|
||||
signals.status_message.send(message=str(v))
|
||||
self.walker.stop_editing()
|
||||
|
@ -71,7 +71,7 @@ def test_defaults():
|
||||
def test_required_int():
|
||||
o = TO()
|
||||
with pytest.raises(exceptions.OptionsError):
|
||||
o.parse_setval(o._options["required_int"], None, None)
|
||||
o._parse_setval(o._options["required_int"], [])
|
||||
|
||||
|
||||
def test_deepcopy():
|
||||
@ -402,13 +402,15 @@ def test_set():
|
||||
|
||||
opts.set("str=foo")
|
||||
assert opts.str == "foo"
|
||||
with pytest.raises(TypeError):
|
||||
with pytest.raises(exceptions.OptionsError):
|
||||
opts.set("str")
|
||||
|
||||
opts.set("optstr=foo")
|
||||
assert opts.optstr == "foo"
|
||||
opts.set("optstr")
|
||||
assert opts.optstr is None
|
||||
with pytest.raises(exceptions.OptionsError, match="Received multiple values"):
|
||||
opts.set("optstr=foo", "optstr=bar")
|
||||
|
||||
opts.set("bool=false")
|
||||
assert opts.bool is False
|
||||
|
@ -355,7 +355,7 @@ class TestCommandBuffer:
|
||||
('commander_command', 'set'),
|
||||
('text', ' '),
|
||||
('commander_hint', 'option '),
|
||||
('commander_hint', 'value '),
|
||||
('commander_hint', '*value '),
|
||||
]
|
||||
|
||||
def test_render(self):
|
||||
@ -380,5 +380,5 @@ class TestCommandBuffer:
|
||||
('commander_command', 'set'),
|
||||
('text', ' '),
|
||||
('commander_hint', 'option '),
|
||||
('commander_hint', 'value '),
|
||||
('commander_hint', '*value '),
|
||||
]
|
||||
|
Loading…
Reference in New Issue
Block a user