Merge pull request #2273 from cortesi/commands

Commands
This commit is contained in:
Aldo Cortesi 2017-04-28 06:53:41 +12:00 committed by GitHub
commit aab6bf747c
24 changed files with 524 additions and 150 deletions

View File

@ -93,6 +93,9 @@ class Loader:
choices
)
def add_command(self, path: str, func: typing.Callable) -> None:
self.master.commands.add(path, func)
def traverse(chain):
"""
@ -142,7 +145,7 @@ class AddonManager:
for a in traverse([addon]):
name = _get_name(a)
if name in self.lookup:
raise exceptions.AddonError(
raise exceptions.AddonManagerError(
"An addon called '%s' already exists." % name
)
l = Loader(self.master)
@ -172,7 +175,7 @@ class AddonManager:
for a in traverse([addon]):
n = _get_name(a)
if n not in self.lookup:
raise exceptions.AddonError("No such addon: %s" % n)
raise exceptions.AddonManagerError("No such addon: %s" % n)
self.chain = [i for i in self.chain if i is not a]
del self.lookup[_get_name(a)]
with self.master.handlecontext():
@ -221,7 +224,7 @@ class AddonManager:
func = getattr(a, name, None)
if func:
if not callable(func):
raise exceptions.AddonError(
raise exceptions.AddonManagerError(
"Addon handler %s not callable" % name
)
func(*args, **kwargs)

View File

@ -14,7 +14,7 @@ from mitmproxy.addons import setheaders
from mitmproxy.addons import stickyauth
from mitmproxy.addons import stickycookie
from mitmproxy.addons import streambodies
from mitmproxy.addons import streamfile
from mitmproxy.addons import save
from mitmproxy.addons import upstream_auth
@ -36,6 +36,6 @@ def default_addons():
stickyauth.StickyAuth(),
stickycookie.StickyCookie(),
streambodies.StreamBodies(),
streamfile.StreamFile(),
save.Save(),
upstream_auth.UpstreamAuth(),
]

View File

@ -1,21 +1,31 @@
import os.path
import typing
from mitmproxy import exceptions
from mitmproxy import flowfilter
from mitmproxy import io
from mitmproxy import ctx
from mitmproxy import flow
class StreamFile:
class Save:
def __init__(self):
self.stream = None
self.filt = None
self.active_flows = set() # type: Set[flow.Flow]
def start_stream_to_path(self, path, mode, flt):
def open_file(self, path):
if path.startswith("+"):
path = path[1:]
mode = "ab"
else:
mode = "wb"
path = os.path.expanduser(path)
return open(path, mode)
def start_stream_to_path(self, path, flt):
try:
f = open(path, mode)
f = self.open_file(path)
except IOError as v:
raise exceptions.OptionsError(str(v))
self.stream = io.FilteredFlowWriter(f, flt)
@ -23,26 +33,32 @@ class StreamFile:
def configure(self, updated):
# We're already streaming - stop the previous stream and restart
if "streamfile_filter" in updated:
if ctx.options.streamfile_filter:
self.filt = flowfilter.parse(ctx.options.streamfile_filter)
if "save_stream_filter" in updated:
if ctx.options.save_stream_filter:
self.filt = flowfilter.parse(ctx.options.save_stream_filter)
if not self.filt:
raise exceptions.OptionsError(
"Invalid filter specification: %s" % ctx.options.streamfile_filter
"Invalid filter specification: %s" % ctx.options.save_stream_filter
)
else:
self.filt = None
if "streamfile" in updated:
if "save_stream_file" in updated:
if self.stream:
self.done()
if ctx.options.streamfile:
if ctx.options.streamfile.startswith("+"):
path = ctx.options.streamfile[1:]
mode = "ab"
else:
path = ctx.options.streamfile
mode = "wb"
self.start_stream_to_path(path, mode, self.filt)
if ctx.options.save_stream_file:
self.start_stream_to_path(ctx.options.save_stream_file, self.filt)
def save(self, flows: typing.Sequence[flow.Flow], path: str) -> None:
try:
f = self.open_file(path)
except IOError as v:
raise exceptions.CommandError(v) from v
stream = io.FlowWriter(f)
for i in flows:
stream.add(i)
def load(self, l):
l.add_command("save.file", self.save)
def tcp_start(self, flow):
if self.stream:
@ -64,8 +80,8 @@ class StreamFile:
def done(self):
if self.stream:
for flow in self.active_flows:
self.stream.add(flow)
for f in self.active_flows:
self.stream.add(f)
self.active_flows = set([])
self.stream.fo.close()
self.stream = None

View File

@ -111,10 +111,8 @@ class View(collections.Sequence):
self.default_order = OrderRequestStart(self)
self.orders = dict(
time = OrderRequestStart(self),
method = OrderRequestMethod(self),
url = OrderRequestURL(self),
size = OrderKeySize(self),
time = OrderRequestStart(self), method = OrderRequestMethod(self),
url = OrderRequestURL(self), size = OrderKeySize(self),
)
self.order_key = self.default_order
self.order_reversed = False
@ -324,6 +322,26 @@ class View(collections.Sequence):
if "console_focus_follow" in updated:
self.focus_follow = ctx.options.console_focus_follow
def resolve(self, spec: str) -> typing.Sequence[mitmproxy.flow.Flow]:
if spec == "@focus":
return [self.focus.flow] if self.focus.flow else []
elif spec == "@shown":
return [i for i in self]
elif spec == "@hidden":
return [i for i in self._store.values() if i not in self._view]
elif spec == "@marked":
return [i for i in self._store.values() if i.marked]
elif spec == "@unmarked":
return [i for i in self._store.values() if not i.marked]
else:
filt = flowfilter.parse(spec)
if not filt:
raise exceptions.CommandError("Invalid flow filter: %s" % spec)
return [i for i in self._store.values() if filt(i)]
def load(self, l):
l.add_command("console.resolve", self.resolve)
def request(self, f):
self.add(f)

91
mitmproxy/command.py Normal file
View File

@ -0,0 +1,91 @@
import inspect
import typing
import shlex
from mitmproxy.utils import typecheck
from mitmproxy import exceptions
from mitmproxy import flow
def typename(t: type, ret: bool) -> str:
"""
Translates a type to an explanatory string. Ifl ret is True, we're
looking at a return type, else we're looking at a parameter type.
"""
if t in (str, int, bool):
return t.__name__
if t == typing.Sequence[flow.Flow]:
return "[flow]" if ret else "flowspec"
else: # pragma: no cover
raise NotImplementedError(t)
class Command:
def __init__(self, manager, path, func) -> None:
self.path = path
self.manager = manager
self.func = func
sig = inspect.signature(self.func)
self.paramtypes = [v.annotation for v in sig.parameters.values()]
self.returntype = sig.return_annotation
def signature_help(self) -> str:
params = " ".join([typename(i, False) for i in self.paramtypes])
ret = " -> " + typename(self.returntype, True) if self.returntype else ""
return "%s %s%s" % (self.path, params, ret)
def call(self, args: typing.Sequence[str]):
"""
Call the command with a set of arguments. At this point, all argumets are strings.
"""
if len(self.paramtypes) != len(args):
raise exceptions.CommandError("Usage: %s" % self.signature_help())
pargs = []
for i in range(len(args)):
pargs.append(parsearg(self.manager, args[i], self.paramtypes[i]))
with self.manager.master.handlecontext():
ret = self.func(*pargs)
if not typecheck.check_command_return_type(ret, self.returntype):
raise exceptions.CommandError("Command returned unexpected data")
return ret
class CommandManager:
def __init__(self, master):
self.master = master
self.commands = {}
def add(self, path: str, func: typing.Callable):
self.commands[path] = Command(self, path, func)
def call_args(self, path, args):
"""
Call a command using a list of string arguments. May raise CommandError.
"""
if path not in self.commands:
raise exceptions.CommandError("Unknown command: %s" % path)
return self.commands[path].call(args)
def call(self, cmdstr: str):
"""
Call a command using a string. May raise CommandError.
"""
parts = shlex.split(cmdstr)
if not len(parts) >= 1:
raise exceptions.CommandError("Invalid command: %s" % cmdstr)
return self.call_args(parts[0], parts[1:])
def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
"""
Convert a string to a argument to the appropriate type.
"""
if argtype == str:
return spec
elif argtype == typing.Sequence[flow.Flow]:
return manager.call_args("console.resolve", [spec])
else:
raise exceptions.CommandError("Unsupported argument type: %s" % argtype)

View File

@ -93,11 +93,15 @@ class SetServerNotAllowedException(MitmproxyException):
pass
class CommandError(Exception):
pass
class OptionsError(MitmproxyException):
pass
class AddonError(MitmproxyException):
class AddonManagerError(MitmproxyException):
pass

View File

@ -8,6 +8,7 @@ from mitmproxy import controller
from mitmproxy import eventsequence
from mitmproxy import exceptions
from mitmproxy import connections
from mitmproxy import command
from mitmproxy import http
from mitmproxy import log
from mitmproxy.proxy.protocol import http_replay
@ -34,6 +35,7 @@ class Master:
"""
def __init__(self, opts, server):
self.options = opts or options.Options()
self.commands = command.CommandManager(self)
self.addons = addonmanager.AddonManager(self)
self.event_queue = queue.Queue()
self.should_exit = threading.Event()

View File

@ -159,11 +159,11 @@ class Options(optmanager.OptManager):
choices = [i.name.lower() for i in contentviews.views]
)
self.add_option(
"streamfile", Optional[str], None,
"Write flows to file. Prefix path with + to append."
"save_stream_file", Optional[str], None,
"Stream flows to file as they arrive. Prefix path with + to append."
)
self.add_option(
"streamfile_filter", Optional[str], None,
"save_stream_filter", Optional[str], None,
"Filter which flows are written to file."
)
self.add_option(

View File

@ -31,7 +31,7 @@ class _Option:
help: str,
choices: typing.Optional[typing.Sequence[str]]
) -> None:
typecheck.check_type(name, default, typespec)
typecheck.check_option_type(name, default, typespec)
self.name = name
self.typespec = typespec
self._default = default
@ -54,7 +54,7 @@ class _Option:
return copy.deepcopy(v)
def set(self, value: typing.Any) -> None:
typecheck.check_type(self.name, value, self.typespec)
typecheck.check_option_type(self.name, value, self.typespec)
self.value = value
def reset(self) -> None:

View File

@ -6,6 +6,7 @@ import mitmproxy.options
from mitmproxy import proxy
from mitmproxy import addonmanager
from mitmproxy import eventsequence
from mitmproxy import command
from mitmproxy.addons import script
@ -126,3 +127,10 @@ class context:
Recursively invoke an event on an addon and all its children.
"""
return self.master.addons.invoke_addon(addon, event, *args, **kwargs)
def command(self, func, *args):
"""
Invoke a command function with a list of string arguments within a command context, mimicing the actual command environment.
"""
cmd = command.Command(self.master.commands, "test.command", func)
return cmd.call(args)

View File

@ -25,6 +25,11 @@ def common_options(parser, opts):
action='store_true',
help="Show all options and their default values",
)
parser.add_argument(
'--commands',
action='store_true',
help="Show all commands and their signatures",
)
parser.add_argument(
"--conf",
type=str, dest="conf", default=CONFIG_PATH,
@ -61,7 +66,7 @@ def common_options(parser, opts):
opts.make_parser(parser, "scripts", metavar="SCRIPT", short="s")
opts.make_parser(parser, "stickycookie", metavar="FILTER")
opts.make_parser(parser, "stickyauth", metavar="FILTER")
opts.make_parser(parser, "streamfile", metavar="PATH", short="w")
opts.make_parser(parser, "save_stream_file", metavar="PATH", short="w")
opts.make_parser(parser, "anticomp")
# Proxy options
@ -123,7 +128,7 @@ def mitmdump(opts):
nargs="...",
help="""
Filter expression, equivalent to setting both the view_filter
and streamfile_filter options.
and save_stream_filter options.
"""
)
return parser

View File

@ -0,0 +1,27 @@
import urwid
from mitmproxy import exceptions
from mitmproxy.tools.console import signals
class CommandEdit(urwid.Edit):
def __init__(self):
urwid.Edit.__init__(self, ":", "")
def keypress(self, size, key):
return urwid.Edit.keypress(self, size, key)
class CommandExecutor:
def __init__(self, master):
self.master = master
def __call__(self, cmd):
if cmd.strip():
try:
ret = self.master.commands.call(cmd)
except exceptions.CommandError as v:
signals.status_message.send(message=str(v))
else:
if type(ret) == str:
signals.status_message.send(message=ret)

View File

@ -353,7 +353,9 @@ class FlowListBox(urwid.ListBox):
def keypress(self, size, key):
key = common.shortcuts(key)
if key == "A":
if key == ":":
signals.status_prompt_command.send()
elif key == "A":
for f in self.master.view:
if f.intercepted:
f.resume()
@ -409,13 +411,13 @@ class FlowListBox(urwid.ListBox):
val = not self.master.options.console_order_reversed
self.master.options.console_order_reversed = val
elif key == "W":
if self.master.options.streamfile:
self.master.options.streamfile = None
if self.master.options.save_stream_file:
self.master.options.save_stream_file = None
else:
signals.status_prompt_path.send(
self,
prompt="Stream flows to",
callback= lambda path: self.master.options.update(streamfile=path)
callback= lambda path: self.master.options.update(save_stream_file=path)
)
else:
return urwid.ListBox.keypress(self, size, key)

View File

@ -288,7 +288,7 @@ class ConsoleMaster(master.Master):
screen = self.ui,
handle_mouse = self.options.console_mouse,
)
self.ab = statusbar.ActionBar()
self.ab = statusbar.ActionBar(self)
self.loop.set_alarm_in(0.01, self.ticker)
self.loop.set_alarm_in(

View File

@ -24,6 +24,9 @@ status_prompt_path = blinker.Signal()
# Prompt for a single keystroke
status_prompt_onekey = blinker.Signal()
# Prompt for a command
status_prompt_command = blinker.Signal()
# Call a callback in N seconds
call_in = blinker.Signal()

View File

@ -5,6 +5,7 @@ import urwid
from mitmproxy.tools.console import common
from mitmproxy.tools.console import pathedit
from mitmproxy.tools.console import signals
from mitmproxy.tools.console import command
import mitmproxy.tools.console.master # noqa
@ -32,13 +33,15 @@ class PromptStub:
class ActionBar(urwid.WidgetWrap):
def __init__(self):
def __init__(self, master):
urwid.WidgetWrap.__init__(self, None)
self.master = master
self.clear()
signals.status_message.connect(self.sig_message)
signals.status_prompt.connect(self.sig_prompt)
signals.status_prompt_path.connect(self.sig_path_prompt)
signals.status_prompt_onekey.connect(self.sig_prompt_onekey)
signals.status_prompt_command.connect(self.sig_prompt_command)
self.last_path = ""
@ -66,6 +69,11 @@ class ActionBar(urwid.WidgetWrap):
self._w = urwid.Edit(self.prep_prompt(prompt), text or "")
self.prompting = PromptStub(callback, args)
def sig_prompt_command(self, sender):
signals.focus.send(self, section="footer")
self._w = command.CommandEdit()
self.prompting = command.CommandExecutor(self.master)
def sig_path_prompt(self, sender, prompt, callback, args=()):
signals.focus.send(self, section="footer")
self._w = pathedit.PathEdit(
@ -243,8 +251,8 @@ class StatusBar(urwid.WidgetWrap):
r.append(("heading_key", "s"))
r.append("cripts:%s]" % len(self.master.options.scripts))
if self.master.options.streamfile:
r.append("[W:%s]" % self.master.options.streamfile)
if self.master.options.save_stream_file:
r.append("[W:%s]" % self.master.options.save_stream_file)
return r

View File

@ -39,7 +39,7 @@ def process_options(parser, opts, args):
if args.version:
print(debug.dump_system_info())
sys.exit(0)
if args.quiet or args.options:
if args.quiet or args.options or args.commands:
args.verbosity = 0
args.flow_detail = 0
@ -84,6 +84,13 @@ def run(MasterKlass, args, extra=None): # pragma: no cover
if args.options:
print(optmanager.dump_defaults(opts))
sys.exit(0)
if args.commands:
cmds = []
for c in master.commands.commands.values():
cmds.append(c.signature_help())
for i in sorted(cmds):
print(i)
sys.exit(0)
opts.set(*args.setoptions)
if extra:
opts.update(**extra(args))
@ -120,7 +127,7 @@ def mitmdump(args=None): # pragma: no cover
v = " ".join(args.filter_args)
return dict(
view_filter = v,
streamfile_filter = v,
save_stream_filter = v,
)
return {}

View File

@ -1,20 +1,37 @@
import typing
def check_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None:
def check_command_return_type(value: typing.Any, typeinfo: typing.Any) -> bool:
"""
This function checks if the provided value is an instance of typeinfo
and raises a TypeError otherwise.
The following types from the typing package have specialized support:
- Union
- Tuple
- IO
Check if the provided value is an instance of typeinfo. Returns True if the
types match, False otherwise. This function supports only those types
required for command return values.
"""
# If we realize that we need to extend this list substantially, it may make sense
# to use typeguard for this, but right now it's not worth the hassle for 16 lines of code.
typename = str(typeinfo)
if typename.startswith("typing.Sequence"):
try:
T = typeinfo.__args__[0] # type: ignore
except AttributeError:
# Python 3.5.0
T = typeinfo.__parameters__[0] # type: ignore
if not isinstance(value, (tuple, list)):
return False
for v in value:
if not check_command_return_type(v, T):
return False
elif value is None and typeinfo is None:
return True
elif not isinstance(value, typeinfo):
return False
return True
def check_option_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None:
"""
Check if the provided value is an instance of typeinfo and raises a
TypeError otherwise. This function supports only those types required for
options.
"""
e = TypeError("Expected {} for {}, but got {}.".format(
typeinfo,
name,
@ -32,7 +49,7 @@ def check_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None:
for T in types:
try:
check_type(name, value, T)
check_option_type(name, value, T)
except TypeError:
pass
else:
@ -50,7 +67,7 @@ def check_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None:
if len(types) != len(value):
raise e
for i, (x, T) in enumerate(zip(value, types)):
check_type("{}[{}]".format(name, i), x, T)
check_option_type("{}[{}]".format(name, i), x, T)
return
elif typename.startswith("typing.Sequence"):
try:
@ -58,11 +75,10 @@ def check_type(name: str, value: typing.Any, typeinfo: typing.Any) -> None:
except AttributeError:
# Python 3.5.0
T = typeinfo.__parameters__[0] # type: ignore
if not isinstance(value, (tuple, list)):
raise e
for v in value:
check_type(name, v, T)
check_option_type(name, v, T)
elif typename.startswith("typing.IO"):
if hasattr(value, "read"):
return

View File

@ -0,0 +1,83 @@
import pytest
from mitmproxy.test import taddons
from mitmproxy.test import tflow
from mitmproxy import io
from mitmproxy import exceptions
from mitmproxy import options
from mitmproxy.addons import save
from mitmproxy.addons import view
def test_configure(tmpdir):
sa = save.Save()
with taddons.context(options=options.Options()) as tctx:
with pytest.raises(exceptions.OptionsError):
tctx.configure(sa, save_stream_file=str(tmpdir))
with pytest.raises(Exception, match="Invalid filter"):
tctx.configure(
sa, save_stream_file=str(tmpdir.join("foo")), save_stream_filter="~~"
)
tctx.configure(sa, save_stream_filter="foo")
assert sa.filt
tctx.configure(sa, save_stream_filter=None)
assert not sa.filt
def rd(p):
x = io.FlowReader(open(p, "rb"))
return list(x.stream())
def test_tcp(tmpdir):
sa = save.Save()
with taddons.context() as tctx:
p = str(tmpdir.join("foo"))
tctx.configure(sa, save_stream_file=p)
tt = tflow.ttcpflow()
sa.tcp_start(tt)
sa.tcp_end(tt)
tctx.configure(sa, save_stream_file=None)
assert rd(p)
def test_save_command(tmpdir):
sa = save.Save()
with taddons.context() as tctx:
p = str(tmpdir.join("foo"))
sa.save([tflow.tflow(resp=True)], p)
assert len(rd(p)) == 1
sa.save([tflow.tflow(resp=True)], p)
assert len(rd(p)) == 1
sa.save([tflow.tflow(resp=True)], "+" + p)
assert len(rd(p)) == 2
with pytest.raises(exceptions.CommandError):
sa.save([tflow.tflow(resp=True)], str(tmpdir))
v = view.View()
tctx.master.addons.add(v)
tctx.master.addons.add(sa)
tctx.master.commands.call_args("save.file", ["@shown", p])
def test_simple(tmpdir):
sa = save.Save()
with taddons.context() as tctx:
p = str(tmpdir.join("foo"))
tctx.configure(sa, save_stream_file=p)
f = tflow.tflow(resp=True)
sa.request(f)
sa.response(f)
tctx.configure(sa, save_stream_file=None)
assert rd(p)[0].response
tctx.configure(sa, save_stream_file="+" + p)
f = tflow.tflow()
sa.request(f)
tctx.configure(sa, save_stream_file=None)
assert not rd(p)[1].response

View File

@ -1,62 +0,0 @@
import pytest
from mitmproxy.test import taddons
from mitmproxy.test import tflow
from mitmproxy import io
from mitmproxy import exceptions
from mitmproxy import options
from mitmproxy.addons import streamfile
def test_configure(tmpdir):
sa = streamfile.StreamFile()
with taddons.context(options=options.Options()) as tctx:
with pytest.raises(exceptions.OptionsError):
tctx.configure(sa, streamfile=str(tmpdir))
with pytest.raises(Exception, match="Invalid filter"):
tctx.configure(
sa, streamfile=str(tmpdir.join("foo")), streamfile_filter="~~"
)
tctx.configure(sa, streamfile_filter="foo")
assert sa.filt
tctx.configure(sa, streamfile_filter=None)
assert not sa.filt
def rd(p):
x = io.FlowReader(open(p, "rb"))
return list(x.stream())
def test_tcp(tmpdir):
sa = streamfile.StreamFile()
with taddons.context() as tctx:
p = str(tmpdir.join("foo"))
tctx.configure(sa, streamfile=p)
tt = tflow.ttcpflow()
sa.tcp_start(tt)
sa.tcp_end(tt)
tctx.configure(sa, streamfile=None)
assert rd(p)
def test_simple(tmpdir):
sa = streamfile.StreamFile()
with taddons.context() as tctx:
p = str(tmpdir.join("foo"))
tctx.configure(sa, streamfile=p)
f = tflow.tflow(resp=True)
sa.request(f)
sa.response(f)
tctx.configure(sa, streamfile=None)
assert rd(p)[0].response
tctx.configure(sa, streamfile="+" + p)
f = tflow.tflow()
sa.request(f)
tctx.configure(sa, streamfile=None)
assert not rd(p)[1].response

View File

@ -5,6 +5,7 @@ from mitmproxy.test import tflow
from mitmproxy.addons import view
from mitmproxy import flowfilter
from mitmproxy import options
from mitmproxy import exceptions
from mitmproxy.test import taddons
@ -130,6 +131,55 @@ def test_filter():
assert len(v) == 4
def test_load():
v = view.View()
with taddons.context(options=options.Options()) as tctx:
tctx.master.addons.add(v)
def test_resolve():
v = view.View()
with taddons.context(options=options.Options()) as tctx:
assert tctx.command(v.resolve, "@focus") == []
assert tctx.command(v.resolve, "@shown") == []
assert tctx.command(v.resolve, "@hidden") == []
assert tctx.command(v.resolve, "@marked") == []
assert tctx.command(v.resolve, "@unmarked") == []
assert tctx.command(v.resolve, "~m get") == []
v.request(tft(method="get"))
assert len(tctx.command(v.resolve, "~m get")) == 1
assert len(tctx.command(v.resolve, "@focus")) == 1
assert len(tctx.command(v.resolve, "@shown")) == 1
assert len(tctx.command(v.resolve, "@unmarked")) == 1
assert tctx.command(v.resolve, "@hidden") == []
assert tctx.command(v.resolve, "@marked") == []
v.request(tft(method="put"))
assert len(tctx.command(v.resolve, "@focus")) == 1
assert len(tctx.command(v.resolve, "@shown")) == 2
assert tctx.command(v.resolve, "@hidden") == []
assert tctx.command(v.resolve, "@marked") == []
v.request(tft(method="get"))
v.request(tft(method="put"))
f = flowfilter.parse("~m get")
v.set_filter(f)
v[0].marked = True
def m(l):
return [i.request.method for i in l]
assert m(tctx.command(v.resolve, "~m get")) == ["GET", "GET"]
assert m(tctx.command(v.resolve, "~m put")) == ["PUT", "PUT"]
assert m(tctx.command(v.resolve, "@shown")) == ["GET", "GET"]
assert m(tctx.command(v.resolve, "@hidden")) == ["PUT", "PUT"]
assert m(tctx.command(v.resolve, "@marked")) == ["GET"]
assert m(tctx.command(v.resolve, "@unmarked")) == ["PUT", "GET", "PUT"]
with pytest.raises(exceptions.CommandError, match="Invalid flow filter"):
tctx.command(v.resolve, "~")
def test_order():
v = view.View()
with taddons.context(options=options.Options()) as tctx:

View File

@ -61,9 +61,9 @@ def test_lifecycle():
a = addonmanager.AddonManager(m)
a.add(TAddon("one"))
with pytest.raises(exceptions.AddonError):
with pytest.raises(exceptions.AddonManagerError):
a.add(TAddon("one"))
with pytest.raises(exceptions.AddonError):
with pytest.raises(exceptions.AddonManagerError):
a.remove(TAddon("nonexistent"))
f = tflow.tflow()
@ -82,6 +82,11 @@ def test_loader():
l.add_option("custom_option", bool, False, "help")
l.add_option("custom_option", bool, False, "help")
def cmd(a: str) -> str:
return "foo"
l.add_command("test.command", cmd)
def test_simple():
with taddons.context() as tctx:

View File

@ -0,0 +1,74 @@
import typing
from mitmproxy import command
from mitmproxy import flow
from mitmproxy import master
from mitmproxy import options
from mitmproxy import proxy
from mitmproxy import exceptions
from mitmproxy.test import tflow
from mitmproxy.test import taddons
import pytest
class TAddon:
def cmd1(self, foo: str) -> str:
return "ret " + foo
def cmd2(self, foo: str) -> str:
return 99
class TestCommand:
def test_call(self):
o = options.Options()
m = master.Master(o, proxy.DummyServer(o))
cm = command.CommandManager(m)
a = TAddon()
c = command.Command(cm, "cmd.path", a.cmd1)
assert c.call(["foo"]) == "ret foo"
assert c.signature_help() == "cmd.path str -> str"
c = command.Command(cm, "cmd.two", a.cmd2)
with pytest.raises(exceptions.CommandError):
c.call(["foo"])
def test_simple():
o = options.Options()
m = master.Master(o, proxy.DummyServer(o))
c = command.CommandManager(m)
a = TAddon()
c.add("one.two", a.cmd1)
assert(c.call("one.two foo") == "ret foo")
with pytest.raises(exceptions.CommandError, match="Unknown"):
c.call("nonexistent")
with pytest.raises(exceptions.CommandError, match="Invalid"):
c.call("")
with pytest.raises(exceptions.CommandError, match="Usage"):
c.call("one.two too many args")
def test_typename():
assert command.typename(str, True) == "str"
assert command.typename(typing.Sequence[flow.Flow], True) == "[flow]"
assert command.typename(typing.Sequence[flow.Flow], False) == "flowspec"
class DummyConsole:
def load(self, l):
l.add_command("console.resolve", self.resolve)
def resolve(self, spec: str) -> typing.Sequence[flow.Flow]:
return [tflow.tflow(resp=True)]
def test_parsearg():
with taddons.context() as tctx:
tctx.master.addons.add(DummyConsole())
assert command.parsearg(tctx.master.commands, "foo", str) == "foo"
assert len(command.parsearg(
tctx.master.commands, "~b", typing.Sequence[flow.Flow]
)) == 1
with pytest.raises(exceptions.CommandError):
command.parsearg(tctx.master.commands, "foo", Exception)

View File

@ -16,72 +16,86 @@ class T(TBase):
super(T, self).__init__(42)
def test_check_type():
typecheck.check_type("foo", 42, int)
def test_check_option_type():
typecheck.check_option_type("foo", 42, int)
with pytest.raises(TypeError):
typecheck.check_type("foo", 42, str)
typecheck.check_option_type("foo", 42, str)
with pytest.raises(TypeError):
typecheck.check_type("foo", None, str)
typecheck.check_option_type("foo", None, str)
with pytest.raises(TypeError):
typecheck.check_type("foo", b"foo", str)
typecheck.check_option_type("foo", b"foo", str)
def test_check_union():
typecheck.check_type("foo", 42, typing.Union[int, str])
typecheck.check_type("foo", "42", typing.Union[int, str])
typecheck.check_option_type("foo", 42, typing.Union[int, str])
typecheck.check_option_type("foo", "42", typing.Union[int, str])
with pytest.raises(TypeError):
typecheck.check_type("foo", [], typing.Union[int, str])
typecheck.check_option_type("foo", [], typing.Union[int, str])
# Python 3.5 only defines __union_params__
m = mock.Mock()
m.__str__ = lambda self: "typing.Union"
m.__union_params__ = (int,)
typecheck.check_type("foo", 42, m)
typecheck.check_option_type("foo", 42, m)
def test_check_tuple():
typecheck.check_type("foo", (42, "42"), typing.Tuple[int, str])
typecheck.check_option_type("foo", (42, "42"), typing.Tuple[int, str])
with pytest.raises(TypeError):
typecheck.check_type("foo", None, typing.Tuple[int, str])
typecheck.check_option_type("foo", None, typing.Tuple[int, str])
with pytest.raises(TypeError):
typecheck.check_type("foo", (), typing.Tuple[int, str])
typecheck.check_option_type("foo", (), typing.Tuple[int, str])
with pytest.raises(TypeError):
typecheck.check_type("foo", (42, 42), typing.Tuple[int, str])
typecheck.check_option_type("foo", (42, 42), typing.Tuple[int, str])
with pytest.raises(TypeError):
typecheck.check_type("foo", ("42", 42), typing.Tuple[int, str])
typecheck.check_option_type("foo", ("42", 42), typing.Tuple[int, str])
# Python 3.5 only defines __tuple_params__
m = mock.Mock()
m.__str__ = lambda self: "typing.Tuple"
m.__tuple_params__ = (int, str)
typecheck.check_type("foo", (42, "42"), m)
typecheck.check_option_type("foo", (42, "42"), m)
def test_check_sequence():
typecheck.check_type("foo", [10], typing.Sequence[int])
typecheck.check_option_type("foo", [10], typing.Sequence[int])
with pytest.raises(TypeError):
typecheck.check_type("foo", ["foo"], typing.Sequence[int])
typecheck.check_option_type("foo", ["foo"], typing.Sequence[int])
with pytest.raises(TypeError):
typecheck.check_type("foo", [10, "foo"], typing.Sequence[int])
typecheck.check_option_type("foo", [10, "foo"], typing.Sequence[int])
with pytest.raises(TypeError):
typecheck.check_type("foo", [b"foo"], typing.Sequence[str])
typecheck.check_option_type("foo", [b"foo"], typing.Sequence[str])
with pytest.raises(TypeError):
typecheck.check_type("foo", "foo", typing.Sequence[str])
typecheck.check_option_type("foo", "foo", typing.Sequence[str])
# Python 3.5 only defines __parameters__
m = mock.Mock()
m.__str__ = lambda self: "typing.Sequence"
m.__parameters__ = (int,)
typecheck.check_type("foo", [10], m)
typecheck.check_option_type("foo", [10], m)
def test_check_io():
typecheck.check_type("foo", io.StringIO(), typing.IO[str])
typecheck.check_option_type("foo", io.StringIO(), typing.IO[str])
with pytest.raises(TypeError):
typecheck.check_type("foo", "foo", typing.IO[str])
typecheck.check_option_type("foo", "foo", typing.IO[str])
def test_check_any():
typecheck.check_type("foo", 42, typing.Any)
typecheck.check_type("foo", object(), typing.Any)
typecheck.check_type("foo", None, typing.Any)
typecheck.check_option_type("foo", 42, typing.Any)
typecheck.check_option_type("foo", object(), typing.Any)
typecheck.check_option_type("foo", None, typing.Any)
def test_check_command_return_type():
assert(typecheck.check_command_return_type("foo", str))
assert(typecheck.check_command_return_type(["foo"], typing.Sequence[str]))
assert(typecheck.check_command_return_type(None, None))
assert(not typecheck.check_command_return_type(["foo"], typing.Sequence[int]))
assert(not typecheck.check_command_return_type("foo", typing.Sequence[int]))
# Python 3.5 only defines __parameters__
m = mock.Mock()
m.__str__ = lambda self: "typing.Sequence"
m.__parameters__ = (int,)
typecheck.check_command_return_type([10], m)