Merge pull request #1669 from mhils/typecheck-options

Typecheck options
This commit is contained in:
Maximilian Hils 2016-10-25 21:00:46 -07:00 committed by GitHub
commit a0ad0b06a0
16 changed files with 138 additions and 20 deletions

View File

@ -21,7 +21,7 @@ class ClientPlayback:
def configure(self, options, updated): def configure(self, options, updated):
if "client_replay" in updated: if "client_replay" in updated:
if options.client_replay: if options.client_replay:
ctx.log.info(options.client_replay) ctx.log.info("Client Replay: {}".format(options.client_replay))
try: try:
flows = io.read_flows_from_paths(options.client_replay) flows = io.read_flows_from_paths(options.client_replay)
except exceptions.FlowReadException as e: except exceptions.FlowReadException as e:

View File

@ -23,13 +23,14 @@ DEFAULT_CLIENT_CIPHERS = "ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES128-GCM-SHA
class Options(optmanager.OptManager): class Options(optmanager.OptManager):
def __init__( def __init__(
self, self,
*, # all args are keyword-only.
# TODO: rename to onboarding_app_* # TODO: rename to onboarding_app_*
app: bool = True, app: bool = True,
app_host: str = APP_HOST, app_host: str = APP_HOST,
app_port: int = APP_PORT, app_port: int = APP_PORT,
anticache: bool = False, anticache: bool = False,
anticomp: bool = False, anticomp: bool = False,
client_replay: Optional[str] = None, client_replay: Sequence[str] = (),
replay_kill_extra: bool = False, replay_kill_extra: bool = False,
keepserving: bool = True, keepserving: bool = True,
no_server: bool = False, no_server: bool = False,
@ -41,12 +42,12 @@ class Options(optmanager.OptManager):
replacements: Sequence[Tuple[str, str, str]] = (), replacements: Sequence[Tuple[str, str, str]] = (),
server_replay_use_headers: Sequence[str] = (), server_replay_use_headers: Sequence[str] = (),
setheaders: Sequence[Tuple[str, str, str]] = (), setheaders: Sequence[Tuple[str, str, str]] = (),
server_replay: Sequence[str] = None, server_replay: Sequence[str] = (),
stickycookie: Optional[str] = None, stickycookie: Optional[str] = None,
stickyauth: Optional[str] = None, stickyauth: Optional[str] = None,
stream_large_bodies: Optional[str] = None, stream_large_bodies: Optional[int] = None,
verbosity: int = 2, verbosity: int = 2,
outfile: Tuple[str, str] = None, outfile: Optional[Tuple[str, str]] = None,
server_replay_ignore_content: bool = False, server_replay_ignore_content: bool = False,
server_replay_ignore_params: Sequence[str] = (), server_replay_ignore_params: Sequence[str] = (),
server_replay_ignore_payload_params: Sequence[str] = (), server_replay_ignore_payload_params: Sequence[str] = (),
@ -71,13 +72,13 @@ class Options(optmanager.OptManager):
rawtcp: bool = False, rawtcp: bool = False,
websockets: bool = False, websockets: bool = False,
spoof_source_address: bool = False, spoof_source_address: bool = False,
upstream_server: str = "", upstream_server: Optional[str] = None,
upstream_auth: str = "", upstream_auth: Optional[str] = None,
ssl_version_client: str = "secure", ssl_version_client: str = "secure",
ssl_version_server: str = "secure", ssl_version_server: str = "secure",
ssl_insecure: bool = False, ssl_insecure: bool = False,
ssl_verify_upstream_trusted_cadir: str = None, ssl_verify_upstream_trusted_cadir: Optional[str] = None,
ssl_verify_upstream_trusted_ca: str = None, ssl_verify_upstream_trusted_ca: Optional[str] = None,
tcp_hosts: Sequence[str] = () tcp_hosts: Sequence[str] = ()
): ):
# We could replace all assignments with clever metaprogramming, # We could replace all assignments with clever metaprogramming,

View File

@ -3,6 +3,7 @@ import blinker
import pprint import pprint
from mitmproxy import exceptions from mitmproxy import exceptions
from mitmproxy.utils import typecheck
""" """
The base implementation for Options. The base implementation for Options.
@ -58,10 +59,19 @@ class OptManager:
def __setattr__(self, attr, value): def __setattr__(self, attr, value):
if not self._initialized: if not self._initialized:
self._typecheck(attr, value)
self._opts[attr] = value self._opts[attr] = value
return return
self.update(**{attr: value}) self.update(**{attr: value})
def _typecheck(self, attr, value):
expected_type = typecheck.get_arg_type_from_constructor_annotation(
type(self), attr
)
if expected_type is None:
return # no type info :(
typecheck.check_type(attr, value, expected_type)
def keys(self): def keys(self):
return set(self._opts.keys()) return set(self._opts.keys())
@ -70,9 +80,10 @@ class OptManager:
def update(self, **kwargs): def update(self, **kwargs):
updated = set(kwargs.keys()) updated = set(kwargs.keys())
for k in kwargs: for k, v in kwargs.items():
if k not in self._opts: if k not in self._opts:
raise KeyError("No such option: %s" % k) raise KeyError("No such option: %s" % k)
self._typecheck(k, v)
with self.rollback(updated): with self.rollback(updated):
self._opts.update(kwargs) self._opts.update(kwargs)
self.changed.send(self, updated=updated) self.changed.send(self, updated=updated)

View File

@ -591,7 +591,7 @@ def client_replay(parser):
group = parser.add_argument_group("Client Replay") group = parser.add_argument_group("Client Replay")
group.add_argument( group.add_argument(
"-c", "--client-replay", "-c", "--client-replay",
action="append", dest="client_replay", default=None, metavar="PATH", action="append", dest="client_replay", default=[], metavar="PATH",
help="Replay client requests from a saved file." help="Replay client requests from a saved file."
) )
@ -600,7 +600,7 @@ def server_replay(parser):
group = parser.add_argument_group("Server Replay") group = parser.add_argument_group("Server Replay")
group.add_argument( group.add_argument(
"-S", "--server-replay", "-S", "--server-replay",
action="append", dest="server_replay", default=None, metavar="PATH", action="append", dest="server_replay", default=[], metavar="PATH",
help="Replay server responses from a saved file." help="Replay server responses from a saved file."
) )
group.add_argument( group.add_argument(
@ -610,7 +610,7 @@ def server_replay(parser):
) )
group.add_argument( group.add_argument(
"--server-replay-use-header", "--server-replay-use-header",
action="append", dest="server_replay_use_headers", type=str, action="append", dest="server_replay_use_headers", type=str, default=[],
help="Request headers to be considered during replay. " help="Request headers to be considered during replay. "
"Can be passed multiple times." "Can be passed multiple times."
) )
@ -638,7 +638,7 @@ def server_replay(parser):
) )
payload.add_argument( payload.add_argument(
"--replay-ignore-payload-param", "--replay-ignore-payload-param",
action="append", dest="server_replay_ignore_payload_params", type=str, action="append", dest="server_replay_ignore_payload_params", type=str, default=[],
help=""" help="""
Request's payload parameters (application/x-www-form-urlencoded or multipart/form-data) to Request's payload parameters (application/x-www-form-urlencoded or multipart/form-data) to
be ignored while searching for a saved flow to replay. be ignored while searching for a saved flow to replay.
@ -648,7 +648,7 @@ def server_replay(parser):
group.add_argument( group.add_argument(
"--replay-ignore-param", "--replay-ignore-param",
action="append", dest="server_replay_ignore_params", type=str, action="append", dest="server_replay_ignore_params", type=str, default=[],
help=""" help="""
Request's parameters to be ignored while searching for a saved flow Request's parameters to be ignored while searching for a saved flow
to replay. Can be passed multiple times. to replay. Can be passed multiple times.

View File

@ -203,9 +203,10 @@ class ConsoleState(state.State):
class Options(mitmproxy.options.Options): class Options(mitmproxy.options.Options):
def __init__( def __init__(
self, self,
*, # all args are keyword-only.
eventlog: bool = False, eventlog: bool = False,
follow: bool = False, follow: bool = False,
intercept: bool = False, intercept: Optional[str] = None,
filter: Optional[str] = None, filter: Optional[str] = None,
palette: Optional[str] = None, palette: Optional[str] = None,
palette_transparent: bool = False, palette_transparent: bool = False,

View File

@ -18,6 +18,7 @@ class DumpError(Exception):
class Options(options.Options): class Options(options.Options):
def __init__( def __init__(
self, self,
*, # all args are keyword-only.
keepserving: bool = False, keepserving: bool = False,
filtstr: Optional[str] = None, filtstr: Optional[str] = None,
flow_detail: int = 1, flow_detail: int = 1,

View File

@ -94,8 +94,9 @@ class WebState(state.State):
class Options(options.Options): class Options(options.Options):
def __init__( def __init__(
self, self,
*, # all args are keyword-only.
intercept: Optional[str] = None, intercept: Optional[str] = None,
wdebug: bool = bool, wdebug: bool = False,
wport: int = 8081, wport: int = 8081,
wiface: str = "127.0.0.1", wiface: str = "127.0.0.1",
wauthenticator: Optional[authentication.PassMan] = None, wauthenticator: Optional[authentication.PassMan] = None,

View File

@ -0,0 +1,54 @@
import typing
def check_type(attr_name: str, value: typing.Any, typeinfo: type) -> None:
"""
This function checks if the provided value is an instance of typeinfo
and raises a TypeError otherwise.
The following types from the typing package have specialized support:
- Union
- Tuple
- TextIO
"""
# If we realize that we need to extend this list substantially, it may make sense
# to use typeguard for this, but right now it's not worth the hassle for 16 lines of code.
e = TypeError("Expected {} for {}, but got {}.".format(
typeinfo,
attr_name,
type(value)
))
if isinstance(typeinfo, typing.UnionMeta):
for T in typeinfo.__union_params__:
try:
check_type(attr_name, value, T)
except TypeError:
pass
else:
return
raise e
if isinstance(typeinfo, typing.TupleMeta):
check_type(attr_name, value, tuple)
if len(typeinfo.__tuple_params__) != len(value):
raise e
for i, (x, T) in enumerate(zip(value, typeinfo.__tuple_params__)):
check_type("{}[{}]".format(attr_name, i), x, T)
return
if typeinfo == typing.TextIO:
if hasattr(value, "read"):
return
if not isinstance(value, typeinfo):
raise e
def get_arg_type_from_constructor_annotation(cls: type, attr: str) -> typing.Optional[type]:
"""
Returns the first type annotation for attr in the class hierarchy.
"""
for c in cls.mro():
if attr in getattr(c.__init__, "__annotations__", ()):
return c.__init__.__annotations__[attr]

View File

@ -67,7 +67,7 @@ setup(
"cryptography>=1.3, <1.6", "cryptography>=1.3, <1.6",
"cssutils>=1.0.1, <1.1", "cssutils>=1.0.1, <1.1",
"Flask>=0.10.1, <0.12", "Flask>=0.10.1, <0.12",
"h2>=2.4.1, <3", "h2>=2.4.1, <2.5",
"html2text>=2016.1.8, <=2016.9.19", "html2text>=2016.1.8, <=2016.9.19",
"hyperframe>=4.0.1, <5", "hyperframe>=4.0.1, <5",
"jsbeautifier>=1.6.3, <1.7", "jsbeautifier>=1.6.3, <1.7",

View File

View File

@ -1,7 +1,8 @@
import pytest
from mitmproxy.utils import data from mitmproxy.utils import data
from . import tutils
def test_pkg_data(): def test_pkg_data():
assert data.pkg_data.path("tools/console") assert data.pkg_data.path("tools/console")
tutils.raises("does not exist", data.pkg_data.path, "nonexistent") with pytest.raises(ValueError):
data.pkg_data.path("nonexistent")

View File

@ -0,0 +1,48 @@
import typing
import pytest
from mitmproxy.utils import typecheck
class TBase:
def __init__(self, bar: int):
pass
class T(TBase):
def __init__(self, foo: str):
super(T, self).__init__(42)
def test_get_arg_type_from_constructor_annotation():
assert typecheck.get_arg_type_from_constructor_annotation(T, "foo") == str
assert typecheck.get_arg_type_from_constructor_annotation(T, "bar") == int
assert not typecheck.get_arg_type_from_constructor_annotation(T, "baz")
def test_check_type():
typecheck.check_type("foo", 42, int)
with pytest.raises(TypeError):
typecheck.check_type("foo", 42, str)
with pytest.raises(TypeError):
typecheck.check_type("foo", None, str)
def test_check_union():
typecheck.check_type("foo", 42, typing.Union[int, str])
typecheck.check_type("foo", "42", typing.Union[int, str])
with pytest.raises(TypeError):
typecheck.check_type("foo", [], typing.Union[int, str])
def test_check_tuple():
with pytest.raises(TypeError):
typecheck.check_type("foo", None, typing.Tuple[int, str])
with pytest.raises(TypeError):
typecheck.check_type("foo", (), typing.Tuple[int, str])
with pytest.raises(TypeError):
typecheck.check_type("foo", (42, 42), typing.Tuple[int, str])
with pytest.raises(TypeError):
typecheck.check_type("foo", ("42", 42), typing.Tuple[int, str])
typecheck.check_type("foo", (42, "42"), typing.Tuple[int, str])