mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-23 00:01:36 +00:00
Merge pull request #2712 from cortesi/types3
types: add validation, use it in commands
This commit is contained in:
commit
04d83d7672
@ -10,11 +10,18 @@ import textwrap
|
||||
import functools
|
||||
import sys
|
||||
|
||||
from mitmproxy.utils import typecheck
|
||||
from mitmproxy import exceptions
|
||||
import mitmproxy.types
|
||||
|
||||
|
||||
def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None:
|
||||
sig = inspect.signature(f)
|
||||
try:
|
||||
sig.bind(*args, **kwargs)
|
||||
except TypeError as v:
|
||||
raise exceptions.CommandError("command argument mismatch: %s" % v.args[0])
|
||||
|
||||
|
||||
def lexer(s):
|
||||
# mypy mis-identifies shlex.shlex as abstract
|
||||
lex = shlex.shlex(s) # type: ignore
|
||||
@ -74,8 +81,7 @@ class Command:
|
||||
Call the command with a list of arguments. At this point, all
|
||||
arguments are strings.
|
||||
"""
|
||||
if not self.has_positional and (len(self.paramtypes) != len(args)):
|
||||
raise exceptions.CommandError("Usage: %s" % self.signature_help())
|
||||
verify_arg_signature(self.func, list(args), {})
|
||||
|
||||
remainder = [] # type: typing.Sequence[str]
|
||||
if self.has_positional:
|
||||
@ -84,37 +90,35 @@ class Command:
|
||||
|
||||
pargs = []
|
||||
for arg, paramtype in zip(args, self.paramtypes):
|
||||
if typecheck.check_command_type(arg, paramtype):
|
||||
pargs.append(arg)
|
||||
else:
|
||||
pargs.append(parsearg(self.manager, arg, paramtype))
|
||||
|
||||
if remainder:
|
||||
chk = typecheck.check_command_type(
|
||||
remainder,
|
||||
typing.Sequence[self.paramtypes[-1]] # type: ignore
|
||||
)
|
||||
if chk:
|
||||
pargs.extend(remainder)
|
||||
else:
|
||||
raise exceptions.CommandError("Invalid value type: %s - expected %s" % (remainder, self.paramtypes[-1]))
|
||||
pargs.append(parsearg(self.manager, arg, paramtype))
|
||||
pargs.extend(remainder)
|
||||
|
||||
with self.manager.master.handlecontext():
|
||||
ret = self.func(*pargs)
|
||||
|
||||
if not typecheck.check_command_type(ret, self.returntype):
|
||||
raise exceptions.CommandError("Command returned unexpected data")
|
||||
|
||||
if ret is None and self.returntype is None:
|
||||
return
|
||||
typ = mitmproxy.types.CommandTypes.get(self.returntype)
|
||||
if not typ.is_valid(self.manager, typ, ret):
|
||||
raise exceptions.CommandError(
|
||||
"%s returned unexpected data - expected %s" % (
|
||||
self.path, typ.display
|
||||
)
|
||||
)
|
||||
return ret
|
||||
|
||||
|
||||
ParseResult = typing.NamedTuple(
|
||||
"ParseResult",
|
||||
[("value", str), ("type", typing.Type)],
|
||||
[
|
||||
("value", str),
|
||||
("type", typing.Type),
|
||||
("valid", bool),
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
class CommandManager:
|
||||
class CommandManager(mitmproxy.types._CommandBase):
|
||||
def __init__(self, master):
|
||||
self.master = master
|
||||
self.commands = {}
|
||||
@ -161,13 +165,29 @@ class CommandManager:
|
||||
params.extend(self.commands[parts[i]].paramtypes)
|
||||
elif params:
|
||||
typ = params.pop(0)
|
||||
# FIXME: Do we need to check that Arg is positional?
|
||||
if typ == mitmproxy.types.Cmd and params and params[0] == mitmproxy.types.Arg:
|
||||
if parts[i] in self.commands:
|
||||
params[:] = self.commands[parts[i]].paramtypes
|
||||
else:
|
||||
typ = str
|
||||
parse.append(ParseResult(value=parts[i], type=typ))
|
||||
|
||||
to = mitmproxy.types.CommandTypes.get(typ, None)
|
||||
valid = False
|
||||
if to:
|
||||
try:
|
||||
to.parse(self, typ, parts[i])
|
||||
except exceptions.TypeError:
|
||||
valid = False
|
||||
else:
|
||||
valid = True
|
||||
|
||||
parse.append(
|
||||
ParseResult(
|
||||
value=parts[i],
|
||||
type=typ,
|
||||
valid=valid,
|
||||
)
|
||||
)
|
||||
return parse
|
||||
|
||||
def call_args(self, path: str, args: typing.Sequence[str]) -> typing.Any:
|
||||
@ -210,14 +230,6 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
|
||||
raise exceptions.CommandError from e
|
||||
|
||||
|
||||
def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None:
|
||||
sig = inspect.signature(f)
|
||||
try:
|
||||
sig.bind(*args, **kwargs)
|
||||
except TypeError as v:
|
||||
raise exceptions.CommandError("Argument mismatch: %s" % v.args[0])
|
||||
|
||||
|
||||
def command(path):
|
||||
def decorator(function):
|
||||
@functools.wraps(function)
|
||||
|
@ -104,7 +104,7 @@ class ConsoleAddon:
|
||||
@command.command("console.layout.options")
|
||||
def layout_options(self) -> typing.Sequence[str]:
|
||||
"""
|
||||
Returns the available options for the consoler_layout option.
|
||||
Returns the available options for the console_layout option.
|
||||
"""
|
||||
return ["single", "vertical", "horizontal"]
|
||||
|
||||
@ -243,7 +243,11 @@ class ConsoleAddon:
|
||||
|
||||
@command.command("console.choose.cmd")
|
||||
def console_choose_cmd(
|
||||
self, prompt: str, choicecmd: mitmproxy.types.Cmd, *cmd: mitmproxy.types.Arg
|
||||
self,
|
||||
prompt: str,
|
||||
choicecmd: mitmproxy.types.Cmd,
|
||||
subcmd: mitmproxy.types.Cmd,
|
||||
*args: mitmproxy.types.Arg
|
||||
) -> None:
|
||||
"""
|
||||
Prompt the user to choose from a list of strings returned by a
|
||||
@ -254,10 +258,10 @@ class ConsoleAddon:
|
||||
|
||||
def callback(opt):
|
||||
# We're now outside of the call context...
|
||||
repl = " ".join(cmd)
|
||||
repl = " ".join(args)
|
||||
repl = repl.replace("{choice}", opt)
|
||||
try:
|
||||
self.master.commands.call(repl)
|
||||
self.master.commands.call(subcmd + " " + repl)
|
||||
except exceptions.CommandError as e:
|
||||
signals.status_message.send(message=str(e))
|
||||
|
||||
|
@ -41,7 +41,7 @@ def map(km):
|
||||
"e",
|
||||
"""
|
||||
console.choose.cmd Format export.formats
|
||||
console.command export.file {choice} @focus ''
|
||||
console.command export.file {choice} @focus
|
||||
""",
|
||||
["flowlist", "flowview"],
|
||||
"Export this flow to file"
|
||||
|
@ -40,39 +40,55 @@ class Choice:
|
||||
# annotations can cause circular dependencies where there were none before.
|
||||
# Rather than putting types and the CommandManger in the same file, we introduce
|
||||
# a stub type with the signature we use.
|
||||
class _CommandStub:
|
||||
commands = {} # type: typing.Mapping[str, typing.Any]
|
||||
class _CommandBase:
|
||||
commands = {} # type: typing.MutableMapping[str, typing.Any]
|
||||
|
||||
def call_args(self, path: str, args: typing.Sequence[str]) -> typing.Any: # pragma: no cover
|
||||
pass
|
||||
def call_args(self, path: str, args: typing.Sequence[str]) -> typing.Any:
|
||||
raise NotImplementedError
|
||||
|
||||
def call(self, args: typing.Sequence[str]) -> typing.Any: # pragma: no cover
|
||||
pass
|
||||
def call(self, cmd: str) -> typing.Any:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class BaseType:
|
||||
class _BaseType:
|
||||
typ = object # type: typing.Type
|
||||
display = "" # type: str
|
||||
|
||||
def completion(
|
||||
self, manager: _CommandStub, t: type, s: str
|
||||
) -> typing.Sequence[str]: # pragma: no cover
|
||||
pass
|
||||
self, manager: _CommandBase, t: typing.Any, s: str
|
||||
) -> typing.Sequence[str]:
|
||||
"""
|
||||
Returns a list of completion strings for a given prefix. The strings
|
||||
returned don't necessarily need to be suffixes of the prefix, since
|
||||
completers will do prefix filtering themselves..
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def parse(
|
||||
self, manager: _CommandStub, t: type, s: str
|
||||
) -> typing.Any: # pragma: no cover
|
||||
pass
|
||||
self, manager: _CommandBase, typ: typing.Any, s: str
|
||||
) -> typing.Any:
|
||||
"""
|
||||
Parse a string, given the specific type instance (to allow rich type annotations like Choice) and a string.
|
||||
|
||||
Raises exceptions.TypeError if the value is invalid.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
|
||||
"""
|
||||
Check if data is valid for this type.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class Bool(BaseType):
|
||||
class _BoolType(_BaseType):
|
||||
typ = bool
|
||||
display = "bool"
|
||||
|
||||
def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
|
||||
def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
|
||||
return ["false", "true"]
|
||||
|
||||
def parse(self, manager: _CommandStub, t: type, s: str) -> bool:
|
||||
def parse(self, manager: _CommandBase, t: type, s: str) -> bool:
|
||||
if s == "true":
|
||||
return True
|
||||
elif s == "false":
|
||||
@ -82,37 +98,46 @@ class Bool(BaseType):
|
||||
"Booleans are 'true' or 'false', got %s" % s
|
||||
)
|
||||
|
||||
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
|
||||
return val in [True, False]
|
||||
|
||||
class Str(BaseType):
|
||||
|
||||
class _StrType(_BaseType):
|
||||
typ = str
|
||||
display = "str"
|
||||
|
||||
def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
|
||||
def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
|
||||
return []
|
||||
|
||||
def parse(self, manager: _CommandStub, t: type, s: str) -> str:
|
||||
def parse(self, manager: _CommandBase, t: type, s: str) -> str:
|
||||
return s
|
||||
|
||||
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
|
||||
return isinstance(val, str)
|
||||
|
||||
class Int(BaseType):
|
||||
|
||||
class _IntType(_BaseType):
|
||||
typ = int
|
||||
display = "int"
|
||||
|
||||
def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
|
||||
def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
|
||||
return []
|
||||
|
||||
def parse(self, manager: _CommandStub, t: type, s: str) -> int:
|
||||
def parse(self, manager: _CommandBase, t: type, s: str) -> int:
|
||||
try:
|
||||
return int(s)
|
||||
except ValueError as e:
|
||||
raise exceptions.TypeError from e
|
||||
|
||||
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
|
||||
return isinstance(val, int)
|
||||
|
||||
class PathType(BaseType):
|
||||
|
||||
class _PathType(_BaseType):
|
||||
typ = Path
|
||||
display = "path"
|
||||
|
||||
def completion(self, manager: _CommandStub, t: type, start: str) -> typing.Sequence[str]:
|
||||
def completion(self, manager: _CommandBase, t: type, start: str) -> typing.Sequence[str]:
|
||||
if not start:
|
||||
start = "./"
|
||||
path = os.path.expanduser(start)
|
||||
@ -134,44 +159,64 @@ class PathType(BaseType):
|
||||
ret.sort()
|
||||
return ret
|
||||
|
||||
def parse(self, manager: _CommandStub, t: type, s: str) -> str:
|
||||
def parse(self, manager: _CommandBase, t: type, s: str) -> str:
|
||||
return s
|
||||
|
||||
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
|
||||
return isinstance(val, str)
|
||||
|
||||
class CmdType(BaseType):
|
||||
|
||||
class _CmdType(_BaseType):
|
||||
typ = Cmd
|
||||
display = "cmd"
|
||||
|
||||
def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
|
||||
def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
|
||||
return list(manager.commands.keys())
|
||||
|
||||
def parse(self, manager: _CommandStub, t: type, s: str) -> str:
|
||||
def parse(self, manager: _CommandBase, t: type, s: str) -> str:
|
||||
return s
|
||||
|
||||
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
|
||||
return val in manager.commands
|
||||
|
||||
class ArgType(BaseType):
|
||||
|
||||
class _ArgType(_BaseType):
|
||||
typ = Arg
|
||||
display = "arg"
|
||||
|
||||
def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
|
||||
def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
|
||||
return []
|
||||
|
||||
def parse(self, manager: _CommandStub, t: type, s: str) -> str:
|
||||
def parse(self, manager: _CommandBase, t: type, s: str) -> str:
|
||||
return s
|
||||
|
||||
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
|
||||
return isinstance(val, str)
|
||||
|
||||
class StrSeq(BaseType):
|
||||
|
||||
class _StrSeqType(_BaseType):
|
||||
typ = typing.Sequence[str]
|
||||
display = "[str]"
|
||||
|
||||
def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
|
||||
def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
|
||||
return []
|
||||
|
||||
def parse(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
|
||||
def parse(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
|
||||
return [x.strip() for x in s.split(",")]
|
||||
|
||||
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
|
||||
if isinstance(val, str) or isinstance(val, bytes):
|
||||
return False
|
||||
try:
|
||||
for v in val:
|
||||
if not isinstance(v, str):
|
||||
return False
|
||||
except TypeError:
|
||||
return False
|
||||
return True
|
||||
|
||||
class CutSpecType(BaseType):
|
||||
|
||||
class _CutSpecType(_BaseType):
|
||||
typ = CutSpec
|
||||
display = "[cut]"
|
||||
valid_prefixes = [
|
||||
@ -212,7 +257,7 @@ class CutSpecType(BaseType):
|
||||
"server_conn.ssl_established",
|
||||
]
|
||||
|
||||
def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
|
||||
def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
|
||||
spec = s.split(",")
|
||||
opts = []
|
||||
for pref in self.valid_prefixes:
|
||||
@ -220,19 +265,33 @@ class CutSpecType(BaseType):
|
||||
opts.append(",".join(spec))
|
||||
return opts
|
||||
|
||||
def parse(self, manager: _CommandStub, t: type, s: str) -> CutSpec:
|
||||
def parse(self, manager: _CommandBase, t: type, s: str) -> CutSpec:
|
||||
parts = s.split(",") # type: typing.Any
|
||||
return parts
|
||||
|
||||
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
|
||||
if not isinstance(val, str):
|
||||
return False
|
||||
parts = [x.strip() for x in val.split(",")]
|
||||
for p in parts:
|
||||
for pref in self.valid_prefixes:
|
||||
if p.startswith(pref):
|
||||
break
|
||||
else:
|
||||
return False
|
||||
return True
|
||||
|
||||
class BaseFlowType(BaseType):
|
||||
valid_prefixes = [
|
||||
|
||||
class _BaseFlowType(_BaseType):
|
||||
viewmarkers = [
|
||||
"@all",
|
||||
"@focus",
|
||||
"@shown",
|
||||
"@hidden",
|
||||
"@marked",
|
||||
"@unmarked",
|
||||
]
|
||||
valid_prefixes = viewmarkers + [
|
||||
"~q",
|
||||
"~s",
|
||||
"~a",
|
||||
@ -248,15 +307,15 @@ class BaseFlowType(BaseType):
|
||||
"~c",
|
||||
]
|
||||
|
||||
def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]:
|
||||
def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
|
||||
return self.valid_prefixes
|
||||
|
||||
|
||||
class FlowType(BaseFlowType):
|
||||
class _FlowType(_BaseFlowType):
|
||||
typ = flow.Flow
|
||||
display = "flow"
|
||||
|
||||
def parse(self, manager: _CommandStub, t: type, s: str) -> flow.Flow:
|
||||
def parse(self, manager: _CommandBase, t: type, s: str) -> flow.Flow:
|
||||
flows = manager.call_args("view.resolve", [s])
|
||||
if len(flows) != 1:
|
||||
raise exceptions.TypeError(
|
||||
@ -264,43 +323,73 @@ class FlowType(BaseFlowType):
|
||||
)
|
||||
return flows[0]
|
||||
|
||||
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
|
||||
return isinstance(val, flow.Flow)
|
||||
|
||||
class FlowsType(BaseFlowType):
|
||||
|
||||
class _FlowsType(_BaseFlowType):
|
||||
typ = typing.Sequence[flow.Flow]
|
||||
display = "[flow]"
|
||||
|
||||
def parse(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[flow.Flow]:
|
||||
def parse(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[flow.Flow]:
|
||||
return manager.call_args("view.resolve", [s])
|
||||
|
||||
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
|
||||
try:
|
||||
for v in val:
|
||||
if not isinstance(v, flow.Flow):
|
||||
return False
|
||||
except TypeError:
|
||||
return False
|
||||
return True
|
||||
|
||||
class DataType:
|
||||
|
||||
class _DataType(_BaseType):
|
||||
typ = Data
|
||||
display = "[data]"
|
||||
|
||||
def completion(
|
||||
self, manager: _CommandStub, t: type, s: str
|
||||
self, manager: _CommandBase, t: type, s: str
|
||||
) -> typing.Sequence[str]: # pragma: no cover
|
||||
raise exceptions.TypeError("data cannot be passed as argument")
|
||||
|
||||
def parse(
|
||||
self, manager: _CommandStub, t: type, s: str
|
||||
self, manager: _CommandBase, t: type, s: str
|
||||
) -> typing.Any: # pragma: no cover
|
||||
raise exceptions.TypeError("data cannot be passed as argument")
|
||||
|
||||
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
|
||||
# FIXME: validate that all rows have equal length, and all columns have equal types
|
||||
try:
|
||||
for row in val:
|
||||
for cell in row:
|
||||
if not (isinstance(cell, str) or isinstance(cell, bytes)):
|
||||
return False
|
||||
except TypeError:
|
||||
return False
|
||||
return True
|
||||
|
||||
class ChoiceType:
|
||||
|
||||
class _ChoiceType(_BaseType):
|
||||
typ = Choice
|
||||
display = "choice"
|
||||
|
||||
def completion(self, manager: _CommandStub, t: Choice, s: str) -> typing.Sequence[str]:
|
||||
def completion(self, manager: _CommandBase, t: Choice, s: str) -> typing.Sequence[str]:
|
||||
return manager.call(t.options_command)
|
||||
|
||||
def parse(self, manager: _CommandStub, t: Choice, s: str) -> str:
|
||||
def parse(self, manager: _CommandBase, t: Choice, s: str) -> str:
|
||||
opts = manager.call(t.options_command)
|
||||
if s not in opts:
|
||||
raise exceptions.TypeError("Invalid choice.")
|
||||
return s
|
||||
|
||||
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
|
||||
try:
|
||||
opts = manager.call(typ.options_command)
|
||||
except exceptions.CommandError:
|
||||
return False
|
||||
return val in opts
|
||||
|
||||
|
||||
class TypeManager:
|
||||
def __init__(self, *types):
|
||||
@ -308,23 +397,23 @@ class TypeManager:
|
||||
for t in types:
|
||||
self.typemap[t.typ] = t()
|
||||
|
||||
def get(self, t: type, default=None) -> BaseType:
|
||||
def get(self, t: type, default=None) -> _BaseType:
|
||||
if type(t) in self.typemap:
|
||||
return self.typemap[type(t)]
|
||||
return self.typemap.get(t, default)
|
||||
|
||||
|
||||
CommandTypes = TypeManager(
|
||||
ArgType,
|
||||
Bool,
|
||||
ChoiceType,
|
||||
CmdType,
|
||||
CutSpecType,
|
||||
DataType,
|
||||
FlowType,
|
||||
FlowsType,
|
||||
Int,
|
||||
PathType,
|
||||
Str,
|
||||
StrSeq,
|
||||
_ArgType,
|
||||
_BoolType,
|
||||
_ChoiceType,
|
||||
_CmdType,
|
||||
_CutSpecType,
|
||||
_DataType,
|
||||
_FlowType,
|
||||
_FlowsType,
|
||||
_IntType,
|
||||
_PathType,
|
||||
_StrType,
|
||||
_StrSeqType,
|
||||
)
|
||||
|
@ -1,41 +1,6 @@
|
||||
import typing
|
||||
|
||||
|
||||
def check_command_type(value: typing.Any, typeinfo: typing.Any) -> bool:
|
||||
"""
|
||||
Check if the provided value is an instance of typeinfo. Returns True if the
|
||||
types match, False otherwise. This function supports only those types
|
||||
required for command return values.
|
||||
"""
|
||||
typename = str(typeinfo)
|
||||
if typename.startswith("typing.Sequence"):
|
||||
try:
|
||||
T = typeinfo.__args__[0] # type: ignore
|
||||
except AttributeError:
|
||||
# Python 3.5.0
|
||||
T = typeinfo.__parameters__[0] # type: ignore
|
||||
if not isinstance(value, (tuple, list)):
|
||||
return False
|
||||
for v in value:
|
||||
if not check_command_type(v, T):
|
||||
return False
|
||||
elif typename.startswith("typing.Union"):
|
||||
try:
|
||||
types = typeinfo.__args__ # type: ignore
|
||||
except AttributeError:
|
||||
# Python 3.5.x
|
||||
types = typeinfo.__union_params__ # type: ignore
|
||||
for T in types:
|
||||
checks = [check_command_type(value, T) for T in types]
|
||||
if not any(checks):
|
||||
return False
|
||||
elif value is None and typeinfo is None:
|
||||
return True
|
||||
elif not isinstance(value, typeinfo):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def check_option_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None:
|
||||
"""
|
||||
Check if the provided value is an instance of typeinfo and raises a
|
||||
|
@ -8,8 +8,6 @@ import mitmproxy.types
|
||||
import io
|
||||
import pytest
|
||||
|
||||
from mitmproxy.utils import typecheck
|
||||
|
||||
|
||||
class TAddon:
|
||||
@command.command("cmd1")
|
||||
@ -80,46 +78,46 @@ class TestCommand:
|
||||
[
|
||||
"foo bar",
|
||||
[
|
||||
command.ParseResult(value = "foo", type = mitmproxy.types.Cmd),
|
||||
command.ParseResult(value = "bar", type = str)
|
||||
command.ParseResult(value = "foo", type = mitmproxy.types.Cmd, valid = True),
|
||||
command.ParseResult(value = "bar", type = str, valid = True)
|
||||
],
|
||||
],
|
||||
[
|
||||
"foo 'bar",
|
||||
[
|
||||
command.ParseResult(value = "foo", type = mitmproxy.types.Cmd),
|
||||
command.ParseResult(value = "'bar", type = str)
|
||||
command.ParseResult(value = "foo", type = mitmproxy.types.Cmd, valid = True),
|
||||
command.ParseResult(value = "'bar", type = str, valid = True)
|
||||
]
|
||||
],
|
||||
["a", [command.ParseResult(value = "a", type = mitmproxy.types.Cmd)]],
|
||||
["", [command.ParseResult(value = "", type = mitmproxy.types.Cmd)]],
|
||||
["a", [command.ParseResult(value = "a", type = mitmproxy.types.Cmd, valid = True)]],
|
||||
["", [command.ParseResult(value = "", type = mitmproxy.types.Cmd, valid = True)]],
|
||||
[
|
||||
"cmd3 1",
|
||||
[
|
||||
command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd),
|
||||
command.ParseResult(value = "1", type = int),
|
||||
command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd, valid = True),
|
||||
command.ParseResult(value = "1", type = int, valid = True),
|
||||
]
|
||||
],
|
||||
[
|
||||
"cmd3 ",
|
||||
[
|
||||
command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd),
|
||||
command.ParseResult(value = "", type = int),
|
||||
command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd, valid = True),
|
||||
command.ParseResult(value = "", type = int, valid = False),
|
||||
]
|
||||
],
|
||||
[
|
||||
"subcommand ",
|
||||
[
|
||||
command.ParseResult(value = "subcommand", type = mitmproxy.types.Cmd),
|
||||
command.ParseResult(value = "", type = mitmproxy.types.Cmd),
|
||||
command.ParseResult(value = "subcommand", type = mitmproxy.types.Cmd, valid = True),
|
||||
command.ParseResult(value = "", type = mitmproxy.types.Cmd, valid = True),
|
||||
]
|
||||
],
|
||||
[
|
||||
"subcommand cmd3 ",
|
||||
[
|
||||
command.ParseResult(value = "subcommand", type = mitmproxy.types.Cmd),
|
||||
command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd),
|
||||
command.ParseResult(value = "", type = int),
|
||||
command.ParseResult(value = "subcommand", type = mitmproxy.types.Cmd, valid = True),
|
||||
command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd, valid = True),
|
||||
command.ParseResult(value = "", type = int, valid = False),
|
||||
]
|
||||
],
|
||||
]
|
||||
@ -140,7 +138,7 @@ def test_simple():
|
||||
c.call("nonexistent")
|
||||
with pytest.raises(exceptions.CommandError, match="Invalid"):
|
||||
c.call("")
|
||||
with pytest.raises(exceptions.CommandError, match="Usage"):
|
||||
with pytest.raises(exceptions.CommandError, match="argument mismatch"):
|
||||
c.call("one.two too many args")
|
||||
|
||||
c.add("empty", a.empty)
|
||||
@ -262,12 +260,3 @@ def test_verify_arg_signature():
|
||||
command.verify_arg_signature(lambda: None, [1, 2], {})
|
||||
print('hello there')
|
||||
command.verify_arg_signature(lambda a, b: None, [1, 2], {})
|
||||
|
||||
|
||||
def test_choice():
|
||||
"""
|
||||
basic typechecking for choices should fail as we cannot verify if strings are a valid choice
|
||||
at this point.
|
||||
"""
|
||||
c = mitmproxy.types.Choice("foo")
|
||||
assert not typecheck.check_command_type("foo", c)
|
||||
|
@ -24,24 +24,30 @@ def chdir(path: str):
|
||||
|
||||
def test_bool():
|
||||
with taddons.context() as tctx:
|
||||
b = mitmproxy.types.Bool()
|
||||
b = mitmproxy.types._BoolType()
|
||||
assert b.completion(tctx.master.commands, bool, "b") == ["false", "true"]
|
||||
assert b.parse(tctx.master.commands, bool, "true") is True
|
||||
assert b.parse(tctx.master.commands, bool, "false") is False
|
||||
assert b.is_valid(tctx.master.commands, bool, True) is True
|
||||
assert b.is_valid(tctx.master.commands, bool, "foo") is False
|
||||
with pytest.raises(mitmproxy.exceptions.TypeError):
|
||||
b.parse(tctx.master.commands, bool, "foo")
|
||||
|
||||
|
||||
def test_str():
|
||||
with taddons.context() as tctx:
|
||||
b = mitmproxy.types.Str()
|
||||
b = mitmproxy.types._StrType()
|
||||
assert b.is_valid(tctx.master.commands, str, "foo") is True
|
||||
assert b.is_valid(tctx.master.commands, str, 1) is False
|
||||
assert b.completion(tctx.master.commands, str, "") == []
|
||||
assert b.parse(tctx.master.commands, str, "foo") == "foo"
|
||||
|
||||
|
||||
def test_int():
|
||||
with taddons.context() as tctx:
|
||||
b = mitmproxy.types.Int()
|
||||
b = mitmproxy.types._IntType()
|
||||
assert b.is_valid(tctx.master.commands, int, "foo") is False
|
||||
assert b.is_valid(tctx.master.commands, int, 1) is True
|
||||
assert b.completion(tctx.master.commands, int, "b") == []
|
||||
assert b.parse(tctx.master.commands, int, "1") == 1
|
||||
assert b.parse(tctx.master.commands, int, "999") == 999
|
||||
@ -51,9 +57,11 @@ def test_int():
|
||||
|
||||
def test_path():
|
||||
with taddons.context() as tctx:
|
||||
b = mitmproxy.types.PathType()
|
||||
b = mitmproxy.types._PathType()
|
||||
assert b.parse(tctx.master.commands, mitmproxy.types.Path, "/foo") == "/foo"
|
||||
assert b.parse(tctx.master.commands, mitmproxy.types.Path, "/bar") == "/bar"
|
||||
assert b.is_valid(tctx.master.commands, mitmproxy.types.Path, "foo") is True
|
||||
assert b.is_valid(tctx.master.commands, mitmproxy.types.Path, 3) is False
|
||||
|
||||
def normPathOpts(prefix, match):
|
||||
ret = []
|
||||
@ -77,7 +85,9 @@ def test_path():
|
||||
def test_cmd():
|
||||
with taddons.context() as tctx:
|
||||
tctx.master.addons.add(test_command.TAddon())
|
||||
b = mitmproxy.types.CmdType()
|
||||
b = mitmproxy.types._CmdType()
|
||||
assert b.is_valid(tctx.master.commands, mitmproxy.types.Cmd, "foo") is False
|
||||
assert b.is_valid(tctx.master.commands, mitmproxy.types.Cmd, "cmd1") is True
|
||||
assert b.parse(tctx.master.commands, mitmproxy.types.Cmd, "foo") == "foo"
|
||||
assert len(
|
||||
b.completion(tctx.master.commands, mitmproxy.types.Cmd, "")
|
||||
@ -86,8 +96,12 @@ def test_cmd():
|
||||
|
||||
def test_cutspec():
|
||||
with taddons.context() as tctx:
|
||||
b = mitmproxy.types.CutSpecType()
|
||||
b = mitmproxy.types._CutSpecType()
|
||||
b.parse(tctx.master.commands, mitmproxy.types.CutSpec, "foo,bar") == ["foo", "bar"]
|
||||
assert b.is_valid(tctx.master.commands, mitmproxy.types.CutSpec, 1) is False
|
||||
assert b.is_valid(tctx.master.commands, mitmproxy.types.CutSpec, "foo") is False
|
||||
assert b.is_valid(tctx.master.commands, mitmproxy.types.CutSpec, "request.path") is True
|
||||
|
||||
assert b.completion(
|
||||
tctx.master.commands, mitmproxy.types.CutSpec, "request.p"
|
||||
) == b.valid_prefixes
|
||||
@ -98,17 +112,23 @@ def test_cutspec():
|
||||
|
||||
def test_arg():
|
||||
with taddons.context() as tctx:
|
||||
b = mitmproxy.types.ArgType()
|
||||
b = mitmproxy.types._ArgType()
|
||||
assert b.completion(tctx.master.commands, mitmproxy.types.Arg, "") == []
|
||||
assert b.parse(tctx.master.commands, mitmproxy.types.Arg, "foo") == "foo"
|
||||
assert b.is_valid(tctx.master.commands, mitmproxy.types.Arg, "foo") is True
|
||||
assert b.is_valid(tctx.master.commands, mitmproxy.types.Arg, 1) is False
|
||||
|
||||
|
||||
def test_strseq():
|
||||
with taddons.context() as tctx:
|
||||
b = mitmproxy.types.StrSeq()
|
||||
b = mitmproxy.types._StrSeqType()
|
||||
assert b.completion(tctx.master.commands, typing.Sequence[str], "") == []
|
||||
assert b.parse(tctx.master.commands, typing.Sequence[str], "foo") == ["foo"]
|
||||
assert b.parse(tctx.master.commands, typing.Sequence[str], "foo,bar") == ["foo", "bar"]
|
||||
assert b.is_valid(tctx.master.commands, typing.Sequence[str], ["foo"]) is True
|
||||
assert b.is_valid(tctx.master.commands, typing.Sequence[str], ["a", "b", 3]) is False
|
||||
assert b.is_valid(tctx.master.commands, typing.Sequence[str], 1) is False
|
||||
assert b.is_valid(tctx.master.commands, typing.Sequence[str], "foo") is False
|
||||
|
||||
|
||||
class DummyConsole:
|
||||
@ -129,9 +149,11 @@ class DummyConsole:
|
||||
def test_flow():
|
||||
with taddons.context() as tctx:
|
||||
tctx.master.addons.add(DummyConsole())
|
||||
b = mitmproxy.types.FlowType()
|
||||
b = mitmproxy.types._FlowType()
|
||||
assert len(b.completion(tctx.master.commands, flow.Flow, "")) == len(b.valid_prefixes)
|
||||
assert b.parse(tctx.master.commands, flow.Flow, "1")
|
||||
assert b.is_valid(tctx.master.commands, flow.Flow, tflow.tflow()) is True
|
||||
assert b.is_valid(tctx.master.commands, flow.Flow, "xx") is False
|
||||
with pytest.raises(mitmproxy.exceptions.TypeError):
|
||||
assert b.parse(tctx.master.commands, flow.Flow, "0")
|
||||
with pytest.raises(mitmproxy.exceptions.TypeError):
|
||||
@ -141,10 +163,13 @@ def test_flow():
|
||||
def test_flows():
|
||||
with taddons.context() as tctx:
|
||||
tctx.master.addons.add(DummyConsole())
|
||||
b = mitmproxy.types.FlowsType()
|
||||
b = mitmproxy.types._FlowsType()
|
||||
assert len(
|
||||
b.completion(tctx.master.commands, typing.Sequence[flow.Flow], "")
|
||||
) == len(b.valid_prefixes)
|
||||
assert b.is_valid(tctx.master.commands, typing.Sequence[flow.Flow], [tflow.tflow()]) is True
|
||||
assert b.is_valid(tctx.master.commands, typing.Sequence[flow.Flow], "xx") is False
|
||||
assert b.is_valid(tctx.master.commands, typing.Sequence[flow.Flow], 0) is False
|
||||
assert len(b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "0")) == 0
|
||||
assert len(b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "1")) == 1
|
||||
assert len(b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "2")) == 2
|
||||
@ -152,7 +177,12 @@ def test_flows():
|
||||
|
||||
def test_data():
|
||||
with taddons.context() as tctx:
|
||||
b = mitmproxy.types.DataType()
|
||||
b = mitmproxy.types._DataType()
|
||||
assert b.is_valid(tctx.master.commands, mitmproxy.types.Data, 0) is False
|
||||
assert b.is_valid(tctx.master.commands, mitmproxy.types.Data, []) is True
|
||||
assert b.is_valid(tctx.master.commands, mitmproxy.types.Data, [["x"]]) is True
|
||||
assert b.is_valid(tctx.master.commands, mitmproxy.types.Data, [[b"x"]]) is True
|
||||
assert b.is_valid(tctx.master.commands, mitmproxy.types.Data, [[1]]) is False
|
||||
with pytest.raises(mitmproxy.exceptions.TypeError):
|
||||
b.parse(tctx.master.commands, mitmproxy.types.Data, "foo")
|
||||
with pytest.raises(mitmproxy.exceptions.TypeError):
|
||||
@ -162,7 +192,22 @@ def test_data():
|
||||
def test_choice():
|
||||
with taddons.context() as tctx:
|
||||
tctx.master.addons.add(DummyConsole())
|
||||
b = mitmproxy.types.ChoiceType()
|
||||
b = mitmproxy.types._ChoiceType()
|
||||
assert b.is_valid(
|
||||
tctx.master.commands,
|
||||
mitmproxy.types.Choice("options"),
|
||||
"one",
|
||||
) is True
|
||||
assert b.is_valid(
|
||||
tctx.master.commands,
|
||||
mitmproxy.types.Choice("options"),
|
||||
"invalid",
|
||||
) is False
|
||||
assert b.is_valid(
|
||||
tctx.master.commands,
|
||||
mitmproxy.types.Choice("nonexistent"),
|
||||
"invalid",
|
||||
) is False
|
||||
comp = b.completion(tctx.master.commands, mitmproxy.types.Choice("options"), "")
|
||||
assert comp == ["one", "two", "three"]
|
||||
assert b.parse(tctx.master.commands, mitmproxy.types.Choice("options"), "one") == "one"
|
||||
|
@ -87,28 +87,6 @@ def test_check_any():
|
||||
typecheck.check_option_type("foo", None, typing.Any)
|
||||
|
||||
|
||||
def test_check_command_type():
|
||||
assert(typecheck.check_command_type("foo", str))
|
||||
assert(typecheck.check_command_type(["foo"], typing.Sequence[str]))
|
||||
assert(not typecheck.check_command_type(["foo", 1], typing.Sequence[str]))
|
||||
assert(typecheck.check_command_type(None, None))
|
||||
assert(not typecheck.check_command_type(["foo"], typing.Sequence[int]))
|
||||
assert(not typecheck.check_command_type("foo", typing.Sequence[int]))
|
||||
|
||||
# Python 3.5 only defines __parameters__
|
||||
m = mock.Mock()
|
||||
m.__str__ = lambda self: "typing.Sequence"
|
||||
m.__parameters__ = (int,)
|
||||
|
||||
typecheck.check_command_type([10], m)
|
||||
|
||||
# Python 3.5 only defines __union_params__
|
||||
m = mock.Mock()
|
||||
m.__str__ = lambda self: "typing.Union"
|
||||
m.__union_params__ = (int,)
|
||||
assert not typecheck.check_command_type([22], m)
|
||||
|
||||
|
||||
def test_typesec_to_str():
|
||||
assert(typecheck.typespec_to_str(str)) == "str"
|
||||
assert(typecheck.typespec_to_str(typing.Sequence[str])) == "sequence of str"
|
||||
|
Loading…
Reference in New Issue
Block a user