Merge pull request #3693 from typoon/fix-command-bar-issue-3259

Improve Command Bar UX
This commit is contained in:
Maximilian Hils 2019-11-21 14:13:08 +01:00 committed by GitHub
commit 3550bdfe00
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 948 additions and 550 deletions

View File

@ -15,5 +15,5 @@ Usage:
def load(l):
import pydevd
pydevd.settrace("localhost", port=5678, stdoutToServer=True, stderrToServer=True)
import pydevd_pycharm
pydevd_pycharm.settrace("localhost", port=5678, stdoutToServer=True, stderrToServer=True, suspend=False)

View File

@ -83,15 +83,14 @@ class Core:
)
@command.command("set")
def set(self, *spec: str) -> None:
def set(self, option: str, value: str = "") -> None:
"""
Set an option of the form "key[=value]". When the value is omitted,
booleans are set to true, strings and integers are set to None (if
permitted), and sequences are emptied. Boolean values can be true,
false or toggle. If multiple specs are passed, they are joined
into one separated by spaces.
Set an option. When the value is omitted, booleans are set to true,
strings and integers are set to None (if permitted), and sequences
are emptied. Boolean values can be true, false or toggle.
Multiple values are concatenated with a single space.
"""
strspec = " ".join(spec)
strspec = f"{option}={value}"
try:
ctx.options.set(strspec)
except exceptions.OptionsError as e:
@ -109,14 +108,14 @@ class Core:
# FIXME: this will become view.mark later
@command.command("flow.mark")
def mark(self, flows: typing.Sequence[flow.Flow], val: bool) -> None:
def mark(self, flows: typing.Sequence[flow.Flow], boolean: bool) -> None:
"""
Mark flows.
"""
updated = []
for i in flows:
if i.marked != val:
i.marked = val
if i.marked != boolean:
i.marked = boolean
updated.append(i)
ctx.master.addons.trigger("update", updated)
@ -169,18 +168,18 @@ class Core:
]
@command.command("flow.set")
@command.argument("spec", type=mitmproxy.types.Choice("flow.set.options"))
@command.argument("attr", type=mitmproxy.types.Choice("flow.set.options"))
def flow_set(
self,
flows: typing.Sequence[flow.Flow],
spec: str,
sval: str
attr: str,
value: str
) -> None:
"""
Quickly set a number of common values on flows.
"""
val: typing.Union[int, str] = sval
if spec == "status_code":
val: typing.Union[int, str] = value
if attr == "status_code":
try:
val = int(val) # type: ignore
except ValueError as v:
@ -193,13 +192,13 @@ class Core:
req = getattr(f, "request", None)
rupdate = True
if req:
if spec == "method":
if attr == "method":
req.method = val
elif spec == "host":
elif attr == "host":
req.host = val
elif spec == "path":
elif attr == "path":
req.path = val
elif spec == "url":
elif attr == "url":
try:
req.url = val
except ValueError as e:
@ -212,11 +211,11 @@ class Core:
resp = getattr(f, "response", None)
supdate = True
if resp:
if spec == "status_code":
if attr == "status_code":
resp.status_code = val
if val in status_codes.RESPONSES:
resp.reason = status_codes.RESPONSES[val] # type: ignore
elif spec == "reason":
elif attr == "reason":
resp.reason = val
else:
supdate = False
@ -225,7 +224,7 @@ class Core:
updated.append(f)
ctx.master.addons.trigger("update", updated)
ctx.log.alert("Set %s on %s flows." % (spec, len(updated)))
ctx.log.alert("Set %s on %s flows." % (attr, len(updated)))
@command.command("flow.decode")
def decode(self, flows: typing.Sequence[flow.Flow], part: str) -> None:
@ -262,12 +261,12 @@ class Core:
ctx.log.alert("Toggled encoding on %s flows." % len(updated))
@command.command("flow.encode")
@command.argument("enc", type=mitmproxy.types.Choice("flow.encode.options"))
@command.argument("encoding", type=mitmproxy.types.Choice("flow.encode.options"))
def encode(
self,
flows: typing.Sequence[flow.Flow],
part: str,
enc: str,
encoding: str,
) -> None:
"""
Encode flows with a specified encoding.
@ -279,7 +278,7 @@ class Core:
current_enc = p.headers.get("content-encoding", "identity")
if current_enc == "identity":
f.backup()
p.encode(enc)
p.encode(encoding)
updated.append(f)
ctx.master.addons.trigger("update", updated)
ctx.log.alert("Encoded %s flows." % len(updated))

View File

@ -121,14 +121,14 @@ class Export():
return list(sorted(formats.keys()))
@command.command("export.file")
def file(self, fmt: str, f: flow.Flow, path: mitmproxy.types.Path) -> None:
def file(self, format: str, flow: flow.Flow, path: mitmproxy.types.Path) -> None:
"""
Export a flow to path.
"""
if fmt not in formats:
raise exceptions.CommandError("No such export format: %s" % fmt)
func: typing.Any = formats[fmt]
v = func(f)
if format not in formats:
raise exceptions.CommandError("No such export format: %s" % format)
func: typing.Any = formats[format]
v = func(flow)
try:
with open(path, "wb") as fp:
if isinstance(v, bytes):
@ -139,14 +139,14 @@ class Export():
ctx.log.error(str(e))
@command.command("export.clip")
def clip(self, fmt: str, f: flow.Flow) -> None:
def clip(self, format: str, flow: flow.Flow) -> None:
"""
Export a flow to the system clipboard.
"""
if fmt not in formats:
raise exceptions.CommandError("No such export format: %s" % fmt)
func: typing.Any = formats[fmt]
v = strutils.always_str(func(f))
if format not in formats:
raise exceptions.CommandError("No such export format: %s" % format)
func: typing.Any = formats[format]
v = strutils.always_str(func(flow))
try:
pyperclip.copy(v)
except pyperclip.PyperclipException as e:

View File

@ -217,7 +217,7 @@ class View(collections.abc.Sequence):
# Focus
@command.command("view.focus.go")
def go(self, dst: int) -> None:
def go(self, offset: int) -> None:
"""
Go to a specified offset. Positive offests are from the beginning of
the view, negative from the end of the view, so that 0 is the first
@ -225,13 +225,13 @@ class View(collections.abc.Sequence):
"""
if len(self) == 0:
return
if dst < 0:
dst = len(self) + dst
if dst < 0:
dst = 0
if dst > len(self) - 1:
dst = len(self) - 1
self.focus.flow = self[dst]
if offset < 0:
offset = len(self) + offset
if offset < 0:
offset = 0
if offset > len(self) - 1:
offset = len(self) - 1
self.focus.flow = self[offset]
@command.command("view.focus.next")
def focus_next(self) -> None:
@ -266,20 +266,20 @@ class View(collections.abc.Sequence):
return list(sorted(self.orders.keys()))
@command.command("view.order.reverse")
def set_reversed(self, value: bool) -> None:
self.order_reversed = value
def set_reversed(self, boolean: bool) -> None:
self.order_reversed = boolean
self.sig_view_refresh.send(self)
@command.command("view.order.set")
def set_order(self, order: str) -> None:
def set_order(self, order_key: str) -> None:
"""
Sets the current view order.
"""
if order not in self.orders:
if order_key not in self.orders:
raise exceptions.CommandError(
"Unknown flow order: %s" % order
"Unknown flow order: %s" % order_key
)
order_key = self.orders[order]
order_key = self.orders[order_key]
self.order_key = order_key
newview = sortedcontainers.SortedListWithKey(key=order_key)
newview.update(self._view)
@ -298,16 +298,16 @@ class View(collections.abc.Sequence):
# Filter
@command.command("view.filter.set")
def set_filter_cmd(self, f: str) -> None:
def set_filter_cmd(self, filter_expr: str) -> None:
"""
Sets the current view filter.
"""
filt = None
if f:
filt = flowfilter.parse(f)
if filter_expr:
filt = flowfilter.parse(filter_expr)
if not filt:
raise exceptions.CommandError(
"Invalid interception filter: %s" % f
"Invalid interception filter: %s" % filter_expr
)
self.set_filter(filt)
@ -340,11 +340,11 @@ class View(collections.abc.Sequence):
# View Settings
@command.command("view.settings.getval")
def getvalue(self, f: mitmproxy.flow.Flow, key: str, default: str) -> str:
def getvalue(self, flow: mitmproxy.flow.Flow, key: str, default: str) -> str:
"""
Get a value from the settings store for the specified flow.
"""
return self.settings[f].get(key, default)
return self.settings[flow].get(key, default)
@command.command("view.settings.setval.toggle")
def setvalue_toggle(
@ -412,26 +412,26 @@ class View(collections.abc.Sequence):
ctx.log.alert("Removed %s flows" % len(flows))
@command.command("view.flows.resolve")
def resolve(self, spec: str) -> typing.Sequence[mitmproxy.flow.Flow]:
def resolve(self, flow_spec: str) -> typing.Sequence[mitmproxy.flow.Flow]:
"""
Resolve a flow list specification to an actual list of flows.
"""
if spec == "@all":
if flow_spec == "@all":
return [i for i in self._store.values()]
if spec == "@focus":
if flow_spec == "@focus":
return [self.focus.flow] if self.focus.flow else []
elif spec == "@shown":
elif flow_spec == "@shown":
return [i for i in self]
elif spec == "@hidden":
elif flow_spec == "@hidden":
return [i for i in self._store.values() if i not in self._view]
elif spec == "@marked":
elif flow_spec == "@marked":
return [i for i in self._store.values() if i.marked]
elif spec == "@unmarked":
elif flow_spec == "@unmarked":
return [i for i in self._store.values() if not i.marked]
else:
filt = flowfilter.parse(spec)
filt = flowfilter.parse(flow_spec)
if not filt:
raise exceptions.CommandError("Invalid flow filter: %s" % spec)
raise exceptions.CommandError("Invalid flow filter: %s" % flow_spec)
return [i for i in self._store.values() if filt(i)]
@command.command("view.flows.create")

View File

@ -1,20 +1,19 @@
"""
This module manages and invokes typed commands.
"""
import inspect
import types
import io
import typing
import shlex
import textwrap
import functools
import inspect
import sys
import textwrap
import types
import typing
from mitmproxy import exceptions
import mitmproxy.types
from mitmproxy import exceptions, command_lexer
from mitmproxy.command_lexer import unquote
def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None:
def verify_arg_signature(f: typing.Callable, args: typing.Iterable[typing.Any], kwargs: dict) -> None:
sig = inspect.signature(f)
try:
sig.bind(*args, **kwargs)
@ -22,15 +21,6 @@ def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None:
raise exceptions.CommandError("command argument mismatch: %s" % v.args[0])
def lexer(s):
# mypy mis-identifies shlex.shlex as abstract
lex = shlex.shlex(s, posix=True) # type: ignore
lex.wordchars += "."
lex.whitespace_split = True
lex.commenters = ''
return lex
def typename(t: type) -> str:
"""
Translates a type to an explanatory string.
@ -43,208 +33,234 @@ def typename(t: type) -> str:
return to.display
class Command:
returntype: typing.Optional[typing.Type]
def _empty_as_none(x: typing.Any) -> typing.Any:
if x == inspect.Signature.empty:
return None
return x
def __init__(self, manager, path, func) -> None:
self.path = path
class CommandParameter(typing.NamedTuple):
name: str
type: typing.Type
kind: inspect._ParameterKind = inspect.Parameter.POSITIONAL_OR_KEYWORD
def __str__(self):
if self.kind is inspect.Parameter.VAR_POSITIONAL:
return f"*{self.name}"
else:
return self.name
class Command:
name: str
manager: "CommandManager"
signature: inspect.Signature
help: typing.Optional[str]
def __init__(self, manager: "CommandManager", name: str, func: typing.Callable) -> None:
self.name = name
self.manager = manager
self.func = func
sig = inspect.signature(self.func)
self.help = None
self.signature = inspect.signature(self.func)
if func.__doc__:
txt = func.__doc__.strip()
self.help = "\n".join(textwrap.wrap(txt))
self.has_positional = False
for i in sig.parameters.values():
# This is the kind for *args parameters
if i.kind == i.VAR_POSITIONAL:
self.has_positional = True
self.paramtypes = [v.annotation for v in sig.parameters.values()]
if sig.return_annotation == inspect._empty: # type: ignore
self.returntype = None
else:
self.returntype = sig.return_annotation
self.help = None
# This fails with a CommandException if types are invalid
self.signature_help()
for name, parameter in self.signature.parameters.items():
t = parameter.annotation
if not mitmproxy.types.CommandTypes.get(parameter.annotation, None):
raise exceptions.CommandError(f"Argument {name} has an unknown type ({_empty_as_none(t)}) in {func}.")
if self.return_type and not mitmproxy.types.CommandTypes.get(self.return_type, None):
raise exceptions.CommandError(f"Return type has an unknown type ({self.return_type}) in {func}.")
def paramnames(self) -> typing.Sequence[str]:
v = [typename(i) for i in self.paramtypes]
if self.has_positional:
v[-1] = "*" + v[-1]
return v
@property
def return_type(self) -> typing.Optional[typing.Type]:
return _empty_as_none(self.signature.return_annotation)
def retname(self) -> str:
return typename(self.returntype) if self.returntype else ""
@property
def parameters(self) -> typing.List[CommandParameter]:
"""Returns a list of CommandParameters."""
ret = []
for name, param in self.signature.parameters.items():
ret.append(CommandParameter(name, param.annotation, param.kind))
return ret
def signature_help(self) -> str:
params = " ".join(self.paramnames())
ret = self.retname()
if ret:
ret = " -> " + ret
return "%s %s%s" % (self.path, params, ret)
params = " ".join(str(param) for param in self.parameters)
if self.return_type:
ret = f" -> {typename(self.return_type)}"
else:
ret = ""
return f"{self.name} {params}{ret}"
def prepare_args(self, args: typing.Sequence[str]) -> typing.List[typing.Any]:
verify_arg_signature(self.func, list(args), {})
def prepare_args(self, args: typing.Sequence[str]) -> inspect.BoundArguments:
try:
bound_arguments = self.signature.bind(*args)
except TypeError as v:
raise exceptions.CommandError(f"Command argument mismatch: {v.args[0]}")
remainder: typing.Sequence[str] = []
if self.has_positional:
remainder = args[len(self.paramtypes) - 1:]
args = args[:len(self.paramtypes) - 1]
for name, value in bound_arguments.arguments.items():
convert_to = self.signature.parameters[name].annotation
bound_arguments.arguments[name] = parsearg(self.manager, value, convert_to)
pargs = []
for arg, paramtype in zip(args, self.paramtypes):
pargs.append(parsearg(self.manager, arg, paramtype))
pargs.extend(remainder)
return pargs
bound_arguments.apply_defaults()
return bound_arguments
def call(self, args: typing.Sequence[str]) -> typing.Any:
"""
Call the command with a list of arguments. At this point, all
arguments are strings.
Call the command with a list of arguments. At this point, all
arguments are strings.
"""
ret = self.func(*self.prepare_args(args))
if ret is None and self.returntype is None:
bound_args = self.prepare_args(args)
ret = self.func(*bound_args.args, **bound_args.kwargs)
if ret is None and self.return_type is None:
return
typ = mitmproxy.types.CommandTypes.get(self.returntype)
typ = mitmproxy.types.CommandTypes.get(self.return_type)
assert typ
if not typ.is_valid(self.manager, typ, ret):
raise exceptions.CommandError(
"%s returned unexpected data - expected %s" % (
self.path, typ.display
)
f"{self.name} returned unexpected data - expected {typ.display}"
)
return ret
ParseResult = typing.NamedTuple(
"ParseResult",
[
("value", str),
("type", typing.Type),
("valid", bool),
],
)
class ParseResult(typing.NamedTuple):
value: str
type: typing.Type
valid: bool
class CommandManager(mitmproxy.types._CommandBase):
class CommandManager:
commands: typing.Dict[str, Command]
def __init__(self, master):
self.master = master
self.commands: typing.Dict[str, Command] = {}
self.commands = {}
def collect_commands(self, addon):
for i in dir(addon):
if not i.startswith("__"):
o = getattr(addon, i)
try:
is_command = hasattr(o, "command_path")
is_command = hasattr(o, "command_name")
except Exception:
pass # hasattr may raise if o implements __getattr__.
else:
if is_command:
try:
self.add(o.command_path, o)
self.add(o.command_name, o)
except exceptions.CommandError as e:
self.master.log.warn(
"Could not load command %s: %s" % (o.command_path, e)
"Could not load command %s: %s" % (o.command_name, e)
)
def add(self, path: str, func: typing.Callable):
self.commands[path] = Command(self, path, func)
@functools.lru_cache(maxsize=128)
def parse_partial(
self,
cmdstr: str
) -> typing.Tuple[typing.Sequence[ParseResult], typing.Sequence[str]]:
self,
cmdstr: str
) -> typing.Tuple[typing.Sequence[ParseResult], typing.Sequence[CommandParameter]]:
"""
Parse a possibly partial command. Return a sequence of ParseResults and a sequence of remainder type help items.
Parse a possibly partial command. Return a sequence of ParseResults and a sequence of remainder type help items.
"""
buf = io.StringIO(cmdstr)
parts: typing.List[str] = []
lex = lexer(buf)
while 1:
remainder = cmdstr[buf.tell():]
try:
t = lex.get_token()
except ValueError:
parts.append(remainder)
break
if not t:
break
parts.append(t)
if not parts:
parts = [""]
elif cmdstr.endswith(" "):
parts.append("")
parse: typing.List[ParseResult] = []
params: typing.List[type] = []
typ: typing.Type
for i in range(len(parts)):
if i == 0:
typ = mitmproxy.types.Cmd
if parts[i] in self.commands:
params.extend(self.commands[parts[i]].paramtypes)
elif params:
typ = params.pop(0)
if typ == mitmproxy.types.Cmd and params and params[0] == mitmproxy.types.Arg:
if parts[i] in self.commands:
params[:] = self.commands[parts[i]].paramtypes
parts: typing.List[str] = command_lexer.expr.parseString(cmdstr, parseAll=True)
parsed: typing.List[ParseResult] = []
next_params: typing.List[CommandParameter] = [
CommandParameter("", mitmproxy.types.Cmd),
CommandParameter("", mitmproxy.types.CmdArgs),
]
expected: typing.Optional[CommandParameter] = None
for part in parts:
if part.isspace():
parsed.append(
ParseResult(
value=part,
type=mitmproxy.types.Space,
valid=True,
)
)
continue
if expected and expected.kind is inspect.Parameter.VAR_POSITIONAL:
assert not next_params
elif next_params:
expected = next_params.pop(0)
else:
typ = mitmproxy.types.Unknown
expected = CommandParameter("", mitmproxy.types.Unknown)
to = mitmproxy.types.CommandTypes.get(typ, None)
arg_is_known_command = (
expected.type == mitmproxy.types.Cmd and part in self.commands
)
arg_is_unknown_command = (
expected.type == mitmproxy.types.Cmd and part not in self.commands
)
command_args_following = (
next_params and next_params[0].type == mitmproxy.types.CmdArgs
)
if arg_is_known_command and command_args_following:
next_params = self.commands[part].parameters + next_params[1:]
if arg_is_unknown_command and command_args_following:
next_params.pop(0)
to = mitmproxy.types.CommandTypes.get(expected.type, None)
valid = False
if to:
try:
to.parse(self, typ, parts[i])
to.parse(self, expected.type, part)
except exceptions.TypeError:
valid = False
else:
valid = True
parse.append(
parsed.append(
ParseResult(
value=parts[i],
type=typ,
value=part,
type=expected.type,
valid=valid,
)
)
remhelp: typing.List[str] = []
for x in params:
remt = mitmproxy.types.CommandTypes.get(x, None)
remhelp.append(remt.display)
return parsed, next_params
return parse, remhelp
def call(self, command_name: str, *args: typing.Sequence[typing.Any]) -> typing.Any:
"""
Call a command with native arguments. May raise CommandError.
"""
if command_name not in self.commands:
raise exceptions.CommandError("Unknown command: %s" % command_name)
return self.commands[command_name].func(*args)
def call(self, path: str, *args: typing.Sequence[typing.Any]) -> typing.Any:
def _call_strings(self, command_name: str, args: typing.Sequence[str]) -> typing.Any:
"""
Call a command with native arguments. May raise CommandError.
Call a command using a list of string arguments. May raise CommandError.
"""
if path not in self.commands:
raise exceptions.CommandError("Unknown command: %s" % path)
return self.commands[path].func(*args)
if command_name not in self.commands:
raise exceptions.CommandError("Unknown command: %s" % command_name)
def call_strings(self, path: str, args: typing.Sequence[str]) -> typing.Any:
"""
Call a command using a list of string arguments. May raise CommandError.
"""
if path not in self.commands:
raise exceptions.CommandError("Unknown command: %s" % path)
return self.commands[path].call(args)
return self.commands[command_name].call(args)
def execute(self, cmdstr: str):
def execute(self, cmdstr: str) -> typing.Any:
"""
Execute a command string. May raise CommandError.
Execute a command string. May raise CommandError.
"""
try:
parts = list(lexer(cmdstr))
except ValueError as e:
raise exceptions.CommandError("Command error: %s" % e)
if not len(parts) >= 1:
raise exceptions.CommandError("Invalid command: %s" % cmdstr)
return self.call_strings(parts[0], parts[1:])
parts, _ = self.parse_partial(cmdstr)
if not parts:
raise exceptions.CommandError(f"Invalid command: {cmdstr!r}")
command_name, *args = [
unquote(part.value)
for part in parts
if part.type != mitmproxy.types.Space
]
return self._call_strings(command_name, args)
def dump(self, out=sys.stdout) -> None:
cmds = list(self.commands.values())
@ -262,21 +278,23 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
"""
t = mitmproxy.types.CommandTypes.get(argtype, None)
if not t:
raise exceptions.CommandError("Unsupported argument type: %s" % argtype)
raise exceptions.CommandError(f"Unsupported argument type: {argtype}")
try:
return t.parse(manager, argtype, spec) # type: ignore
return t.parse(manager, argtype, spec)
except exceptions.TypeError as e:
raise exceptions.CommandError from e
def command(path):
def command(name: typing.Optional[str] = None):
def decorator(function):
@functools.wraps(function)
def wrapper(*args, **kwargs):
verify_arg_signature(function, args, kwargs)
return function(*args, **kwargs)
wrapper.__dict__["command_path"] = path
wrapper.__dict__["command_name"] = name or function.__name__.replace("_", ".")
return wrapper
return decorator
@ -286,8 +304,10 @@ def argument(name, type):
specific types such as mitmproxy.types.Choice, which we cannot annotate
directly as mypy does not like that.
"""
def decorator(f: types.FunctionType) -> types.FunctionType:
assert name in f.__annotations__
f.__annotations__[name] = type
return f
return decorator

View File

@ -0,0 +1,49 @@
import ast
import re
import pyparsing
# TODO: There is a lot of work to be done here.
# The current implementation is written in a way that _any_ input is valid,
# which does not make sense once things get more complex.
PartialQuotedString = pyparsing.Regex(
re.compile(
r'''
(["']) # start quote
(?:
(?!\1)[^\\] # unescaped character that is not our quote nor the begin of an escape sequence. We can't use \1 in []
|
(?:\\.) # escape sequence
)*
(?:\1|$) # end quote
''',
re.VERBOSE
)
)
expr = pyparsing.ZeroOrMore(
PartialQuotedString
| pyparsing.Word(" \r\n\t")
| pyparsing.CharsNotIn("""'" \r\n\t""")
).leaveWhitespace()
def quote(val: str) -> str:
if val and all(char not in val for char in "'\" \r\n\t"):
return val
return repr(val) # TODO: More of a hack.
def unquote(x: str) -> str:
quoted = (
(x.startswith('"') and x.endswith('"'))
or
(x.startswith("'") and x.endswith("'"))
)
if quoted:
try:
x = ast.literal_eval(x)
except Exception:
x = x[1:-1]
return x

View File

@ -1,58 +1,55 @@
import abc
import collections
import copy
import typing
import collections
import urwid
from urwid.text_layout import calc_coords
import mitmproxy.command
import mitmproxy.flow
import mitmproxy.master
import mitmproxy.command
import mitmproxy.types
class Completer: # pragma: no cover
class Completer:
@abc.abstractmethod
def cycle(self) -> str:
pass
def cycle(self, forward: bool = True) -> str:
raise NotImplementedError()
class ListCompleter(Completer):
def __init__(
self,
start: str,
options: typing.Sequence[str],
self,
start: str,
options: typing.Sequence[str],
) -> None:
self.start = start
self.options: typing.Sequence[str] = []
self.options: typing.List[str] = []
for o in options:
if o.startswith(start):
self.options.append(o)
self.options.sort()
self.offset = 0
def cycle(self) -> str:
def cycle(self, forward: bool = True) -> str:
if not self.options:
return self.start
ret = self.options[self.offset]
self.offset = (self.offset + 1) % len(self.options)
delta = 1 if forward else -1
self.offset = (self.offset + delta) % len(self.options)
return ret
CompletionState = typing.NamedTuple(
"CompletionState",
[
("completer", Completer),
("parse", typing.Sequence[mitmproxy.command.ParseResult])
]
)
class CompletionState(typing.NamedTuple):
completer: Completer
parsed: typing.Sequence[mitmproxy.command.ParseResult]
class CommandBuffer:
def __init__(self, master: mitmproxy.master.Master, start: str = "") -> None:
self.master = master
self.text = self.flatten(start)
self.text = start
# Cursor is always within the range [0:len(buffer)].
self._cursor = len(self.text)
self.completion: typing.Optional[CompletionState] = None
@ -70,51 +67,30 @@ class CommandBuffer:
else:
self._cursor = x
def maybequote(self, value):
if " " in value and not value.startswith("\""):
return "\"%s\"" % value
return value
def parse_quoted(self, txt):
parts, remhelp = self.master.commands.parse_partial(txt)
for i, p in enumerate(parts):
parts[i] = mitmproxy.command.ParseResult(
value = self.maybequote(p.value),
type = p.type,
valid = p.valid
)
return parts, remhelp
def render(self):
"""
This function is somewhat tricky - in order to make the cursor
position valid, we have to make sure there is a
character-for-character offset match in the rendered output, up
to the cursor. Beyond that, we can add stuff.
"""
parts, remhelp = self.parse_quoted(self.text)
parts, remaining = self.master.commands.parse_partial(self.text)
ret = []
for p in parts:
if p.valid:
if p.type == mitmproxy.types.Cmd:
ret.append(("commander_command", p.value))
else:
ret.append(("text", p.value))
elif p.value:
ret.append(("commander_invalid", p.value))
else:
ret.append(("text", ""))
ret.append(("text", " "))
if remhelp:
ret.append(("text", " "))
for v in remhelp:
ret.append(("commander_hint", "%s " % v))
return ret
if not parts:
# Means we just received the leader, so we need to give a blank
# text to the widget to render or it crashes
ret.append(("text", ""))
else:
for p in parts:
if p.valid:
if p.type == mitmproxy.types.Cmd:
ret.append(("commander_command", p.value))
else:
ret.append(("text", p.value))
elif p.value:
ret.append(("commander_invalid", p.value))
def flatten(self, txt):
parts, _ = self.parse_quoted(txt)
ret = [x.value for x in parts]
return " ".join(ret)
if remaining:
if parts[-1].type != mitmproxy.types.Space:
ret.append(("text", " "))
for param in remaining:
ret.append(("commander_hint", f"{param} "))
return ret
def left(self) -> None:
self.cursor = self.cursor - 1
@ -122,30 +98,38 @@ class CommandBuffer:
def right(self) -> None:
self.cursor = self.cursor + 1
def cycle_completion(self) -> None:
def cycle_completion(self, forward: bool = True) -> None:
if not self.completion:
parts, remainhelp = self.master.commands.parse_partial(self.text[:self.cursor])
last = parts[-1]
ct = mitmproxy.types.CommandTypes.get(last.type, None)
parts, remaining = self.master.commands.parse_partial(self.text[:self.cursor])
if parts and parts[-1].type != mitmproxy.types.Space:
type_to_complete = parts[-1].type
cycle_prefix = parts[-1].value
parsed = parts[:-1]
elif remaining:
type_to_complete = remaining[0].type
cycle_prefix = ""
parsed = parts
else:
return
ct = mitmproxy.types.CommandTypes.get(type_to_complete, None)
if ct:
self.completion = CompletionState(
completer = ListCompleter(
parts[-1].value,
ct.completion(self.master.commands, last.type, parts[-1].value)
completer=ListCompleter(
cycle_prefix,
ct.completion(self.master.commands, type_to_complete, cycle_prefix)
),
parse = parts,
parsed=parsed,
)
if self.completion:
nxt = self.completion.completer.cycle()
buf = " ".join([i.value for i in self.completion.parse[:-1]]) + " " + nxt
buf = buf.strip()
self.text = self.flatten(buf)
nxt = self.completion.completer.cycle(forward)
buf = "".join([i.value for i in self.completion.parsed]) + nxt
self.text = buf
self.cursor = len(self.text)
def backspace(self) -> None:
if self.cursor == 0:
return
self.text = self.flatten(self.text[:self.cursor - 1] + self.text[self.cursor:])
self.text = self.text[:self.cursor - 1] + self.text[self.cursor:]
self.cursor = self.cursor - 1
self.completion = None
@ -153,13 +137,18 @@ class CommandBuffer:
"""
Inserts text at the cursor.
"""
self.text = self.flatten(self.text[:self.cursor] + k + self.text[self.cursor:])
self.cursor += 1
# We don't want to insert a space before the command
if k == ' ' and self.text[0:self.cursor].strip() == '':
return
self.text = self.text[:self.cursor] + k + self.text[self.cursor:]
self.cursor += len(k)
self.completion = None
class CommandHistory:
def __init__(self, master: mitmproxy.master.Master, size: int=30) -> None:
def __init__(self, master: mitmproxy.master.Master, size: int = 30) -> None:
self.saved_commands: collections.deque = collections.deque(
[CommandBuffer(master, "")],
maxlen=size
@ -182,7 +171,7 @@ class CommandHistory:
return self.saved_commands[self.index]
return None
def add_command(self, command: CommandBuffer, execution: bool=False) -> None:
def add_command(self, command: CommandBuffer, execution: bool = False) -> None:
if self.index == self.last_index or execution:
last_item = self.saved_commands[-1]
last_item_empty = not last_item.text
@ -207,7 +196,7 @@ class CommandEdit(urwid.WidgetWrap):
self.history = history
self.update()
def keypress(self, size, key):
def keypress(self, size, key) -> None:
if key == "backspace":
self.cbuf.backspace()
elif key == "left":
@ -219,27 +208,29 @@ class CommandEdit(urwid.WidgetWrap):
self.cbuf = self.history.get_prev() or self.cbuf
elif key == "down":
self.cbuf = self.history.get_next() or self.cbuf
elif key == "shift tab":
self.cbuf.cycle_completion(False)
elif key == "tab":
self.cbuf.cycle_completion()
elif len(key) == 1:
self.cbuf.insert(key)
self.update()
def update(self):
def update(self) -> None:
self._w.set_text([self.leader, self.cbuf.render()])
def render(self, size, focus=False):
def render(self, size, focus=False) -> urwid.Canvas:
(maxcol,) = size
canv = self._w.render((maxcol,))
canv = urwid.CompositeCanvas(canv)
canv.cursor = self.get_cursor_coords((maxcol,))
return canv
def get_cursor_coords(self, size):
def get_cursor_coords(self, size) -> typing.Tuple[int, int]:
p = self.cbuf.cursor + len(self.leader)
trans = self._w.get_line_translation(size[0])
x, y = calc_coords(self._w.get_text()[0], trans, p)
return x, y
def get_edit_text(self):
def get_edit_text(self) -> str:
return self.cbuf.text

View File

@ -1,6 +1,8 @@
import urwid
import blinker
import textwrap
from mitmproxy import command
from mitmproxy.tools.console import layoutwidget
from mitmproxy.tools.console import signals
@ -10,7 +12,7 @@ command_focus_change = blinker.Signal()
class CommandItem(urwid.WidgetWrap):
def __init__(self, walker, cmd, focused):
def __init__(self, walker, cmd: command.Command, focused: bool):
self.walker, self.cmd, self.focused = walker, cmd, focused
super().__init__(None)
self._w = self.get_widget()
@ -18,15 +20,18 @@ class CommandItem(urwid.WidgetWrap):
def get_widget(self):
parts = [
("focus", ">> " if self.focused else " "),
("title", self.cmd.path),
("text", " "),
("text", " ".join(self.cmd.paramnames())),
("title", self.cmd.name)
]
if self.cmd.returntype:
parts.append([
if self.cmd.parameters:
parts += [
("text", " "),
("text", " ".join(str(param) for param in self.cmd.parameters)),
]
if self.cmd.return_type:
parts += [
("title", " -> "),
("text", self.cmd.retname()),
])
("text", command.typename(self.cmd.return_type)),
]
return urwid.AttrMap(
urwid.Padding(urwid.Text(parts)),
@ -92,7 +97,7 @@ class CommandsList(urwid.ListBox):
def keypress(self, size, key):
if key == "m_select":
foc, idx = self.get_focus()
signals.status_prompt_command.send(partial=foc.cmd.path + " ")
signals.status_prompt_command.send(partial=foc.cmd.name + " ")
elif key == "m_start":
self.set_focus(0)
self.walker._modified()

View File

@ -1,21 +1,18 @@
import csv
import shlex
import typing
import mitmproxy.types
from mitmproxy import command, command_lexer
from mitmproxy import contentviews
from mitmproxy import ctx
from mitmproxy import command
from mitmproxy import exceptions
from mitmproxy import flow
from mitmproxy import http
from mitmproxy import log
from mitmproxy import contentviews
from mitmproxy.utils import strutils
import mitmproxy.types
from mitmproxy.tools.console import keymap
from mitmproxy.tools.console import overlay
from mitmproxy.tools.console import signals
from mitmproxy.tools.console import keymap
from mitmproxy.utils import strutils
console_palettes = [
"lowlight",
@ -48,10 +45,12 @@ class UnsupportedLog:
"""
A small addon to dump info on flow types we don't support yet.
"""
def websocket_message(self, f):
message = f.messages[-1]
ctx.log.info(f.message_info(message))
ctx.log.debug(message.content if isinstance(message.content, str) else strutils.bytes_to_escaped_str(message.content))
ctx.log.debug(
message.content if isinstance(message.content, str) else strutils.bytes_to_escaped_str(message.content))
def websocket_end(self, f):
ctx.log.info("WebSocket connection closed by {}: {} {}, {}".format(
@ -78,6 +77,7 @@ class ConsoleAddon:
An addon that exposes console-specific commands, and hooks into required
events.
"""
def __init__(self, master):
self.master = master
self.started = False
@ -86,7 +86,7 @@ class ConsoleAddon:
loader.add_option(
"console_default_contentview", str, "auto",
"The default content view mode.",
choices = [i.name.lower() for i in contentviews.views]
choices=[i.name.lower() for i in contentviews.views]
)
loader.add_option(
"console_eventlog_verbosity", str, 'info',
@ -142,7 +142,7 @@ class ConsoleAddon:
opts = self.layout_options()
off = self.layout_options().index(ctx.options.console_layout)
ctx.options.update(
console_layout = opts[(off + 1) % len(opts)]
console_layout=opts[(off + 1) % len(opts)]
)
@command.command("console.panes.next")
@ -234,17 +234,18 @@ class ConsoleAddon:
@command.command("console.choose")
def console_choose(
self,
prompt: str,
choices: typing.Sequence[str],
cmd: mitmproxy.types.Cmd,
*args: mitmproxy.types.Arg
self,
prompt: str,
choices: typing.Sequence[str],
cmd: mitmproxy.types.Cmd,
*args: mitmproxy.types.CmdArgs
) -> None:
"""
Prompt the user to choose from a specified list of strings, then
invoke another command with all occurrences of {choice} replaced by
the choice the user made.
"""
def callback(opt):
# We're now outside of the call context...
repl = cmd + " " + " ".join(args)
@ -260,22 +261,22 @@ class ConsoleAddon:
@command.command("console.choose.cmd")
def console_choose_cmd(
self,
prompt: str,
choicecmd: mitmproxy.types.Cmd,
subcmd: mitmproxy.types.Cmd,
*args: mitmproxy.types.Arg
self,
prompt: str,
choicecmd: mitmproxy.types.Cmd,
subcmd: mitmproxy.types.Cmd,
*args: mitmproxy.types.CmdArgs
) -> None:
"""
Prompt the user to choose from a list of strings returned by a
command, then invoke another command with all occurrences of {choice}
replaced by the choice the user made.
"""
choices = ctx.master.commands.call_strings(choicecmd, [])
choices = ctx.master.commands.execute(choicecmd)
def callback(opt):
# We're now outside of the call context...
repl = shlex.quote(" ".join(args))
repl = " ".join(command_lexer.quote(x) for x in args)
repl = repl.replace("{choice}", opt)
try:
self.master.commands.execute(subcmd + " " + repl)
@ -287,21 +288,24 @@ class ConsoleAddon:
)
@command.command("console.command")
def console_command(self, *partial: str) -> None:
def console_command(self, *command_str: str) -> None:
"""
Prompt the user to edit a command with a (possibly empty) starting value.
"""
signals.status_prompt_command.send(partial=" ".join(partial)) # type: ignore
quoted = " ".join(command_lexer.quote(x) for x in command_str)
signals.status_prompt_command.send(partial=quoted)
@command.command("console.command.set")
def console_command_set(self, option: str) -> None:
def console_command_set(self, option_name: str) -> None:
"""
Prompt the user to set an option of the form "key[=value]".
Prompt the user to set an option.
"""
option_value = getattr(self.master.options, option, None)
current_value = option_value if option_value else ""
self.master.commands.execute(
"console.command set %s=%s" % (option, current_value)
option_value = getattr(self.master.options, option_name, None) or ""
set_command = f"set {option_name} {option_value!r}"
cursor = len(set_command) - 1
signals.status_prompt_command.send(
partial=set_command,
cursor=cursor
)
@command.command("console.view.keybindings")
@ -351,14 +355,14 @@ class ConsoleAddon:
@command.command("console.bodyview")
@command.argument("part", type=mitmproxy.types.Choice("console.bodyview.options"))
def bodyview(self, f: flow.Flow, part: str) -> None:
def bodyview(self, flow: flow.Flow, part: str) -> None:
"""
Spawn an external viewer for a flow request or response body based
on the detected MIME type. We use the mailcap system to find the
correct viewier, and fall back to the programs in $PAGER or $EDITOR
if necessary.
"""
fpart = getattr(f, part, None)
fpart = getattr(flow, part, None)
if not fpart:
raise exceptions.CommandError("Part must be either request or response, not %s." % part)
t = fpart.headers.get("content-type")
@ -397,8 +401,8 @@ class ConsoleAddon:
]
@command.command("console.edit.focus")
@command.argument("part", type=mitmproxy.types.Choice("console.edit.focus.options"))
def edit_focus(self, part: str) -> None:
@command.argument("flow_part", type=mitmproxy.types.Choice("console.edit.focus.options"))
def edit_focus(self, flow_part: str) -> None:
"""
Edit a component of the currently focused flow.
"""
@ -410,27 +414,27 @@ class ConsoleAddon:
flow.backup()
require_dummy_response = (
part in ("response-headers", "response-body", "set-cookies") and
flow.response is None
flow_part in ("response-headers", "response-body", "set-cookies") and
flow.response is None
)
if require_dummy_response:
flow.response = http.HTTPResponse.make()
if part == "cookies":
if flow_part == "cookies":
self.master.switch_view("edit_focus_cookies")
elif part == "urlencoded form":
elif flow_part == "urlencoded form":
self.master.switch_view("edit_focus_urlencoded_form")
elif part == "multipart form":
elif flow_part == "multipart form":
self.master.switch_view("edit_focus_multipart_form")
elif part == "path":
elif flow_part == "path":
self.master.switch_view("edit_focus_path")
elif part == "query":
elif flow_part == "query":
self.master.switch_view("edit_focus_query")
elif part == "request-headers":
elif flow_part == "request-headers":
self.master.switch_view("edit_focus_request_headers")
elif part == "response-headers":
elif flow_part == "response-headers":
self.master.switch_view("edit_focus_response_headers")
elif part in ("request-body", "response-body"):
if part == "request-body":
elif flow_part in ("request-body", "response-body"):
if flow_part == "request-body":
message = flow.request
else:
message = flow.response
@ -442,16 +446,16 @@ class ConsoleAddon:
# just strip the newlines off the end of the body when we return
# from an editor.
message.content = c.rstrip(b"\n")
elif part == "set-cookies":
elif flow_part == "set-cookies":
self.master.switch_view("edit_focus_setcookies")
elif part == "url":
elif flow_part == "url":
url = flow.request.url.encode()
edited_url = self.master.spawn_editor(url)
url = edited_url.rstrip(b"\n")
flow.request.url = url.decode()
elif part in ["method", "status_code", "reason"]:
elif flow_part in ["method", "status_code", "reason"]:
self.master.commands.execute(
"console.command flow.set @focus %s " % part
"console.command flow.set @focus %s " % flow_part
)
def _grideditor(self):
@ -535,10 +539,8 @@ class ConsoleAddon:
raise exceptions.CommandError("Invalid flowview mode.")
try:
self.master.commands.call_strings(
"view.settings.setval",
["@focus", "flowview_mode_%s" % idx, mode]
)
cmd = 'view.settings.setval @focus flowview_mode_%s %s' % (idx, mode)
self.master.commands.execute(cmd)
except exceptions.CommandError as e:
signals.status_message.send(message=str(e))
@ -558,14 +560,9 @@ class ConsoleAddon:
if not fv:
raise exceptions.CommandError("Not viewing a flow.")
idx = fv.body.tab_offset
return self.master.commands.call_strings(
"view.settings.getval",
[
"@focus",
"flowview_mode_%s" % idx,
self.master.options.console_default_contentview,
]
)
cmd = 'view.settings.getval @focus flowview_mode_%s %s' % (idx, self.master.options.console_default_contentview)
return self.master.commands.execute(cmd)
@command.command("console.key.contexts")
def key_contexts(self) -> typing.Sequence[str]:
@ -576,11 +573,11 @@ class ConsoleAddon:
@command.command("console.key.bind")
def key_bind(
self,
contexts: typing.Sequence[str],
key: str,
cmd: mitmproxy.types.Cmd,
*args: mitmproxy.types.Arg
self,
contexts: typing.Sequence[str],
key: str,
cmd: mitmproxy.types.Cmd,
*args: mitmproxy.types.CmdArgs
) -> None:
"""
Bind a shortcut key.

View File

@ -26,7 +26,7 @@ def map(km):
km.add("ctrl f", "console.nav.pagedown", ["global"], "Page down")
km.add("ctrl b", "console.nav.pageup", ["global"], "Page up")
km.add("I", "set intercept_active=toggle", ["global"], "Toggle intercept")
km.add("I", "set intercept_active toggle", ["global"], "Toggle intercept")
km.add("i", "console.command.set intercept", ["global"], "Set intercept")
km.add("W", "console.command.set save_stream_file", ["global"], "Stream to file")
km.add("A", "flow.resume @all", ["flowlist", "flowview"], "Resume all intercepted flows")
@ -48,14 +48,14 @@ def map(km):
"Export this flow to file"
)
km.add("f", "console.command.set view_filter", ["flowlist"], "Set view filter")
km.add("F", "set console_focus_follow=toggle", ["flowlist"], "Set focus follow")
km.add("F", "set console_focus_follow toggle", ["flowlist"], "Set focus follow")
km.add(
"ctrl l",
"console.command cut.clip ",
["flowlist", "flowview"],
"Send cuts to clipboard"
)
km.add("L", "console.command view.load ", ["flowlist"], "Load flows from file")
km.add("L", "console.command view.flows.load ", ["flowlist"], "Load flows from file")
km.add("m", "flow.mark.toggle @focus", ["flowlist"], "Toggle mark on this flow")
km.add("M", "view.properties.marked.toggle", ["flowlist"], "Toggle viewing marked flows")
km.add(
@ -68,14 +68,14 @@ def map(km):
"o",
"""
console.choose.cmd Order view.order.options
set view_order={choice}
set view_order {choice}
""",
["flowlist"],
"Set flow list order"
)
km.add("r", "replay.client @focus", ["flowlist", "flowview"], "Replay this flow")
km.add("S", "console.command replay.server ", ["flowlist"], "Start server replay")
km.add("v", "set view_order_reversed=toggle", ["flowlist"], "Reverse flow list order")
km.add("v", "set view_order_reversed toggle", ["flowlist"], "Reverse flow list order")
km.add("U", "flow.mark @all false", ["flowlist"], "Un-set all marks")
km.add("w", "console.command save.file @shown ", ["flowlist"], "Save listed flows to file")
km.add("V", "flow.revert @focus", ["flowlist", "flowview"], "Revert changes to this flow")

View File

@ -1,4 +1,5 @@
import os.path
from typing import Optional
import urwid
@ -98,10 +99,15 @@ class ActionBar(urwid.WidgetWrap):
self._w = urwid.Edit(self.prep_prompt(prompt), text or "")
self.prompting = PromptStub(callback, args)
def sig_prompt_command(self, sender, partial=""):
def sig_prompt_command(self, sender, partial: str = "", cursor: Optional[int] = None):
signals.focus.send(self, section="footer")
self._w = commander.CommandEdit(self.master, partial,
self.command_history)
self._w = commander.CommandEdit(
self.master,
partial,
self.command_history,
)
if cursor is not None:
self._w.cbuf.cursor = cursor
self.prompting = commandexecutor.CommandExecutor(self.master)
def sig_prompt_onekey(self, sender, prompt, keys, callback, args=()):

View File

@ -5,6 +5,9 @@ import typing
from mitmproxy import exceptions
from mitmproxy import flow
if typing.TYPE_CHECKING: # pragma: no cover
from mitmproxy.command import CommandManager
class Path(str):
pass
@ -14,7 +17,7 @@ class Cmd(str):
pass
class Arg(str):
class CmdArgs(str):
pass
@ -22,6 +25,10 @@ class Unknown(str):
pass
class Space(str):
pass
class CutSpec(typing.Sequence[str]):
pass
@ -40,27 +47,11 @@ class Choice:
return False
# One of the many charming things about mypy is that introducing type
# annotations can cause circular dependencies where there were none before.
# Rather than putting types and the CommandManger in the same file, we introduce
# a stub type with the signature we use.
class _CommandBase:
commands: typing.MutableMapping[str, typing.Any] = {}
def call_strings(self, path: str, args: typing.Sequence[str]) -> typing.Any:
raise NotImplementedError
def execute(self, cmd: str) -> typing.Any:
raise NotImplementedError
class _BaseType:
typ: typing.Type = object
display: str = ""
def completion(
self, manager: _CommandBase, t: typing.Any, s: str
) -> typing.Sequence[str]:
def completion(self, manager: "CommandManager", t: typing.Any, s: str) -> typing.Sequence[str]:
"""
Returns a list of completion strings for a given prefix. The strings
returned don't necessarily need to be suffixes of the prefix, since
@ -68,9 +59,7 @@ class _BaseType:
"""
raise NotImplementedError
def parse(
self, manager: _CommandBase, typ: typing.Any, s: str
) -> typing.Any:
def parse(self, manager: "CommandManager", typ: typing.Any, s: str) -> typing.Any:
"""
Parse a string, given the specific type instance (to allow rich type annotations like Choice) and a string.
@ -78,7 +67,7 @@ class _BaseType:
"""
raise NotImplementedError
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
"""
Check if data is valid for this type.
"""
@ -89,10 +78,10 @@ class _BoolType(_BaseType):
typ = bool
display = "bool"
def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
return ["false", "true"]
def parse(self, manager: _CommandBase, t: type, s: str) -> bool:
def parse(self, manager: "CommandManager", t: type, s: str) -> bool:
if s == "true":
return True
elif s == "false":
@ -102,7 +91,7 @@ class _BoolType(_BaseType):
"Booleans are 'true' or 'false', got %s" % s
)
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
return val in [True, False]
@ -110,13 +99,13 @@ class _StrType(_BaseType):
typ = str
display = "str"
def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
return []
def parse(self, manager: _CommandBase, t: type, s: str) -> str:
def parse(self, manager: "CommandManager", t: type, s: str) -> str:
return s
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
return isinstance(val, str)
@ -124,13 +113,13 @@ class _UnknownType(_BaseType):
typ = Unknown
display = "unknown"
def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
return []
def parse(self, manager: _CommandBase, t: type, s: str) -> str:
def parse(self, manager: "CommandManager", t: type, s: str) -> str:
return s
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
return False
@ -138,16 +127,16 @@ class _IntType(_BaseType):
typ = int
display = "int"
def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
return []
def parse(self, manager: _CommandBase, t: type, s: str) -> int:
def parse(self, manager: "CommandManager", t: type, s: str) -> int:
try:
return int(s)
except ValueError as e:
raise exceptions.TypeError from e
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
return isinstance(val, int)
@ -155,7 +144,7 @@ class _PathType(_BaseType):
typ = Path
display = "path"
def completion(self, manager: _CommandBase, t: type, start: str) -> typing.Sequence[str]:
def completion(self, manager: "CommandManager", t: type, start: str) -> typing.Sequence[str]:
if not start:
start = "./"
path = os.path.expanduser(start)
@ -177,10 +166,10 @@ class _PathType(_BaseType):
ret.sort()
return ret
def parse(self, manager: _CommandBase, t: type, s: str) -> str:
def parse(self, manager: "CommandManager", t: type, s: str) -> str:
return os.path.expanduser(s)
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
return isinstance(val, str)
@ -188,43 +177,43 @@ class _CmdType(_BaseType):
typ = Cmd
display = "cmd"
def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
return list(manager.commands.keys())
def parse(self, manager: _CommandBase, t: type, s: str) -> str:
def parse(self, manager: "CommandManager", t: type, s: str) -> str:
if s not in manager.commands:
raise exceptions.TypeError("Unknown command: %s" % s)
return s
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
return val in manager.commands
class _ArgType(_BaseType):
typ = Arg
typ = CmdArgs
display = "arg"
def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
return []
def parse(self, manager: _CommandBase, t: type, s: str) -> str:
def parse(self, manager: "CommandManager", t: type, s: str) -> str:
return s
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
return isinstance(val, str)
class _StrSeqType(_BaseType):
typ = typing.Sequence[str]
display = "[str]"
display = "str[]"
def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
return []
def parse(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
def parse(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
return [x.strip() for x in s.split(",")]
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
if isinstance(val, str) or isinstance(val, bytes):
return False
try:
@ -238,7 +227,7 @@ class _StrSeqType(_BaseType):
class _CutSpecType(_BaseType):
typ = CutSpec
display = "[cut]"
display = "cut[]"
valid_prefixes = [
"request.method",
"request.scheme",
@ -277,7 +266,7 @@ class _CutSpecType(_BaseType):
"server_conn.tls_established",
]
def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
spec = s.split(",")
opts = []
for pref in self.valid_prefixes:
@ -285,11 +274,11 @@ class _CutSpecType(_BaseType):
opts.append(",".join(spec))
return opts
def parse(self, manager: _CommandBase, t: type, s: str) -> CutSpec:
def parse(self, manager: "CommandManager", t: type, s: str) -> CutSpec:
parts: typing.Any = s.split(",")
return parts
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
if not isinstance(val, str):
return False
parts = [x.strip() for x in val.split(",")]
@ -327,7 +316,7 @@ class _BaseFlowType(_BaseType):
"~c",
]
def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
return self.valid_prefixes
@ -335,9 +324,9 @@ class _FlowType(_BaseFlowType):
typ = flow.Flow
display = "flow"
def parse(self, manager: _CommandBase, t: type, s: str) -> flow.Flow:
def parse(self, manager: "CommandManager", t: type, s: str) -> flow.Flow:
try:
flows = manager.call_strings("view.flows.resolve", [s])
flows = manager.execute("view.flows.resolve %s" % (s))
except exceptions.CommandError as e:
raise exceptions.TypeError from e
if len(flows) != 1:
@ -346,21 +335,21 @@ class _FlowType(_BaseFlowType):
)
return flows[0]
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
return isinstance(val, flow.Flow)
class _FlowsType(_BaseFlowType):
typ = typing.Sequence[flow.Flow]
display = "[flow]"
display = "flow[]"
def parse(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[flow.Flow]:
def parse(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[flow.Flow]:
try:
return manager.call_strings("view.flows.resolve", [s])
return manager.execute("view.flows.resolve %s" % (s))
except exceptions.CommandError as e:
raise exceptions.TypeError from e
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
try:
for v in val:
if not isinstance(v, flow.Flow):
@ -372,19 +361,19 @@ class _FlowsType(_BaseFlowType):
class _DataType(_BaseType):
typ = Data
display = "[data]"
display = "data[][]"
def completion(
self, manager: _CommandBase, t: type, s: str
self, manager: "CommandManager", t: type, s: str
) -> typing.Sequence[str]: # pragma: no cover
raise exceptions.TypeError("data cannot be passed as argument")
def parse(
self, manager: _CommandBase, t: type, s: str
self, manager: "CommandManager", t: type, s: str
) -> typing.Any: # pragma: no cover
raise exceptions.TypeError("data cannot be passed as argument")
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
# FIXME: validate that all rows have equal length, and all columns have equal types
try:
for row in val:
@ -400,16 +389,16 @@ class _ChoiceType(_BaseType):
typ = Choice
display = "choice"
def completion(self, manager: _CommandBase, t: Choice, s: str) -> typing.Sequence[str]:
def completion(self, manager: "CommandManager", t: Choice, s: str) -> typing.Sequence[str]:
return manager.execute(t.options_command)
def parse(self, manager: _CommandBase, t: Choice, s: str) -> str:
def parse(self, manager: "CommandManager", t: Choice, s: str) -> str:
opts = manager.execute(t.options_command)
if s not in opts:
raise exceptions.TypeError("Invalid choice.")
return s
def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
try:
opts = manager.execute(typ.options_command)
except exceptions.CommandError:
@ -423,7 +412,7 @@ class TypeManager:
for t in types:
self.typemap[t.typ] = t()
def get(self, t: typing.Optional[typing.Type], default=None) -> _BaseType:
def get(self, t: typing.Optional[typing.Type], default=None) -> typing.Optional[_BaseType]:
if type(t) in self.typemap:
return self.typemap[type(t)]
return self.typemap.get(t, default)

View File

@ -18,6 +18,8 @@ show_missing = True
exclude_lines =
pragma: no cover
raise NotImplementedError()
if typing.TYPE_CHECKING:
if TYPE_CHECKING:
[mypy]
ignore_missing_imports = True

View File

@ -11,7 +11,7 @@ def test_set():
sa = core.Core()
with taddons.context(loadcore=False) as tctx:
assert tctx.master.options.server
tctx.command(sa.set, "server=false")
tctx.command(sa.set, "server", "false")
assert not tctx.master.options.server
with pytest.raises(exceptions.CommandError):

View File

@ -73,7 +73,7 @@ def test_save_command(tmpdir):
v = view.View()
tctx.master.addons.add(v)
tctx.master.addons.add(sa)
tctx.master.commands.call_strings("save.file", ["@shown", p])
tctx.master.commands.execute("save.file @shown %s" % p)
def test_simple(tmpdir):

View File

@ -1,14 +1,16 @@
import typing
import inspect
from mitmproxy import command
from mitmproxy import flow
from mitmproxy import exceptions
from mitmproxy.test import tflow
from mitmproxy.test import taddons
import mitmproxy.types
import io
import typing
import pytest
import mitmproxy.types
from mitmproxy import command
from mitmproxy import exceptions
from mitmproxy import flow
from mitmproxy.test import taddons
from mitmproxy.test import tflow
class TAddon:
@command.command("cmd1")
@ -29,7 +31,7 @@ class TAddon:
return "ok"
@command.command("subcommand")
def subcommand(self, cmd: mitmproxy.types.Cmd, *args: mitmproxy.types.Arg) -> str:
def subcommand(self, cmd: mitmproxy.types.Cmd, *args: mitmproxy.types.CmdArgs) -> str:
return "ok"
@command.command("empty")
@ -83,17 +85,15 @@ class TestCommand:
with pytest.raises(exceptions.CommandError):
command.Command(cm, "invalidret", a.invalidret)
with pytest.raises(exceptions.CommandError):
command.Command(cm, "invalidarg", a.invalidarg)
assert command.Command(cm, "invalidarg", a.invalidarg)
def test_varargs(self):
with taddons.context() as tctx:
cm = command.CommandManager(tctx.master)
a = TAddon()
c = command.Command(cm, "varargs", a.varargs)
assert c.signature_help() == "varargs str *str -> [str]"
assert c.signature_help() == "varargs one *var -> str[]"
assert c.call(["one", "two", "three"]) == ["two", "three"]
with pytest.raises(exceptions.CommandError):
c.call(["one", "two", 3])
def test_call(self):
with taddons.context() as tctx:
@ -101,7 +101,7 @@ class TestCommand:
a = TAddon()
c = command.Command(cm, "cmd.path", a.cmd1)
assert c.call(["foo"]) == "ret foo"
assert c.signature_help() == "cmd.path str -> str"
assert c.signature_help() == "cmd.path foo -> str"
c = command.Command(cm, "cmd.two", a.cmd2)
with pytest.raises(exceptions.CommandError):
@ -115,154 +115,305 @@ class TestCommand:
[
"foo bar",
[
command.ParseResult(
value = "foo", type = mitmproxy.types.Cmd, valid = False
),
command.ParseResult(
value = "bar", type = mitmproxy.types.Unknown, valid = False
)
command.ParseResult(value="foo", type=mitmproxy.types.Cmd, valid=False),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
command.ParseResult(value="bar", type=mitmproxy.types.Unknown, valid=False)
],
[],
],
[
"cmd1 'bar",
[
command.ParseResult(value = "cmd1", type = mitmproxy.types.Cmd, valid = True),
command.ParseResult(value = "'bar", type = str, valid = True)
command.ParseResult(value="cmd1", type=mitmproxy.types.Cmd, valid=True),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
command.ParseResult(value="'bar", type=str, valid=True)
],
[],
],
[
"a",
[command.ParseResult(value = "a", type = mitmproxy.types.Cmd, valid = False)],
[command.ParseResult(value="a", type=mitmproxy.types.Cmd, valid=False)],
[],
],
[
"",
[command.ParseResult(value = "", type = mitmproxy.types.Cmd, valid = False)],
[]
[],
[
command.CommandParameter("", mitmproxy.types.Cmd),
command.CommandParameter("", mitmproxy.types.CmdArgs)
]
],
[
"cmd3 1",
[
command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd, valid = True),
command.ParseResult(value = "1", type = int, valid = True),
command.ParseResult(value="cmd3", type=mitmproxy.types.Cmd, valid=True),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
command.ParseResult(value="1", type=int, valid=True),
],
[]
],
[
"cmd3 ",
[
command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd, valid = True),
command.ParseResult(value = "", type = int, valid = False),
command.ParseResult(value="cmd3", type=mitmproxy.types.Cmd, valid=True),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
],
[]
[command.CommandParameter('foo', int)]
],
[
"subcommand ",
[
command.ParseResult(
value = "subcommand", type = mitmproxy.types.Cmd, valid = True,
),
command.ParseResult(value = "", type = mitmproxy.types.Cmd, valid = False),
command.ParseResult(value="subcommand", type=mitmproxy.types.Cmd, valid=True, ),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
],
["arg"],
[
command.CommandParameter('cmd', mitmproxy.types.Cmd),
command.CommandParameter('args', mitmproxy.types.CmdArgs, kind=inspect.Parameter.VAR_POSITIONAL),
],
],
[
"varargs one",
[
command.ParseResult(value="varargs", type=mitmproxy.types.Cmd, valid=True),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
command.ParseResult(value="one", type=str, valid=True),
],
[command.CommandParameter('var', str, kind=inspect.Parameter.VAR_POSITIONAL)]
],
[
"varargs one two three",
[
command.ParseResult(value="varargs", type=mitmproxy.types.Cmd, valid=True),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
command.ParseResult(value="one", type=str, valid=True),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
command.ParseResult(value="two", type=str, valid=True),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
command.ParseResult(value="three", type=str, valid=True),
],
[],
],
[
"subcommand cmd3 ",
[
command.ParseResult(value = "subcommand", type = mitmproxy.types.Cmd, valid = True),
command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd, valid = True),
command.ParseResult(value = "", type = int, valid = False),
command.ParseResult(value="subcommand", type=mitmproxy.types.Cmd, valid=True),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
command.ParseResult(value="cmd3", type=mitmproxy.types.Cmd, valid=True),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
],
[]
[command.CommandParameter('foo', int)]
],
[
"cmd4",
[
command.ParseResult(value = "cmd4", type = mitmproxy.types.Cmd, valid = True),
command.ParseResult(value="cmd4", type=mitmproxy.types.Cmd, valid=True),
],
["int", "str", "path"]
[
command.CommandParameter('a', int),
command.CommandParameter('b', str),
command.CommandParameter('c', mitmproxy.types.Path),
]
],
[
"cmd4 ",
[
command.ParseResult(value = "cmd4", type = mitmproxy.types.Cmd, valid = True),
command.ParseResult(value = "", type = int, valid = False),
command.ParseResult(value="cmd4", type=mitmproxy.types.Cmd, valid=True),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
],
["str", "path"]
[
command.CommandParameter('a', int),
command.CommandParameter('b', str),
command.CommandParameter('c', mitmproxy.types.Path),
]
],
[
"cmd4 1",
[
command.ParseResult(value = "cmd4", type = mitmproxy.types.Cmd, valid = True),
command.ParseResult(value = "1", type = int, valid = True),
command.ParseResult(value="cmd4", type=mitmproxy.types.Cmd, valid=True),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
command.ParseResult(value="1", type=int, valid=True),
],
["str", "path"]
],
[
"cmd4 1",
[
command.ParseResult(value = "cmd4", type = mitmproxy.types.Cmd, valid = True),
command.ParseResult(value = "1", type = int, valid = True),
],
["str", "path"]
command.CommandParameter('b', str),
command.CommandParameter('c', mitmproxy.types.Path),
]
],
[
"flow",
[
command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True),
command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True),
],
["flow", "str"]
[
command.CommandParameter('f', flow.Flow),
command.CommandParameter('s', str),
]
],
[
"flow ",
[
command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True),
command.ParseResult(value = "", type = flow.Flow, valid = False),
command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
],
["str"]
[
command.CommandParameter('f', flow.Flow),
command.CommandParameter('s', str),
]
],
[
"flow x",
[
command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True),
command.ParseResult(value = "x", type = flow.Flow, valid = False),
command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
command.ParseResult(value="x", type=flow.Flow, valid=False),
],
["str"]
[
command.CommandParameter('s', str),
]
],
[
"flow x ",
[
command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True),
command.ParseResult(value = "x", type = flow.Flow, valid = False),
command.ParseResult(value = "", type = str, valid = True),
command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
command.ParseResult(value="x", type=flow.Flow, valid=False),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
],
[]
[
command.CommandParameter('s', str),
]
],
[
"flow \"one two",
[
command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True),
command.ParseResult(value = "\"one two", type = flow.Flow, valid = False),
command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
command.ParseResult(value="\"one two", type=flow.Flow, valid=False),
],
["str"]
[
command.CommandParameter('s', str),
]
],
[
"flow \"one two\"",
"flow \"three four\"",
[
command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True),
command.ParseResult(value = "one two", type = flow.Flow, valid = False),
command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
command.ParseResult(value='"three four"', type=flow.Flow, valid=False),
],
["str"]
[
command.CommandParameter('s', str),
]
],
[
"spaces ' '",
[
command.ParseResult(value="spaces", type=mitmproxy.types.Cmd, valid=False),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
command.ParseResult(value="' '", type=mitmproxy.types.Unknown, valid=False)
],
[],
],
[
'spaces2 " "',
[
command.ParseResult(value="spaces2", type=mitmproxy.types.Cmd, valid=False),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
command.ParseResult(value='" "', type=mitmproxy.types.Unknown, valid=False)
],
[],
],
[
'"abc"',
[
command.ParseResult(value='"abc"', type=mitmproxy.types.Cmd, valid=False),
],
[],
],
[
"'def'",
[
command.ParseResult(value="'def'", type=mitmproxy.types.Cmd, valid=False),
],
[],
],
[
"cmd10 'a' \"b\" c",
[
command.ParseResult(value="cmd10", type=mitmproxy.types.Cmd, valid=False),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
command.ParseResult(value="'a'", type=mitmproxy.types.Unknown, valid=False),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
command.ParseResult(value='"b"', type=mitmproxy.types.Unknown, valid=False),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
command.ParseResult(value="c", type=mitmproxy.types.Unknown, valid=False),
],
[],
],
[
"cmd11 'a \"b\" c'",
[
command.ParseResult(value="cmd11", type=mitmproxy.types.Cmd, valid=False),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
command.ParseResult(value="'a \"b\" c'", type=mitmproxy.types.Unknown, valid=False),
],
[],
],
[
'cmd12 "a \'b\' c"',
[
command.ParseResult(value="cmd12", type=mitmproxy.types.Cmd, valid=False),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
command.ParseResult(value='"a \'b\' c"', type=mitmproxy.types.Unknown, valid=False),
],
[],
],
[
r'cmd13 "a \"b\" c"',
[
command.ParseResult(value="cmd13", type=mitmproxy.types.Cmd, valid=False),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
command.ParseResult(value=r'"a \"b\" c"', type=mitmproxy.types.Unknown, valid=False),
],
[],
],
[
r"cmd14 'a \'b\' c'",
[
command.ParseResult(value="cmd14", type=mitmproxy.types.Cmd, valid=False),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
command.ParseResult(value=r"'a \'b\' c'", type=mitmproxy.types.Unknown, valid=False),
],
[],
],
[
" spaces_at_the_begining_are_not_stripped",
[
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
command.ParseResult(value="spaces_at_the_begining_are_not_stripped", type=mitmproxy.types.Cmd,
valid=False),
],
[],
],
[
" spaces_at_the_begining_are_not_stripped neither_at_the_end ",
[
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
command.ParseResult(value="spaces_at_the_begining_are_not_stripped", type=mitmproxy.types.Cmd,
valid=False),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
command.ParseResult(value="neither_at_the_end", type=mitmproxy.types.Unknown, valid=False),
command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
],
[],
],
]
with taddons.context() as tctx:
tctx.master.addons.add(TAddon())
for s, expected, expectedremain in tests:
current, remain = tctx.master.commands.parse_partial(s)
assert current == expected
assert expectedremain == remain
assert (s, current, expectedremain) == (s, expected, remain)
def test_simple():
@ -270,9 +421,11 @@ def test_simple():
c = command.CommandManager(tctx.master)
a = TAddon()
c.add("one.two", a.cmd1)
assert c.commands["one.two"].help == "cmd1 help"
assert(c.execute("one.two foo") == "ret foo")
assert(c.call("one.two", "foo") == "ret foo")
assert (c.commands["one.two"].help == "cmd1 help")
assert (c.execute("one.two foo") == "ret foo")
assert (c.execute("one.two \"foo\"") == "ret foo")
assert (c.execute("one.two 'foo bar'") == "ret foo bar")
assert (c.call("one.two", "foo") == "ret foo")
with pytest.raises(exceptions.CommandError, match="Unknown"):
c.execute("nonexistent")
with pytest.raises(exceptions.CommandError, match="Invalid"):
@ -281,8 +434,14 @@ def test_simple():
c.execute("one.two too many args")
with pytest.raises(exceptions.CommandError, match="Unknown"):
c.call("nonexistent")
with pytest.raises(exceptions.CommandError, match="No escaped"):
with pytest.raises(exceptions.CommandError, match="Unknown"):
c.execute("\\")
with pytest.raises(exceptions.CommandError, match="Unknown"):
c.execute(r"\'")
with pytest.raises(exceptions.CommandError, match="Unknown"):
c.execute(r"\"")
with pytest.raises(exceptions.CommandError, match="Unknown"):
c.execute(r"\"")
c.add("empty", a.empty)
c.execute("empty")
@ -294,13 +453,13 @@ def test_simple():
def test_typename():
assert command.typename(str) == "str"
assert command.typename(typing.Sequence[flow.Flow]) == "[flow]"
assert command.typename(typing.Sequence[flow.Flow]) == "flow[]"
assert command.typename(mitmproxy.types.Data) == "[data]"
assert command.typename(mitmproxy.types.CutSpec) == "[cut]"
assert command.typename(mitmproxy.types.Data) == "data[][]"
assert command.typename(mitmproxy.types.CutSpec) == "cut[]"
assert command.typename(flow.Flow) == "flow"
assert command.typename(typing.Sequence[str]) == "[str]"
assert command.typename(typing.Sequence[str]) == "str[]"
assert command.typename(mitmproxy.types.Choice("foo")) == "choice"
assert command.typename(mitmproxy.types.Path) == "path"

View File

@ -0,0 +1,38 @@
import pyparsing
import pytest
from mitmproxy import command_lexer
@pytest.mark.parametrize(
"test_input,valid", [
("'foo'", True),
('"foo"', True),
("'foo' bar'", False),
("'foo\\' bar'", True),
("'foo' 'bar'", False),
("'foo'x", False),
('''"foo ''', True),
('''"foo 'bar' ''', True),
]
)
def test_partial_quoted_string(test_input, valid):
if valid:
assert command_lexer.PartialQuotedString.parseString(test_input, parseAll=True)[0] == test_input
else:
with pytest.raises(pyparsing.ParseException):
command_lexer.PartialQuotedString.parseString(test_input, parseAll=True)
@pytest.mark.parametrize(
"test_input,expected", [
("'foo'", ["'foo'"]),
('"foo"', ['"foo"']),
("'foo' 'bar'", ["'foo'", ' ', "'bar'"]),
("'foo'x", ["'foo'", 'x']),
('''"foo''', ['"foo']),
('''"foo 'bar' ''', ['''"foo 'bar' ''']),
]
)
def test_expr(test_input, expected):
assert list(command_lexer.expr.parseString(test_input, parseAll=True)) == expected

View File

@ -2,7 +2,6 @@ import pytest
import os
import typing
import contextlib
from unittest import mock
import mitmproxy.exceptions
import mitmproxy.types
@ -64,13 +63,14 @@ def test_int():
b.parse(tctx.master.commands, int, "foo")
def test_path(tdata):
def test_path(tdata, monkeypatch):
with taddons.context() as tctx:
b = mitmproxy.types._PathType()
assert b.parse(tctx.master.commands, mitmproxy.types.Path, "/foo") == "/foo"
assert b.parse(tctx.master.commands, mitmproxy.types.Path, "/bar") == "/bar"
with mock.patch.dict("os.environ", {"HOME": "/home/test"}):
assert b.parse(tctx.master.commands, mitmproxy.types.Path, "~/mitm") == "/home/test/mitm"
monkeypatch.setenv("HOME", "/home/test")
monkeypatch.setenv("USERPROFILE", "/home/test")
assert b.parse(tctx.master.commands, mitmproxy.types.Path, "~/mitm") == "/home/test/mitm"
assert b.is_valid(tctx.master.commands, mitmproxy.types.Path, "foo") is True
assert b.is_valid(tctx.master.commands, mitmproxy.types.Path, "~/mitm") is True
assert b.is_valid(tctx.master.commands, mitmproxy.types.Path, 3) is False
@ -127,10 +127,9 @@ def test_cutspec():
def test_arg():
with taddons.context() as tctx:
b = mitmproxy.types._ArgType()
assert b.completion(tctx.master.commands, mitmproxy.types.Arg, "") == []
assert b.parse(tctx.master.commands, mitmproxy.types.Arg, "foo") == "foo"
assert b.is_valid(tctx.master.commands, mitmproxy.types.Arg, "foo") is True
assert b.is_valid(tctx.master.commands, mitmproxy.types.Arg, 1) is False
assert b.completion(tctx.master.commands, mitmproxy.types.CmdArgs, "") == []
assert b.parse(tctx.master.commands, mitmproxy.types.CmdArgs, "foo") == "foo"
assert b.is_valid(tctx.master.commands, mitmproxy.types.CmdArgs, 1) is False
def test_strseq():

View File

@ -1,6 +1,7 @@
import pytest
from mitmproxy.tools.console.commander import commander
from mitmproxy.test import taddons
from mitmproxy.tools.console.commander import commander
class TestListCompleter:
@ -28,6 +29,112 @@ class TestListCompleter:
assert c.cycle() == expected
class TestCommandEdit:
def test_open_command_bar(self):
with taddons.context() as tctx:
history = commander.CommandHistory(tctx.master, size=3)
edit = commander.CommandEdit(tctx.master, '', history)
try:
edit.update()
except IndexError:
pytest.faied("Unexpected IndexError")
def test_insert(self):
with taddons.context() as tctx:
history = commander.CommandHistory(tctx.master, size=3)
edit = commander.CommandEdit(tctx.master, '', history)
edit.keypress(1, 'a')
assert edit.get_edit_text() == 'a'
# Don't let users type a space before starting a command
# as a usability feature
history = commander.CommandHistory(tctx.master, size=3)
edit = commander.CommandEdit(tctx.master, '', history)
edit.keypress(1, ' ')
assert edit.get_edit_text() == ''
def test_backspace(self):
with taddons.context() as tctx:
history = commander.CommandHistory(tctx.master, size=3)
edit = commander.CommandEdit(tctx.master, '', history)
edit.keypress(1, 'a')
edit.keypress(1, 'b')
assert edit.get_edit_text() == 'ab'
edit.keypress(1, 'backspace')
assert edit.get_edit_text() == 'a'
def test_left(self):
with taddons.context() as tctx:
history = commander.CommandHistory(tctx.master, size=3)
edit = commander.CommandEdit(tctx.master, '', history)
edit.keypress(1, 'a')
assert edit.cbuf.cursor == 1
edit.keypress(1, 'left')
assert edit.cbuf.cursor == 0
# Do it again to make sure it won't go negative
edit.keypress(1, 'left')
assert edit.cbuf.cursor == 0
def test_right(self):
with taddons.context() as tctx:
history = commander.CommandHistory(tctx.master, size=3)
edit = commander.CommandEdit(tctx.master, '', history)
edit.keypress(1, 'a')
assert edit.cbuf.cursor == 1
# Make sure cursor won't go past the text
edit.keypress(1, 'right')
assert edit.cbuf.cursor == 1
# Make sure cursor goes left and then back right
edit.keypress(1, 'left')
assert edit.cbuf.cursor == 0
edit.keypress(1, 'right')
assert edit.cbuf.cursor == 1
def test_up_and_down(self):
with taddons.context() as tctx:
history = commander.CommandHistory(tctx.master, size=3)
edit = commander.CommandEdit(tctx.master, '', history)
buf = commander.CommandBuffer(tctx.master, 'cmd1')
history.add_command(buf)
buf = commander.CommandBuffer(tctx.master, 'cmd2')
history.add_command(buf)
edit.keypress(1, 'up')
assert edit.get_edit_text() == 'cmd2'
edit.keypress(1, 'up')
assert edit.get_edit_text() == 'cmd1'
edit.keypress(1, 'up')
assert edit.get_edit_text() == 'cmd1'
history = commander.CommandHistory(tctx.master, size=5)
edit = commander.CommandEdit(tctx.master, '', history)
edit.keypress(1, 'a')
edit.keypress(1, 'b')
edit.keypress(1, 'c')
assert edit.get_edit_text() == 'abc'
edit.keypress(1, 'up')
assert edit.get_edit_text() == ''
edit.keypress(1, 'down')
assert edit.get_edit_text() == 'abc'
edit.keypress(1, 'down')
assert edit.get_edit_text() == 'abc'
history = commander.CommandHistory(tctx.master, size=5)
edit = commander.CommandEdit(tctx.master, '', history)
buf = commander.CommandBuffer(tctx.master, 'cmd3')
history.add_command(buf)
edit.keypress(1, 'z')
edit.keypress(1, 'up')
assert edit.get_edit_text() == 'cmd3'
edit.keypress(1, 'down')
assert edit.get_edit_text() == 'z'
class TestCommandHistory:
def fill_history(self, commands):
with taddons.context() as tctx:
@ -148,13 +255,39 @@ class TestCommandBuffer:
cb.cursor = len(cb.text)
cb.cycle_completion()
ch = commander.CommandHistory(tctx.master, 30)
ce = commander.CommandEdit(tctx.master, "se", ch)
ce.keypress(1, 'tab')
ce.update()
ret = ce.cbuf.render()
assert ret == [
('commander_command', 'set'),
('text', ' '),
('commander_hint', 'option '),
('commander_hint', 'value '),
]
def test_render(self):
with taddons.context() as tctx:
cb = commander.CommandBuffer(tctx.master)
cb.text = "foo"
assert cb.render()
def test_flatten(self):
with taddons.context() as tctx:
cb = commander.CommandBuffer(tctx.master)
assert cb.flatten("foo bar") == "foo bar"
cb.text = "set view_filter '~bq test'"
ret = cb.render()
assert ret == [
('commander_command', 'set'),
('text', ' '),
('text', 'view_filter'),
('text', ' '),
('text', "'~bq test'"),
]
cb.text = "set"
ret = cb.render()
assert ret == [
('commander_command', 'set'),
('text', ' '),
('commander_hint', 'option '),
('commander_hint', 'value '),
]

View File

@ -1,14 +1,18 @@
import pytest
import mitmproxy.types
from mitmproxy import command
from mitmproxy import ctx
from mitmproxy.test.tflow import tflow
from mitmproxy.tools.console import defaultkeys
from mitmproxy.tools.console import keymap
from mitmproxy.tools.console import master
from mitmproxy import command
import pytest
@pytest.mark.asyncio
async def test_commands_exist():
command_manager = command.CommandManager(ctx)
km = keymap.Keymap(None)
defaultkeys.map(km)
assert km.bindings
@ -16,7 +20,14 @@ async def test_commands_exist():
await m.load_flow(tflow())
for binding in km.bindings:
cmd, *args = command.lexer(binding.command)
parsed, _ = command_manager.parse_partial(binding.command.strip())
cmd = parsed[0].value
args = [
a.value for a in parsed[1:]
if a.type != mitmproxy.types.Space
]
assert cmd in m.commands.commands
cmd_obj = m.commands.commands[cmd]