mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2025-02-02 00:05:27 +00:00
commit
79cf6d2a5d
@ -8,6 +8,13 @@ from mitmproxy import optmanager
|
||||
from mitmproxy.net.http import status_codes
|
||||
|
||||
|
||||
FlowSetChoice = typing.NewType("FlowSetChoice", command.Choice)
|
||||
FlowSetChoice.options_command = "flow.set.options"
|
||||
|
||||
FlowEncodeChoice = typing.NewType("FlowEncodeChoice", command.Choice)
|
||||
FlowEncodeChoice.options_command = "flow.encode.options"
|
||||
|
||||
|
||||
class Core:
|
||||
@command.command("set")
|
||||
def set(self, *spec: str) -> None:
|
||||
@ -98,17 +105,13 @@ class Core:
|
||||
@command.command("flow.set")
|
||||
def flow_set(
|
||||
self,
|
||||
flows: typing.Sequence[flow.Flow], spec: str, sval: str
|
||||
flows: typing.Sequence[flow.Flow],
|
||||
spec: FlowSetChoice,
|
||||
sval: str
|
||||
) -> None:
|
||||
"""
|
||||
Quickly set a number of common values on flows.
|
||||
"""
|
||||
opts = self.flow_set_options()
|
||||
if spec not in opts:
|
||||
raise exceptions.CommandError(
|
||||
"Set spec must be one of: %s." % ", ".join(opts)
|
||||
)
|
||||
|
||||
val = sval # type: typing.Union[int, str]
|
||||
if spec == "status_code":
|
||||
try:
|
||||
@ -190,13 +193,15 @@ class Core:
|
||||
ctx.log.alert("Toggled encoding on %s flows." % len(updated))
|
||||
|
||||
@command.command("flow.encode")
|
||||
def encode(self, flows: typing.Sequence[flow.Flow], part: str, enc: str) -> None:
|
||||
def encode(
|
||||
self,
|
||||
flows: typing.Sequence[flow.Flow],
|
||||
part: str,
|
||||
enc: FlowEncodeChoice,
|
||||
) -> None:
|
||||
"""
|
||||
Encode flows with a specified encoding.
|
||||
"""
|
||||
if enc not in self.encode_options():
|
||||
raise exceptions.CommandError("Invalid encoding format: %s" % enc)
|
||||
|
||||
updated = []
|
||||
for f in flows:
|
||||
p = getattr(f, part, None)
|
||||
@ -212,7 +217,6 @@ class Core:
|
||||
def encode_options(self) -> typing.Sequence[str]:
|
||||
"""
|
||||
The possible values for an encoding specification.
|
||||
|
||||
"""
|
||||
return ["gzip", "deflate", "br"]
|
||||
|
||||
|
@ -238,7 +238,7 @@ class View(collections.Sequence):
|
||||
@command.command("view.order.options")
|
||||
def order_options(self) -> typing.Sequence[str]:
|
||||
"""
|
||||
A list of all the orders we support.
|
||||
Choices supported by the console_order option.
|
||||
"""
|
||||
return list(sorted(self.orders.keys()))
|
||||
|
||||
|
@ -18,12 +18,33 @@ Cuts = typing.Sequence[
|
||||
]
|
||||
|
||||
|
||||
# A str that is validated at runtime by calling a command that returns options.
|
||||
#
|
||||
# This requires some explanation. We want to construct a type with two aims: it
|
||||
# must be detected as str by mypy, and it has to be decorated at runtime with an
|
||||
# options_commmand attribute that tells us where to look up options for runtime
|
||||
# validation. Unfortunately, mypy is really, really obtuse about what it detects
|
||||
# as a type - any construction of these types at runtime barfs. The effect is
|
||||
# that while the annotation mechanism is very generaly, if you also use mypy
|
||||
# you're hamstrung. So the middle road is to declare a derived type, which is
|
||||
# then used very clumsily as follows:
|
||||
#
|
||||
# MyType = typing.NewType("MyType", command.Choice)
|
||||
# MyType.options_command = "my.command"
|
||||
#
|
||||
# The resulting type is then used in the function argument decorator.
|
||||
class Choice(str):
|
||||
options_command = ""
|
||||
|
||||
|
||||
def typename(t: type, ret: bool) -> str:
|
||||
"""
|
||||
Translates a type to an explanatory string. If ret is True, we're
|
||||
looking at a return type, else we're looking at a parameter type.
|
||||
"""
|
||||
if issubclass(t, (str, int, bool)):
|
||||
if hasattr(t, "options_command"):
|
||||
return "choice"
|
||||
elif issubclass(t, (str, int, bool)):
|
||||
return t.__name__
|
||||
elif t == typing.Sequence[flow.Flow]:
|
||||
return "[flow]" if ret else "flowspec"
|
||||
@ -157,6 +178,14 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
|
||||
"""
|
||||
Convert a string to a argument to the appropriate type.
|
||||
"""
|
||||
if hasattr(argtype, "options_command"):
|
||||
cmd = getattr(argtype, "options_command")
|
||||
opts = manager.call(cmd)
|
||||
if spec not in opts:
|
||||
raise exceptions.CommandError(
|
||||
"Invalid choice: see %s for options" % cmd
|
||||
)
|
||||
return spec
|
||||
if issubclass(argtype, str):
|
||||
return spec
|
||||
elif argtype == bool:
|
||||
|
@ -31,6 +31,12 @@ console_layouts = [
|
||||
"horizontal",
|
||||
]
|
||||
|
||||
FocusChoice = typing.NewType("FocusChoice", command.Choice)
|
||||
FocusChoice.options_command = "console.edit.focus.options"
|
||||
|
||||
FlowViewModeChoice = typing.NewType("FlowViewModeChoice", command.Choice)
|
||||
FlowViewModeChoice.options_command = "console.flowview.mode.options"
|
||||
|
||||
|
||||
class Logger:
|
||||
def log(self, evt):
|
||||
@ -111,8 +117,7 @@ class ConsoleAddon:
|
||||
@command.command("console.layout.options")
|
||||
def layout_options(self) -> typing.Sequence[str]:
|
||||
"""
|
||||
Returns the valid options for console layout. Use these by setting
|
||||
the console_layout option.
|
||||
Returns the available options for the consoler_layout option.
|
||||
"""
|
||||
return ["single", "vertical", "horizontal"]
|
||||
|
||||
@ -340,6 +345,9 @@ class ConsoleAddon:
|
||||
|
||||
@command.command("console.edit.focus.options")
|
||||
def edit_focus_options(self) -> typing.Sequence[str]:
|
||||
"""
|
||||
Possible components for console.edit.focus.
|
||||
"""
|
||||
return [
|
||||
"cookies",
|
||||
"form",
|
||||
@ -355,9 +363,9 @@ class ConsoleAddon:
|
||||
]
|
||||
|
||||
@command.command("console.edit.focus")
|
||||
def edit_focus(self, part: str) -> None:
|
||||
def edit_focus(self, part: FocusChoice) -> None:
|
||||
"""
|
||||
Edit the query of the current focus.
|
||||
Edit a component of the currently focused flow.
|
||||
"""
|
||||
if part == "cookies":
|
||||
self.master.switch_view("edit_focus_cookies")
|
||||
@ -428,26 +436,32 @@ class ConsoleAddon:
|
||||
self._grideditor().cmd_spawn_editor()
|
||||
|
||||
@command.command("console.flowview.mode.set")
|
||||
def flowview_mode_set(self) -> None:
|
||||
def flowview_mode_set(self, mode: FlowViewModeChoice) -> None:
|
||||
"""
|
||||
Set the display mode for the current flow view.
|
||||
"""
|
||||
fv = self.master.window.current("flowview")
|
||||
fv = self.master.window.current_window("flowview")
|
||||
if not fv:
|
||||
raise exceptions.CommandError("Not viewing a flow.")
|
||||
idx = fv.body.tab_offset
|
||||
|
||||
def callback(opt):
|
||||
try:
|
||||
self.master.commands.call_args(
|
||||
"view.setval",
|
||||
["@focus", "flowview_mode_%s" % idx, opt]
|
||||
)
|
||||
except exceptions.CommandError as e:
|
||||
signals.status_message.send(message=str(e))
|
||||
if mode not in [i.name.lower() for i in contentviews.views]:
|
||||
raise exceptions.CommandError("Invalid flowview mode.")
|
||||
|
||||
opts = [i.name.lower() for i in contentviews.views]
|
||||
self.master.overlay(overlay.Chooser(self.master, "Mode", opts, "", callback))
|
||||
try:
|
||||
self.master.commands.call_args(
|
||||
"view.setval",
|
||||
["@focus", "flowview_mode_%s" % idx, mode]
|
||||
)
|
||||
except exceptions.CommandError as e:
|
||||
signals.status_message.send(message=str(e))
|
||||
|
||||
@command.command("console.flowview.mode.options")
|
||||
def flowview_mode_options(self) -> typing.Sequence[str]:
|
||||
"""
|
||||
Returns the valid options for the flowview mode.
|
||||
"""
|
||||
return [i.name.lower() for i in contentviews.views]
|
||||
|
||||
@command.command("console.flowview.mode")
|
||||
def flowview_mode(self) -> str:
|
||||
|
@ -116,7 +116,15 @@ def map(km):
|
||||
"View flow body in an external viewer"
|
||||
)
|
||||
km.add("p", "view.focus.prev", ["flowview"], "Go to previous flow")
|
||||
km.add("m", "console.flowview.mode.set", ["flowview"], "Set flow view mode")
|
||||
km.add(
|
||||
"m",
|
||||
"""
|
||||
console.choose.cmd Mode console.flowview.mode.options
|
||||
console.flowview.mode.set {choice}
|
||||
""",
|
||||
["flowview"],
|
||||
"Set flow view mode"
|
||||
)
|
||||
km.add(
|
||||
"z",
|
||||
"""
|
||||
|
@ -31,7 +31,7 @@ def check_command_type(value: typing.Any, typeinfo: typing.Any) -> bool:
|
||||
return False
|
||||
elif value is None and typeinfo is None:
|
||||
return True
|
||||
elif not isinstance(value, typeinfo):
|
||||
elif (not isinstance(typeinfo, type)) or (not isinstance(value, typeinfo)):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
@ -69,9 +69,6 @@ def test_flow_set():
|
||||
f = tflow.tflow(resp=True)
|
||||
assert sa.flow_set_options()
|
||||
|
||||
with pytest.raises(exceptions.CommandError):
|
||||
sa.flow_set([f], "flibble", "post")
|
||||
|
||||
assert f.request.method != "post"
|
||||
sa.flow_set([f], "method", "post")
|
||||
assert f.request.method == "POST"
|
||||
@ -126,9 +123,6 @@ def test_encoding():
|
||||
sa.encode_toggle([f], "request")
|
||||
assert "content-encoding" not in f.request.headers
|
||||
|
||||
with pytest.raises(exceptions.CommandError):
|
||||
sa.encode([f], "request", "invalid")
|
||||
|
||||
|
||||
def test_options(tmpdir):
|
||||
p = str(tmpdir.join("path"))
|
||||
|
@ -8,6 +8,10 @@ import io
|
||||
import pytest
|
||||
|
||||
|
||||
TChoice = typing.NewType("TChoice", command.Choice)
|
||||
TChoice.options_command = "choices"
|
||||
|
||||
|
||||
class TAddon:
|
||||
def cmd1(self, foo: str) -> str:
|
||||
"""cmd1 help"""
|
||||
@ -25,6 +29,12 @@ class TAddon:
|
||||
def varargs(self, one: str, *var: str) -> typing.Sequence[str]:
|
||||
return list(var)
|
||||
|
||||
def choices(self) -> typing.Sequence[str]:
|
||||
return ["one", "two", "three"]
|
||||
|
||||
def choose(self, arg: TChoice) -> typing.Sequence[str]: # type: ignore
|
||||
return ["one", "two", "three"]
|
||||
|
||||
|
||||
class TestCommand:
|
||||
def test_varargs(self):
|
||||
@ -86,6 +96,8 @@ def test_typename():
|
||||
assert command.typename(flow.Flow, False) == "flow"
|
||||
assert command.typename(typing.Sequence[str], False) == "[str]"
|
||||
|
||||
assert command.typename(TChoice, False) == "choice"
|
||||
|
||||
|
||||
class DummyConsole:
|
||||
@command.command("view.resolve")
|
||||
@ -134,6 +146,16 @@ def test_parsearg():
|
||||
tctx.master.commands, "foo, bar", typing.Sequence[str]
|
||||
) == ["foo", "bar"]
|
||||
|
||||
a = TAddon()
|
||||
tctx.master.commands.add("choices", a.choices)
|
||||
assert command.parsearg(
|
||||
tctx.master.commands, "one", TChoice,
|
||||
) == "one"
|
||||
with pytest.raises(exceptions.CommandError):
|
||||
assert command.parsearg(
|
||||
tctx.master.commands, "invalid", TChoice,
|
||||
)
|
||||
|
||||
|
||||
class TDec:
|
||||
@command.command("cmd1")
|
||||
|
Loading…
Reference in New Issue
Block a user