mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-26 18:18:25 +00:00
console: flow resolution command
This is our first built-in command, which will be used by very many other commands. Also add a --commands option to dump all commands, analogous to --options.
This commit is contained in:
parent
ee3dd3f3c5
commit
8c4810f606
@ -111,10 +111,8 @@ class View(collections.Sequence):
|
|||||||
|
|
||||||
self.default_order = OrderRequestStart(self)
|
self.default_order = OrderRequestStart(self)
|
||||||
self.orders = dict(
|
self.orders = dict(
|
||||||
time = OrderRequestStart(self),
|
time = OrderRequestStart(self), method = OrderRequestMethod(self),
|
||||||
method = OrderRequestMethod(self),
|
url = OrderRequestURL(self), size = OrderKeySize(self),
|
||||||
url = OrderRequestURL(self),
|
|
||||||
size = OrderKeySize(self),
|
|
||||||
)
|
)
|
||||||
self.order_key = self.default_order
|
self.order_key = self.default_order
|
||||||
self.order_reversed = False
|
self.order_reversed = False
|
||||||
@ -324,6 +322,26 @@ class View(collections.Sequence):
|
|||||||
if "console_focus_follow" in updated:
|
if "console_focus_follow" in updated:
|
||||||
self.focus_follow = ctx.options.console_focus_follow
|
self.focus_follow = ctx.options.console_focus_follow
|
||||||
|
|
||||||
|
def resolve(self, spec: str) -> typing.Sequence[mitmproxy.flow.Flow]:
|
||||||
|
if spec == "@focus":
|
||||||
|
return [self.focus.flow] if self.focus.flow else []
|
||||||
|
elif spec == "@shown":
|
||||||
|
return [i for i in self]
|
||||||
|
elif spec == "@hidden":
|
||||||
|
return [i for i in self._store.values() if i not in self._view]
|
||||||
|
elif spec == "@marked":
|
||||||
|
return [i for i in self._store.values() if i.marked]
|
||||||
|
elif spec == "@unmarked":
|
||||||
|
return [i for i in self._store.values() if not i.marked]
|
||||||
|
else:
|
||||||
|
filt = flowfilter.parse(spec)
|
||||||
|
if not filt:
|
||||||
|
raise exceptions.CommandError("Invalid flow filter: %s" % spec)
|
||||||
|
return [i for i in self._store.values() if filt(i)]
|
||||||
|
|
||||||
|
def load(self, l):
|
||||||
|
l.add_command("console.resolve", self.resolve)
|
||||||
|
|
||||||
def request(self, f):
|
def request(self, f):
|
||||||
self.add(f)
|
self.add(f)
|
||||||
|
|
||||||
|
@ -2,17 +2,17 @@ import inspect
|
|||||||
import typing
|
import typing
|
||||||
import shlex
|
import shlex
|
||||||
from mitmproxy.utils import typecheck
|
from mitmproxy.utils import typecheck
|
||||||
|
from mitmproxy import exceptions
|
||||||
|
from mitmproxy import flow
|
||||||
class CommandError(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
def typename(t: type) -> str:
|
def typename(t: type) -> str:
|
||||||
if t in (str, int, bool):
|
if t in (str, int, bool):
|
||||||
return t.__name__
|
return t.__name__
|
||||||
|
if t == typing.Sequence[flow.Flow]:
|
||||||
|
return "[flow]"
|
||||||
else: # pragma: no cover
|
else: # pragma: no cover
|
||||||
raise NotImplementedError
|
raise NotImplementedError(t)
|
||||||
|
|
||||||
|
|
||||||
def parsearg(spec: str, argtype: type) -> typing.Any:
|
def parsearg(spec: str, argtype: type) -> typing.Any:
|
||||||
@ -22,7 +22,7 @@ def parsearg(spec: str, argtype: type) -> typing.Any:
|
|||||||
if argtype == str:
|
if argtype == str:
|
||||||
return spec
|
return spec
|
||||||
else:
|
else:
|
||||||
raise CommandError("Unsupported argument type: %s" % argtype)
|
raise exceptions.CommandError("Unsupported argument type: %s" % argtype)
|
||||||
|
|
||||||
|
|
||||||
class Command:
|
class Command:
|
||||||
@ -44,7 +44,7 @@ class Command:
|
|||||||
Call the command with a set of arguments. At this point, all argumets are strings.
|
Call the command with a set of arguments. At this point, all argumets are strings.
|
||||||
"""
|
"""
|
||||||
if len(self.paramtypes) != len(args):
|
if len(self.paramtypes) != len(args):
|
||||||
raise CommandError("Usage: %s" % self.signature_help())
|
raise exceptions.CommandError("Usage: %s" % self.signature_help())
|
||||||
|
|
||||||
args = [parsearg(args[i], self.paramtypes[i]) for i in range(len(args))]
|
args = [parsearg(args[i], self.paramtypes[i]) for i in range(len(args))]
|
||||||
|
|
||||||
@ -52,7 +52,7 @@ class Command:
|
|||||||
ret = self.func(*args)
|
ret = self.func(*args)
|
||||||
|
|
||||||
if not typecheck.check_command_return_type(ret, self.returntype):
|
if not typecheck.check_command_return_type(ret, self.returntype):
|
||||||
raise CommandError("Command returned unexpected data")
|
raise exceptions.CommandError("Command returned unexpected data")
|
||||||
|
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
@ -65,14 +65,19 @@ class CommandManager:
|
|||||||
def add(self, path: str, func: typing.Callable):
|
def add(self, path: str, func: typing.Callable):
|
||||||
self.commands[path] = Command(self, path, func)
|
self.commands[path] = Command(self, path, func)
|
||||||
|
|
||||||
|
def call_args(self, path, args):
|
||||||
|
"""
|
||||||
|
Call a command using a list of string arguments. May raise CommandError.
|
||||||
|
"""
|
||||||
|
if path not in self.commands:
|
||||||
|
raise exceptions.CommandError("Unknown command: %s" % path)
|
||||||
|
return self.commands[path].call(args)
|
||||||
|
|
||||||
def call(self, cmdstr: str):
|
def call(self, cmdstr: str):
|
||||||
"""
|
"""
|
||||||
Call a command using a string. May raise CommandError.
|
Call a command using a string. May raise CommandError.
|
||||||
"""
|
"""
|
||||||
parts = shlex.split(cmdstr)
|
parts = shlex.split(cmdstr)
|
||||||
if not len(parts) >= 1:
|
if not len(parts) >= 1:
|
||||||
raise CommandError("Invalid command: %s" % cmdstr)
|
raise exceptions.CommandError("Invalid command: %s" % cmdstr)
|
||||||
path = parts[0]
|
return self.call_args(parts[0], parts[1:])
|
||||||
if path not in self.commands:
|
|
||||||
raise CommandError("Unknown command: %s" % path)
|
|
||||||
return self.commands[path].call(parts[1:])
|
|
||||||
|
@ -93,6 +93,10 @@ class SetServerNotAllowedException(MitmproxyException):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class CommandError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class OptionsError(MitmproxyException):
|
class OptionsError(MitmproxyException):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -6,6 +6,7 @@ import mitmproxy.options
|
|||||||
from mitmproxy import proxy
|
from mitmproxy import proxy
|
||||||
from mitmproxy import addonmanager
|
from mitmproxy import addonmanager
|
||||||
from mitmproxy import eventsequence
|
from mitmproxy import eventsequence
|
||||||
|
from mitmproxy import command
|
||||||
from mitmproxy.addons import script
|
from mitmproxy.addons import script
|
||||||
|
|
||||||
|
|
||||||
@ -126,3 +127,10 @@ class context:
|
|||||||
Recursively invoke an event on an addon and all its children.
|
Recursively invoke an event on an addon and all its children.
|
||||||
"""
|
"""
|
||||||
return self.master.addons.invoke_addon(addon, event, *args, **kwargs)
|
return self.master.addons.invoke_addon(addon, event, *args, **kwargs)
|
||||||
|
|
||||||
|
def command(self, func, *args):
|
||||||
|
"""
|
||||||
|
Invoke a command function within a command context, mimicing the actual command environment.
|
||||||
|
"""
|
||||||
|
cmd = command.Command(self.master.commands, "test.command", func)
|
||||||
|
return cmd.call(args)
|
||||||
|
@ -25,6 +25,11 @@ def common_options(parser, opts):
|
|||||||
action='store_true',
|
action='store_true',
|
||||||
help="Show all options and their default values",
|
help="Show all options and their default values",
|
||||||
)
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
'--commands',
|
||||||
|
action='store_true',
|
||||||
|
help="Show all commands and their signatures",
|
||||||
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--conf",
|
"--conf",
|
||||||
type=str, dest="conf", default=CONFIG_PATH,
|
type=str, dest="conf", default=CONFIG_PATH,
|
||||||
|
@ -39,7 +39,7 @@ def process_options(parser, opts, args):
|
|||||||
if args.version:
|
if args.version:
|
||||||
print(debug.dump_system_info())
|
print(debug.dump_system_info())
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
if args.quiet or args.options:
|
if args.quiet or args.options or args.commands:
|
||||||
args.verbosity = 0
|
args.verbosity = 0
|
||||||
args.flow_detail = 0
|
args.flow_detail = 0
|
||||||
|
|
||||||
@ -84,6 +84,13 @@ def run(MasterKlass, args, extra=None): # pragma: no cover
|
|||||||
if args.options:
|
if args.options:
|
||||||
print(optmanager.dump_defaults(opts))
|
print(optmanager.dump_defaults(opts))
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
if args.commands:
|
||||||
|
cmds = []
|
||||||
|
for c in master.commands.commands.values():
|
||||||
|
cmds.append(c.signature_help())
|
||||||
|
for i in sorted(cmds):
|
||||||
|
print(i)
|
||||||
|
sys.exit(0)
|
||||||
opts.set(*args.setoptions)
|
opts.set(*args.setoptions)
|
||||||
if extra:
|
if extra:
|
||||||
opts.update(**extra(args))
|
opts.update(**extra(args))
|
||||||
|
@ -7,6 +7,20 @@ def check_command_return_type(value: typing.Any, typeinfo: typing.Any) -> bool:
|
|||||||
types match, False otherwise. This function supports only those types
|
types match, False otherwise. This function supports only those types
|
||||||
required for command return values.
|
required for command return values.
|
||||||
"""
|
"""
|
||||||
|
typename = str(typeinfo)
|
||||||
|
if typename.startswith("typing.Sequence"):
|
||||||
|
try:
|
||||||
|
T = typeinfo.__args__[0] # type: ignore
|
||||||
|
except AttributeError:
|
||||||
|
# Python 3.5.0
|
||||||
|
T = typeinfo.__parameters__[0] # type: ignore
|
||||||
|
if not isinstance(value, (tuple, list)):
|
||||||
|
return False
|
||||||
|
for v in value:
|
||||||
|
if not check_command_return_type(v, T):
|
||||||
|
return False
|
||||||
|
elif not isinstance(value, typeinfo):
|
||||||
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
@ -130,6 +130,46 @@ def test_filter():
|
|||||||
assert len(v) == 4
|
assert len(v) == 4
|
||||||
|
|
||||||
|
|
||||||
|
def test_resolve():
|
||||||
|
v = view.View()
|
||||||
|
with taddons.context(options=options.Options()) as tctx:
|
||||||
|
assert tctx.command(v.resolve, "@focus") == []
|
||||||
|
assert tctx.command(v.resolve, "@shown") == []
|
||||||
|
assert tctx.command(v.resolve, "@hidden") == []
|
||||||
|
assert tctx.command(v.resolve, "@marked") == []
|
||||||
|
assert tctx.command(v.resolve, "@unmarked") == []
|
||||||
|
assert tctx.command(v.resolve, "~m get") == []
|
||||||
|
v.request(tft(method="get"))
|
||||||
|
assert len(tctx.command(v.resolve, "~m get")) == 1
|
||||||
|
assert len(tctx.command(v.resolve, "@focus")) == 1
|
||||||
|
assert len(tctx.command(v.resolve, "@shown")) == 1
|
||||||
|
assert len(tctx.command(v.resolve, "@unmarked")) == 1
|
||||||
|
assert tctx.command(v.resolve, "@hidden") == []
|
||||||
|
assert tctx.command(v.resolve, "@marked") == []
|
||||||
|
v.request(tft(method="put"))
|
||||||
|
assert len(tctx.command(v.resolve, "@focus")) == 1
|
||||||
|
assert len(tctx.command(v.resolve, "@shown")) == 2
|
||||||
|
assert tctx.command(v.resolve, "@hidden") == []
|
||||||
|
assert tctx.command(v.resolve, "@marked") == []
|
||||||
|
|
||||||
|
v.request(tft(method="get"))
|
||||||
|
v.request(tft(method="put"))
|
||||||
|
|
||||||
|
f = flowfilter.parse("~m get")
|
||||||
|
v.set_filter(f)
|
||||||
|
v[0].marked = True
|
||||||
|
|
||||||
|
def m(l):
|
||||||
|
return [i.request.method for i in l]
|
||||||
|
|
||||||
|
assert m(tctx.command(v.resolve, "~m get")) == ["GET", "GET"]
|
||||||
|
assert m(tctx.command(v.resolve, "~m put")) == ["PUT", "PUT"]
|
||||||
|
assert m(tctx.command(v.resolve, "@shown")) == ["GET", "GET"]
|
||||||
|
assert m(tctx.command(v.resolve, "@hidden")) == ["PUT", "PUT"]
|
||||||
|
assert m(tctx.command(v.resolve, "@marked")) == ["GET"]
|
||||||
|
assert m(tctx.command(v.resolve, "@unmarked")) == ["PUT", "GET", "PUT"]
|
||||||
|
|
||||||
|
|
||||||
def test_order():
|
def test_order():
|
||||||
v = view.View()
|
v = view.View()
|
||||||
with taddons.context(options=options.Options()) as tctx:
|
with taddons.context(options=options.Options()) as tctx:
|
||||||
|
@ -2,6 +2,7 @@ from mitmproxy import command
|
|||||||
from mitmproxy import master
|
from mitmproxy import master
|
||||||
from mitmproxy import options
|
from mitmproxy import options
|
||||||
from mitmproxy import proxy
|
from mitmproxy import proxy
|
||||||
|
from mitmproxy import exceptions
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
@ -29,7 +30,7 @@ def test_simple():
|
|||||||
a = TAddon()
|
a = TAddon()
|
||||||
c.add("one.two", a.cmd1)
|
c.add("one.two", a.cmd1)
|
||||||
assert(c.call("one.two foo") == "ret foo")
|
assert(c.call("one.two foo") == "ret foo")
|
||||||
with pytest.raises(command.CommandError, match="Unknown"):
|
with pytest.raises(exceptions.CommandError, match="Unknown"):
|
||||||
c.call("nonexistent")
|
c.call("nonexistent")
|
||||||
with pytest.raises(command.CommandError, match="Invalid"):
|
with pytest.raises(exceptions.CommandError, match="Invalid"):
|
||||||
c.call("")
|
c.call("")
|
||||||
|
Loading…
Reference in New Issue
Block a user