mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2025-02-02 00:05:27 +00:00
types: use new type validation mechanism in commands
This commit is contained in:
parent
cda14830d3
commit
6563feaf05
@ -10,11 +10,18 @@ import textwrap
|
|||||||
import functools
|
import functools
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
from mitmproxy.utils import typecheck
|
|
||||||
from mitmproxy import exceptions
|
from mitmproxy import exceptions
|
||||||
import mitmproxy.types
|
import mitmproxy.types
|
||||||
|
|
||||||
|
|
||||||
|
def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None:
|
||||||
|
sig = inspect.signature(f)
|
||||||
|
try:
|
||||||
|
sig.bind(*args, **kwargs)
|
||||||
|
except TypeError as v:
|
||||||
|
raise exceptions.CommandError("command argument mismatch: %s" % v.args[0])
|
||||||
|
|
||||||
|
|
||||||
def lexer(s):
|
def lexer(s):
|
||||||
# mypy mis-identifies shlex.shlex as abstract
|
# mypy mis-identifies shlex.shlex as abstract
|
||||||
lex = shlex.shlex(s) # type: ignore
|
lex = shlex.shlex(s) # type: ignore
|
||||||
@ -74,8 +81,7 @@ class Command:
|
|||||||
Call the command with a list of arguments. At this point, all
|
Call the command with a list of arguments. At this point, all
|
||||||
arguments are strings.
|
arguments are strings.
|
||||||
"""
|
"""
|
||||||
if not self.has_positional and (len(self.paramtypes) != len(args)):
|
verify_arg_signature(self.func, list(args), {})
|
||||||
raise exceptions.CommandError("Usage: %s" % self.signature_help())
|
|
||||||
|
|
||||||
remainder = [] # type: typing.Sequence[str]
|
remainder = [] # type: typing.Sequence[str]
|
||||||
if self.has_positional:
|
if self.has_positional:
|
||||||
@ -84,27 +90,21 @@ class Command:
|
|||||||
|
|
||||||
pargs = []
|
pargs = []
|
||||||
for arg, paramtype in zip(args, self.paramtypes):
|
for arg, paramtype in zip(args, self.paramtypes):
|
||||||
if typecheck.check_command_type(arg, paramtype):
|
pargs.append(parsearg(self.manager, arg, paramtype))
|
||||||
pargs.append(arg)
|
pargs.extend(remainder)
|
||||||
else:
|
|
||||||
pargs.append(parsearg(self.manager, arg, paramtype))
|
|
||||||
|
|
||||||
if remainder:
|
|
||||||
chk = typecheck.check_command_type(
|
|
||||||
remainder,
|
|
||||||
typing.Sequence[self.paramtypes[-1]] # type: ignore
|
|
||||||
)
|
|
||||||
if chk:
|
|
||||||
pargs.extend(remainder)
|
|
||||||
else:
|
|
||||||
raise exceptions.CommandError("Invalid value type: %s - expected %s" % (remainder, self.paramtypes[-1]))
|
|
||||||
|
|
||||||
with self.manager.master.handlecontext():
|
with self.manager.master.handlecontext():
|
||||||
ret = self.func(*pargs)
|
ret = self.func(*pargs)
|
||||||
|
|
||||||
if not typecheck.check_command_type(ret, self.returntype):
|
if ret is None and self.returntype is None:
|
||||||
raise exceptions.CommandError("Command returned unexpected data")
|
return
|
||||||
|
typ = mitmproxy.types.CommandTypes.get(self.returntype)
|
||||||
|
if not typ.is_valid(self.manager, typ, ret):
|
||||||
|
raise exceptions.CommandError(
|
||||||
|
"%s returned unexpected data - expected %s" % (
|
||||||
|
self.path, typ.display
|
||||||
|
)
|
||||||
|
)
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
|
||||||
@ -210,14 +210,6 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
|
|||||||
raise exceptions.CommandError from e
|
raise exceptions.CommandError from e
|
||||||
|
|
||||||
|
|
||||||
def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None:
|
|
||||||
sig = inspect.signature(f)
|
|
||||||
try:
|
|
||||||
sig.bind(*args, **kwargs)
|
|
||||||
except TypeError as v:
|
|
||||||
raise exceptions.CommandError("Argument mismatch: %s" % v.args[0])
|
|
||||||
|
|
||||||
|
|
||||||
def command(path):
|
def command(path):
|
||||||
def decorator(function):
|
def decorator(function):
|
||||||
@functools.wraps(function)
|
@functools.wraps(function)
|
||||||
|
@ -205,7 +205,15 @@ class _StrSeqType(_BaseType):
|
|||||||
return [x.strip() for x in s.split(",")]
|
return [x.strip() for x in s.split(",")]
|
||||||
|
|
||||||
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
|
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
|
||||||
return isinstance(val, str)
|
if isinstance(val, str) or isinstance(val, bytes):
|
||||||
|
return False
|
||||||
|
try:
|
||||||
|
for v in val:
|
||||||
|
if not isinstance(v, str):
|
||||||
|
return False
|
||||||
|
except TypeError:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
class _CutSpecType(_BaseType):
|
class _CutSpecType(_BaseType):
|
||||||
|
@ -1,41 +1,6 @@
|
|||||||
import typing
|
import typing
|
||||||
|
|
||||||
|
|
||||||
def check_command_type(value: typing.Any, typeinfo: typing.Any) -> bool:
|
|
||||||
"""
|
|
||||||
Check if the provided value is an instance of typeinfo. Returns True if the
|
|
||||||
types match, False otherwise. This function supports only those types
|
|
||||||
required for command return values.
|
|
||||||
"""
|
|
||||||
typename = str(typeinfo)
|
|
||||||
if typename.startswith("typing.Sequence"):
|
|
||||||
try:
|
|
||||||
T = typeinfo.__args__[0] # type: ignore
|
|
||||||
except AttributeError:
|
|
||||||
# Python 3.5.0
|
|
||||||
T = typeinfo.__parameters__[0] # type: ignore
|
|
||||||
if not isinstance(value, (tuple, list)):
|
|
||||||
return False
|
|
||||||
for v in value:
|
|
||||||
if not check_command_type(v, T):
|
|
||||||
return False
|
|
||||||
elif typename.startswith("typing.Union"):
|
|
||||||
try:
|
|
||||||
types = typeinfo.__args__ # type: ignore
|
|
||||||
except AttributeError:
|
|
||||||
# Python 3.5.x
|
|
||||||
types = typeinfo.__union_params__ # type: ignore
|
|
||||||
for T in types:
|
|
||||||
checks = [check_command_type(value, T) for T in types]
|
|
||||||
if not any(checks):
|
|
||||||
return False
|
|
||||||
elif value is None and typeinfo is None:
|
|
||||||
return True
|
|
||||||
elif not isinstance(value, typeinfo):
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
def check_option_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None:
|
def check_option_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None:
|
||||||
"""
|
"""
|
||||||
Check if the provided value is an instance of typeinfo and raises a
|
Check if the provided value is an instance of typeinfo and raises a
|
||||||
|
@ -8,8 +8,6 @@ import mitmproxy.types
|
|||||||
import io
|
import io
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from mitmproxy.utils import typecheck
|
|
||||||
|
|
||||||
|
|
||||||
class TAddon:
|
class TAddon:
|
||||||
@command.command("cmd1")
|
@command.command("cmd1")
|
||||||
@ -140,7 +138,7 @@ def test_simple():
|
|||||||
c.call("nonexistent")
|
c.call("nonexistent")
|
||||||
with pytest.raises(exceptions.CommandError, match="Invalid"):
|
with pytest.raises(exceptions.CommandError, match="Invalid"):
|
||||||
c.call("")
|
c.call("")
|
||||||
with pytest.raises(exceptions.CommandError, match="Usage"):
|
with pytest.raises(exceptions.CommandError, match="argument mismatch"):
|
||||||
c.call("one.two too many args")
|
c.call("one.two too many args")
|
||||||
|
|
||||||
c.add("empty", a.empty)
|
c.add("empty", a.empty)
|
||||||
@ -262,12 +260,3 @@ def test_verify_arg_signature():
|
|||||||
command.verify_arg_signature(lambda: None, [1, 2], {})
|
command.verify_arg_signature(lambda: None, [1, 2], {})
|
||||||
print('hello there')
|
print('hello there')
|
||||||
command.verify_arg_signature(lambda a, b: None, [1, 2], {})
|
command.verify_arg_signature(lambda a, b: None, [1, 2], {})
|
||||||
|
|
||||||
|
|
||||||
def test_choice():
|
|
||||||
"""
|
|
||||||
basic typechecking for choices should fail as we cannot verify if strings are a valid choice
|
|
||||||
at this point.
|
|
||||||
"""
|
|
||||||
c = mitmproxy.types.Choice("foo")
|
|
||||||
assert not typecheck.check_command_type("foo", c)
|
|
||||||
|
@ -125,8 +125,10 @@ def test_strseq():
|
|||||||
assert b.completion(tctx.master.commands, typing.Sequence[str], "") == []
|
assert b.completion(tctx.master.commands, typing.Sequence[str], "") == []
|
||||||
assert b.parse(tctx.master.commands, typing.Sequence[str], "foo") == ["foo"]
|
assert b.parse(tctx.master.commands, typing.Sequence[str], "foo") == ["foo"]
|
||||||
assert b.parse(tctx.master.commands, typing.Sequence[str], "foo,bar") == ["foo", "bar"]
|
assert b.parse(tctx.master.commands, typing.Sequence[str], "foo,bar") == ["foo", "bar"]
|
||||||
assert b.is_valid(tctx.master.commands, typing.Sequence[str], "foo") is True
|
assert b.is_valid(tctx.master.commands, typing.Sequence[str], ["foo"]) is True
|
||||||
|
assert b.is_valid(tctx.master.commands, typing.Sequence[str], ["a", "b", 3]) is False
|
||||||
assert b.is_valid(tctx.master.commands, typing.Sequence[str], 1) is False
|
assert b.is_valid(tctx.master.commands, typing.Sequence[str], 1) is False
|
||||||
|
assert b.is_valid(tctx.master.commands, typing.Sequence[str], "foo") is False
|
||||||
|
|
||||||
|
|
||||||
class DummyConsole:
|
class DummyConsole:
|
||||||
|
@ -87,28 +87,6 @@ def test_check_any():
|
|||||||
typecheck.check_option_type("foo", None, typing.Any)
|
typecheck.check_option_type("foo", None, typing.Any)
|
||||||
|
|
||||||
|
|
||||||
def test_check_command_type():
|
|
||||||
assert(typecheck.check_command_type("foo", str))
|
|
||||||
assert(typecheck.check_command_type(["foo"], typing.Sequence[str]))
|
|
||||||
assert(not typecheck.check_command_type(["foo", 1], typing.Sequence[str]))
|
|
||||||
assert(typecheck.check_command_type(None, None))
|
|
||||||
assert(not typecheck.check_command_type(["foo"], typing.Sequence[int]))
|
|
||||||
assert(not typecheck.check_command_type("foo", typing.Sequence[int]))
|
|
||||||
|
|
||||||
# Python 3.5 only defines __parameters__
|
|
||||||
m = mock.Mock()
|
|
||||||
m.__str__ = lambda self: "typing.Sequence"
|
|
||||||
m.__parameters__ = (int,)
|
|
||||||
|
|
||||||
typecheck.check_command_type([10], m)
|
|
||||||
|
|
||||||
# Python 3.5 only defines __union_params__
|
|
||||||
m = mock.Mock()
|
|
||||||
m.__str__ = lambda self: "typing.Union"
|
|
||||||
m.__union_params__ = (int,)
|
|
||||||
assert not typecheck.check_command_type([22], m)
|
|
||||||
|
|
||||||
|
|
||||||
def test_typesec_to_str():
|
def test_typesec_to_str():
|
||||||
assert(typecheck.typespec_to_str(str)) == "str"
|
assert(typecheck.typespec_to_str(str)) == "str"
|
||||||
assert(typecheck.typespec_to_str(typing.Sequence[str])) == "sequence of str"
|
assert(typecheck.typespec_to_str(typing.Sequence[str])) == "sequence of str"
|
||||||
|
Loading…
Reference in New Issue
Block a user