mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-23 00:01:36 +00:00
commit
adad33595e
@ -37,7 +37,7 @@ class ClientPlayback:
|
||||
ctx.master.addons.trigger("update", [])
|
||||
|
||||
@command.command("replay.client.file")
|
||||
def load_file(self, path: str) -> None:
|
||||
def load_file(self, path: command.Path) -> None:
|
||||
try:
|
||||
flows = io.read_flows_from_paths([path])
|
||||
except exceptions.FlowReadException as e:
|
||||
|
@ -31,7 +31,7 @@ class ServerPlayback:
|
||||
ctx.master.addons.trigger("update", [])
|
||||
|
||||
@command.command("replay.server.file")
|
||||
def load_file(self, path: str) -> None:
|
||||
def load_file(self, path: command.Path) -> None:
|
||||
try:
|
||||
flows = io.read_flows_from_paths([path])
|
||||
except exceptions.FlowReadException as e:
|
||||
|
@ -3,6 +3,7 @@
|
||||
"""
|
||||
import inspect
|
||||
import types
|
||||
import io
|
||||
import typing
|
||||
import shlex
|
||||
import textwrap
|
||||
@ -14,6 +15,15 @@ from mitmproxy import exceptions
|
||||
from mitmproxy import flow
|
||||
|
||||
|
||||
def lexer(s):
|
||||
# mypy mis-identifies shlex.shlex as abstract
|
||||
lex = shlex.shlex(s) # type: ignore
|
||||
lex.wordchars += "."
|
||||
lex.whitespace_split = True
|
||||
lex.commenters = ''
|
||||
return lex
|
||||
|
||||
|
||||
Cuts = typing.Sequence[
|
||||
typing.Sequence[typing.Union[str, bytes]]
|
||||
]
|
||||
@ -23,6 +33,14 @@ class Path(str):
|
||||
pass
|
||||
|
||||
|
||||
class Cmd(str):
|
||||
pass
|
||||
|
||||
|
||||
class Arg(str):
|
||||
pass
|
||||
|
||||
|
||||
def typename(t: type, ret: bool) -> str:
|
||||
"""
|
||||
Translates a type to an explanatory string. If ret is True, we're
|
||||
@ -118,6 +136,12 @@ class Command:
|
||||
return ret
|
||||
|
||||
|
||||
ParseResult = typing.NamedTuple(
|
||||
"ParseResult",
|
||||
[("value", str), ("type", typing.Type)],
|
||||
)
|
||||
|
||||
|
||||
class CommandManager:
|
||||
def __init__(self, master):
|
||||
self.master = master
|
||||
@ -133,6 +157,42 @@ class CommandManager:
|
||||
def add(self, path: str, func: typing.Callable):
|
||||
self.commands[path] = Command(self, path, func)
|
||||
|
||||
def parse_partial(self, cmdstr: str) -> typing.Sequence[ParseResult]:
|
||||
"""
|
||||
Parse a possibly partial command. Return a sequence of (part, type) tuples.
|
||||
"""
|
||||
buf = io.StringIO(cmdstr)
|
||||
parts = [] # type: typing.List[str]
|
||||
lex = lexer(buf)
|
||||
while 1:
|
||||
remainder = cmdstr[buf.tell():]
|
||||
try:
|
||||
t = lex.get_token()
|
||||
except ValueError:
|
||||
parts.append(remainder)
|
||||
break
|
||||
if not t:
|
||||
break
|
||||
parts.append(t)
|
||||
if not parts:
|
||||
parts = [""]
|
||||
elif cmdstr.endswith(" "):
|
||||
parts.append("")
|
||||
|
||||
parse = [] # type: typing.List[ParseResult]
|
||||
params = [] # type: typing.List[type]
|
||||
for i in range(len(parts)):
|
||||
if i == 0:
|
||||
params[:] = [Cmd]
|
||||
if parts[i] in self.commands:
|
||||
params.extend(self.commands[parts[i]].paramtypes)
|
||||
if params:
|
||||
typ = params.pop(0)
|
||||
else:
|
||||
typ = str
|
||||
parse.append(ParseResult(value=parts[i], type=typ))
|
||||
return parse
|
||||
|
||||
def call_args(self, path, args):
|
||||
"""
|
||||
Call a command using a list of string arguments. May raise CommandError.
|
||||
@ -145,7 +205,7 @@ class CommandManager:
|
||||
"""
|
||||
Call a command using a string. May raise CommandError.
|
||||
"""
|
||||
parts = shlex.split(cmdstr)
|
||||
parts = list(lexer(cmdstr))
|
||||
if not len(parts) >= 1:
|
||||
raise exceptions.CommandError("Invalid command: %s" % cmdstr)
|
||||
return self.call_args(parts[0], parts[1:])
|
||||
|
@ -1,19 +1,10 @@
|
||||
import typing
|
||||
import urwid
|
||||
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import flow
|
||||
from mitmproxy.tools.console import signals
|
||||
|
||||
|
||||
class CommandEdit(urwid.Edit):
|
||||
def __init__(self, partial):
|
||||
urwid.Edit.__init__(self, ":", partial)
|
||||
|
||||
def keypress(self, size, key):
|
||||
return urwid.Edit.keypress(self, size, key)
|
||||
|
||||
|
||||
class CommandExecutor:
|
||||
def __init__(self, master):
|
||||
self.master = master
|
||||
|
0
mitmproxy/tools/console/commander/__init__.py
Normal file
0
mitmproxy/tools/console/commander/__init__.py
Normal file
158
mitmproxy/tools/console/commander/commander.py
Normal file
158
mitmproxy/tools/console/commander/commander.py
Normal file
@ -0,0 +1,158 @@
|
||||
import urwid
|
||||
from urwid.text_layout import calc_coords
|
||||
import typing
|
||||
import abc
|
||||
|
||||
import mitmproxy.master
|
||||
import mitmproxy.command
|
||||
|
||||
|
||||
class Completer:
|
||||
@abc.abstractmethod
|
||||
def cycle(self) -> str:
|
||||
pass
|
||||
|
||||
|
||||
class ListCompleter(Completer):
|
||||
def __init__(
|
||||
self,
|
||||
start: str,
|
||||
options: typing.Sequence[str],
|
||||
) -> None:
|
||||
self.start = start
|
||||
self.options = [] # type: typing.Sequence[str]
|
||||
for o in options:
|
||||
if o.startswith(start):
|
||||
self.options.append(o)
|
||||
self.offset = 0
|
||||
|
||||
def cycle(self) -> str:
|
||||
if not self.options:
|
||||
return self.start
|
||||
ret = self.options[self.offset]
|
||||
self.offset = (self.offset + 1) % len(self.options)
|
||||
return ret
|
||||
|
||||
|
||||
CompletionState = typing.NamedTuple(
|
||||
"CompletionState",
|
||||
[
|
||||
("completer", Completer),
|
||||
("parse", typing.Sequence[mitmproxy.command.ParseResult])
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
class CommandBuffer():
|
||||
def __init__(self, master: mitmproxy.master.Master, start: str = "") -> None:
|
||||
self.master = master
|
||||
self.buf = start
|
||||
# Cursor is always within the range [0:len(buffer)].
|
||||
self._cursor = len(self.buf)
|
||||
self.completion = None # type: CompletionState
|
||||
|
||||
@property
|
||||
def cursor(self) -> int:
|
||||
return self._cursor
|
||||
|
||||
@cursor.setter
|
||||
def cursor(self, x) -> None:
|
||||
if x < 0:
|
||||
self._cursor = 0
|
||||
elif x > len(self.buf):
|
||||
self._cursor = len(self.buf)
|
||||
else:
|
||||
self._cursor = x
|
||||
|
||||
def render(self):
|
||||
return self.buf
|
||||
|
||||
def left(self) -> None:
|
||||
self.cursor = self.cursor - 1
|
||||
|
||||
def right(self) -> None:
|
||||
self.cursor = self.cursor + 1
|
||||
|
||||
def cycle_completion(self) -> None:
|
||||
if not self.completion:
|
||||
parts = self.master.commands.parse_partial(self.buf[:self.cursor])
|
||||
last = parts[-1]
|
||||
if last.type == mitmproxy.command.Cmd:
|
||||
self.completion = CompletionState(
|
||||
completer = ListCompleter(
|
||||
parts[-1].value,
|
||||
self.master.commands.commands.keys(),
|
||||
),
|
||||
parse = parts,
|
||||
)
|
||||
elif isinstance(last.type, mitmproxy.command.Choice):
|
||||
self.completion = CompletionState(
|
||||
completer = ListCompleter(
|
||||
parts[-1].value,
|
||||
self.master.commands.call(last.type.options_command),
|
||||
),
|
||||
parse = parts,
|
||||
)
|
||||
if self.completion:
|
||||
nxt = self.completion.completer.cycle()
|
||||
buf = " ".join([i.value for i in self.completion.parse[:-1]]) + " " + nxt
|
||||
buf = buf.strip()
|
||||
self.buf = buf
|
||||
self.cursor = len(self.buf)
|
||||
|
||||
def backspace(self) -> None:
|
||||
if self.cursor == 0:
|
||||
return
|
||||
self.buf = self.buf[:self.cursor - 1] + self.buf[self.cursor:]
|
||||
self.cursor = self.cursor - 1
|
||||
self.completion = None
|
||||
|
||||
def insert(self, k: str) -> None:
|
||||
"""
|
||||
Inserts text at the cursor.
|
||||
"""
|
||||
self.buf = self.buf = self.buf[:self.cursor] + k + self.buf[self.cursor:]
|
||||
self.cursor += 1
|
||||
self.completion = None
|
||||
|
||||
|
||||
class CommandEdit(urwid.WidgetWrap):
|
||||
leader = ": "
|
||||
|
||||
def __init__(self, master: mitmproxy.master.Master, text: str) -> None:
|
||||
self.master = master
|
||||
self.cbuf = CommandBuffer(master, text)
|
||||
self._w = urwid.Text(self.leader)
|
||||
self.update()
|
||||
|
||||
def keypress(self, size, key):
|
||||
if key == "backspace":
|
||||
self.cbuf.backspace()
|
||||
elif key == "left":
|
||||
self.cbuf.left()
|
||||
elif key == "right":
|
||||
self.cbuf.right()
|
||||
elif key == "tab":
|
||||
self.cbuf.cycle_completion()
|
||||
elif len(key) == 1:
|
||||
self.cbuf.insert(key)
|
||||
self.update()
|
||||
|
||||
def update(self):
|
||||
self._w.set_text([self.leader, self.cbuf.render()])
|
||||
|
||||
def render(self, size, focus=False):
|
||||
(maxcol,) = size
|
||||
canv = self._w.render((maxcol,))
|
||||
canv = urwid.CompositeCanvas(canv)
|
||||
canv.cursor = self.get_cursor_coords((maxcol,))
|
||||
return canv
|
||||
|
||||
def get_cursor_coords(self, size):
|
||||
p = self.cbuf.cursor + len(self.leader)
|
||||
trans = self._w.get_line_translation(size[0])
|
||||
x, y = calc_coords(self._w.get_text()[0], trans, p)
|
||||
return x, y
|
||||
|
||||
def get_value(self):
|
||||
return self.cbuf.buf
|
@ -224,7 +224,11 @@ class ConsoleAddon:
|
||||
|
||||
@command.command("console.choose")
|
||||
def console_choose(
|
||||
self, prompt: str, choices: typing.Sequence[str], *cmd: str
|
||||
self,
|
||||
prompt: str,
|
||||
choices: typing.Sequence[str],
|
||||
cmd: command.Cmd,
|
||||
*args: command.Arg
|
||||
) -> None:
|
||||
"""
|
||||
Prompt the user to choose from a specified list of strings, then
|
||||
@ -233,7 +237,7 @@ class ConsoleAddon:
|
||||
"""
|
||||
def callback(opt):
|
||||
# We're now outside of the call context...
|
||||
repl = " ".join(cmd)
|
||||
repl = cmd + " " + " ".join(args)
|
||||
repl = repl.replace("{choice}", opt)
|
||||
try:
|
||||
self.master.commands.call(repl)
|
||||
@ -246,7 +250,7 @@ class ConsoleAddon:
|
||||
|
||||
@command.command("console.choose.cmd")
|
||||
def console_choose_cmd(
|
||||
self, prompt: str, choicecmd: str, *cmd: str
|
||||
self, prompt: str, choicecmd: command.Cmd, *cmd: command.Arg
|
||||
) -> None:
|
||||
"""
|
||||
Prompt the user to choose from a list of strings returned by a
|
||||
@ -492,14 +496,20 @@ class ConsoleAddon:
|
||||
return list(sorted(keymap.Contexts))
|
||||
|
||||
@command.command("console.key.bind")
|
||||
def key_bind(self, contexts: typing.Sequence[str], key: str, *command: str) -> None:
|
||||
def key_bind(
|
||||
self,
|
||||
contexts: typing.Sequence[str],
|
||||
key: str,
|
||||
cmd: command.Cmd,
|
||||
*args: command.Arg
|
||||
) -> None:
|
||||
"""
|
||||
Bind a shortcut key.
|
||||
"""
|
||||
try:
|
||||
self.master.keymap.add(
|
||||
key,
|
||||
" ".join(command),
|
||||
cmd + " " + " ".join(args),
|
||||
contexts,
|
||||
""
|
||||
)
|
||||
|
@ -6,6 +6,7 @@ from mitmproxy.tools.console import common
|
||||
from mitmproxy.tools.console import signals
|
||||
from mitmproxy.tools.console import commandeditor
|
||||
import mitmproxy.tools.console.master # noqa
|
||||
from mitmproxy.tools.console.commander import commander
|
||||
|
||||
|
||||
class PromptPath:
|
||||
@ -66,7 +67,7 @@ class ActionBar(urwid.WidgetWrap):
|
||||
|
||||
def sig_prompt_command(self, sender, partial=""):
|
||||
signals.focus.send(self, section="footer")
|
||||
self._w = commandeditor.CommandEdit(partial)
|
||||
self._w = commander.CommandEdit(self.master, partial)
|
||||
self.prompting = commandeditor.CommandExecutor(self.master)
|
||||
|
||||
def sig_prompt_onekey(self, sender, prompt, keys, callback, args=()):
|
||||
@ -100,7 +101,7 @@ class ActionBar(urwid.WidgetWrap):
|
||||
elif k in self.onekey:
|
||||
self.prompt_execute(k)
|
||||
elif k == "enter":
|
||||
self.prompt_execute(self._w.get_edit_text())
|
||||
self.prompt_execute(self._w.get_value())
|
||||
else:
|
||||
if common.is_keypress(k):
|
||||
self._w.keypress(size, k)
|
||||
|
@ -11,19 +11,24 @@ from mitmproxy.utils import typecheck
|
||||
|
||||
|
||||
class TAddon:
|
||||
@command.command("cmd1")
|
||||
def cmd1(self, foo: str) -> str:
|
||||
"""cmd1 help"""
|
||||
return "ret " + foo
|
||||
|
||||
@command.command("cmd2")
|
||||
def cmd2(self, foo: str) -> str:
|
||||
return 99
|
||||
|
||||
@command.command("cmd3")
|
||||
def cmd3(self, foo: int) -> int:
|
||||
return foo
|
||||
|
||||
@command.command("empty")
|
||||
def empty(self) -> None:
|
||||
pass
|
||||
|
||||
@command.command("varargs")
|
||||
def varargs(self, one: str, *var: str) -> typing.Sequence[str]:
|
||||
return list(var)
|
||||
|
||||
@ -34,6 +39,7 @@ class TAddon:
|
||||
def choose(self, arg: str) -> typing.Sequence[str]:
|
||||
return ["one", "two", "three"]
|
||||
|
||||
@command.command("path")
|
||||
def path(self, arg: command.Path) -> None:
|
||||
pass
|
||||
|
||||
@ -64,6 +70,44 @@ class TestCommand:
|
||||
c = command.Command(cm, "cmd.three", a.cmd3)
|
||||
assert c.call(["1"]) == 1
|
||||
|
||||
def test_parse_partial(self):
|
||||
tests = [
|
||||
[
|
||||
"foo bar",
|
||||
[
|
||||
command.ParseResult(value = "foo", type = command.Cmd),
|
||||
command.ParseResult(value = "bar", type = str)
|
||||
],
|
||||
],
|
||||
[
|
||||
"foo 'bar",
|
||||
[
|
||||
command.ParseResult(value = "foo", type = command.Cmd),
|
||||
command.ParseResult(value = "'bar", type = str)
|
||||
]
|
||||
],
|
||||
["a", [command.ParseResult(value = "a", type = command.Cmd)]],
|
||||
["", [command.ParseResult(value = "", type = command.Cmd)]],
|
||||
[
|
||||
"cmd3 1",
|
||||
[
|
||||
command.ParseResult(value = "cmd3", type = command.Cmd),
|
||||
command.ParseResult(value = "1", type = int),
|
||||
]
|
||||
],
|
||||
[
|
||||
"cmd3 ",
|
||||
[
|
||||
command.ParseResult(value = "cmd3", type = command.Cmd),
|
||||
command.ParseResult(value = "", type = int),
|
||||
]
|
||||
],
|
||||
]
|
||||
with taddons.context() as tctx:
|
||||
tctx.master.addons.add(TAddon())
|
||||
for s, expected in tests:
|
||||
assert tctx.master.commands.parse_partial(s) == expected
|
||||
|
||||
|
||||
def test_simple():
|
||||
with taddons.context() as tctx:
|
||||
@ -100,6 +144,7 @@ def test_typename():
|
||||
|
||||
assert command.typename(command.Choice("foo"), False) == "choice"
|
||||
assert command.typename(command.Path, False) == "path"
|
||||
assert command.typename(command.Cmd, False) == "cmd"
|
||||
|
||||
|
||||
class DummyConsole:
|
||||
@ -162,6 +207,9 @@ def test_parsearg():
|
||||
assert command.parsearg(
|
||||
tctx.master.commands, "foo", command.Path
|
||||
) == "foo"
|
||||
assert command.parsearg(
|
||||
tctx.master.commands, "foo", command.Cmd
|
||||
) == "foo"
|
||||
|
||||
|
||||
class TDec:
|
||||
|
68
test/mitmproxy/tools/console/test_commander.py
Normal file
68
test/mitmproxy/tools/console/test_commander.py
Normal file
@ -0,0 +1,68 @@
|
||||
from mitmproxy.tools.console.commander import commander
|
||||
from mitmproxy.test import taddons
|
||||
|
||||
|
||||
class TestListCompleter:
|
||||
def test_cycle(self):
|
||||
tests = [
|
||||
[
|
||||
"",
|
||||
["a", "b", "c"],
|
||||
["a", "b", "c", "a"]
|
||||
],
|
||||
[
|
||||
"xxx",
|
||||
["a", "b", "c"],
|
||||
["xxx", "xxx", "xxx"]
|
||||
],
|
||||
[
|
||||
"b",
|
||||
["a", "b", "ba", "bb", "c"],
|
||||
["b", "ba", "bb", "b"]
|
||||
],
|
||||
]
|
||||
for start, options, cycle in tests:
|
||||
c = commander.ListCompleter(start, options)
|
||||
for expected in cycle:
|
||||
assert c.cycle() == expected
|
||||
|
||||
|
||||
class TestCommandBuffer:
|
||||
|
||||
def test_backspace(self):
|
||||
tests = [
|
||||
[("", 0), ("", 0)],
|
||||
[("1", 0), ("1", 0)],
|
||||
[("1", 1), ("", 0)],
|
||||
[("123", 3), ("12", 2)],
|
||||
[("123", 2), ("13", 1)],
|
||||
[("123", 0), ("123", 0)],
|
||||
]
|
||||
with taddons.context() as tctx:
|
||||
for start, output in tests:
|
||||
cb = commander.CommandBuffer(tctx.master)
|
||||
cb.buf, cb.cursor = start[0], start[1]
|
||||
cb.backspace()
|
||||
assert cb.buf == output[0]
|
||||
assert cb.cursor == output[1]
|
||||
|
||||
def test_insert(self):
|
||||
tests = [
|
||||
[("", 0), ("x", 1)],
|
||||
[("a", 0), ("xa", 1)],
|
||||
[("xa", 2), ("xax", 3)],
|
||||
]
|
||||
with taddons.context() as tctx:
|
||||
for start, output in tests:
|
||||
cb = commander.CommandBuffer(tctx.master)
|
||||
cb.buf, cb.cursor = start[0], start[1]
|
||||
cb.insert("x")
|
||||
assert cb.buf == output[0]
|
||||
assert cb.cursor == output[1]
|
||||
|
||||
def test_cycle_completion(self):
|
||||
with taddons.context() as tctx:
|
||||
cb = commander.CommandBuffer(tctx.master)
|
||||
cb.buf = "foo bar"
|
||||
cb.cursor = len(cb.buf)
|
||||
cb.cycle_completion()
|
Loading…
Reference in New Issue
Block a user