mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2025-02-02 00:05:27 +00:00
commands: refactor types
The type system was scattered over a number of places, making it hard to follow. This collects all command types in types.py, and completion, validation and parsing for each type is centralised. We should use the same mechanism for options.
This commit is contained in:
parent
b1f923e148
commit
b0b67fe2a7
@ -3,6 +3,7 @@ from mitmproxy import ctx
|
||||
from mitmproxy import io
|
||||
from mitmproxy import flow
|
||||
from mitmproxy import command
|
||||
import mitmproxy.types
|
||||
|
||||
import typing
|
||||
|
||||
@ -37,7 +38,7 @@ class ClientPlayback:
|
||||
ctx.master.addons.trigger("update", [])
|
||||
|
||||
@command.command("replay.client.file")
|
||||
def load_file(self, path: command.Path) -> None:
|
||||
def load_file(self, path: mitmproxy.types.Path) -> None:
|
||||
try:
|
||||
flows = io.read_flows_from_paths([path])
|
||||
except exceptions.FlowReadException as e:
|
||||
|
@ -6,6 +6,7 @@ from mitmproxy import command
|
||||
from mitmproxy import flow
|
||||
from mitmproxy import optmanager
|
||||
from mitmproxy.net.http import status_codes
|
||||
import mitmproxy.types
|
||||
|
||||
|
||||
class Core:
|
||||
@ -96,7 +97,7 @@ class Core:
|
||||
]
|
||||
|
||||
@command.command("flow.set")
|
||||
@command.argument("spec", type=command.Choice("flow.set.options"))
|
||||
@command.argument("spec", type=mitmproxy.types.Choice("flow.set.options"))
|
||||
def flow_set(
|
||||
self,
|
||||
flows: typing.Sequence[flow.Flow],
|
||||
@ -187,7 +188,7 @@ class Core:
|
||||
ctx.log.alert("Toggled encoding on %s flows." % len(updated))
|
||||
|
||||
@command.command("flow.encode")
|
||||
@command.argument("enc", type=command.Choice("flow.encode.options"))
|
||||
@command.argument("enc", type=mitmproxy.types.Choice("flow.encode.options"))
|
||||
def encode(
|
||||
self,
|
||||
flows: typing.Sequence[flow.Flow],
|
||||
@ -216,7 +217,7 @@ class Core:
|
||||
return ["gzip", "deflate", "br"]
|
||||
|
||||
@command.command("options.load")
|
||||
def options_load(self, path: command.Path) -> None:
|
||||
def options_load(self, path: mitmproxy.types.Path) -> None:
|
||||
"""
|
||||
Load options from a file.
|
||||
"""
|
||||
@ -228,7 +229,7 @@ class Core:
|
||||
) from e
|
||||
|
||||
@command.command("options.save")
|
||||
def options_save(self, path: command.Path) -> None:
|
||||
def options_save(self, path: mitmproxy.types.Path) -> None:
|
||||
"""
|
||||
Save options to a file.
|
||||
"""
|
||||
|
@ -7,6 +7,7 @@ from mitmproxy import flow
|
||||
from mitmproxy import ctx
|
||||
from mitmproxy import certs
|
||||
from mitmproxy.utils import strutils
|
||||
import mitmproxy.types
|
||||
|
||||
import pyperclip
|
||||
|
||||
@ -51,8 +52,8 @@ class Cut:
|
||||
def cut(
|
||||
self,
|
||||
flows: typing.Sequence[flow.Flow],
|
||||
cuts: typing.Sequence[command.Cut]
|
||||
) -> command.Cuts:
|
||||
cuts: mitmproxy.types.CutSpec,
|
||||
) -> mitmproxy.types.Data:
|
||||
"""
|
||||
Cut data from a set of flows. Cut specifications are attribute paths
|
||||
from the base of the flow object, with a few conveniences - "port"
|
||||
@ -62,17 +63,17 @@ class Cut:
|
||||
or "false", "bytes" are preserved, and all other values are
|
||||
converted to strings.
|
||||
"""
|
||||
ret = []
|
||||
ret = [] # type:typing.List[typing.List[typing.Union[str, bytes]]]
|
||||
for f in flows:
|
||||
ret.append([extract(c, f) for c in cuts])
|
||||
return ret
|
||||
return ret # type: ignore
|
||||
|
||||
@command.command("cut.save")
|
||||
def save(
|
||||
self,
|
||||
flows: typing.Sequence[flow.Flow],
|
||||
cuts: typing.Sequence[command.Cut],
|
||||
path: command.Path
|
||||
cuts: mitmproxy.types.CutSpec,
|
||||
path: mitmproxy.types.Path
|
||||
) -> None:
|
||||
"""
|
||||
Save cuts to file. If there are multiple flows or cuts, the format
|
||||
@ -84,7 +85,7 @@ class Cut:
|
||||
append = False
|
||||
if path.startswith("+"):
|
||||
append = True
|
||||
path = command.Path(path[1:])
|
||||
path = mitmproxy.types.Path(path[1:])
|
||||
if len(cuts) == 1 and len(flows) == 1:
|
||||
with open(path, "ab" if append else "wb") as fp:
|
||||
if fp.tell() > 0:
|
||||
@ -110,7 +111,7 @@ class Cut:
|
||||
def clip(
|
||||
self,
|
||||
flows: typing.Sequence[flow.Flow],
|
||||
cuts: typing.Sequence[command.Cut],
|
||||
cuts: mitmproxy.types.CutSpec,
|
||||
) -> None:
|
||||
"""
|
||||
Send cuts to the clipboard. If there are multiple flows or cuts, the
|
||||
|
@ -5,6 +5,7 @@ from mitmproxy import flow
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy.utils import strutils
|
||||
from mitmproxy.net.http.http1 import assemble
|
||||
import mitmproxy.types
|
||||
|
||||
import pyperclip
|
||||
|
||||
@ -49,7 +50,7 @@ class Export():
|
||||
return list(sorted(formats.keys()))
|
||||
|
||||
@command.command("export.file")
|
||||
def file(self, fmt: str, f: flow.Flow, path: command.Path) -> None:
|
||||
def file(self, fmt: str, f: flow.Flow, path: mitmproxy.types.Path) -> None:
|
||||
"""
|
||||
Export a flow to path.
|
||||
"""
|
||||
|
@ -7,6 +7,7 @@ from mitmproxy import flowfilter
|
||||
from mitmproxy import io
|
||||
from mitmproxy import ctx
|
||||
from mitmproxy import flow
|
||||
import mitmproxy.types
|
||||
|
||||
|
||||
class Save:
|
||||
@ -50,7 +51,7 @@ class Save:
|
||||
self.start_stream_to_path(ctx.options.save_stream_file, self.filt)
|
||||
|
||||
@command.command("save.file")
|
||||
def save(self, flows: typing.Sequence[flow.Flow], path: command.Path) -> None:
|
||||
def save(self, flows: typing.Sequence[flow.Flow], path: mitmproxy.types.Path) -> None:
|
||||
"""
|
||||
Save flows to a file. If the path starts with a +, flows are
|
||||
appended to the file, otherwise it is over-written.
|
||||
|
@ -9,6 +9,7 @@ from mitmproxy import flow
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import io
|
||||
from mitmproxy import command
|
||||
import mitmproxy.types
|
||||
|
||||
|
||||
class ServerPlayback:
|
||||
@ -31,7 +32,7 @@ class ServerPlayback:
|
||||
ctx.master.addons.trigger("update", [])
|
||||
|
||||
@command.command("replay.server.file")
|
||||
def load_file(self, path: command.Path) -> None:
|
||||
def load_file(self, path: mitmproxy.types.Path) -> None:
|
||||
try:
|
||||
flows = io.read_flows_from_paths([path])
|
||||
except exceptions.FlowReadException as e:
|
||||
|
@ -351,7 +351,7 @@ class View(collections.Sequence):
|
||||
ctx.master.addons.trigger("update", updated)
|
||||
|
||||
@command.command("view.load")
|
||||
def load_file(self, path: command.Path) -> None:
|
||||
def load_file(self, path: mitmproxy.types.Path) -> None:
|
||||
"""
|
||||
Load flows into the view, without processing them with addons.
|
||||
"""
|
||||
|
@ -12,7 +12,7 @@ import sys
|
||||
|
||||
from mitmproxy.utils import typecheck
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import flow
|
||||
import mitmproxy.types
|
||||
|
||||
|
||||
def lexer(s):
|
||||
@ -24,113 +24,14 @@ def lexer(s):
|
||||
return lex
|
||||
|
||||
|
||||
# This is an awkward location for these values, but it's better than having
|
||||
# the console core import and depend on an addon. FIXME: Add a way for
|
||||
# addons to add custom types and manage their completion and validation.
|
||||
valid_flow_prefixes = [
|
||||
"@all",
|
||||
"@focus",
|
||||
"@shown",
|
||||
"@hidden",
|
||||
"@marked",
|
||||
"@unmarked",
|
||||
"~q",
|
||||
"~s",
|
||||
"~a",
|
||||
"~hq",
|
||||
"~hs",
|
||||
"~b",
|
||||
"~bq",
|
||||
"~bs",
|
||||
"~t",
|
||||
"~d",
|
||||
"~m",
|
||||
"~u",
|
||||
"~c",
|
||||
]
|
||||
|
||||
|
||||
Cuts = typing.Sequence[
|
||||
typing.Sequence[typing.Union[str, bytes]]
|
||||
]
|
||||
|
||||
|
||||
class Cut(str):
|
||||
# This is an awkward location for these values, but it's better than having
|
||||
# the console core import and depend on an addon. FIXME: Add a way for
|
||||
# addons to add custom types and manage their completion and validation.
|
||||
valid_prefixes = [
|
||||
"request.method",
|
||||
"request.scheme",
|
||||
"request.host",
|
||||
"request.http_version",
|
||||
"request.port",
|
||||
"request.path",
|
||||
"request.url",
|
||||
"request.text",
|
||||
"request.content",
|
||||
"request.raw_content",
|
||||
"request.timestamp_start",
|
||||
"request.timestamp_end",
|
||||
"request.header[",
|
||||
|
||||
"response.status_code",
|
||||
"response.reason",
|
||||
"response.text",
|
||||
"response.content",
|
||||
"response.timestamp_start",
|
||||
"response.timestamp_end",
|
||||
"response.raw_content",
|
||||
"response.header[",
|
||||
|
||||
"client_conn.address.port",
|
||||
"client_conn.address.host",
|
||||
"client_conn.tls_version",
|
||||
"client_conn.sni",
|
||||
"client_conn.ssl_established",
|
||||
|
||||
"server_conn.address.port",
|
||||
"server_conn.address.host",
|
||||
"server_conn.ip_address.host",
|
||||
"server_conn.tls_version",
|
||||
"server_conn.sni",
|
||||
"server_conn.ssl_established",
|
||||
]
|
||||
|
||||
|
||||
class Path(str):
|
||||
pass
|
||||
|
||||
|
||||
class Cmd(str):
|
||||
pass
|
||||
|
||||
|
||||
class Arg(str):
|
||||
pass
|
||||
|
||||
|
||||
def typename(t: type) -> str:
|
||||
"""
|
||||
Translates a type to an explanatory string. If ret is True, we're
|
||||
looking at a return type, else we're looking at a parameter type.
|
||||
Translates a type to an explanatory string.
|
||||
"""
|
||||
if isinstance(t, Choice):
|
||||
return "choice"
|
||||
elif t == typing.Sequence[flow.Flow]:
|
||||
return "[flow]"
|
||||
elif t == typing.Sequence[str]:
|
||||
return "[str]"
|
||||
elif t == typing.Sequence[Cut]:
|
||||
return "[cut]"
|
||||
elif t == Cuts:
|
||||
return "[cuts]"
|
||||
elif t == flow.Flow:
|
||||
return "flow"
|
||||
elif issubclass(t, (str, int, bool)):
|
||||
return t.__name__.lower()
|
||||
else: # pragma: no cover
|
||||
to = mitmproxy.types.CommandTypes.get(t, None)
|
||||
if not to:
|
||||
raise NotImplementedError(t)
|
||||
return to.display
|
||||
|
||||
|
||||
class Command:
|
||||
@ -168,7 +69,7 @@ class Command:
|
||||
ret = " -> " + ret
|
||||
return "%s %s%s" % (self.path, params, ret)
|
||||
|
||||
def call(self, args: typing.Sequence[str]):
|
||||
def call(self, args: typing.Sequence[str]) -> typing.Any:
|
||||
"""
|
||||
Call the command with a list of arguments. At this point, all
|
||||
arguments are strings.
|
||||
@ -255,13 +156,13 @@ class CommandManager:
|
||||
typ = None # type: typing.Type
|
||||
for i in range(len(parts)):
|
||||
if i == 0:
|
||||
typ = Cmd
|
||||
typ = mitmproxy.types.Cmd
|
||||
if parts[i] in self.commands:
|
||||
params.extend(self.commands[parts[i]].paramtypes)
|
||||
elif params:
|
||||
typ = params.pop(0)
|
||||
# FIXME: Do we need to check that Arg is positional?
|
||||
if typ == Cmd and params and params[0] == Arg:
|
||||
if typ == mitmproxy.types.Cmd and params and params[0] == mitmproxy.types.Arg:
|
||||
if parts[i] in self.commands:
|
||||
params[:] = self.commands[parts[i]].paramtypes
|
||||
else:
|
||||
@ -269,7 +170,7 @@ class CommandManager:
|
||||
parse.append(ParseResult(value=parts[i], type=typ))
|
||||
return parse
|
||||
|
||||
def call_args(self, path, args):
|
||||
def call_args(self, path: str, args: typing.Sequence[str]) -> typing.Any:
|
||||
"""
|
||||
Call a command using a list of string arguments. May raise CommandError.
|
||||
"""
|
||||
@ -300,45 +201,13 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
|
||||
"""
|
||||
Convert a string to a argument to the appropriate type.
|
||||
"""
|
||||
if isinstance(argtype, Choice):
|
||||
cmd = argtype.options_command
|
||||
opts = manager.call(cmd)
|
||||
if spec not in opts:
|
||||
raise exceptions.CommandError(
|
||||
"Invalid choice: see %s for options" % cmd
|
||||
)
|
||||
return spec
|
||||
elif issubclass(argtype, str):
|
||||
return spec
|
||||
elif argtype == bool:
|
||||
if spec == "true":
|
||||
return True
|
||||
elif spec == "false":
|
||||
return False
|
||||
else:
|
||||
raise exceptions.CommandError(
|
||||
"Booleans are 'true' or 'false', got %s" % spec
|
||||
)
|
||||
elif issubclass(argtype, int):
|
||||
try:
|
||||
return int(spec)
|
||||
except ValueError as e:
|
||||
raise exceptions.CommandError("Expected an integer, got %s." % spec)
|
||||
elif argtype == typing.Sequence[flow.Flow]:
|
||||
return manager.call_args("view.resolve", [spec])
|
||||
elif argtype == Cuts:
|
||||
return manager.call_args("cut", [spec])
|
||||
elif argtype == flow.Flow:
|
||||
flows = manager.call_args("view.resolve", [spec])
|
||||
if len(flows) != 1:
|
||||
raise exceptions.CommandError(
|
||||
"Command requires one flow, specification matched %s." % len(flows)
|
||||
)
|
||||
return flows[0]
|
||||
elif argtype in (typing.Sequence[str], typing.Sequence[Cut]):
|
||||
return [i.strip() for i in spec.split(",")]
|
||||
else:
|
||||
t = mitmproxy.types.CommandTypes.get(argtype, None)
|
||||
if not t:
|
||||
raise exceptions.CommandError("Unsupported argument type: %s" % argtype)
|
||||
try:
|
||||
return t.parse(manager, argtype, spec) # type: ignore
|
||||
except exceptions.TypeError as e:
|
||||
raise exceptions.CommandError from e
|
||||
|
||||
|
||||
def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None:
|
||||
@ -360,21 +229,11 @@ def command(path):
|
||||
return decorator
|
||||
|
||||
|
||||
class Choice:
|
||||
def __init__(self, options_command):
|
||||
self.options_command = options_command
|
||||
|
||||
def __instancecheck__(self, instance):
|
||||
# return false here so that arguments are piped through parsearg,
|
||||
# which does extended validation.
|
||||
return False
|
||||
|
||||
|
||||
def argument(name, type):
|
||||
"""
|
||||
Set the type of a command argument at runtime.
|
||||
This is useful for more specific types such as command.Choice, which we cannot annotate
|
||||
directly as mypy does not like that.
|
||||
Set the type of a command argument at runtime. This is useful for more
|
||||
specific types such as mitmproxy.types.Choice, which we cannot annotate
|
||||
directly as mypy does not like that.
|
||||
"""
|
||||
def decorator(f: types.FunctionType) -> types.FunctionType:
|
||||
assert name in f.__annotations__
|
||||
|
@ -112,6 +112,10 @@ class AddonHalt(MitmproxyException):
|
||||
pass
|
||||
|
||||
|
||||
class TypeError(MitmproxyException):
|
||||
pass
|
||||
|
||||
|
||||
"""
|
||||
Net-layer exceptions
|
||||
"""
|
||||
|
@ -1,6 +1,4 @@
|
||||
import abc
|
||||
import glob
|
||||
import os
|
||||
import typing
|
||||
|
||||
import urwid
|
||||
@ -9,6 +7,7 @@ from urwid.text_layout import calc_coords
|
||||
import mitmproxy.flow
|
||||
import mitmproxy.master
|
||||
import mitmproxy.command
|
||||
import mitmproxy.types
|
||||
|
||||
|
||||
class Completer: # pragma: no cover
|
||||
@ -39,30 +38,6 @@ class ListCompleter(Completer):
|
||||
return ret
|
||||
|
||||
|
||||
# Generates the completion options for a specific starting input
|
||||
def pathOptions(start: str) -> typing.Sequence[str]:
|
||||
if not start:
|
||||
start = "./"
|
||||
path = os.path.expanduser(start)
|
||||
ret = []
|
||||
if os.path.isdir(path):
|
||||
files = glob.glob(os.path.join(path, "*"))
|
||||
prefix = start
|
||||
else:
|
||||
files = glob.glob(path + "*")
|
||||
prefix = os.path.dirname(start)
|
||||
prefix = prefix or "./"
|
||||
for f in files:
|
||||
display = os.path.join(prefix, os.path.normpath(os.path.basename(f)))
|
||||
if os.path.isdir(f):
|
||||
display += "/"
|
||||
ret.append(display)
|
||||
if not ret:
|
||||
ret = [start]
|
||||
ret.sort()
|
||||
return ret
|
||||
|
||||
|
||||
CompletionState = typing.NamedTuple(
|
||||
"CompletionState",
|
||||
[
|
||||
@ -106,48 +81,12 @@ class CommandBuffer():
|
||||
if not self.completion:
|
||||
parts = self.master.commands.parse_partial(self.buf[:self.cursor])
|
||||
last = parts[-1]
|
||||
if last.type == mitmproxy.command.Cmd:
|
||||
ct = mitmproxy.types.CommandTypes.get(last.type, None)
|
||||
if ct:
|
||||
self.completion = CompletionState(
|
||||
completer = ListCompleter(
|
||||
parts[-1].value,
|
||||
self.master.commands.commands.keys(),
|
||||
),
|
||||
parse = parts,
|
||||
)
|
||||
if last.type == typing.Sequence[mitmproxy.command.Cut]:
|
||||
spec = parts[-1].value.split(",")
|
||||
opts = []
|
||||
for pref in mitmproxy.command.Cut.valid_prefixes:
|
||||
spec[-1] = pref
|
||||
opts.append(",".join(spec))
|
||||
self.completion = CompletionState(
|
||||
completer = ListCompleter(
|
||||
parts[-1].value,
|
||||
opts,
|
||||
),
|
||||
parse = parts,
|
||||
)
|
||||
elif isinstance(last.type, mitmproxy.command.Choice):
|
||||
self.completion = CompletionState(
|
||||
completer = ListCompleter(
|
||||
parts[-1].value,
|
||||
self.master.commands.call(last.type.options_command),
|
||||
),
|
||||
parse = parts,
|
||||
)
|
||||
elif last.type == mitmproxy.command.Path:
|
||||
self.completion = CompletionState(
|
||||
completer = ListCompleter(
|
||||
"",
|
||||
pathOptions(parts[1].value)
|
||||
),
|
||||
parse = parts,
|
||||
)
|
||||
elif last.type in (typing.Sequence[mitmproxy.flow.Flow], mitmproxy.flow.Flow):
|
||||
self.completion = CompletionState(
|
||||
completer = ListCompleter(
|
||||
"",
|
||||
mitmproxy.command.valid_flow_prefixes,
|
||||
ct.completion(self.master.commands, last.type, parts[-1].value)
|
||||
),
|
||||
parse = parts,
|
||||
)
|
||||
|
@ -7,6 +7,8 @@ from mitmproxy import exceptions
|
||||
from mitmproxy import flow
|
||||
from mitmproxy import contentviews
|
||||
from mitmproxy.utils import strutils
|
||||
import mitmproxy.types
|
||||
|
||||
|
||||
from mitmproxy.tools.console import overlay
|
||||
from mitmproxy.tools.console import signals
|
||||
@ -218,8 +220,8 @@ class ConsoleAddon:
|
||||
self,
|
||||
prompt: str,
|
||||
choices: typing.Sequence[str],
|
||||
cmd: command.Cmd,
|
||||
*args: command.Arg
|
||||
cmd: mitmproxy.types.Cmd,
|
||||
*args: mitmproxy.types.Arg
|
||||
) -> None:
|
||||
"""
|
||||
Prompt the user to choose from a specified list of strings, then
|
||||
@ -241,7 +243,7 @@ class ConsoleAddon:
|
||||
|
||||
@command.command("console.choose.cmd")
|
||||
def console_choose_cmd(
|
||||
self, prompt: str, choicecmd: command.Cmd, *cmd: command.Arg
|
||||
self, prompt: str, choicecmd: mitmproxy.types.Cmd, *cmd: mitmproxy.types.Arg
|
||||
) -> None:
|
||||
"""
|
||||
Prompt the user to choose from a list of strings returned by a
|
||||
@ -352,7 +354,7 @@ class ConsoleAddon:
|
||||
]
|
||||
|
||||
@command.command("console.edit.focus")
|
||||
@command.argument("part", type=command.Choice("console.edit.focus.options"))
|
||||
@command.argument("part", type=mitmproxy.types.Choice("console.edit.focus.options"))
|
||||
def edit_focus(self, part: str) -> None:
|
||||
"""
|
||||
Edit a component of the currently focused flow.
|
||||
@ -404,14 +406,14 @@ class ConsoleAddon:
|
||||
self._grideditor().cmd_delete()
|
||||
|
||||
@command.command("console.grideditor.load")
|
||||
def grideditor_load(self, path: command.Path) -> None:
|
||||
def grideditor_load(self, path: mitmproxy.types.Path) -> None:
|
||||
"""
|
||||
Read a file into the currrent cell.
|
||||
"""
|
||||
self._grideditor().cmd_read_file(path)
|
||||
|
||||
@command.command("console.grideditor.load_escaped")
|
||||
def grideditor_load_escaped(self, path: command.Path) -> None:
|
||||
def grideditor_load_escaped(self, path: mitmproxy.types.Path) -> None:
|
||||
"""
|
||||
Read a file containing a Python-style escaped string into the
|
||||
currrent cell.
|
||||
@ -419,7 +421,7 @@ class ConsoleAddon:
|
||||
self._grideditor().cmd_read_file_escaped(path)
|
||||
|
||||
@command.command("console.grideditor.save")
|
||||
def grideditor_save(self, path: command.Path) -> None:
|
||||
def grideditor_save(self, path: mitmproxy.types.Path) -> None:
|
||||
"""
|
||||
Save data to file as a CSV.
|
||||
"""
|
||||
@ -440,7 +442,7 @@ class ConsoleAddon:
|
||||
self._grideditor().cmd_spawn_editor()
|
||||
|
||||
@command.command("console.flowview.mode.set")
|
||||
@command.argument("mode", type=command.Choice("console.flowview.mode.options"))
|
||||
@command.argument("mode", type=mitmproxy.types.Choice("console.flowview.mode.options"))
|
||||
def flowview_mode_set(self, mode: str) -> None:
|
||||
"""
|
||||
Set the display mode for the current flow view.
|
||||
@ -498,8 +500,8 @@ class ConsoleAddon:
|
||||
self,
|
||||
contexts: typing.Sequence[str],
|
||||
key: str,
|
||||
cmd: command.Cmd,
|
||||
*args: command.Arg
|
||||
cmd: mitmproxy.types.Cmd,
|
||||
*args: mitmproxy.types.Arg
|
||||
) -> None:
|
||||
"""
|
||||
Bind a shortcut key.
|
||||
|
330
mitmproxy/types.py
Normal file
330
mitmproxy/types.py
Normal file
@ -0,0 +1,330 @@
|
||||
import os
|
||||
import glob
|
||||
import typing
|
||||
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import flow
|
||||
|
||||
|
||||
class Path(str):
|
||||
pass
|
||||
|
||||
|
||||
class Cmd(str):
|
||||
pass
|
||||
|
||||
|
||||
class Arg(str):
|
||||
pass
|
||||
|
||||
|
||||
class CutSpec(typing.Sequence[str]):
|
||||
pass
|
||||
|
||||
|
||||
class Data(typing.Sequence[typing.Sequence[typing.Union[str, bytes]]]):
|
||||
pass
|
||||
|
||||
|
||||
class Choice:
|
||||
def __init__(self, options_command):
|
||||
self.options_command = options_command
|
||||
|
||||
def __instancecheck__(self, instance): # pragma: no cover
|
||||
# return false here so that arguments are piped through parsearg,
|
||||
# which does extended validation.
|
||||
return False
|
||||
|
||||
|
||||
# One of the many charming things about mypy is that introducing type
|
||||
# annotations can cause circular dependencies where there were none before.
|
||||
# Rather than putting types and the CommandManger in the same file, we introduce
|
||||
# a stub type with the signature we use.
|
||||
class _CommandStub:
|
||||
commands = {} # type: typing.Mapping[str, typing.Any]
|
||||
|
||||
def call_args(self, path: str, args: typing.Sequence[str]) -> typing.Any: # pragma: no cover
|
||||
pass
|
||||
|
||||
def call(self, args: typing.Sequence[str]) -> typing.Any: # pragma: no cover
|
||||
pass
|
||||
|
||||
|
||||
class BaseType:
|
||||
typ = object # type: typing.Type
|
||||
display = "" # type: str
|
||||
|
||||
def completion(
|
||||
self, manager: _CommandStub, t: type, s: str
|
||||
) -> typing.Sequence[str]: # pragma: no cover
|
||||
pass
|
||||
|
||||
def parse(
|
||||
self, manager: _CommandStub, t: type, s: str
|
||||
) -> typing.Any: # pragma: no cover
|
||||
pass
|
||||
|
||||
|
||||
class Bool(BaseType):
|
||||
typ = bool
|
||||
display = "bool"
|
||||
|
||||
def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
|
||||
return ["false", "true"]
|
||||
|
||||
def parse(self, manager: _CommandStub, t: type, s: str) -> bool:
|
||||
if s == "true":
|
||||
return True
|
||||
elif s == "false":
|
||||
return False
|
||||
else:
|
||||
raise exceptions.TypeError(
|
||||
"Booleans are 'true' or 'false', got %s" % s
|
||||
)
|
||||
|
||||
|
||||
class Str(BaseType):
|
||||
typ = str
|
||||
display = "str"
|
||||
|
||||
def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
|
||||
return []
|
||||
|
||||
def parse(self, manager: _CommandStub, t: type, s: str) -> str:
|
||||
return s
|
||||
|
||||
|
||||
class Int(BaseType):
|
||||
typ = int
|
||||
display = "int"
|
||||
|
||||
def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
|
||||
return []
|
||||
|
||||
def parse(self, manager: _CommandStub, t: type, s: str) -> int:
|
||||
try:
|
||||
return int(s)
|
||||
except ValueError as e:
|
||||
raise exceptions.TypeError from e
|
||||
|
||||
|
||||
class PathType(BaseType):
|
||||
typ = Path
|
||||
display = "path"
|
||||
|
||||
def completion(self, manager: _CommandStub, t: type, start: str) -> typing.Sequence[str]:
|
||||
if not start:
|
||||
start = "./"
|
||||
path = os.path.expanduser(start)
|
||||
ret = []
|
||||
if os.path.isdir(path):
|
||||
files = glob.glob(os.path.join(path, "*"))
|
||||
prefix = start
|
||||
else:
|
||||
files = glob.glob(path + "*")
|
||||
prefix = os.path.dirname(start)
|
||||
prefix = prefix or "./"
|
||||
for f in files:
|
||||
display = os.path.join(prefix, os.path.normpath(os.path.basename(f)))
|
||||
if os.path.isdir(f):
|
||||
display += "/"
|
||||
ret.append(display)
|
||||
if not ret:
|
||||
ret = [start]
|
||||
ret.sort()
|
||||
return ret
|
||||
|
||||
def parse(self, manager: _CommandStub, t: type, s: str) -> str:
|
||||
return s
|
||||
|
||||
|
||||
class CmdType(BaseType):
|
||||
typ = Cmd
|
||||
display = "cmd"
|
||||
|
||||
def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
|
||||
return list(manager.commands.keys())
|
||||
|
||||
def parse(self, manager: _CommandStub, t: type, s: str) -> str:
|
||||
return s
|
||||
|
||||
|
||||
class ArgType(BaseType):
|
||||
typ = Arg
|
||||
display = "arg"
|
||||
|
||||
def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
|
||||
return []
|
||||
|
||||
def parse(self, manager: _CommandStub, t: type, s: str) -> str:
|
||||
return s
|
||||
|
||||
|
||||
class StrSeq(BaseType):
|
||||
typ = typing.Sequence[str]
|
||||
display = "[str]"
|
||||
|
||||
def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
|
||||
return []
|
||||
|
||||
def parse(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
|
||||
return [x.strip() for x in s.split(",")]
|
||||
|
||||
|
||||
class CutSpecType(BaseType):
|
||||
typ = CutSpec
|
||||
display = "[cut]"
|
||||
valid_prefixes = [
|
||||
"request.method",
|
||||
"request.scheme",
|
||||
"request.host",
|
||||
"request.http_version",
|
||||
"request.port",
|
||||
"request.path",
|
||||
"request.url",
|
||||
"request.text",
|
||||
"request.content",
|
||||
"request.raw_content",
|
||||
"request.timestamp_start",
|
||||
"request.timestamp_end",
|
||||
"request.header[",
|
||||
|
||||
"response.status_code",
|
||||
"response.reason",
|
||||
"response.text",
|
||||
"response.content",
|
||||
"response.timestamp_start",
|
||||
"response.timestamp_end",
|
||||
"response.raw_content",
|
||||
"response.header[",
|
||||
|
||||
"client_conn.address.port",
|
||||
"client_conn.address.host",
|
||||
"client_conn.tls_version",
|
||||
"client_conn.sni",
|
||||
"client_conn.ssl_established",
|
||||
|
||||
"server_conn.address.port",
|
||||
"server_conn.address.host",
|
||||
"server_conn.ip_address.host",
|
||||
"server_conn.tls_version",
|
||||
"server_conn.sni",
|
||||
"server_conn.ssl_established",
|
||||
]
|
||||
|
||||
def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
|
||||
spec = s.split(",")
|
||||
opts = []
|
||||
for pref in self.valid_prefixes:
|
||||
spec[-1] = pref
|
||||
opts.append(",".join(spec))
|
||||
return opts
|
||||
|
||||
def parse(self, manager: _CommandStub, t: type, s: str) -> CutSpec:
|
||||
parts = s.split(",") # type: typing.Any
|
||||
return parts
|
||||
|
||||
|
||||
class BaseFlowType(BaseType):
|
||||
valid_prefixes = [
|
||||
"@all",
|
||||
"@focus",
|
||||
"@shown",
|
||||
"@hidden",
|
||||
"@marked",
|
||||
"@unmarked",
|
||||
"~q",
|
||||
"~s",
|
||||
"~a",
|
||||
"~hq",
|
||||
"~hs",
|
||||
"~b",
|
||||
"~bq",
|
||||
"~bs",
|
||||
"~t",
|
||||
"~d",
|
||||
"~m",
|
||||
"~u",
|
||||
"~c",
|
||||
]
|
||||
|
||||
def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
|
||||
return self.valid_prefixes
|
||||
|
||||
|
||||
class FlowType(BaseFlowType):
|
||||
typ = flow.Flow
|
||||
display = "flow"
|
||||
|
||||
def parse(self, manager: _CommandStub, t: type, s: str) -> flow.Flow:
|
||||
flows = manager.call_args("view.resolve", [s])
|
||||
if len(flows) != 1:
|
||||
raise exceptions.TypeError(
|
||||
"Command requires one flow, specification matched %s." % len(flows)
|
||||
)
|
||||
return flows[0]
|
||||
|
||||
|
||||
class FlowsType(BaseFlowType):
|
||||
typ = typing.Sequence[flow.Flow]
|
||||
display = "[flow]"
|
||||
|
||||
def parse(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[flow.Flow]:
|
||||
return manager.call_args("view.resolve", [s])
|
||||
|
||||
|
||||
class DataType:
|
||||
typ = Data
|
||||
display = "[data]"
|
||||
|
||||
def completion(
|
||||
self, manager: _CommandStub, t: type, s: str
|
||||
) -> typing.Sequence[str]: # pragma: no cover
|
||||
raise exceptions.TypeError("data cannot be passed as argument")
|
||||
|
||||
def parse(
|
||||
self, manager: _CommandStub, t: type, s: str
|
||||
) -> typing.Any: # pragma: no cover
|
||||
raise exceptions.TypeError("data cannot be passed as argument")
|
||||
|
||||
|
||||
class ChoiceType:
|
||||
typ = Choice
|
||||
display = "choice"
|
||||
|
||||
def completion(self, manager: _CommandStub, t: Choice, s: str) -> typing.Sequence[str]:
|
||||
return manager.call(t.options_command)
|
||||
|
||||
def parse(self, manager: _CommandStub, t: Choice, s: str) -> str:
|
||||
opts = manager.call(t.options_command)
|
||||
if s not in opts:
|
||||
raise exceptions.TypeError("Invalid choice.")
|
||||
return s
|
||||
|
||||
|
||||
class TypeManager:
|
||||
def __init__(self, *types):
|
||||
self.typemap = {}
|
||||
for t in types:
|
||||
self.typemap[t.typ] = t()
|
||||
|
||||
def get(self, t: type, default=None) -> BaseType:
|
||||
if type(t) in self.typemap:
|
||||
return self.typemap[type(t)]
|
||||
return self.typemap.get(t, default)
|
||||
|
||||
|
||||
CommandTypes = TypeManager(
|
||||
ArgType,
|
||||
Bool,
|
||||
ChoiceType,
|
||||
CmdType,
|
||||
CutSpecType,
|
||||
DataType,
|
||||
FlowType,
|
||||
FlowsType,
|
||||
Int,
|
||||
PathType,
|
||||
Str,
|
||||
StrSeq,
|
||||
)
|
@ -4,6 +4,7 @@ from mitmproxy import flow
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy.test import tflow
|
||||
from mitmproxy.test import taddons
|
||||
import mitmproxy.types
|
||||
import io
|
||||
import pytest
|
||||
|
||||
@ -25,7 +26,7 @@ class TAddon:
|
||||
return foo
|
||||
|
||||
@command.command("subcommand")
|
||||
def subcommand(self, cmd: command.Cmd, *args: command.Arg) -> str:
|
||||
def subcommand(self, cmd: mitmproxy.types.Cmd, *args: mitmproxy.types.Arg) -> str:
|
||||
return "ok"
|
||||
|
||||
@command.command("empty")
|
||||
@ -39,12 +40,12 @@ class TAddon:
|
||||
def choices(self) -> typing.Sequence[str]:
|
||||
return ["one", "two", "three"]
|
||||
|
||||
@command.argument("arg", type=command.Choice("choices"))
|
||||
@command.argument("arg", type=mitmproxy.types.Choice("choices"))
|
||||
def choose(self, arg: str) -> typing.Sequence[str]:
|
||||
return ["one", "two", "three"]
|
||||
|
||||
@command.command("path")
|
||||
def path(self, arg: command.Path) -> None:
|
||||
def path(self, arg: mitmproxy.types.Path) -> None:
|
||||
pass
|
||||
|
||||
|
||||
@ -79,45 +80,45 @@ class TestCommand:
|
||||
[
|
||||
"foo bar",
|
||||
[
|
||||
command.ParseResult(value = "foo", type = command.Cmd),
|
||||
command.ParseResult(value = "foo", type = mitmproxy.types.Cmd),
|
||||
command.ParseResult(value = "bar", type = str)
|
||||
],
|
||||
],
|
||||
[
|
||||
"foo 'bar",
|
||||
[
|
||||
command.ParseResult(value = "foo", type = command.Cmd),
|
||||
command.ParseResult(value = "foo", type = mitmproxy.types.Cmd),
|
||||
command.ParseResult(value = "'bar", type = str)
|
||||
]
|
||||
],
|
||||
["a", [command.ParseResult(value = "a", type = command.Cmd)]],
|
||||
["", [command.ParseResult(value = "", type = command.Cmd)]],
|
||||
["a", [command.ParseResult(value = "a", type = mitmproxy.types.Cmd)]],
|
||||
["", [command.ParseResult(value = "", type = mitmproxy.types.Cmd)]],
|
||||
[
|
||||
"cmd3 1",
|
||||
[
|
||||
command.ParseResult(value = "cmd3", type = command.Cmd),
|
||||
command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd),
|
||||
command.ParseResult(value = "1", type = int),
|
||||
]
|
||||
],
|
||||
[
|
||||
"cmd3 ",
|
||||
[
|
||||
command.ParseResult(value = "cmd3", type = command.Cmd),
|
||||
command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd),
|
||||
command.ParseResult(value = "", type = int),
|
||||
]
|
||||
],
|
||||
[
|
||||
"subcommand ",
|
||||
[
|
||||
command.ParseResult(value = "subcommand", type = command.Cmd),
|
||||
command.ParseResult(value = "", type = command.Cmd),
|
||||
command.ParseResult(value = "subcommand", type = mitmproxy.types.Cmd),
|
||||
command.ParseResult(value = "", type = mitmproxy.types.Cmd),
|
||||
]
|
||||
],
|
||||
[
|
||||
"subcommand cmd3 ",
|
||||
[
|
||||
command.ParseResult(value = "subcommand", type = command.Cmd),
|
||||
command.ParseResult(value = "cmd3", type = command.Cmd),
|
||||
command.ParseResult(value = "subcommand", type = mitmproxy.types.Cmd),
|
||||
command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd),
|
||||
command.ParseResult(value = "", type = int),
|
||||
]
|
||||
],
|
||||
@ -154,15 +155,15 @@ def test_typename():
|
||||
assert command.typename(str) == "str"
|
||||
assert command.typename(typing.Sequence[flow.Flow]) == "[flow]"
|
||||
|
||||
assert command.typename(command.Cuts) == "[cuts]"
|
||||
assert command.typename(typing.Sequence[command.Cut]) == "[cut]"
|
||||
assert command.typename(mitmproxy.types.Data) == "[data]"
|
||||
assert command.typename(mitmproxy.types.CutSpec) == "[cut]"
|
||||
|
||||
assert command.typename(flow.Flow) == "flow"
|
||||
assert command.typename(typing.Sequence[str]) == "[str]"
|
||||
|
||||
assert command.typename(command.Choice("foo")) == "choice"
|
||||
assert command.typename(command.Path) == "path"
|
||||
assert command.typename(command.Cmd) == "cmd"
|
||||
assert command.typename(mitmproxy.types.Choice("foo")) == "choice"
|
||||
assert command.typename(mitmproxy.types.Path) == "path"
|
||||
assert command.typename(mitmproxy.types.Cmd) == "cmd"
|
||||
|
||||
|
||||
class DummyConsole:
|
||||
@ -172,7 +173,7 @@ class DummyConsole:
|
||||
return [tflow.tflow(resp=True)] * n
|
||||
|
||||
@command.command("cut")
|
||||
def cut(self, spec: str) -> command.Cuts:
|
||||
def cut(self, spec: str) -> mitmproxy.types.Data:
|
||||
return [["test"]]
|
||||
|
||||
|
||||
@ -201,10 +202,6 @@ def test_parsearg():
|
||||
with pytest.raises(exceptions.CommandError):
|
||||
command.parsearg(tctx.master.commands, "foo", Exception)
|
||||
|
||||
assert command.parsearg(
|
||||
tctx.master.commands, "foo", command.Cuts
|
||||
) == [["test"]]
|
||||
|
||||
assert command.parsearg(
|
||||
tctx.master.commands, "foo", typing.Sequence[str]
|
||||
) == ["foo"]
|
||||
@ -215,18 +212,18 @@ def test_parsearg():
|
||||
a = TAddon()
|
||||
tctx.master.commands.add("choices", a.choices)
|
||||
assert command.parsearg(
|
||||
tctx.master.commands, "one", command.Choice("choices"),
|
||||
tctx.master.commands, "one", mitmproxy.types.Choice("choices"),
|
||||
) == "one"
|
||||
with pytest.raises(exceptions.CommandError):
|
||||
assert command.parsearg(
|
||||
tctx.master.commands, "invalid", command.Choice("choices"),
|
||||
tctx.master.commands, "invalid", mitmproxy.types.Choice("choices"),
|
||||
)
|
||||
|
||||
assert command.parsearg(
|
||||
tctx.master.commands, "foo", command.Path
|
||||
tctx.master.commands, "foo", mitmproxy.types.Path
|
||||
) == "foo"
|
||||
assert command.parsearg(
|
||||
tctx.master.commands, "foo", command.Cmd
|
||||
tctx.master.commands, "foo", mitmproxy.types.Cmd
|
||||
) == "foo"
|
||||
|
||||
|
||||
@ -272,5 +269,5 @@ def test_choice():
|
||||
basic typechecking for choices should fail as we cannot verify if strings are a valid choice
|
||||
at this point.
|
||||
"""
|
||||
c = command.Choice("foo")
|
||||
c = mitmproxy.types.Choice("foo")
|
||||
assert not typecheck.check_command_type("foo", c)
|
||||
|
0
test/mitmproxy/test_typemanager.py
Normal file
0
test/mitmproxy/test_typemanager.py
Normal file
175
test/mitmproxy/test_types.py
Normal file
175
test/mitmproxy/test_types.py
Normal file
@ -0,0 +1,175 @@
|
||||
import pytest
|
||||
import os
|
||||
import typing
|
||||
import contextlib
|
||||
|
||||
from mitmproxy.test import tutils
|
||||
import mitmproxy.exceptions
|
||||
import mitmproxy.types
|
||||
from mitmproxy.test import taddons
|
||||
from mitmproxy.test import tflow
|
||||
from mitmproxy import command
|
||||
from mitmproxy import flow
|
||||
|
||||
from . import test_command
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def chdir(path: str):
|
||||
old_dir = os.getcwd()
|
||||
os.chdir(path)
|
||||
yield
|
||||
os.chdir(old_dir)
|
||||
|
||||
|
||||
def test_bool():
|
||||
with taddons.context() as tctx:
|
||||
b = mitmproxy.types.Bool()
|
||||
assert b.completion(tctx.master.commands, bool, "b") == ["false", "true"]
|
||||
assert b.parse(tctx.master.commands, bool, "true") is True
|
||||
assert b.parse(tctx.master.commands, bool, "false") is False
|
||||
with pytest.raises(mitmproxy.exceptions.TypeError):
|
||||
b.parse(tctx.master.commands, bool, "foo")
|
||||
|
||||
|
||||
def test_str():
|
||||
with taddons.context() as tctx:
|
||||
b = mitmproxy.types.Str()
|
||||
assert b.completion(tctx.master.commands, str, "") == []
|
||||
assert b.parse(tctx.master.commands, str, "foo") == "foo"
|
||||
|
||||
|
||||
def test_int():
|
||||
with taddons.context() as tctx:
|
||||
b = mitmproxy.types.Int()
|
||||
assert b.completion(tctx.master.commands, int, "b") == []
|
||||
assert b.parse(tctx.master.commands, int, "1") == 1
|
||||
assert b.parse(tctx.master.commands, int, "999") == 999
|
||||
with pytest.raises(mitmproxy.exceptions.TypeError):
|
||||
b.parse(tctx.master.commands, int, "foo")
|
||||
|
||||
|
||||
def test_path():
|
||||
with taddons.context() as tctx:
|
||||
b = mitmproxy.types.PathType()
|
||||
assert b.parse(tctx.master.commands, mitmproxy.types.Path, "/foo") == "/foo"
|
||||
assert b.parse(tctx.master.commands, mitmproxy.types.Path, "/bar") == "/bar"
|
||||
|
||||
def normPathOpts(prefix, match):
|
||||
ret = []
|
||||
for s in b.completion(tctx.master.commands, mitmproxy.types.Path, match):
|
||||
s = s[len(prefix):]
|
||||
s = s.replace(os.sep, "/")
|
||||
ret.append(s)
|
||||
return ret
|
||||
|
||||
cd = os.path.normpath(tutils.test_data.path("mitmproxy/completion"))
|
||||
assert normPathOpts(cd, cd) == ['/aaa', '/aab', '/aac', '/bbb/']
|
||||
assert normPathOpts(cd, os.path.join(cd, "a")) == ['/aaa', '/aab', '/aac']
|
||||
with chdir(cd):
|
||||
assert normPathOpts("", "./") == ['./aaa', './aab', './aac', './bbb/']
|
||||
assert normPathOpts("", "") == ['./aaa', './aab', './aac', './bbb/']
|
||||
assert b.completion(
|
||||
tctx.master.commands, mitmproxy.types.Path, "nonexistent"
|
||||
) == ["nonexistent"]
|
||||
|
||||
|
||||
def test_cmd():
|
||||
with taddons.context() as tctx:
|
||||
tctx.master.addons.add(test_command.TAddon())
|
||||
b = mitmproxy.types.CmdType()
|
||||
assert b.parse(tctx.master.commands, mitmproxy.types.Cmd, "foo") == "foo"
|
||||
assert len(
|
||||
b.completion(tctx.master.commands, mitmproxy.types.Cmd, "")
|
||||
) == len(tctx.master.commands.commands.keys())
|
||||
|
||||
|
||||
def test_cutspec():
|
||||
with taddons.context() as tctx:
|
||||
b = mitmproxy.types.CutSpecType()
|
||||
b.parse(tctx.master.commands, mitmproxy.types.CutSpec, "foo,bar") == ["foo", "bar"]
|
||||
assert b.completion(
|
||||
tctx.master.commands, mitmproxy.types.CutSpec, "request.p"
|
||||
) == b.valid_prefixes
|
||||
ret = b.completion(tctx.master.commands, mitmproxy.types.CutSpec, "request.port,f")
|
||||
assert ret[0].startswith("request.port,")
|
||||
assert len(ret) == len(b.valid_prefixes)
|
||||
|
||||
|
||||
def test_arg():
|
||||
with taddons.context() as tctx:
|
||||
b = mitmproxy.types.ArgType()
|
||||
assert b.completion(tctx.master.commands, mitmproxy.types.Arg, "") == []
|
||||
assert b.parse(tctx.master.commands, mitmproxy.types.Arg, "foo") == "foo"
|
||||
|
||||
|
||||
def test_strseq():
|
||||
with taddons.context() as tctx:
|
||||
b = mitmproxy.types.StrSeq()
|
||||
assert b.completion(tctx.master.commands, typing.Sequence[str], "") == []
|
||||
assert b.parse(tctx.master.commands, typing.Sequence[str], "foo") == ["foo"]
|
||||
assert b.parse(tctx.master.commands, typing.Sequence[str], "foo,bar") == ["foo", "bar"]
|
||||
|
||||
|
||||
class DummyConsole:
|
||||
@command.command("view.resolve")
|
||||
def resolve(self, spec: str) -> typing.Sequence[flow.Flow]:
|
||||
n = int(spec)
|
||||
return [tflow.tflow(resp=True)] * n
|
||||
|
||||
@command.command("cut")
|
||||
def cut(self, spec: str) -> mitmproxy.types.Data:
|
||||
return [["test"]]
|
||||
|
||||
@command.command("options")
|
||||
def options(self) -> typing.Sequence[str]:
|
||||
return ["one", "two", "three"]
|
||||
|
||||
|
||||
def test_flow():
|
||||
with taddons.context() as tctx:
|
||||
tctx.master.addons.add(DummyConsole())
|
||||
b = mitmproxy.types.FlowType()
|
||||
assert len(b.completion(tctx.master.commands, flow.Flow, "")) == len(b.valid_prefixes)
|
||||
assert b.parse(tctx.master.commands, flow.Flow, "1")
|
||||
with pytest.raises(mitmproxy.exceptions.TypeError):
|
||||
assert b.parse(tctx.master.commands, flow.Flow, "0")
|
||||
with pytest.raises(mitmproxy.exceptions.TypeError):
|
||||
assert b.parse(tctx.master.commands, flow.Flow, "2")
|
||||
|
||||
|
||||
def test_flows():
|
||||
with taddons.context() as tctx:
|
||||
tctx.master.addons.add(DummyConsole())
|
||||
b = mitmproxy.types.FlowsType()
|
||||
assert len(
|
||||
b.completion(tctx.master.commands, typing.Sequence[flow.Flow], "")
|
||||
) == len(b.valid_prefixes)
|
||||
assert len(b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "0")) == 0
|
||||
assert len(b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "1")) == 1
|
||||
assert len(b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "2")) == 2
|
||||
|
||||
|
||||
def test_data():
|
||||
with taddons.context() as tctx:
|
||||
b = mitmproxy.types.DataType()
|
||||
with pytest.raises(mitmproxy.exceptions.TypeError):
|
||||
b.parse(tctx.master.commands, mitmproxy.types.Data, "foo")
|
||||
with pytest.raises(mitmproxy.exceptions.TypeError):
|
||||
b.parse(tctx.master.commands, mitmproxy.types.Data, "foo")
|
||||
|
||||
|
||||
def test_choice():
|
||||
with taddons.context() as tctx:
|
||||
tctx.master.addons.add(DummyConsole())
|
||||
b = mitmproxy.types.ChoiceType()
|
||||
comp = b.completion(tctx.master.commands, mitmproxy.types.Choice("options"), "")
|
||||
assert comp == ["one", "two", "three"]
|
||||
assert b.parse(tctx.master.commands, mitmproxy.types.Choice("options"), "one") == "one"
|
||||
with pytest.raises(mitmproxy.exceptions.TypeError):
|
||||
b.parse(tctx.master.commands, mitmproxy.types.Choice("options"), "invalid")
|
||||
|
||||
|
||||
def test_typemanager():
|
||||
assert mitmproxy.types.CommandTypes.get(bool, None)
|
||||
assert mitmproxy.types.CommandTypes.get(mitmproxy.types.Choice("choide"), None)
|
@ -1,36 +1,6 @@
|
||||
import os
|
||||
import contextlib
|
||||
|
||||
from mitmproxy.tools.console.commander import commander
|
||||
from mitmproxy.test import taddons
|
||||
from mitmproxy.test import tutils
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def chdir(path: str):
|
||||
old_dir = os.getcwd()
|
||||
os.chdir(path)
|
||||
yield
|
||||
os.chdir(old_dir)
|
||||
|
||||
|
||||
def normPathOpts(prefix, match):
|
||||
ret = []
|
||||
for s in commander.pathOptions(match):
|
||||
s = s[len(prefix):]
|
||||
s = s.replace(os.sep, "/")
|
||||
ret.append(s)
|
||||
return ret
|
||||
|
||||
|
||||
def test_pathOptions():
|
||||
cd = os.path.normpath(tutils.test_data.path("mitmproxy/completion"))
|
||||
assert normPathOpts(cd, cd) == ['/aaa', '/aab', '/aac', '/bbb/']
|
||||
assert normPathOpts(cd, os.path.join(cd, "a")) == ['/aaa', '/aab', '/aac']
|
||||
with chdir(cd):
|
||||
assert normPathOpts("", "./") == ['./aaa', './aab', './aac', './bbb/']
|
||||
assert normPathOpts("", "") == ['./aaa', './aab', './aac', './bbb/']
|
||||
assert commander.pathOptions("nonexistent") == ["nonexistent"]
|
||||
|
||||
|
||||
class TestListCompleter:
|
||||
|
@ -4,7 +4,6 @@ from unittest import mock
|
||||
import pytest
|
||||
|
||||
from mitmproxy.utils import typecheck
|
||||
from mitmproxy import command
|
||||
|
||||
|
||||
class TBase:
|
||||
@ -95,9 +94,6 @@ def test_check_command_type():
|
||||
assert(typecheck.check_command_type(None, None))
|
||||
assert(not typecheck.check_command_type(["foo"], typing.Sequence[int]))
|
||||
assert(not typecheck.check_command_type("foo", typing.Sequence[int]))
|
||||
assert(typecheck.check_command_type([["foo", b"bar"]], command.Cuts))
|
||||
assert(not typecheck.check_command_type(["foo", b"bar"], command.Cuts))
|
||||
assert(not typecheck.check_command_type([["foo", 22]], command.Cuts))
|
||||
|
||||
# Python 3.5 only defines __parameters__
|
||||
m = mock.Mock()
|
||||
|
Loading…
Reference in New Issue
Block a user